From 1602baef6d08932a723139ad53f57d2923218bd5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 11 Dec 2024 14:08:21 +0100 Subject: [PATCH] fix: only terminate the stream if range is empty (#13281) Co-authored-by: Alexey Shekhirin --- crates/exex/exex/src/backfill/stream.rs | 80 ++++++++++++++----------- 1 file changed, 44 insertions(+), 36 deletions(-) diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index d88ca87e7..0e27954eb 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -1,10 +1,5 @@ +use super::job::BackfillJobResult; use crate::{BackfillJob, SingleBlockBackfillJob}; -use std::{ - ops::RangeInclusive, - pin::Pin, - task::{ready, Context, Poll}, -}; - use alloy_primitives::BlockNumber; use futures::{ stream::{FuturesOrdered, Stream}, @@ -17,10 +12,13 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory}; use reth_prune_types::PruneModes; use reth_stages_api::ExecutionStageThresholds; use reth_tracing::tracing::debug; +use std::{ + ops::RangeInclusive, + pin::Pin, + task::{ready, Context, Poll}, +}; use tokio::task::JoinHandle; -use super::job::BackfillJobResult; - /// The default parallelism for active tasks in [`StreamBackfillJob`]. pub(crate) const DEFAULT_PARALLELISM: usize = 4; /// The default batch size for active tasks in [`StreamBackfillJob`]. @@ -157,33 +155,44 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let this = self.get_mut(); - // Spawn new tasks only if we are below the parallelism configured. - while this.tasks.len() < this.parallelism { - // Take the next `batch_size` blocks from the range and calculate the range bounds - let mut range = this.range.by_ref().take(this.batch_size); - let start = range.next(); - let range_bounds = start.zip(range.last().or(start)); + loop { + // Spawn new tasks only if we are below the parallelism configured. 
+ while this.tasks.len() < this.parallelism { + // Take the next `batch_size` blocks from the range and calculate the range bounds + let mut range = this.range.by_ref().take(this.batch_size); + let start = range.next(); + let range_bounds = start.zip(range.last().or(start)); - // Create the range from the range bounds. If it is empty, we are done. - let Some(range) = range_bounds.map(|(first, last)| first..=last) else { - debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill"); - break; - }; + // Create the range from the range bounds. If it is empty, we are done. + let Some(range) = range_bounds.map(|(first, last)| first..=last) else { + debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill"); + break; + }; - // Spawn a new task for that range - debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task"); - let job = Box::new(BackfillJob { - executor: this.executor.clone(), - provider: this.provider.clone(), - prune_modes: this.prune_modes.clone(), - thresholds: this.thresholds.clone(), - range, - stream_parallelism: this.parallelism, - }) as BackfillTaskIterator<_>; - this.push_back(job); + // Spawn a new task for that range + debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task"); + let job = Box::new(BackfillJob { + executor: this.executor.clone(), + provider: this.provider.clone(), + prune_modes: this.prune_modes.clone(), + thresholds: this.thresholds.clone(), + range, + stream_parallelism: this.parallelism, + }) as BackfillTaskIterator<_>; + this.push_back(job); + } + + let res = ready!(this.poll_next_task(cx)); + + if res.is_some() { + return Poll::Ready(res); + } + + if this.range.is_empty() { + // only terminate the stream if there are no more blocks to process + return Poll::Ready(None); + } } - - this.poll_next_task(cx) } } @@ 
-310,10 +319,9 @@ mod tests { blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?; // Backfill the same range - let factory = - BackfillJobFactory::new(executor.clone(), blockchain_db.clone()).with_thresholds( - ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() }, - ); + let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone()) + .with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() }) + .with_stream_parallelism(1); let mut backfill_stream = factory.backfill(1..=2).into_stream(); let mut chain = backfill_stream.next().await.unwrap().unwrap(); chain.execution_outcome_mut().state_mut().reverts.sort();