From b48426efdd4b2556b8f1b1eccf40ef95af675963 Mon Sep 17 00:00:00 2001 From: Poulav Bhowmick Date: Sun, 9 Feb 2025 19:34:12 +0530 Subject: [PATCH] added EventSender to FullNode type (#14268) Co-authored-by: Roman Krasiuk Co-authored-by: Matthias Seitz --- Cargo.lock | 1 + crates/node/api/Cargo.toml | 1 + crates/node/api/src/node.rs | 5 ++++- crates/node/builder/src/launch/engine.rs | 7 +++++-- crates/node/builder/src/rpc.rs | 25 +++++++++++++++++++----- crates/tokio-util/src/event_sender.rs | 10 ++++++++-- 6 files changed, 39 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 707e3089c..164719a63 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8077,6 +8077,7 @@ dependencies = [ "reth-payload-primitives", "reth-provider", "reth-tasks", + "reth-tokio-util", "reth-transaction-pool", ] diff --git a/crates/node/api/Cargo.toml b/crates/node/api/Cargo.toml index afa33a273..7bd619631 100644 --- a/crates/node/api/Cargo.toml +++ b/crates/node/api/Cargo.toml @@ -26,6 +26,7 @@ reth-tasks.workspace = true reth-network-api.workspace = true reth-node-types.workspace = true reth-node-core.workspace = true +reth-tokio-util.workspace = true alloy-rpc-types-engine.workspace = true diff --git a/crates/node/api/src/node.rs b/crates/node/api/src/node.rs index 3e2122321..4ef4dccc1 100644 --- a/crates/node/api/src/node.rs +++ b/crates/node/api/src/node.rs @@ -5,7 +5,7 @@ use alloy_rpc_types_engine::JwtSecret; use reth_basic_payload_builder::PayloadBuilder; use reth_consensus::{ConsensusError, FullConsensus}; use reth_db_api::{database_metrics::DatabaseMetrics, Database}; -use reth_engine_primitives::BeaconConsensusEngineHandle; +use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconConsensusEngineHandle}; use reth_evm::{execute::BlockExecutorProvider, ConfigureEvmFor}; use reth_network_api::FullNetwork; use reth_node_core::node_config::NodeConfig; @@ -13,6 +13,7 @@ use reth_node_types::{NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, Tx use reth_payload_builder::PayloadBuilderHandle; use reth_provider::FullProvider; use reth_tasks::TaskExecutor; +use reth_tokio_util::EventSender; use reth_transaction_pool::{PoolTransaction, TransactionPool}; use std::{future::Future, marker::PhantomData}; @@ -125,6 +126,8 @@ pub struct AddOnsContext<'a, N: FullNodeComponents> { /// Handle to the beacon consensus engine. pub beacon_engine_handle: BeaconConsensusEngineHandle<::Engine>, + /// Notification channel for engine API events + pub engine_events: EventSender::Primitives>>, /// JWT secret for the node. pub jwt_secret: JwtSecret, } diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 445734a4e..de794a258 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -198,6 +198,7 @@ where info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized"); let event_sender = EventSender::default(); + let beacon_engine_handle = BeaconConsensusEngineHandle::new(consensus_engine_tx.clone()); // extract the jwt secret from the args if possible @@ -208,6 +209,7 @@ where config: ctx.node_config(), beacon_engine_handle: beacon_engine_handle.clone(), jwt_secret, + engine_events: event_sender.clone(), }; let engine_payload_validator = add_ons.engine_validator(&add_ons_ctx).await?; @@ -270,6 +272,7 @@ where pruner_events.map(Into::into), static_file_producer_events.map(Into::into), ); + ctx.task_executor().spawn_critical( "events task", node::handle_events( @@ -279,7 +282,7 @@ where ), ); - let RpcHandle { rpc_server_handles, rpc_registry } = + let RpcHandle { rpc_server_handles, rpc_registry, engine_events } = add_ons.launch_add_ons(add_ons_ctx).await?; // TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104 @@ -403,7 +406,7 @@ where task_executor: ctx.task_executor().clone(), config: ctx.node_config().clone(), data_dir: ctx.data_dir().clone(), - add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry }, + add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry, engine_events }, }; // Notify on node started on_node_started.on_event(FullNode::clone(&full_node))?; diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index ef597c911..f5ac2eb93 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -7,6 +7,7 @@ use std::{ ops::{Deref, DerefMut}, }; +use crate::{BeaconConsensusEngineEvent, EthApiBuilderCtx}; use alloy_rpc_types::engine::ClientVersionV1; use futures::TryFutureExt; use reth_node_api::{ @@ -32,11 +33,10 @@ use reth_rpc_builder::{ }; use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi}; use reth_tasks::TaskExecutor; +use reth_tokio_util::EventSender; use reth_tracing::tracing::{debug, info}; use std::sync::Arc; -use crate::EthApiBuilderCtx; - /// Contains the handles to the spawned RPC servers. /// /// This can be used to access the endpoints of the servers. @@ -301,12 +301,27 @@ where } /// Handle to the launched RPC servers. -#[derive(Clone)] pub struct RpcHandle { /// Handles to launched servers. pub rpc_server_handles: RethRpcServerHandles, /// Configured RPC modules. pub rpc_registry: RpcRegistry, + /// Notification channel for engine API events + /// + /// Caution: This is a multi-producer, multi-consumer broadcast and allows grants access to + /// dispatch events + pub engine_events: + EventSender::Primitives>>, +} + +impl Clone for RpcHandle { + fn clone(&self) -> Self { + Self { + rpc_server_handles: self.rpc_server_handles.clone(), + rpc_registry: self.rpc_registry.clone(), + engine_events: self.engine_events.clone(), + } + } } impl Deref for RpcHandle { @@ -424,7 +439,7 @@ where let Self { eth_api_builder, engine_validator_builder, hooks, _pd: _ } = self; let engine_validator = engine_validator_builder.build(&ctx).await?; - let AddOnsContext { node, config, beacon_engine_handle, jwt_secret } = ctx; + let AddOnsContext { node, config, beacon_engine_handle, jwt_secret, engine_events } = ctx; let client = ClientVersionV1 { code: CLIENT_CODE, @@ -524,7 +539,7 @@ where on_rpc_started.on_rpc_started(ctx, handles.clone())?; - Ok(RpcHandle { rpc_server_handles: handles, rpc_registry: registry }) + Ok(RpcHandle { rpc_server_handles: handles, rpc_registry: registry, engine_events }) } } diff --git a/crates/tokio-util/src/event_sender.rs b/crates/tokio-util/src/event_sender.rs index 16208ee19..b0e6d0a55 100644 --- a/crates/tokio-util/src/event_sender.rs +++ b/crates/tokio-util/src/event_sender.rs @@ -4,8 +4,8 @@ use tracing::trace; const DEFAULT_SIZE_BROADCAST_CHANNEL: usize = 2000; -/// A bounded broadcast channel for a task. -#[derive(Debug, Clone)] +/// A bounded multi-producer, multi-consumer broadcast channel. +#[derive(Debug)] pub struct EventSender { /// The sender part of the broadcast channel sender: Sender, @@ -20,6 +20,12 @@ where } } +impl Clone for EventSender { + fn clone(&self) -> Self { + Self { sender: self.sender.clone() } + } +} + impl EventSender { /// Creates a new `EventSender`. pub fn new(events_channel_size: usize) -> Self {