fix: only terminate the stream if range is empty (#13281)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
Matthias Seitz
2024-12-11 14:08:21 +01:00
committed by GitHub
parent b6e682ef73
commit 1602baef6d

View File

@ -1,10 +1,5 @@
use super::job::BackfillJobResult;
use crate::{BackfillJob, SingleBlockBackfillJob};
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
use alloy_primitives::BlockNumber;
use futures::{
stream::{FuturesOrdered, Stream},
@ -17,10 +12,13 @@ use reth_provider::{BlockReader, Chain, HeaderProvider, StateProviderFactory};
use reth_prune_types::PruneModes;
use reth_stages_api::ExecutionStageThresholds;
use reth_tracing::tracing::debug;
use std::{
ops::RangeInclusive,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;
use super::job::BackfillJobResult;
/// The default parallelism for active tasks in [`StreamBackfillJob`].
pub(crate) const DEFAULT_PARALLELISM: usize = 4;
/// The default batch size for active tasks in [`StreamBackfillJob`].
@ -157,33 +155,44 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
// Take the next `batch_size` blocks from the range and calculate the range bounds
let mut range = this.range.by_ref().take(this.batch_size);
let start = range.next();
let range_bounds = start.zip(range.last().or(start));
loop {
// Spawn new tasks only if we are below the parallelism configured.
while this.tasks.len() < this.parallelism {
// Take the next `batch_size` blocks from the range and calculate the range bounds
let mut range = this.range.by_ref().take(this.batch_size);
let start = range.next();
let range_bounds = start.zip(range.last().or(start));
// Create the range from the range bounds. If it is empty, we are done.
let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
break;
};
// Create the range from the range bounds. If it is empty, we are done.
let Some(range) = range_bounds.map(|(first, last)| first..=last) else {
debug!(target: "exex::backfill", tasks = %this.tasks.len(), range = ?this.range, "No more block batches to backfill");
break;
};
// Spawn a new task for that range
debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
let job = Box::new(BackfillJob {
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_back(job);
// Spawn a new task for that range
debug!(target: "exex::backfill", tasks = %this.tasks.len(), ?range, "Spawning new block batch backfill task");
let job = Box::new(BackfillJob {
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_back(job);
}
let res = ready!(this.poll_next_task(cx));
if res.is_some() {
return Poll::Ready(res);
}
if this.range.is_empty() {
// only terminate the stream if there are no more blocks to process
return Poll::Ready(None);
}
}
this.poll_next_task(cx)
}
}
@ -310,10 +319,9 @@ mod tests {
blocks_and_execution_outcome(provider_factory, chain_spec, key_pair)?;
// Backfill the same range
let factory =
BackfillJobFactory::new(executor.clone(), blockchain_db.clone()).with_thresholds(
ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() },
);
let factory = BackfillJobFactory::new(executor.clone(), blockchain_db.clone())
.with_thresholds(ExecutionStageThresholds { max_blocks: Some(2), ..Default::default() })
.with_stream_parallelism(1);
let mut backfill_stream = factory.backfill(1..=2).into_stream();
let mut chain = backfill_stream.next().await.unwrap().unwrap();
chain.execution_outcome_mut().state_mut().reverts.sort();