fix(exex): use thresholds in stream backfill (#11478)

This commit is contained in:
Alexey Shekhirin
2024-10-04 14:05:57 +03:00
committed by GitHub
parent 0ce1dd6794
commit 227e293390

View File

@ -41,6 +41,7 @@ pub struct StreamBackfillJob<E, P, T> {
tasks: BackfillTasks<T>,
parallelism: usize,
batch_size: usize,
thresholds: ExecutionStageThresholds,
}
impl<E, P, T> StreamBackfillJob<E, P, T> {
@ -124,7 +125,7 @@ where
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: ExecutionStageThresholds::default(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
};
@ -150,12 +151,14 @@ impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, Single
tasks: FuturesOrdered::new(),
parallelism: job.stream_parallelism,
batch_size: 1,
thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
}
}
}
impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem> {
fn from(job: BackfillJob<E, P>) -> Self {
let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
Self {
executor: job.executor,
provider: job.provider,
@ -163,7 +166,11 @@ impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamI
range: job.range,
tasks: FuturesOrdered::new(),
parallelism: job.stream_parallelism,
batch_size: job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize),
batch_size,
thresholds: ExecutionStageThresholds {
max_blocks: Some(batch_size as u64),
..job.thresholds
},
}
}
}