4 Commits

Author SHA1 Message Date
c0b3acf181 perf: Reduce log 2025-08-01 17:37:29 +00:00
77158aa164 perf: Do not allocate much when backfilling ranges 2025-08-01 17:30:33 +00:00
2d6b5e5cd2 chore: Improve log 2025-08-01 17:28:31 +00:00
ff2e55b5a2 perf: Do not use cutoff when backfilling 2025-08-01 17:20:09 +00:00

View File

@ -7,8 +7,8 @@ use std::{
};
use futures::future::BoxFuture;
use reth_network::cache::LruMap;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use serde::Deserialize;
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
use tokio::sync::Mutex;
@ -33,10 +33,7 @@ impl LocalBlocksCache {
const CACHE_SIZE: u32 = 8000;
fn new() -> Self {
Self {
cache: LruMap::new(Self::CACHE_SIZE),
ranges: RangeInclusiveMap::new(),
}
Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
}
fn load_scan_result(&mut self, scan_result: ScanResult) {
@ -77,10 +74,17 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
Ok((parsed_block, height))
}
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
struct ScanOptions {
start_height: u64,
only_load_ranges: bool,
}
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
let file = File::open(path).expect("Failed to open hour file path");
let reader = BufReader::new(file);
let ScanOptions { start_height, only_load_ranges } = options;
let mut new_blocks = Vec::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
@ -98,7 +102,9 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
Ok((parsed_block, height)) => {
if height >= start_height {
last_height = last_height.max(height);
if !only_load_ranges {
new_blocks.push(parsed_block);
}
*last_line = line_idx;
}
if matches!(current_range, Some((_, end)) if end + 1 == height) {
@ -137,9 +143,9 @@ impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
Box::pin(async move {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
Ok(block)
} else {
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
self.fallback.collect_block(height).await
}
})
@ -222,7 +228,11 @@ impl HlNodeBlockSource {
};
info!("Loading block data from {:?}", path);
u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height));
u_cache.load_scan_result(scan_hour_file(
&path,
&mut 0,
ScanOptions { start_height: 0, only_load_ranges: false },
));
u_cache.cache.get(&height).cloned()
}
@ -275,12 +285,28 @@ impl HlNodeBlockSource {
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
}
let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height);
let mut scan_result = scan_hour_file(
&subfile,
&mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
);
// Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
scan_result.new_blocks.clear();
u_cache.load_scan_result(scan_result);
}
info!("Backfilled {} blocks", u_cache.cache.len());
if u_cache.ranges.is_empty() {
warn!("No ranges found in {:?}", root);
} else {
let (min, _) = u_cache.ranges.first_range_value().unwrap();
let (max, _) = u_cache.ranges.last_range_value().unwrap();
info!(
"Populated {} ranges (min: {}, max: {})",
u_cache.ranges.len(),
min.start(),
max.end()
);
}
Ok(())
}
@ -310,7 +336,11 @@ impl HlNodeBlockSource {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
let scan_result = scan_hour_file(
&hour_file,
&mut last_line,
ScanOptions { start_height: next_height, only_load_ranges: false },
);
next_height = scan_result.next_expected_height;
let mut u_cache = cache.lock().await;