feat(engine): emit events with executed blocks (#14341)

This commit is contained in:
Roman Krasiuk
2025-02-09 12:07:10 +01:00
committed by GitHub
parent 46d3b6a32e
commit a215256075
6 changed files with 41 additions and 41 deletions

1
Cargo.lock generated
View File

@ -7226,6 +7226,7 @@ dependencies = [
"alloy-rpc-types-engine",
"auto_impl",
"futures",
"reth-chain-state",
"reth-errors",
"reth-execution-types",
"reth-payload-builder-primitives",

View File

@ -17,6 +17,7 @@ reth-payload-primitives.workspace = true
reth-payload-builder-primitives.workspace = true
reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-trie.workspace = true
reth-errors.workspace = true

View File

@ -1,7 +1,7 @@
//! Events emitted by the beacon consensus engine.
use crate::ForkchoiceStatus;
use alloc::{boxed::Box, sync::Arc};
use alloc::boxed::Box;
use alloy_consensus::BlockHeader;
use alloy_primitives::B256;
use alloy_rpc_types_engine::ForkchoiceState;
@ -9,7 +9,8 @@ use core::{
fmt::{Display, Formatter, Result},
time::Duration,
};
use reth_primitives::{EthPrimitives, SealedBlock};
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_primitives::EthPrimitives;
use reth_primitives_traits::{NodePrimitives, SealedHeader};
/// Events emitted by the consensus engine.
@ -18,9 +19,9 @@ pub enum BeaconConsensusEngineEvent<N: NodePrimitives = EthPrimitives> {
/// The fork choice state was updated, and the current fork choice status
ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
ForkBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A block was added to the canonical chain, and the elapsed time validating the block
CanonicalBlockAdded(Arc<SealedBlock<N::Block>>, Duration),
CanonicalBlockAdded(ExecutedBlockWithTrieUpdates<N>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader<N::BlockHeader>>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
@ -48,10 +49,14 @@ where
write!(f, "ForkchoiceUpdated({state:?}, {status:?})")
}
Self::ForkBlockAdded(block, duration) => {
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.num_hash())
write!(f, "ForkBlockAdded({:?}, {duration:?})", block.recovered_block.num_hash())
}
Self::CanonicalBlockAdded(block, duration) => {
write!(f, "CanonicalBlockAdded({:?}, {duration:?})", block.num_hash())
write!(
f,
"CanonicalBlockAdded({:?}, {duration:?})",
block.recovered_block.num_hash()
)
}
Self::CanonicalChainCommitted(block, duration) => {
write!(f, "CanonicalChainCommitted({:?}, {duration:?})", block.num_hash())

View File

@ -11,14 +11,21 @@
extern crate alloc;
use alloy_primitives::B256;
use reth_payload_primitives::{BuiltPayload, PayloadAttributes};
mod error;
use core::fmt::{self, Debug};
use alloy_consensus::BlockHeader;
use alloy_eips::{eip7685::Requests, Decodable2718};
use alloy_primitives::B256;
use alloy_rpc_types_engine::{ExecutionPayloadSidecar, PayloadError};
use core::fmt::{self, Debug};
use reth_payload_primitives::{
validate_execution_requests, BuiltPayload, EngineApiMessageVersion,
EngineObjectValidationError, InvalidPayloadAttributesError, PayloadAttributes,
PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
mod error;
pub use error::*;
mod forkchoice;
@ -33,15 +40,6 @@ pub use event::*;
mod invalid_block_hook;
pub use invalid_block_hook::InvalidBlockHook;
use alloy_eips::{eip7685::Requests, Decodable2718};
use reth_payload_primitives::{
validate_execution_requests, EngineApiMessageVersion, EngineObjectValidationError,
InvalidPayloadAttributesError, PayloadOrAttributes, PayloadTypes,
};
use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::Block;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
/// Struct aggregating [`alloy_rpc_types_engine::ExecutionPayload`] and [`ExecutionPayloadSidecar`]
/// and encapsulating complete payload supplied for execution.
#[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -1392,14 +1392,10 @@ where
self.canonical_in_memory_state.set_pending_block(block.clone());
}
let sealed_block = Arc::new(block.sealed_block().clone());
self.state.tree_state.insert_executed(block);
self.state.tree_state.insert_executed(block.clone());
self.metrics.engine.inserted_already_executed_blocks.increment(1);
self.emit_event(EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(
sealed_block,
now.elapsed(),
),
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, now.elapsed()),
));
}
EngineApiRequest::Beacon(request) => {
@ -2409,8 +2405,6 @@ where
return Err(e.into())
}
let sealed_block = Arc::new(block.clone_sealed_block());
// We only run the parallel state root if we are currently persisting blocks that are all
// ancestors of the one we are executing. If we're committing ancestor blocks, then: any
// trie updates being committed are a subset of the in-memory trie updates collected before
@ -2558,7 +2552,7 @@ where
self.handle_state_root_result(
state_root_handle,
state_root_config,
sealed_block.as_ref(),
block.sealed_block(),
&hashed_state,
&state_provider,
root_time,
@ -2617,8 +2611,7 @@ where
// prewarm tasks are still running at this point however
drop(prewarm_task_lock.write().unwrap());
// apply state updates to cache and save it (if saving was successful)
self.most_recent_cache =
state_provider.save_cache(sealed_block.hash(), &output.state).ok();
self.most_recent_cache = state_provider.save_cache(block.hash(), &output.state).ok();
let elapsed = save_cache_start.elapsed();
// record how long it took to save caches
@ -2641,15 +2634,15 @@ where
self.canonical_in_memory_state.set_pending_block(executed.clone());
}
self.state.tree_state.insert_executed(executed);
self.state.tree_state.insert_executed(executed.clone());
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// emit insert event
let elapsed = start.elapsed();
let engine_event = if self.is_fork(block_num_hash.hash)? {
BeaconConsensusEngineEvent::ForkBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed)
} else {
BeaconConsensusEngineEvent::CanonicalBlockAdded(sealed_block, elapsed)
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed)
};
self.emit_event(EngineApiEvent::BeaconConsensus(engine_event));
@ -3540,9 +3533,9 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, _),
) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}
@ -3552,10 +3545,10 @@ mod tests {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(BeaconConsensusEngineEvent::ForkBlockAdded(
block,
executed,
_,
)) => {
assert_eq!(block.hash(), expected_hash);
assert_eq!(executed.recovered_block.hash(), expected_hash);
}
_ => panic!("Unexpected event: {:#?}", event),
}

View File

@ -250,7 +250,8 @@ impl NodeState {
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::CanonicalBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(
number=block.number(),
hash=?block.hash(),
@ -272,7 +273,8 @@ impl NodeState {
info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
BeaconConsensusEngineEvent::ForkBlockAdded(executed, elapsed) => {
let block = executed.sealed_block();
info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
}
}