chore: rm network event handling from node events (#12736)

This commit is contained in:
Matthias Seitz
2024-11-21 14:28:08 +01:00
committed by GitHub
parent fa7ad036ea
commit 4eca2fa1ee
4 changed files with 6 additions and 31 deletions

View File

@ -4,7 +4,7 @@ use crate::{args::NetworkArgs, utils::get_single_header};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{BlockNumber, B256};
use clap::Parser;
use futures::{stream::select as stream_select, StreamExt};
use futures::StreamExt;
use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::ChainSpec;
use reth_cli::chainspec::ChainSpecParser;
@ -19,7 +19,7 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider, NetworkHandle};
use reth_network::{BlockDownloaderProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_network_p2p::{headers::client::HeadersClient, EthBlockClient};
use reth_node_api::NodeTypesWithDBAdapter;
@ -207,17 +207,12 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
return Ok(())
}
let pipeline_events = pipeline.events();
let events = stream_select(
network.event_listener().map(Into::into),
pipeline_events.map(Into::into),
);
ctx.task_executor.spawn_critical(
"events task",
reth_node_events::node::handle_events(
Some(Box::new(network)),
latest_block_number,
events,
pipeline.events().map(Into::into),
),
);

View File

@ -17,7 +17,7 @@ use reth_engine_tree::{
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_network_api::BlockDownloaderProvider;
use reth_node_api::{
BuiltPayload, FullNodePrimitives, FullNodeTypes, NodeTypesWithEngine, PayloadAttributesBuilder,
PayloadBuilder, PayloadTypes,
@ -256,7 +256,6 @@ where
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select!(
ctx.components().network().event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {

View File

@ -21,7 +21,7 @@ use reth_chainspec::EthChainSpec;
use reth_consensus_debug_client::{DebugConsensusClient, EtherscanBlockProvider, RpcBlockProvider};
use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_network::BlockDownloaderProvider;
use reth_node_api::{AddOnsContext, FullNodePrimitives, FullNodeTypes, NodeTypesWithEngine};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
@ -262,8 +262,6 @@ where
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select!(
ctx.components().network().event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
pipeline_events.map(Into::into),
if ctx.node_config().debug.tip.is_none() && !ctx.is_dev() {
Either::Left(

View File

@ -7,7 +7,7 @@ use alloy_rpc_types_engine::ForkchoiceState;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};
use reth_engine_primitives::ForkchoiceStatus;
use reth_network_api::{NetworkEvent, PeersInfo};
use reth_network_api::PeersInfo;
use reth_primitives_traits::{format_gas, format_gas_throughput};
use reth_prune_types::PrunerEvent;
use reth_stages::{EntitiesCheckpoint, ExecOutput, PipelineEvent, StageCheckpoint, StageId};
@ -211,12 +211,6 @@ impl NodeState {
}
}
fn handle_network_event(&self, _: NetworkEvent) {
// NOTE(onbjerg): This used to log established/disconnecting sessions, but this is already
// logged in the networking component. I kept this stub in case we want to catch other
// networking events later on.
}
fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
@ -358,8 +352,6 @@ struct CurrentStage {
/// A node event.
#[derive(Debug)]
pub enum NodeEvent {
/// A network event.
Network(NetworkEvent),
/// A sync pipeline event.
Pipeline(PipelineEvent),
/// A consensus engine event.
@ -375,12 +367,6 @@ pub enum NodeEvent {
Other(String),
}
impl From<NetworkEvent> for NodeEvent {
fn from(event: NetworkEvent) -> Self {
Self::Network(event)
}
}
impl From<PipelineEvent> for NodeEvent {
fn from(event: PipelineEvent) -> Self {
Self::Pipeline(event)
@ -527,9 +513,6 @@ where
while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
match event {
NodeEvent::Network(event) => {
this.state.handle_network_event(event);
}
NodeEvent::Pipeline(event) => {
this.state.handle_pipeline_event(event);
}