chore(engine): run pipeline on start if any of the stages is lagging behind (#3089)

This commit is contained in:
Roman Krasiuk
2023-06-12 17:18:40 +03:00
committed by GitHub
parent ff13ec3601
commit 83a032dada

View File

@ -324,26 +324,9 @@ where
let maybe_pipeline_target = match target {
// Provided target always takes precedence.
target @ Some(_) => target,
None => {
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = this
.blockchain
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.unwrap_or_default()
.block_number;
let last_stage_checkpoint = this
.blockchain
.get_stage_checkpoint(*StageId::ALL.last().unwrap())?
.unwrap_or_default()
.block_number;
if first_stage_checkpoint != last_stage_checkpoint {
this.blockchain.block_hash(first_stage_checkpoint)?
} else {
None
}
}
None => this.check_pipeline_consistency()?,
};
if let Some(target) = maybe_pipeline_target {
this.sync.set_pipeline_sync_target(target);
}
@ -351,6 +334,42 @@ where
Ok((this, handle))
}
/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
/// than the checkpoint of the first stage).
///
/// This will return the pipeline target if:
/// * the pipeline was interrupted during its previous run
/// * a new stage was added
/// * stage data was dropped manually through `reth stage drop ...`
///
/// # Returns
///
/// A target block hash if the pipeline is inconsistent, otherwise `None`.
fn check_pipeline_consistency(&self) -> Result<Option<H256>, reth_interfaces::Error> {
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = self
.blockchain
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.unwrap_or_default()
.block_number;
// Skip the first stage as we've already retrieved it and comparing all other checkpoints
// against it.
for stage_id in StageId::ALL.iter().skip(1) {
let stage_checkpoint =
self.blockchain.get_stage_checkpoint(*stage_id)?.unwrap_or_default().block_number;
// If the checkpoint of any stage is less than the checkpoint of the first stage,
// retrieve and return the block hash of the latest header and use it as the target.
if stage_checkpoint < first_stage_checkpoint {
return self.blockchain.block_hash(first_stage_checkpoint)
}
}
Ok(None)
}
/// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared.
///
/// The [`BeaconConsensusEngineHandle`] can be used to interact with this