feat: relax bounds for EngineApiTreeHandler (#13257)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-12-10 13:06:39 +04:00
committed by GitHub
parent d856c8e5bc
commit da99986ea2
9 changed files with 139 additions and 116 deletions

3
Cargo.lock generated
View File

@ -6712,11 +6712,13 @@ dependencies = [
name = "reth-blockchain-tree-api"
version = "1.1.2"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"reth-consensus",
"reth-execution-errors",
"reth-primitives",
"reth-primitives-traits",
"reth-storage-errors",
"thiserror 2.0.5",
]
@ -7408,6 +7410,7 @@ dependencies = [
"reth-payload-builder-primitives",
"reth-payload-primitives",
"reth-primitives",
"reth-primitives-traits",
"reth-provider",
"reth-prune",
"reth-prune-types",

View File

@ -14,9 +14,11 @@ workspace = true
reth-consensus.workspace = true
reth-execution-errors.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-storage-errors.workspace = true
# alloy
alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-eips.workspace = true

View File

@ -1,11 +1,13 @@
//! Error handling for the blockchain tree
use alloy_consensus::BlockHeader;
use alloy_primitives::{BlockHash, BlockNumber};
use reth_consensus::ConsensusError;
use reth_execution_errors::{
BlockExecutionError, BlockValidationError, InternalBlockExecutionError,
};
use reth_primitives::SealedBlock;
use reth_primitives::{SealedBlock, SealedBlockFor};
use reth_primitives_traits::{Block, BlockBody};
pub use reth_storage_errors::provider::ProviderError;
/// Various error cases that can occur when a block violates tree assumptions.
@ -210,48 +212,48 @@ impl InsertBlockErrorData {
}
}
struct InsertBlockErrorDataTwo {
block: SealedBlock,
struct InsertBlockErrorDataTwo<B: Block> {
block: SealedBlockFor<B>,
kind: InsertBlockErrorKindTwo,
}
impl std::fmt::Display for InsertBlockErrorDataTwo {
impl<B: Block> std::fmt::Display for InsertBlockErrorDataTwo<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Failed to insert block (hash={}, number={}, parent_hash={}): {}",
self.block.hash(),
self.block.number,
self.block.parent_hash,
self.block.number(),
self.block.parent_hash(),
self.kind
)
}
}
impl std::fmt::Debug for InsertBlockErrorDataTwo {
impl<B: Block> std::fmt::Debug for InsertBlockErrorDataTwo<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InsertBlockError")
.field("error", &self.kind)
.field("hash", &self.block.hash())
.field("number", &self.block.number)
.field("parent_hash", &self.block.parent_hash)
.field("num_txs", &self.block.body.transactions.len())
.field("number", &self.block.number())
.field("parent_hash", &self.block.parent_hash())
.field("num_txs", &self.block.body.transactions().len())
.finish_non_exhaustive()
}
}
impl core::error::Error for InsertBlockErrorDataTwo {
impl<B: Block> core::error::Error for InsertBlockErrorDataTwo<B> {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
Some(&self.kind)
}
}
impl InsertBlockErrorDataTwo {
const fn new(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Self {
impl<B: Block> InsertBlockErrorDataTwo<B> {
const fn new(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Self {
Self { block, kind }
}
fn boxed(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Box<Self> {
fn boxed(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Box<Self> {
Box::new(Self::new(block, kind))
}
}
@ -259,36 +261,36 @@ impl InsertBlockErrorDataTwo {
/// Error thrown when inserting a block failed because the block is considered invalid.
#[derive(thiserror::Error)]
#[error(transparent)]
pub struct InsertBlockErrorTwo {
inner: Box<InsertBlockErrorDataTwo>,
pub struct InsertBlockErrorTwo<B: Block> {
inner: Box<InsertBlockErrorDataTwo<B>>,
}
// === impl InsertBlockErrorTwo ===
impl InsertBlockErrorTwo {
impl<B: Block> InsertBlockErrorTwo<B> {
/// Create a new `InsertInvalidBlockErrorTwo`
pub fn new(block: SealedBlock, kind: InsertBlockErrorKindTwo) -> Self {
pub fn new(block: SealedBlockFor<B>, kind: InsertBlockErrorKindTwo) -> Self {
Self { inner: InsertBlockErrorDataTwo::boxed(block, kind) }
}
/// Create a new `InsertInvalidBlockError` from a consensus error
pub fn consensus_error(error: ConsensusError, block: SealedBlock) -> Self {
pub fn consensus_error(error: ConsensusError, block: SealedBlockFor<B>) -> Self {
Self::new(block, InsertBlockErrorKindTwo::Consensus(error))
}
/// Create a new `InsertInvalidBlockError` from a consensus error
pub fn sender_recovery_error(block: SealedBlock) -> Self {
pub fn sender_recovery_error(block: SealedBlockFor<B>) -> Self {
Self::new(block, InsertBlockErrorKindTwo::SenderRecovery)
}
/// Create a new `InsertInvalidBlockError` from an execution error
pub fn execution_error(error: BlockExecutionError, block: SealedBlock) -> Self {
pub fn execution_error(error: BlockExecutionError, block: SealedBlockFor<B>) -> Self {
Self::new(block, InsertBlockErrorKindTwo::Execution(error))
}
/// Consumes the error and returns the block that resulted in the error
#[inline]
pub fn into_block(self) -> SealedBlock {
pub fn into_block(self) -> SealedBlockFor<B> {
self.inner.block
}
@ -300,19 +302,19 @@ impl InsertBlockErrorTwo {
/// Returns the block that resulted in the error
#[inline]
pub const fn block(&self) -> &SealedBlock {
pub const fn block(&self) -> &SealedBlockFor<B> {
&self.inner.block
}
/// Consumes the type and returns the block and error kind.
#[inline]
pub fn split(self) -> (SealedBlock, InsertBlockErrorKindTwo) {
pub fn split(self) -> (SealedBlockFor<B>, InsertBlockErrorKindTwo) {
let inner = *self.inner;
(inner.block, inner.kind)
}
}
impl std::fmt::Debug for InsertBlockErrorTwo {
impl<B: Block> std::fmt::Debug for InsertBlockErrorTwo<B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.inner, f)
}

View File

@ -2,7 +2,7 @@ use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_engine::ForkchoiceState;
use reth_engine_primitives::ForkchoiceStatus;
use reth_primitives::{EthPrimitives, NodePrimitives, SealedBlock, SealedHeader};
use reth_primitives::{EthPrimitives, NodePrimitives, SealedBlockFor, SealedHeader};
use std::{
fmt::{Display, Formatter, Result},
sync::Arc,
@ -15,9 +15,9 @@ pub enum BeaconConsensusEngineEvent<N: NodePrimitives = EthPrimitives> {
/// The fork choice state was updated, and the current fork choice status
ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock<N::BlockHeader, N::BlockBody>>, Duration),
ForkBlockAdded(Arc<SealedBlockFor<N::Block>>, Duration),
/// A block was added to the canonical chain, and the elapsed time validating the block
CanonicalBlockAdded(Arc<SealedBlock<N::BlockHeader, N::BlockBody>>, Duration),
CanonicalBlockAdded(Arc<SealedBlockFor<N::Block>>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader<N::BlockHeader>>, Duration),
/// The consensus engine is involved in live sync, and has specific progress

View File

@ -52,7 +52,7 @@ where
/// Processes requests.
///
/// This type is responsible for processing incoming requests.
handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>>,
handler: EngineApiRequestHandler<EngineApiRequest<N::Engine, N::Primitives>, N::Primitives>,
/// Receiver for incoming requests (from the engine API endpoint) that need to be processed.
incoming_requests: EngineMessageStream<N::Engine>,
}

View File

@ -39,6 +39,7 @@ type EngineServiceType<N, Client> = ChainOrchestrator<
EngineHandler<
EngineApiRequestHandler<
EngineApiRequest<<N as NodeTypesWithEngine>::Engine, <N as NodeTypes>::Primitives>,
<N as NodeTypes>::Primitives,
>,
EngineMessageStream<<N as NodeTypesWithEngine>::Engine>,
BasicBlockDownloader<Client>,

View File

@ -26,6 +26,7 @@ reth-payload-builder-primitives.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-prune.workspace = true
reth-revm.workspace = true
@ -108,6 +109,7 @@ test-utils = [
"reth-network-p2p/test-utils",
"reth-payload-builder/test-utils",
"reth-primitives/test-utils",
"reth-primitives-traits/test-utils",
"reth-provider/test-utils",
"reth-prune-types",
"reth-prune-types?/test-utils",

View File

@ -11,6 +11,7 @@ use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_chain_state::ExecutedBlock;
use reth_engine_primitives::{BeaconEngineMessage, EngineTypes};
use reth_primitives::{NodePrimitives, SealedBlockWithSenders};
use reth_primitives_traits::Block;
use std::{
collections::HashSet,
fmt::Display,
@ -66,7 +67,7 @@ impl<T, S, D> EngineHandler<T, S, D> {
impl<T, S, D> ChainHandler for EngineHandler<T, S, D>
where
T: EngineRequestHandler,
T: EngineRequestHandler<Block = reth_primitives::Block>,
S: Stream + Send + Sync + Unpin + 'static,
<S as Stream>::Item: Into<T::Request>,
D: BlockDownloader,
@ -139,9 +140,11 @@ pub trait EngineRequestHandler: Send + Sync {
type Event: Send;
/// The request type this handler can process.
type Request;
/// Type of the block sent in [`FromEngine::DownloadedBlocks`] variant.
type Block: Block;
/// Informs the handler about an event from the [`EngineHandler`].
fn on_event(&mut self, event: FromEngine<Self::Request>);
fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>);
/// Advances the handler.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<RequestHandlerEvent<Self::Event>>;
@ -167,31 +170,32 @@ pub trait EngineRequestHandler: Send + Sync {
/// In case required blocks are missing, the handler will request them from the network, by emitting
/// a download request upstream.
#[derive(Debug)]
pub struct EngineApiRequestHandler<Request> {
pub struct EngineApiRequestHandler<Request, N: NodePrimitives> {
/// channel to send messages to the tree to execute the payload.
to_tree: Sender<FromEngine<Request>>,
to_tree: Sender<FromEngine<Request, N::Block>>,
/// channel to receive messages from the tree.
from_tree: UnboundedReceiver<EngineApiEvent>,
from_tree: UnboundedReceiver<EngineApiEvent<N>>,
}
impl<Request> EngineApiRequestHandler<Request> {
impl<Request, N: NodePrimitives> EngineApiRequestHandler<Request, N> {
/// Creates a new `EngineApiRequestHandler`.
pub const fn new(
to_tree: Sender<FromEngine<Request>>,
from_tree: UnboundedReceiver<EngineApiEvent>,
to_tree: Sender<FromEngine<Request, N::Block>>,
from_tree: UnboundedReceiver<EngineApiEvent<N>>,
) -> Self {
Self { to_tree, from_tree }
}
}
impl<Request> EngineRequestHandler for EngineApiRequestHandler<Request>
impl<Request, N: NodePrimitives> EngineRequestHandler for EngineApiRequestHandler<Request, N>
where
Request: Send,
{
type Event = BeaconConsensusEngineEvent;
type Event = BeaconConsensusEngineEvent<N>;
type Request = Request;
type Block = N::Block;
fn on_event(&mut self, event: FromEngine<Self::Request>) {
fn on_event(&mut self, event: FromEngine<Self::Request, Self::Block>) {
// delegate to the tree
let _ = self.to_tree.send(event);
}
@ -263,7 +267,7 @@ impl<T: EngineTypes, N: NodePrimitives> From<BeaconEngineMessage<T>> for EngineA
}
impl<T: EngineTypes, N: NodePrimitives> From<EngineApiRequest<T, N>>
for FromEngine<EngineApiRequest<T, N>>
for FromEngine<EngineApiRequest<T, N>, N::Block>
{
fn from(req: EngineApiRequest<T, N>) -> Self {
Self::Request(req)
@ -297,16 +301,16 @@ impl<N: NodePrimitives> From<BeaconConsensusEngineEvent<N>> for EngineApiEvent<N
/// Events received from the engine.
#[derive(Debug)]
pub enum FromEngine<Req> {
pub enum FromEngine<Req, B: Block> {
/// Event from the top level orchestrator.
Event(FromOrchestrator),
/// Request from the engine.
Request(Req),
/// Downloaded blocks from the network.
DownloadedBlocks(Vec<SealedBlockWithSenders>),
DownloadedBlocks(Vec<SealedBlockWithSenders<B>>),
}
impl<Req: Display> Display for FromEngine<Req> {
impl<Req: Display, B: Block> Display for FromEngine<Req, B> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Event(ev) => write!(f, "Event({ev:?})"),
@ -318,7 +322,7 @@ impl<Req: Display> Display for FromEngine<Req> {
}
}
impl<Req> From<FromOrchestrator> for FromEngine<Req> {
impl<Req, B: Block> From<FromOrchestrator> for FromEngine<Req, B> {
fn from(event: FromOrchestrator) -> Self {
Self::Event(event)
}

View File

@ -5,7 +5,7 @@ use crate::{
persistence::PersistenceHandle,
tree::metrics::EngineApiMetrics,
};
use alloy_consensus::{BlockHeader, Header};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use alloy_primitives::{
map::{HashMap, HashSet},
@ -36,9 +36,10 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_builder_primitives::PayloadBuilder;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_primitives::{
EthPrimitives, GotExpected, NodePrimitives, SealedBlock, SealedBlockFor,
SealedBlockWithSenders, SealedHeader,
EthPrimitives, GotExpected, NodePrimitives, SealedBlockFor, SealedBlockWithSenders,
SealedHeader,
};
use reth_primitives_traits::Block;
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
HashedPostStateProvider, ProviderError, StateCommitmentProvider, StateProviderBox,
@ -485,9 +486,9 @@ where
/// them one by one so that we can handle incoming engine API in between and don't become
/// unresponsive. This can happen during live sync transition where we're trying to close the
/// gap (up to 3 epochs of blocks in the worst case).
incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>>>,
incoming_tx: Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>,
/// Incoming engine API requests.
incoming: Receiver<FromEngine<EngineApiRequest<T, N>>>,
incoming: Receiver<FromEngine<EngineApiRequest<T, N>, N::Block>>,
/// Outgoing events that are emitted to the handler.
outgoing: UnboundedSender<EngineApiEvent<N>>,
/// Channels to the persistence layer.
@ -540,24 +541,20 @@ where
impl<N, P, E, T, V> EngineApiTreeHandler<N, P, E, T, V>
where
N: NodePrimitives<
Block = reth_primitives::Block,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
N: NodePrimitives,
P: DatabaseProviderFactory
+ BlockReader<Block = N::Block, Header = N::BlockHeader>
+ StateProviderFactory
+ StateReader<Receipt = reth_primitives::Receipt>
+ StateReader<Receipt = N::Receipt>
+ StateCommitmentProvider
+ HashedPostStateProvider
+ Clone
+ 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
<P as DatabaseProviderFactory>::Provider:
BlockReader<Block = N::Block, Header = N::BlockHeader>,
E: BlockExecutorProvider<Primitives = N>,
T: EngineTypes,
V: EngineValidator<T, Block = reth_primitives::Block>,
V: EngineValidator<T, Block = N::Block>,
{
/// Creates a new [`EngineApiTreeHandler`].
#[expect(clippy::too_many_arguments)]
@ -620,7 +617,8 @@ where
config: TreeConfig,
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
kind: EngineApiKind,
) -> (Sender<FromEngine<EngineApiRequest<T, N>>>, UnboundedReceiver<EngineApiEvent<N>>) {
) -> (Sender<FromEngine<EngineApiRequest<T, N>, N::Block>>, UnboundedReceiver<EngineApiEvent<N>>)
{
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
@ -658,7 +656,7 @@ where
}
/// Returns a new [`Sender`] to send messages to this type.
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>>> {
pub fn sender(&self) -> Sender<FromEngine<EngineApiRequest<T, N>, N::Block>> {
self.incoming_tx.clone()
}
@ -698,7 +696,7 @@ where
/// block request processing isn't blocked for a long time.
fn on_downloaded(
&mut self,
mut blocks: Vec<SealedBlockWithSenders>,
mut blocks: Vec<SealedBlockWithSenders<N::Block>>,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
if blocks.is_empty() {
// nothing to execute
@ -797,7 +795,7 @@ where
let block_hash = block.hash();
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_hash);
if lowest_buffered_ancestor == block_hash {
lowest_buffered_ancestor = block.parent_hash;
lowest_buffered_ancestor = block.parent_hash();
}
// now check the block itself
@ -862,11 +860,11 @@ where
return Ok(None)
};
let new_head_number = new_head_block.block.number;
let new_head_number = new_head_block.block.number();
let mut current_canonical_number = self.state.tree_state.current_canonical_head.number;
let mut new_chain = vec![new_head_block.clone()];
let mut current_hash = new_head_block.block.parent_hash;
let mut current_hash = new_head_block.block.parent_hash();
let mut current_number = new_head_number - 1;
// Walk back the new chain until we reach a block we know about
@ -875,7 +873,7 @@ where
// that are _above_ the current canonical head.
while current_number > current_canonical_number {
if let Some(block) = self.executed_block_by_hash(current_hash)? {
current_hash = block.block.parent_hash;
current_hash = block.block.parent_hash();
current_number -= 1;
new_chain.push(block);
} else {
@ -904,7 +902,7 @@ where
while current_canonical_number > current_number {
if let Some(block) = self.executed_block_by_hash(old_hash)? {
old_chain.push(block.clone());
old_hash = block.block.header.parent_hash;
old_hash = block.block.header.parent_hash();
current_canonical_number -= 1;
} else {
// This shouldn't happen as we're walking back the canonical chain
@ -920,7 +918,7 @@ where
// a common ancestor (fork block) is reached.
while old_hash != current_hash {
if let Some(block) = self.executed_block_by_hash(old_hash)? {
old_hash = block.block.header.parent_hash;
old_hash = block.block.header.parent_hash();
old_chain.push(block);
} else {
// This shouldn't happen as we're walking back the canonical chain
@ -929,7 +927,7 @@ where
}
if let Some(block) = self.executed_block_by_hash(current_hash)? {
current_hash = block.block.parent_hash;
current_hash = block.block.parent_hash();
new_chain.push(block);
} else {
// This shouldn't happen as we've already walked this path
@ -958,10 +956,10 @@ where
return Ok(false)
}
// We already passed the canonical head
if current_block.number <= canonical_head.number {
if current_block.number() <= canonical_head.number {
break
}
current_hash = current_block.parent_hash;
current_hash = current_block.parent_hash();
}
// verify that the given hash is not already part of canonical chain stored in memory
@ -1040,7 +1038,7 @@ where
// to return an error
ProviderError::HeaderNotFound(state.head_block_hash.into())
})?;
let updated = self.process_payload_attributes(attr, &tip, state, version);
let updated = self.process_payload_attributes(attr, tip.header(), state, version);
return Ok(TreeOutcome::new(updated))
}
@ -1069,13 +1067,13 @@ where
// 3. check if the head is already part of the canonical chain
if let Ok(Some(canonical_header)) = self.find_canonical_header(state.head_block_hash) {
debug!(target: "engine::tree", head = canonical_header.number, "fcu head block is already canonical");
debug!(target: "engine::tree", head = canonical_header.number(), "fcu head block is already canonical");
// For OpStack the proposers are allowed to reorg their own chain at will, so we need to
// always trigger a new payload job if requested.
if self.engine_kind.is_opstack() {
if let Some(attr) = attrs {
debug!(target: "engine::tree", head = canonical_header.number, "handling payload attributes for canonical head");
debug!(target: "engine::tree", head = canonical_header.number(), "handling payload attributes for canonical head");
let updated =
self.process_payload_attributes(attr, &canonical_header, state, version);
return Ok(TreeOutcome::new(updated))
@ -1128,9 +1126,10 @@ where
/// received in time.
///
/// Returns an error if the engine channel is disconnected.
#[expect(clippy::type_complexity)]
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<EngineApiRequest<T, N>>>, RecvError> {
) -> Result<Option<FromEngine<EngineApiRequest<T, N>, N::Block>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
@ -1208,7 +1207,7 @@ where
/// Handles a message from the engine.
fn on_engine_message(
&mut self,
msg: FromEngine<EngineApiRequest<T, N>>,
msg: FromEngine<EngineApiRequest<T, N>, N::Block>,
) -> Result<(), InsertBlockFatalError> {
match msg {
FromEngine::Event(event) => match event {
@ -1384,7 +1383,7 @@ where
.state
.buffer
.block(&sync_target_state.finalized_block_hash)
.map(|block| block.number);
.map(|block| block.number());
// The block number that the backfill finished at - if the progress or newest
// finalized is None then we can't check the distance anyways.
@ -1505,15 +1504,15 @@ where
debug!(target: "engine::tree", ?last_persisted_number, ?canonical_head_number, ?target_number, ?current_hash, "Returning canonical blocks to persist");
while let Some(block) = self.state.tree_state.blocks_by_hash.get(&current_hash) {
if block.block.number <= last_persisted_number {
if block.block.number() <= last_persisted_number {
break;
}
if block.block.number <= target_number {
if block.block.number() <= target_number {
blocks_to_persist.push(block.clone());
}
current_hash = block.block.parent_hash;
current_hash = block.block.parent_hash();
}
// reverse the order so that the oldest block comes first
@ -1579,7 +1578,10 @@ where
}
/// Return sealed block from database or in-memory state by hash.
fn sealed_header_by_hash(&self, hash: B256) -> ProviderResult<Option<SealedHeader>> {
fn sealed_header_by_hash(
&self,
hash: B256,
) -> ProviderResult<Option<SealedHeader<N::BlockHeader>>> {
// check memory first
let block =
self.state.tree_state.block_by_hash(hash).map(|block| block.as_ref().clone().header);
@ -1649,7 +1651,7 @@ where
self.state
.buffer
.lowest_ancestor(&hash)
.map(|block| block.parent_hash)
.map(|block| block.parent_hash())
.unwrap_or_else(|| hash)
}
@ -1696,7 +1698,7 @@ where
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Some(parent) = self.block_by_hash(parent_hash)? {
if !parent.is_zero_difficulty() {
if !parent.header().difficulty().is_zero() {
parent_hash = B256::ZERO;
}
}
@ -1823,8 +1825,8 @@ where
/// Returns an error if sender recovery failed or inserting into the buffer failed.
fn buffer_block_without_senders(
&mut self,
block: SealedBlock,
) -> Result<(), InsertBlockErrorTwo> {
block: SealedBlockFor<N::Block>,
) -> Result<(), InsertBlockErrorTwo<N::Block>> {
match block.try_seal_with_senders() {
Ok(block) => self.buffer_block(block),
Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
@ -1832,7 +1834,10 @@ where
}
/// Pre-validates the block and inserts it into the buffer.
fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), InsertBlockErrorTwo> {
fn buffer_block(
&mut self,
block: SealedBlockWithSenders<N::Block>,
) -> Result<(), InsertBlockErrorTwo<N::Block>> {
if let Err(err) = self.validate_block(&block) {
return Err(InsertBlockErrorTwo::consensus_error(err, block.block))
}
@ -1886,7 +1891,7 @@ where
// if we have buffered the finalized block, we should check how far
// we're off
exceeds_backfill_threshold =
self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number);
self.exceeds_backfill_run_threshold(canonical_tip_num, buffered_finalized.number());
}
// If this is invoked after we downloaded a block we can check if this block is the
@ -2011,7 +2016,7 @@ where
self.canonical_in_memory_state.set_canonical_head(tip.clone());
// Update metrics based on new tip
self.metrics.tree.canonical_chain_height.set(tip.number as f64);
self.metrics.tree.canonical_chain_height.set(tip.number() as f64);
// sends an event to all active listeners about the new canonical chain
self.canonical_in_memory_state.notify_canon_state(notification);
@ -2033,7 +2038,7 @@ where
fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock<N>>) {
for block in new_chain {
if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
trace!(target: "engine::tree", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state");
trace!(target: "engine::tree", num=?block.block.number(), hash=?block.block.hash(), "Reinserting block into tree state");
self.state.tree_state.insert_executed(block);
}
}
@ -2086,10 +2091,10 @@ where
/// Returns an event with the appropriate action to take, such as:
/// - download more missing blocks
/// - try to canonicalize the target if the `block` is the tracked target (head) block.
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number,), target = "engine::tree")]
#[instrument(level = "trace", skip_all, fields(block_hash = %block.hash(), block_num = %block.number(),), target = "engine::tree")]
fn on_downloaded_block(
&mut self,
block: SealedBlockWithSenders,
block: SealedBlockWithSenders<N::Block>,
) -> Result<Option<TreeEvent>, InsertBlockFatalError> {
let block_num_hash = block.num_hash();
let lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block_num_hash.hash);
@ -2147,8 +2152,8 @@ where
fn insert_block_without_senders(
&mut self,
block: SealedBlock,
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo> {
block: SealedBlockFor<N::Block>,
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
match block.try_seal_with_senders() {
Ok(block) => self.insert_block(block),
Err(block) => Err(InsertBlockErrorTwo::sender_recovery_error(block)),
@ -2157,17 +2162,17 @@ where
fn insert_block(
&mut self,
block: SealedBlockWithSenders,
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo> {
block: SealedBlockWithSenders<N::Block>,
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<N::Block>> {
self.insert_block_inner(block.clone())
.map_err(|kind| InsertBlockErrorTwo::new(block.block, kind))
}
fn insert_block_inner(
&mut self,
block: SealedBlockWithSenders,
block: SealedBlockWithSenders<N::Block>,
) -> Result<InsertPayloadOk2, InsertBlockErrorKindTwo> {
debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash, state_root = ?block.state_root, "Inserting new block into tree");
debug!(target: "engine::tree", block=?block.num_hash(), parent = ?block.parent_hash(), state_root = ?block.state_root(), "Inserting new block into tree");
if self.block_by_hash(block.hash())?.is_some() {
return Ok(InsertPayloadOk2::AlreadySeen(BlockStatus2::Valid))
@ -2179,14 +2184,14 @@ where
// validate block consensus rules
self.validate_block(&block)?;
trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash, "Fetching block state provider");
let Some(state_provider) = self.state_provider(block.parent_hash)? else {
trace!(target: "engine::tree", block=?block.num_hash(), parent=?block.parent_hash(), "Fetching block state provider");
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
// we don't have the state required to execute this block, buffering it and find the
// missing parent block
let missing_ancestor = self
.state
.buffer
.lowest_ancestor(&block.parent_hash)
.lowest_ancestor(&block.parent_hash())
.map(|block| block.parent_num_hash())
.unwrap_or_else(|| block.parent_num_hash());
@ -2199,9 +2204,9 @@ where
};
// now validate against the parent
let parent_block = self.sealed_header_by_hash(block.parent_hash)?.ok_or_else(|| {
let parent_block = self.sealed_header_by_hash(block.parent_hash())?.ok_or_else(|| {
InsertBlockErrorKindTwo::Provider(ProviderError::HeaderNotFound(
block.parent_hash.into(),
block.parent_hash().into(),
))
})?;
if let Err(e) = self.consensus.validate_header_against_parent(&block, &parent_block) {
@ -2212,7 +2217,7 @@ where
trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
let block_number = block.number;
let block_number = block.number();
let block_hash = block.hash();
let sealed_block = Arc::new(block.block.clone());
let block = block.unseal();
@ -2260,7 +2265,7 @@ where
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_parallel(block.parent_hash, &hashed_state)
.compute_state_root_parallel(block.header().parent_hash(), &hashed_state)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
@ -2278,7 +2283,7 @@ where
state_provider.state_root_with_updates(hashed_state.clone())?
};
if state_root != block.state_root {
if state_root != block.header().state_root() {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
&parent_block,
@ -2287,7 +2292,7 @@ where
Some((&trie_output, state_root)),
);
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
GotExpected { got: state_root, expected: block.header().state_root() }.into(),
)
.into())
}
@ -2304,7 +2309,7 @@ where
trie: Arc::new(trie_output),
};
if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash {
if self.state.tree_state.canonical_block_hash() == executed.block().parent_hash() {
debug!(target: "engine::tree", pending = ?executed.block().num_hash() ,"updating pending block");
// if the parent is the canonical head, we can insert the block as the pending block
self.canonical_in_memory_state.set_pending_block(executed.clone());
@ -2375,7 +2380,7 @@ where
/// Returns the proper payload status response if the block is invalid.
fn on_insert_block_error(
&mut self,
error: InsertBlockErrorTwo,
error: InsertBlockErrorTwo<N::Block>,
) -> Result<PayloadStatus, InsertBlockFatalError> {
let (block, error) = error.split();
@ -2386,12 +2391,12 @@ where
// If the error was due to an invalid payload, the payload is added to the
// invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
// returned.
warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number, %validation_err, "Invalid block error on new payload");
warn!(target: "engine::tree", invalid_hash=?block.hash(), invalid_number=?block.number(), %validation_err, "Invalid block error on new payload");
let latest_valid_hash = if validation_err.is_block_pre_merge() {
// zero hash must be returned if block is pre-merge
Some(B256::ZERO)
} else {
self.latest_valid_hash_for_invalid_payload(block.parent_hash)?
self.latest_valid_hash_for_invalid_payload(block.parent_hash())?
};
// keep track of the invalid header
@ -2403,7 +2408,10 @@ where
}
/// Attempts to find the header for the given block hash if it is canonical.
pub fn find_canonical_header(&self, hash: B256) -> Result<Option<SealedHeader>, ProviderError> {
pub fn find_canonical_header(
&self,
hash: B256,
) -> Result<Option<SealedHeader<N::BlockHeader>>, ProviderError> {
let mut canonical = self.canonical_in_memory_state.header_by_hash(hash);
if canonical.is_none() {
@ -2434,7 +2442,7 @@ where
{
// we're also persisting the finalized block on disk so we can reload it on
// restart this is required by optimism which queries the finalized block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
let _ = self.persistence.save_finalized_block_number(finalized.number);
let _ = self.persistence.save_finalized_block_number(finalized.number());
self.canonical_in_memory_state.set_finalized(finalized);
}
}
@ -2462,7 +2470,7 @@ where
if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
// we're also persisting the safe block on disk so we can reload it on
// restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
let _ = self.persistence.save_safe_block_number(safe.number);
let _ = self.persistence.save_safe_block_number(safe.number());
self.canonical_in_memory_state.set_safe(safe);
}
}
@ -2537,7 +2545,7 @@ where
fn process_payload_attributes(
&self,
attrs: T::PayloadAttributes,
head: &Header,
head: &N::BlockHeader,
state: ForkchoiceState,
version: EngineApiMessageVersion,
) -> OnForkChoiceUpdated {
@ -2626,6 +2634,7 @@ pub enum AdvancePersistenceError {
mod tests {
use super::*;
use crate::persistence::PersistenceAction;
use alloy_consensus::Header;
use alloy_primitives::Bytes;
use alloy_rlp::Decodable;
use alloy_rpc_types_engine::{CancunPayloadFields, ExecutionPayloadSidecar};
@ -2708,7 +2717,7 @@ mod tests {
EthEngineTypes,
EthereumEngineValidator,
>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>>>,
to_tree_tx: Sender<FromEngine<EngineApiRequest<EthEngineTypes, EthPrimitives>, Block>>,
from_tree_rx: UnboundedReceiver<EngineApiEvent>,
blocks: Vec<ExecutedBlock>,
action_rx: Receiver<PersistenceAction>,
@ -2843,7 +2852,7 @@ mod tests {
fn insert_block(
&mut self,
block: SealedBlockWithSenders,
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo> {
) -> Result<InsertPayloadOk2, InsertBlockErrorTwo<Block>> {
let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
self.extend_execution_outcome([execution_outcome]);
self.tree.provider.add_state_root(block.state_root);