From 5ee776a2ee9fbecc35558f5dc0d31f8459af8658 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Tue, 10 Dec 2024 06:04:11 +0400 Subject: [PATCH] chore: pass primiitves generic to `EngineApiTreeHandler` fields (#13256) --- crates/blockchain-tree/Cargo.toml | 1 + crates/blockchain-tree/src/block_buffer.rs | 34 +++++---- crates/engine/local/src/service.rs | 2 +- crates/engine/service/src/service.rs | 6 +- crates/engine/tree/src/engine.rs | 14 ++-- crates/engine/tree/src/tree/mod.rs | 88 +++++++++++----------- 6 files changed, 76 insertions(+), 69 deletions(-) diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 3fa6de2b4..07ecedf88 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -31,6 +31,7 @@ reth-consensus.workspace = true reth-node-types.workspace = true # ethereum +alloy-consensus.workspace = true alloy-primitives.workspace = true alloy-eips.workspace = true diff --git a/crates/blockchain-tree/src/block_buffer.rs b/crates/blockchain-tree/src/block_buffer.rs index 5d4ca2705..994ed82cf 100644 --- a/crates/blockchain-tree/src/block_buffer.rs +++ b/crates/blockchain-tree/src/block_buffer.rs @@ -1,6 +1,8 @@ use crate::metrics::BlockBufferMetrics; +use alloy_consensus::BlockHeader; use alloy_primitives::{BlockHash, BlockNumber}; use reth_network::cache::LruCache; +use reth_node_types::Block; use reth_primitives::SealedBlockWithSenders; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -16,9 +18,9 @@ use std::collections::{BTreeMap, HashMap, HashSet}; /// Note: Buffer is limited by number of blocks that it can contain and eviction of the block /// is done by last recently used block. #[derive(Debug)] -pub struct BlockBuffer { +pub struct BlockBuffer { /// All blocks in the buffer stored by their block hash. - pub(crate) blocks: HashMap, + pub(crate) blocks: HashMap>, /// Map of any parent block hash (even the ones not currently in the buffer) /// to the buffered children. /// Allows connecting buffered blocks by parent. @@ -35,7 +37,7 @@ pub struct BlockBuffer { pub(crate) metrics: BlockBufferMetrics, } -impl BlockBuffer { +impl BlockBuffer { /// Create new buffer with max limit of blocks pub fn new(limit: u32) -> Self { Self { @@ -48,37 +50,37 @@ impl BlockBuffer { } /// Return reference to buffered blocks - pub const fn blocks(&self) -> &HashMap { + pub const fn blocks(&self) -> &HashMap> { &self.blocks } /// Return reference to the requested block. - pub fn block(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> { + pub fn block(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> { self.blocks.get(hash) } /// Return a reference to the lowest ancestor of the given block in the buffer. - pub fn lowest_ancestor(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> { + pub fn lowest_ancestor(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> { let mut current_block = self.blocks.get(hash)?; - while let Some(parent) = self.blocks.get(¤t_block.parent_hash) { + while let Some(parent) = self.blocks.get(¤t_block.parent_hash()) { current_block = parent; } Some(current_block) } /// Insert a correct block inside the buffer. - pub fn insert_block(&mut self, block: SealedBlockWithSenders) { + pub fn insert_block(&mut self, block: SealedBlockWithSenders) { let hash = block.hash(); - self.parent_to_child.entry(block.parent_hash).or_default().insert(hash); - self.earliest_blocks.entry(block.number).or_default().insert(hash); + self.parent_to_child.entry(block.parent_hash()).or_default().insert(hash); + self.earliest_blocks.entry(block.number()).or_default().insert(hash); self.blocks.insert(hash, block); if let (_, Some(evicted_hash)) = self.lru.insert_and_get_evicted(hash) { // evict the block if limit is hit if let Some(evicted_block) = self.remove_block(&evicted_hash) { // evict the block if limit is hit - self.remove_from_parent(evicted_block.parent_hash, &evicted_hash); + self.remove_from_parent(evicted_block.parent_hash(), &evicted_hash); } } self.metrics.blocks.set(self.blocks.len() as f64); @@ -93,7 +95,7 @@ impl BlockBuffer { pub fn remove_block_with_children( &mut self, parent_hash: &BlockHash, - ) -> Vec { + ) -> Vec> { let removed = self .remove_block(parent_hash) .into_iter() @@ -152,16 +154,16 @@ impl BlockBuffer { /// This method will only remove the block if it's present inside `self.blocks`. /// The block might be missing from other collections, the method will only ensure that it has /// been removed. - fn remove_block(&mut self, hash: &BlockHash) -> Option { + fn remove_block(&mut self, hash: &BlockHash) -> Option> { let block = self.blocks.remove(hash)?; - self.remove_from_earliest_blocks(block.number, hash); - self.remove_from_parent(block.parent_hash, hash); + self.remove_from_earliest_blocks(block.number(), hash); + self.remove_from_parent(block.parent_hash(), hash); self.lru.remove(hash); Some(block) } /// Remove all children and their descendants for the given blocks and return them. - fn remove_children(&mut self, parent_hashes: Vec) -> Vec { + fn remove_children(&mut self, parent_hashes: Vec) -> Vec> { // remove all parent child connection and all the child children blocks that are connected // to the discarded parent blocks. let mut remove_parent_children = parent_hashes; diff --git a/crates/engine/local/src/service.rs b/crates/engine/local/src/service.rs index 0bdc77dbe..b4dd47c43 100644 --- a/crates/engine/local/src/service.rs +++ b/crates/engine/local/src/service.rs @@ -52,7 +52,7 @@ where /// Processes requests. /// /// This type is responsible for processing incoming requests. - handler: EngineApiRequestHandler>, + handler: EngineApiRequestHandler>, /// Receiver for incoming requests (from the engine API endpoint) that need to be processed. incoming_requests: EngineMessageStream, } diff --git a/crates/engine/service/src/service.rs b/crates/engine/service/src/service.rs index 27de4a636..c6a87ea07 100644 --- a/crates/engine/service/src/service.rs +++ b/crates/engine/service/src/service.rs @@ -17,7 +17,7 @@ pub use reth_engine_tree::{ }; use reth_evm::execute::BlockExecutorProvider; use reth_network_p2p::EthBlockClient; -use reth_node_types::{BlockTy, NodeTypesWithEngine}; +use reth_node_types::{BlockTy, NodeTypes, NodeTypesWithEngine}; use reth_payload_builder::PayloadBuilderHandle; use reth_primitives::EthPrimitives; use reth_provider::{providers::BlockchainProvider2, ProviderFactory}; @@ -37,7 +37,9 @@ pub type EngineMessageStream = Pin = ChainOrchestrator< EngineHandler< - EngineApiRequestHandler::Engine>>, + EngineApiRequestHandler< + EngineApiRequest<::Engine, ::Primitives>, + >, EngineMessageStream<::Engine>, BasicBlockDownloader, >, diff --git a/crates/engine/tree/src/engine.rs b/crates/engine/tree/src/engine.rs index 947d025e9..2f0415a10 100644 --- a/crates/engine/tree/src/engine.rs +++ b/crates/engine/tree/src/engine.rs @@ -238,14 +238,14 @@ impl EngineApiKind { /// The request variants that the engine API handler can receive. #[derive(Debug)] -pub enum EngineApiRequest { +pub enum EngineApiRequest { /// A request received from the consensus engine. Beacon(BeaconEngineMessage), /// Request to insert an already executed block, e.g. via payload building. - InsertExecutedBlock(ExecutedBlock), + InsertExecutedBlock(ExecutedBlock), } -impl Display for EngineApiRequest { +impl Display for EngineApiRequest { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Beacon(msg) => msg.fmt(f), @@ -256,14 +256,16 @@ impl Display for EngineApiRequest { } } -impl From> for EngineApiRequest { +impl From> for EngineApiRequest { fn from(msg: BeaconEngineMessage) -> Self { Self::Beacon(msg) } } -impl From> for FromEngine> { - fn from(req: EngineApiRequest) -> Self { +impl From> + for FromEngine> +{ + fn from(req: EngineApiRequest) -> Self { Self::Request(req) } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index ce9bddd90..29c382c28 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -36,8 +36,8 @@ use reth_payload_builder::PayloadBuilderHandle; use reth_payload_builder_primitives::PayloadBuilder; use reth_payload_primitives::PayloadBuilderAttributes; use reth_primitives::{ - Block, EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockWithSenders, - SealedHeader, + EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockFor, + SealedBlockWithSenders, SealedHeader, }; use reth_provider::{ providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, @@ -53,7 +53,6 @@ use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, VecDeque}, fmt::Debug, - marker::PhantomData, ops::Bound, sync::{ mpsc::{Receiver, RecvError, RecvTimeoutError, Sender}, @@ -129,7 +128,7 @@ impl TreeState { } /// Returns the block by hash. - fn block_by_hash(&self, hash: B256) -> Option>> { + fn block_by_hash(&self, hash: B256) -> Option>> { self.blocks_by_hash.get(&hash).map(|b| b.block.clone()) } @@ -386,19 +385,19 @@ impl TreeState { /// /// This type is not shareable. #[derive(Debug)] -pub struct EngineApiTreeState { +pub struct EngineApiTreeState { /// Tracks the state of the blockchain tree. - tree_state: TreeState, + tree_state: TreeState, /// Tracks the forkchoice state updates received by the CL. forkchoice_state_tracker: ForkchoiceStateTracker, /// Buffer of detached blocks. - buffer: BlockBuffer, + buffer: BlockBuffer, /// Tracks the header of invalid payloads that were rejected by the engine because they're /// invalid. invalid_headers: InvalidHeaderCache, } -impl EngineApiTreeState { +impl EngineApiTreeState { fn new( block_buffer_limit: u32, max_invalid_header_cache_length: u32, @@ -474,10 +473,10 @@ where { provider: P, executor_provider: E, - consensus: Arc, + consensus: Arc>, payload_validator: V, /// Keeps track of internals such as executed and buffered blocks. - state: EngineApiTreeState, + state: EngineApiTreeState, /// The half for sending messages to the engine. /// /// This is kept so that we can queue in messages to ourself that we can process later, for @@ -486,20 +485,20 @@ where /// them one by one so that we can handle incoming engine API in between and don't become /// unresponsive. This can happen during live sync transition where we're trying to close the /// gap (up to 3 epochs of blocks in the worst case). - incoming_tx: Sender>>, + incoming_tx: Sender>>, /// Incoming engine API requests. - incoming: Receiver>>, + incoming: Receiver>>, /// Outgoing events that are emitted to the handler. - outgoing: UnboundedSender, + outgoing: UnboundedSender>, /// Channels to the persistence layer. - persistence: PersistenceHandle, + persistence: PersistenceHandle, /// Tracks the state changes of the persistence task. persistence_state: PersistenceState, /// Flag indicating the state of the node's backfill synchronization process. backfill_sync_state: BackfillSyncState, /// Keeps track of the state of the canonical chain that isn't persisted yet. /// This is intended to be accessed from external sources, such as rpc. - canonical_in_memory_state: CanonicalInMemoryState, + canonical_in_memory_state: CanonicalInMemoryState, /// Handle to the payload builder that will receive payload attributes for valid forkchoice /// updates payload_builder: PayloadBuilderHandle, @@ -511,8 +510,6 @@ where invalid_block_hook: Box>, /// The engine API variant of this handler engine_kind: EngineApiKind, - /// Captures the types the engine operates on - _primtives: PhantomData, } impl std::fmt::Debug @@ -546,6 +543,7 @@ where N: NodePrimitives< Block = reth_primitives::Block, BlockHeader = reth_primitives::Header, + BlockBody = reth_primitives::BlockBody, Receipt = reth_primitives::Receipt, >, P: DatabaseProviderFactory @@ -562,16 +560,16 @@ where V: EngineValidator, { /// Creates a new [`EngineApiTreeHandler`]. - #[allow(clippy::too_many_arguments)] + #[expect(clippy::too_many_arguments)] pub fn new( provider: P, executor_provider: E, - consensus: Arc, + consensus: Arc>, payload_validator: V, - outgoing: UnboundedSender, - state: EngineApiTreeState, - canonical_in_memory_state: CanonicalInMemoryState, - persistence: PersistenceHandle, + outgoing: UnboundedSender>, + state: EngineApiTreeState, + canonical_in_memory_state: CanonicalInMemoryState, + persistence: PersistenceHandle, persistence_state: PersistenceState, payload_builder: PayloadBuilderHandle, config: TreeConfig, @@ -597,7 +595,6 @@ where incoming_tx, invalid_block_hook: Box::new(NoopInvalidBlockHook), engine_kind, - _primtives: Default::default(), } } @@ -611,19 +608,19 @@ where /// /// Returns the sender through which incoming requests can be sent to the task and the receiver /// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine. - #[allow(clippy::too_many_arguments)] + #[expect(clippy::complexity)] pub fn spawn_new( provider: P, executor_provider: E, - consensus: Arc, + consensus: Arc>, payload_validator: V, - persistence: PersistenceHandle, + persistence: PersistenceHandle, payload_builder: PayloadBuilderHandle, - canonical_in_memory_state: CanonicalInMemoryState, + canonical_in_memory_state: CanonicalInMemoryState, config: TreeConfig, invalid_block_hook: Box>, kind: EngineApiKind, - ) -> (Sender>>, UnboundedReceiver) { + ) -> (Sender>>, UnboundedReceiver>) { let best_block_number = provider.best_block_number().unwrap_or(0); let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default(); @@ -661,7 +658,7 @@ where } /// Returns a new [`Sender`] to send messages to this type. - pub fn sender(&self) -> Sender>> { + pub fn sender(&self) -> Sender>> { self.incoming_tx.clone() } @@ -859,7 +856,7 @@ where /// /// Note: This does not update the tracked state and instead returns the new chain based on the /// given head. - fn on_new_head(&self, new_head: B256) -> ProviderResult> { + fn on_new_head(&self, new_head: B256) -> ProviderResult>> { // get the executed new head block let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else { return Ok(None) @@ -1133,7 +1130,7 @@ where /// Returns an error if the engine channel is disconnected. fn try_recv_engine_message( &self, - ) -> Result>>, RecvError> { + ) -> Result>>, RecvError> { if self.persistence_state.in_progress() { // try to receive the next request with a timeout to not block indefinitely match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) { @@ -1211,7 +1208,7 @@ where /// Handles a message from the engine. fn on_engine_message( &mut self, - msg: FromEngine>, + msg: FromEngine>, ) -> Result<(), InsertBlockFatalError> { match msg { FromEngine::Event(event) => match event { @@ -1452,7 +1449,7 @@ where } /// Emits an outgoing event to the engine. - fn emit_event(&mut self, event: impl Into) { + fn emit_event(&mut self, event: impl Into>) { let event = event.into(); if event.is_backfill_action() { @@ -1496,7 +1493,7 @@ where /// Returns a batch of consecutive canonical blocks to persist in the range /// `(last_persisted_number .. canonical_head - threshold]` . The expected /// order is oldest -> newest. - fn get_canonical_blocks_to_persist(&self) -> Vec { + fn get_canonical_blocks_to_persist(&self) -> Vec> { let mut blocks_to_persist = Vec::new(); let mut current_hash = self.state.tree_state.canonical_block_hash(); let last_persisted_number = self.persistence_state.last_persisted_block.number; @@ -1549,7 +1546,7 @@ where /// has in memory. /// /// For finalized blocks, this will return `None`. - fn executed_block_by_hash(&self, hash: B256) -> ProviderResult> { + fn executed_block_by_hash(&self, hash: B256) -> ProviderResult>> { trace!(target: "engine::tree", ?hash, "Fetching executed block by hash"); // check memory first let block = self.state.tree_state.executed_block_by_hash(hash).cloned(); @@ -1595,7 +1592,7 @@ where } /// Return block from database or in-memory state by hash. - fn block_by_hash(&self, hash: B256) -> ProviderResult> { + fn block_by_hash(&self, hash: B256) -> ProviderResult> { // check database first let mut block = self.provider.block_by_hash(hash)?; if block.is_none() { @@ -1754,7 +1751,10 @@ where /// Validate if block is correct and satisfies all the consensus rules that concern the header /// and block body itself. - fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), ConsensusError> { + fn validate_block( + &self, + block: &SealedBlockWithSenders, + ) -> Result<(), ConsensusError> { if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) { error!( target: "engine::tree", @@ -1951,7 +1951,7 @@ where /// If either of these are true, then this returns the height of the first block. Otherwise, /// this returns [`None`]. This should be used to check whether or not we should be sending a /// remove command to the persistence task. - fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option { + fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option { let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None }; let BlockNumHash { number: new_num, hash: new_hash } = @@ -1978,7 +1978,7 @@ where /// Invoked when we the canonical chain has been updated. /// /// This is invoked on a valid forkchoice update, or if we can make the target block canonical. - fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain) { + fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain) { trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update"); let start = Instant::now(); @@ -2030,7 +2030,7 @@ where } /// This reinserts any blocks in the new chain that do not already exist in the tree - fn reinsert_reorged_blocks(&mut self, new_chain: Vec) { + fn reinsert_reorged_blocks(&mut self, new_chain: Vec>) { for block in new_chain { if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() { trace!(target: "engine::tree", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state"); @@ -2296,7 +2296,7 @@ where self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64()); debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root"); - let executed: ExecutedBlock = ExecutedBlock { + let executed: ExecutedBlock = ExecutedBlock { block: sealed_block.clone(), senders: Arc::new(block.senders), execution_output: Arc::new(ExecutionOutcome::from((output, block_number))), @@ -2636,7 +2636,7 @@ mod tests { use reth_engine_primitives::ForkchoiceStatus; use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator}; use reth_evm::test_utils::MockExecutorProvider; - use reth_primitives::{BlockExt, EthPrimitives}; + use reth_primitives::{Block, BlockExt, EthPrimitives}; use reth_provider::test_utils::MockEthProvider; use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3}; use reth_trie::updates::TrieUpdates; @@ -2708,7 +2708,7 @@ mod tests { EthEngineTypes, EthereumEngineValidator, >, - to_tree_tx: Sender>>, + to_tree_tx: Sender>>, from_tree_rx: UnboundedReceiver, blocks: Vec, action_rx: Receiver,