From 83a032dadaf5e29e748fd315608c894481b8479c Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 12 Jun 2023 17:18:40 +0300 Subject: [PATCH] chore(engine): run pipeline on start if any of the stages is lagging behind (#3089) --- crates/consensus/beacon/src/engine/mod.rs | 57 +++++++++++++++-------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index cd4f6c6e6..54d9f992b 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -324,26 +324,9 @@ where let maybe_pipeline_target = match target { // Provided target always takes precedence. target @ Some(_) => target, - None => { - // If no target was provided, check if the stages are congruent - check if the - // checkpoint of the last stage matches the checkpoint of the first. - let first_stage_checkpoint = this - .blockchain - .get_stage_checkpoint(*StageId::ALL.first().unwrap())? - .unwrap_or_default() - .block_number; - let last_stage_checkpoint = this - .blockchain - .get_stage_checkpoint(*StageId::ALL.last().unwrap())? - .unwrap_or_default() - .block_number; - if first_stage_checkpoint != last_stage_checkpoint { - this.blockchain.block_hash(first_stage_checkpoint)? - } else { - None - } - } + None => this.check_pipeline_consistency()?, }; + if let Some(target) = maybe_pipeline_target { this.sync.set_pipeline_sync_target(target); } @@ -351,6 +334,42 @@ where Ok((this, handle)) } + /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less + /// than the checkpoint of the first stage). + /// + /// This will return the pipeline target if: + /// * the pipeline was interrupted during its previous run + /// * a new stage was added + /// * stage data was dropped manually through `reth stage drop ...` + /// + /// # Returns + /// + /// A target block hash if the pipeline is inconsistent, otherwise `None`. + fn check_pipeline_consistency(&self) -> Result, reth_interfaces::Error> { + // If no target was provided, check if the stages are congruent - check if the + // checkpoint of the last stage matches the checkpoint of the first. + let first_stage_checkpoint = self + .blockchain + .get_stage_checkpoint(*StageId::ALL.first().unwrap())? + .unwrap_or_default() + .block_number; + + // Skip the first stage as we've already retrieved it and comparing all other checkpoints + // against it. + for stage_id in StageId::ALL.iter().skip(1) { + let stage_checkpoint = + self.blockchain.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number; + + // If the checkpoint of any stage is less than the checkpoint of the first stage, + // retrieve and return the block hash of the latest header and use it as the target. + if stage_checkpoint < first_stage_checkpoint { + return self.blockchain.block_hash(first_stage_checkpoint) + } + } + + Ok(None) + } + /// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared. /// /// The [`BeaconConsensusEngineHandle`] can be used to interact with this