From 1e3f0c0e8615a36b2b10db1dbcb0faf1b0b8e394 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 5 Jul 2023 11:50:51 +0200 Subject: [PATCH] refactor: extract on pipeline finished (#3590) --- crates/consensus/beacon/src/engine/mod.rs | 218 ++++++++++++---------- 1 file changed, 117 insertions(+), 101 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 57028c941..ffef3be2b 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -27,7 +27,7 @@ use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError, }; -use reth_stages::{ControlFlow, Pipeline}; +use reth_stages::{ControlFlow, Pipeline, PipelineError}; use reth_tasks::TaskSpawner; use std::{ pin::Pin, @@ -1207,111 +1207,127 @@ where return Some(Err(BeaconConsensusEngineError::PipelineChannelClosed)) } EngineSyncEvent::PipelineFinished { result, reached_max_block } => { - trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished"); - match result { - Ok(ctrl) => { - if reached_max_block { - // Terminate the sync early if it's reached the maximum user - // configured block. - return Some(Ok(())) - } - - if let ControlFlow::Unwind { bad_block, .. } = ctrl { - trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind"); - - // update the `invalid_headers` cache with the new invalid headers - self.invalid_headers.insert(bad_block); - return None - } - - // update the canon chain if continuous is enabled - if self.sync.run_pipeline_continuously() { - let max_block = ctrl.progress().unwrap_or_default(); - let max_header = match self.blockchain.sealed_header(max_block) { - Ok(header) => match header { - Some(header) => header, - None => { - return Some(Err(Error::Provider( - ProviderError::HeaderNotFound(max_block.into()), - ) - .into())) - } - }, - Err(error) => { - error!(target: "consensus::engine", ?error, "Error getting canonical header for continuous sync"); - return Some(Err(error.into())) - } - }; - self.blockchain.set_canonical_head(max_header); - } - - let sync_target_state = match self - .forkchoice_state_tracker - .sync_target_state() - { - Some(current_state) => current_state, - None => { - // This is only possible if the node was run with `debug.tip` - // argument and without CL. - warn!(target: "consensus::engine", "No fork choice state available"); - return None - } - }; - - // Next, we check if we need to schedule another pipeline run or transition - // to live sync via tree. - // This can arise if we buffer the forkchoice head, and if the head is an - // ancestor of an invalid block. - // - // * The forkchoice head could be buffered if it were first sent as a - // `newPayload` request. - // - // In this case, we won't have the head hash in the database, so we would - // set the pipeline sync target to a known-invalid head. - // - // This is why we check the invalid header cache here. - let lowest_buffered_ancestor = - self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash); - - // this inserts the head if the lowest buffered ancestor is invalid - if self - .check_invalid_ancestor_with_head( - lowest_buffered_ancestor, - sync_target_state.head_block_hash, - ) - .is_none() - { - // Update the state and hashes of the blockchain tree if possible. - match self.update_tree_on_finished_pipeline( - sync_target_state.finalized_block_hash, - ) { - Ok(synced) => { - if synced { - // we're consider this synced and transition to live sync - self.sync_state_updater.update_sync_state(SyncState::Idle); - } else { - // We don't have the finalized block in the database, so - // we need to run another pipeline. - self.sync.set_pipeline_sync_target( - sync_target_state.finalized_block_hash, - ); - } - } - Err(error) => { - error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state"); - return Some(Err(error.into())) - } - }; - } - } - // Any pipeline error at this point is fatal. - Err(error) => return Some(Err(error.into())), - }; + return self.on_pipeline_finished(result, reached_max_block) } }; None } + + /// Invoked when the pipeline has finished. + /// + /// Returns an Option to indicate whether the engine future should resolve: + /// + /// Returns a result if: + /// - Ok(()) if the pipeline finished successfully + /// - Err(..) if the pipeline failed fatally + /// + /// Returns None if the pipeline finished successfully and engine should continue. + fn on_pipeline_finished( + &mut self, + result: Result, + reached_max_block: bool, + ) -> Option> { + trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished"); + match result { + Ok(ctrl) => { + if reached_max_block { + // Terminate the sync early if it's reached the maximum user + // configured block. + return Some(Ok(())) + } + + if let ControlFlow::Unwind { bad_block, .. } = ctrl { + trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind"); + + // update the `invalid_headers` cache with the new invalid headers + self.invalid_headers.insert(bad_block); + return None + } + + // update the canon chain if continuous is enabled + if self.sync.run_pipeline_continuously() { + let max_block = ctrl.progress().unwrap_or_default(); + let max_header = match self.blockchain.sealed_header(max_block) { + Ok(header) => match header { + Some(header) => header, + None => { + return Some(Err(Error::Provider(ProviderError::HeaderNotFound( + max_block.into(), + )) + .into())) + } + }, + Err(error) => { + error!(target: "consensus::engine", ?error, "Error getting canonical header for continuous sync"); + return Some(Err(error.into())) + } + }; + self.blockchain.set_canonical_head(max_header); + } + + let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() { + Some(current_state) => current_state, + None => { + // This is only possible if the node was run with `debug.tip` + // argument and without CL. + warn!(target: "consensus::engine", "No fork choice state available"); + return None + } + }; + + // Next, we check if we need to schedule another pipeline run or transition + // to live sync via tree. + // This can arise if we buffer the forkchoice head, and if the head is an + // ancestor of an invalid block. + // + // * The forkchoice head could be buffered if it were first sent as a `newPayload` + // request. + // + // In this case, we won't have the head hash in the database, so we would + // set the pipeline sync target to a known-invalid head. + // + // This is why we check the invalid header cache here. + let lowest_buffered_ancestor = + self.lowest_buffered_ancestor_or(sync_target_state.head_block_hash); + + // this inserts the head if the lowest buffered ancestor is invalid + if self + .check_invalid_ancestor_with_head( + lowest_buffered_ancestor, + sync_target_state.head_block_hash, + ) + .is_none() + { + // Update the state and hashes of the blockchain tree if possible. + match self + .update_tree_on_finished_pipeline(sync_target_state.finalized_block_hash) + { + Ok(synced) => { + if synced { + // we're consider this synced and transition to live sync + self.sync_state_updater.update_sync_state(SyncState::Idle); + } else { + // We don't have the finalized block in the database, so + // we need to run another pipeline. + self.sync.set_pipeline_sync_target( + sync_target_state.finalized_block_hash, + ); + } + } + Err(error) => { + error!(target: "consensus::engine", ?error, "Error restoring blockchain tree state"); + return Some(Err(error.into())) + } + }; + } + } + // Any pipeline error at this point is fatal. + Err(error) => return Some(Err(error.into())), + }; + + None + } } /// On initialization, the consensus engine will poll the message receiver and return