diff --git a/Cargo.lock b/Cargo.lock index ee974619d..5f21ab403 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9357,6 +9357,7 @@ dependencies = [ "rmp-serde", "serde", "serde_json", + "tempfile", "thiserror 2.0.12", "tikv-jemalloc-ctl", "tikv-jemallocator", diff --git a/Cargo.toml b/Cargo.toml index 305aca442..833905ed6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -164,5 +164,5 @@ client = [ "reth-rpc-eth-api/client", ] -[profile.test] -inherits = "release" +[dev-dependencies] +tempfile = "3.20.0" diff --git a/src/node/types/mod.rs b/src/node/types/mod.rs index 93f9c072e..f4b755788 100644 --- a/src/node/types/mod.rs +++ b/src/node/types/mod.rs @@ -14,7 +14,7 @@ pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompile #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Default)] pub struct ReadPrecompileCalls(pub Vec); -mod reth_compat; +pub(crate) mod reth_compat; #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct HlExtras { @@ -38,7 +38,7 @@ impl Decodable for ReadPrecompileCalls { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct BlockAndReceipts { pub block: EvmBlock, pub receipts: Vec, @@ -71,12 +71,12 @@ impl BlockAndReceipts { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub enum EvmBlock { Reth115(reth_compat::SealedBlock), } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] pub struct LegacyReceipt { tx_type: LegacyTxType, success: bool, @@ -84,7 +84,7 @@ pub struct LegacyReceipt { logs: Vec, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] enum LegacyTxType { Legacy = 0, Eip2930 = 1, @@ -93,7 +93,7 @@ enum LegacyTxType { Eip7702 = 4, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Eq, 
PartialEq)] pub struct SystemTx { pub tx: reth_compat::Transaction, pub receipt: Option, diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 0960552c2..96888f8c7 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -9,7 +9,7 @@ use std::{ use futures::future::BoxFuture; use rangemap::RangeInclusiveMap; use reth_network::cache::LruMap; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; use tracing::{info, warn}; @@ -47,15 +47,7 @@ impl LocalBlocksCache { } } -/// Block source that monitors the local ingest directory for the HL node. -#[derive(Debug, Clone)] -pub struct HlNodeBlockSource { - pub fallback: BlockSourceBoxed, - pub local_ingest_dir: PathBuf, - pub local_blocks_cache: Arc>, // height → block -} - -#[derive(Deserialize)] +#[derive(Serialize, Deserialize, Debug, Clone)] struct LocalBlockAndReceipts(String, BlockAndReceipts); struct ScanResult { @@ -139,14 +131,38 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { dt.format(&format_description!("[year][month][day]")).unwrap() } +/// Block source that monitors the local ingest directory for the HL node. 
+#[derive(Debug, Clone)] +pub struct HlNodeBlockSource { + pub fallback: BlockSourceBoxed, + pub local_ingest_dir: PathBuf, + pub local_blocks_cache: Arc>, // height → block + pub last_local_fetch: Arc>>, // for rate limiting requests to fallback +} + impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { + let now = OffsetDateTime::now_utc(); if let Some(block) = self.try_collect_local_block(height).await { + self.update_last_fetch(height, now).await; Ok(block) } else { + if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await { + let more_recent = last_height < height; + let too_soon = + now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + if more_recent && too_soon { + return Err(eyre::eyre!( + "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" + )); + } + } + info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); - self.fallback.collect_block(height).await + let block = self.fallback.collect_block(height).await?; + self.update_last_fetch(height, now).await; + Ok(block) } }) } @@ -161,17 +177,16 @@ impl BlockSource for HlNodeBlockSource { return self.fallback.find_latest_block_number().await; }; let mut file = File::open(&dir).expect("Failed to open hour file path"); - let last_line = read_last_complete_line(&mut file); - let Ok((_, height)) = line_to_evm_block(&last_line) else { + if let Some((_, height)) = read_last_complete_line(&mut file) { + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + } else { warn!( "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", file ); - return self.fallback.find_latest_block_number().await; - }; - - info!("Latest block number: {} with path {}", height, dir.display()); - Some(height) + self.fallback.find_latest_block_number().await + } }) } @@ -180,8 +195,8 @@ impl BlockSource for HlNodeBlockSource { } } 
-fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
-    const CHUNK_SIZE: u64 = 4096;
+fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
+    const CHUNK_SIZE: u64 = 50000;
     let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
     let mut pos = read.seek(SeekFrom::End(0)).unwrap();
     let mut last_line: Vec<u8> = Vec::new();
@@ -201,9 +216,12 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
 
         if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
             let candidate = &last_line[idx + 1..];
-            if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() {
-                return String::from_utf8(candidate.to_vec()).unwrap();
+            if let Ok((evm_block, height)) =
+                line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap())
+            {
+                return Some((evm_block, height));
             }
+            // Incomplete line; truncate and continue
             last_line.truncate(idx);
         }
 
@@ -213,10 +231,30 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
         pos -= read_size;
     }
 
-    String::from_utf8(last_line).unwrap()
+    line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
 }
 
 impl HlNodeBlockSource {
+    /// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir.
+    /// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary
+    /// requests to S3 while it'll return 404.
+    ///
+    /// So we allow a small threshold to avoid unnecessary fallback.
+    pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(500);
+
+    /// Records `height` and `now` as the latest successful fetch.
+    ///
+    /// The recorded height only moves forward: nothing is written when the stored height is
+    /// already `>= height`. The lock is held across the check and the write so that a
+    /// concurrent caller cannot interleave between them.
+    async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
+        let mut last_fetch = self.last_local_fetch.lock().await;
+        if matches!(*last_fetch, Some((last_height, _)) if last_height >= height) {
+            return;
+        }
+        *last_fetch = Some((height, now));
+    }
+
     async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
         let mut u_cache = self.local_blocks_cache.lock().await;
         if let Some(block) = u_cache.cache.remove(&height) {
@@ -276,8 +314,7 @@ impl HlNodeBlockSource {
 
         for subfile in Self::all_hourly_files(root).unwrap_or_default() {
             let mut file = File::open(&subfile).expect("Failed to open hour file path");
-            let last_line = read_last_complete_line(&mut file);
-            if let Ok((_, height)) = line_to_evm_block(&last_line) {
+            if let Some((_, height)) = read_last_complete_line(&mut file) {
                 if height < cutoff_height {
                     continue;
                 }
@@ -387,6 +424,7 @@ impl HlNodeBlockSource {
             fallback,
             local_ingest_dir,
             local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
+            last_local_fetch: Arc::new(Mutex::new(None)),
         };
         block_source.run(next_block_number).await.unwrap();
         block_source
@@ -395,7 +433,12 @@ impl HlNodeBlockSource {
 
 #[cfg(test)]
 mod tests {
+    use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
+
     use super::*;
+    use crate::node::types::ReadPrecompileCalls;
+    use crate::pseudo_peer::sources::LocalBlockSource;
+
     #[test]
     fn test_datetime_from_path() {
         let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
@@ -420,4 +463,175 @@ mod tests {
             Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
         );
     }
+
+    use std::io::Write;
+    use std::time::Duration;
+
+    use crate::node::types::reth_compat;
+    use alloy_consensus::{BlockBody, Header};
+
+    fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult {
+        let height = match &block.block {
+            EvmBlock::Reth115(b) => b.header.header.number,
+        };
+        ScanResult {
+            path: PathBuf::from("/nonexistent-block"),
+            next_expected_height: height + 1,
+            new_blocks: vec![block],
+            new_block_ranges: vec![height..=height],
+        }
+    }
+
+    fn empty_block(
+        number: u64,
+        timestamp: u64,
+        extra_data: &'static [u8],
+    ) -> LocalBlockAndReceipts {
+        let extra_data = Bytes::from_static(extra_data);
+        let res = BlockAndReceipts {
+            block: EvmBlock::Reth115(reth_compat::SealedBlock {
+                header: reth_compat::SealedHeader {
+                    header: Header {
+                        parent_hash: B256::ZERO,
+                        ommers_hash: B256::ZERO,
+                        beneficiary: Address::ZERO,
+                        state_root: B256::ZERO,
+                        transactions_root: B256::ZERO,
+                        receipts_root: B256::ZERO,
+                        logs_bloom: Bloom::ZERO,
+                        difficulty: U256::ZERO,
+                        number,
+                        gas_limit: 0,
+                        gas_used: 0,
+                        timestamp,
+                        extra_data,
+                        mix_hash: B256::ZERO,
+                        nonce: B64::ZERO,
+                        base_fee_per_gas: None,
+                        withdrawals_root: None,
+                        blob_gas_used: None,
+                        excess_blob_gas: None,
+                        parent_beacon_block_root: None,
+                        requests_hash: None,
+                    },
+                    hash: B256::ZERO,
+                },
+                body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
+            }),
+            receipts: vec![],
+            system_txs: vec![],
+            read_precompile_calls: ReadPrecompileCalls(vec![]),
+            highest_precompile_address: None,
+        };
+        LocalBlockAndReceipts(timestamp.to_string(), res)
+    }
+
+    fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
+        let now = OffsetDateTime::now_utc();
+        let day_str = date_from_datetime(now);
+        let hour = now.hour();
+
+        let temp_dir = tempfile::tempdir()?;
+        let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
+        std::fs::create_dir_all(path.parent().unwrap())?;
+
+        Ok((temp_dir, File::create(path)?))
+    }
+
+    struct BlockSourceHierarchy {
+        block_source: HlNodeBlockSource,
+        _temp_dir: tempfile::TempDir,
+        file1: File,
+        current_block: LocalBlockAndReceipts,
+        future_block_hl_node: LocalBlockAndReceipts,
+        future_block_fallback: LocalBlockAndReceipts,
+    }
+
+    async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
+        // Setup fallback block source
+        let block_source_fallback = HlNodeBlockSource::new(
+            BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
+            PathBuf::from("/nonexistent"),
+            1000000,
+        )
+        .await;
+        let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
+        let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
+        let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");
+
+        let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
+        writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;
+
+        let block_source = HlNodeBlockSource::new(
+            BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
+            temp_dir1.path().to_path_buf(),
+            1000000,
+        )
+        .await;
+
+        block_source_fallback
+            .local_blocks_cache
+            .lock()
+            .await
+            .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));
+
+        Ok(BlockSourceHierarchy {
+            block_source,
+            _temp_dir: temp_dir1,
+            file1,
+            current_block: block_hl_node_0,
+            future_block_hl_node: block_hl_node_1,
+            future_block_fallback: block_fallback_1,
+        })
+    }
+
+    #[tokio::test]
+    async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
+        let hierarchy = setup_block_source_hierarchy().await?;
+        let BlockSourceHierarchy {
+            block_source,
+            current_block,
+            future_block_hl_node,
+            mut file1,
+            ..
+        } = hierarchy;
+
+        let block = block_source.collect_block(1000000).await.unwrap();
+        assert_eq!(block, current_block.1);
+
+        let block = block_source.collect_block(1000001).await;
+        assert!(block.is_err());
+
+        writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
+        tokio::time::sleep(Duration::from_millis(100)).await;
+
+        let block = block_source.collect_block(1000001).await.unwrap();
+        assert_eq!(block, future_block_hl_node.1);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
+        let hierarchy = setup_block_source_hierarchy().await?;
+        let BlockSourceHierarchy {
+            block_source,
+            current_block,
+            future_block_fallback,
+            mut file1,
+            ..
+        } = hierarchy;
+
+        let block = block_source.collect_block(1000000).await.unwrap();
+        assert_eq!(block, current_block.1);
+
+        tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs())
+            .await;
+
+        writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
+        let block = block_source.collect_block(1000001).await.unwrap();
+        assert_eq!(block, future_block_fallback.1);
+
+        Ok(())
+    }
 }