diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index a259cb29e..d394c558e 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -1,11 +1,12 @@ -use std::collections::BTreeMap; -use std::io::{BufRead, BufReader}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::{ + collections::BTreeMap, + io::{BufRead, BufReader}, + path::{Path, PathBuf}, + sync::Arc, +}; use alloy_consensus::{BlockBody, BlockHeader, Transaction}; -use alloy_primitives::TxKind; -use alloy_primitives::{Address, PrimitiveSignature, B256, U256}; +use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256}; use alloy_rpc_types::engine::{ ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum, }; @@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_hyperliquid_types::{PrecompileData, PrecompilesCache}; use reth_node_api::{Block, FullNodeComponents, PayloadTypes}; -use reth_node_builder::EngineTypes; -use reth_node_builder::NodeTypesWithEngine; -use reth_node_builder::{rpc::RethRpcAddOns, FullNode}; +use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine}; use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId}; use reth_primitives::{Transaction as TypedTransaction, TransactionSigned}; use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader}; @@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime}; use tokio::sync::Mutex; use tracing::{debug, info}; -use crate::serialized::{BlockAndReceipts, EvmBlock}; -use crate::spot_meta::erc20_contract_to_spot_token; +use crate::{ + serialized::{BlockAndReceipts, EvmBlock}, + spot_meta::erc20_contract_to_spot_token, +}; /// Poll interval when tailing an *open* hourly file. const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); @@ -51,17 +52,25 @@ struct ScanResult { new_blocks: Vec, } -fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { +fn scan_hour_file( + path: &Path, + last_line: &mut usize, + start_height: u64, +) -> Result> { // info!( // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", // path, start_height, last_line // ); - let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let file = std::fs::File::open(path) + .map_err(|e| format!("Failed to open hour file path {}: {}", path.display(), e))?; let reader = BufReader::new(file); let mut new_blocks = Vec::::new(); let mut last_height = start_height; - let lines: Vec = reader.lines().collect::>().unwrap(); + let lines: Vec = reader + .lines() + .collect::, _>>() + .map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?; let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 }; for (line_idx, line) in lines.iter().enumerate().skip(skip) { @@ -109,7 +118,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan } } - ScanResult { next_expected_height: last_height + 1, new_blocks } + Ok(ScanResult { next_expected_height: last_height + 1, new_blocks }) } async fn submit_payload( @@ -128,7 +137,9 @@ async fn submit_payload( engine_api_client, envelope.execution_payload, versioned_hashes, - payload_builder_attributes.parent_beacon_block_root.unwrap(), + payload_builder_attributes + .parent_beacon_block_root + .ok_or("Missing required parent_beacon_block_root")?, ) .await? }; @@ -144,7 +155,9 @@ fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { } fn date_from_datetime(dt: OffsetDateTime) -> String { - dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() + // Format string is constant and guaranteed to be valid + dt.format(&format_description::parse("[year][month][day]").expect("Valid format string")) + .expect("DateTime formatting should always succeed with valid format") } impl BlockIngest { @@ -166,7 +179,18 @@ impl BlockIngest { let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy()); let file = std::fs::read(path).ok()?; let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); - let blocks: Vec = rmp_serde::from_read(&mut decoder).unwrap(); + let blocks: Vec = rmp_serde::from_read(&mut decoder) + .map_err(|e| { + tracing::error!("Failed to deserialize block data for height {}: {}", height, e); + e + }) + .ok()?; + + if blocks.is_empty() { + tracing::error!("Deserialized empty blocks vector for height {}", height); + return None; + } + info!("Returning s3 synced block for @ Height [{height}]"); Some(blocks[0].clone()) } @@ -186,11 +210,11 @@ impl BlockIngest { let mut next_height = current_head; let mut dt = datetime_from_timestamp(current_ts) .replace_minute(0) - .unwrap() + .expect("Valid minute replacement") .replace_second(0) - .unwrap() + .expect("Valid second replacement") .replace_nanosecond(0) - .unwrap(); + .expect("Valid nanosecond replacement"); let mut hour = dt.hour(); let mut day_str = date_from_datetime(dt); @@ -200,26 +224,37 @@ impl BlockIngest { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&hour_file, &mut last_line, next_height); - if !new_blocks.is_empty() { - let mut u_cache = cache.lock().await; - let mut u_pre_cache = precompiles_cache.lock(); - for blk in new_blocks { - let precompiles = PrecompileData { - precompiles: blk.read_precompile_calls.clone(), - highest_precompile_address: blk.highest_precompile_address, - }; - let h = match &blk.block { - EvmBlock::Reth115(b) => { - let block_number = b.header().number() as u64; - block_number + let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height); + match scan_result { + Ok(ScanResult { next_expected_height, new_blocks }) => { + if !new_blocks.is_empty() { + let mut u_cache = cache.lock().await; + let mut u_pre_cache = precompiles_cache.lock(); + for blk in new_blocks { + let precompiles = PrecompileData { + precompiles: blk.read_precompile_calls.clone(), + highest_precompile_address: blk.highest_precompile_address, + }; + let h = match &blk.block { + EvmBlock::Reth115(b) => { + let block_number = b.header().number() as u64; + block_number + } + }; + u_cache.insert(h, blk); + u_pre_cache.insert(h, precompiles); } - }; - u_cache.insert(h, blk); - u_pre_cache.insert(h, precompiles); + next_height = next_expected_height; + } + } + Err(e) => { + tracing::error!( + "Failed to scan hour file {}: {}", + hour_file.display(), + e + ); + // Continue processing but skip this file } - next_height = next_expected_height; } } @@ -269,8 +304,10 @@ impl BlockIngest { let mut height = head + 1; let mut previous_hash = provider.block_hash(head)?.unwrap_or(genesis_hash); - let mut previous_timestamp = - std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis(); + let mut previous_timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("System time should be after UNIX epoch") + .as_millis(); let engine_api = node.auth_server_handle().http_client(); let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?; @@ -278,8 +315,8 @@ impl BlockIngest { const MINIMUM_TIMESTAMP: u64 = 1739849780; let current_block_timestamp: u64 = provider .block_by_number(head) - .expect("Failed to fetch current block in db") - .expect("Block does not exist") + .map_err(|e| format!("Database error fetching block {}: {}", head, e))? + .ok_or_else(|| format!("Block {} does not exist in database", head))? .into_header() .timestamp(); @@ -376,11 +413,11 @@ impl BlockIngest { let current_timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap() + .expect("System time should be after UNIX epoch") .as_millis(); if height % 100 == 0 || current_timestamp - previous_timestamp > 100 { - EngineApiClient::::fork_choice_updated_v2( + if let Err(e) = EngineApiClient::::fork_choice_updated_v2( &engine_api, ForkchoiceState { head_block_hash: block_hash, @@ -390,7 +427,11 @@ impl BlockIngest { None, ) .await - .unwrap(); + { + tracing::error!("Failed to update fork choice for block {}: {}", height, e); + // Continue processing but log the failure - don't panic the entire + // blockchain + } previous_timestamp = current_timestamp; } previous_hash = block_hash; diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index d71933900..cfd579699 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -149,7 +149,7 @@ impl NetworkHandle { pub async fn transactions_handle(&self) -> Option> { let (tx, rx) = oneshot::channel(); let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx)); - rx.await.unwrap() + rx.await.ok().flatten() } /// Send message to gracefully shutdown node. @@ -266,7 +266,8 @@ impl PeersInfo for NetworkHandle { builder.udp6(local_node_record.udp_port); builder.tcp6(local_node_record.tcp_port); } - builder.build(&self.inner.secret_key).expect("valid enr") + builder.build(&self.inner.secret_key) + .expect("ENR builder should always succeed with valid IP and ports") } } diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index adc95caa2..6433cd2c0 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -647,8 +647,11 @@ impl PeersManager { // remove peer if it has been marked for removal if remove_peer { - let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist"); - self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id)); + if let Some((peer_id, _)) = self.peers.remove_entry(peer_id) { + self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id)); + } else { + tracing::warn!(target: "net::peers", "Attempted to remove non-existent peer: {:?}", peer_id); + } } else if let Some(backoff_until) = backoff_until { // otherwise, backoff the peer if marked as such self.backoff_peer_until(*peer_id, backoff_until); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f59055f84..cbcd72324 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -391,7 +391,7 @@ impl ActiveSession { }; self.terminate_message = Some((self.to_session_manager.inner().clone(), msg)); - self.poll_terminate_message(cx).expect("message is set") + self.poll_terminate_message(cx).unwrap_or(Poll::Ready(())) } /// Report back that this session has been closed due to an error @@ -402,7 +402,7 @@ impl ActiveSession { error, }; self.terminate_message = Some((self.to_session_manager.inner().clone(), msg)); - self.poll_terminate_message(cx).expect("message is set") + self.poll_terminate_message(cx).unwrap_or(Poll::Ready(())) } /// Starts the disconnect process diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4d5e1324f..67c73595a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -3202,4 +3202,4 @@ impl DBProvider for DatabaseProvider fn prune_modes_ref(&self) -> &PruneModes { self.prune_modes_ref() } -} +} \ No newline at end of file diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index a4c734677..90322ea5a 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -171,4 +171,4 @@ fn range_size_hint(range: &impl RangeBounds) -> Option { Bound::Unbounded => return None, }; end.checked_sub(start).map(|x| x as _) -} +} \ No newline at end of file