diff --git a/crates/exex/exex/src/backfill/job.rs b/crates/exex/exex/src/backfill/job.rs index f93c5efa7..5888368e3 100644 --- a/crates/exex/exex/src/backfill/job.rs +++ b/crates/exex/exex/src/backfill/job.rs @@ -25,7 +25,8 @@ pub(super) type BackfillJobResult = Result; /// Backfill job started for a specific range. /// /// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds -/// and yields [`Chain`] +/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range +/// depending on the configured thresholds. #[derive(Debug)] pub struct BackfillJob { pub(crate) executor: E, diff --git a/crates/exex/exex/src/backfill/stream.rs b/crates/exex/exex/src/backfill/stream.rs index 95da076c7..d88ca87e7 100644 --- a/crates/exex/exex/src/backfill/stream.rs +++ b/crates/exex/exex/src/backfill/stream.rs @@ -77,15 +77,24 @@ where self } - /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the - /// [`BackfillTasks`] queue. - fn push_task(&mut self, mut job: BackfillTaskIterator) { + /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the end + /// of the [`BackfillTasks`] queue. + fn push_back(&mut self, mut job: BackfillTaskIterator) { self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput { result: job.next(), job, })); } + /// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the + /// front of the [`BackfillTasks`] queue. + fn push_front(&mut self, mut job: BackfillTaskIterator) { + self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput { + result: job.next(), + job, + })); + } + /// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result. fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll>> { while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) { @@ -93,8 +102,9 @@ where if let BackfillTaskOutput { result: Some(job_result), job } = task_result { // If the task returned a non-empty result, a new task advancing the job is created - // and pushed to the front of the queue. - self.push_task(job); + // and pushed to the __front__ of the queue, so that the next item of this returned + // next. + self.push_front(job); return Poll::Ready(Some(job_result)) }; @@ -130,7 +140,7 @@ where range: block_number..=block_number, stream_parallelism: this.parallelism, }) as BackfillTaskIterator<_>; - this.push_task(job); + this.push_back(job); } this.poll_next_task(cx) @@ -170,7 +180,7 @@ where range, stream_parallelism: this.parallelism, }) as BackfillTaskIterator<_>; - this.push_task(job); + this.push_back(job); } this.poll_next_task(cx)