From 4eca2fa1eea6450bc197554afb03a984095a9aca Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 21 Nov 2024 14:28:08 +0100 Subject: [PATCH] chore: rm network event handling from node events (#12736) --- bin/reth/src/commands/debug_cmd/execution.rs | 11 +++-------- crates/node/builder/src/launch/engine.rs | 3 +-- crates/node/builder/src/launch/mod.rs | 4 +--- crates/node/events/src/node.rs | 19 +------------------ 4 files changed, 6 insertions(+), 31 deletions(-) diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 0210142be..dd060ac2a 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -4,7 +4,7 @@ use crate::{args::NetworkArgs, utils::get_single_header}; use alloy_eips::BlockHashOrNumber; use alloy_primitives::{BlockNumber, B256}; use clap::Parser; -use futures::{stream::select as stream_select, StreamExt}; +use futures::StreamExt; use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::ChainSpec; use reth_cli::chainspec::ChainSpecParser; @@ -19,7 +19,7 @@ use reth_downloaders::{ headers::reverse_headers::ReverseHeadersDownloaderBuilder, }; use reth_exex::ExExManagerHandle; -use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle}; +use reth_network::{BlockDownloaderProvider, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient}; use reth_node_api::NodeTypesWithDBAdapter; @@ -207,17 +207,12 @@ impl> Command { return Ok(()) } - let pipeline_events = pipeline.events(); - let events = stream_select( - network.event_listener().map(Into::into), - pipeline_events.map(Into::into), - ); ctx.task_executor.spawn_critical( "events task", reth_node_events::node::handle_events( Some(Box::new(network)), latest_block_number, - events, + pipeline.events().map(Into::into), ), ); diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 5a8405047..f485be2c2 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -17,7 +17,7 @@ use reth_engine_tree::{ use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; use reth_network::{NetworkSyncUpdater, SyncState}; -use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider}; +use reth_network_api::BlockDownloaderProvider; use reth_node_api::{ BuiltPayload, FullNodePrimitives, FullNodeTypes, NodeTypesWithEngine, PayloadAttributesBuilder, PayloadBuilder, PayloadTypes, @@ -256,7 +256,6 @@ where info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select!( - ctx.components().network().event_listener().map(Into::into), beacon_engine_handle.event_listener().map(Into::into), pipeline_events.map(Into::into), if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index c4146f483..be317e4be 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -21,7 +21,7 @@ use reth_chainspec::EthChainSpec; use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider}; use reth_engine_util::EngineMessageStreamExt; use reth_exex::ExExManagerHandle; -use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider}; +use reth_network::BlockDownloaderProvider; use reth_node_api::{AddOnsContext, FullNodePrimitives, FullNodeTypes, NodeTypesWithEngine}; use reth_node_core::{ dirs::{ChainPath, DataDirPath}, @@ -262,8 +262,6 @@ where info!(target: "reth::cli", "Consensus engine initialized"); let events = stream_select!( - ctx.components().network().event_listener().map(Into::into), - beacon_engine_handle.event_listener().map(Into::into), pipeline_events.map(Into::into), if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() { Either::Left( diff --git a/crates/node/events/src/node.rs b/crates/node/events/src/node.rs index 4528bdeaa..edd85501e 100644 --- a/crates/node/events/src/node.rs +++ b/crates/node/events/src/node.rs @@ -7,7 +7,7 @@ use alloy_rpc_types_engine::ForkchoiceState; use futures::Stream; use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress}; use reth_engine_primitives::ForkchoiceStatus; -use reth_network_api::{NetworkEvent, PeersInfo}; +use reth_network_api::PeersInfo; use reth_primitives_traits::{format_gas, format_gas_throughput}; use reth_prune_types::PrunerEvent; use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId}; @@ -211,12 +211,6 @@ impl NodeState { } } - fn handle_network_event(&self, _: NetworkEvent) { - // NOTE(onbjerg): This used to log established/disconnecting sessions, but this is already - // logged in the networking component. I kept this stub in case we want to catch other - // networking events later on. - } - fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) { match event { BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => { @@ -358,8 +352,6 @@ struct CurrentStage { /// A node event. #[derive(Debug)] pub enum NodeEvent { - /// A network event. - Network(NetworkEvent), /// A sync pipeline event. Pipeline(PipelineEvent), /// A consensus engine event. @@ -375,12 +367,6 @@ pub enum NodeEvent { Other(String), } -impl From for NodeEvent { - fn from(event: NetworkEvent) -> Self { - Self::Network(event) - } -} - impl From for NodeEvent { fn from(event: PipelineEvent) -> Self { Self::Pipeline(event) @@ -527,9 +513,6 @@ where while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) { match event { - NodeEvent::Network(event) => { - this.state.handle_network_event(event); - } NodeEvent::Pipeline(event) => { this.state.handle_pipeline_event(event); }