From da99986ea2e9914943715cc635bc9209f179d492 Mon Sep 17 00:00:00 2001
From: Arsenii Kulikov
Date: Tue, 10 Dec 2024 13:06:39 +0400
Subject: [PATCH] feat: relax bounds for `EngineApiTreeHandler` (#13257)

Co-authored-by: Matthias Seitz
---
 Cargo.lock                                  |   3 +
 crates/blockchain-tree-api/Cargo.toml       |   2 +
 crates/blockchain-tree-api/src/error.rs     |  52 +++----
 crates/consensus/beacon/src/engine/event.rs |   6 +-
 crates/engine/local/src/service.rs          |   2 +-
 crates/engine/service/src/service.rs        |   1 +
 crates/engine/tree/Cargo.toml               |   2 +
 crates/engine/tree/src/engine.rs            |  36 ++---
 crates/engine/tree/src/tree/mod.rs          | 151 +++++++++++---
 9 files changed, 139 insertions(+), 116 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 08259a54a..3bcc998cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6712,11 +6712,13 @@ dependencies = [
 name = "reth-blockchain-tree-api"
 version = "1.1.2"
 dependencies = [
+ "alloy-consensus",
  "alloy-eips",
  "alloy-primitives",
  "reth-consensus",
  "reth-execution-errors",
  "reth-primitives",
+ "reth-primitives-traits",
  "reth-storage-errors",
  "thiserror 2.0.5",
 ]
@@ -7408,6 +7410,7 @@ dependencies = [
  "reth-payload-builder-primitives",
  "reth-payload-primitives",
  "reth-primitives",
+ "reth-primitives-traits",
  "reth-provider",
  "reth-prune",
  "reth-prune-types",
diff --git a/crates/blockchain-tree-api/Cargo.toml b/crates/blockchain-tree-api/Cargo.toml
index b1c01f859..83ae37809 100644
--- a/crates/blockchain-tree-api/Cargo.toml
+++ b/crates/blockchain-tree-api/Cargo.toml
@@ -14,9 +14,11 @@ workspace = true
 reth-consensus.workspace = true
 reth-execution-errors.workspace = true
 reth-primitives.workspace = true
+reth-primitives-traits.workspace = true
 reth-storage-errors.workspace = true

 # alloy
+alloy-consensus.workspace = true
 alloy-primitives.workspace = true
 alloy-eips.workspace = true

diff --git a/crates/blockchain-tree-api/src/error.rs b/crates/blockchain-tree-api/src/error.rs
index 4dd42c889..92866b4d4 100644
--- a/crates/blockchain-tree-api/src/error.rs
+++ b/crates/blockchain-tree-api/src/error.rs
@@ -1,11 +1,13 @@
 //! Error handling for the blockchain tree

+use alloy_consensus::BlockHeader;
 use alloy_primitives::{BlockHash, BlockNumber};
 use reth_consensus::ConsensusError;
 use reth_execution_errors::{
     BlockExecutionError, BlockValidationError, InternalBlockExecutionError,
 };
-use reth_primitives::SealedBlock;
+use reth_primitives::{SealedBlock, SealedBlockFor};
+use reth_primitives_traits::{Block, BlockBody};
 pub use reth_storage_errors::provider::ProviderError;

 /// Various error cases that can occur when a block violates tree assumptions.
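The `error.rs` changes that follow all share one pattern: the insert-error types stop hard-coding a concrete sealed block and become generic over a block abstraction, so the same error can carry any node's block type. A minimal, self-contained sketch of that pattern, with illustrative stand-in names rather than reth's actual API:

// Sketch only: `Block` and `InsertError` here are simplified stand-ins for the
// reth types changed below, kept dependency-free so the shape is easy to see.
use std::fmt;

/// Stand-in for the block abstraction the error type is parameterized over.
pub trait Block: fmt::Debug {
    fn number(&self) -> u64;
    fn parent_hash(&self) -> [u8; 32];
}

/// Error data that carries the offending block, whatever its concrete type.
#[derive(Debug)]
pub struct InsertError<B: Block> {
    block: B,
    reason: String,
}

impl<B: Block> fmt::Display for InsertError<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "failed to insert block number={} parent_hash={:?}: {}",
            self.block.number(),
            self.block.parent_hash(),
            self.reason
        )
    }
}

impl<B: Block> std::error::Error for InsertError<B> {}

impl<B: Block> InsertError<B> {
    pub fn new(block: B, reason: impl Into<String>) -> Self {
        Self { block, reason: reason.into() }
    }

    /// Hands back the block that caused the error, mirroring `into_block` below.
    pub fn into_block(self) -> B {
        self.block
    }
}

Header fields are reached only through trait methods, which is exactly why the real diff replaces field access such as `block.number` with `block.number()`.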
@@ -210,48 +212,48 @@ impl InsertBlockErrorData {
     }
 }

-struct InsertBlockErrorDataTwo {
-    block: SealedBlock,
+struct InsertBlockErrorDataTwo<B: Block> {
+    block: SealedBlockFor<B>,
     kind: InsertBlockErrorKindTwo,
 }

-impl std::fmt::Display for InsertBlockErrorDataTwo {
+impl<B: Block> std::fmt::Display for InsertBlockErrorDataTwo<B> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
             "Failed to insert block (hash={}, number={}, parent_hash={}): {}",
             self.block.hash(),
-            self.block.number,
-            self.block.parent_hash,
+            self.block.number(),
+            self.block.parent_hash(),
             self.kind
         )
     }
 }

-impl std::fmt::Debug for InsertBlockErrorDataTwo {
+impl<B: Block> std::fmt::Debug for InsertBlockErrorDataTwo<B> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("InsertBlockError")
             .field("error", &self.kind)
             .field("hash", &self.block.hash())
-            .field("number", &self.block.number)
-            .field("parent_hash", &self.block.parent_hash)
-            .field("num_txs", &self.block.body.transactions.len())
+            .field("number", &self.block.number())
+            .field("parent_hash", &self.block.parent_hash())
+            .field("num_txs", &self.block.body.transactions().len())
             .finish_non_exhaustive()
     }
 }

-impl core::error::Error for InsertBlockErrorDataTwo {
+impl<B: Block> core::error::Error for InsertBlockErrorDataTwo<B> {
     fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
         Some(&self.kind)
     }
 }

-impl InsertBlockErrorDataTwo {
-    const fn new(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Self {
+impl<B: Block> InsertBlockErrorDataTwo<B> {
+    const fn new(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Self {
         Self { block, kind }
     }

-    fn boxed(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Box<Self> {
+    fn boxed(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Box<Self> {
         Box::new(Self::new(block, kind))
     }
 }
@@ -259,36 +261,36 @@ impl InsertBlockErrorDataTwo {

 /// Error thrown when inserting a block failed because the block is considered invalid.
 #[derive(thiserror::Error)]
 #[error(transparent)]
-pub struct InsertBlockErrorTwo {
-    inner: Box<InsertBlockErrorDataTwo>,
+pub struct InsertBlockErrorTwo<B: Block> {
+    inner: Box<InsertBlockErrorDataTwo<B>>,
 }

 // === impl InsertBlockErrorTwo ===

-impl InsertBlockErrorTwo {
+impl<B: Block> InsertBlockErrorTwo<B> {
     /// Create a new `InsertInvalidBlockErrorTwo`
-    pub fn new(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Self {
+    pub fn new(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Self {
         Self { inner: InsertBlockErrorDataTwo::boxed(block, kind) }
     }

     /// Create a new `InsertInvalidBlockError` from a consensus error
-    pub fn consensus_error(error: ConsensusError, block: SealedBlock) -> Self {
+    pub fn consensus_error(error: ConsensusError, block: SealedBlockFor<B>) -> Self {
         Self::new(block, InsertBlockErrorKindTwo::Consensus(error))
     }

     /// Create a new `InsertInvalidBlockError` from a consensus error
-    pub fn sender_recovery_error(block: SealedBlock) -> Self {
+    pub fn sender_recovery_error(block: SealedBlockFor<B>) -> Self {
         Self::new(block, InsertBlockErrorKindTwo::SenderRecovery)
     }

     /// Create a new `InsertInvalidBlockError` from an execution error
-    pub fn execution_error(error: BlockExecutionError, block: SealedBlock) -> Self {
+    pub fn execution_error(error: BlockExecutionError, block: SealedBlockFor<B>) -> Self {
         Self::new(block, InsertBlockErrorKindTwo::Execution(error))
     }

     /// Consumes the error and returns the block that resulted in the error
     #[inline]
-    pub fn into_block(self) -> SealedBlock {
+    pub fn into_block(self) -> SealedBlockFor<B> {
         self.inner.block
     }

@@ -300,19 +302,19 @@ impl InsertBlockErrorTwo {

     /// Returns the block that resulted in the error
     #[inline]
-    pub const fn block(&self) -> &SealedBlock {
+    pub const fn block(&self) -> &SealedBlockFor<B> {
         &self.inner.block
     }

     /// Consumes the type and returns the block and error kind.
     #[inline]
-    pub fn split(self) -> (SealedBlock, InsertBlockErrorKindTwo) {
+    pub fn split(self) -> (SealedBlockFor<B>, InsertBlockErrorKindTwo) {
         let inner = *self.inner;
         (inner.block, inner.kind)
     }
 }

-impl std::fmt::Debug for InsertBlockErrorTwo {
+impl<B: Block> std::fmt::Debug for InsertBlockErrorTwo<B> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         std::fmt::Debug::fmt(&self.inner, f)
     }
diff --git a/crates/consensus/beacon/src/engine/event.rs b/crates/consensus/beacon/src/engine/event.rs
index b503e1e10..acf056b3f 100644
--- a/crates/consensus/beacon/src/engine/event.rs
+++ b/crates/consensus/beacon/src/engine/event.rs
@@ -2,7 +2,7 @@ use alloy_consensus::BlockHeader;
 use alloy_primitives::B256;
 use alloy_rpc_types_engine::ForkchoiceState;
 use reth_engine_primitives::ForkchoiceStatus;
-use reth_primitives::{EthPrimitives, NodePrimitives, SealedBlock, SealedHeader};
+use reth_primitives::{EthPrimitives, NodePrimitives, SealedBlockFor, SealedHeader};
 use std::{
     fmt::{Display, Formatter, Result},
     sync::Arc,
@@ -15,9 +15,9 @@ pub enum BeaconConsensusEngineEvent {
     /// The fork choice state was updated, and the current fork choice status
     ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
     /// A block was added to the fork chain.
- ForkBlockAdded(Arc>, Duration), + ForkBlockAdded(Arc>, Duration), /// A block was added to the canonical chain, and the elapsed time validating the block - CanonicalBlockAdded(Arc>, Duration), + CanonicalBlockAdded(Arc>, Duration), /// A canonical chain was committed, and the elapsed time committing the data CanonicalChainCommitted(Box>, Duration), /// The consensus engine is involved in live sync, and has specific progress diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index b4dd47c43..57fdc0c25 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -52,7 +52,7 @@ where /// Processes requests. /// /// This type is responsible for processing incoming requests. - handler: EngineApiRequestHandler>, + handler: EngineApiRequestHandler, N::Primitives>, /// Receiver for incoming requests (from the engine API endpoint) that need to be processed. incoming_requests: EngineMessageStream, } diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index c6a87ea07..5dfe41842 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -39,6 +39,7 @@ type EngineServiceType = ChainOrchestrator< EngineHandler< EngineApiRequestHandler< EngineApiRequest<::Engine, ::Primitives>, + ::Primitives, >, EngineMessageStream<::Engine>, BasicBlockDownloader, diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 67cb72850..f428c8771 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -26,6 +26,7 @@ reth-payload-builder-primitives.workspace = true reth-payload-builder.workspace = true reth-payload-primitives.workspace = true reth-primitives.workspace = true +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-prune.workspace = true reth-revm.workspace = true @@ -108,6 +109,7 @@ test-utils = [ "reth-network-p2p/test-utils", "reth-payload-builder/test-utils", "reth-primitives/test-utils", + "reth-primitives-traits/test-utils", "reth-provider/test-utils", "reth-prune-types", "reth-prune-types?/test-utils", diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 2f0415a10..9fa0a8c1d 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -11,6 +11,7 @@ use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_chain_state::ExecutedBlock; use reth_engine_primitives::{BeaconEngineMessage, EngineTypes}; use reth_primitives::{NodePrimitives, SealedBlockWithSenders}; +use reth_primitives_traits::Block; use std::{ collections::HashSet, fmt::Display, @@ -66,7 +67,7 @@ impl EngineHandler { impl ChainHandler for EngineHandler where - T: EngineRequestHandler, + T: EngineRequestHandler, S: Stream + Send + Sync + Unpin + 'static, ::Item: Into, D: BlockDownloader, @@ -139,9 +140,11 @@ pub trait EngineRequestHandler: Send + Sync { type Event: Send; /// The request type this handler can process. type Request; + /// Type of the block sent in [`FromEngine::DownloadedBlocks`] variant. + type Block: Block; /// Informs the handler about an event from the [`EngineHandler`]. - fn on_event(&mut self, event: FromEngine); + fn on_event(&mut self, event: FromEngine); /// Advances the handler. fn poll(&mut self, cx: &mut Context<'_>) -> Poll>; @@ -167,31 +170,32 @@ pub trait EngineRequestHandler: Send + Sync { /// In case required blocks are missing, the handler will request them from the network, by emitting /// a download request upstream. 
#[derive(Debug)] -pub struct EngineApiRequestHandler { +pub struct EngineApiRequestHandler { /// channel to send messages to the tree to execute the payload. - to_tree: Sender>, + to_tree: Sender>, /// channel to receive messages from the tree. - from_tree: UnboundedReceiver, + from_tree: UnboundedReceiver>, } -impl EngineApiRequestHandler { +impl EngineApiRequestHandler { /// Creates a new `EngineApiRequestHandler`. pub const fn new( - to_tree: Sender>, - from_tree: UnboundedReceiver, + to_tree: Sender>, + from_tree: UnboundedReceiver>, ) -> Self { Self { to_tree, from_tree } } } -impl EngineRequestHandler for EngineApiRequestHandler +impl EngineRequestHandler for EngineApiRequestHandler where Request: Send, { - type Event = BeaconConsensusEngineEvent; + type Event = BeaconConsensusEngineEvent; type Request = Request; + type Block = N::Block; - fn on_event(&mut self, event: FromEngine) { + fn on_event(&mut self, event: FromEngine) { // delegate to the tree let _ = self.to_tree.send(event); } @@ -263,7 +267,7 @@ impl From> for EngineA } impl From> - for FromEngine> + for FromEngine, N::Block> { fn from(req: EngineApiRequest) -> Self { Self::Request(req) @@ -297,16 +301,16 @@ impl From> for EngineApiEvent { +pub enum FromEngine { /// Event from the top level orchestrator. Event(FromOrchestrator), /// Request from the engine. Request(Req), /// Downloaded blocks from the network. - DownloadedBlocks(Vec), + DownloadedBlocks(Vec>), } -impl Display for FromEngine { +impl Display for FromEngine { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Event(ev) => write!(f, "Event({ev:?})"), @@ -318,7 +322,7 @@ impl Display for FromEngine { } } -impl From for FromEngine { +impl From for FromEngine { fn from(event: FromOrchestrator) -> Self { Self::Event(event) } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 29c382c28..234a96a47 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -5,7 +5,7 @@ use crate::{ persistence::PersistenceHandle, tree::metrics::EngineApiMetrics, }; -use alloy_consensus::{BlockHeader, Header}; +use alloy_consensus::BlockHeader; use alloy_eips::BlockNumHash; use alloy_primitives::{ map::{HashMap, HashSet}, @@ -36,9 +36,10 @@ use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder_primitives::PayloadBuilder; use reth_payload_primitives::PayloadBuilderAttributes; use reth_primitives::{ - EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockFor, - SealedBlockWithSenders, SealedHeader, + EthPrimitives, GotExpected, NodePrimitives, SealedBlockFor, SealedBlockWithSenders, + SealedHeader, }; +use reth_primitives_traits::Block; use reth_provider::{ providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox, @@ -485,9 +486,9 @@ where /// them one by one so that we can handle incoming engine API in between and don't become /// unresponsive. This can happen during live sync transition where we're trying to close the /// gap (up to 3 epochs of blocks in the worst case). - incoming_tx: Sender>>, + incoming_tx: Sender, N::Block>>, /// Incoming engine API requests. - incoming: Receiver>>, + incoming: Receiver, N::Block>>, /// Outgoing events that are emitted to the handler. outgoing: UnboundedSender>, /// Channels to the persistence layer. 
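The channel fields above gain a second type parameter because the messages they carry now name the node's block type explicitly. A rough, self-contained illustration of that wiring, using simplified types rather than reth's actual definitions:

// Sketch only: `FromEngine` and the channel types below are simplified
// stand-ins for the generic message plumbing in this patch.
use std::sync::mpsc;

#[derive(Debug)]
enum FromEngine<Req, B> {
    Request(Req),
    DownloadedBlocks(Vec<B>),
}

#[derive(Debug)]
struct MyBlock {
    number: u64,
}

fn main() {
    // The channel is typed over both the request and the block type.
    let (tx, rx): (
        mpsc::Sender<FromEngine<String, MyBlock>>,
        mpsc::Receiver<FromEngine<String, MyBlock>>,
    ) = mpsc::channel();

    tx.send(FromEngine::DownloadedBlocks(vec![MyBlock { number: 1 }])).unwrap();
    tx.send(FromEngine::Request("new_payload".to_string())).unwrap();

    while let Ok(msg) = rx.try_recv() {
        match msg {
            FromEngine::Request(req) => println!("request: {req}"),
            FromEngine::DownloadedBlocks(blocks) => {
                println!("downloaded {} block(s)", blocks.len())
            }
        }
    }
}

Once the message enum mentions the block type, every sender and receiver that moves it has to be parameterized the same way, which is why the `incoming_tx`/`incoming` fields above change in lockstep.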
@@ -540,24 +541,20 @@ impl EngineApiTreeHandler
 where
-    N: NodePrimitives<
-        Block = reth_primitives::Block,
-        BlockHeader = reth_primitives::Header,
-        BlockBody = reth_primitives::BlockBody,
-        Receipt = reth_primitives::Receipt,
-    >,
+    N: NodePrimitives,
     P: DatabaseProviderFactory
         + BlockReader<Block = N::Block, Header = N::BlockHeader>
         + StateProviderFactory
-        + StateReader
+        + StateReader<Receipt = N::Receipt>
         + StateCommitmentProvider
         + HashedPostStateProvider
         + Clone
         + 'static,
-    <P as DatabaseProviderFactory>::Provider: BlockReader,
::Provider: + BlockReader, E: BlockExecutorProvider, T: EngineTypes, - V: EngineValidator, + V: EngineValidator, { /// Creates a new [`EngineApiTreeHandler`]. #[expect(clippy::too_many_arguments)] @@ -620,7 +617,8 @@ where config: TreeConfig, invalid_block_hook: Box>, kind: EngineApiKind, - ) -> (Sender>>, UnboundedReceiver>) { + ) -> (Sender, N::Block>>, UnboundedReceiver>) + { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -658,7 +656,7 @@ where } /// Returns a new [`Sender`] to send messages to this type. - pub fn sender(&self) -> Sender>> { + pub fn sender(&self) -> Sender, N::Block>> { self.incoming_tx.clone() } @@ -698,7 +696,7 @@ where /// block request processing isn't blocked for a long time. fn on_downloaded( &mut self, - mut blocks: Vec, + mut blocks: Vec>, ) -> Result, InsertBlockFatalError> { if blocks.is_empty() { // nothing to execute @@ -797,7 +795,7 @@ where let block_hash = block.hash(); let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash); if lowest_buffered_ancestor == block_hash { - lowest_buffered_ancestor = block.parent_hash; + lowest_buffered_ancestor = block.parent_hash(); } // now check the block itself @@ -862,11 +860,11 @@ where return Ok(None) }; - let new_head_number = new_head_block.block.number; + let new_head_number = new_head_block.block.number(); let mut current_canonical_number = self.state.tree_state.current_canonical_head.number; let mut new_chain = vec![new_head_block.clone()]; - let mut current_hash = new_head_block.block.parent_hash; + let mut current_hash = new_head_block.block.parent_hash(); let mut current_number = new_head_number - 1; // Walk back the new chain until we reach a block we know about @@ -875,7 +873,7 @@ where // that are _above_ the current canonical head. while current_number > current_canonical_number { if let Some(block) = self.executed_block_by_hash(current_hash)? { - current_hash = block.block.parent_hash; + current_hash = block.block.parent_hash(); current_number -= 1; new_chain.push(block); } else { @@ -904,7 +902,7 @@ where while current_canonical_number > current_number { if let Some(block) = self.executed_block_by_hash(old_hash)? { old_chain.push(block.clone()); - old_hash = block.block.header.parent_hash; + old_hash = block.block.header.parent_hash(); current_canonical_number -= 1; } else { // This shouldn't happen as we're walking back the canonical chain @@ -920,7 +918,7 @@ where // a common ancestor (fork block) is reached. while old_hash != current_hash { if let Some(block) = self.executed_block_by_hash(old_hash)? { - old_hash = block.block.header.parent_hash; + old_hash = block.block.header.parent_hash(); old_chain.push(block); } else { // This shouldn't happen as we're walking back the canonical chain @@ -929,7 +927,7 @@ where } if let Some(block) = self.executed_block_by_hash(current_hash)? 
{ - current_hash = block.block.parent_hash; + current_hash = block.block.parent_hash(); new_chain.push(block); } else { // This shouldn't happen as we've already walked this path @@ -958,10 +956,10 @@ where return Ok(false) } // We already passed the canonical head - if current_block.number <= canonical_head.number { + if current_block.number() <= canonical_head.number { break } - current_hash = current_block.parent_hash; + current_hash = current_block.parent_hash(); } // verify that the given hash is not already part of canonical chain stored in memory @@ -1040,7 +1038,7 @@ where // to return an error ProviderError::HeaderNotFound(state.head_block_hash.into()) })?; - let updated = self.process_payload_attributes(attr, &tip, state, version); + let updated = self.process_payload_attributes(attr, tip.header(), state, version); return Ok(TreeOutcome::new(updated)) } @@ -1069,13 +1067,13 @@ where // 3. check if the head is already part of the canonical chain if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) { - debug!(target: "engine::tree", head = canonical_header.number, "fcu head block is already canonical"); + debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical"); // For OpStack the proposers are allowed to reorg their own chain at will, so we need to // always trigger a new payload job if requested. if self.engine_kind.is_opstack() { if let Some(attr) = attrs { - debug!(target: "engine::tree", head = canonical_header.number, "handling payload attributes for canonical head"); + debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head"); let updated = self.process_payload_attributes(attr, &canonical_header, state, version); return Ok(TreeOutcome::new(updated)) @@ -1128,9 +1126,10 @@ where /// received in time. /// /// Returns an error if the engine channel is disconnected. + #[expect(clippy::type_complexity)] fn try_recv_engine_message( &self, - ) -> Result>>, RecvError> { + ) -> Result, N::Block>>, RecvError> { if self.persistence_state.in_progress() { // try to receive the next request with a timeout to not block indefinitely match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) { @@ -1208,7 +1207,7 @@ where /// Handles a message from the engine. fn on_engine_message( &mut self, - msg: FromEngine>, + msg: FromEngine, N::Block>, ) -> Result<(), InsertBlockFatalError> { match msg { FromEngine::Event(event) => match event { @@ -1384,7 +1383,7 @@ where .state .buffer .block(&sync_target_state.finalized_block_hash) - .map(|block| block.number); + .map(|block| block.number()); // The block number that the backfill finished at - if the progress or newest // finalized is None then we can't check the distance anyways. 
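The repeated `number`/`parent_hash` edits in this file swap direct field access for header-trait accessors, which is what lets the ancestor-walking loops above work for any `N: NodePrimitives`. A small self-contained sketch of the same shape, with an illustrative trait and a simplified hash type rather than reth's:

// Sketch only: `HeaderLike` stands in for the header abstraction; reth uses
// B256 hashes and its own header trait, simplified here to u64 for brevity.
use std::collections::HashMap;

trait HeaderLike {
    fn number(&self) -> u64;
    fn parent_hash(&self) -> u64;
    fn hash(&self) -> u64;
}

struct SimpleHeader {
    number: u64,
    hash: u64,
    parent_hash: u64,
}

impl HeaderLike for SimpleHeader {
    fn number(&self) -> u64 {
        self.number
    }
    fn parent_hash(&self) -> u64 {
        self.parent_hash
    }
    fn hash(&self) -> u64 {
        self.hash
    }
}

/// Walks back from `tip` until the chain reaches `stop_number`, collecting the
/// hashes of the visited headers, the same shape as the walk-back loops above.
fn walk_back<H: HeaderLike>(by_hash: &HashMap<u64, H>, tip: u64, stop_number: u64) -> Vec<u64> {
    let mut visited = Vec::new();
    let mut current = tip;
    while let Some(header) = by_hash.get(&current) {
        if header.number() <= stop_number {
            break;
        }
        visited.push(header.hash());
        current = header.parent_hash();
    }
    visited
}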
@@ -1505,15 +1504,15 @@ where debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist"); while let Some(block) = self.state.tree_state.blocks_by_hash.get(¤t_hash) { - if block.block.number <= last_persisted_number { + if block.block.number() <= last_persisted_number { break; } - if block.block.number <= target_number { + if block.block.number() <= target_number { blocks_to_persist.push(block.clone()); } - current_hash = block.block.parent_hash; + current_hash = block.block.parent_hash(); } // reverse the order so that the oldest block comes first @@ -1579,7 +1578,10 @@ where } /// Return sealed block from database or in-memory state by hash. - fn sealed_header_by_hash(&self, hash: B256) -> ProviderResult> { + fn sealed_header_by_hash( + &self, + hash: B256, + ) -> ProviderResult>> { // check memory first let block = self.state.tree_state.block_by_hash(hash).map(|block| block.as_ref().clone().header); @@ -1649,7 +1651,7 @@ where self.state .buffer .lowest_ancestor(&hash) - .map(|block| block.parent_hash) + .map(|block| block.parent_hash()) .unwrap_or_else(|| hash) } @@ -1696,7 +1698,7 @@ where // Edge case: the `latestValid` field is the zero hash if the parent block is the terminal // PoW block, which we need to identify by looking at the parent's block difficulty if let Some(parent) = self.block_by_hash(parent_hash)? { - if !parent.is_zero_difficulty() { + if !parent.header().difficulty().is_zero() { parent_hash = B256::ZERO; } } @@ -1823,8 +1825,8 @@ where /// Returns an error if sender recovery failed or inserting into the buffer failed. fn buffer_block_without_senders( &mut self, - block: SealedBlock, - ) -> Result<(), InsertBlockErrorTwo> { + block: SealedBlockFor, + ) -> Result<(), InsertBlockErrorTwo> { match block.try_seal_with_senders() { Ok(block) => self.buffer_block(block), Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)), @@ -1832,7 +1834,10 @@ where } /// Pre-validates the block and inserts it into the buffer. 
- fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), InsertBlockErrorTwo> { + fn buffer_block( + &mut self, + block: SealedBlockWithSenders, + ) -> Result<(), InsertBlockErrorTwo> { if let Err(err) = self.validate_block(&block) { return Err(InsertBlockErrorTwo::consensus_error(err, block.block)) } @@ -1886,7 +1891,7 @@ where // if we have buffered the finalized block, we should check how far // we're off exceeds_backfill_threshold = - self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number); + self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number()); } // If this is invoked after we downloaded a block we can check if this block is the @@ -2011,7 +2016,7 @@ where self.canonical_in_memory_state.set_canonical_head(tip.clone()); // Update metrics based on new tip - self.metrics.tree.canonical_chain_height.set(tip.number as f64); + self.metrics.tree.canonical_chain_height.set(tip.number() as f64); // sends an event to all active listeners about the new canonical chain self.canonical_in_memory_state.notify_canon_state(notification); @@ -2033,7 +2038,7 @@ where fn reinsert_reorged_blocks(&mut self, new_chain: Vec>) { for block in new_chain { if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() { - trace!(target: "engine::tree", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state"); + trace!(target: "engine::tree", num=?block.block.number(), hash=?block.block.hash(), "Reinserting block into tree state"); self.state.tree_state.insert_executed(block); } } @@ -2086,10 +2091,10 @@ where /// Returns an event with the appropriate action to take, such as: /// - download more missing blocks /// - try to canonicalize the target if the `block` is the tracked target (head) block. 
- #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number,), target = "engine::tree")] + #[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")] fn on_downloaded_block( &mut self, - block: SealedBlockWithSenders, + block: SealedBlockWithSenders, ) -> Result, InsertBlockFatalError> { let block_num_hash = block.num_hash(); let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash); @@ -2147,8 +2152,8 @@ where fn insert_block_without_senders( &mut self, - block: SealedBlock, - ) -> Result { + block: SealedBlockFor, + ) -> Result> { match block.try_seal_with_senders() { Ok(block) => self.insert_block(block), Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)), @@ -2157,17 +2162,17 @@ where fn insert_block( &mut self, - block: SealedBlockWithSenders, - ) -> Result { + block: SealedBlockWithSenders, + ) -> Result> { self.insert_block_inner(block.clone()) .map_err(|kind| InsertBlockErrorTwo::new(block.block, kind)) } fn insert_block_inner( &mut self, - block: SealedBlockWithSenders, + block: SealedBlockWithSenders, ) -> Result { - debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash, state_root = ?block.state_root, "Inserting new block into tree"); + debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree"); if self.block_by_hash(block.hash())?.is_some() { return Ok(InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid)) @@ -2179,14 +2184,14 @@ where // validate block consensus rules self.validate_block(&block)?; - trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash, "Fetching block state provider"); - let Some(state_provider) = self.state_provider(block.parent_hash)? else { + trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash(), "Fetching block state provider"); + let Some(state_provider) = self.state_provider(block.parent_hash())? 
else { // we don't have the state required to execute this block, buffering it and find the // missing parent block let missing_ancestor = self .state .buffer - .lowest_ancestor(&block.parent_hash) + .lowest_ancestor(&block.parent_hash()) .map(|block| block.parent_num_hash()) .unwrap_or_else(|| block.parent_num_hash()); @@ -2199,9 +2204,9 @@ where }; // now validate against the parent - let parent_block = self.sealed_header_by_hash(block.parent_hash)?.ok_or_else(|| { + let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| { InsertBlockErrorKindTwo::Provider(ProviderError::HeaderNotFound( - block.parent_hash.into(), + block.parent_hash().into(), )) })?; if let Err(e) = self.consensus.validate_header_against_parent(&block, &parent_block) { @@ -2212,7 +2217,7 @@ where trace!(target: "engine::tree", block=?block.num_hash(), "Executing block"); let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider)); - let block_number = block.number; + let block_number = block.number(); let block_hash = block.hash(); let sealed_block = Arc::new(block.block.clone()); let block = block.unseal(); @@ -2260,7 +2265,7 @@ where let persistence_in_progress = self.persistence_state.in_progress(); if !persistence_in_progress { state_root_result = match self - .compute_state_root_parallel(block.parent_hash, &hashed_state) + .compute_state_root_parallel(block.header().parent_hash(), &hashed_state) { Ok((state_root, trie_output)) => Some((state_root, trie_output)), Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => { @@ -2278,7 +2283,7 @@ where state_provider.state_root_with_updates(hashed_state.clone())? }; - if state_root != block.state_root { + if state_root != block.header().state_root() { // call post-block hook self.invalid_block_hook.on_invalid_block( &parent_block, @@ -2287,7 +2292,7 @@ where Some((&trie_output, state_root)), ); return Err(ConsensusError::BodyStateRootDiff( - GotExpected { got: state_root, expected: block.state_root }.into(), + GotExpected { got: state_root, expected: block.header().state_root() }.into(), ) .into()) } @@ -2304,7 +2309,7 @@ where trie: Arc::new(trie_output), }; - if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash { + if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash() { debug!(target: "engine::tree", pending = ?executed.block().num_hash() ,"updating pending block"); // if the parent is the canonical head, we can insert the block as the pending block self.canonical_in_memory_state.set_pending_block(executed.clone()); @@ -2375,7 +2380,7 @@ where /// Returns the proper payload status response if the block is invalid. fn on_insert_block_error( &mut self, - error: InsertBlockErrorTwo, + error: InsertBlockErrorTwo, ) -> Result { let (block, error) = error.split(); @@ -2386,12 +2391,12 @@ where // If the error was due to an invalid payload, the payload is added to the // invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is // returned. 
- warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number, %validation_err, "Invalid block error on new payload"); + warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number(), %validation_err, "Invalid block error on new payload"); let latest_valid_hash = if validation_err.is_block_pre_merge() { // zero hash must be returned if block is pre-merge Some(B256::ZERO) } else { - self.latest_valid_hash_for_invalid_payload(block.parent_hash)? + self.latest_valid_hash_for_invalid_payload(block.parent_hash())? }; // keep track of the invalid header @@ -2403,7 +2408,10 @@ where } /// Attempts to find the header for the given block hash if it is canonical. - pub fn find_canonical_header(&self, hash: B256) -> Result, ProviderError> { + pub fn find_canonical_header( + &self, + hash: B256, + ) -> Result>, ProviderError> { let mut canonical = self.canonical_in_memory_state.header_by_hash(hash); if canonical.is_none() { @@ -2434,7 +2442,7 @@ where { // we're also persisting the finalized block on disk so we can reload it on // restart this is required by optimism which queries the finalized block: - let _ = self.persistence.save_finalized_block_number(finalized.number); + let _ = self.persistence.save_finalized_block_number(finalized.number()); self.canonical_in_memory_state.set_finalized(finalized); } } @@ -2462,7 +2470,7 @@ where if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() { // we're also persisting the safe block on disk so we can reload it on // restart this is required by optimism which queries the safe block: - let _ = self.persistence.save_safe_block_number(safe.number); + let _ = self.persistence.save_safe_block_number(safe.number()); self.canonical_in_memory_state.set_safe(safe); } } @@ -2537,7 +2545,7 @@ where fn process_payload_attributes( &self, attrs: T::PayloadAttributes, - head: &Header, + head: &N::BlockHeader, state: ForkchoiceState, version: EngineApiMessageVersion, ) -> OnForkChoiceUpdated { @@ -2626,6 +2634,7 @@ pub enum AdvancePersistenceError { mod tests { use super::*; use crate::persistence::PersistenceAction; + use alloy_consensus::Header; use alloy_primitives::Bytes; use alloy_rlp::Decodable; use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar}; @@ -2708,7 +2717,7 @@ mod tests { EthEngineTypes, EthereumEngineValidator, >, - to_tree_tx: Sender>>, + to_tree_tx: Sender, Block>>, from_tree_rx: UnboundedReceiver, blocks: Vec, action_rx: Receiver, @@ -2843,7 +2852,7 @@ mod tests { fn insert_block( &mut self, block: SealedBlockWithSenders, - ) -> Result { + ) -> Result> { let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); self.extend_execution_outcome([execution_outcome]); self.tree.provider.add_state_root(block.state_root);
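Taken together, the handler-side changes hinge on one idea: the request handler trait exposes the block type it accepts as an associated type, and engine messages are parameterized over it. A condensed, self-contained sketch of that shape, using simplified stand-ins for the corresponding reth traits and enums:

// Sketch only: `Block`, `FromEngine`, and `RequestHandler` approximate the
// relationship between the real trait, its new associated `Block` type, and
// the generic message enum; they are not reth's actual definitions.
trait Block: std::fmt::Debug {}

#[derive(Debug)]
enum FromEngine<Req, B> {
    Request(Req),
    DownloadedBlocks(Vec<B>),
}

trait RequestHandler {
    type Request;
    /// The block type this handler accepts in `FromEngine::DownloadedBlocks`.
    type Block: Block;

    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
}

#[derive(Debug)]
struct EthBlock;
impl Block for EthBlock {}

struct EthHandler {
    seen_blocks: usize,
}

impl RequestHandler for EthHandler {
    type Request = String;
    type Block = EthBlock;

    fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
        if let FromEngine::DownloadedBlocks(blocks) = event {
            self.seen_blocks += blocks.len();
        }
    }
}

Because the block type is an associated type rather than a hard-coded struct, the same handler plumbing can serve any node whose primitives satisfy the relaxed bounds, which is the stated goal of this patch.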