chore(engine): ensure previous pipeline run finished (#2920)

This commit is contained in:
Roman Krasiuk
2023-06-01 11:58:48 +03:00
committed by GitHub
parent 10717a7cc3
commit c25c398d34
2 changed files with 41 additions and 7 deletions

View File

@ -341,7 +341,7 @@ impl Command {
initial_target,
consensus_engine_tx,
consensus_engine_rx,
);
)?;
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select(

View File

@ -83,7 +83,7 @@ impl BeaconConsensusEngineHandle {
/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
///See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv2>
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
@ -200,7 +200,7 @@ where
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle,
target: Option<H256>,
) -> (Self, BeaconConsensusEngineHandle) {
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
let (to_engine, rx) = mpsc::unbounded_channel();
Self::with_channel(
client,
@ -219,6 +219,16 @@ where
/// Create a new instance of the [BeaconConsensusEngine] using the given channel to configure
/// the [BeaconEngineMessage] communication channel.
///
/// By default the engine is started with idle pipeline.
/// The pipeline can be launched immediately in one of the following ways descending in
/// priority:
/// - Explicit [Option::Some] target block hash provided via a constructor argument.
/// - The process was previously interrupted amidst the pipeline run. This is checked by
/// comparing the checkpoints of the first ([StageId::Headers]) and last ([StageId::Finish])
/// stages. In this case, the latest available header in the database is used as the target.
///
/// Propagates any database related error.
#[allow(clippy::too_many_arguments)]
pub fn with_channel(
client: Client,
@ -232,7 +242,7 @@ where
target: Option<H256>,
to_engine: UnboundedSender<BeaconEngineMessage>,
rx: UnboundedReceiver<BeaconEngineMessage>,
) -> (Self, BeaconConsensusEngineHandle) {
) -> Result<(Self, BeaconConsensusEngineHandle), reth_interfaces::Error> {
let handle = BeaconConsensusEngineHandle { to_engine };
let sync = EngineSyncController::new(
pipeline,
@ -254,11 +264,34 @@ where
metrics: Metrics::default(),
};
if let Some(target) = target {
let maybe_pipeline_target = match target {
// Provided target always takes precedence.
target @ Some(_) => target,
None => {
// If no target was provided, check if the stages are congruent - check if the
// checkpoint of the last stage matches the checkpoint of the first.
let first_stage_checkpoint = this
.blockchain
.get_stage_checkpoint(*StageId::ALL.first().unwrap())?
.unwrap_or_default()
.block_number;
let last_stage_checkpoint = this
.blockchain
.get_stage_checkpoint(*StageId::ALL.last().unwrap())?
.unwrap_or_default()
.block_number;
if first_stage_checkpoint != last_stage_checkpoint {
this.blockchain.block_hash(first_stage_checkpoint)?
} else {
None
}
}
};
if let Some(target) = maybe_pipeline_target {
this.sync.set_pipeline_sync_target(target);
}
(this, handle)
Ok((this, handle))
}
/// Returns a new [`BeaconConsensusEngineHandle`] that can be cloned and shared.
@ -1186,7 +1219,8 @@ mod tests {
false,
payload_builder,
None,
);
)
.expect("failed to create consensus engine");
(engine, TestEnv::new(db, tip_rx, handle))
}