@ -1,11 +1,12 @@
use std ::collections ::BTreeMap ;
use std ::io ::{ BufRead , BufReader } ;
use std ::path ::{ Path , PathBuf } ;
use std ::sync ::Arc ;
use std ::{
collections ::BTreeMap ,
io ::{ BufRead , BufReader } ,
path ::{ Path , PathBuf } ,
sync ::Arc ,
} ;
use alloy_consensus ::{ BlockBody , BlockHeader , Transaction } ;
use alloy_primitives ::TxKind ;
use alloy_primitives ::{ Address , PrimitiveSignature , B256 , U256 } ;
use alloy_primitives ::{ Address , PrimitiveSignature , TxKind , B256 , U256 } ;
use alloy_rpc_types ::engine ::{
ExecutionPayloadEnvelopeV3 , ForkchoiceState , PayloadAttributes , PayloadStatusEnum ,
} ;
@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider;
use reth_chainspec ::{ EthChainSpec , EthereumHardforks } ;
use reth_hyperliquid_types ::{ PrecompileData , PrecompilesCache } ;
use reth_node_api ::{ Block , FullNodeComponents , PayloadTypes } ;
use reth_node_builder ::EngineTypes ;
use reth_node_builder ::NodeTypesWithEngine ;
use reth_node_builder ::{ rpc ::RethRpcAddOns , FullNode } ;
use reth_node_builder ::{ rpc ::RethRpcAddOns , EngineTypes , FullNode , NodeTypesWithEngine } ;
use reth_payload_builder ::{ EthBuiltPayload , EthPayloadBuilderAttributes , PayloadId } ;
use reth_primitives ::{ Transaction as TypedTransaction , TransactionSigned } ;
use reth_provider ::{ BlockHashReader , BlockReader , StageCheckpointReader } ;
@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime};
use tokio ::sync ::Mutex ;
use tracing ::{ debug , info } ;
use crate ::serialized ::{ BlockAndReceipts , EvmBlock } ;
use crate ::spot_meta ::erc20_contract_to_spot_token ;
use crate ::{
serialized ::{ BlockAndReceipts , EvmBlock } ,
spot_meta ::erc20_contract_to_spot_token ,
} ;
/// Poll interval when tailing an *open* hourly file.
const TAIL_INTERVAL : std ::time ::Duration = std ::time ::Duration ::from_millis ( 25 ) ;
@ -51,17 +52,25 @@ struct ScanResult {
new_blocks : Vec < BlockAndReceipts > ,
}
fn scan_hour_file ( path : & Path , last_line : & mut usize , start_height : u64 ) -> ScanResult {
fn scan_hour_file (
path : & Path ,
last_line : & mut usize ,
start_height : u64 ,
) -> Result < ScanResult , Box < dyn std ::error ::Error + Send + Sync > > {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std ::fs ::File ::open ( path ) . expect ( " Failed to open hour file path " ) ;
let file = std ::fs ::File ::open ( path )
. map_err ( | e | format! ( " Failed to open hour file path {} : {} " , path . display ( ) , e ) ) ? ;
let reader = BufReader ::new ( file ) ;
let mut new_blocks = Vec ::< BlockAndReceipts > ::new ( ) ;
let mut last_height = start_height ;
let lines : Vec < String > = reader . lines ( ) . collect ::< Result < _ , _ > > ( ) . unwrap ( ) ;
let lines : Vec < String > = reader
. lines ( )
. collect ::< Result < Vec < _ > , _ > > ( )
. map_err ( | e | format! ( " Failed to read lines from file {} : {} " , path . display ( ) , e ) ) ? ;
let skip = if * last_line = = 0 { 0 } else { ( last_line . clone ( ) ) - 1 } ;
for ( line_idx , line ) in lines . iter ( ) . enumerate ( ) . skip ( skip ) {
@ -109,7 +118,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
}
}
ScanResult { next_expected_height : last_height + 1 , new_blocks }
Ok ( ScanResult { next_expected_height : last_height + 1 , new_blocks } )
}
async fn submit_payload < Engine : PayloadTypes + EngineTypes > (
@ -128,7 +137,9 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
engine_api_client ,
envelope . execution_payload ,
versioned_hashes ,
payload_builder_attributes . parent_beacon_block_root . unwrap ( ) ,
payload_builder_attributes
. parent_beacon_block_root
. ok_or ( " Missing required parent_beacon_block_root " ) ? ,
)
. await ?
} ;
@ -144,7 +155,9 @@ fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
}
fn date_from_datetime ( dt : OffsetDateTime ) -> String {
dt . format ( & format_description ::parse ( " [year][month][day] " ) . unwrap ( ) ) . unwrap ( )
// Format string is constant and guaranteed to be valid
dt . format ( & format_description ::parse ( " [year][month][day] " ) . expect ( " Valid format string " ) )
. expect ( " DateTime formatting should always succeed with valid format " )
}
impl BlockIngest {
@ -166,7 +179,18 @@ impl BlockIngest {
let path = format! ( " {} / {f} / {s} / {height} .rmp.lz4 " , self . ingest_dir . to_string_lossy ( ) ) ;
let file = std ::fs ::read ( path ) . ok ( ) ? ;
let mut decoder = lz4_flex ::frame ::FrameDecoder ::new ( & file [ .. ] ) ;
let blocks : Vec < BlockAndReceipts > = rmp_serde ::from_read ( & mut decoder ) . unwrap ( ) ;
let blocks : Vec < BlockAndReceipts > = rmp_serde ::from_read ( & mut decoder )
. map_err ( | e | {
tracing ::error! ( " Failed to deserialize block data for height {}: {} " , height , e ) ;
e
} )
. ok ( ) ? ;
if blocks . is_empty ( ) {
tracing ::error! ( " Deserialized empty blocks vector for height {} " , height ) ;
return None ;
}
info! ( " Returning s3 synced block for @ Height [{height}] " ) ;
Some ( blocks [ 0 ] . clone ( ) )
}
@ -186,11 +210,11 @@ impl BlockIngest {
let mut next_height = current_head ;
let mut dt = datetime_from_timestamp ( current_ts )
. replace_minute ( 0 )
. unwrap ( )
. expect ( " Valid minute replacement " )
. replace_second ( 0 )
. unwrap ( )
. expect ( " Valid second replacement " )
. replace_nanosecond ( 0 )
. unwrap ( ) ;
. expect ( " Valid nanosecond replacement " ) ;
let mut hour = dt . hour ( ) ;
let mut day_str = date_from_datetime ( dt ) ;
@ -200,8 +224,9 @@ impl BlockIngest {
let hour_file = root . join ( HOURLY_SUBDIR ) . join ( & day_str ) . join ( format! ( " {hour} " ) ) ;
if hour_file . exists ( ) {
let S canR esult { next_expected_height , new_blocks } =
scan_hour_file ( & hour_file , & mut last_line , next_height ) ;
let s can_r esult = scan_hour_file ( & hour_file , & mut last_line , next_height ) ;
match scan_result {
Ok ( ScanResult { next_expected_height , new_blocks } ) = > {
if ! new_blocks . is_empty ( ) {
let mut u_cache = cache . lock ( ) . await ;
let mut u_pre_cache = precompiles_cache . lock ( ) ;
@ -222,6 +247,16 @@ impl BlockIngest {
next_height = next_expected_height ;
}
}
Err ( e ) = > {
tracing ::error! (
" Failed to scan hour file {}: {} " ,
hour_file . display ( ) ,
e
) ;
// Continue processing but skip this file
}
}
}
// Decide whether the *current* hour file is closed (past) or
// still live. If it’ s in the past by > 1 h, move to next hour;
@ -269,8 +304,10 @@ impl BlockIngest {
let mut height = head + 1 ;
let mut previous_hash = provider . block_hash ( head ) ? . unwrap_or ( genesis_hash ) ;
let mut previous_timestamp =
std ::time ::SystemTime ::now ( ) . duration_since ( std ::time ::UNIX_EPOCH ) . unwrap ( ) . as_millis ( ) ;
let mut previous_timestamp = std ::time ::SystemTime ::now ( )
. duration_since ( std ::time ::UNIX_EPOCH )
. expect ( " System time should be after UNIX epoch " )
. as_millis ( ) ;
let engine_api = node . auth_server_handle ( ) . http_client ( ) ;
let mut evm_map = erc20_contract_to_spot_token ( node . chain_spec ( ) . chain_id ( ) ) . await ? ;
@ -278,8 +315,8 @@ impl BlockIngest {
const MINIMUM_TIMESTAMP : u64 = 1739849780 ;
let current_block_timestamp : u64 = provider
. block_by_number ( head )
. expect ( " Failed to fetch current block in db " )
. expect ( " Block does not exist" )
. map_err ( | e | format! ( " Database error fetching block {} : {} " , head , e ) ) ?
. ok_or_else ( | | format! ( " Block {} does not exist in database " , head ) ) ?
. into_header ( )
. timestamp ( ) ;
@ -376,11 +413,11 @@ impl BlockIngest {
let current_timestamp = std ::time ::SystemTime ::now ( )
. duration_since ( std ::time ::UNIX_EPOCH )
. unwrap ( )
. expect ( " System time should be after UNIX epoch " )
. as_millis ( ) ;
if height % 100 = = 0 | | current_timestamp - previous_timestamp > 100 {
EngineApiClient ::< Engine > ::fork_choice_updated_v2 (
if let Err ( e ) = EngineApiClient ::< Engine > ::fork_choice_updated_v2 (
& engine_api ,
ForkchoiceState {
head_block_hash : block_hash ,
@ -390,7 +427,11 @@ impl BlockIngest {
None ,
)
. await
. unwrap ( ) ;
{
tracing ::error! ( " Failed to update fork choice for block {}: {} " , height , e ) ;
// Continue processing but log the failure - don't panic the entire
// blockchain
}
previous_timestamp = current_timestamp ;
}
previous_hash = block_hash ;