From 9fcc04e88997e4c0c589faf2c944b68b8cf34731 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Wed, 30 Jul 2025 21:27:55 -0400 Subject: [PATCH] fix: Use correct cutoff block number --- Cargo.lock | 1 + Cargo.toml | 1 + src/node/network/mod.rs | 9 ++++++++- src/pseudo_peer/config.rs | 6 ++++-- src/pseudo_peer/sources/hl_node.rs | 24 +++++++++--------------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 30b391f69..64e868314 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9341,6 +9341,7 @@ dependencies = [ "reth-rpc", "reth-rpc-engine-api", "reth-rpc-eth-api", + "reth-stages-types", "reth-tracing", "reth-transaction-pool", "reth-trie-common", diff --git a/Cargo.toml b/Cargo.toml index a45fba461..60e38a5e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ reth-trie-common = { git = "https://github.com/sprites0/reth", rev = "fc754e5983 reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } +reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } revm = { version = "26.0.1" } # alloy dependencies diff --git a/src/node/network/mod.rs b/src/node/network/mod.rs index 75b8f689a..5028bbafc 100644 --- a/src/node/network/mod.rs +++ b/src/node/network/mod.rs @@ -24,6 +24,8 @@ use reth_eth_wire::{BasicNetworkPrimitives, NewBlock, NewBlockPayload}; use reth_ethereum_primitives::PooledTransactionVariant; use reth_network::{NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::PeersInfo; +use reth_provider::StageCheckpointReader; +use reth_stages_types::StageId; use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use tracing::info; @@ -235,8 +237,13 @@ where let chain_spec = ctx.chain_spec(); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); + let next_block_number = + ctx.provider().get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number + + 1; + ctx.task_executor().spawn_critical("pseudo peer", async move { - let block_source = block_source_config.create_cached_block_source().await; + let block_source = + block_source_config.create_cached_block_source(next_block_number).await; start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) .await .unwrap(); diff --git a/src/pseudo_peer/config.rs b/src/pseudo_peer/config.rs index e12d3636b..594e0a9fd 100644 --- a/src/pseudo_peer/config.rs +++ b/src/pseudo_peer/config.rs @@ -72,6 +72,7 @@ impl BlockSourceConfig { pub async fn create_block_source_from_node( &self, + next_block_number: u64, fallback_block_source: BlockSourceBoxed, ) -> BlockSourceBoxed { let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { @@ -82,14 +83,15 @@ impl BlockSourceConfig { HlNodeBlockSource::new( fallback_block_source, PathBuf::from(block_source_from_node.clone()), + next_block_number, ) .await, )) } - pub async fn create_cached_block_source(&self) -> BlockSourceBoxed { + pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed { let block_source = self.create_block_source().await; - let block_source = self.create_block_source_from_node(block_source).await; + let block_source = self.create_block_source_from_node(next_block_number, block_source).await; Arc::new(Box::new(CachedBlockSource::new(block_source))) } } diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 4b27287a4..531d94d93 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -4,7 +4,7 @@ use std::{ sync::Arc, }; -use eyre::{Context, ContextCompat}; +use eyre::Context; use futures::future::BoxFuture; use reth_network::cache::LruMap; use serde::Deserialize; @@ -81,7 +81,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan continue; } - let Ok((parsed_block, height)) = line_to_evm_block(&line) else { + let Ok((parsed_block, height)) = line_to_evm_block(line) else { warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); continue; }; @@ -223,11 +223,11 @@ impl HlNodeBlockSource { continue; } } else { - warn!("Failed to parse last line of file, fallback to slow path: {:?}", file); + warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&file, &mut 0, next_height); + scan_hour_file(&subfile, &mut 0, next_height); for blk in new_blocks { let EvmBlock::Reth115(b) = &blk.block; u_cache.insert(b.header.header.number, blk); @@ -298,29 +298,23 @@ impl HlNodeBlockSource { }); } - pub(crate) async fn run(&self) -> eyre::Result<()> { - let latest_block_number = self - .fallback - .find_latest_block_number() - .await - .context("Failed to find latest block number")?; - + pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { let EvmBlock::Reth115(latest_block) = - self.fallback.collect_block(latest_block_number).await?.block; + self.fallback.collect_block(next_block_number).await?.block; let latest_block_ts = latest_block.header.header.timestamp; - self.start_local_ingest_loop(latest_block_number, latest_block_ts).await; + self.start_local_ingest_loop(next_block_number, latest_block_ts).await; Ok(()) } - pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self { + pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf, next_block_number: u64) -> Self { let block_source = HlNodeBlockSource { fallback, local_ingest_dir, local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), }; - block_source.run().await.unwrap(); + block_source.run(next_block_number).await.unwrap(); block_source } }