From 5ee9053286d8683dcece05b413d5c11e1ff3b0d1 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Thu, 28 Aug 2025 13:47:44 -0400 Subject: [PATCH] fix: Do not crash when collect_block failed Just gracefully return it as error and log it --- src/pseudo_peer/mod.rs | 9 ++++++--- src/pseudo_peer/service.rs | 15 ++++++++------- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/pseudo_peer/mod.rs b/src/pseudo_peer/mod.rs index 612c09b55..a6f354434 100644 --- a/src/pseudo_peer/mod.rs +++ b/src/pseudo_peer/mod.rs @@ -12,7 +12,7 @@ pub mod utils; use std::sync::Arc; use tokio::sync::mpsc; -use tracing::info; +use tracing::{error, info}; pub use cli::*; pub use config::*; @@ -78,8 +78,11 @@ pub async fn start_pseudo_peer( _ = transaction_rx.recv() => {} Some(eth_req) = eth_rx.recv() => { - service.process_eth_request(eth_req).await?; - info!("Processed eth request"); + if let Err(e) = service.process_eth_request(eth_req).await { + error!("Error processing eth request: {e:?}"); + } else { + info!("Processed eth request"); + } } } } diff --git a/src/pseudo_peer/service.rs b/src/pseudo_peer/service.rs index 1e249fb8a..5639e899a 100644 --- a/src/pseudo_peer/service.rs +++ b/src/pseudo_peer/service.rs @@ -152,13 +152,14 @@ impl PseudoPeer { async fn collect_blocks( &self, block_numbers: impl IntoIterator, - ) -> Vec { + ) -> eyre::Result> { let block_numbers = block_numbers.into_iter().collect::>(); - futures::stream::iter(block_numbers) - .map(async |number| self.collect_block(number).await.unwrap()) + let res = futures::stream::iter(block_numbers) + .map(async |number| self.collect_block(number).await) .buffered(self.block_source.recommended_chunk_size() as usize) .collect::>() - .await + .await; + res.into_iter().collect() } pub async fn process_eth_request( @@ -185,7 +186,7 @@ impl PseudoPeer { HeadersDirection::Falling => { self.collect_blocks((number + 1 - limit..number + 1).rev()).await } - } + }? .into_par_iter() .map(|block| block.to_reth_block(chain_id).header.clone()) .collect::>(); @@ -203,7 +204,7 @@ impl PseudoPeer { let block_bodies = self .collect_blocks(numbers) - .await + .await? .into_iter() .map(|block| block.to_reth_block(chain_id).body) .collect::>(); @@ -340,7 +341,7 @@ impl PseudoPeer { debug!("Backfilling from {start_number} to {end_number}"); // Collect blocks and cache them - let blocks = self.collect_blocks(uncached_block_numbers).await; + let blocks = self.collect_blocks(uncached_block_numbers).await?; let block_map: HashMap = blocks.into_iter().map(|block| (block.hash(), block.number())).collect(); let maybe_block_number = block_map.get(&target_hash).copied();