From 25738366e433589862756144f99b5e124b82ee70 Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 00:56:29 +0530 Subject: [PATCH 1/6] fix the unwrap --- bin/reth/src/block_ingest.rs | 37 +++++++++++++++---- .../src/providers/database/provider.rs | 19 ++++++++-- 2 files changed, 45 insertions(+), 11 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index a259cb29e..670feeed2 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -51,17 +51,20 @@ struct ScanResult { new_blocks: Vec, } -fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { +fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result> { // info!( // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", // path, start_height, last_line // ); - let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let file = std::fs::File::open(path) + .map_err(|e| format!("Failed to open hour file path {}: {}", path.display(), e))?; let reader = BufReader::new(file); let mut new_blocks = Vec::::new(); let mut last_height = start_height; - let lines: Vec = reader.lines().collect::>().unwrap(); + let lines: Vec = reader.lines() + .collect::, _>>() + .map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?; let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 }; for (line_idx, line) in lines.iter().enumerate().skip(skip) { @@ -109,7 +112,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan } } - ScanResult { next_expected_height: last_height + 1, new_blocks } + Ok(ScanResult { next_expected_height: last_height + 1, new_blocks }) } async fn submit_payload( @@ -166,7 +169,18 @@ impl BlockIngest { let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy()); let file = std::fs::read(path).ok()?; let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); - let blocks: Vec = rmp_serde::from_read(&mut decoder).unwrap(); + let blocks: Vec = rmp_serde::from_read(&mut decoder) + .map_err(|e| { + tracing::error!("Failed to deserialize block data for height {}: {}", height, e); + e + }) + .ok()?; + + if blocks.is_empty() { + tracing::error!("Deserialized empty blocks vector for height {}", height); + return None; + } + info!("Returning s3 synced block for @ Height [{height}]"); Some(blocks[0].clone()) } @@ -200,9 +214,10 @@ impl BlockIngest { let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); if hour_file.exists() { - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&hour_file, &mut last_line, next_height); - if !new_blocks.is_empty() { + let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height); + match scan_result { + Ok(ScanResult { next_expected_height, new_blocks }) => { + if !new_blocks.is_empty() { let mut u_cache = cache.lock().await; let mut u_pre_cache = precompiles_cache.lock(); for blk in new_blocks { @@ -221,6 +236,12 @@ impl BlockIngest { } next_height = next_expected_height; } + } + Err(e) => { + tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e); + // Continue processing but skip this file + } + } } // Decide whether the *current* hour file is closed (past) or diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 4d5e1324f..109e83805 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2278,7 +2278,16 @@ impl StateWriter let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); for num in block_body.tx_num_range() { if receipts_iter.peek().is_some_and(|(n, _)| *n == num) { - block_receipts.push(receipts_iter.next().unwrap().1); + // Safe to unwrap here since we just peeked and confirmed it exists + // However, for maximum safety in database operations, we still handle it + if let Some((_, receipt)) = receipts_iter.next() { + block_receipts.push(receipt); + } else { + // This should never happen based on peek(), but handle gracefully + return Err(ProviderError::Database(reth_db::DatabaseError::Other( + "Receipt iterator state mismatch during state reconstruction".into() + ))); + } } } receipts.push(block_receipts); @@ -3070,9 +3079,13 @@ impl BlockWrite return Ok(()); } - let first_number = blocks.first().unwrap().number(); + // Safe to unwrap after empty check, but use defensive programming for critical DB ops + let first_number = blocks.first() + .expect("Blocks vector guaranteed non-empty after length check") + .number(); - let last = blocks.last().unwrap(); + let last = blocks.last() + .expect("Blocks vector guaranteed non-empty after length check"); let last_block_number = last.number(); let mut durations_recorder = metrics::DurationsRecorder::default(); From edd5383e437482c683fc7bf60ae81ffd9be78de8 Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 00:59:26 +0530 Subject: [PATCH 2/6] blog ingest --- bin/reth/src/block_ingest.rs | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 670feeed2..5ce1892db 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -131,7 +131,8 @@ async fn submit_payload( engine_api_client, envelope.execution_payload, versioned_hashes, - payload_builder_attributes.parent_beacon_block_root.unwrap(), + payload_builder_attributes.parent_beacon_block_root + .ok_or("Missing required parent_beacon_block_root")?, ) .await? }; @@ -147,7 +148,9 @@ fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { } fn date_from_datetime(dt: OffsetDateTime) -> String { - dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() + // Format string is constant and guaranteed to be valid + dt.format(&format_description::parse("[year][month][day]").expect("Valid format string")) + .expect("DateTime formatting should always succeed with valid format") } impl BlockIngest { @@ -200,11 +203,11 @@ impl BlockIngest { let mut next_height = current_head; let mut dt = datetime_from_timestamp(current_ts) .replace_minute(0) - .unwrap() + .expect("Valid minute replacement") .replace_second(0) - .unwrap() + .expect("Valid second replacement") .replace_nanosecond(0) - .unwrap(); + .expect("Valid nanosecond replacement"); let mut hour = dt.hour(); let mut day_str = date_from_datetime(dt); @@ -290,8 +293,10 @@ impl BlockIngest { let mut height = head + 1; let mut previous_hash = provider.block_hash(head)?.unwrap_or(genesis_hash); - let mut previous_timestamp = - std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis(); + let mut previous_timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("System time should be after UNIX epoch") + .as_millis(); let engine_api = node.auth_server_handle().http_client(); let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?; @@ -299,8 +304,8 @@ impl BlockIngest { const MINIMUM_TIMESTAMP: u64 = 1739849780; let current_block_timestamp: u64 = provider .block_by_number(head) - .expect("Failed to fetch current block in db") - .expect("Block does not exist") + .map_err(|e| format!("Database error fetching block {}: {}", head, e))? + .ok_or_else(|| format!("Block {} does not exist in database", head))? .into_header() .timestamp(); @@ -397,11 +402,11 @@ impl BlockIngest { let current_timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .unwrap() + .expect("System time should be after UNIX epoch") .as_millis(); if height % 100 == 0 || current_timestamp - previous_timestamp > 100 { - EngineApiClient::::fork_choice_updated_v2( + if let Err(e) = EngineApiClient::::fork_choice_updated_v2( &engine_api, ForkchoiceState { head_block_hash: block_hash, @@ -410,8 +415,10 @@ impl BlockIngest { }, None, ) - .await - .unwrap(); + .await { + tracing::error!("Failed to update fork choice for block {}: {}", height, e); + // Continue processing but log the failure - don't panic the entire blockchain + } previous_timestamp = current_timestamp; } previous_hash = block_hash; From 4e88d19747a89d0f3883e2a57a91b0d23400aae4 Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 03:38:05 +0530 Subject: [PATCH 3/6] fix the peer removal expect --- crates/net/network/src/network.rs | 5 +++-- crates/net/network/src/peers.rs | 7 +++++-- crates/net/network/src/session/active.rs | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index d71933900..cfd579699 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -149,7 +149,7 @@ impl NetworkHandle { pub async fn transactions_handle(&self) -> Option> { let (tx, rx) = oneshot::channel(); let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx)); - rx.await.unwrap() + rx.await.ok().flatten() } /// Send message to gracefully shutdown node. @@ -266,7 +266,8 @@ impl PeersInfo for NetworkHandle { builder.udp6(local_node_record.udp_port); builder.tcp6(local_node_record.tcp_port); } - builder.build(&self.inner.secret_key).expect("valid enr") + builder.build(&self.inner.secret_key) + .expect("ENR builder should always succeed with valid IP and ports") } } diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index adc95caa2..6433cd2c0 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -647,8 +647,11 @@ impl PeersManager { // remove peer if it has been marked for removal if remove_peer { - let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist"); - self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id)); + if let Some((peer_id, _)) = self.peers.remove_entry(peer_id) { + self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id)); + } else { + tracing::warn!(target: "net::peers", "Attempted to remove non-existent peer: {:?}", peer_id); + } } else if let Some(backoff_until) = backoff_until { // otherwise, backoff the peer if marked as such self.backoff_peer_until(*peer_id, backoff_until); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index f59055f84..cbcd72324 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -391,7 +391,7 @@ impl ActiveSession { }; self.terminate_message = Some((self.to_session_manager.inner().clone(), msg)); - self.poll_terminate_message(cx).expect("message is set") + self.poll_terminate_message(cx).unwrap_or(Poll::Ready(())) } /// Report back that this session has been closed due to an error @@ -402,7 +402,7 @@ impl ActiveSession { error, }; self.terminate_message = Some((self.to_session_manager.inner().clone(), msg)); - self.poll_terminate_message(cx).expect("message is set") + self.poll_terminate_message(cx).unwrap_or(Poll::Ready(())) } /// Starts the disconnect process From e72d7df2eb7282d450315ab1b3a7eaf76e4463df Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 15:04:15 +0530 Subject: [PATCH 4/6] based on the review --- bin/reth/src/block_ingest.rs | 85 +++++++++++++++++++++--------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 5ce1892db..d394c558e 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -1,11 +1,12 @@ -use std::collections::BTreeMap; -use std::io::{BufRead, BufReader}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; +use std::{ + collections::BTreeMap, + io::{BufRead, BufReader}, + path::{Path, PathBuf}, + sync::Arc, +}; use alloy_consensus::{BlockBody, BlockHeader, Transaction}; -use alloy_primitives::TxKind; -use alloy_primitives::{Address, PrimitiveSignature, B256, U256}; +use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256}; use alloy_rpc_types::engine::{ ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum, }; @@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider; use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_hyperliquid_types::{PrecompileData, PrecompilesCache}; use reth_node_api::{Block, FullNodeComponents, PayloadTypes}; -use reth_node_builder::EngineTypes; -use reth_node_builder::NodeTypesWithEngine; -use reth_node_builder::{rpc::RethRpcAddOns, FullNode}; +use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine}; use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId}; use reth_primitives::{Transaction as TypedTransaction, TransactionSigned}; use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader}; @@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime}; use tokio::sync::Mutex; use tracing::{debug, info}; -use crate::serialized::{BlockAndReceipts, EvmBlock}; -use crate::spot_meta::erc20_contract_to_spot_token; +use crate::{ + serialized::{BlockAndReceipts, EvmBlock}, + spot_meta::erc20_contract_to_spot_token, +}; /// Poll interval when tailing an *open* hourly file. const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); @@ -51,7 +52,11 @@ struct ScanResult { new_blocks: Vec, } -fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result> { +fn scan_hour_file( + path: &Path, + last_line: &mut usize, + start_height: u64, +) -> Result> { // info!( // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", // path, start_height, last_line @@ -62,7 +67,8 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Resu let mut new_blocks = Vec::::new(); let mut last_height = start_height; - let lines: Vec = reader.lines() + let lines: Vec = reader + .lines() .collect::, _>>() .map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?; let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 }; @@ -131,7 +137,8 @@ async fn submit_payload( engine_api_client, envelope.execution_payload, versioned_hashes, - payload_builder_attributes.parent_beacon_block_root + payload_builder_attributes + .parent_beacon_block_root .ok_or("Missing required parent_beacon_block_root")?, ) .await? @@ -178,12 +185,12 @@ impl BlockIngest { e }) .ok()?; - + if blocks.is_empty() { tracing::error!("Deserialized empty blocks vector for height {}", height); return None; } - + info!("Returning s3 synced block for @ Height [{height}]"); Some(blocks[0].clone()) } @@ -221,27 +228,31 @@ impl BlockIngest { match scan_result { Ok(ScanResult { next_expected_height, new_blocks }) => { if !new_blocks.is_empty() { - let mut u_cache = cache.lock().await; - let mut u_pre_cache = precompiles_cache.lock(); - for blk in new_blocks { - let precompiles = PrecompileData { - precompiles: blk.read_precompile_calls.clone(), - highest_precompile_address: blk.highest_precompile_address, - }; - let h = match &blk.block { - EvmBlock::Reth115(b) => { - let block_number = b.header().number() as u64; - block_number + let mut u_cache = cache.lock().await; + let mut u_pre_cache = precompiles_cache.lock(); + for blk in new_blocks { + let precompiles = PrecompileData { + precompiles: blk.read_precompile_calls.clone(), + highest_precompile_address: blk.highest_precompile_address, + }; + let h = match &blk.block { + EvmBlock::Reth115(b) => { + let block_number = b.header().number() as u64; + block_number + } + }; + u_cache.insert(h, blk); + u_pre_cache.insert(h, precompiles); } - }; - u_cache.insert(h, blk); - u_pre_cache.insert(h, precompiles); - } - next_height = next_expected_height; - } + next_height = next_expected_height; + } } Err(e) => { - tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e); + tracing::error!( + "Failed to scan hour file {}: {}", + hour_file.display(), + e + ); // Continue processing but skip this file } } @@ -415,9 +426,11 @@ impl BlockIngest { }, None, ) - .await { + .await + { tracing::error!("Failed to update fork choice for block {}: {}", height, e); - // Continue processing but log the failure - don't panic the entire blockchain + // Continue processing but log the failure - don't panic the entire + // blockchain } previous_timestamp = current_timestamp; } From 1a50bdfe12aa1d5645e5b3247d448207f66f767b Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 15:15:01 +0530 Subject: [PATCH 5/6] rm all changes in this file --- crates/storage/storage-api/src/database_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/storage-api/src/database_provider.rs b/crates/storage/storage-api/src/database_provider.rs index a4c734677..90322ea5a 100644 --- a/crates/storage/storage-api/src/database_provider.rs +++ b/crates/storage/storage-api/src/database_provider.rs @@ -171,4 +171,4 @@ fn range_size_hint(range: &impl RangeBounds) -> Option { Bound::Unbounded => return None, }; end.checked_sub(start).map(|x| x as _) -} +} \ No newline at end of file From c7d1f61817ae9888c2676f37f3fce92d1533f15f Mon Sep 17 00:00:00 2001 From: kamalbuilds Date: Mon, 25 Aug 2025 15:24:55 +0530 Subject: [PATCH 6/6] revert changes --- .../src/providers/database/provider.rs | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 109e83805..67c73595a 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -2278,16 +2278,7 @@ impl StateWriter let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize); for num in block_body.tx_num_range() { if receipts_iter.peek().is_some_and(|(n, _)| *n == num) { - // Safe to unwrap here since we just peeked and confirmed it exists - // However, for maximum safety in database operations, we still handle it - if let Some((_, receipt)) = receipts_iter.next() { - block_receipts.push(receipt); - } else { - // This should never happen based on peek(), but handle gracefully - return Err(ProviderError::Database(reth_db::DatabaseError::Other( - "Receipt iterator state mismatch during state reconstruction".into() - ))); - } + block_receipts.push(receipts_iter.next().unwrap().1); } } receipts.push(block_receipts); @@ -3079,13 +3070,9 @@ impl BlockWrite return Ok(()); } - // Safe to unwrap after empty check, but use defensive programming for critical DB ops - let first_number = blocks.first() - .expect("Blocks vector guaranteed non-empty after length check") - .number(); + let first_number = blocks.first().unwrap().number(); - let last = blocks.last() - .expect("Blocks vector guaranteed non-empty after length check"); + let last = blocks.last().unwrap(); let last_block_number = last.number(); let mut durations_recorder = metrics::DurationsRecorder::default(); @@ -3215,4 +3202,4 @@ impl DBProvider for DatabaseProvider fn prune_modes_ref(&self) -> &PruneModes { self.prune_modes_ref() } -} +} \ No newline at end of file