From a766ee023644e24ecfef461329e0188e879eb5c6 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 31 Jul 2025 01:51:22 -0400 Subject: [PATCH] feat: Support range-based backfill for hl-node ingestion --- Cargo.lock | 7 ++ Cargo.toml | 4 + src/pseudo_peer/sources/hl_node.rs | 127 ++++++++++++++++++++++------- 3 files changed, 107 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64e868314..ee974619d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6504,6 +6504,12 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rangemap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223" + [[package]] name = "ratatui" version = "0.29.0" @@ -9308,6 +9314,7 @@ dependencies = [ "lz4_flex", "once_cell", "parking_lot", + "rangemap", "rayon", "reth", "reth-basic-payload-builder", diff --git a/Cargo.toml b/Cargo.toml index 60e38a5e5..305aca442 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ aws-sdk-s3 = "1.93.0" aws-config = "1.8.0" rayon = "1.7" time = "0.3.41" +rangemap = "=1.6.0" [target.'cfg(unix)'.dependencies] @@ -162,3 +163,6 @@ client = [ "jsonrpsee/async-client", "reth-rpc-eth-api/client", ] + +[profile.test] +inherits = "release" diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 53af286c4..2d7321986 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -2,11 +2,13 @@ use std::{ collections::HashMap, fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, + ops::RangeInclusive, path::{Path, PathBuf}, sync::Arc, }; use futures::future::BoxFuture; +use rangemap::RangeInclusiveMap; use serde::Deserialize; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; @@ -19,22 +21,45 @@ use super::{BlockSource, BlockSourceBoxed}; const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); const HOURLY_SUBDIR: &str = "hourly"; -type LocalBlocksCache = Arc>>; +#[derive(Debug)] +pub struct LocalBlocksCache { + cache: HashMap, + // Lightweight range map to track the ranges of blocks in the local ingest directory + ranges: RangeInclusiveMap, +} + +impl LocalBlocksCache { + fn new() -> Self { + Self { cache: HashMap::new(), ranges: RangeInclusiveMap::new() } + } + + fn load_scan_result(&mut self, scan_result: ScanResult) { + for blk in scan_result.new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + self.cache.insert(b.header.header.number, blk); + } + for range in scan_result.new_block_ranges { + self.ranges.insert(range, scan_result.path.clone()); + } + } +} /// Block source that monitors the local ingest directory for the HL node. #[derive(Debug, Clone)] pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, pub local_ingest_dir: PathBuf, - pub local_blocks_cache: LocalBlocksCache, // height → block + pub local_blocks_cache: Arc>, // height → block } #[derive(Deserialize)] struct LocalBlockAndReceipts(String, BlockAndReceipts); struct ScanResult { + path: PathBuf, next_expected_height: u64, new_blocks: Vec, + new_block_ranges: Vec>, } fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { @@ -55,18 +80,30 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + let mut block_ranges: Vec> = Vec::new(); + let mut current_range: Option<(u64, u64)> = None; + for (line_idx, line) in lines.iter().enumerate().skip(skip) { if line_idx < *last_line || line.trim().is_empty() { continue; } match line_to_evm_block(line) { - Ok((parsed_block, height)) if height >= start_height => { - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; + Ok((parsed_block, height)) => { + if height >= start_height { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + if matches!(current_range, Some((_, end)) if end + 1 == height) { + current_range = Some((current_range.unwrap().0, height)); + } else { + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + current_range = Some((height, height)); + } } - Ok(_) => continue, Err(_) => { warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); continue; @@ -74,7 +111,16 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan } } - ScanResult { next_expected_height: last_height + 1, new_blocks } + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + + ScanResult { + path: path.to_path_buf(), + next_expected_height: last_height + 1, + new_blocks, + new_block_ranges: block_ranges, + } } fn date_from_datetime(dt: OffsetDateTime) -> String { @@ -161,7 +207,16 @@ fn read_last_complete_line(read: &mut R) -> String { impl HlNodeBlockSource { async fn try_collect_local_block(&self, height: u64) -> Option { let mut u_cache = self.local_blocks_cache.lock().await; - u_cache.remove(&height) + if let Some(block) = u_cache.cache.remove(&height) { + return Some(block); + } + + let Some(path) = u_cache.ranges.get(&height).cloned() else { + return None; + }; + + u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height)); + u_cache.cache.get(&height).cloned() } fn datetime_from_path(path: &Path) -> Option { @@ -197,8 +252,8 @@ impl HlNodeBlockSource { async fn try_backfill_local_blocks( root: &Path, - cache: &LocalBlocksCache, - mut next_height: u64, + cache: &Arc>, + cutoff_height: u64, ) -> eyre::Result<()> { let mut u_cache = cache.lock().await; @@ -206,22 +261,19 @@ impl HlNodeBlockSource { let mut file = File::open(&subfile).expect("Failed to open hour file path"); let last_line = read_last_complete_line(&mut file); if let Ok((_, height)) = line_to_evm_block(&last_line) { - if height < next_height { + if height < cutoff_height { continue; } } else { warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&subfile, &mut 0, next_height); - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; + let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height); + // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage + scan_result.new_blocks.clear(); + u_cache.load_scan_result(scan_result); } - info!("Backfilled {} blocks", u_cache.len()); + info!("Backfilled {} blocks", u_cache.cache.len()); Ok(()) } @@ -251,16 +303,11 @@ impl HlNodeBlockSource { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&hour_file, &mut last_line, next_height); - if !new_blocks.is_empty() { - let mut u_cache = cache.lock().await; - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; - } + let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height); + next_height = scan_result.next_expected_height; + + let mut u_cache = cache.lock().await; + u_cache.load_scan_result(scan_result); } let now = OffsetDateTime::now_utc(); @@ -302,7 +349,7 @@ impl HlNodeBlockSource { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, - local_blocks_cache: Arc::new(Mutex::new(HashMap::new())), + local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), }; block_source.run(next_block_number).await.unwrap(); block_source @@ -318,4 +365,22 @@ mod tests { let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); println!("{:?}", dt); } + + #[tokio::test] + async fn test_backfill() { + let test_path = Path::new("/root/evm_block_and_receipts"); + if !test_path.exists() { + return; + } + + let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); + HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap(); + + let u_cache = cache.lock().await; + println!("{:?}", u_cache.ranges); + assert_eq!( + u_cache.ranges.get(&9735058), + Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) + ); + } }