feat(stages): replace progress by block_number in PipelineProgress (#3256)

This commit is contained in:
Thomas Coratger
2023-06-21 17:11:55 +02:00
committed by GitHub
parent 35b005ea0a
commit 8fd3cd2d81
4 changed files with 43 additions and 42 deletions

View File

@ -182,9 +182,9 @@ where
};
let ev = match res {
Ok((pipeline, result)) => {
let minimum_progress = pipeline.minimum_progress();
let minimum_block_number = pipeline.minimum_block_number();
let reached_max_block =
self.has_reached_max_block(minimum_progress.unwrap_or_default());
self.has_reached_max_block(minimum_block_number.unwrap_or_default());
self.pipeline_state = PipelineState::Idle(Some(pipeline));
EngineSyncEvent::PipelineFinished { result, reached_max_block }
}

View File

@ -12,13 +12,13 @@ pub enum ControlFlow {
},
/// The pipeline is allowed to continue executing stages.
Continue {
/// The progress of the last stage
progress: BlockNumber,
/// Block number reached by the stage.
block_number: BlockNumber,
},
/// Pipeline made no progress
NoProgress {
/// The current stage progress.
stage_progress: Option<BlockNumber>,
/// Block number reached by the stage.
block_number: Option<BlockNumber>,
},
}
@ -37,8 +37,8 @@ impl ControlFlow {
pub fn progress(&self) -> Option<BlockNumber> {
match self {
ControlFlow::Unwind { .. } => None,
ControlFlow::Continue { progress } => Some(*progress),
ControlFlow::NoProgress { stage_progress } => *stage_progress,
ControlFlow::Continue { block_number } => Some(*block_number),
ControlFlow::NoProgress { block_number } => *block_number,
}
}
}

View File

@ -117,9 +117,10 @@ where
PipelineBuilder::default()
}
/// Return the minimum pipeline progress
pub fn minimum_progress(&self) -> Option<u64> {
self.progress.minimum_progress
/// Return the minimum block number achieved by
/// any stage during the execution of the pipeline.
pub fn minimum_block_number(&self) -> Option<u64> {
self.progress.minimum_block_number
}
/// Set tip for reverse sync.
@ -181,14 +182,14 @@ where
// configured block.
if next_action.should_continue() &&
self.progress
.minimum_progress
.minimum_block_number
.zip(self.max_block)
.map_or(false, |(progress, target)| progress >= target)
{
trace!(
target: "sync::pipeline",
?next_action,
minimum_progress = ?self.progress.minimum_progress,
minimum_block_number = ?self.progress.minimum_block_number,
max_block = ?self.max_block,
"Terminating pipeline."
);
@ -218,12 +219,12 @@ where
trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage");
match next {
ControlFlow::NoProgress { stage_progress } => {
if let Some(progress) = stage_progress {
self.progress.update(progress);
ControlFlow::NoProgress { block_number } => {
if let Some(block_number) = block_number {
self.progress.update(block_number);
}
}
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Continue { block_number } => self.progress.update(block_number),
ControlFlow::Unwind { target, bad_block } => {
self.unwind(target, Some(bad_block.number)).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
@ -345,7 +346,7 @@ where
// We reached the maximum block, so we skip the stage
return Ok(ControlFlow::NoProgress {
stage_progress: prev_checkpoint.map(|progress| progress.block_number),
block_number: prev_checkpoint.map(|progress| progress.block_number),
})
}
@ -386,11 +387,11 @@ where
provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
if done {
let stage_progress = checkpoint.block_number;
let block_number = checkpoint.block_number;
return Ok(if made_progress {
ControlFlow::Continue { progress: stage_progress }
ControlFlow::Continue { block_number }
} else {
ControlFlow::NoProgress { stage_progress: Some(stage_progress) }
ControlFlow::NoProgress { block_number: Some(block_number) }
})
}
}
@ -491,26 +492,26 @@ mod tests {
let mut progress = PipelineProgress::default();
progress.update(10);
assert_eq!(progress.minimum_progress, Some(10));
assert_eq!(progress.maximum_progress, Some(10));
assert_eq!(progress.minimum_block_number, Some(10));
assert_eq!(progress.maximum_block_number, Some(10));
progress.update(20);
assert_eq!(progress.minimum_progress, Some(10));
assert_eq!(progress.maximum_progress, Some(20));
assert_eq!(progress.minimum_block_number, Some(10));
assert_eq!(progress.maximum_block_number, Some(20));
progress.update(1);
assert_eq!(progress.minimum_progress, Some(1));
assert_eq!(progress.maximum_progress, Some(20));
assert_eq!(progress.minimum_block_number, Some(1));
assert_eq!(progress.maximum_block_number, Some(20));
}
#[test]
fn progress_ctrl_flow() {
let mut progress = PipelineProgress::default();
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None });
assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None });
progress.update(1);
assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 });
assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 });
}
/// Runs a simple pipeline.

View File

@ -4,26 +4,26 @@ use reth_primitives::BlockNumber;
#[derive(Debug, Default)]
pub(crate) struct PipelineProgress {
/// The progress of the current stage
pub(crate) progress: Option<BlockNumber>,
/// The maximum progress achieved by any stage during the execution of the pipeline.
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
/// Block number reached by the stage.
pub(crate) block_number: Option<BlockNumber>,
/// The maximum block number achieved by any stage during the execution of the pipeline.
pub(crate) maximum_block_number: Option<BlockNumber>,
/// The minimum block number achieved by any stage during the execution of the pipeline.
pub(crate) minimum_block_number: Option<BlockNumber>,
}
impl PipelineProgress {
pub(crate) fn update(&mut self, progress: BlockNumber) {
self.progress = Some(progress);
self.minimum_progress = opt::min(self.minimum_progress, progress);
self.maximum_progress = opt::max(self.maximum_progress, progress);
pub(crate) fn update(&mut self, block_number: BlockNumber) {
self.block_number = Some(block_number);
self.minimum_block_number = opt::min(self.minimum_block_number, block_number);
self.maximum_block_number = opt::max(self.maximum_block_number, block_number);
}
/// Get next control flow step
pub(crate) fn next_ctrl(&self) -> ControlFlow {
match self.progress {
Some(progress) => ControlFlow::Continue { progress },
None => ControlFlow::NoProgress { stage_progress: None },
match self.block_number {
Some(block_number) => ControlFlow::Continue { block_number },
None => ControlFlow::NoProgress { block_number: None },
}
}
}