From 52909eea3f05851dbadb8c919753d49aa0aeffe1 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Mon, 25 Aug 2025 22:25:12 -0400 Subject: [PATCH] feat: Add --local.fallback-threshold, --s3.polling-interval --- src/pseudo_peer/cli.rs | 37 +++++++++++++++++--- src/pseudo_peer/config.rs | 44 ++++++++++++++++-------- src/pseudo_peer/service.rs | 6 ++-- src/pseudo_peer/sources/cached.rs | 4 +++ src/pseudo_peer/sources/hl_node/mod.rs | 38 +++++++++++--------- src/pseudo_peer/sources/hl_node/tests.rs | 20 +++++++---- src/pseudo_peer/sources/mod.rs | 27 ++++++--------- src/pseudo_peer/sources/s3.rs | 11 ++++-- 8 files changed, 121 insertions(+), 66 deletions(-) diff --git a/src/pseudo_peer/cli.rs b/src/pseudo_peer/cli.rs index 4411b1561..50aa6acef 100644 --- a/src/pseudo_peer/cli.rs +++ b/src/pseudo_peer/cli.rs @@ -1,3 +1,7 @@ +use std::time::Duration; + +use crate::pseudo_peer::HlNodeBlockSourceArgs; + use super::config::BlockSourceConfig; use clap::{Args, Parser}; use reth_node_core::args::LogArgs; @@ -13,7 +17,7 @@ pub struct BlockSourceArgs { block_source: Option, #[arg(long, alias = "local-ingest-dir")] - block_source_from_node: Option, + local_ingest_dir: Option, /// Shorthand of --block-source=s3://hl-mainnet-evm-blocks #[arg(long, default_value_t = false)] @@ -22,6 +26,19 @@ pub struct BlockSourceArgs { /// Shorthand of --block-source-from-node=~/hl/data/evm_blocks_and_receipts #[arg(long)] local: bool, + + /// Interval for polling new blocks in S3 in milliseconds. + #[arg(id = "s3.polling-interval", long = "s3.polling-interval", default_value = "25")] + s3_polling_interval: u64, + + /// Maximum allowed delay for the hl-node block source in milliseconds. + /// If this threshold is exceeded, the client falls back to other sources. + #[arg( + id = "local.fallback-threshold", + long = "local.fallback-threshold", + default_value = "5000" + )] + local_fallback_threshold: u64, } impl BlockSourceArgs { @@ -33,7 +50,10 @@ impl BlockSourceArgs { async fn create_base_config(&self) -> eyre::Result { if self.s3 { - return Ok(BlockSourceConfig::s3_default().await); + return Ok(BlockSourceConfig::s3_default(Duration::from_millis( + self.s3_polling_interval, + )) + .await); } if self.local { @@ -47,18 +67,25 @@ impl BlockSourceArgs { }; if let Some(bucket) = value.strip_prefix("s3://") { - Ok(BlockSourceConfig::s3(bucket.to_string()).await) + Ok(BlockSourceConfig::s3( + bucket.to_string(), + Duration::from_millis(self.s3_polling_interval), + ) + .await) } else { Ok(BlockSourceConfig::local(value.into())) } } fn apply_node_source_config(&self, config: BlockSourceConfig) -> BlockSourceConfig { - let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { + let Some(local_ingest_dir) = self.local_ingest_dir.as_ref() else { return config; }; - config.with_block_source_from_node(block_source_from_node.to_string()) + config.with_block_source_from_node(HlNodeBlockSourceArgs { + root: local_ingest_dir.into(), + fallback_threshold: Duration::from_millis(self.local_fallback_threshold), + }) } } diff --git a/src/pseudo_peer/config.rs b/src/pseudo_peer/config.rs index 0bc6635a4..15fe37587 100644 --- a/src/pseudo_peer/config.rs +++ b/src/pseudo_peer/config.rs @@ -1,31 +1,38 @@ use crate::chainspec::HlChainSpec; use super::sources::{ - BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, LocalBlockSource, S3BlockSource, + BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, HlNodeBlockSourceArgs, + LocalBlockSource, S3BlockSource, }; use aws_config::BehaviorVersion; -use std::{env::home_dir, path::PathBuf, sync::Arc}; +use std::{env::home_dir, path::PathBuf, sync::Arc, time::Duration}; #[derive(Debug, Clone)] pub struct BlockSourceConfig { pub source_type: BlockSourceType, - pub block_source_from_node: Option, + pub block_source_from_node: Option, } #[derive(Debug, Clone)] pub enum BlockSourceType { - S3Default, - S3 { bucket: String }, + S3Default { polling_interval: Duration }, + S3 { bucket: String, polling_interval: Duration }, Local { path: PathBuf }, } impl BlockSourceConfig { - pub async fn s3_default() -> Self { - Self { source_type: BlockSourceType::S3Default, block_source_from_node: None } + pub async fn s3_default(polling_interval: Duration) -> Self { + Self { + source_type: BlockSourceType::S3Default { polling_interval }, + block_source_from_node: None, + } } - pub async fn s3(bucket: String) -> Self { - Self { source_type: BlockSourceType::S3 { bucket }, block_source_from_node: None } + pub async fn s3(bucket: String, polling_interval: Duration) -> Self { + Self { + source_type: BlockSourceType::S3 { bucket, polling_interval }, + block_source_from_node: None, + } } pub fn local(path: PathBuf) -> Self { @@ -45,15 +52,22 @@ impl BlockSourceConfig { } } - pub fn with_block_source_from_node(mut self, block_source_from_node: String) -> Self { + pub fn with_block_source_from_node( + mut self, + block_source_from_node: HlNodeBlockSourceArgs, + ) -> Self { self.block_source_from_node = Some(block_source_from_node); self } pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed { match &self.source_type { - BlockSourceType::S3Default => s3_block_source(chain_spec.official_s3_bucket()).await, - BlockSourceType::S3 { bucket } => s3_block_source(bucket).await, + BlockSourceType::S3Default { polling_interval } => { + s3_block_source(chain_spec.official_s3_bucket(), *polling_interval).await + } + BlockSourceType::S3 { bucket, polling_interval } => { + s3_block_source(bucket, *polling_interval).await + } BlockSourceType::Local { path } => { Arc::new(Box::new(LocalBlockSource::new(path.clone()))) } @@ -72,7 +86,7 @@ impl BlockSourceConfig { Arc::new(Box::new( HlNodeBlockSource::new( fallback_block_source, - PathBuf::from(block_source_from_node.clone()), + block_source_from_node.clone(), next_block_number, ) .await, @@ -91,9 +105,9 @@ impl BlockSourceConfig { } } -async fn s3_block_source(bucket: impl AsRef) -> BlockSourceBoxed { +async fn s3_block_source(bucket: impl AsRef, polling_interval: Duration) -> BlockSourceBoxed { let client = aws_sdk_s3::Client::new( &aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await, ); - Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string()))) + Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string(), polling_interval))) } diff --git a/src/pseudo_peer/service.rs b/src/pseudo_peer/service.rs index ebf8a6533..1e249fb8a 100644 --- a/src/pseudo_peer/service.rs +++ b/src/pseudo_peer/service.rs @@ -26,7 +26,6 @@ use std::{ pin::Pin, sync::{Arc, Mutex}, task::{Context, Poll}, - time::Duration, }; use tokio::{sync::mpsc, task::JoinHandle}; use tracing::{debug, info}; @@ -49,8 +48,6 @@ pub struct BlockPoller { } impl BlockPoller { - const POLL_INTERVAL: Duration = Duration::from_millis(25); - pub fn new_suspended( chain_id: u64, block_source: BS, @@ -77,6 +74,7 @@ impl BlockPoller { start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?; info!("Starting block poller"); + let polling_interval = block_source.polling_interval(); let mut next_block_number = block_source .find_latest_block_number() .await @@ -88,7 +86,7 @@ impl BlockPoller { block_tx_clone.send((next_block_number, block)).await?; next_block_number += 1; } - Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await, + Err(_) => tokio::time::sleep(polling_interval).await, } } } diff --git a/src/pseudo_peer/sources/cached.rs b/src/pseudo_peer/sources/cached.rs index d8bfa5983..5f65604fc 100644 --- a/src/pseudo_peer/sources/cached.rs +++ b/src/pseudo_peer/sources/cached.rs @@ -41,4 +41,8 @@ impl BlockSource for CachedBlockSource { fn recommended_chunk_size(&self) -> u64 { self.block_source.recommended_chunk_size() } + + fn polling_interval(&self) -> std::time::Duration { + self.block_source.polling_interval() + } } diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs index c02cc9c79..5317d3fde 100644 --- a/src/pseudo_peer/sources/hl_node/mod.rs +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -14,35 +14,39 @@ use self::{ use super::{BlockSource, BlockSourceBoxed}; use crate::node::types::BlockAndReceipts; use futures::future::BoxFuture; -use serde::{Deserialize, Serialize}; use std::{ path::{Path, PathBuf}, sync::Arc, + time::Duration, }; -use time::{Duration, OffsetDateTime}; +use time::OffsetDateTime; use tokio::sync::Mutex; use tracing::{info, warn}; -const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); const HOURLY_SUBDIR: &str = "hourly"; const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour -const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); +const ONE_HOUR: Duration = Duration::from_secs(60 * 60); +const TAIL_INTERVAL: Duration = Duration::from_millis(25); -#[derive(Serialize, Deserialize, Debug, Clone)] -pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts); +#[derive(Debug, Clone)] +pub struct HlNodeBlockSourceArgs { + pub root: PathBuf, + pub fallback_threshold: Duration, +} /// Block source that monitors the local ingest directory for the HL node. #[derive(Debug, Clone)] pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, - pub local_ingest_dir: PathBuf, pub local_blocks_cache: Arc>, pub last_local_fetch: Arc>>, + pub args: HlNodeBlockSourceArgs, } impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { let fallback = self.fallback.clone(); + let args = self.args.clone(); let local_blocks_cache = self.local_blocks_cache.clone(); let last_local_fetch = self.last_local_fetch.clone(); Box::pin(async move { @@ -55,7 +59,7 @@ impl BlockSource for HlNodeBlockSource { if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await { let more_recent = last_height < height; - let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + let too_soon = now - last_poll_time < args.fallback_threshold; if more_recent && too_soon { return Err(eyre::eyre!( "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" @@ -71,12 +75,12 @@ impl BlockSource for HlNodeBlockSource { fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { let fallback = self.fallback.clone(); - let local_ingest_dir = self.local_ingest_dir.clone(); + let args = self.args.clone(); Box::pin(async move { - let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else { + let Some(dir) = FileOperations::find_latest_hourly_file(&args.root) else { warn!( "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", - local_ingest_dir + args.root ); return fallback.find_latest_block_number().await; }; @@ -160,7 +164,7 @@ impl HlNodeBlockSource { } async fn start_local_ingest_loop(&self, current_head: u64) { - let root = self.local_ingest_dir.to_owned(); + let root = self.args.root.to_owned(); let cache = self.local_blocks_cache.clone(); tokio::spawn(async move { let mut next_height = current_head; @@ -185,8 +189,8 @@ impl HlNodeBlockSource { cache.lock().await.load_scan_result(scan_result); } let now = OffsetDateTime::now_utc(); - if dt + Duration::HOUR < now { - dt += Duration::HOUR; + if dt + ONE_HOUR < now { + dt += ONE_HOUR; (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); info!( "Moving to new file: {:?}", @@ -201,7 +205,7 @@ impl HlNodeBlockSource { pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { let _ = Self::try_backfill_local_blocks( - &self.local_ingest_dir, + &self.args.root, &self.local_blocks_cache, next_block_number, ) @@ -212,12 +216,12 @@ impl HlNodeBlockSource { pub async fn new( fallback: BlockSourceBoxed, - local_ingest_dir: PathBuf, + args: HlNodeBlockSourceArgs, next_block_number: u64, ) -> Self { let block_source = Self { fallback, - local_ingest_dir, + args, local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))), last_local_fetch: Arc::new(Mutex::new(None)), }; diff --git a/src/pseudo_peer/sources/hl_node/tests.rs b/src/pseudo_peer/sources/hl_node/tests.rs index 4674ef63a..8f5c8a77a 100644 --- a/src/pseudo_peer/sources/hl_node/tests.rs +++ b/src/pseudo_peer/sources/hl_node/tests.rs @@ -1,11 +1,13 @@ use super::*; use crate::{ node::types::{reth_compat, ReadPrecompileCalls}, - pseudo_peer::sources::LocalBlockSource, + pseudo_peer::sources::{hl_node::scan::LocalBlockAndReceipts, LocalBlockSource}, }; use alloy_consensus::{BlockBody, Header}; use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; -use std::{io::Write, time::Duration as StdDuration}; +use std::{io::Write, time::Duration}; + +const DEFAULT_FALLBACK_THRESHOLD_FOR_TEST: Duration = Duration::from_millis(5000); #[test] fn test_datetime_from_path() { @@ -111,7 +113,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result { // Setup fallback block source let block_source_fallback = HlNodeBlockSource::new( BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), - PathBuf::from("/nonexistent"), + HlNodeBlockSourceArgs { + root: { PathBuf::from("/nonexistent") }, + fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST, + }, 1000000, ) .await; @@ -124,7 +129,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result { let block_source = HlNodeBlockSource::new( BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), - temp_dir1.path().to_path_buf(), + HlNodeBlockSourceArgs { + root: temp_dir1.path().to_path_buf(), + fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST, + }, 1000000, ) .await; @@ -159,7 +167,7 @@ async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> { assert!(block.is_err()); writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; - tokio::time::sleep(StdDuration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; let block = block_source.collect_block(1000001).await.unwrap(); assert_eq!(block, future_block_hl_node.1); @@ -177,7 +185,7 @@ async fn test_update_last_fetch_fallback() -> eyre::Result<()> { let block = block_source.collect_block(1000000).await.unwrap(); assert_eq!(block, current_block.1); - tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await; + tokio::time::sleep(DEFAULT_FALLBACK_THRESHOLD_FOR_TEST).await; writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; let block = block_source.collect_block(1000001).await.unwrap(); diff --git a/src/pseudo_peer/sources/mod.rs b/src/pseudo_peer/sources/mod.rs index 274222a1f..3def6d566 100644 --- a/src/pseudo_peer/sources/mod.rs +++ b/src/pseudo_peer/sources/mod.rs @@ -1,6 +1,7 @@ use crate::node::types::BlockAndReceipts; +use auto_impl::auto_impl; use futures::future::BoxFuture; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; // Module declarations mod cached; @@ -11,11 +12,14 @@ mod utils; // Public exports pub use cached::CachedBlockSource; -pub use hl_node::HlNodeBlockSource; +pub use hl_node::{HlNodeBlockSource, HlNodeBlockSourceArgs}; pub use local::LocalBlockSource; pub use s3::S3BlockSource; +const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_millis(25); + /// Trait for block sources that can retrieve blocks from various sources +#[auto_impl(&, &mut, Box, Arc)] pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static { /// Retrieves a block at the specified height fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result>; @@ -25,21 +29,12 @@ pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static { /// Returns the recommended chunk size for batch operations fn recommended_chunk_size(&self) -> u64; + + /// Returns the polling interval + fn polling_interval(&self) -> Duration { + DEFAULT_POLLING_INTERVAL + } } /// Type alias for a boxed block source pub type BlockSourceBoxed = Arc>; - -impl BlockSource for BlockSourceBoxed { - fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { - self.as_ref().collect_block(height) - } - - fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { - self.as_ref().find_latest_block_number() - } - - fn recommended_chunk_size(&self) -> u64 { - self.as_ref().recommended_chunk_size() - } -} diff --git a/src/pseudo_peer/sources/s3.rs b/src/pseudo_peer/sources/s3.rs index f7ec15b7b..85999796a 100644 --- a/src/pseudo_peer/sources/s3.rs +++ b/src/pseudo_peer/sources/s3.rs @@ -2,7 +2,7 @@ use super::{utils, BlockSource}; use crate::node::types::BlockAndReceipts; use aws_sdk_s3::types::RequestPayer; use futures::{future::BoxFuture, FutureExt}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; use tracing::info; /// Block source that reads blocks from S3 (--s3) @@ -10,11 +10,12 @@ use tracing::info; pub struct S3BlockSource { client: Arc, bucket: String, + polling_interval: Duration, } impl S3BlockSource { - pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self { - Self { client: client.into(), bucket } + pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self { + Self { client: client.into(), bucket, polling_interval } } async fn pick_path_with_highest_number( @@ -87,4 +88,8 @@ impl BlockSource for S3BlockSource { fn recommended_chunk_size(&self) -> u64 { 1000 } + + fn polling_interval(&self) -> Duration { + self.polling_interval + } }