From 21613bb58df8d9bae77fc771a26e86bfac67719d Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 31 May 2024 09:17:44 +0200 Subject: [PATCH] fix(engine): do not ignore provider errors (#8519) --- Cargo.lock | 1 + crates/consensus/beacon/Cargo.toml | 1 + crates/consensus/beacon/src/engine/mod.rs | 297 +++++++++++----------- 3 files changed, 152 insertions(+), 147 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f27cec8e8..0642ef6cf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6302,6 +6302,7 @@ version = "0.2.0-beta.7" dependencies = [ "assert_matches", "futures", + "itertools 0.12.1", "metrics", "reth-blockchain-tree", "reth-blockchain-tree-api", diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 67f655858..454382136 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -43,6 +43,7 @@ metrics.workspace = true tracing.workspace = true thiserror.workspace = true schnellru.workspace = true +itertools.workspace = true [dev-dependencies] # reth diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 84efe7cff..f86452b5f 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1,4 +1,5 @@ use futures::{stream::BoxStream, Future, StreamExt}; +use itertools::Either; use reth_blockchain_tree_api::{ error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind}, BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk, @@ -333,6 +334,12 @@ where }) } + /// Set the next blockchain tree action. + fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction) { + let previous_action = self.blockchain_tree_action.replace(action); + debug_assert!(previous_action.is_none(), "Pre-existing action found"); + } + /// Pre-validate forkchoice update and check whether it can be processed. /// /// This method returns the update outcome if validation fails or @@ -340,23 +347,23 @@ where fn pre_validate_forkchoice_update( &mut self, state: ForkchoiceState, - ) -> Option { + ) -> ProviderResult> { if state.head_block_hash.is_zero() { - return Some(OnForkChoiceUpdated::invalid_state()) + return Ok(Some(OnForkChoiceUpdated::invalid_state())) } // check if the new head hash is connected to any ancestor that we previously marked as // invalid let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash); - if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu) { - return Some(OnForkChoiceUpdated::with_invalid(status)) + if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? { + return Ok(Some(OnForkChoiceUpdated::with_invalid(status))) } if self.sync.is_pipeline_active() { // We can only process new forkchoice updates if the pipeline is idle, since it requires // exclusive access to the database trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update"); - return Some(OnForkChoiceUpdated::syncing()) + return Ok(Some(OnForkChoiceUpdated::syncing())) } if let Some(hook) = self.hooks.active_db_write_hook() { @@ -371,10 +378,10 @@ where "Hook is in progress, skipping forkchoice update. \ This may affect the performance of your node as a validator." ); - return Some(OnForkChoiceUpdated::syncing()) + return Ok(Some(OnForkChoiceUpdated::syncing())) } - None + Ok(None) } /// Process the result of attempting to make forkchoice state head hash canonical. @@ -435,7 +442,7 @@ where Err(err) } else { Ok(OnForkChoiceUpdated::valid( - self.on_failed_canonical_forkchoice_update(&state, err), + self.on_failed_canonical_forkchoice_update(&state, err)?, )) } } @@ -500,16 +507,21 @@ where self.blockchain.on_forkchoice_update_received(&state); trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); - if let Some(on_updated) = self.pre_validate_forkchoice_update(state) { - // Pre-validate forkchoice state update and return if it's invalid - // or cannot be processed at the moment. - self.on_forkchoice_updated_status(state, on_updated, tx); - } else { - let previous_action = self - .blockchain_tree_action - .replace(BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx }); - debug_assert!(previous_action.is_none(), "Pre-existing action found"); - } + match self.pre_validate_forkchoice_update(state) { + Ok(Some(on_updated)) => { + // Pre-validate forkchoice state update and return if it's invalid + // or cannot be processed at the moment. + self.on_forkchoice_updated_status(state, on_updated, tx); + } + Ok(None) => { + self.set_blockchain_tree_action( + BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx }, + ); + } + Err(error) => { + let _ = tx.send(Err(error.into())); + } + }; } /// Called after the forkchoice update status has been resolved. @@ -713,47 +725,35 @@ where fn latest_valid_hash_for_invalid_payload( &mut self, parent_hash: B256, - insert_err: Option<&InsertBlockErrorKind>, - ) -> Option { - // check pre merge block error - if insert_err.map(|err| err.is_block_pre_merge()).unwrap_or_default() { - return Some(B256::ZERO) - } - + ) -> ProviderResult> { // Check if parent exists in side chain or in canonical chain. - // TODO: handle find_block_by_hash errors. - if matches!(self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any), Ok(Some(_))) - { - Some(parent_hash) - } else { - // iterate over ancestors in the invalid cache - // until we encounter the first valid ancestor - let mut current_hash = parent_hash; - let mut current_header = self.invalid_headers.get(¤t_hash); - while let Some(header) = current_header { - current_hash = header.parent_hash; - current_header = self.invalid_headers.get(¤t_hash); - - // If current_header is None, then the current_hash does not have an invalid - // ancestor in the cache, check its presence in blockchain tree - if current_header.is_none() && - matches!( - // TODO: handle find_block_by_hash errors. - self.blockchain.find_block_by_hash(current_hash, BlockSource::Any), - Ok(Some(_)) - ) - { - return Some(current_hash) - } - } - None + if self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any)?.is_some() { + return Ok(Some(parent_hash)) } + + // iterate over ancestors in the invalid cache + // until we encounter the first valid ancestor + let mut current_hash = parent_hash; + let mut current_header = self.invalid_headers.get(¤t_hash); + while let Some(header) = current_header { + current_hash = header.parent_hash; + current_header = self.invalid_headers.get(¤t_hash); + + // If current_header is None, then the current_hash does not have an invalid + // ancestor in the cache, check its presence in blockchain tree + if current_header.is_none() && + self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some() + { + return Ok(Some(current_hash)) + } + } + Ok(None) } /// Prepares the invalid payload response for the given hash, checking the /// database for the parent hash and populating the payload status with the latest valid hash /// according to the engine api spec. - fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> PayloadStatus { + fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult { // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal // PoW block, which we need to identify by looking at the parent's block difficulty if let Ok(Some(parent)) = self.blockchain.header_by_hash_or_number(parent_hash.into()) { @@ -762,12 +762,11 @@ where } } - let valid_parent_hash = - self.latest_valid_hash_for_invalid_payload(parent_hash, None).unwrap_or_default(); - PayloadStatus::from_status(PayloadStatusEnum::Invalid { + let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?; + Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(), }) - .with_latest_valid_hash(valid_parent_hash) + .with_latest_valid_hash(valid_parent_hash.unwrap_or_default())) } /// Checks if the given `check` hash points to an invalid header, inserting the given `head` @@ -779,27 +778,27 @@ where &mut self, check: B256, head: B256, - ) -> Option { + ) -> ProviderResult> { // check if the check hash was previously marked as invalid - let header = self.invalid_headers.get(&check)?; + let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) }; // populate the latest valid hash field - let status = self.prepare_invalid_response(header.parent_hash); + let status = self.prepare_invalid_response(header.parent_hash)?; // insert the head block into the invalid header cache self.invalid_headers.insert_with_invalid_ancestor(head, header); - Some(status) + Ok(Some(status)) } /// Checks if the given `head` points to an invalid header, which requires a specific response /// to a forkchoice update. - fn check_invalid_ancestor(&mut self, head: B256) -> Option { + fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult> { // check if the head was previously marked as invalid - let header = self.invalid_headers.get(&head)?; + let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) }; // populate the latest valid hash field - Some(self.prepare_invalid_response(header.parent_hash)) + Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) } /// Record latency metrics for one call to make a block canonical @@ -964,24 +963,24 @@ where &mut self, state: &ForkchoiceState, error: CanonicalError, - ) -> PayloadStatus { + ) -> ProviderResult { debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle"); // check if the new head was previously invalidated, if so then we deem this FCU // as invalid - if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash) { + if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash)? { warn!(target: "consensus::engine", %error, ?state, ?invalid_ancestor, head=?state.head_block_hash, "Failed to canonicalize the head hash, head is also considered invalid"); debug!(target: "consensus::engine", head=?state.head_block_hash, current_error=%error, "Head was previously marked as invalid"); - return invalid_ancestor + return Ok(invalid_ancestor) } match &error { CanonicalError::Validation(BlockValidationError::BlockPreMerge { .. }) => { warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash"); - return PayloadStatus::from_status(PayloadStatusEnum::Invalid { + return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: error.to_string(), }) - .with_latest_valid_hash(B256::ZERO) + .with_latest_valid_hash(B256::ZERO)) } CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain { .. @@ -992,7 +991,7 @@ where } CanonicalError::OptimisticTargetRevert(block_number) => { self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number)); - return PayloadStatus::from_status(PayloadStatusEnum::Syncing) + return Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)) } _ => { warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash"); @@ -1033,7 +1032,7 @@ where } debug!(target: "consensus::engine", %target, "Syncing to new target"); - PayloadStatus::from_status(PayloadStatusEnum::Syncing) + Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing)) } /// Return the parent hash of the lowest buffered ancestor for the requested block, if there @@ -1066,71 +1065,40 @@ where &mut self, payload: ExecutionPayload, cancun_fields: Option, - tx: oneshot::Sender>, - ) { + ) -> Result, BeaconOnNewPayloadError> { self.metrics.new_payload_messages.increment(1); - let block = match self.ensure_well_formed_payload(payload, cancun_fields) { - Ok(block) => block, - Err(status) => { - let _ = tx.send(Ok(status)); - return - } - }; - - let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash()); - if lowest_buffered_ancestor == block.hash() { - lowest_buffered_ancestor = block.parent_hash; - } - - // now check the block itself - if let Some(status) = - self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash()) - { - let _ = tx.send(Ok(status)); - return - } - - let previous_action = self - .blockchain_tree_action - .replace(BlockchainTreeAction::InsertNewPayload { block, tx }); - debug_assert!(previous_action.is_none(), "Pre-existing action found"); - } - - /// Ensures that the given payload does not violate any consensus rules that concern the block's - /// layout, like: - /// - missing or invalid base fee - /// - invalid extra data - /// - invalid transactions - /// - incorrect hash - /// - the versioned hashes passed with the payload do not exactly match transaction versioned - /// hashes - /// - the block does not contain blob transactions if it is pre-cancun - /// - /// This validates the following engine API rule: - /// - /// 3. Given the expected array of blob versioned hashes client software **MUST** run its - /// validation by taking the following steps: - /// - /// 1. Obtain the actual array by concatenating blob versioned hashes lists - /// (`tx.blob_versioned_hashes`) of each [blob - /// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included - /// in the payload, respecting the order of inclusion. If the payload has no blob - /// transactions the expected array **MUST** be `[]`. - /// - /// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage | null}` - /// if the expected and the actual arrays don't match. - /// - /// This validation **MUST** be instantly run in all cases even during active sync process. - fn ensure_well_formed_payload( - &mut self, - payload: ExecutionPayload, - cancun_fields: Option, - ) -> Result { + // Ensures that the given payload does not violate any consensus rules that concern the + // block's layout, like: + // - missing or invalid base fee + // - invalid extra data + // - invalid transactions + // - incorrect hash + // - the versioned hashes passed with the payload do not exactly match transaction + // versioned hashes + // - the block does not contain blob transactions if it is pre-cancun + // + // This validates the following engine API rule: + // + // 3. Given the expected array of blob versioned hashes client software **MUST** run its + // validation by taking the following steps: + // + // 1. Obtain the actual array by concatenating blob versioned hashes lists + // (`tx.blob_versioned_hashes`) of each [blob + // transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included + // in the payload, respecting the order of inclusion. If the payload has no blob + // transactions the expected array **MUST** be `[]`. + // + // 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage | + // null}` if the expected and the actual arrays don't match. + // + // This validation **MUST** be instantly run in all cases even during active sync process. let parent_hash = payload.parent_hash(); - - match self.payload_validator.ensure_well_formed_payload(payload, cancun_fields.into()) { - Ok(block) => Ok(block), + let block = match self + .payload_validator + .ensure_well_formed_payload(payload, cancun_fields.into()) + { + Ok(block) => block, Err(error) => { error!(target: "consensus::engine", %error, "Invalid payload"); // we need to convert the error to a payload status (response to the CL) @@ -1142,12 +1110,28 @@ where // > `latestValidHash: null` if the expected and the actual arrays don't match () None } else { - self.latest_valid_hash_for_invalid_payload(parent_hash, None) + self.latest_valid_hash_for_invalid_payload(parent_hash) + .map_err(BeaconOnNewPayloadError::internal)? }; let status = PayloadStatusEnum::from(error); - Err(PayloadStatus::new(status, latest_valid_hash)) + return Ok(Either::Left(PayloadStatus::new(status, latest_valid_hash))) } + }; + + let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash()); + if lowest_buffered_ancestor == block.hash() { + lowest_buffered_ancestor = block.parent_hash; + } + + // now check the block itself + if let Some(status) = self + .check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash()) + .map_err(BeaconOnNewPayloadError::internal)? + { + Ok(Either::Left(status)) + } else { + Ok(Either::Right(block)) } } @@ -1239,10 +1223,10 @@ where let elapsed = start.elapsed(); let mut latest_valid_hash = None; - let block = Arc::new(block); let status = match status { InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)) => { latest_valid_hash = Some(block_hash); + let block = Arc::new(block); let event = if attachment.is_canonical() { BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) } else { @@ -1259,7 +1243,9 @@ where InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => { // check if the block's parent is already marked as invalid if let Some(status) = - self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()) + self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).map_err( + |error| InsertBlockError::new(block, InsertBlockErrorKind::Provider(error)), + )? { return Ok(status) } @@ -1397,12 +1383,14 @@ where EngineSyncEvent::FetchedFullBlock(block) => { trace!(target: "consensus::engine", hash=?block.hash(), number=%block.number, "Downloaded full block"); // Insert block only if the block's parent is not marked as invalid - if self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).is_none() + if self + .check_invalid_ancestor_with_head(block.parent_hash, block.hash()) + .map_err(|error| BeaconConsensusEngineError::Common(error.into()))? + .is_none() { - let previous_action = self - .blockchain_tree_action - .replace(BlockchainTreeAction::InsertDownloadedPayload { block }); - debug_assert!(previous_action.is_none(), "Pre-existing action found"); + self.set_blockchain_tree_action( + BlockchainTreeAction::InsertDownloadedPayload { block }, + ); } EngineEventOutcome::Processed } @@ -1489,7 +1477,7 @@ where .check_invalid_ancestor_with_head( lowest_buffered_ancestor, sync_target_state.head_block_hash, - ) + )? .is_some() { warn!( @@ -1657,8 +1645,12 @@ where // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is // returned. warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload"); - let latest_valid_hash = self - .latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error)); + let latest_valid_hash = if error.is_block_pre_merge() { + // zero hash must be returned if block is pre-merge + Some(B256::ZERO) + } else { + self.latest_valid_hash_for_invalid_payload(block.parent_hash)? + }; // keep track of the invalid header self.invalid_headers.insert(block.header); PayloadStatus::new( @@ -1673,14 +1665,13 @@ where // if we're currently syncing and the inserted block is the targeted // FCU head block, we can try to make it canonical. if block_hash == target.head_block_hash { - let previous_action = self.blockchain_tree_action.replace( + self.set_blockchain_tree_action( BlockchainTreeAction::MakeNewPayloadCanonical { payload_num_hash: block_num_hash, status, tx, }, ); - debug_assert!(previous_action.is_none(), "Pre-existing action found"); return Ok(EngineEventOutcome::Processed) } } @@ -1847,7 +1838,19 @@ where this.on_forkchoice_updated(state, payload_attrs, tx); } BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { - this.on_new_payload(payload, cancun_fields, tx); + match this.on_new_payload(payload, cancun_fields) { + Ok(Either::Right(block)) => { + this.set_blockchain_tree_action( + BlockchainTreeAction::InsertNewPayload { block, tx }, + ); + } + Ok(Either::Left(status)) => { + let _ = tx.send(Ok(status)); + } + Err(error) => { + let _ = tx.send(Err(error)); + } + } } BeaconEngineMessage::TransitionConfigurationExchanged => { this.blockchain.on_transition_configuration_exchanged();