From a47674ac30da7710a9557cdc123d907227a658a2 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 04:05:52 +0000 Subject: [PATCH 01/14] feat: Add backfill support from local node This currently does not support huge amount of backfill log within local node --- src/pseudo_peer/sources/hl_node.rs | 153 +++++++++++++++++++++++++---- 1 file changed, 132 insertions(+), 21 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 07cafe3b7..bc712265e 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,16 +1,16 @@ use std::{ - io::{BufRead, BufReader}, + io::{BufRead, BufReader, Read, Seek, SeekFrom}, path::{Path, PathBuf}, sync::Arc, }; -use eyre::ContextCompat; +use eyre::{Context, ContextCompat}; use futures::future::BoxFuture; use reth_network::cache::LruMap; use serde::Deserialize; use time::{format_description, Duration, OffsetDateTime}; use tokio::sync::Mutex; -use tracing::info; +use tracing::{info, warn}; use crate::node::types::{BlockAndReceipts, EvmBlock}; @@ -24,6 +24,8 @@ const HOURLY_SUBDIR: &str = "hourly"; /// In normal situation, 0~1 blocks will be cached. const CACHE_SIZE: u32 = 1000; +type LocalBlocksCache = Arc>>; + /// Block source that monitors the local ingest directory for the HL node. /// /// In certain situations, the [hl-node][ref] may offer lower latency compared to S3. @@ -37,7 +39,7 @@ const CACHE_SIZE: u32 = 1000; pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, pub local_ingest_dir: PathBuf, - pub local_blocks_cache: Arc>>, // height → block + pub local_blocks_cache: LocalBlocksCache, // height → block } #[derive(Deserialize)] @@ -48,6 +50,15 @@ struct ScanResult { new_blocks: Vec, } +fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { + let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = + serde_json::from_str(line)?; + let height = match &parsed_block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + Ok((parsed_block, height)) +} + fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { // info!( // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", @@ -70,25 +81,17 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan continue; } - let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = - serde_json::from_str(line).expect("Failed to parse local block and receipts"); - - let height = match &parsed_block.block { - EvmBlock::Reth115(b) => { - let block_number = b.header.header.number; - // Another check to ensure not returning an older block - if block_number < start_height { - continue; - } - block_number - } + let Ok((parsed_block, height)) = line_to_evm_block(&line) else { + warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); + continue; }; - // println!("Iterating block height {:?} | Line {}", height, line_idx); - if height >= start_height { - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; + if height < start_height { + continue; } + + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; } ScanResult { next_expected_height: last_height + 1, new_blocks } @@ -129,17 +132,125 @@ fn to_hourly(dt: OffsetDateTime) -> Result String { + const CHUNK_SIZE: u64 = 4096; + let mut file = std::fs::File::open(path).expect("Failed to open hour file path"); + let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); + let mut pos = file.seek(SeekFrom::End(0)).unwrap(); + let mut last_line: Vec = Vec::new(); + + // Read backwards in chunks until we find a newline or reach the start + while pos > 0 { + let read_size = std::cmp::min(pos, CHUNK_SIZE); + buf.resize(read_size as usize, 0); + + file.seek(SeekFrom::Start(pos - read_size)).unwrap(); + file.read_exact(&mut buf).unwrap(); + + last_line = [buf.clone(), last_line].concat(); + + // Remove trailing newline + if last_line.ends_with(b"\n") { + last_line.pop(); + } + + if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { + // Found a newline, so the last line starts after this + let start = idx + 1; + return String::from_utf8(last_line[start..].to_vec()).unwrap(); + } + + if pos < read_size { + break; + } + pos -= read_size; + } + + // There is 0~1 lines in the entire file + String::from_utf8(last_line).unwrap() +} + impl HlNodeBlockSource { async fn try_collect_local_block(&self, height: u64) -> Option { let mut u_cache = self.local_blocks_cache.lock().await; u_cache.remove(&height) } + async fn try_backfill_local_blocks( + root: &Path, + cache: &LocalBlocksCache, + mut next_height: u64, + ) -> eyre::Result<()> { + fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> { + // Validate and returns sort key for hourly//<0-23> + let file_name = f.file_name()?.to_str()?; + let Ok(file_name_num) = file_name.parse::() else { + warn!("Failed to parse file name: {:?}", f); + return None; + }; + + // Check if filename is numeric and 0..24 + if !(0..=24).contains(&file_name_num) { + return None; + } + Some((file_name_num, f)) + } + + let mut u_cache = cache.lock().await; + + // We assume that ISO format is sorted properly using naive string sort + let hourly_subdir = root.join(HOURLY_SUBDIR); + let mut files: Vec<_> = std::fs::read_dir(hourly_subdir) + .context("Failed to read hourly subdir")? + .filter_map(|f| f.ok().map(|f| f.path())) + .collect(); + files.sort(); + + for file in files { + let mut subfiles: Vec<_> = file + .read_dir() + .context("Failed to read hourly subdir")? + .filter_map(|f| f.ok().map(|f| f.path())) + .filter_map(parse_file_name) + .collect(); + subfiles.sort(); + + for (file_name_num, subfile) in subfiles { + if file_name_num < next_height { + continue; + } + + // Fast path: check the last line of the file + let last_line = read_last_line(&subfile); + if let Ok((_, height)) = line_to_evm_block(&last_line) { + if height < next_height { + continue; + } + } else { + warn!("Failed to parse last line of file, fallback to slow path: {:?}", file); + } + + let ScanResult { next_expected_height, new_blocks } = + scan_hour_file(&file, &mut 0, next_height); + for blk in new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + u_cache.insert(b.header.header.number, blk); + } + next_height = next_expected_height; + } + } + + Ok(()) + } + async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { let root = self.local_ingest_dir.to_owned(); let cache = self.local_blocks_cache.clone(); tokio::spawn(async move { + // hl-node backfill is for fast path; do not exit when it fails + let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await; + let mut next_height = current_head; let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap(); From a2e978dc0ca2b011893d4d73b958b38516d003f3 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 04:37:49 +0000 Subject: [PATCH 02/14] fix typo --- src/pseudo_peer/sources/hl_node.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index bc712265e..4b27287a4 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -215,11 +215,7 @@ impl HlNodeBlockSource { .collect(); subfiles.sort(); - for (file_name_num, subfile) in subfiles { - if file_name_num < next_height { - continue; - } - + for (_, subfile) in subfiles { // Fast path: check the last line of the file let last_line = read_last_line(&subfile); if let Ok((_, height)) = line_to_evm_block(&last_line) { @@ -239,6 +235,7 @@ impl HlNodeBlockSource { next_height = next_expected_height; } } + info!("Backfilled {} blocks", u_cache.len()); Ok(()) } From 9fcc04e88997e4c0c589faf2c944b68b8cf34731 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 21:27:55 -0400 Subject: [PATCH 03/14] fix: Use correct cutoff block number --- Cargo.lock | 1 + Cargo.toml | 1 + src/node/network/mod.rs | 9 ++++++++- src/pseudo_peer/config.rs | 6 ++++-- src/pseudo_peer/sources/hl_node.rs | 24 +++++++++--------------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30b391f69..64e868314 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9341,6 +9341,7 @@ dependencies = [ "reth-rpc", "reth-rpc-engine-api", "reth-rpc-eth-api", + "reth-stages-types", "reth-tracing", "reth-transaction-pool", "reth-trie-common", diff --git a/Cargo.toml b/Cargo.toml index a45fba461..60e38a5e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ reth-trie-common = { git = "https://github.com/sprites0/reth", rev = "fc754e5983 reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } +reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } revm = { version = "26.0.1" } # alloy dependencies diff --git a/src/node/network/mod.rs b/src/node/network/mod.rs index 75b8f689a..5028bbafc 100644 --- a/src/node/network/mod.rs +++ b/src/node/network/mod.rs @@ -24,6 +24,8 @@ use reth_eth_wire::{BasicNetworkPrimitives, NewBlock, NewBlockPayload}; use reth_ethereum_primitives::PooledTransactionVariant; use reth_network::{NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::PeersInfo; +use reth_provider::StageCheckpointReader; +use reth_stages_types::StageId; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use tracing::info; @@ -235,8 +237,13 @@ where let chain_spec = ctx.chain_spec(); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); + let next_block_number = + ctx.provider().get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number + + 1; + ctx.task_executor().spawn_critical("pseudo peer", async move { - let block_source = block_source_config.create_cached_block_source().await; + let block_source = + block_source_config.create_cached_block_source(next_block_number).await; start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) .await .unwrap(); diff --git a/src/pseudo_peer/config.rs b/src/pseudo_peer/config.rs index e12d3636b..594e0a9fd 100644 --- a/src/pseudo_peer/config.rs +++ b/src/pseudo_peer/config.rs @@ -72,6 +72,7 @@ impl BlockSourceConfig { pub async fn create_block_source_from_node( &self, + next_block_number: u64, fallback_block_source: BlockSourceBoxed, ) -> BlockSourceBoxed { let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { @@ -82,14 +83,15 @@ impl BlockSourceConfig { HlNodeBlockSource::new( fallback_block_source, PathBuf::from(block_source_from_node.clone()), + next_block_number, ) .await, )) } - pub async fn create_cached_block_source(&self) -> BlockSourceBoxed { + pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed { let block_source = self.create_block_source().await; - let block_source = self.create_block_source_from_node(block_source).await; + let block_source = self.create_block_source_from_node(next_block_number, block_source).await; Arc::new(Box::new(CachedBlockSource::new(block_source))) } } diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 4b27287a4..531d94d93 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use eyre::{Context, ContextCompat}; +use eyre::Context; use futures::future::BoxFuture; use reth_network::cache::LruMap; use serde::Deserialize; @@ -81,7 +81,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan continue; } - let Ok((parsed_block, height)) = line_to_evm_block(&line) else { + let Ok((parsed_block, height)) = line_to_evm_block(line) else { warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); continue; }; @@ -223,11 +223,11 @@ impl HlNodeBlockSource { continue; } } else { - warn!("Failed to parse last line of file, fallback to slow path: {:?}", file); + warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&file, &mut 0, next_height); + scan_hour_file(&subfile, &mut 0, next_height); for blk in new_blocks { let EvmBlock::Reth115(b) = &blk.block; u_cache.insert(b.header.header.number, blk); @@ -298,29 +298,23 @@ impl HlNodeBlockSource { }); } - pub(crate) async fn run(&self) -> eyre::Result<()> { - let latest_block_number = self - .fallback - .find_latest_block_number() - .await - .context("Failed to find latest block number")?; - + pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { let EvmBlock::Reth115(latest_block) = - self.fallback.collect_block(latest_block_number).await?.block; + self.fallback.collect_block(next_block_number).await?.block; let latest_block_ts = latest_block.header.header.timestamp; - self.start_local_ingest_loop(latest_block_number, latest_block_ts).await; + self.start_local_ingest_loop(next_block_number, latest_block_ts).await; Ok(()) } - pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self { + pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf, next_block_number: u64) -> Self { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), }; - block_source.run().await.unwrap(); + block_source.run(next_block_number).await.unwrap(); block_source } } From c34ee01b702d7a8479d157b9c1bfe3b4406072e7 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 22:20:21 -0400 Subject: [PATCH 04/14] fix: Do not use LruMap LruMap does not support backfill. --- src/pseudo_peer/sources/hl_node.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 531d94d93..5281e5e6d 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,4 +1,5 @@ use std::{ + collections::HashMap, io::{BufRead, BufReader, Read, Seek, SeekFrom}, path::{Path, PathBuf}, sync::Arc, @@ -6,7 +7,6 @@ use std::{ use eyre::Context; use futures::future::BoxFuture; -use reth_network::cache::LruMap; use serde::Deserialize; use time::{format_description, Duration, OffsetDateTime}; use tokio::sync::Mutex; @@ -20,11 +20,8 @@ use super::{BlockSource, BlockSourceBoxed}; const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); /// Sub‑directory that contains day folders (inside `local_ingest_dir`). const HOURLY_SUBDIR: &str = "hourly"; -/// Maximum number of blocks to cache blocks from hl-node. -/// In normal situation, 0~1 blocks will be cached. -const CACHE_SIZE: u32 = 1000; -type LocalBlocksCache = Arc>>; +type LocalBlocksCache = Arc>>; /// Block source that monitors the local ingest directory for the HL node. /// @@ -223,7 +220,10 @@ impl HlNodeBlockSource { continue; } } else { - warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); + warn!( + "Failed to parse last line of file, fallback to slow path: {:?}", + subfile + ); } let ScanResult { next_expected_height, new_blocks } = @@ -308,11 +308,15 @@ impl HlNodeBlockSource { Ok(()) } - pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf, next_block_number: u64) -> Self { + pub async fn new( + fallback: BlockSourceBoxed, + local_ingest_dir: PathBuf, + next_block_number: u64, + ) -> Self { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, - local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), + local_blocks_cache: Arc::new(Mutex::new(HashMap::new())), }; block_source.run(next_block_number).await.unwrap(); block_source From 4e59ee62dcfe80482d148be08c0a13f1efce4fa0 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 23:29:18 -0400 Subject: [PATCH 05/14] fix: Reduce fallback usage in hl-node ingestion --- src/pseudo_peer/sources/hl_node.rs | 227 +++++++++++++---------------- 1 file changed, 104 insertions(+), 123 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 5281e5e6d..d3136bf73 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,11 +1,11 @@ use std::{ collections::HashMap, + fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, path::{Path, PathBuf}, sync::Arc, }; -use eyre::Context; use futures::future::BoxFuture; use serde::Deserialize; use time::{format_description, Duration, OffsetDateTime}; @@ -16,22 +16,12 @@ use crate::node::types::{BlockAndReceipts, EvmBlock}; use super::{BlockSource, BlockSourceBoxed}; -/// Poll interval when tailing an *open* hourly file. const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); -/// Sub‑directory that contains day folders (inside `local_ingest_dir`). const HOURLY_SUBDIR: &str = "hourly"; type LocalBlocksCache = Arc>>; /// Block source that monitors the local ingest directory for the HL node. -/// -/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3. -/// This block source caches blocks from the HL node to minimize latency, -/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed. -/// -/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7 -/// -/// [ref]: https://github.com/hyperliquid-dex/node #[derive(Debug, Clone)] pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, @@ -57,48 +47,36 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> } fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { - // info!( - // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", - // path, start_height, last_line - // ); - let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let file = File::open(path).expect("Failed to open hour file path"); let reader = BufReader::new(file); - let mut new_blocks = Vec::::new(); + let mut new_blocks = Vec::new(); let mut last_height = start_height; let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; for (line_idx, line) in lines.iter().enumerate().skip(skip) { - // Safety check ensuring efficiency - if line_idx < *last_line { - continue; - } - if line.trim().is_empty() { + if line_idx < *last_line || line.trim().is_empty() { continue; } - let Ok((parsed_block, height)) = line_to_evm_block(line) else { - warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); - continue; - }; - if height < start_height { - continue; + match line_to_evm_block(line) { + Ok((parsed_block, height)) if height >= start_height => { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + Ok(_) => continue, + Err(_) => { + warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); + continue; + } } - - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; } ScanResult { next_expected_height: last_height + 1, new_blocks } } -fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { - OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000) - .expect("timestamp out of range") -} - fn date_from_datetime(dt: OffsetDateTime) -> String { dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() } @@ -106,7 +84,6 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { - // Not a one liner (using .or) to include logs if let Some(block) = self.try_collect_local_block(height).await { info!("Returning locally synced block for @ Height [{height}]"); Ok(block) @@ -116,8 +93,24 @@ impl BlockSource for HlNodeBlockSource { }) } - fn find_latest_block_number(&self) -> futures::future::BoxFuture> { - self.fallback.find_latest_block_number() + fn find_latest_block_number(&self) -> BoxFuture> { + Box::pin(async move { + let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else { + warn!( + "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", + self.local_ingest_dir + ); + return None; + }; + let mut file = File::open(&dir).expect("Failed to open hour file path"); + let last_line = read_last_complete_line(&mut file); + let Ok((_, height)) = line_to_evm_block(&last_line) else { + return None; + }; + + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + }) } fn recommended_chunk_size(&self) -> u64 { @@ -125,36 +118,31 @@ impl BlockSource for HlNodeBlockSource { } } -fn to_hourly(dt: OffsetDateTime) -> Result { - dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0) -} - -fn read_last_line(path: &Path) -> String { +fn read_last_complete_line(read: &mut R) -> String { const CHUNK_SIZE: u64 = 4096; - let mut file = std::fs::File::open(path).expect("Failed to open hour file path"); let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); - let mut pos = file.seek(SeekFrom::End(0)).unwrap(); + let mut pos = read.seek(SeekFrom::End(0)).unwrap(); let mut last_line: Vec = Vec::new(); - // Read backwards in chunks until we find a newline or reach the start while pos > 0 { let read_size = std::cmp::min(pos, CHUNK_SIZE); buf.resize(read_size as usize, 0); - file.seek(SeekFrom::Start(pos - read_size)).unwrap(); - file.read_exact(&mut buf).unwrap(); + read.seek(SeekFrom::Start(pos - read_size)).unwrap(); + read.read_exact(&mut buf).unwrap(); last_line = [buf.clone(), last_line].concat(); - // Remove trailing newline if last_line.ends_with(b"\n") { last_line.pop(); } if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { - // Found a newline, so the last line starts after this - let start = idx + 1; - return String::from_utf8(last_line[start..].to_vec()).unwrap(); + let candidate = &last_line[idx + 1..]; + if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() { + return String::from_utf8(candidate.to_vec()).unwrap(); + } + last_line.truncate(idx); } if pos < read_size { @@ -163,7 +151,6 @@ fn read_last_line(path: &Path) -> String { pos -= read_size; } - // There is 0~1 lines in the entire file String::from_utf8(last_line).unwrap() } @@ -173,83 +160,81 @@ impl HlNodeBlockSource { u_cache.remove(&height) } + fn datetime_from_path(path: &Path) -> Option { + let dt_part = path.parent()?.parent()?.file_name()?.to_str()?; + let hour_part = path.file_name()?.to_str()?; + let dt = OffsetDateTime::parse( + dt_part, + &format_description::parse("[year][month][day]").unwrap(), + ) + .ok()?; + let hour: u8 = hour_part.parse().ok()?; + let dt = dt.replace_hour(hour).ok()?; + Some(dt) + } + + fn all_hourly_files(root: &Path) -> Option> { + let dir = root.join(HOURLY_SUBDIR); + let mut files = Vec::new(); + for entry in std::fs::read_dir(dir).ok()? { + let file = entry.ok()?.path(); + let subfiles: Vec<_> = + std::fs::read_dir(&file).ok()?.filter_map(|f| f.ok().map(|f| f.path())).collect(); + files.extend(subfiles); + } + files.sort(); + Some(files) + } + + fn find_latest_hourly_file(root: &Path) -> Option { + Self::all_hourly_files(root)?.last().cloned() + } + async fn try_backfill_local_blocks( root: &Path, cache: &LocalBlocksCache, mut next_height: u64, ) -> eyre::Result<()> { - fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> { - // Validate and returns sort key for hourly//<0-23> - let file_name = f.file_name()?.to_str()?; - let Ok(file_name_num) = file_name.parse::() else { - warn!("Failed to parse file name: {:?}", f); - return None; - }; - - // Check if filename is numeric and 0..24 - if !(0..=24).contains(&file_name_num) { - return None; - } - Some((file_name_num, f)) - } - let mut u_cache = cache.lock().await; - // We assume that ISO format is sorted properly using naive string sort - let hourly_subdir = root.join(HOURLY_SUBDIR); - let mut files: Vec<_> = std::fs::read_dir(hourly_subdir) - .context("Failed to read hourly subdir")? - .filter_map(|f| f.ok().map(|f| f.path())) - .collect(); - files.sort(); - - for file in files { - let mut subfiles: Vec<_> = file - .read_dir() - .context("Failed to read hourly subdir")? - .filter_map(|f| f.ok().map(|f| f.path())) - .filter_map(parse_file_name) - .collect(); - subfiles.sort(); - - for (_, subfile) in subfiles { - // Fast path: check the last line of the file - let last_line = read_last_line(&subfile); - if let Ok((_, height)) = line_to_evm_block(&last_line) { - if height < next_height { - continue; - } - } else { - warn!( - "Failed to parse last line of file, fallback to slow path: {:?}", - subfile - ); + for subfile in Self::all_hourly_files(root).unwrap_or_default() { + let mut file = File::open(&subfile).expect("Failed to open hour file path"); + let last_line = read_last_complete_line(&mut file); + if let Ok((_, height)) = line_to_evm_block(&last_line) { + if height < next_height { + continue; } - - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&subfile, &mut 0, next_height); - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; + } else { + warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } + + let ScanResult { next_expected_height, new_blocks } = + scan_hour_file(&subfile, &mut 0, next_height); + for blk in new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + u_cache.insert(b.header.header.number, blk); + } + next_height = next_expected_height; } info!("Backfilled {} blocks", u_cache.len()); Ok(()) } - async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { + async fn start_local_ingest_loop(&self, current_head: u64) { let root = self.local_ingest_dir.to_owned(); let cache = self.local_blocks_cache.clone(); tokio::spawn(async move { - // hl-node backfill is for fast path; do not exit when it fails - let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await; - let mut next_height = current_head; - let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap(); + + // Wait for the first hourly file to be created + let mut dt = loop { + if let Some(latest_file) = Self::find_latest_hourly_file(&root) { + break Self::datetime_from_path(&latest_file).unwrap(); + } + tokio::time::sleep(TAIL_INTERVAL).await; + }; let mut hour = dt.hour(); let mut day_str = date_from_datetime(dt); @@ -273,14 +258,8 @@ impl HlNodeBlockSource { } } - // Decide whether the *current* hour file is closed (past) or - // still live. If it’s in the past by > 1 h, move to next hour; - // otherwise, keep tailing the same file. let now = OffsetDateTime::now_utc(); - // println!("Date Current {:?}", dt); - // println!("Now Current {:?}", now); - if dt + Duration::HOUR < now { dt += Duration::HOUR; hour = dt.hour(); @@ -299,12 +278,14 @@ impl HlNodeBlockSource { } pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { - let EvmBlock::Reth115(latest_block) = - self.fallback.collect_block(next_block_number).await?.block; + let _ = Self::try_backfill_local_blocks( + &self.local_ingest_dir, + &self.local_blocks_cache, + next_block_number, + ) + .await; - let latest_block_ts = latest_block.header.header.timestamp; - - self.start_local_ingest_loop(next_block_number, latest_block_ts).await; + self.start_local_ingest_loop(next_block_number).await; Ok(()) } From 78ae5643b1aa21b1058a70ae88bb47b3dfe10f34 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 23:32:25 -0400 Subject: [PATCH 06/14] fix: Should use fallback when there is no hl-node files --- src/pseudo_peer/sources/hl_node.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index d3136bf73..fae4d36aa 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -100,12 +100,16 @@ impl BlockSource for HlNodeBlockSource { "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", self.local_ingest_dir ); - return None; + return self.fallback.find_latest_block_number().await; }; let mut file = File::open(&dir).expect("Failed to open hour file path"); let last_line = read_last_complete_line(&mut file); let Ok((_, height)) = line_to_evm_block(&last_line) else { - return None; + warn!( + "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", + file + ); + return self.fallback.find_latest_block_number().await; }; info!("Latest block number: {} with path {}", height, dir.display()); From 46c9d4cbf959416fb18950892d247bfac01491ad Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 31 Jul 2025 01:00:28 -0400 Subject: [PATCH 07/14] fix: Fix path parser --- src/pseudo_peer/sources/hl_node.rs | 36 ++++++++++++++++++++---------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index fae4d36aa..53af286c4 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -8,7 +8,7 @@ use std::{ use futures::future::BoxFuture; use serde::Deserialize; -use time::{format_description, Duration, OffsetDateTime}; +use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; use tracing::{info, warn}; @@ -78,7 +78,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan } fn date_from_datetime(dt: OffsetDateTime) -> String { - dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() + dt.format(&format_description!("[year][month][day]")).unwrap() } impl BlockSource for HlNodeBlockSource { @@ -165,16 +165,14 @@ impl HlNodeBlockSource { } fn datetime_from_path(path: &Path) -> Option { - let dt_part = path.parent()?.parent()?.file_name()?.to_str()?; + let dt_part = path.parent()?.file_name()?.to_str()?; let hour_part = path.file_name()?.to_str()?; - let dt = OffsetDateTime::parse( - dt_part, - &format_description::parse("[year][month][day]").unwrap(), - ) - .ok()?; + let hour: u8 = hour_part.parse().ok()?; - let dt = dt.replace_hour(hour).ok()?; - Some(dt) + Some(OffsetDateTime::new_utc( + Date::parse(&format!("{dt_part}"), &format_description!("[year][month][day]")).ok()?, + Time::from_hms(hour, 0, 0).ok()?, + )) } fn all_hourly_files(root: &Path) -> Option> { @@ -182,8 +180,11 @@ impl HlNodeBlockSource { let mut files = Vec::new(); for entry in std::fs::read_dir(dir).ok()? { let file = entry.ok()?.path(); - let subfiles: Vec<_> = - std::fs::read_dir(&file).ok()?.filter_map(|f| f.ok().map(|f| f.path())).collect(); + let subfiles: Vec<_> = std::fs::read_dir(&file) + .ok()? + .filter_map(|f| f.ok().map(|f| f.path())) + .filter(|p| Self::datetime_from_path(p).is_some()) + .collect(); files.extend(subfiles); } files.sort(); @@ -307,3 +308,14 @@ impl HlNodeBlockSource { block_source } } + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_datetime_from_path() { + let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); + let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); + println!("{:?}", dt); + } +} From a766ee023644e24ecfef461329e0188e879eb5c6 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 31 Jul 2025 01:51:22 -0400 Subject: [PATCH 08/14] feat: Support range-based backfill for hl-node ingestion --- Cargo.lock | 7 ++ Cargo.toml | 4 + src/pseudo_peer/sources/hl_node.rs | 127 ++++++++++++++++++++++------- 3 files changed, 107 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 64e868314..ee974619d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6504,6 +6504,12 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rangemap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223" + [[package]] name = "ratatui" version = "0.29.0" @@ -9308,6 +9314,7 @@ dependencies = [ "lz4_flex", "once_cell", "parking_lot", + "rangemap", "rayon", "reth", "reth-basic-payload-builder", diff --git a/Cargo.toml b/Cargo.toml index 60e38a5e5..305aca442 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -112,6 +112,7 @@ aws-sdk-s3 = "1.93.0" aws-config = "1.8.0" rayon = "1.7" time = "0.3.41" +rangemap = "=1.6.0" [target.'cfg(unix)'.dependencies] @@ -162,3 +163,6 @@ client = [ "jsonrpsee/async-client", "reth-rpc-eth-api/client", ] + +[profile.test] +inherits = "release" diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 53af286c4..2d7321986 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -2,11 +2,13 @@ use std::{ collections::HashMap, fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, + ops::RangeInclusive, path::{Path, PathBuf}, sync::Arc, }; use futures::future::BoxFuture; +use rangemap::RangeInclusiveMap; use serde::Deserialize; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; @@ -19,22 +21,45 @@ use super::{BlockSource, BlockSourceBoxed}; const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); const HOURLY_SUBDIR: &str = "hourly"; -type LocalBlocksCache = Arc>>; +#[derive(Debug)] +pub struct LocalBlocksCache { + cache: HashMap, + // Lightweight range map to track the ranges of blocks in the local ingest directory + ranges: RangeInclusiveMap, +} + +impl LocalBlocksCache { + fn new() -> Self { + Self { cache: HashMap::new(), ranges: RangeInclusiveMap::new() } + } + + fn load_scan_result(&mut self, scan_result: ScanResult) { + for blk in scan_result.new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + self.cache.insert(b.header.header.number, blk); + } + for range in scan_result.new_block_ranges { + self.ranges.insert(range, scan_result.path.clone()); + } + } +} /// Block source that monitors the local ingest directory for the HL node. #[derive(Debug, Clone)] pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, pub local_ingest_dir: PathBuf, - pub local_blocks_cache: LocalBlocksCache, // height → block + pub local_blocks_cache: Arc>, // height → block } #[derive(Deserialize)] struct LocalBlockAndReceipts(String, BlockAndReceipts); struct ScanResult { + path: PathBuf, next_expected_height: u64, new_blocks: Vec, + new_block_ranges: Vec>, } fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { @@ -55,18 +80,30 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + let mut block_ranges: Vec> = Vec::new(); + let mut current_range: Option<(u64, u64)> = None; + for (line_idx, line) in lines.iter().enumerate().skip(skip) { if line_idx < *last_line || line.trim().is_empty() { continue; } match line_to_evm_block(line) { - Ok((parsed_block, height)) if height >= start_height => { - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; + Ok((parsed_block, height)) => { + if height >= start_height { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + if matches!(current_range, Some((_, end)) if end + 1 == height) { + current_range = Some((current_range.unwrap().0, height)); + } else { + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + current_range = Some((height, height)); + } } - Ok(_) => continue, Err(_) => { warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); continue; @@ -74,7 +111,16 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan } } - ScanResult { next_expected_height: last_height + 1, new_blocks } + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + + ScanResult { + path: path.to_path_buf(), + next_expected_height: last_height + 1, + new_blocks, + new_block_ranges: block_ranges, + } } fn date_from_datetime(dt: OffsetDateTime) -> String { @@ -161,7 +207,16 @@ fn read_last_complete_line(read: &mut R) -> String { impl HlNodeBlockSource { async fn try_collect_local_block(&self, height: u64) -> Option { let mut u_cache = self.local_blocks_cache.lock().await; - u_cache.remove(&height) + if let Some(block) = u_cache.cache.remove(&height) { + return Some(block); + } + + let Some(path) = u_cache.ranges.get(&height).cloned() else { + return None; + }; + + u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height)); + u_cache.cache.get(&height).cloned() } fn datetime_from_path(path: &Path) -> Option { @@ -197,8 +252,8 @@ impl HlNodeBlockSource { async fn try_backfill_local_blocks( root: &Path, - cache: &LocalBlocksCache, - mut next_height: u64, + cache: &Arc>, + cutoff_height: u64, ) -> eyre::Result<()> { let mut u_cache = cache.lock().await; @@ -206,22 +261,19 @@ impl HlNodeBlockSource { let mut file = File::open(&subfile).expect("Failed to open hour file path"); let last_line = read_last_complete_line(&mut file); if let Ok((_, height)) = line_to_evm_block(&last_line) { - if height < next_height { + if height < cutoff_height { continue; } } else { warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&subfile, &mut 0, next_height); - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; + let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height); + // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage + scan_result.new_blocks.clear(); + u_cache.load_scan_result(scan_result); } - info!("Backfilled {} blocks", u_cache.len()); + info!("Backfilled {} blocks", u_cache.cache.len()); Ok(()) } @@ -251,16 +303,11 @@ impl HlNodeBlockSource { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&hour_file, &mut last_line, next_height); - if !new_blocks.is_empty() { - let mut u_cache = cache.lock().await; - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; - } + let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height); + next_height = scan_result.next_expected_height; + + let mut u_cache = cache.lock().await; + u_cache.load_scan_result(scan_result); } let now = OffsetDateTime::now_utc(); @@ -302,7 +349,7 @@ impl HlNodeBlockSource { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, - local_blocks_cache: Arc::new(Mutex::new(HashMap::new())), + local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), }; block_source.run(next_block_number).await.unwrap(); block_source @@ -318,4 +365,22 @@ mod tests { let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); println!("{:?}", dt); } + + #[tokio::test] + async fn test_backfill() { + let test_path = Path::new("/root/evm_block_and_receipts"); + if !test_path.exists() { + return; + } + + let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); + HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap(); + + let u_cache = cache.lock().await; + println!("{:?}", u_cache.ranges); + assert_eq!( + u_cache.ranges.get(&9735058), + Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) + ); + } } From 0180711ae4824d57db183ffde648ee2bc8cb2957 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 31 Jul 2025 01:56:39 -0400 Subject: [PATCH 09/14] perf: Constrain memory size again, add log --- src/pseudo_peer/sources/hl_node.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 2d7321986..f2c402296 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,5 +1,4 @@ use std::{ - collections::HashMap, fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, ops::RangeInclusive, @@ -8,6 +7,7 @@ use std::{ }; use futures::future::BoxFuture; +use reth_network::cache::LruMap; use rangemap::RangeInclusiveMap; use serde::Deserialize; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; @@ -23,14 +23,20 @@ const HOURLY_SUBDIR: &str = "hourly"; #[derive(Debug)] pub struct LocalBlocksCache { - cache: HashMap, + cache: LruMap, // Lightweight range map to track the ranges of blocks in the local ingest directory ranges: RangeInclusiveMap, } impl LocalBlocksCache { + // 3660 blocks per hour + const CACHE_SIZE: u32 = 8000; + fn new() -> Self { - Self { cache: HashMap::new(), ranges: RangeInclusiveMap::new() } + Self { + cache: LruMap::new(Self::CACHE_SIZE), + ranges: RangeInclusiveMap::new(), + } } fn load_scan_result(&mut self, scan_result: ScanResult) { @@ -215,6 +221,7 @@ impl HlNodeBlockSource { return None; }; + info!("Loading block data from {:?}", path); u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height)); u_cache.cache.get(&height).cloned() } From ff2e55b5a2621a2467f5ca3dcb0a454aa63feaad Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Fri, 1 Aug 2025 17:20:09 +0000 Subject: [PATCH 10/14] perf: Do not use cutoff when backfilling --- src/pseudo_peer/sources/hl_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index f2c402296..babf2fdbe 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -222,7 +222,7 @@ impl HlNodeBlockSource { }; info!("Loading block data from {:?}", path); - u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height)); + u_cache.load_scan_result(scan_hour_file(&path, &mut 0, 0)); u_cache.cache.get(&height).cloned() } From 2d6b5e5cd207e5f1dc9ce3e2fd428e61ac2ff8eb Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Fri, 1 Aug 2025 17:28:31 +0000 Subject: [PATCH 11/14] chore: Improve log --- src/pseudo_peer/sources/hl_node.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index babf2fdbe..30bbe67c6 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -7,8 +7,8 @@ use std::{ }; use futures::future::BoxFuture; -use reth_network::cache::LruMap; use rangemap::RangeInclusiveMap; +use reth_network::cache::LruMap; use serde::Deserialize; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; @@ -33,10 +33,7 @@ impl LocalBlocksCache { const CACHE_SIZE: u32 = 8000; fn new() -> Self { - Self { - cache: LruMap::new(Self::CACHE_SIZE), - ranges: RangeInclusiveMap::new(), - } + Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() } } fn load_scan_result(&mut self, scan_result: ScanResult) { @@ -280,7 +277,19 @@ impl HlNodeBlockSource { scan_result.new_blocks.clear(); u_cache.load_scan_result(scan_result); } - info!("Backfilled {} blocks", u_cache.cache.len()); + + if u_cache.ranges.is_empty() { + warn!("No ranges found in {:?}", root); + } else { + let (min, _) = u_cache.ranges.first_range_value().unwrap(); + let (max, _) = u_cache.ranges.last_range_value().unwrap(); + info!( + "Populated {} ranges (min: {}, max: {})", + u_cache.ranges.len(), + min.start(), + max.end() + ); + } Ok(()) } From 77158aa164d8089f2e92bc82f1c9f13d7f2524a3 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Fri, 1 Aug 2025 17:30:33 +0000 Subject: [PATCH 12/14] perf: Do not allocate much when backfilling ranges --- src/pseudo_peer/sources/hl_node.rs | 31 +++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 30bbe67c6..157683c3d 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -74,10 +74,17 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> Ok((parsed_block, height)) } -fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { +struct ScanOptions { + start_height: u64, + only_load_ranges: bool, +} + +fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { let file = File::open(path).expect("Failed to open hour file path"); let reader = BufReader::new(file); + let ScanOptions { start_height, only_load_ranges } = options; + let mut new_blocks = Vec::new(); let mut last_height = start_height; let lines: Vec = reader.lines().collect::>().unwrap(); @@ -95,7 +102,9 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan Ok((parsed_block, height)) => { if height >= start_height { last_height = last_height.max(height); - new_blocks.push(parsed_block); + if !only_load_ranges { + new_blocks.push(parsed_block); + } *last_line = line_idx; } if matches!(current_range, Some((_, end)) if end + 1 == height) { @@ -219,7 +228,11 @@ impl HlNodeBlockSource { }; info!("Loading block data from {:?}", path); - u_cache.load_scan_result(scan_hour_file(&path, &mut 0, 0)); + u_cache.load_scan_result(scan_hour_file( + &path, + &mut 0, + ScanOptions { start_height: 0, only_load_ranges: false }, + )); u_cache.cache.get(&height).cloned() } @@ -272,7 +285,11 @@ impl HlNodeBlockSource { warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } - let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height); + let mut scan_result = scan_hour_file( + &subfile, + &mut 0, + ScanOptions { start_height: cutoff_height, only_load_ranges: true }, + ); // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage scan_result.new_blocks.clear(); u_cache.load_scan_result(scan_result); @@ -319,7 +336,11 @@ impl HlNodeBlockSource { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height); + let scan_result = scan_hour_file( + &hour_file, + &mut last_line, + ScanOptions { start_height: next_height, only_load_ranges: false }, + ); next_height = scan_result.next_expected_height; let mut u_cache = cache.lock().await; From c0b3acf18167d6ff03492a3205bd17f4d8ccff1c Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Fri, 1 Aug 2025 17:37:29 +0000 Subject: [PATCH 13/14] perf: Reduce log --- src/pseudo_peer/sources/hl_node.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 157683c3d..0960552c2 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -143,9 +143,9 @@ impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { if let Some(block) = self.try_collect_local_block(height).await { - info!("Returning locally synced block for @ Height [{height}]"); Ok(block) } else { + info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); self.fallback.collect_block(height).await } }) From c27e5e5a64cf1207c0fa3ebf79d869ca56ebae81 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sat, 2 Aug 2025 17:10:30 -0400 Subject: [PATCH 14/14] fix/perf: Fix last line scanner, wait 0.5s before fallback if it's more recent, add tests --- Cargo.lock | 1 + Cargo.toml | 4 +- src/node/types/mod.rs | 12 +- src/pseudo_peer/sources/hl_node.rs | 267 ++++++++++++++++++++++++++--- 4 files changed, 251 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee974619d..5f21ab403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9357,6 +9357,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "tempfile", "thiserror 2.0.12", "tikv-jemalloc-ctl", "tikv-jemallocator", diff --git a/Cargo.toml b/Cargo.toml index 305aca442..833905ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,5 +164,5 @@ client = [ "reth-rpc-eth-api/client", ] -[profile.test] -inherits = "release" +[dev-dependencies] +tempfile = "3.20.0" diff --git a/src/node/types/mod.rs b/src/node/types/mod.rs index 93f9c072e..f4b755788 100644 --- a/src/node/types/mod.rs +++ b/src/node/types/mod.rs @@ -14,7 +14,7 @@ pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompile #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] pub struct ReadPrecompileCalls(pub Vec); -mod reth_compat; +pub(crate) mod reth_compat; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct HlExtras { @@ -38,7 +38,7 @@ impl Decodable for ReadPrecompileCalls { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct BlockAndReceipts { pub block: EvmBlock, pub receipts: Vec, @@ -71,12 +71,12 @@ impl BlockAndReceipts { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub enum EvmBlock { Reth115(reth_compat::SealedBlock), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct LegacyReceipt { tx_type: LegacyTxType, success: bool, @@ -84,7 +84,7 @@ pub struct LegacyReceipt { logs: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] enum LegacyTxType { Legacy = 0, Eip2930 = 1, @@ -93,7 +93,7 @@ enum LegacyTxType { Eip7702 = 4, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SystemTx { pub tx: reth_compat::Transaction, pub receipt: Option, diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 0960552c2..96888f8c7 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -9,7 +9,7 @@ use std::{ use futures::future::BoxFuture; use rangemap::RangeInclusiveMap; use reth_network::cache::LruMap; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; use tracing::{info, warn}; @@ -47,15 +47,7 @@ impl LocalBlocksCache { } } -/// Block source that monitors the local ingest directory for the HL node. -#[derive(Debug, Clone)] -pub struct HlNodeBlockSource { - pub fallback: BlockSourceBoxed, - pub local_ingest_dir: PathBuf, - pub local_blocks_cache: Arc>, // height → block -} - -#[derive(Deserialize)] +#[derive(Serialize, Deserialize, Debug, Clone)] struct LocalBlockAndReceipts(String, BlockAndReceipts); struct ScanResult { @@ -139,14 +131,38 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { dt.format(&format_description!("[year][month][day]")).unwrap() } +/// Block source that monitors the local ingest directory for the HL node. +#[derive(Debug, Clone)] +pub struct HlNodeBlockSource { + pub fallback: BlockSourceBoxed, + pub local_ingest_dir: PathBuf, + pub local_blocks_cache: Arc>, // height → block + pub last_local_fetch: Arc>>, // for rate limiting requests to fallback +} + impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { + let now = OffsetDateTime::now_utc(); if let Some(block) = self.try_collect_local_block(height).await { + self.update_last_fetch(height, now).await; Ok(block) } else { + if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await { + let more_recent = last_height < height; + let too_soon = + now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + if more_recent && too_soon { + return Err(eyre::eyre!( + "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" + )); + } + } + info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); - self.fallback.collect_block(height).await + let block = self.fallback.collect_block(height).await?; + self.update_last_fetch(height, now).await; + Ok(block) } }) } @@ -161,17 +177,16 @@ impl BlockSource for HlNodeBlockSource { return self.fallback.find_latest_block_number().await; }; let mut file = File::open(&dir).expect("Failed to open hour file path"); - let last_line = read_last_complete_line(&mut file); - let Ok((_, height)) = line_to_evm_block(&last_line) else { + if let Some((_, height)) = read_last_complete_line(&mut file) { + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + } else { warn!( "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", file ); - return self.fallback.find_latest_block_number().await; - }; - - info!("Latest block number: {} with path {}", height, dir.display()); - Some(height) + self.fallback.find_latest_block_number().await + } }) } @@ -180,8 +195,8 @@ impl BlockSource for HlNodeBlockSource { } } -fn read_last_complete_line(read: &mut R) -> String { - const CHUNK_SIZE: u64 = 4096; +fn read_last_complete_line(read: &mut R) -> Option<(BlockAndReceipts, u64)> { + const CHUNK_SIZE: u64 = 50000; let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); let mut pos = read.seek(SeekFrom::End(0)).unwrap(); let mut last_line: Vec = Vec::new(); @@ -201,9 +216,12 @@ fn read_last_complete_line(read: &mut R) -> String { if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { let candidate = &last_line[idx + 1..]; - if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() { - return String::from_utf8(candidate.to_vec()).unwrap(); + if let Ok((evm_block, height)) = + line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()) + { + return Some((evm_block, height)); } + // Incomplete line; truncate and continue last_line.truncate(idx); } @@ -213,10 +231,31 @@ fn read_last_complete_line(read: &mut R) -> String { pos -= read_size; } - String::from_utf8(last_line).unwrap() + println!( + "last_line: {:?} {:?}", + String::from_utf8(last_line.clone()).unwrap(), + line_to_evm_block(&String::from_utf8(last_line.clone()).unwrap()) + ); + line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok() } impl HlNodeBlockSource { + /// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir. + /// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary + /// requests to S3 while it'll return 404. + /// + /// So we allow a small threshold to avoid unnecessary fallback. + pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(500); + + async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) { + if let Some((last_height, _)) = *self.last_local_fetch.lock().await { + if last_height >= height { + return; + } + } + *self.last_local_fetch.lock().await = Some((height, now)); + } + async fn try_collect_local_block(&self, height: u64) -> Option { let mut u_cache = self.local_blocks_cache.lock().await; if let Some(block) = u_cache.cache.remove(&height) { @@ -276,8 +315,9 @@ impl HlNodeBlockSource { for subfile in Self::all_hourly_files(root).unwrap_or_default() { let mut file = File::open(&subfile).expect("Failed to open hour file path"); - let last_line = read_last_complete_line(&mut file); - if let Ok((_, height)) = line_to_evm_block(&last_line) { + println!("subfile: {:?} {:?}", subfile, read_last_complete_line(&mut file)); + let mut file = File::open(&subfile).expect("Failed to open hour file path"); + if let Some((_, height)) = read_last_complete_line(&mut file) { if height < cutoff_height { continue; } @@ -387,6 +427,7 @@ impl HlNodeBlockSource { fallback, local_ingest_dir, local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), + last_local_fetch: Arc::new(Mutex::new(None)), }; block_source.run(next_block_number).await.unwrap(); block_source @@ -395,7 +436,12 @@ impl HlNodeBlockSource { #[cfg(test)] mod tests { + use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; + use super::*; + use crate::node::types::ReadPrecompileCalls; + use crate::pseudo_peer::sources::LocalBlockSource; + #[test] fn test_datetime_from_path() { let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); @@ -420,4 +466,175 @@ mod tests { Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) ); } + + use std::io::Write; + use std::time::Duration; + + use crate::node::types::reth_compat; + use alloy_consensus::{BlockBody, Header}; + + fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult { + let height = match &block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + ScanResult { + path: PathBuf::from("/nonexistent-block"), + next_expected_height: height + 1, + new_blocks: vec![block], + new_block_ranges: vec![height..=height], + } + } + + fn empty_block( + number: u64, + timestamp: u64, + extra_data: &'static [u8], + ) -> LocalBlockAndReceipts { + let extra_data = Bytes::from_static(extra_data); + let res = BlockAndReceipts { + block: EvmBlock::Reth115(reth_compat::SealedBlock { + header: reth_compat::SealedHeader { + header: Header { + parent_hash: B256::ZERO, + ommers_hash: B256::ZERO, + beneficiary: Address::ZERO, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Bloom::ZERO, + difficulty: U256::ZERO, + number, + gas_limit: 0, + gas_used: 0, + timestamp, + extra_data, + mix_hash: B256::ZERO, + nonce: B64::ZERO, + base_fee_per_gas: None, + withdrawals_root: None, + blob_gas_used: None, + excess_blob_gas: None, + parent_beacon_block_root: None, + requests_hash: None, + }, + hash: B256::ZERO, + }, + body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, + }), + receipts: vec![], + system_txs: vec![], + read_precompile_calls: ReadPrecompileCalls(vec![]), + highest_precompile_address: None, + }; + LocalBlockAndReceipts(timestamp.to_string(), res) + } + + fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> { + let now = OffsetDateTime::now_utc(); + let day_str = date_from_datetime(now); + let hour = now.hour(); + + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); + std::fs::create_dir_all(path.parent().unwrap())?; + + Ok((temp_dir, File::create(path)?)) + } + + struct BlockSourceHierarchy { + block_source: HlNodeBlockSource, + _temp_dir: tempfile::TempDir, + file1: File, + current_block: LocalBlockAndReceipts, + future_block_hl_node: LocalBlockAndReceipts, + future_block_fallback: LocalBlockAndReceipts, + } + + async fn setup_block_source_hierarchy() -> eyre::Result { + // Setup fallback block source + let block_source_fallback = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), + PathBuf::from("/nonexistent"), + 1000000, + ) + .await; + let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node"); + let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node"); + let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback"); + + let (temp_dir1, mut file1) = setup_temp_dir_and_file()?; + writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?; + + let block_source = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), + temp_dir1.path().to_path_buf(), + 1000000, + ) + .await; + + block_source_fallback + .local_blocks_cache + .lock() + .await + .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone())); + + Ok(BlockSourceHierarchy { + block_source, + _temp_dir: temp_dir1, + file1, + current_block: block_hl_node_0, + future_block_hl_node: block_hl_node_1, + future_block_fallback: block_fallback_1, + }) + } + + #[tokio::test] + async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, + current_block, + future_block_hl_node, + mut file1, + .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + let block = block_source.collect_block(1000001).await; + assert!(block.is_err()); + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; + tokio::time::sleep(Duration::from_millis(100)).await; + + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_hl_node.1); + + Ok(()) + } + + #[tokio::test] + async fn test_update_last_fetch_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, + current_block, + future_block_fallback, + mut file1, + .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()) + .await; + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_fallback.1); + + Ok(()) + } }