fix: Do not crash when collect_block failed

Just gracefully return it as error and log it
This commit is contained in:
sprites0
2025-08-28 13:47:44 -04:00
parent 29e6972d58
commit 5ee9053286
2 changed files with 14 additions and 10 deletions

View File

@ -12,7 +12,7 @@ pub mod utils;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tracing::info; use tracing::{error, info};
pub use cli::*; pub use cli::*;
pub use config::*; pub use config::*;
@ -78,8 +78,11 @@ pub async fn start_pseudo_peer(
_ = transaction_rx.recv() => {} _ = transaction_rx.recv() => {}
Some(eth_req) = eth_rx.recv() => { Some(eth_req) = eth_rx.recv() => {
service.process_eth_request(eth_req).await?; if let Err(e) = service.process_eth_request(eth_req).await {
info!("Processed eth request"); error!("Error processing eth request: {e:?}");
} else {
info!("Processed eth request");
}
} }
} }
} }

View File

@ -152,13 +152,14 @@ impl<BS: BlockSource> PseudoPeer<BS> {
async fn collect_blocks( async fn collect_blocks(
&self, &self,
block_numbers: impl IntoIterator<Item = u64>, block_numbers: impl IntoIterator<Item = u64>,
) -> Vec<BlockAndReceipts> { ) -> eyre::Result<Vec<BlockAndReceipts>> {
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>(); let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
futures::stream::iter(block_numbers) let res = futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await.unwrap()) .map(async |number| self.collect_block(number).await)
.buffered(self.block_source.recommended_chunk_size() as usize) .buffered(self.block_source.recommended_chunk_size() as usize)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await .await;
res.into_iter().collect()
} }
pub async fn process_eth_request( pub async fn process_eth_request(
@ -185,7 +186,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
HeadersDirection::Falling => { HeadersDirection::Falling => {
self.collect_blocks((number + 1 - limit..number + 1).rev()).await self.collect_blocks((number + 1 - limit..number + 1).rev()).await
} }
} }?
.into_par_iter() .into_par_iter()
.map(|block| block.to_reth_block(chain_id).header.clone()) .map(|block| block.to_reth_block(chain_id).header.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -203,7 +204,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let block_bodies = self let block_bodies = self
.collect_blocks(numbers) .collect_blocks(numbers)
.await .await?
.into_iter() .into_iter()
.map(|block| block.to_reth_block(chain_id).body) .map(|block| block.to_reth_block(chain_id).body)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -340,7 +341,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
debug!("Backfilling from {start_number} to {end_number}"); debug!("Backfilling from {start_number} to {end_number}");
// Collect blocks and cache them // Collect blocks and cache them
let blocks = self.collect_blocks(uncached_block_numbers).await; let blocks = self.collect_blocks(uncached_block_numbers).await?;
let block_map: HashMap<B256, u64> = let block_map: HashMap<B256, u64> =
blocks.into_iter().map(|block| (block.hash(), block.number())).collect(); blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
let maybe_block_number = block_map.get(&target_hash).copied(); let maybe_block_number = block_map.get(&target_hash).copied();