From e72d7df2eb7282d450315ab1b3a7eaf76e4463df Mon Sep 17 00:00:00 2001
From: kamalbuilds
Date: Mon, 25 Aug 2025 15:04:15 +0530
Subject: [PATCH] based on the review

---
 bin/reth/src/block_ingest.rs | 85 +++++++++++++++++++++---------------
 1 file changed, 49 insertions(+), 36 deletions(-)

diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs
index 5ce1892db..d394c558e 100644
--- a/bin/reth/src/block_ingest.rs
+++ b/bin/reth/src/block_ingest.rs
@@ -1,11 +1,12 @@
-use std::collections::BTreeMap;
-use std::io::{BufRead, BufReader};
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::{
+    collections::BTreeMap,
+    io::{BufRead, BufReader},
+    path::{Path, PathBuf},
+    sync::Arc,
+};
 
 use alloy_consensus::{BlockBody, BlockHeader, Transaction};
-use alloy_primitives::TxKind;
-use alloy_primitives::{Address, PrimitiveSignature, B256, U256};
+use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256};
 use alloy_rpc_types::engine::{
     ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
 };
@@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider;
 use reth_chainspec::{EthChainSpec, EthereumHardforks};
 use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
 use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
-use reth_node_builder::EngineTypes;
-use reth_node_builder::NodeTypesWithEngine;
-use reth_node_builder::{rpc::RethRpcAddOns, FullNode};
+use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine};
 use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
 use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
 use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
@@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime};
 use tokio::sync::Mutex;
 use tracing::{debug, info};
 
-use crate::serialized::{BlockAndReceipts, EvmBlock};
-use crate::spot_meta::erc20_contract_to_spot_token;
+use crate::{
+    serialized::{BlockAndReceipts, EvmBlock},
+    spot_meta::erc20_contract_to_spot_token,
+};
 
 /// Poll interval when tailing an *open* hourly file.
 const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
@@ -51,7 +52,11 @@ struct ScanResult {
     new_blocks: Vec<BlockAndReceipts>,
 }
 
-fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result> {
+fn scan_hour_file(
+    path: &Path,
+    last_line: &mut usize,
+    start_height: u64,
+) -> Result> {
     // info!(
     //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
     //     path, start_height, last_line
@@ -62,7 +67,8 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Resu
     let mut new_blocks = Vec::<BlockAndReceipts>::new();
     let mut last_height = start_height;
 
-    let lines: Vec<String> = reader.lines()
+    let lines: Vec<String> = reader
+        .lines()
         .collect::<Result<Vec<_>, _>>()
         .map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
     let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
@@ -131,7 +137,8 @@ async fn submit_payload(
         engine_api_client,
         envelope.execution_payload,
         versioned_hashes,
-        payload_builder_attributes.parent_beacon_block_root
+        payload_builder_attributes
+            .parent_beacon_block_root
             .ok_or("Missing required parent_beacon_block_root")?,
     )
     .await?
@@ -178,12 +185,12 @@ impl BlockIngest {
                 e
             })
             .ok()?;
-        
+
         if blocks.is_empty() {
             tracing::error!("Deserialized empty blocks vector for height {}", height);
             return None;
         }
-        
+
         info!("Returning s3 synced block for @ Height [{height}]");
         Some(blocks[0].clone())
     }
@@ -221,27 +228,31 @@ impl BlockIngest {
             match scan_result {
                 Ok(ScanResult { next_expected_height, new_blocks }) => {
                     if !new_blocks.is_empty() {
-                        let mut u_cache = cache.lock().await;
-                        let mut u_pre_cache = precompiles_cache.lock();
-                        for blk in new_blocks {
-                        let precompiles = PrecompileData {
-                            precompiles: blk.read_precompile_calls.clone(),
-                            highest_precompile_address: blk.highest_precompile_address,
-                        };
-                        let h = match &blk.block {
-                            EvmBlock::Reth115(b) => {
-                                let block_number = b.header().number() as u64;
-                                block_number
+                        let mut u_cache = cache.lock().await;
+                        let mut u_pre_cache = precompiles_cache.lock();
+                        for blk in new_blocks {
+                            let precompiles = PrecompileData {
+                                precompiles: blk.read_precompile_calls.clone(),
+                                highest_precompile_address: blk.highest_precompile_address,
+                            };
+                            let h = match &blk.block {
+                                EvmBlock::Reth115(b) => {
+                                    let block_number = b.header().number() as u64;
+                                    block_number
+                                }
+                            };
+                            u_cache.insert(h, blk);
+                            u_pre_cache.insert(h, precompiles);
                         }
-                        };
-                        u_cache.insert(h, blk);
-                        u_pre_cache.insert(h, precompiles);
-                        }
-                        next_height = next_expected_height;
-                        }
+                        next_height = next_expected_height;
+                    }
                 }
                 Err(e) => {
-                    tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e);
+                    tracing::error!(
+                        "Failed to scan hour file {}: {}",
+                        hour_file.display(),
+                        e
+                    );
                     // Continue processing but skip this file
                 }
             }
@@ -415,9 +426,11 @@ impl BlockIngest {
                     },
                     None,
                 )
-                .await {
+                .await
+            {
                 tracing::error!("Failed to update fork choice for block {}: {}", height, e);
-                // Continue processing but log the failure - don't panic the entire blockchain
+                // Continue processing but log the failure - don't panic the entire
+                // blockchain
             }
             previous_timestamp = current_timestamp;
         }