refactor: extract on pipeline finished (#3590)

This commit is contained in:
Matthias Seitz
2023-07-05 11:50:51 +02:00
committed by GitHub
parent 7da36e0421
commit 1e3f0c0e86

View File

@ -27,7 +27,7 @@ use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use reth_stages::{ControlFlow, Pipeline};
use reth_stages::{ControlFlow, Pipeline, PipelineError};
use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
@ -1207,111 +1207,127 @@ where
return Some(Err(BeaconConsensusEngineError::PipelineChannelClosed))
}
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
match result {
Ok(ctrl) => {
if reached_max_block {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Some(Ok(()))
}
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid headers
self.invalid_headers.insert(bad_block);
return None
}
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
let max_block = ctrl.progress().unwrap_or_default();
let max_header = match self.blockchain.sealed_header(max_block) {
Ok(header) => match header {
Some(header) => header,
None => {
return Some(Err(Error::Provider(
ProviderError::HeaderNotFound(max_block.into()),
)
.into()))
}
},
Err(error) => {
error!(target: "consensus::engine", ?error, "Error getting canonical header for continuous sync");
return Some(Err(error.into()))
}
};
self.blockchain.set_canonical_head(max_header);
}
let sync_target_state = match self
.forkchoice_state_tracker
.sync_target_state()
{
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return None
}
};
// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
// This can arise if we buffer the forkchoice head, and if the head is an
// ancestor of an invalid block.
//
// * The forkchoice head could be buffered if it were first sent as a
// `newPayload` request.
//
// In this case, we won't have the head hash in the database, so we would
// set the pipeline sync target to a known-invalid head.
//
// This is why we check the invalid header cache here.
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
// this inserts the head if the lowest buffered ancestor is invalid
if self
.check_invalid_ancestor_with_head(
lowest_buffered_ancestor,
sync_target_state.head_block_hash,
)
.is_none()
{
// Update the state and hashes of the blockchain tree if possible.
match self.update_tree_on_finished_pipeline(
sync_target_state.finalized_block_hash,
) {
Ok(synced) => {
if synced {
// we're consider this synced and transition to live sync
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else {
// We don't have the finalized block in the database, so
// we need to run another pipeline.
self.sync.set_pipeline_sync_target(
sync_target_state.finalized_block_hash,
);
}
}
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Some(Err(error.into()))
}
};
}
}
// Any pipeline error at this point is fatal.
Err(error) => return Some(Err(error.into())),
};
return self.on_pipeline_finished(result, reached_max_block)
}
};
None
}
/// Invoked when the pipeline has finished.
///
/// Returns an Option to indicate whether the engine future should resolve:
///
/// Returns a result if:
/// - Ok(()) if the pipeline finished successfully
/// - Err(..) if the pipeline failed fatally
///
/// Returns None if the pipeline finished successfully and engine should continue.
fn on_pipeline_finished(
&mut self,
result: Result<ControlFlow, PipelineError>,
reached_max_block: bool,
) -> Option<Result<(), BeaconConsensusEngineError>> {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
match result {
Ok(ctrl) => {
if reached_max_block {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Some(Ok(()))
}
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid headers
self.invalid_headers.insert(bad_block);
return None
}
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
let max_block = ctrl.progress().unwrap_or_default();
let max_header = match self.blockchain.sealed_header(max_block) {
Ok(header) => match header {
Some(header) => header,
None => {
return Some(Err(Error::Provider(ProviderError::HeaderNotFound(
max_block.into(),
))
.into()))
}
},
Err(error) => {
error!(target: "consensus::engine", ?error, "Error getting canonical header for continuous sync");
return Some(Err(error.into()))
}
};
self.blockchain.set_canonical_head(max_header);
}
let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state,
None => {
// This is only possible if the node was run with `debug.tip`
// argument and without CL.
warn!(target: "consensus::engine", "No fork choice state available");
return None
}
};
// Next, we check if we need to schedule another pipeline run or transition
// to live sync via tree.
// This can arise if we buffer the forkchoice head, and if the head is an
// ancestor of an invalid block.
//
// * The forkchoice head could be buffered if it were first sent as a `newPayload`
// request.
//
// In this case, we won't have the head hash in the database, so we would
// set the pipeline sync target to a known-invalid head.
//
// This is why we check the invalid header cache here.
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash);
// this inserts the head if the lowest buffered ancestor is invalid
if self
.check_invalid_ancestor_with_head(
lowest_buffered_ancestor,
sync_target_state.head_block_hash,
)
.is_none()
{
// Update the state and hashes of the blockchain tree if possible.
match self
.update_tree_on_finished_pipeline(sync_target_state.finalized_block_hash)
{
Ok(synced) => {
if synced {
// we're consider this synced and transition to live sync
self.sync_state_updater.update_sync_state(SyncState::Idle);
} else {
// We don't have the finalized block in the database, so
// we need to run another pipeline.
self.sync.set_pipeline_sync_target(
sync_target_state.finalized_block_hash,
);
}
}
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state");
return Some(Err(error.into()))
}
};
}
}
// Any pipeline error at this point is fatal.
Err(error) => return Some(Err(error.into())),
};
None
}
}
/// On initialization, the consensus engine will poll the message receiver and return