diff --git a/README.md b/README.md index b40d6d7d0..109350dde 100644 --- a/README.md +++ b/README.md @@ -13,4 +13,4 @@ Heavily inspired by [reth-bsc](https://github.com/loocapro/reth-bsc). - [x] Downloader - [x] S3 format (file) - [x] S3 format (AWS API) - - [ ] hl-node format + - [x] hl-node format diff --git a/src/pseudo_peer/cli.rs b/src/pseudo_peer/cli.rs index a5deb5394..f4f53ae43 100644 --- a/src/pseudo_peer/cli.rs +++ b/src/pseudo_peer/cli.rs @@ -12,6 +12,9 @@ pub struct BlockSourceArgs { #[arg(long)] block_source: Option, + #[arg(long)] + block_source_from_node: Option, + /// Shorthand of --block-source=s3://hl-mainnet-evm-blocks #[arg(long = "s3", default_value_t = false)] s3: bool, @@ -29,12 +32,16 @@ impl BlockSourceArgs { )); }; - let config = if let Some(bucket) = value.strip_prefix("s3://") { + let mut config = if let Some(bucket) = value.strip_prefix("s3://") { BlockSourceConfig::s3(bucket.to_string()).await } else { BlockSourceConfig::local(value.to_string()) }; + if let Some(block_source_from_node) = self.block_source_from_node.as_ref() { + config = config.with_block_source_from_node(block_source_from_node.to_string()); + } + Ok(config) } } diff --git a/src/pseudo_peer/config.rs b/src/pseudo_peer/config.rs index 63f1fa04d..70511d492 100644 --- a/src/pseudo_peer/config.rs +++ b/src/pseudo_peer/config.rs @@ -1,14 +1,18 @@ use aws_config::BehaviorVersion; +use super::sources::HlNodeBlockSource; + use super::{ consts::DEFAULT_S3_BUCKET, sources::{BlockSourceBoxed, LocalBlockSource, S3BlockSource}, }; +use std::path::PathBuf; use std::sync::Arc; #[derive(Debug, Clone)] pub struct BlockSourceConfig { pub source_type: BlockSourceType, + pub block_source_from_node: Option, } #[derive(Debug, Clone)] @@ -19,19 +23,27 @@ pub enum BlockSourceType { impl BlockSourceConfig { pub async fn s3_default() -> Self { - Self { source_type: BlockSourceType::S3 { bucket: DEFAULT_S3_BUCKET.to_string() } } + Self { + source_type: BlockSourceType::S3 { bucket: DEFAULT_S3_BUCKET.to_string() }, + block_source_from_node: None, + } } pub async fn s3(bucket: String) -> Self { - Self { source_type: BlockSourceType::S3 { bucket } } + Self { source_type: BlockSourceType::S3 { bucket }, block_source_from_node: None } } pub fn local(path: String) -> Self { - Self { source_type: BlockSourceType::Local { path } } + Self { source_type: BlockSourceType::Local { path }, block_source_from_node: None } + } + + pub fn with_block_source_from_node(mut self, block_source_from_node: String) -> Self { + self.block_source_from_node = Some(block_source_from_node); + self } pub async fn create_block_source(&self) -> BlockSourceBoxed { - match &self.source_type { + let block_source: BlockSourceBoxed = match &self.source_type { BlockSourceType::S3 { bucket } => { let client = aws_sdk_s3::Client::new( &aws_config::defaults(BehaviorVersion::latest()) @@ -46,7 +58,18 @@ impl BlockSourceConfig { let block_source = LocalBlockSource::new(path.clone()); Arc::new(Box::new(block_source)) } - } + }; + + let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { + return block_source; + }; + + let block_source = HlNodeBlockSource::new( + block_source.clone(), + PathBuf::from(block_source_from_node.clone()), + ) + .await; + Arc::new(Box::new(block_source)) } pub async fn create_cached_block_source(&self) -> BlockSourceBoxed { diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs new file mode 100644 index 000000000..f96b0eb39 --- /dev/null +++ b/src/pseudo_peer/sources/hl_node.rs @@ -0,0 +1,213 @@ +use std::io::{BufRead, BufReader}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use eyre::ContextCompat; +use futures::future::BoxFuture; +use reth_network::cache::LruMap; +use serde::Deserialize; +use time::{format_description, Duration, OffsetDateTime}; +use tokio::sync::Mutex; +use tracing::info; + +use crate::node::types::{BlockAndReceipts, EvmBlock}; + +use super::{BlockSource, BlockSourceBoxed}; + +/// Poll interval when tailing an *open* hourly file. +const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); +/// Sub‑directory that contains day folders (inside `local_ingest_dir`). +const HOURLY_SUBDIR: &str = "hourly"; +/// Maximum number of blocks to cache blocks from hl-node. +/// In normal situation, 0~1 blocks will be cached. +const CACHE_SIZE: u32 = 1000; + +/// Block source that tails the local ingest directory for the HL node. +/// +/// Originally written at https://github.com/hl-archive-node/nanoreth/pull/7 +#[derive(Debug, Clone)] +pub struct HlNodeBlockSource { + pub fallback: BlockSourceBoxed, + pub local_ingest_dir: PathBuf, + pub local_blocks_cache: Arc>>, // height → block +} + +#[derive(Deserialize)] +struct LocalBlockAndReceipts(String, BlockAndReceipts); + +struct ScanResult { + next_expected_height: u64, + new_blocks: Vec, +} + +fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { + // info!( + // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", + // path, start_height, last_line + // ); + let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let reader = BufReader::new(file); + + let mut new_blocks = Vec::::new(); + let mut last_height = start_height; + let lines: Vec = reader.lines().collect::>().unwrap(); + let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 }; + + for (line_idx, line) in lines.iter().enumerate().skip(skip) { + // Safety check ensuring efficiency + if line_idx < *last_line { + continue; + } + if line.trim().is_empty() { + continue; + } + + let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = + serde_json::from_str(&line).expect("Failed to parse local block and receipts"); + + let height = match &parsed_block.block { + EvmBlock::Reth115(b) => { + let block_number = b.header.header.number; + // Another check to ensure not returning an older block + if block_number < start_height { + continue; + } + block_number + } + }; + // println!("Iterating block height {:?} | Line {}", height, line_idx); + if height >= start_height { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + } + + ScanResult { next_expected_height: last_height + 1, new_blocks } +} + +fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000) + .expect("timestamp out of range") +} + +fn date_from_datetime(dt: OffsetDateTime) -> String { + dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() +} + +impl BlockSource for HlNodeBlockSource { + fn collect_block(&self, height: u64) -> BoxFuture> { + Box::pin(async move { + // Not a one liner (using .or) to include logs + if let Some(block) = self.try_collect_local_block(height).await { + info!("Returning locally synced block for @ Height [{height}]"); + return Ok(block); + } else { + self.fallback.collect_block(height).await + } + }) + } + + fn find_latest_block_number(&self) -> futures::future::BoxFuture> { + self.fallback.find_latest_block_number() + } + + fn recommended_chunk_size(&self) -> u64 { + self.fallback.recommended_chunk_size() + } +} + +fn to_hourly(dt: OffsetDateTime) -> Result { + dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0) +} + +impl HlNodeBlockSource { + async fn try_collect_local_block(&self, height: u64) -> Option { + let mut u_cache = self.local_blocks_cache.lock().await; + u_cache.remove(&height) + } + + async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { + let root = self.local_ingest_dir.to_owned(); + let cache = self.local_blocks_cache.clone(); + + tokio::spawn(async move { + let mut next_height = current_head; + let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap(); + + let mut hour = dt.hour(); + let mut day_str = date_from_datetime(dt); + let mut last_line = 0; + + loop { + let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); + + if hour_file.exists() { + let ScanResult { next_expected_height, new_blocks } = + scan_hour_file(&hour_file, &mut last_line, next_height); + if !new_blocks.is_empty() { + let mut u_cache = cache.lock().await; + for blk in new_blocks { + let h = match &blk.block { + EvmBlock::Reth115(b) => { + let block_number = b.header.header.number; + block_number + } + }; + u_cache.insert(h, blk); + } + next_height = next_expected_height; + } + } + + // Decide whether the *current* hour file is closed (past) or + // still live. If it’s in the past by > 1 h, move to next hour; + // otherwise, keep tailing the same file. + let now = OffsetDateTime::now_utc(); + + // println!("Date Current {:?}", dt); + // println!("Now Current {:?}", now); + + if dt + Duration::HOUR < now { + dt += Duration::HOUR; + hour = dt.hour(); + day_str = date_from_datetime(dt); + last_line = 0; + info!( + "Moving to a new file. {:?}", + root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) + ); + continue; + } + + tokio::time::sleep(TAIL_INTERVAL).await; + } + }); + } + + pub(crate) async fn run(&self) -> eyre::Result<()> { + let latest_block_number = self + .fallback + .find_latest_block_number() + .await + .context("Failed to find latest block number")?; + + let EvmBlock::Reth115(latest_block) = + self.fallback.collect_block(latest_block_number).await?.block; + + let latest_block_ts = latest_block.header.header.timestamp; + + self.start_local_ingest_loop(latest_block_number, latest_block_ts).await; + Ok(()) + } + + pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self { + let block_source = HlNodeBlockSource { + fallback, + local_ingest_dir, + local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), + }; + block_source.run().await.unwrap(); + block_source + } +} diff --git a/src/pseudo_peer/sources.rs b/src/pseudo_peer/sources/mod.rs similarity index 99% rename from src/pseudo_peer/sources.rs rename to src/pseudo_peer/sources/mod.rs index 68ba69ec5..dfd497c0c 100644 --- a/src/pseudo_peer/sources.rs +++ b/src/pseudo_peer/sources/mod.rs @@ -9,6 +9,9 @@ use std::{ }; use tracing::info; +mod hl_node; +pub use hl_node::HlNodeBlockSource; + pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static { fn collect_block(&self, height: u64) -> BoxFuture>; fn find_latest_block_number(&self) -> BoxFuture>; @@ -129,7 +132,7 @@ impl BlockSource for S3BlockSource { } fn recommended_chunk_size(&self) -> u64 { - 100 + 1000 } }