based on the review

kamalbuilds
2025-08-25 15:04:15 +05:30
parent 4e88d19747
commit e72d7df2eb


@@ -1,11 +1,12 @@
-use std::collections::BTreeMap;
-use std::io::{BufRead, BufReader};
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
+use std::{
+    collections::BTreeMap,
+    io::{BufRead, BufReader},
+    path::{Path, PathBuf},
+    sync::Arc,
+};
 use alloy_consensus::{BlockBody, BlockHeader, Transaction};
-use alloy_primitives::TxKind;
-use alloy_primitives::{Address, PrimitiveSignature, B256, U256};
+use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256};
 use alloy_rpc_types::engine::{
     ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
 };
@@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider;
 use reth_chainspec::{EthChainSpec, EthereumHardforks};
 use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
 use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
-use reth_node_builder::EngineTypes;
-use reth_node_builder::NodeTypesWithEngine;
-use reth_node_builder::{rpc::RethRpcAddOns, FullNode};
+use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine};
 use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
 use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
 use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
@@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime};
 use tokio::sync::Mutex;
 use tracing::{debug, info};
-use crate::serialized::{BlockAndReceipts, EvmBlock};
-use crate::spot_meta::erc20_contract_to_spot_token;
+use crate::{
+    serialized::{BlockAndReceipts, EvmBlock},
+    spot_meta::erc20_contract_to_spot_token,
+};
 /// Poll interval when tailing an *open* hourly file.
 const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
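
Note: TAIL_INTERVAL only makes sense alongside the tailing loop it paces: scan the open hourly file, and when no new lines have appeared, sleep and retry. A minimal sketch of that pattern, with an illustrative tail_once helper (not the crate's actual code):

    use std::fs::File;
    use std::io::{BufRead, BufReader, Seek, SeekFrom};
    use std::path::Path;

    // Hypothetical tailer: reopen the file, seek past the bytes already
    // consumed, and return any newly appended lines.
    fn tail_once(path: &Path, offset: &mut u64) -> std::io::Result<Vec<String>> {
        let mut file = File::open(path)?;
        file.seek(SeekFrom::Start(*offset))?;
        let mut reader = BufReader::new(file);
        let (mut lines, mut buf) = (Vec::new(), String::new());
        loop {
            buf.clear();
            let n = reader.read_line(&mut buf)?;
            if n == 0 {
                break; // caller sleeps for TAIL_INTERVAL, then calls again
            }
            *offset += n as u64;
            lines.push(buf.trim_end().to_owned());
        }
        Ok(lines)
    }
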
@@ -51,7 +52,11 @@ struct ScanResult {
     new_blocks: Vec<BlockAndReceipts>,
 }
-fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result<ScanResult, Box<dyn std::error::Error>> {
+fn scan_hour_file(
+    path: &Path,
+    last_line: &mut usize,
+    start_height: u64,
+) -> Result<ScanResult, Box<dyn std::error::Error + Send + Sync>> {
     // info!(
     //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
     //     path, start_height, last_line
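
The substantive change in this hunk is the error type: Box<dyn std::error::Error> alone is not Send, so a Result carrying it cannot leave a spawned task. A minimal sketch of what the + Send + Sync bounds buy, assuming a tokio runtime (the fallible body is illustrative):

    type BoxedErr = Box<dyn std::error::Error + Send + Sync>;

    fn fallible() -> Result<u64, BoxedErr> {
        // String converts into BoxedErr via From, which is also what lets
        // scan_hour_file's `.map_err(|e| format!(...))?` compile.
        Err(String::from("demo failure").into())
    }

    #[tokio::main]
    async fn main() {
        // tokio::spawn requires the future and its output to be Send;
        // dropping `+ Send` from BoxedErr makes this line fail to compile.
        let handle = tokio::spawn(async { fallible() });
        if let Err(e) = handle.await.unwrap() {
            eprintln!("task failed: {e}");
        }
    }
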
@@ -62,7 +67,8 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Resu
     let mut new_blocks = Vec::<BlockAndReceipts>::new();
     let mut last_height = start_height;
-    let lines: Vec<String> = reader.lines()
+    let lines: Vec<String> = reader
+        .lines()
         .collect::<Result<Vec<_>, _>>()
         .map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
     let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
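
One spot the review left untouched: `(last_line.clone()) - 1` clones through the `&mut usize` where a plain dereference would do, and the branch is really a guarded decrement. An equivalent, more conventional spelling (a sketch, not the committed code):

    // 0 means "nothing read yet"; otherwise skip all but the last line
    // previously seen. saturating_sub collapses the `== 0` branch.
    fn resume_offset(last_line: usize) -> usize {
        last_line.saturating_sub(1)
    }
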
@@ -131,7 +137,8 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
         engine_api_client,
         envelope.execution_payload,
         versioned_hashes,
-        payload_builder_attributes.parent_beacon_block_root
+        payload_builder_attributes
+            .parent_beacon_block_root
             .ok_or("Missing required parent_beacon_block_root")?,
     )
     .await?
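
The rewrapped chain also highlights the Option-to-Result idiom used here: ok_or supplies the error for a missing parent_beacon_block_root, and `?` boxes the &str via the standard From impl. A standalone sketch (the [u8; 32] stands in for B256):

    use std::error::Error;

    fn require_root(root: Option<[u8; 32]>) -> Result<[u8; 32], Box<dyn Error + Send + Sync>> {
        // ok_or: None becomes Err(&str); `?` then promotes the &str
        // into the boxed error type.
        Ok(root.ok_or("Missing required parent_beacon_block_root")?)
    }

    fn main() {
        assert!(require_root(None).is_err());
        assert!(require_root(Some([0u8; 32])).is_ok());
    }
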
@@ -221,27 +228,31 @@ impl BlockIngest {
             match scan_result {
                 Ok(ScanResult { next_expected_height, new_blocks }) => {
                     if !new_blocks.is_empty() {
-                    let mut u_cache = cache.lock().await;
-                    let mut u_pre_cache = precompiles_cache.lock();
-                    for blk in new_blocks {
-                        let precompiles = PrecompileData {
-                            precompiles: blk.read_precompile_calls.clone(),
-                            highest_precompile_address: blk.highest_precompile_address,
-                        };
-                        let h = match &blk.block {
-                            EvmBlock::Reth115(b) => {
-                                let block_number = b.header().number() as u64;
-                                block_number
-                            }
-                        };
-                        u_cache.insert(h, blk);
-                        u_pre_cache.insert(h, precompiles);
-                    }
-                    next_height = next_expected_height;
-                    }
+                        let mut u_cache = cache.lock().await;
+                        let mut u_pre_cache = precompiles_cache.lock();
+                        for blk in new_blocks {
+                            let precompiles = PrecompileData {
+                                precompiles: blk.read_precompile_calls.clone(),
+                                highest_precompile_address: blk.highest_precompile_address,
+                            };
+                            let h = match &blk.block {
+                                EvmBlock::Reth115(b) => {
+                                    let block_number = b.header().number() as u64;
+                                    block_number
+                                }
+                            };
+                            u_cache.insert(h, blk);
+                            u_pre_cache.insert(h, precompiles);
+                        }
+                        next_height = next_expected_height;
+                    }
                 }
                 Err(e) => {
-                    tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e);
+                    tracing::error!(
+                        "Failed to scan hour file {}: {}",
+                        hour_file.display(),
+                        e
+                    );
                     // Continue processing but skip this file
                 }
             }
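
In the Ok arm, both cache locks are taken once, before the loop, so every block's entries land in the two caches without interleaving. A reduced sketch of that pattern, assuming (as the unawaited second .lock() suggests) a tokio Mutex paired with a parking_lot Mutex, with payload types simplified to String:

    use std::{collections::BTreeMap, sync::Arc};
    use tokio::sync::Mutex;

    async fn insert_batch(
        cache: Arc<Mutex<BTreeMap<u64, String>>>,
        pre_cache: Arc<parking_lot::Mutex<BTreeMap<u64, String>>>,
        blocks: Vec<(u64, String, String)>, // (height, block, precompiles)
    ) {
        // Hold both guards for the whole batch so the two caches stay in
        // step for the duration of the insert loop.
        let mut u_cache = cache.lock().await;
        let mut u_pre_cache = pre_cache.lock();
        for (h, blk, pre) in blocks {
            u_cache.insert(h, blk);
            u_pre_cache.insert(h, pre);
        }
    }
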
@@ -415,9 +426,11 @@ impl BlockIngest {
             },
             None,
         )
-        .await {
+        .await
+        {
             tracing::error!("Failed to update fork choice for block {}: {}", height, e);
-            // Continue processing but log the failure - don't panic the entire blockchain
+            // Continue processing but log the failure - don't panic the entire
+            // blockchain
         }
         previous_timestamp = current_timestamp;
     }