added EventSender to FullNode type (#14268)

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Poulav Bhowmick
2025-02-09 19:34:12 +05:30
committed by GitHub
parent 197d6edee9
commit b48426efdd
6 changed files with 39 additions and 10 deletions

1
Cargo.lock generated
View File

@ -8077,6 +8077,7 @@ dependencies = [
"reth-payload-primitives",
"reth-provider",
"reth-tasks",
"reth-tokio-util",
"reth-transaction-pool",
]

View File

@ -26,6 +26,7 @@ reth-tasks.workspace = true
reth-network-api.workspace = true
reth-node-types.workspace = true
reth-node-core.workspace = true
reth-tokio-util.workspace = true
alloy-rpc-types-engine.workspace = true

View File

@ -5,7 +5,7 @@ use alloy_rpc_types_engine::JwtSecret;
use reth_basic_payload_builder::PayloadBuilder;
use reth_consensus::{ConsensusError, FullConsensus};
use reth_db_api::{database_metrics::DatabaseMetrics, Database};
use reth_engine_primitives::BeaconConsensusEngineHandle;
use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconConsensusEngineHandle};
use reth_evm::{execute::BlockExecutorProvider, ConfigureEvmFor};
use reth_network_api::FullNetwork;
use reth_node_core::node_config::NodeConfig;
@ -13,6 +13,7 @@ use reth_node_types::{NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, Tx
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::FullProvider;
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
use std::{future::Future, marker::PhantomData};
@ -125,6 +126,8 @@ pub struct AddOnsContext<'a, N: FullNodeComponents> {
/// Handle to the beacon consensus engine.
pub beacon_engine_handle:
BeaconConsensusEngineHandle<<N::Types as NodeTypesWithEngine>::Engine>,
/// Notification channel for engine API events
pub engine_events: EventSender<BeaconConsensusEngineEvent<<N::Types as NodeTypes>::Primitives>>,
/// JWT secret for the node.
pub jwt_secret: JwtSecret,
}

View File

@ -198,6 +198,7 @@ where
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
let event_sender = EventSender::default();
let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone());
// extract the jwt secret from the args if possible
@ -208,6 +209,7 @@ where
config: ctx.node_config(),
beacon_engine_handle: beacon_engine_handle.clone(),
jwt_secret,
engine_events: event_sender.clone(),
};
let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?;
@ -270,6 +272,7 @@ where
pruner_events.map(Into::into),
static_file_producer_events.map(Into::into),
);
ctx.task_executor().spawn_critical(
"events task",
node::handle_events(
@ -279,7 +282,7 @@ where
),
);
let RpcHandle { rpc_server_handles, rpc_registry } =
let RpcHandle { rpc_server_handles, rpc_registry, engine_events } =
add_ons.launch_add_ons(add_ons_ctx).await?;
// TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104
@ -403,7 +406,7 @@ where
task_executor: ctx.task_executor().clone(),
config: ctx.node_config().clone(),
data_dir: ctx.data_dir().clone(),
add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry, engine_events },
};
// Notify on node started
on_node_started.on_event(FullNode::clone(&full_node))?;

View File

@ -7,6 +7,7 @@ use std::{
ops::{Deref, DerefMut},
};
use crate::{BeaconConsensusEngineEvent, EthApiBuilderCtx};
use alloy_rpc_types::engine::ClientVersionV1;
use futures::TryFutureExt;
use reth_node_api::{
@ -32,11 +33,10 @@ use reth_rpc_builder::{
};
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, info};
use std::sync::Arc;
use crate::EthApiBuilderCtx;
/// Contains the handles to the spawned RPC servers.
///
/// This can be used to access the endpoints of the servers.
@ -301,12 +301,27 @@ where
}
/// Handle to the launched RPC servers.
#[derive(Clone)]
pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Handles to launched servers.
pub rpc_server_handles: RethRpcServerHandles,
/// Configured RPC modules.
pub rpc_registry: RpcRegistry<Node, EthApi>,
/// Notification channel for engine API events
///
/// Caution: This is a multi-producer, multi-consumer broadcast and allows grants access to
/// dispatch events
pub engine_events:
EventSender<BeaconConsensusEngineEvent<<Node::Types as NodeTypes>::Primitives>>,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Clone for RpcHandle<Node, EthApi> {
fn clone(&self) -> Self {
Self {
rpc_server_handles: self.rpc_server_handles.clone(),
rpc_registry: self.rpc_registry.clone(),
engine_events: self.engine_events.clone(),
}
}
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Deref for RpcHandle<Node, EthApi> {
@ -424,7 +439,7 @@ where
let Self { eth_api_builder, engine_validator_builder, hooks, _pd: _ } = self;
let engine_validator = engine_validator_builder.build(&ctx).await?;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret } = ctx;
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx;
let client = ClientVersionV1 {
code: CLIENT_CODE,
@ -524,7 +539,7 @@ where
on_rpc_started.on_rpc_started(ctx, handles.clone())?;
Ok(RpcHandle { rpc_server_handles: handles, rpc_registry: registry })
Ok(RpcHandle { rpc_server_handles: handles, rpc_registry: registry, engine_events })
}
}

View File

@ -4,8 +4,8 @@ use tracing::trace;
const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2000;
/// A bounded broadcast channel for a task.
#[derive(Debug, Clone)]
/// A bounded multi-producer, multi-consumer broadcast channel.
#[derive(Debug)]
pub struct EventSender<T> {
/// The sender part of the broadcast channel
sender: Sender<T>,
@ -20,6 +20,12 @@ where
}
}
impl<T> Clone for EventSender<T> {
fn clone(&self) -> Self {
Self { sender: self.sender.clone() }
}
}
impl<T: Clone + Send + Sync + 'static> EventSender<T> {
/// Creates a new `EventSender`.
pub fn new(events_channel_size: usize) -> Self {