chore: make NodeEvent generic over NodePrimitives (#13534)

This commit is contained in:
Arsenii Kulikov
2024-12-24 03:31:59 +04:00
committed by GitHub
parent af1c9b7614
commit 934fd1f7f0
6 changed files with 32 additions and 56 deletions

1
Cargo.lock generated
View File

@ -8315,6 +8315,7 @@ dependencies = [
"alloy-eips", "alloy-eips",
"alloy-primitives", "alloy-primitives",
"alloy-rpc-types-engine", "alloy-rpc-types-engine",
"derive_more",
"futures", "futures",
"humantime", "humantime",
"pin-project", "pin-project",

View File

@ -24,6 +24,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient}; use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
use reth_node_api::NodeTypesWithDBAdapter; use reth_node_api::NodeTypesWithDBAdapter;
use reth_node_ethereum::EthExecutorProvider; use reth_node_ethereum::EthExecutorProvider;
use reth_node_events::node::NodeEvent;
use reth_provider::{ use reth_provider::{
providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader, providers::ProviderNodeTypes, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
}; };
@ -211,7 +212,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
reth_node_events::node::handle_events( reth_node_events::node::handle_events(
Some(Box::new(network)), Some(Box::new(network)),
latest_block_number, latest_block_number,
pipeline.events().map(Into::into), pipeline.events().map(Into::<NodeEvent<N::Primitives>>::into),
), ),
); );

View File

@ -165,7 +165,7 @@ pub fn build_import_pipeline<N, C, E>(
static_file_producer: StaticFileProducer<ProviderFactory<N>>, static_file_producer: StaticFileProducer<ProviderFactory<N>>,
disable_exec: bool, disable_exec: bool,
executor: E, executor: E,
) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent>)> ) -> eyre::Result<(Pipeline<N>, impl Stream<Item = NodeEvent<N::Primitives>>)>
where where
N: ProviderNodeTypes + CliNodeTypes, N: ProviderNodeTypes + CliNodeTypes,
C: Consensus + 'static, C: Consensus + 'static,

View File

@ -34,7 +34,7 @@ use reth_node_core::{
dirs::{ChainPath, DataDirPath}, dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture, exit::NodeExitFuture,
}; };
use reth_node_events::{cl::ConsensusLayerHealthEvents, node}; use reth_node_events::{cl::ConsensusLayerHealthEvents, node, node::NodeEvent};
use reth_provider::providers::{BlockchainProvider, NodeTypesForTree}; use reth_provider::providers::{BlockchainProvider, NodeTypesForTree};
use reth_rpc::eth::RpcNodeCore; use reth_rpc::eth::RpcNodeCore;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
@ -292,7 +292,7 @@ where
info!(target: "reth::cli", "Consensus engine initialized"); info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select!( let events = stream_select!(
pipeline_events.map(Into::into), pipeline_events.map(Into::<NodeEvent<Types::Primitives>>::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left( Either::Left(
ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone())) ConsensusLayerHealthEvents::new(Box::new(ctx.blockchain_db().clone()))

View File

@ -38,3 +38,4 @@ tracing.workspace = true
# misc # misc
pin-project.workspace = true pin-project.workspace = true
humantime.workspace = true humantime.workspace = true
derive_more.workspace = true

View File

@ -1,14 +1,14 @@
//! Support for handling events emitted by node components. //! Support for handling events emitted by node components.
use crate::cl::ConsensusLayerHealthEvent; use crate::cl::ConsensusLayerHealthEvent;
use alloy_consensus::constants::GWEI_TO_WEI; use alloy_consensus::{constants::GWEI_TO_WEI, BlockHeader};
use alloy_primitives::{BlockNumber, B256}; use alloy_primitives::{BlockNumber, B256};
use alloy_rpc_types_engine::ForkchoiceState; use alloy_rpc_types_engine::ForkchoiceState;
use futures::Stream; use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress}; use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
use reth_engine_primitives::ForkchoiceStatus; use reth_engine_primitives::ForkchoiceStatus;
use reth_network_api::PeersInfo; use reth_network_api::PeersInfo;
use reth_primitives_traits::{format_gas, format_gas_throughput}; use reth_primitives_traits::{format_gas, format_gas_throughput, BlockBody, NodePrimitives};
use reth_prune_types::PrunerEvent; use reth_prune_types::PrunerEvent;
use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId}; use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
use reth_static_file_types::StaticFileProducerEvent; use reth_static_file_types::StaticFileProducerEvent;
@ -211,7 +211,10 @@ impl NodeState {
} }
} }
fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) { fn handle_consensus_engine_event<N: NodePrimitives>(
&mut self,
event: BeaconConsensusEngineEvent<N>,
) {
match event { match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => { BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } = let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
@ -248,28 +251,28 @@ impl NodeState {
} }
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => { BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!( info!(
number=block.number, number=block.number(),
hash=?block.hash(), hash=?block.hash(),
peers=self.num_connected_peers(), peers=self.num_connected_peers(),
txs=block.body.transactions.len(), txs=block.body.transactions().len(),
gas=%format_gas(block.header.gas_used), gas=%format_gas(block.header.gas_used()),
gas_throughput=%format_gas_throughput(block.header.gas_used, elapsed), gas_throughput=%format_gas_throughput(block.header.gas_used(), elapsed),
full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64), full=%format!("{:.1}%", block.header.gas_used() as f64 * 100.0 / block.header.gas_limit() as f64),
base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / GWEI_TO_WEI as f64), base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas().unwrap_or(0) as f64 / GWEI_TO_WEI as f64),
blobs=block.header.blob_gas_used.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, blobs=block.header.blob_gas_used().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB, excess_blobs=block.header.excess_blob_gas().unwrap_or(0) / alloy_eips::eip4844::DATA_GAS_PER_BLOB,
?elapsed, ?elapsed,
"Block added to canonical chain" "Block added to canonical chain"
); );
} }
BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => { BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
self.latest_block = Some(head.number); self.latest_block = Some(head.number());
self.latest_block_time = Some(head.timestamp); self.latest_block_time = Some(head.timestamp());
info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed"); info!(number=head.number(), hash=?head.hash(), ?elapsed, "Canonical chain committed");
} }
BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => { BeaconConsensusEngineEvent::ForkBlockAdded(block, elapsed) => {
info!(number=block.number, hash=?block.hash(), ?elapsed, "Block added to fork chain"); info!(number=block.number(), hash=?block.hash(), ?elapsed, "Block added to fork chain");
} }
} }
} }
@ -350,12 +353,12 @@ struct CurrentStage {
} }
/// A node event. /// A node event.
#[derive(Debug)] #[derive(Debug, derive_more::From)]
pub enum NodeEvent { pub enum NodeEvent<N: NodePrimitives> {
/// A sync pipeline event. /// A sync pipeline event.
Pipeline(PipelineEvent), Pipeline(PipelineEvent),
/// A consensus engine event. /// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent), ConsensusEngine(BeaconConsensusEngineEvent<N>),
/// A Consensus Layer health event. /// A Consensus Layer health event.
ConsensusLayerHealth(ConsensusLayerHealthEvent), ConsensusLayerHealth(ConsensusLayerHealthEvent),
/// A pruner event /// A pruner event
@ -367,44 +370,14 @@ pub enum NodeEvent {
Other(String), Other(String),
} }
impl From<PipelineEvent> for NodeEvent {
fn from(event: PipelineEvent) -> Self {
Self::Pipeline(event)
}
}
impl From<BeaconConsensusEngineEvent> for NodeEvent {
fn from(event: BeaconConsensusEngineEvent) -> Self {
Self::ConsensusEngine(event)
}
}
impl From<ConsensusLayerHealthEvent> for NodeEvent {
fn from(event: ConsensusLayerHealthEvent) -> Self {
Self::ConsensusLayerHealth(event)
}
}
impl From<PrunerEvent> for NodeEvent {
fn from(event: PrunerEvent) -> Self {
Self::Pruner(event)
}
}
impl From<StaticFileProducerEvent> for NodeEvent {
fn from(event: StaticFileProducerEvent) -> Self {
Self::StaticFileProducer(event)
}
}
/// Displays relevant information to the user from components of the node, and periodically /// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node. /// displays the high-level status of the node.
pub async fn handle_events<E>( pub async fn handle_events<E, N: NodePrimitives>(
peers_info: Option<Box<dyn PeersInfo>>, peers_info: Option<Box<dyn PeersInfo>>,
latest_block_number: Option<BlockNumber>, latest_block_number: Option<BlockNumber>,
events: E, events: E,
) where ) where
E: Stream<Item = NodeEvent> + Unpin, E: Stream<Item = NodeEvent<N>> + Unpin,
{ {
let state = NodeState::new(peers_info, latest_block_number); let state = NodeState::new(peers_info, latest_block_number);
@ -426,9 +399,9 @@ struct EventHandler<E> {
info_interval: Interval, info_interval: Interval,
} }
impl<E> Future for EventHandler<E> impl<E, N: NodePrimitives> Future for EventHandler<E>
where where
E: Stream<Item = NodeEvent> + Unpin, E: Stream<Item = NodeEvent<N>> + Unpin,
{ {
type Output = (); type Output = ();