From 949b3639c3bed719be886e3aea0869c187afceac Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 2 May 2023 17:16:32 +0200 Subject: [PATCH] fix: track invalid payload hashes (#2521) --- Cargo.lock | 1 + crates/consensus/beacon/Cargo.toml | 1 + crates/consensus/beacon/src/engine/mod.rs | 143 +++++++++++++++--- .../rpc/rpc-types/src/eth/engine/payload.rs | 8 + 4 files changed, 129 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 836aea5f8..54a091b3b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4683,6 +4683,7 @@ dependencies = [ "reth-stages", "reth-tasks", "reth-tracing", + "schnellru", "thiserror", "tokio", "tokio-stream", diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 2a1fbe42c..3e7e30eb8 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -27,6 +27,7 @@ futures = "0.3" tracing = "0.1" thiserror = "1.0" metrics = "0.20.1" +schnellru = "0.2" [dev-dependencies] # reth diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index dad1a9694..b9a09133c 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -9,12 +9,16 @@ use reth_interfaces::{ Error, }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; -use reth_primitives::{listener::EventListeners, BlockNumber, Header, SealedBlock, H256}; +use reth_primitives::{ + listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256, +}; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, + PayloadValidationError, }; use reth_stages::{stages::FINISH, Pipeline}; use reth_tasks::TaskSpawner; +use schnellru::{ByLength, LruMap}; use std::{ pin::Pin, sync::Arc, @@ -43,6 +47,9 @@ pub use pipeline_state::PipelineState; mod event; pub use event::BeaconConsensusEngineEvent; +/// The maximum number of invalid headers that can be tracked by the engine. +const MAX_INVALID_HEADERS: u32 = 512u32; + /// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus /// engine. /// @@ -164,6 +171,9 @@ where payload_builder: PayloadBuilderHandle, /// Listeners for engine events. listeners: EventListeners, + /// Tracks the header of invalid payloads that were rejected by the engine because they're + /// invalid. + invalid_headers: InvalidHeaderCache, /// Consensus engine metrics. metrics: Metrics, } @@ -227,6 +237,7 @@ where continuous, payload_builder, listeners: EventListeners::default(), + invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS), metrics: Metrics::default(), }; @@ -275,6 +286,37 @@ where self.blockchain_tree.find_canonical_ancestor(parent_hash) } + /// Loads the header for the given `block_number` from the database. + fn load_header(&self, block_number: u64) -> Result, Error> { + Ok(self.db.view(|tx| tx.get::(block_number))??) + } + + /// Checks if the given `head` points to an invalid header, which requires a specific response + /// to a forkchoice update. + fn check_invalid_ancestor(&mut self, head: H256) -> Option { + // check if the head was previously marked as invalid + let (parent_hash, parent_number) = { + let header = self.invalid_headers.get(&head)?; + (header.parent_hash, header.number.saturating_sub(1)) + }; + let mut latest_valid_hash = parent_hash; + + // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal + // PoW block, which we need to identify by looking at the parent's block difficulty + if let Ok(Some(parent)) = self.load_header(parent_number) { + if parent.difficulty != U256::ZERO { + latest_valid_hash = H256::zero(); + } + } + + let status = PayloadStatus::from_status(PayloadStatusEnum::Invalid { + validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(), + }) + .with_latest_valid_hash(latest_valid_hash); + + Some(status) + } + /// Called to resolve chain forks and ensure that the Execution layer is working with the latest /// valid chain. /// @@ -314,8 +356,7 @@ where if let Some(attrs) = attrs { // get header for further validation let header = self - .db - .view(|tx| tx.get::(head_block_number))?? + .load_header(head_block_number)? .expect("was canonicalized, so it exists"); return Ok(self.process_payload_attributes(attrs, header, state)) } @@ -323,27 +364,7 @@ where PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)) } Err(error) => { - warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash"); - // If this is the first forkchoice received, start downloading from safe block - // hash. - let target = if is_first_forkchoice && - !state.safe_block_hash.is_zero() && - self.get_block_number(state.safe_block_hash)?.is_none() - { - PipelineTarget::Safe - } else { - PipelineTarget::Head - }; - self.require_pipeline_run(target); - match error { - Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => { - PayloadStatus::from_status(PayloadStatusEnum::Invalid { - validation_error: error.to_string(), - }) - .with_latest_valid_hash(H256::zero()) - } - _ => PayloadStatus::from_status(PayloadStatusEnum::Syncing), - } + self.on_failed_canonical_forkchoice_update(&state, error, is_first_forkchoice) } } } else { @@ -356,6 +377,50 @@ where Ok(OnForkChoiceUpdated::valid(status)) } + /// Handler for a failed a forkchoice update due to a canonicalization error. + /// + /// This will determine if the state's head is invalid, and if so, return immediately. + /// + /// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap + /// + /// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. + fn on_failed_canonical_forkchoice_update( + &mut self, + state: &ForkchoiceState, + error: Error, + is_first_forkchoice: bool, + ) -> PayloadStatus { + warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash"); + + // check if the new head was previously invalidated, if so then we deem this FCU + // as invalid + if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash) { + debug!(target: "consensus::engine", head=?state.head_block_hash, "Head was previously marked as invalid"); + return invalid_ancestor + } + + // If this is the first forkchoice received, start downloading from safe block + // hash, if we have that block. + let target = if is_first_forkchoice && + !state.safe_block_hash.is_zero() && + self.get_block_number(state.safe_block_hash).ok().flatten().is_none() + { + PipelineTarget::Safe + } else { + PipelineTarget::Head + }; + self.require_pipeline_run(target); + match error { + Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => { + PayloadStatus::from_status(PayloadStatusEnum::Invalid { + validation_error: error.to_string(), + }) + .with_latest_valid_hash(H256::zero()) + } + _ => PayloadStatus::from_status(PayloadStatusEnum::Syncing), + } + } + /// Validates the payload attributes with respect to the header and fork choice state. fn process_payload_attributes( &self, @@ -426,6 +491,8 @@ where } }; + let header = block.header.clone(); + let status = if self.is_pipeline_idle() { match self.blockchain_tree.insert_block_without_senders(block) { Ok(status) => { @@ -451,6 +518,9 @@ where PayloadStatus::new(status, latest_valid_hash) } Err(error) => { + // payload is deemed invalid, insert it into the cache + self.invalid_headers.insert(header); + let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; @@ -458,11 +528,13 @@ where } } } else if let Err(error) = self.blockchain_tree.buffer_block_without_sender(block) { + // received a new payload while we're still syncing to the target let latest_valid_hash = self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error)); let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; PayloadStatus::new(status, latest_valid_hash) } else { + // successfully buffered the block PayloadStatus::from_status(PayloadStatusEnum::Syncing) }; trace!(target: "consensus::engine", ?block_hash, block_number, ?status, "Returning payload status"); @@ -692,6 +764,29 @@ enum PipelineTarget { Safe, } +/// Keeps track of invalid headerst. +struct InvalidHeaderCache { + headers: LruMap, +} + +impl InvalidHeaderCache { + fn new(max_length: u32) -> Self { + Self { headers: LruMap::new(ByLength::new(max_length)) } + } + + /// Returns the header if it exists in the cache. + fn get(&mut self, hash: &H256) -> Option<&Header> { + self.headers.get(hash).map(|h| &*h) + } + + /// Inserts a new header into the map. + fn insert(&mut self, header: SealedHeader) { + let hash = header.hash; + let header = header.unseal(); + self.headers.insert(hash, header); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs index 28cb2f9f3..76ac7c4b6 100644 --- a/crates/rpc/rpc-types/src/eth/engine/payload.rs +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -385,6 +385,14 @@ impl PayloadStatusEnum { } } +/// Various validation errors +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum PayloadValidationError { + /// Thrown when a forkchoice update's head links to a previously rejected payload. + #[error("links to previously rejected block")] + LinksToRejectedPayload, +} + #[cfg(test)] mod tests { use super::*;