chore: pass primiitves generic to EngineApiTreeHandler fields (#13256)

This commit is contained in:
Arsenii Kulikov
2024-12-10 06:04:11 +04:00
committed by GitHub
parent c9bd64018a
commit 5ee776a2ee
6 changed files with 76 additions and 69 deletions

View File

@ -31,6 +31,7 @@ reth-consensus.workspace = true
reth-node-types.workspace = true
# ethereum
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true

View File

@ -1,6 +1,8 @@
use crate::metrics::BlockBufferMetrics;
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber};
use reth_network::cache::LruCache;
use reth_node_types::Block;
use reth_primitives::SealedBlockWithSenders;
use std::collections::{BTreeMap, HashMap, HashSet};
@ -16,9 +18,9 @@ use std::collections::{BTreeMap, HashMap, HashSet};
/// Note: Buffer is limited by number of blocks that it can contain and eviction of the block
/// is done by last recently used block.
#[derive(Debug)]
pub struct BlockBuffer {
pub struct BlockBuffer<B: Block = reth_primitives::Block> {
/// All blocks in the buffer stored by their block hash.
pub(crate) blocks: HashMap<BlockHash, SealedBlockWithSenders>,
pub(crate) blocks: HashMap<BlockHash, SealedBlockWithSenders<B>>,
/// Map of any parent block hash (even the ones not currently in the buffer)
/// to the buffered children.
/// Allows connecting buffered blocks by parent.
@ -35,7 +37,7 @@ pub struct BlockBuffer {
pub(crate) metrics: BlockBufferMetrics,
}
impl BlockBuffer {
impl<B: Block> BlockBuffer<B> {
/// Create new buffer with max limit of blocks
pub fn new(limit: u32) -> Self {
Self {
@ -48,37 +50,37 @@ impl BlockBuffer {
}
/// Return reference to buffered blocks
pub const fn blocks(&self) -> &HashMap<BlockHash, SealedBlockWithSenders> {
pub const fn blocks(&self) -> &HashMap<BlockHash, SealedBlockWithSenders<B>> {
&self.blocks
}
/// Return reference to the requested block.
pub fn block(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> {
pub fn block(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders<B>> {
self.blocks.get(hash)
}
/// Return a reference to the lowest ancestor of the given block in the buffer.
pub fn lowest_ancestor(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> {
pub fn lowest_ancestor(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders<B>> {
let mut current_block = self.blocks.get(hash)?;
while let Some(parent) = self.blocks.get(&current_block.parent_hash) {
while let Some(parent) = self.blocks.get(&current_block.parent_hash()) {
current_block = parent;
}
Some(current_block)
}
/// Insert a correct block inside the buffer.
pub fn insert_block(&mut self, block: SealedBlockWithSenders) {
pub fn insert_block(&mut self, block: SealedBlockWithSenders<B>) {
let hash = block.hash();
self.parent_to_child.entry(block.parent_hash).or_default().insert(hash);
self.earliest_blocks.entry(block.number).or_default().insert(hash);
self.parent_to_child.entry(block.parent_hash()).or_default().insert(hash);
self.earliest_blocks.entry(block.number()).or_default().insert(hash);
self.blocks.insert(hash, block);
if let (_, Some(evicted_hash)) = self.lru.insert_and_get_evicted(hash) {
// evict the block if limit is hit
if let Some(evicted_block) = self.remove_block(&evicted_hash) {
// evict the block if limit is hit
self.remove_from_parent(evicted_block.parent_hash, &evicted_hash);
self.remove_from_parent(evicted_block.parent_hash(), &evicted_hash);
}
}
self.metrics.blocks.set(self.blocks.len() as f64);
@ -93,7 +95,7 @@ impl BlockBuffer {
pub fn remove_block_with_children(
&mut self,
parent_hash: &BlockHash,
) -> Vec<SealedBlockWithSenders> {
) -> Vec<SealedBlockWithSenders<B>> {
let removed = self
.remove_block(parent_hash)
.into_iter()
@ -152,16 +154,16 @@ impl BlockBuffer {
/// This method will only remove the block if it's present inside `self.blocks`.
/// The block might be missing from other collections, the method will only ensure that it has
/// been removed.
fn remove_block(&mut self, hash: &BlockHash) -> Option<SealedBlockWithSenders> {
fn remove_block(&mut self, hash: &BlockHash) -> Option<SealedBlockWithSenders<B>> {
let block = self.blocks.remove(hash)?;
self.remove_from_earliest_blocks(block.number, hash);
self.remove_from_parent(block.parent_hash, hash);
self.remove_from_earliest_blocks(block.number(), hash);
self.remove_from_parent(block.parent_hash(), hash);
self.lru.remove(hash);
Some(block)
}
/// Remove all children and their descendants for the given blocks and return them.
fn remove_children(&mut self, parent_hashes: Vec<BlockHash>) -> Vec<SealedBlockWithSenders> {
fn remove_children(&mut self, parent_hashes: Vec<BlockHash>) -> Vec<SealedBlockWithSenders<B>> {
// remove all parent child connection and all the child children blocks that are connected
// to the discarded parent blocks.
let mut remove_parent_children = parent_hashes;

View File

@ -52,7 +52,7 @@ where
/// Processes requests.
///
/// This type is responsible for processing incoming requests.
handler: EngineApiRequestHandler<EngineApiRequest<N::Engine>>,
handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>>,
/// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
incoming_requests: EngineMessageStream<N::Engine>,
}

View File

@ -17,7 +17,7 @@ pub use reth_engine_tree::{
};
use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::EthBlockClient;
use reth_node_types::{BlockTy, NodeTypesWithEngine};
use reth_node_types::{BlockTy, NodeTypes, NodeTypesWithEngine};
use reth_payload_builder::PayloadBuilderHandle;
use reth_primitives::EthPrimitives;
use reth_provider::{providers::BlockchainProvider2, ProviderFactory};
@ -37,7 +37,9 @@ pub type EngineMessageStream<T> = Pin<Box<dyn Stream<Item = BeaconEngineMessage<
/// Alias for chain orchestrator.
type EngineServiceType<N, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<EngineApiRequest<<N as NodeTypesWithEngine>::Engine>>,
EngineApiRequestHandler<
EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
>,
EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
BasicBlockDownloader<Client>,
>,

View File

@ -238,14 +238,14 @@ impl EngineApiKind {
/// The request variants that the engine API handler can receive.
#[derive(Debug)]
pub enum EngineApiRequest<T: EngineTypes> {
pub enum EngineApiRequest<T: EngineTypes, N: NodePrimitives> {
/// A request received from the consensus engine.
Beacon(BeaconEngineMessage<T>),
/// Request to insert an already executed block, e.g. via payload building.
InsertExecutedBlock(ExecutedBlock),
InsertExecutedBlock(ExecutedBlock<N>),
}
impl<T: EngineTypes> Display for EngineApiRequest<T> {
impl<T: EngineTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Beacon(msg) => msg.fmt(f),
@ -256,14 +256,16 @@ impl<T: EngineTypes> Display for EngineApiRequest<T> {
}
}
impl<T: EngineTypes> From<BeaconEngineMessage<T>> for EngineApiRequest<T> {
impl<T: EngineTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineApiRequest<T, N> {
fn from(msg: BeaconEngineMessage<T>) -> Self {
Self::Beacon(msg)
}
}
impl<T: EngineTypes> From<EngineApiRequest<T>> for FromEngine<EngineApiRequest<T>> {
fn from(req: EngineApiRequest<T>) -> Self {
impl<T: EngineTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
for FromEngine<EngineApiRequest<T, N>>
{
fn from(req: EngineApiRequest<T, N>) -> Self {
Self::Request(req)
}
}

View File

@ -36,8 +36,8 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_builder_primitives::PayloadBuilder;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_primitives::{
Block, EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockWithSenders,
SealedHeader,
EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockFor,
SealedBlockWithSenders, SealedHeader,
};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
@ -53,7 +53,6 @@ use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, VecDeque},
fmt::Debug,
marker::PhantomData,
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
@ -129,7 +128,7 @@ impl<N: NodePrimitives> TreeState<N> {
}
/// Returns the block by hash.
fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlock<N::BlockHeader, N::BlockBody>>> {
fn block_by_hash(&self, hash: B256) -> Option<Arc<SealedBlockFor<N::Block>>> {
self.blocks_by_hash.get(&hash).map(|b| b.block.clone())
}
@ -386,19 +385,19 @@ impl<N: NodePrimitives> TreeState<N> {
///
/// This type is not shareable.
#[derive(Debug)]
pub struct EngineApiTreeState {
pub struct EngineApiTreeState<N: NodePrimitives> {
/// Tracks the state of the blockchain tree.
tree_state: TreeState,
tree_state: TreeState<N>,
/// Tracks the forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// Buffer of detached blocks.
buffer: BlockBuffer,
buffer: BlockBuffer<N::Block>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
}
impl EngineApiTreeState {
impl<N: NodePrimitives> EngineApiTreeState<N> {
fn new(
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
@ -474,10 +473,10 @@ where
{
provider: P,
executor_provider: E,
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
/// Keeps track of internals such as executed and buffered blocks.
state: EngineApiTreeState,
state: EngineApiTreeState<N>,
/// The half for sending messages to the engine.
///
/// This is kept so that we can queue in messages to ourself that we can process later, for
@ -486,20 +485,20 @@ where
/// them one by one so that we can handle incoming engine API in between and don't become
/// unresponsive. This can happen during live sync transition where we're trying to close the
/// gap (up to 3 epochs of blocks in the worst case).
incoming_tx: Sender<FromEngine<EngineApiRequest<T>>>,
incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<EngineApiRequest<T>>>,
incoming: Receiver<FromEngine<EngineApiRequest<T, N>>>,
/// Outgoing events that are emitted to the handler.
outgoing: UnboundedSender<EngineApiEvent>,
outgoing: UnboundedSender<EngineApiEvent<N>>,
/// Channels to the persistence layer.
persistence: PersistenceHandle,
persistence: PersistenceHandle<N>,
/// Tracks the state changes of the persistence task.
persistence_state: PersistenceState,
/// Flag indicating the state of the node's backfill synchronization process.
backfill_sync_state: BackfillSyncState,
/// Keeps track of the state of the canonical chain that isn't persisted yet.
/// This is intended to be accessed from external sources, such as rpc.
canonical_in_memory_state: CanonicalInMemoryState,
canonical_in_memory_state: CanonicalInMemoryState<N>,
/// Handle to the payload builder that will receive payload attributes for valid forkchoice
/// updates
payload_builder: PayloadBuilderHandle<T>,
@ -511,8 +510,6 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler
engine_kind: EngineApiKind,
/// Captures the types the engine operates on
_primtives: PhantomData<N>,
}
impl<N, P: Debug, E: Debug, T: EngineTypes + Debug, V: Debug> std::fmt::Debug
@ -546,6 +543,7 @@ where
N: NodePrimitives<
Block = reth_primitives::Block,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
P: DatabaseProviderFactory
@ -562,16 +560,16 @@ where
V: EngineValidator<T, Block = reth_primitives::Block>,
{
/// Creates a new [`EngineApiTreeHandler`].
#[allow(clippy::too_many_arguments)]
#[expect(clippy::too_many_arguments)]
pub fn new(
provider: P,
executor_provider: E,
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
canonical_in_memory_state: CanonicalInMemoryState,
persistence: PersistenceHandle,
outgoing: UnboundedSender<EngineApiEvent<N>>,
state: EngineApiTreeState<N>,
canonical_in_memory_state: CanonicalInMemoryState<N>,
persistence: PersistenceHandle<N>,
persistence_state: PersistenceState,
payload_builder: PayloadBuilderHandle<T>,
config: TreeConfig,
@ -597,7 +595,6 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind,
_primtives: Default::default(),
}
}
@ -611,19 +608,19 @@ where
///
/// Returns the sender through which incoming requests can be sent to the task and the receiver
/// end of a [`EngineApiEvent`] unbounded channel to receive events from the engine.
#[allow(clippy::too_many_arguments)]
#[expect(clippy::complexity)]
pub fn spawn_new(
provider: P,
executor_provider: E,
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<N>>,
payload_validator: V,
persistence: PersistenceHandle,
persistence: PersistenceHandle<N>,
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
canonical_in_memory_state: CanonicalInMemoryState<N>,
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
kind: EngineApiKind,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
) -> (Sender<FromEngine<EngineApiRequest<T, N>>>, UnboundedReceiver<EngineApiEvent<N>>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
@ -661,7 +658,7 @@ where
}
/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T>>> {
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>>> {
self.incoming_tx.clone()
}
@ -859,7 +856,7 @@ where
///
/// Note: This does not update the tracked state and instead returns the new chain based on the
/// given head.
fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain>> {
fn on_new_head(&self, new_head: B256) -> ProviderResult<Option<NewCanonicalChain<N>>> {
// get the executed new head block
let Some(new_head_block) = self.state.tree_state.blocks_by_hash.get(&new_head) else {
return Ok(None)
@ -1133,7 +1130,7 @@ where
/// Returns an error if the engine channel is disconnected.
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<EngineApiRequest<T>>>, RecvError> {
) -> Result<Option<FromEngine<EngineApiRequest<T, N>>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
@ -1211,7 +1208,7 @@ where
/// Handles a message from the engine.
fn on_engine_message(
&mut self,
msg: FromEngine<EngineApiRequest<T>>,
msg: FromEngine<EngineApiRequest<T, N>>,
) -> Result<(), InsertBlockFatalError> {
match msg {
FromEngine::Event(event) => match event {
@ -1452,7 +1449,7 @@ where
}
/// Emits an outgoing event to the engine.
fn emit_event(&mut self, event: impl Into<EngineApiEvent>) {
fn emit_event(&mut self, event: impl Into<EngineApiEvent<N>>) {
let event = event.into();
if event.is_backfill_action() {
@ -1496,7 +1493,7 @@ where
/// Returns a batch of consecutive canonical blocks to persist in the range
/// `(last_persisted_number .. canonical_head - threshold]` . The expected
/// order is oldest -> newest.
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock> {
fn get_canonical_blocks_to_persist(&self) -> Vec<ExecutedBlock<N>> {
let mut blocks_to_persist = Vec::new();
let mut current_hash = self.state.tree_state.canonical_block_hash();
let last_persisted_number = self.persistence_state.last_persisted_block.number;
@ -1549,7 +1546,7 @@ where
/// has in memory.
///
/// For finalized blocks, this will return `None`.
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock>> {
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock<N>>> {
trace!(target: "engine::tree", ?hash, "Fetching executed block by hash");
// check memory first
let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
@ -1595,7 +1592,7 @@ where
}
/// Return block from database or in-memory state by hash.
fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<Block>> {
fn block_by_hash(&self, hash: B256) -> ProviderResult<Option<N::Block>> {
// check database first
let mut block = self.provider.block_by_hash(hash)?;
if block.is_none() {
@ -1754,7 +1751,10 @@ where
/// Validate if block is correct and satisfies all the consensus rules that concern the header
/// and block body itself.
fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), ConsensusError> {
fn validate_block(
&self,
block: &SealedBlockWithSenders<N::Block>,
) -> Result<(), ConsensusError> {
if let Err(e) = self.consensus.validate_header_with_total_difficulty(block, U256::MAX) {
error!(
target: "engine::tree",
@ -1951,7 +1951,7 @@ where
/// If either of these are true, then this returns the height of the first block. Otherwise,
/// this returns [`None`]. This should be used to check whether or not we should be sending a
/// remove command to the persistence task.
fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option<u64> {
fn find_disk_reorg(&self, chain_update: &NewCanonicalChain<N>) -> Option<u64> {
let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None };
let BlockNumHash { number: new_num, hash: new_hash } =
@ -1978,7 +1978,7 @@ where
/// Invoked when we the canonical chain has been updated.
///
/// This is invoked on a valid forkchoice update, or if we can make the target block canonical.
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain) {
fn on_canonical_chain_update(&mut self, chain_update: NewCanonicalChain<N>) {
trace!(target: "engine::tree", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update");
let start = Instant::now();
@ -2030,7 +2030,7 @@ where
}
/// This reinserts any blocks in the new chain that do not already exist in the tree
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock>) {
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
for block in new_chain {
if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
trace!(target: "engine::tree", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state");
@ -2296,7 +2296,7 @@ where
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
debug!(target: "engine::tree", ?root_elapsed, block=?sealed_block.num_hash(), "Calculated state root");
let executed: ExecutedBlock = ExecutedBlock {
let executed: ExecutedBlock<N> = ExecutedBlock {
block: sealed_block.clone(),
senders: Arc::new(block.senders),
execution_output: Arc::new(ExecutionOutcome::from((output, block_number))),
@ -2636,7 +2636,7 @@ mod tests {
use reth_engine_primitives::ForkchoiceStatus;
use reth_ethereum_engine_primitives::{EthEngineTypes, EthereumEngineValidator};
use reth_evm::test_utils::MockExecutorProvider;
use reth_primitives::{BlockExt, EthPrimitives};
use reth_primitives::{Block, BlockExt, EthPrimitives};
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3};
use reth_trie::updates::TrieUpdates;
@ -2708,7 +2708,7 @@ mod tests {
EthEngineTypes,
EthereumEngineValidator,
>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes>>>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>>>,
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlock>,
action_rx: Receiver<PersistenceAction>,