Merge pull request #52 from hl-archive-node/fix/avoid-crash-on-eth-failure

fix: Do not crash when collect_block failed
This commit is contained in:
sprites0
2025-08-29 02:51:10 +09:00
committed by GitHub
2 changed files with 14 additions and 10 deletions

View File

@ -12,7 +12,7 @@ pub mod utils;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::info;
use tracing::{error, info};
pub use cli::*;
pub use config::*;
@ -78,8 +78,11 @@ pub async fn start_pseudo_peer(
_ = transaction_rx.recv() => {}
Some(eth_req) = eth_rx.recv() => {
service.process_eth_request(eth_req).await?;
info!("Processed eth request");
if let Err(e) = service.process_eth_request(eth_req).await {
error!("Error processing eth request: {e:?}");
} else {
info!("Processed eth request");
}
}
}
}

View File

@ -152,13 +152,14 @@ impl<BS: BlockSource> PseudoPeer<BS> {
async fn collect_blocks(
&self,
block_numbers: impl IntoIterator<Item = u64>,
) -> Vec<BlockAndReceipts> {
) -> eyre::Result<Vec<BlockAndReceipts>> {
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await.unwrap())
let res = futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await)
.buffered(self.block_source.recommended_chunk_size() as usize)
.collect::<Vec<_>>()
.await
.await;
res.into_iter().collect()
}
pub async fn process_eth_request(
@ -185,7 +186,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
HeadersDirection::Falling => {
self.collect_blocks((number + 1 - limit..number + 1).rev()).await
}
}
}?
.into_par_iter()
.map(|block| block.to_reth_block(chain_id).header.clone())
.collect::<Vec<_>>();
@ -203,7 +204,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let block_bodies = self
.collect_blocks(numbers)
.await
.await?
.into_iter()
.map(|block| block.to_reth_block(chain_id).body)
.collect::<Vec<_>>();
@ -340,7 +341,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
debug!("Backfilling from {start_number} to {end_number}");
// Collect blocks and cache them
let blocks = self.collect_blocks(uncached_block_numbers).await;
let blocks = self.collect_blocks(uncached_block_numbers).await?;
let block_map: HashMap<B256, u64> =
blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
let maybe_block_number = block_map.get(&target_hash).copied();