1 Commits

Author SHA1 Message Date
7ce91f42b9 Merge 0180711ae4 into cf62e85b34 2025-07-31 05:56:39 +00:00

View File

@ -7,8 +7,8 @@ use std::{
};
use futures::future::BoxFuture;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use rangemap::RangeInclusiveMap;
use serde::Deserialize;
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
use tokio::sync::Mutex;
@ -33,7 +33,10 @@ impl LocalBlocksCache {
const CACHE_SIZE: u32 = 8000;
fn new() -> Self {
Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
Self {
cache: LruMap::new(Self::CACHE_SIZE),
ranges: RangeInclusiveMap::new(),
}
}
fn load_scan_result(&mut self, scan_result: ScanResult) {
@ -74,17 +77,10 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
Ok((parsed_block, height))
}
struct ScanOptions {
start_height: u64,
only_load_ranges: bool,
}
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
let file = File::open(path).expect("Failed to open hour file path");
let reader = BufReader::new(file);
let ScanOptions { start_height, only_load_ranges } = options;
let mut new_blocks = Vec::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
@ -102,9 +98,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
Ok((parsed_block, height)) => {
if height >= start_height {
last_height = last_height.max(height);
if !only_load_ranges {
new_blocks.push(parsed_block);
}
new_blocks.push(parsed_block);
*last_line = line_idx;
}
if matches!(current_range, Some((_, end)) if end + 1 == height) {
@ -143,9 +137,9 @@ impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
Box::pin(async move {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
Ok(block)
} else {
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
self.fallback.collect_block(height).await
}
})
@ -228,11 +222,7 @@ impl HlNodeBlockSource {
};
info!("Loading block data from {:?}", path);
u_cache.load_scan_result(scan_hour_file(
&path,
&mut 0,
ScanOptions { start_height: 0, only_load_ranges: false },
));
u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height));
u_cache.cache.get(&height).cloned()
}
@ -285,28 +275,12 @@ impl HlNodeBlockSource {
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
}
let mut scan_result = scan_hour_file(
&subfile,
&mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
);
let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height);
// Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
scan_result.new_blocks.clear();
u_cache.load_scan_result(scan_result);
}
if u_cache.ranges.is_empty() {
warn!("No ranges found in {:?}", root);
} else {
let (min, _) = u_cache.ranges.first_range_value().unwrap();
let (max, _) = u_cache.ranges.last_range_value().unwrap();
info!(
"Populated {} ranges (min: {}, max: {})",
u_cache.ranges.len(),
min.start(),
max.end()
);
}
info!("Backfilled {} blocks", u_cache.cache.len());
Ok(())
}
@ -336,11 +310,7 @@ impl HlNodeBlockSource {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let scan_result = scan_hour_file(
&hour_file,
&mut last_line,
ScanOptions { start_height: next_height, only_load_ranges: false },
);
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
next_height = scan_result.next_expected_height;
let mut u_cache = cache.lock().await;