fix: push job front of queue (#13177)

This commit is contained in:
Matthias Seitz
2024-12-06 11:44:40 +01:00
committed by GitHub
parent d3e09c8c43
commit ab87f22cab
2 changed files with 19 additions and 8 deletions

View File

@ -25,7 +25,8 @@ pub(super) type BackfillJobResult<T> = Result<T, BlockExecutionError>;
/// Backfill job started for a specific range.
///
/// It implements [`Iterator`] that executes blocks in batches according to the provided thresholds
/// and yields [`Chain`]
/// and yields [`Chain`]. In other words, this iterator can yield multiple items for the given range
/// depending on the configured thresholds.
#[derive(Debug)]
pub struct BackfillJob<E, P> {
pub(crate) executor: E,

View File

@ -77,15 +77,24 @@ where
self
}
/// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
/// [`BackfillTasks`] queue.
fn push_task(&mut self, mut job: BackfillTaskIterator<T>) {
/// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the end
/// of the [`BackfillTasks`] queue.
fn push_back(&mut self, mut job: BackfillTaskIterator<T>) {
self.tasks.push_back(tokio::task::spawn_blocking(move || BackfillTaskOutput {
result: job.next(),
job,
}));
}
/// Spawns a new task calling the [`BackfillTaskIterator::next`] method and pushes it to the
/// front of the [`BackfillTasks`] queue.
fn push_front(&mut self, mut job: BackfillTaskIterator<T>) {
self.tasks.push_front(tokio::task::spawn_blocking(move || BackfillTaskOutput {
result: job.next(),
job,
}));
}
/// Polls the next task in the [`BackfillTasks`] queue until it returns a non-empty result.
fn poll_next_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<BackfillJobResult<T>>> {
while let Some(res) = ready!(self.tasks.poll_next_unpin(cx)) {
@ -93,8 +102,9 @@ where
if let BackfillTaskOutput { result: Some(job_result), job } = task_result {
// If the task returned a non-empty result, a new task advancing the job is created
// and pushed to the front of the queue.
self.push_task(job);
// and pushed to the __front__ of the queue, so that the next item of this returned
// next.
self.push_front(job);
return Poll::Ready(Some(job_result))
};
@ -130,7 +140,7 @@ where
range: block_number..=block_number,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_task(job);
this.push_back(job);
}
this.poll_next_task(cx)
@ -170,7 +180,7 @@ where
range,
stream_parallelism: this.parallelism,
}) as BackfillTaskIterator<_>;
this.push_task(job);
this.push_back(job);
}
this.poll_next_task(cx)