fix: Use correct cutoff block number

This commit is contained in:
sprites0
2025-07-30 21:27:55 -04:00
parent a2e978dc0c
commit 9fcc04e889
5 changed files with 23 additions and 18 deletions

1
Cargo.lock generated
View File

@ -9341,6 +9341,7 @@ dependencies = [
"reth-rpc", "reth-rpc",
"reth-rpc-engine-api", "reth-rpc-engine-api",
"reth-rpc-eth-api", "reth-rpc-eth-api",
"reth-stages-types",
"reth-tracing", "reth-tracing",
"reth-transaction-pool", "reth-transaction-pool",
"reth-trie-common", "reth-trie-common",

View File

@ -61,6 +61,7 @@ reth-trie-common = { git = "https://github.com/sprites0/reth", rev = "fc754e5983
reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
revm = { version = "26.0.1" } revm = { version = "26.0.1" }
# alloy dependencies # alloy dependencies

View File

@ -24,6 +24,8 @@ use reth_eth_wire::{BasicNetworkPrimitives, NewBlock, NewBlockPayload};
use reth_ethereum_primitives::PooledTransactionVariant; use reth_ethereum_primitives::PooledTransactionVariant;
use reth_network::{NetworkConfig, NetworkHandle, NetworkManager}; use reth_network::{NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::PeersInfo; use reth_network_api::PeersInfo;
use reth_provider::StageCheckpointReader;
use reth_stages_types::StageId;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::{mpsc, oneshot, Mutex};
use tracing::info; use tracing::info;
@ -235,8 +237,13 @@ where
let chain_spec = ctx.chain_spec(); let chain_spec = ctx.chain_spec();
info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
let next_block_number =
ctx.provider().get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number
+ 1;
ctx.task_executor().spawn_critical("pseudo peer", async move { ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source = block_source_config.create_cached_block_source().await; let block_source =
block_source_config.create_cached_block_source(next_block_number).await;
start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source)
.await .await
.unwrap(); .unwrap();

View File

@ -72,6 +72,7 @@ impl BlockSourceConfig {
pub async fn create_block_source_from_node( pub async fn create_block_source_from_node(
&self, &self,
next_block_number: u64,
fallback_block_source: BlockSourceBoxed, fallback_block_source: BlockSourceBoxed,
) -> BlockSourceBoxed { ) -> BlockSourceBoxed {
let Some(block_source_from_node) = self.block_source_from_node.as_ref() else { let Some(block_source_from_node) = self.block_source_from_node.as_ref() else {
@ -82,14 +83,15 @@ impl BlockSourceConfig {
HlNodeBlockSource::new( HlNodeBlockSource::new(
fallback_block_source, fallback_block_source,
PathBuf::from(block_source_from_node.clone()), PathBuf::from(block_source_from_node.clone()),
next_block_number,
) )
.await, .await,
)) ))
} }
pub async fn create_cached_block_source(&self) -> BlockSourceBoxed { pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed {
let block_source = self.create_block_source().await; let block_source = self.create_block_source().await;
let block_source = self.create_block_source_from_node(block_source).await; let block_source = self.create_block_source_from_node(next_block_number, block_source).await;
Arc::new(Box::new(CachedBlockSource::new(block_source))) Arc::new(Box::new(CachedBlockSource::new(block_source)))
} }
} }

View File

@ -4,7 +4,7 @@ use std::{
sync::Arc, sync::Arc,
}; };
use eyre::{Context, ContextCompat}; use eyre::Context;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use reth_network::cache::LruMap; use reth_network::cache::LruMap;
use serde::Deserialize; use serde::Deserialize;
@ -81,7 +81,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
continue; continue;
} }
let Ok((parsed_block, height)) = line_to_evm_block(&line) else { let Ok((parsed_block, height)) = line_to_evm_block(line) else {
warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
continue; continue;
}; };
@ -223,11 +223,11 @@ impl HlNodeBlockSource {
continue; continue;
} }
} else { } else {
warn!("Failed to parse last line of file, fallback to slow path: {:?}", file); warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
} }
let ScanResult { next_expected_height, new_blocks } = let ScanResult { next_expected_height, new_blocks } =
scan_hour_file(&file, &mut 0, next_height); scan_hour_file(&subfile, &mut 0, next_height);
for blk in new_blocks { for blk in new_blocks {
let EvmBlock::Reth115(b) = &blk.block; let EvmBlock::Reth115(b) = &blk.block;
u_cache.insert(b.header.header.number, blk); u_cache.insert(b.header.header.number, blk);
@ -298,29 +298,23 @@ impl HlNodeBlockSource {
}); });
} }
pub(crate) async fn run(&self) -> eyre::Result<()> { pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
let latest_block_number = self
.fallback
.find_latest_block_number()
.await
.context("Failed to find latest block number")?;
let EvmBlock::Reth115(latest_block) = let EvmBlock::Reth115(latest_block) =
self.fallback.collect_block(latest_block_number).await?.block; self.fallback.collect_block(next_block_number).await?.block;
let latest_block_ts = latest_block.header.header.timestamp; let latest_block_ts = latest_block.header.header.timestamp;
self.start_local_ingest_loop(latest_block_number, latest_block_ts).await; self.start_local_ingest_loop(next_block_number, latest_block_ts).await;
Ok(()) Ok(())
} }
pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self { pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf, next_block_number: u64) -> Self {
let block_source = HlNodeBlockSource { let block_source = HlNodeBlockSource {
fallback, fallback,
local_ingest_dir, local_ingest_dir,
local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))), local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))),
}; };
block_source.run().await.unwrap(); block_source.run(next_block_number).await.unwrap();
block_source block_source
} }
} }