diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index bc64c6d74..3e9aa91b6 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -182,9 +182,9 @@ where }; let ev = match res { Ok((pipeline, result)) => { - let minimum_progress = pipeline.minimum_progress(); + let minimum_block_number = pipeline.minimum_block_number(); let reached_max_block = - self.has_reached_max_block(minimum_progress.unwrap_or_default()); + self.has_reached_max_block(minimum_block_number.unwrap_or_default()); self.pipeline_state = PipelineState::Idle(Some(pipeline)); EngineSyncEvent::PipelineFinished { result, reached_max_block } } diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 8c514eea6..deece92d2 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -12,13 +12,13 @@ pub enum ControlFlow { }, /// The pipeline is allowed to continue executing stages. Continue { - /// The progress of the last stage - progress: BlockNumber, + /// Block number reached by the stage. + block_number: BlockNumber, }, /// Pipeline made no progress NoProgress { - /// The current stage progress. - stage_progress: Option, + /// Block number reached by the stage. + block_number: Option, }, } @@ -37,8 +37,8 @@ impl ControlFlow { pub fn progress(&self) -> Option { match self { ControlFlow::Unwind { .. } => None, - ControlFlow::Continue { progress } => Some(*progress), - ControlFlow::NoProgress { stage_progress } => *stage_progress, + ControlFlow::Continue { block_number } => Some(*block_number), + ControlFlow::NoProgress { block_number } => *block_number, } } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 14c78aac1..ca9b7204e 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -117,9 +117,10 @@ where PipelineBuilder::default() } - /// Return the minimum pipeline progress - pub fn minimum_progress(&self) -> Option { - self.progress.minimum_progress + /// Return the minimum block number achieved by + /// any stage during the execution of the pipeline. + pub fn minimum_block_number(&self) -> Option { + self.progress.minimum_block_number } /// Set tip for reverse sync. @@ -181,14 +182,14 @@ where // configured block. if next_action.should_continue() && self.progress - .minimum_progress + .minimum_block_number .zip(self.max_block) .map_or(false, |(progress, target)| progress >= target) { trace!( target: "sync::pipeline", ?next_action, - minimum_progress = ?self.progress.minimum_progress, + minimum_block_number = ?self.progress.minimum_block_number, max_block = ?self.max_block, "Terminating pipeline." ); @@ -218,12 +219,12 @@ where trace!(target: "sync::pipeline", stage = %stage_id, ?next, "Completed stage"); match next { - ControlFlow::NoProgress { stage_progress } => { - if let Some(progress) = stage_progress { - self.progress.update(progress); + ControlFlow::NoProgress { block_number } => { + if let Some(block_number) = block_number { + self.progress.update(block_number); } } - ControlFlow::Continue { progress } => self.progress.update(progress), + ControlFlow::Continue { block_number } => self.progress.update(block_number), ControlFlow::Unwind { target, bad_block } => { self.unwind(target, Some(bad_block.number)).await?; return Ok(ControlFlow::Unwind { target, bad_block }) @@ -345,7 +346,7 @@ where // We reached the maximum block, so we skip the stage return Ok(ControlFlow::NoProgress { - stage_progress: prev_checkpoint.map(|progress| progress.block_number), + block_number: prev_checkpoint.map(|progress| progress.block_number), }) } @@ -386,11 +387,11 @@ where provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?; if done { - let stage_progress = checkpoint.block_number; + let block_number = checkpoint.block_number; return Ok(if made_progress { - ControlFlow::Continue { progress: stage_progress } + ControlFlow::Continue { block_number } } else { - ControlFlow::NoProgress { stage_progress: Some(stage_progress) } + ControlFlow::NoProgress { block_number: Some(block_number) } }) } } @@ -491,26 +492,26 @@ mod tests { let mut progress = PipelineProgress::default(); progress.update(10); - assert_eq!(progress.minimum_progress, Some(10)); - assert_eq!(progress.maximum_progress, Some(10)); + assert_eq!(progress.minimum_block_number, Some(10)); + assert_eq!(progress.maximum_block_number, Some(10)); progress.update(20); - assert_eq!(progress.minimum_progress, Some(10)); - assert_eq!(progress.maximum_progress, Some(20)); + assert_eq!(progress.minimum_block_number, Some(10)); + assert_eq!(progress.maximum_block_number, Some(20)); progress.update(1); - assert_eq!(progress.minimum_progress, Some(1)); - assert_eq!(progress.maximum_progress, Some(20)); + assert_eq!(progress.minimum_block_number, Some(1)); + assert_eq!(progress.maximum_block_number, Some(20)); } #[test] fn progress_ctrl_flow() { let mut progress = PipelineProgress::default(); - assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { stage_progress: None }); + assert_eq!(progress.next_ctrl(), ControlFlow::NoProgress { block_number: None }); progress.update(1); - assert_eq!(progress.next_ctrl(), ControlFlow::Continue { progress: 1 }); + assert_eq!(progress.next_ctrl(), ControlFlow::Continue { block_number: 1 }); } /// Runs a simple pipeline. diff --git a/crates/stages/src/pipeline/progress.rs b/crates/stages/src/pipeline/progress.rs index ef1a17d70..1c4bbcf6c 100644 --- a/crates/stages/src/pipeline/progress.rs +++ b/crates/stages/src/pipeline/progress.rs @@ -4,26 +4,26 @@ use reth_primitives::BlockNumber; #[derive(Debug, Default)] pub(crate) struct PipelineProgress { - /// The progress of the current stage - pub(crate) progress: Option, - /// The maximum progress achieved by any stage during the execution of the pipeline. - pub(crate) maximum_progress: Option, - /// The minimum progress achieved by any stage during the execution of the pipeline. - pub(crate) minimum_progress: Option, + /// Block number reached by the stage. + pub(crate) block_number: Option, + /// The maximum block number achieved by any stage during the execution of the pipeline. + pub(crate) maximum_block_number: Option, + /// The minimum block number achieved by any stage during the execution of the pipeline. + pub(crate) minimum_block_number: Option, } impl PipelineProgress { - pub(crate) fn update(&mut self, progress: BlockNumber) { - self.progress = Some(progress); - self.minimum_progress = opt::min(self.minimum_progress, progress); - self.maximum_progress = opt::max(self.maximum_progress, progress); + pub(crate) fn update(&mut self, block_number: BlockNumber) { + self.block_number = Some(block_number); + self.minimum_block_number = opt::min(self.minimum_block_number, block_number); + self.maximum_block_number = opt::max(self.maximum_block_number, block_number); } /// Get next control flow step pub(crate) fn next_ctrl(&self) -> ControlFlow { - match self.progress { - Some(progress) => ControlFlow::Continue { progress }, - None => ControlFlow::NoProgress { stage_progress: None }, + match self.block_number { + Some(block_number) => ControlFlow::Continue { block_number }, + None => ControlFlow::NoProgress { block_number: None }, } } }