diff --git a/Cargo.lock b/Cargo.lock index 30b391f69..5f21ab403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6504,6 +6504,12 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rangemap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223" + [[package]] name = "ratatui" version = "0.29.0" @@ -9308,6 +9314,7 @@ dependencies = [ "lz4_flex", "once_cell", "parking_lot", + "rangemap", "rayon", "reth", "reth-basic-payload-builder", @@ -9341,6 +9348,7 @@ dependencies = [ "reth-rpc", "reth-rpc-engine-api", "reth-rpc-eth-api", + "reth-stages-types", "reth-tracing", "reth-transaction-pool", "reth-trie-common", @@ -9349,6 +9357,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "tempfile", "thiserror 2.0.12", "tikv-jemalloc-ctl", "tikv-jemallocator", diff --git a/Cargo.toml b/Cargo.toml index a45fba461..833905ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ reth-trie-common = { git = "https://github.com/sprites0/reth", rev = "fc754e5983 reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } +reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } revm = { version = "26.0.1" } # alloy dependencies @@ -111,6 +112,7 @@ aws-sdk-s3 = "1.93.0" aws-config = "1.8.0" rayon = "1.7" time = "0.3.41" +rangemap = "=1.6.0" [target.'cfg(unix)'.dependencies] @@ -161,3 +163,6 @@ client = [ "jsonrpsee/async-client", "reth-rpc-eth-api/client", ] + +[dev-dependencies] +tempfile = "3.20.0" diff --git a/src/node/network/mod.rs b/src/node/network/mod.rs index 75b8f689a..5028bbafc 100644 --- a/src/node/network/mod.rs +++ b/src/node/network/mod.rs @@ -24,6 +24,8 @@ use reth_eth_wire::{BasicNetworkPrimitives, NewBlock, NewBlockPayload}; use reth_ethereum_primitives::PooledTransactionVariant; use reth_network::{NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::PeersInfo; +use reth_provider::StageCheckpointReader; +use reth_stages_types::StageId; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use tracing::info; @@ -235,8 +237,13 @@ where let chain_spec = ctx.chain_spec(); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); + let next_block_number = + ctx.provider().get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number + + 1; + ctx.task_executor().spawn_critical("pseudo peer", async move { - let block_source = block_source_config.create_cached_block_source().await; + let block_source = + block_source_config.create_cached_block_source(next_block_number).await; start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) .await .unwrap(); diff --git a/src/node/types/mod.rs b/src/node/types/mod.rs index 93f9c072e..f4b755788 100644 --- a/src/node/types/mod.rs +++ b/src/node/types/mod.rs @@ -14,7 +14,7 @@ pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompile #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] pub struct ReadPrecompileCalls(pub Vec); -mod reth_compat; +pub(crate) mod reth_compat; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct HlExtras { @@ -38,7 +38,7 @@ impl Decodable for ReadPrecompileCalls { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct BlockAndReceipts { pub block: EvmBlock, pub receipts: Vec, @@ -71,12 +71,12 @@ impl BlockAndReceipts { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub enum EvmBlock { Reth115(reth_compat::SealedBlock), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct LegacyReceipt { tx_type: LegacyTxType, success: bool, @@ -84,7 +84,7 @@ pub struct LegacyReceipt { logs: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] enum LegacyTxType { Legacy = 0, Eip2930 = 1, @@ -93,7 +93,7 @@ enum LegacyTxType { Eip7702 = 4, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct SystemTx { pub tx: reth_compat::Transaction, pub receipt: Option, diff --git a/src/pseudo_peer/config.rs b/src/pseudo_peer/config.rs index e12d3636b..594e0a9fd 100644 --- a/src/pseudo_peer/config.rs +++ b/src/pseudo_peer/config.rs @@ -72,6 +72,7 @@ impl BlockSourceConfig { pub async fn create_block_source_from_node( &self, + next_block_number: u64, fallback_block_source: BlockSourceBoxed, ) -> BlockSourceBoxed { let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { @@ -82,14 +83,15 @@ impl BlockSourceConfig { HlNodeBlockSource::new( fallback_block_source, PathBuf::from(block_source_from_node.clone()), + next_block_number, ) .await, )) } - pub async fn create_cached_block_source(&self) -> BlockSourceBoxed { + pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed { let block_source = self.create_block_source().await; - let block_source = self.create_block_source_from_node(block_source).await; + let block_source = self.create_block_source_from_node(next_block_number, block_source).await; Arc::new(Box::new(CachedBlockSource::new(block_source))) } } diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 07cafe3b7..96888f8c7 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,123 +1,193 @@ use std::{ - io::{BufRead, BufReader}, + fs::File, + io::{BufRead, BufReader, Read, Seek, SeekFrom}, + ops::RangeInclusive, path::{Path, PathBuf}, sync::Arc, }; -use eyre::ContextCompat; use futures::future::BoxFuture; +use rangemap::RangeInclusiveMap; use reth_network::cache::LruMap; -use serde::Deserialize; -use time::{format_description, Duration, OffsetDateTime}; +use serde::{Deserialize, Serialize}; +use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; -use tracing::info; +use tracing::{info, warn}; use crate::node::types::{BlockAndReceipts, EvmBlock}; use super::{BlockSource, BlockSourceBoxed}; -/// Poll interval when tailing an *open* hourly file. const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); -/// Sub‑directory that contains day folders (inside `local_ingest_dir`). const HOURLY_SUBDIR: &str = "hourly"; -/// Maximum number of blocks to cache blocks from hl-node. -/// In normal situation, 0~1 blocks will be cached. -const CACHE_SIZE: u32 = 1000; -/// Block source that monitors the local ingest directory for the HL node. -/// -/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3. -/// This block source caches blocks from the HL node to minimize latency, -/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed. -/// -/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7 -/// -/// [ref]: https://github.com/hyperliquid-dex/node -#[derive(Debug, Clone)] -pub struct HlNodeBlockSource { - pub fallback: BlockSourceBoxed, - pub local_ingest_dir: PathBuf, - pub local_blocks_cache: Arc>>, // height → block +#[derive(Debug)] +pub struct LocalBlocksCache { + cache: LruMap, + // Lightweight range map to track the ranges of blocks in the local ingest directory + ranges: RangeInclusiveMap, } -#[derive(Deserialize)] +impl LocalBlocksCache { + // 3660 blocks per hour + const CACHE_SIZE: u32 = 8000; + + fn new() -> Self { + Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() } + } + + fn load_scan_result(&mut self, scan_result: ScanResult) { + for blk in scan_result.new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + self.cache.insert(b.header.header.number, blk); + } + for range in scan_result.new_block_ranges { + self.ranges.insert(range, scan_result.path.clone()); + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] struct LocalBlockAndReceipts(String, BlockAndReceipts); struct ScanResult { + path: PathBuf, next_expected_height: u64, new_blocks: Vec, + new_block_ranges: Vec>, } -fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { - // info!( - // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", - // path, start_height, last_line - // ); - let file = std::fs::File::open(path).expect("Failed to open hour file path"); +fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { + let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = + serde_json::from_str(line)?; + let height = match &parsed_block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + Ok((parsed_block, height)) +} + +struct ScanOptions { + start_height: u64, + only_load_ranges: bool, +} + +fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { + let file = File::open(path).expect("Failed to open hour file path"); let reader = BufReader::new(file); - let mut new_blocks = Vec::::new(); + let ScanOptions { start_height, only_load_ranges } = options; + + let mut new_blocks = Vec::new(); let mut last_height = start_height; let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + let mut block_ranges: Vec> = Vec::new(); + let mut current_range: Option<(u64, u64)> = None; + for (line_idx, line) in lines.iter().enumerate().skip(skip) { - // Safety check ensuring efficiency - if line_idx < *last_line { - continue; - } - if line.trim().is_empty() { + if line_idx < *last_line || line.trim().is_empty() { continue; } - let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = - serde_json::from_str(line).expect("Failed to parse local block and receipts"); - - let height = match &parsed_block.block { - EvmBlock::Reth115(b) => { - let block_number = b.header.header.number; - // Another check to ensure not returning an older block - if block_number < start_height { - continue; + match line_to_evm_block(line) { + Ok((parsed_block, height)) => { + if height >= start_height { + last_height = last_height.max(height); + if !only_load_ranges { + new_blocks.push(parsed_block); + } + *last_line = line_idx; + } + if matches!(current_range, Some((_, end)) if end + 1 == height) { + current_range = Some((current_range.unwrap().0, height)); + } else { + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + current_range = Some((height, height)); } - block_number } - }; - // println!("Iterating block height {:?} | Line {}", height, line_idx); - if height >= start_height { - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; + Err(_) => { + warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); + continue; + } } } - ScanResult { next_expected_height: last_height + 1, new_blocks } -} + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } -fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { - OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000) - .expect("timestamp out of range") + ScanResult { + path: path.to_path_buf(), + next_expected_height: last_height + 1, + new_blocks, + new_block_ranges: block_ranges, + } } fn date_from_datetime(dt: OffsetDateTime) -> String { - dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() + dt.format(&format_description!("[year][month][day]")).unwrap() +} + +/// Block source that monitors the local ingest directory for the HL node. +#[derive(Debug, Clone)] +pub struct HlNodeBlockSource { + pub fallback: BlockSourceBoxed, + pub local_ingest_dir: PathBuf, + pub local_blocks_cache: Arc>, // height → block + pub last_local_fetch: Arc>>, // for rate limiting requests to fallback } impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { - // Not a one liner (using .or) to include logs + let now = OffsetDateTime::now_utc(); if let Some(block) = self.try_collect_local_block(height).await { - info!("Returning locally synced block for @ Height [{height}]"); + self.update_last_fetch(height, now).await; Ok(block) } else { - self.fallback.collect_block(height).await + if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await { + let more_recent = last_height < height; + let too_soon = + now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + if more_recent && too_soon { + return Err(eyre::eyre!( + "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" + )); + } + } + + info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); + let block = self.fallback.collect_block(height).await?; + self.update_last_fetch(height, now).await; + Ok(block) } }) } - fn find_latest_block_number(&self) -> futures::future::BoxFuture> { - self.fallback.find_latest_block_number() + fn find_latest_block_number(&self) -> BoxFuture> { + Box::pin(async move { + let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else { + warn!( + "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", + self.local_ingest_dir + ); + return self.fallback.find_latest_block_number().await; + }; + let mut file = File::open(&dir).expect("Failed to open hour file path"); + if let Some((_, height)) = read_last_complete_line(&mut file) { + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + } else { + warn!( + "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", + file + ); + self.fallback.find_latest_block_number().await + } + }) } fn recommended_chunk_size(&self) -> u64 { @@ -125,23 +195,176 @@ impl BlockSource for HlNodeBlockSource { } } -fn to_hourly(dt: OffsetDateTime) -> Result { - dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0) +fn read_last_complete_line(read: &mut R) -> Option<(BlockAndReceipts, u64)> { + const CHUNK_SIZE: u64 = 50000; + let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); + let mut pos = read.seek(SeekFrom::End(0)).unwrap(); + let mut last_line: Vec = Vec::new(); + + while pos > 0 { + let read_size = std::cmp::min(pos, CHUNK_SIZE); + buf.resize(read_size as usize, 0); + + read.seek(SeekFrom::Start(pos - read_size)).unwrap(); + read.read_exact(&mut buf).unwrap(); + + last_line = [buf.clone(), last_line].concat(); + + if last_line.ends_with(b"\n") { + last_line.pop(); + } + + if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { + let candidate = &last_line[idx + 1..]; + if let Ok((evm_block, height)) = + line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()) + { + return Some((evm_block, height)); + } + // Incomplete line; truncate and continue + last_line.truncate(idx); + } + + if pos < read_size { + break; + } + pos -= read_size; + } + + println!( + "last_line: {:?} {:?}", + String::from_utf8(last_line.clone()).unwrap(), + line_to_evm_block(&String::from_utf8(last_line.clone()).unwrap()) + ); + line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok() } impl HlNodeBlockSource { - async fn try_collect_local_block(&self, height: u64) -> Option { - let mut u_cache = self.local_blocks_cache.lock().await; - u_cache.remove(&height) + /// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir. + /// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary + /// requests to S3 while it'll return 404. + /// + /// So we allow a small threshold to avoid unnecessary fallback. + pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(500); + + async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) { + if let Some((last_height, _)) = *self.last_local_fetch.lock().await { + if last_height >= height { + return; + } + } + *self.last_local_fetch.lock().await = Some((height, now)); } - async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { + async fn try_collect_local_block(&self, height: u64) -> Option { + let mut u_cache = self.local_blocks_cache.lock().await; + if let Some(block) = u_cache.cache.remove(&height) { + return Some(block); + } + + let Some(path) = u_cache.ranges.get(&height).cloned() else { + return None; + }; + + info!("Loading block data from {:?}", path); + u_cache.load_scan_result(scan_hour_file( + &path, + &mut 0, + ScanOptions { start_height: 0, only_load_ranges: false }, + )); + u_cache.cache.get(&height).cloned() + } + + fn datetime_from_path(path: &Path) -> Option { + let dt_part = path.parent()?.file_name()?.to_str()?; + let hour_part = path.file_name()?.to_str()?; + + let hour: u8 = hour_part.parse().ok()?; + Some(OffsetDateTime::new_utc( + Date::parse(&format!("{dt_part}"), &format_description!("[year][month][day]")).ok()?, + Time::from_hms(hour, 0, 0).ok()?, + )) + } + + fn all_hourly_files(root: &Path) -> Option> { + let dir = root.join(HOURLY_SUBDIR); + let mut files = Vec::new(); + for entry in std::fs::read_dir(dir).ok()? { + let file = entry.ok()?.path(); + let subfiles: Vec<_> = std::fs::read_dir(&file) + .ok()? + .filter_map(|f| f.ok().map(|f| f.path())) + .filter(|p| Self::datetime_from_path(p).is_some()) + .collect(); + files.extend(subfiles); + } + files.sort(); + Some(files) + } + + fn find_latest_hourly_file(root: &Path) -> Option { + Self::all_hourly_files(root)?.last().cloned() + } + + async fn try_backfill_local_blocks( + root: &Path, + cache: &Arc>, + cutoff_height: u64, + ) -> eyre::Result<()> { + let mut u_cache = cache.lock().await; + + for subfile in Self::all_hourly_files(root).unwrap_or_default() { + let mut file = File::open(&subfile).expect("Failed to open hour file path"); + println!("subfile: {:?} {:?}", subfile, read_last_complete_line(&mut file)); + let mut file = File::open(&subfile).expect("Failed to open hour file path"); + if let Some((_, height)) = read_last_complete_line(&mut file) { + if height < cutoff_height { + continue; + } + } else { + warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); + } + + let mut scan_result = scan_hour_file( + &subfile, + &mut 0, + ScanOptions { start_height: cutoff_height, only_load_ranges: true }, + ); + // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage + scan_result.new_blocks.clear(); + u_cache.load_scan_result(scan_result); + } + + if u_cache.ranges.is_empty() { + warn!("No ranges found in {:?}", root); + } else { + let (min, _) = u_cache.ranges.first_range_value().unwrap(); + let (max, _) = u_cache.ranges.last_range_value().unwrap(); + info!( + "Populated {} ranges (min: {}, max: {})", + u_cache.ranges.len(), + min.start(), + max.end() + ); + } + + Ok(()) + } + + async fn start_local_ingest_loop(&self, current_head: u64) { let root = self.local_ingest_dir.to_owned(); let cache = self.local_blocks_cache.clone(); tokio::spawn(async move { let mut next_height = current_head; - let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap(); + + // Wait for the first hourly file to be created + let mut dt = loop { + if let Some(latest_file) = Self::find_latest_hourly_file(&root) { + break Self::datetime_from_path(&latest_file).unwrap(); + } + tokio::time::sleep(TAIL_INTERVAL).await; + }; let mut hour = dt.hour(); let mut day_str = date_from_datetime(dt); @@ -153,26 +376,19 @@ impl HlNodeBlockSource { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&hour_file, &mut last_line, next_height); - if !new_blocks.is_empty() { - let mut u_cache = cache.lock().await; - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; - } + let scan_result = scan_hour_file( + &hour_file, + &mut last_line, + ScanOptions { start_height: next_height, only_load_ranges: false }, + ); + next_height = scan_result.next_expected_height; + + let mut u_cache = cache.lock().await; + u_cache.load_scan_result(scan_result); } - // Decide whether the *current* hour file is closed (past) or - // still live. If it’s in the past by > 1 h, move to next hour; - // otherwise, keep tailing the same file. let now = OffsetDateTime::now_utc(); - // println!("Date Current {:?}", dt); - // println!("Now Current {:?}", now); - if dt + Duration::HOUR < now { dt += Duration::HOUR; hour = dt.hour(); @@ -190,29 +406,235 @@ impl HlNodeBlockSource { }); } - pub(crate) async fn run(&self) -> eyre::Result<()> { - let latest_block_number = self - .fallback - .find_latest_block_number() - .await - .context("Failed to find latest block number")?; + pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { + let _ = Self::try_backfill_local_blocks( + &self.local_ingest_dir, + &self.local_blocks_cache, + next_block_number, + ) + .await; - let EvmBlock::Reth115(latest_block) = - self.fallback.collect_block(latest_block_number).await?.block; - - let latest_block_ts = latest_block.header.header.timestamp; - - self.start_local_ingest_loop(latest_block_number, latest_block_ts).await; + self.start_local_ingest_loop(next_block_number).await; Ok(()) } - pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self { + pub async fn new( + fallback: BlockSourceBoxed, + local_ingest_dir: PathBuf, + next_block_number: u64, + ) -> Self { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, - local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), + local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), + last_local_fetch: Arc::new(Mutex::new(None)), }; - block_source.run().await.unwrap(); + block_source.run(next_block_number).await.unwrap(); block_source } } + +#[cfg(test)] +mod tests { + use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; + + use super::*; + use crate::node::types::ReadPrecompileCalls; + use crate::pseudo_peer::sources::LocalBlockSource; + + #[test] + fn test_datetime_from_path() { + let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); + let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); + println!("{:?}", dt); + } + + #[tokio::test] + async fn test_backfill() { + let test_path = Path::new("/root/evm_block_and_receipts"); + if !test_path.exists() { + return; + } + + let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); + HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap(); + + let u_cache = cache.lock().await; + println!("{:?}", u_cache.ranges); + assert_eq!( + u_cache.ranges.get(&9735058), + Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) + ); + } + + use std::io::Write; + use std::time::Duration; + + use crate::node::types::reth_compat; + use alloy_consensus::{BlockBody, Header}; + + fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult { + let height = match &block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + ScanResult { + path: PathBuf::from("/nonexistent-block"), + next_expected_height: height + 1, + new_blocks: vec![block], + new_block_ranges: vec![height..=height], + } + } + + fn empty_block( + number: u64, + timestamp: u64, + extra_data: &'static [u8], + ) -> LocalBlockAndReceipts { + let extra_data = Bytes::from_static(extra_data); + let res = BlockAndReceipts { + block: EvmBlock::Reth115(reth_compat::SealedBlock { + header: reth_compat::SealedHeader { + header: Header { + parent_hash: B256::ZERO, + ommers_hash: B256::ZERO, + beneficiary: Address::ZERO, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Bloom::ZERO, + difficulty: U256::ZERO, + number, + gas_limit: 0, + gas_used: 0, + timestamp, + extra_data, + mix_hash: B256::ZERO, + nonce: B64::ZERO, + base_fee_per_gas: None, + withdrawals_root: None, + blob_gas_used: None, + excess_blob_gas: None, + parent_beacon_block_root: None, + requests_hash: None, + }, + hash: B256::ZERO, + }, + body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, + }), + receipts: vec![], + system_txs: vec![], + read_precompile_calls: ReadPrecompileCalls(vec![]), + highest_precompile_address: None, + }; + LocalBlockAndReceipts(timestamp.to_string(), res) + } + + fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> { + let now = OffsetDateTime::now_utc(); + let day_str = date_from_datetime(now); + let hour = now.hour(); + + let temp_dir = tempfile::tempdir()?; + let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); + std::fs::create_dir_all(path.parent().unwrap())?; + + Ok((temp_dir, File::create(path)?)) + } + + struct BlockSourceHierarchy { + block_source: HlNodeBlockSource, + _temp_dir: tempfile::TempDir, + file1: File, + current_block: LocalBlockAndReceipts, + future_block_hl_node: LocalBlockAndReceipts, + future_block_fallback: LocalBlockAndReceipts, + } + + async fn setup_block_source_hierarchy() -> eyre::Result { + // Setup fallback block source + let block_source_fallback = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), + PathBuf::from("/nonexistent"), + 1000000, + ) + .await; + let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node"); + let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node"); + let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback"); + + let (temp_dir1, mut file1) = setup_temp_dir_and_file()?; + writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?; + + let block_source = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), + temp_dir1.path().to_path_buf(), + 1000000, + ) + .await; + + block_source_fallback + .local_blocks_cache + .lock() + .await + .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone())); + + Ok(BlockSourceHierarchy { + block_source, + _temp_dir: temp_dir1, + file1, + current_block: block_hl_node_0, + future_block_hl_node: block_hl_node_1, + future_block_fallback: block_fallback_1, + }) + } + + #[tokio::test] + async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, + current_block, + future_block_hl_node, + mut file1, + .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + let block = block_source.collect_block(1000001).await; + assert!(block.is_err()); + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; + tokio::time::sleep(Duration::from_millis(100)).await; + + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_hl_node.1); + + Ok(()) + } + + #[tokio::test] + async fn test_update_last_fetch_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, + current_block, + future_block_fallback, + mut file1, + .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()) + .await; + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_fallback.1); + + Ok(()) + } +}