feat: move RPC launch to add-ons (#11532)

This commit is contained in:
Arsenii Kulikov
2024-10-15 18:51:40 +04:00
committed by GitHub
parent a235f7214c
commit 6fb271036d
27 changed files with 547 additions and 518 deletions

View File

@ -1,8 +1,8 @@
//! Node add-ons. Depend on core [`NodeComponents`](crate::NodeComponents).
use reth_node_api::{EthApiTypes, FullNodeComponents, NodeAddOns};
use reth_node_api::{FullNodeComponents, NodeAddOns};
use crate::{exex::BoxedLaunchExEx, hooks::NodeHooks, rpc::RpcHooks};
use crate::{exex::BoxedLaunchExEx, hooks::NodeHooks};
/// Additional node extensions.
///
@ -12,16 +12,6 @@ pub struct AddOns<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> {
pub hooks: NodeHooks<Node, AddOns>,
/// The `ExExs` (execution extensions) of the node.
pub exexs: Vec<(String, Box<dyn BoxedLaunchExEx<Node>>)>,
/// Additional RPC add-ons.
pub rpc: RpcAddOns<Node, AddOns::EthApi>,
/// Additional captured addons.
pub addons: AddOns,
}
/// Captures node specific addons that can be installed on top of the type configured node and are
/// required for launching the node, such as RPC.
#[derive(Default)]
pub struct RpcAddOns<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Additional RPC hooks.
pub hooks: RpcHooks<Node, EthApi>,
pub add_ons: AddOns,
}

View File

@ -13,7 +13,7 @@ use crate::{
common::WithConfigs,
components::NodeComponentsBuilder,
node::FullNode,
rpc::{EthApiBuilderProvider, RethRpcServerHandles, RpcContext},
rpc::{RethRpcAddOns, RethRpcServerHandles, RpcContext},
DefaultNodeLauncher, LaunchNode, Node, NodeHandle,
};
use futures::Future;
@ -37,7 +37,6 @@ use reth_node_core::{
dirs::{ChainPath, DataDirPath},
node_config::NodeConfig,
primitives::Head,
rpc::eth::{helpers::AddDevSigners, FullEthApiServer},
};
use reth_primitives::revm_primitives::EnvKzgSettings;
use reth_provider::{providers::BlockchainProvider, ChainSpecProvider, FullProvider};
@ -358,19 +357,11 @@ where
>
where
N: Node<RethFullAdapter<DB, N>, ChainSpec = ChainSpec>,
N::AddOns: NodeAddOns<
N::AddOns: RethRpcAddOns<
NodeAdapter<
RethFullAdapter<DB, N>,
<N::ComponentsBuilder as NodeComponentsBuilder<RethFullAdapter<DB, N>>>::Components,
>,
EthApi: EthApiBuilderProvider<
NodeAdapter<
RethFullAdapter<DB, N>,
<N::ComponentsBuilder as NodeComponentsBuilder<RethFullAdapter<DB, N>>>::Components,
>
>
+ FullEthApiServer
+ AddDevSigners
>,
{
self.node(node).launch().await
@ -418,7 +409,7 @@ impl<T, CB, AO> WithLaunchContext<NodeBuilderWithComponents<T, CB, AO>>
where
T: FullNodeTypes,
CB: NodeComponentsBuilder<T>,
AO: NodeAddOns<NodeAdapter<T, CB::Components>, EthApi: FullEthApiServer + AddDevSigners>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
{
/// Returns a reference to the node builder's config.
pub const fn config(&self) -> &NodeConfig<<T::Types as NodeTypes>::ChainSpec> {
@ -466,6 +457,14 @@ where
Self { builder: self.builder.on_node_started(hook), task_executor: self.task_executor }
}
/// Modifies the addons with the given closure.
pub fn map_add_ons<F>(self, f: F) -> Self
where
F: FnOnce(AO) -> AO,
{
Self { builder: self.builder.map_add_ons(f), task_executor: self.task_executor }
}
/// Sets the hook that is run once the rpc server is started.
pub fn on_rpc_started<F>(self, hook: F) -> Self
where
@ -553,12 +552,7 @@ where
DB: Database + DatabaseMetrics + DatabaseMetadata + Clone + Unpin + 'static,
T: NodeTypesWithEngine<ChainSpec: EthereumHardforks + EthChainSpec>,
CB: NodeComponentsBuilder<RethFullAdapter<DB, T>>,
AO: NodeAddOns<
NodeAdapter<RethFullAdapter<DB, T>, CB::Components>,
EthApi: EthApiBuilderProvider<NodeAdapter<RethFullAdapter<DB, T>, CB::Components>>
+ FullEthApiServer
+ AddDevSigners,
>,
AO: RethRpcAddOns<NodeAdapter<RethFullAdapter<DB, T>, CB::Components>>,
{
/// Launches the node with the [`DefaultNodeLauncher`] that sets up engine API consensus and rpc
pub async fn launch(

View File

@ -11,10 +11,7 @@ use reth_exex::ExExContext;
use reth_node_api::{
FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes, NodeTypesWithDB, NodeTypesWithEngine,
};
use reth_node_core::{
node_config::NodeConfig,
rpc::eth::{helpers::AddDevSigners, FullEthApiServer},
};
use reth_node_core::node_config::NodeConfig;
use reth_payload_builder::PayloadBuilderHandle;
use reth_tasks::TaskExecutor;
@ -22,8 +19,8 @@ use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::NodeHooks,
launch::LaunchNode,
rpc::{EthApiBuilderProvider, RethRpcServerHandles, RpcContext, RpcHooks},
AddOns, FullNode, RpcAddOns,
rpc::{RethRpcAddOns, RethRpcServerHandles, RpcContext},
AddOns, FullNode,
};
/// A node builder that also has the configured types.
@ -54,12 +51,7 @@ impl<T: FullNodeTypes> NodeBuilderWithTypes<T> {
config,
adapter,
components_builder,
add_ons: AddOns {
hooks: NodeHooks::default(),
rpc: RpcAddOns { hooks: RpcHooks::default() },
exexs: Vec::new(),
addons: (),
},
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () },
}
}
}
@ -83,8 +75,8 @@ impl<T: FullNodeTypes> fmt::Debug for NodeTypesAdapter<T> {
}
}
/// Container for the node's types and the components and other internals that can be used by addons
/// of the node.
/// Container for the node's types and the components and other internals that can be used by
/// addons of the node.
pub struct NodeAdapter<T: FullNodeTypes, C: NodeComponents<T>> {
/// The components of the node.
pub components: C,
@ -104,6 +96,8 @@ impl<T: FullNodeTypes, C: NodeComponents<T>> FullNodeComponents for NodeAdapter<
type Evm = C::Evm;
type Executor = C::Executor;
type Network = C::Network;
type Consensus = C::Consensus;
type EngineValidator = C::EngineValidator;
fn pool(&self) -> &Self::Pool {
self.components.pool()
@ -132,6 +126,14 @@ impl<T: FullNodeTypes, C: NodeComponents<T>> FullNodeComponents for NodeAdapter<
fn task_executor(&self) -> &TaskExecutor {
&self.task_executor
}
fn consensus(&self) -> &Self::Consensus {
self.components.consensus()
}
fn engine_validator(&self) -> &Self::EngineValidator {
self.components.engine_validator()
}
}
impl<T: FullNodeTypes, C: NodeComponents<T>> Clone for NodeAdapter<T, C> {
@ -169,7 +171,7 @@ where
{
/// Advances the state of the node builder to the next state where all customizable
/// [`NodeAddOns`] types are configured.
pub fn with_add_ons<AO>(self, addons: AO) -> NodeBuilderWithComponents<T, CB, AO>
pub fn with_add_ons<AO>(self, add_ons: AO) -> NodeBuilderWithComponents<T, CB, AO>
where
AO: NodeAddOns<NodeAdapter<T, CB::Components>>,
{
@ -179,12 +181,7 @@ where
config,
adapter,
components_builder,
add_ons: AddOns {
hooks: NodeHooks::default(),
rpc: RpcAddOns { hooks: RpcHooks::default() },
exexs: Vec::new(),
addons,
},
add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons },
}
}
}
@ -215,31 +212,6 @@ where
self
}
/// Sets the hook that is run once the rpc server is started.
pub fn on_rpc_started<F>(mut self, hook: F) -> Self
where
F: FnOnce(
RpcContext<'_, NodeAdapter<T, CB::Components>, AO::EthApi>,
RethRpcServerHandles,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.add_ons.rpc.hooks.set_on_rpc_started(hook);
self
}
/// Sets the hook that is run to configure the rpc modules.
pub fn extend_rpc_modules<F>(mut self, hook: F) -> Self
where
F: FnOnce(RpcContext<'_, NodeAdapter<T, CB::Components>, AO::EthApi>) -> eyre::Result<()>
+ Send
+ 'static,
{
self.add_ons.rpc.hooks.set_extend_rpc_modules(hook);
self
}
/// Installs an `ExEx` (Execution Extension) in the node.
///
/// # Note
@ -269,18 +241,22 @@ where
pub const fn check_launch(self) -> Self {
self
}
/// Modifies the addons with the given closure.
pub fn map_add_ons<F>(mut self, f: F) -> Self
where
F: FnOnce(AO) -> AO,
{
self.add_ons.add_ons = f(self.add_ons.add_ons);
self
}
}
impl<T, CB, AO> NodeBuilderWithComponents<T, CB, AO>
where
T: FullNodeTypes,
CB: NodeComponentsBuilder<T>,
AO: NodeAddOns<
NodeAdapter<T, CB::Components>,
EthApi: EthApiBuilderProvider<NodeAdapter<T, CB::Components>>
+ FullEthApiServer
+ AddDevSigners,
>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
{
/// Launches the node with the given launcher.
pub async fn launch_with<L>(self, launcher: L) -> eyre::Result<L::Node>
@ -289,4 +265,33 @@ where
{
launcher.launch_node(self).await
}
/// Sets the hook that is run once the rpc server is started.
pub fn on_rpc_started<F>(self, hook: F) -> Self
where
F: FnOnce(
RpcContext<'_, NodeAdapter<T, CB::Components>, AO::EthApi>,
RethRpcServerHandles,
) -> eyre::Result<()>
+ Send
+ 'static,
{
self.map_add_ons(|mut add_ons| {
add_ons.hooks_mut().set_on_rpc_started(hook);
add_ons
})
}
/// Sets the hook that is run to configure the rpc modules.
pub fn extend_rpc_modules<F>(self, hook: F) -> Self
where
F: FnOnce(RpcContext<'_, NodeAdapter<T, CB::Components>, AO::EthApi>) -> eyre::Result<()>
+ Send
+ 'static,
{
self.map_add_ons(|mut add_ons| {
add_ons.hooks_mut().set_extend_rpc_modules(hook);
add_ons
})
}
}

View File

@ -1,13 +1,13 @@
use std::fmt;
use reth_node_api::{FullNodeComponents, NodeAddOns};
use reth_node_api::FullNodeComponents;
use reth_node_core::exit::NodeExitFuture;
use crate::node::FullNode;
use crate::{node::FullNode, rpc::RethRpcAddOns};
/// A Handle to the launched node.
#[must_use = "Needs to await the node exit future"]
pub struct NodeHandle<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> {
pub struct NodeHandle<Node: FullNodeComponents, AddOns: RethRpcAddOns<Node>> {
/// All node components.
pub node: FullNode<Node, AddOns>,
/// The exit future of the node.
@ -17,7 +17,7 @@ pub struct NodeHandle<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> {
impl<Node, AddOns> NodeHandle<Node, AddOns>
where
Node: FullNodeComponents,
AddOns: NodeAddOns<Node>,
AddOns: RethRpcAddOns<Node>,
{
/// Waits for the node to exit, if it was configured to exit.
pub async fn wait_for_node_exit(self) -> eyre::Result<()> {
@ -28,7 +28,7 @@ where
impl<Node, AddOns> fmt::Debug for NodeHandle<Node, AddOns>
where
Node: FullNodeComponents,
AddOns: NodeAddOns<Node>,
AddOns: RethRpcAddOns<Node>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("NodeHandle")

View File

@ -1,6 +1,5 @@
//! Engine node related functionality.
use alloy_rpc_types::engine::ClientVersionV1;
use futures::{future::Either, stream, stream_select, StreamExt};
use reth_beacon_consensus::{
hooks::{EngineHooks, StaticFileHook},
@ -20,21 +19,17 @@ use reth_exex::ExExManagerHandle;
use reth_network::{NetworkSyncUpdater, SyncState};
use reth_network_api::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{
BuiltPayload, FullNodeTypes, NodeAddOns, NodeTypesWithEngine, PayloadAttributesBuilder,
PayloadTypes,
BuiltPayload, FullNodeTypes, NodeTypesWithEngine, PayloadAttributesBuilder, PayloadTypes,
};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture,
primitives::Head,
rpc::eth::{helpers::AddDevSigners, FullEthApiServer},
version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_payload_primitives::PayloadBuilder;
use reth_primitives::EthereumHardforks;
use reth_provider::providers::{BlockchainProvider2, ProviderNodeTypes};
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
@ -45,9 +40,9 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::{
common::{Attached, LaunchContextWith, WithConfigs},
hooks::NodeHooks,
rpc::{launch_rpc_servers, EthApiBuilderProvider},
rpc::{RethRpcAddOns, RpcHandle},
setup::build_networked_pipeline,
AddOns, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
AddOns, AddOnsContext, ExExLauncher, FullNode, LaunchContext, LaunchNode, NodeAdapter,
NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
};
@ -78,12 +73,7 @@ where
Types: ProviderNodeTypes + NodeTypesWithEngine,
T: FullNodeTypes<Types = Types, Provider = BlockchainProvider2<Types>>,
CB: NodeComponentsBuilder<T>,
AO: NodeAddOns<
NodeAdapter<T, CB::Components>,
EthApi: EthApiBuilderProvider<NodeAdapter<T, CB::Components>>
+ FullEthApiServer
+ AddDevSigners,
>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
LocalPayloadAttributesBuilder<Types::ChainSpec>: PayloadAttributesBuilder<
<<Types as NodeTypesWithEngine>::Engine as PayloadTypes>::PayloadAttributes,
>,
@ -98,7 +88,7 @@ where
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
components_builder,
add_ons: AddOns { hooks, rpc, exexs: installed_exex, .. },
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
@ -292,37 +282,18 @@ where
),
);
let client = ClientVersionV1 {
code: CLIENT_CODE,
name: NAME_CLIENT.to_string(),
version: CARGO_PKG_VERSION.to_string(),
commit: VERGEN_GIT_SHA.to_string(),
};
let engine_api = EngineApi::new(
ctx.blockchain_db().clone(),
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
ctx.components().engine_validator().clone(),
);
info!(target: "reth::cli", "Engine API handler initialized");
// extract the jwt secret from the args if possible
let jwt_secret = ctx.auth_jwt_secret()?;
// Start RPC servers
let (rpc_server_handles, rpc_registry) = launch_rpc_servers(
ctx.node_adapter().clone(),
engine_api,
ctx.node_config(),
jwt_secret,
rpc,
)
.await?;
let add_ons_ctx = AddOnsContext {
node: ctx.node_adapter(),
config: ctx.node_config(),
beacon_engine_handle: &beacon_engine_handle,
jwt_secret: &jwt_secret,
};
let RpcHandle { rpc_server_handles, rpc_registry } =
add_ons.launch_add_ons(add_ons_ctx).await?;
// TODO: migrate to devmode with https://github.com/paradigmxyz/reth/issues/10104
if let Some(maybe_custom_etherscan_url) = ctx.node_config().debug.etherscan.clone() {
@ -442,13 +413,12 @@ where
provider: ctx.node_adapter().provider.clone(),
payload_builder: ctx.components().payload_builder().clone(),
task_executor: ctx.task_executor().clone(),
rpc_server_handles,
rpc_registry,
config: ctx.node_config().clone(),
data_dir: ctx.data_dir().clone(),
add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
};
// Notify on node started
on_node_started.on_event(full_node.clone())?;
on_node_started.on_event(FullNode::clone(&full_node))?;
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(

View File

@ -12,7 +12,6 @@ pub use exex::ExExLauncher;
use std::{future::Future, sync::Arc};
use alloy_primitives::utils::format_ether;
use alloy_rpc_types::engine::ClientVersionV1;
use futures::{future::Either, stream, stream_select, StreamExt};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, StaticFileHook},
@ -25,17 +24,14 @@ use reth_engine_util::EngineMessageStreamExt;
use reth_exex::ExExManagerHandle;
use reth_network::{BlockDownloaderProvider, NetworkEventListenerProvider};
use reth_node_api::{
FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypesWithDB, NodeTypesWithEngine,
AddOnsContext, FullNodeComponents, FullNodeTypes, NodeTypesWithDB, NodeTypesWithEngine,
};
use reth_node_core::{
dirs::{ChainPath, DataDirPath},
exit::NodeExitFuture,
rpc::eth::{helpers::AddDevSigners, FullEthApiServer},
version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
};
use reth_node_events::{cl::ConsensusLayerHealthEvents, node};
use reth_provider::providers::BlockchainProvider;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::TransactionPool;
@ -47,19 +43,18 @@ use crate::{
components::{NodeComponents, NodeComponentsBuilder},
hooks::NodeHooks,
node::FullNode,
rpc::EthApiBuilderProvider,
rpc::{RethRpcAddOns, RpcHandle},
AddOns, NodeBuilderWithComponents, NodeHandle,
};
/// Alias for [`reth_rpc_eth_types::EthApiBuilderCtx`], adapter for [`FullNodeComponents`].
pub type EthApiBuilderCtx<N, Eth> = reth_rpc_eth_types::EthApiBuilderCtx<
pub type EthApiBuilderCtx<N> = reth_rpc_eth_types::EthApiBuilderCtx<
<N as FullNodeTypes>::Provider,
<N as FullNodeComponents>::Pool,
<N as FullNodeComponents>::Evm,
<N as FullNodeComponents>::Network,
TaskExecutor,
<N as FullNodeTypes>::Provider,
Eth,
>;
/// A general purpose trait that launches a new node of any kind.
@ -109,12 +104,7 @@ where
Types: NodeTypesWithDB<ChainSpec: EthereumHardforks + EthChainSpec> + NodeTypesWithEngine,
T: FullNodeTypes<Provider = BlockchainProvider<Types>, Types = Types>,
CB: NodeComponentsBuilder<T>,
AO: NodeAddOns<
NodeAdapter<T, CB::Components>,
EthApi: EthApiBuilderProvider<NodeAdapter<T, CB::Components>>
+ FullEthApiServer
+ AddDevSigners,
>,
AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>,
{
type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
@ -126,7 +116,7 @@ where
let NodeBuilderWithComponents {
adapter: NodeTypesAdapter { database },
components_builder,
add_ons: AddOns { hooks, rpc, exexs: installed_exex, .. },
add_ons: AddOns { hooks, exexs: installed_exex, add_ons },
config,
} = target;
let NodeHooks { on_component_initialized, on_node_started, .. } = hooks;
@ -336,42 +326,18 @@ where
),
);
let client = ClientVersionV1 {
code: CLIENT_CODE,
name: NAME_CLIENT.to_string(),
version: CARGO_PKG_VERSION.to_string(),
commit: VERGEN_GIT_SHA.to_string(),
};
let engine_api = EngineApi::new(
ctx.blockchain_db().clone(),
ctx.chain_spec(),
beacon_engine_handle,
ctx.components().payload_builder().clone().into(),
ctx.components().pool().clone(),
Box::new(ctx.task_executor().clone()),
client,
EngineCapabilities::default(),
ctx.components().engine_validator().clone(),
);
info!(target: "reth::cli", "Engine API handler initialized");
// extract the jwt secret from the args if possible
let jwt_secret = ctx.auth_jwt_secret()?;
// Start RPC servers
let (rpc_server_handles, rpc_registry) = crate::rpc::launch_rpc_servers(
ctx.node_adapter().clone(),
engine_api,
ctx.node_config(),
jwt_secret,
rpc,
)
.await?;
let add_ons_ctx = AddOnsContext {
node: ctx.node_adapter(),
config: ctx.node_config(),
beacon_engine_handle: &beacon_engine_handle,
jwt_secret: &jwt_secret,
};
// in dev mode we generate 20 random dev-signer accounts
if ctx.is_dev() {
rpc_registry.eth_api().with_dev_accounts();
}
let RpcHandle { rpc_server_handles, rpc_registry } =
add_ons.launch_add_ons(add_ons_ctx).await?;
// Run consensus engine to completion
let (tx, rx) = oneshot::channel();
@ -431,13 +397,12 @@ where
provider: ctx.node_adapter().provider.clone(),
payload_builder: ctx.components().payload_builder().clone(),
task_executor: ctx.task_executor().clone(),
rpc_server_handles,
rpc_registry,
config: ctx.node_config().clone(),
data_dir: ctx.data_dir().clone(),
add_ons_handle: RpcHandle { rpc_server_handles, rpc_registry },
};
// Notify on node started
on_node_started.on_event(full_node.clone())?;
on_node_started.on_event(FullNode::clone(&full_node))?;
let handle = NodeHandle {
node_exit_future: NodeExitFuture::new(

View File

@ -20,10 +20,7 @@ pub mod components;
pub use components::{NodeComponents, NodeComponentsBuilder};
mod builder;
pub use builder::{
add_ons::{AddOns, RpcAddOns},
*,
};
pub use builder::{add_ons::AddOns, *};
mod launch;
pub use launch::{engine::EngineNodeLauncher, *};

View File

@ -1,7 +1,11 @@
// re-export the node api types
pub use reth_node_api::{FullNodeTypes, NodeTypes, NodeTypesWithEngine};
use std::{marker::PhantomData, sync::Arc};
use std::{
marker::PhantomData,
ops::{Deref, DerefMut},
sync::Arc,
};
use reth_node_api::{EngineTypes, FullNodeComponents};
use reth_node_core::{
@ -14,11 +18,7 @@ use reth_provider::ChainSpecProvider;
use reth_rpc_builder::{auth::AuthServerHandle, RpcServerHandle};
use reth_tasks::TaskExecutor;
use crate::{
components::NodeComponentsBuilder,
rpc::{RethRpcServerHandles, RpcRegistry},
NodeAdapter, NodeAddOns,
};
use crate::{components::NodeComponentsBuilder, rpc::RethRpcAddOns, NodeAdapter, NodeAddOns};
/// A [`crate::Node`] is a [`NodeTypesWithEngine`] that comes with preconfigured components.
///
@ -84,7 +84,7 @@ impl<N, C, AO> Node<N> for AnyNode<N, C, AO>
where
N: FullNodeTypes + Clone,
C: NodeComponentsBuilder<N> + Clone + Sync + Unpin + 'static,
AO: NodeAddOns<NodeAdapter<N, C::Components>>,
AO: NodeAddOns<NodeAdapter<N, C::Components>> + Clone + Sync + Unpin + 'static,
{
type ComponentsBuilder = C;
type AddOns = AO;
@ -117,14 +117,12 @@ pub struct FullNode<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> {
pub payload_builder: PayloadBuilderHandle<<Node::Types as NodeTypesWithEngine>::Engine>,
/// Task executor for the node.
pub task_executor: TaskExecutor,
/// Handles to the node's rpc servers
pub rpc_server_handles: RethRpcServerHandles,
/// The configured rpc namespaces
pub rpc_registry: RpcRegistry<Node, AddOns::EthApi>,
/// The initial node config.
pub config: NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
/// The data dir of the node.
pub data_dir: ChainPath<DataDirPath>,
/// The handle to launched add-ons
pub add_ons_handle: AddOns::Handle,
}
impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> Clone for FullNode<Node, AddOns> {
@ -137,10 +135,9 @@ impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> Clone for FullNode<Node
provider: self.provider.clone(),
payload_builder: self.payload_builder.clone(),
task_executor: self.task_executor.clone(),
rpc_server_handles: self.rpc_server_handles.clone(),
rpc_registry: self.rpc_registry.clone(),
config: self.config.clone(),
data_dir: self.data_dir.clone(),
add_ons_handle: self.add_ons_handle.clone(),
}
}
}
@ -155,15 +152,22 @@ where
pub fn chain_spec(&self) -> Arc<<Node::Types as NodeTypes>::ChainSpec> {
self.provider.chain_spec()
}
}
impl<Engine, Node, AddOns> FullNode<Node, AddOns>
where
Engine: EngineTypes,
Node: FullNodeComponents<Types: NodeTypesWithEngine<Engine = Engine>>,
AddOns: RethRpcAddOns<Node>,
{
/// Returns the [`RpcServerHandle`] to the started rpc server.
pub const fn rpc_server_handle(&self) -> &RpcServerHandle {
&self.rpc_server_handles.rpc
&self.add_ons_handle.rpc_server_handles.rpc
}
/// Returns the [`AuthServerHandle`] to the started authenticated engine API server.
pub const fn auth_server_handle(&self) -> &AuthServerHandle {
&self.rpc_server_handles.auth
&self.add_ons_handle.rpc_server_handles.auth
}
/// Returns the [`EngineApiClient`] interface for the authenticated engine API.
@ -188,3 +192,17 @@ where
self.auth_server_handle().ipc_client().await
}
}
impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> Deref for FullNode<Node, AddOns> {
type Target = AddOns::Handle;
fn deref(&self) -> &Self::Target {
&self.add_ons_handle
}
}
impl<Node: FullNodeComponents, AddOns: NodeAddOns<Node>> DerefMut for FullNode<Node, AddOns> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.add_ons_handle
}
}

View File

@ -1,31 +1,35 @@
//! Builder support for rpc components.
use std::{
fmt,
fmt::{self, Debug},
marker::PhantomData,
ops::{Deref, DerefMut},
};
use alloy_rpc_types::engine::ClientVersionV1;
use futures::TryFutureExt;
use reth_node_api::{BuilderProvider, FullNodeComponents, NodeTypes, NodeTypesWithEngine};
use reth_node_api::{
AddOnsContext, FullNodeComponents, NodeAddOns, NodeTypes, NodeTypesWithEngine,
};
use reth_node_core::{
node_config::NodeConfig,
rpc::{
api::EngineApiServer,
eth::{EthApiTypes, FullEthApiServer},
},
rpc::eth::{EthApiTypes, FullEthApiServer},
version::{CARGO_PKG_VERSION, CLIENT_CODE, NAME_CLIENT, VERGEN_GIT_SHA},
};
use reth_payload_builder::PayloadBuilderHandle;
use reth_provider::providers::ProviderNodeTypes;
use reth_rpc::EthApi;
use reth_rpc_api::eth::helpers::AddDevSigners;
use reth_rpc_builder::{
auth::{AuthRpcModule, AuthServerHandle},
config::RethRpcServerConfig,
RpcModuleBuilder, RpcRegistryInner, RpcServerHandle, TransportRpcModules,
};
use reth_rpc_layer::JwtSecret;
use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, info};
use crate::{EthApiBuilderCtx, RpcAddOns};
use crate::EthApiBuilderCtx;
/// Contains the handles to the spawned RPC servers.
///
@ -292,102 +296,232 @@ where
}
}
/// Launch the rpc servers.
pub async fn launch_rpc_servers<Node, Engine, EthApi>(
node: Node,
engine_api: Engine,
config: &NodeConfig<<Node::Types as NodeTypes>::ChainSpec>,
jwt_secret: JwtSecret,
add_ons: RpcAddOns<Node, EthApi>,
) -> eyre::Result<(RethRpcServerHandles, RpcRegistry<Node, EthApi>)>
where
Node: FullNodeComponents<Types: ProviderNodeTypes> + Clone,
Engine: EngineApiServer<<Node::Types as NodeTypesWithEngine>::Engine>,
EthApi: EthApiBuilderProvider<Node> + FullEthApiServer,
{
let auth_config = config.rpc.auth_server_config(jwt_secret)?;
let module_config = config.rpc.transport_rpc_module_config();
debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config");
let (mut modules, mut auth_module, registry) = RpcModuleBuilder::default()
.with_provider(node.provider().clone())
.with_pool(node.pool().clone())
.with_network(node.network().clone())
.with_events(node.provider().clone())
.with_executor(node.task_executor().clone())
.with_evm_config(node.evm_config().clone())
.with_block_executor(node.block_executor().clone())
.build_with_auth_server(module_config, engine_api, EthApi::eth_api_builder());
let mut registry = RpcRegistry { registry };
let ctx = RpcContext {
node: node.clone(),
config,
registry: &mut registry,
modules: &mut modules,
auth_module: &mut auth_module,
};
let RpcAddOns { hooks, .. } = add_ons;
let RpcHooks { on_rpc_started, extend_rpc_modules } = hooks;
extend_rpc_modules.extend_rpc_modules(ctx)?;
let server_config = config.rpc.rpc_server_config();
let cloned_modules = modules.clone();
let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| {
if let Some(path) = handle.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = handle.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = handle.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}
handle
});
let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
let addr = handle.local_addr();
if let Some(ipc_endpoint) = handle.ipc_endpoint() {
info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint,"RPC auth server started");
} else {
info!(target: "reth::cli", url=%addr, "RPC auth server started");
}
handle
});
// launch servers concurrently
let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?;
let handles = RethRpcServerHandles { rpc, auth };
let ctx = RpcContext {
node,
config,
registry: &mut registry,
modules: &mut modules,
auth_module: &mut auth_module,
};
on_rpc_started.on_rpc_started(ctx, handles.clone())?;
Ok((handles, registry))
/// Handle to the launched RPC servers.
#[derive(Clone)]
pub struct RpcHandle<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Handles to launched servers.
pub rpc_server_handles: RethRpcServerHandles,
/// Configured RPC modules.
pub rpc_registry: RpcRegistry<Node, EthApi>,
}
/// Provides builder for the core `eth` API type.
pub trait EthApiBuilderProvider<N: FullNodeComponents>: BuilderProvider<N> + EthApiTypes {
/// Returns the eth api builder.
#[allow(clippy::type_complexity)]
fn eth_api_builder() -> Box<dyn Fn(&EthApiBuilderCtx<N, Self>) -> Self + Send>;
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Deref for RpcHandle<Node, EthApi> {
type Target = RpcRegistry<Node, EthApi>;
impl<N, F> EthApiBuilderProvider<N> for F
where
N: FullNodeComponents,
for<'a> F: BuilderProvider<N, Ctx<'a> = &'a EthApiBuilderCtx<N, Self>> + EthApiTypes,
{
fn eth_api_builder() -> Box<dyn Fn(&EthApiBuilderCtx<N, Self>) -> Self + Send> {
F::builder()
fn deref(&self) -> &Self::Target {
&self.rpc_registry
}
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Debug for RpcHandle<Node, EthApi>
where
RpcRegistry<Node, EthApi>: Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcHandle")
.field("rpc_server_handles", &self.rpc_server_handles)
.field("rpc_registry", &self.rpc_registry)
.finish()
}
}
/// Node add-ons containing RPC server configuration, with customizable eth API handler.
#[allow(clippy::type_complexity)]
pub struct RpcAddOns<Node: FullNodeComponents, EthApi: EthApiTypes> {
/// Additional RPC add-ons.
pub hooks: RpcHooks<Node, EthApi>,
/// Builder for `EthApi`
eth_api_builder: Box<dyn FnOnce(&EthApiBuilderCtx<Node>) -> EthApi + Send + Sync>,
_pd: PhantomData<(Node, EthApi)>,
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> Debug for RpcAddOns<Node, EthApi> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RpcAddOns")
.field("hooks", &self.hooks)
.field("eth_api_builder", &"...")
.finish()
}
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes> RpcAddOns<Node, EthApi> {
/// Creates a new instance of the RPC add-ons.
pub fn new(
eth_api_builder: impl FnOnce(&EthApiBuilderCtx<Node>) -> EthApi + Send + Sync + 'static,
) -> Self {
Self {
hooks: RpcHooks::default(),
eth_api_builder: Box::new(eth_api_builder),
_pd: PhantomData,
}
}
/// Sets the hook that is run once the rpc server is started.
pub fn on_rpc_started<F>(mut self, hook: F) -> Self
where
F: FnOnce(RpcContext<'_, Node, EthApi>, RethRpcServerHandles) -> eyre::Result<()>
+ Send
+ 'static,
{
self.hooks.set_on_rpc_started(hook);
self
}
/// Sets the hook that is run to configure the rpc modules.
pub fn extend_rpc_modules<F>(mut self, hook: F) -> Self
where
F: FnOnce(RpcContext<'_, Node, EthApi>) -> eyre::Result<()> + Send + 'static,
{
self.hooks.set_extend_rpc_modules(hook);
self
}
}
impl<Node: FullNodeComponents, EthApi: EthApiTypes + EthApiBuilder<Node>> Default
for RpcAddOns<Node, EthApi>
{
fn default() -> Self {
Self::new(EthApi::build)
}
}
impl<N, EthApi> NodeAddOns<N> for RpcAddOns<N, EthApi>
where
N: FullNodeComponents<Types: ProviderNodeTypes>,
EthApi: EthApiTypes + FullEthApiServer + AddDevSigners + Unpin + 'static,
{
type Handle = RpcHandle<N, EthApi>;
async fn launch_add_ons(self, ctx: AddOnsContext<'_, N>) -> eyre::Result<Self::Handle> {
let AddOnsContext { node, config, beacon_engine_handle, jwt_secret } = ctx;
let client = ClientVersionV1 {
code: CLIENT_CODE,
name: NAME_CLIENT.to_string(),
version: CARGO_PKG_VERSION.to_string(),
commit: VERGEN_GIT_SHA.to_string(),
};
let engine_api = EngineApi::new(
node.provider().clone(),
config.chain.clone(),
beacon_engine_handle.clone(),
node.payload_builder().clone().into(),
node.pool().clone(),
Box::new(node.task_executor().clone()),
client,
EngineCapabilities::default(),
node.engine_validator().clone(),
);
info!(target: "reth::cli", "Engine API handler initialized");
let auth_config = config.rpc.auth_server_config(*jwt_secret)?;
let module_config = config.rpc.transport_rpc_module_config();
debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config");
let (mut modules, mut auth_module, registry) = RpcModuleBuilder::default()
.with_provider(node.provider().clone())
.with_pool(node.pool().clone())
.with_network(node.network().clone())
.with_events(node.provider().clone())
.with_executor(node.task_executor().clone())
.with_evm_config(node.evm_config().clone())
.with_block_executor(node.block_executor().clone())
.build_with_auth_server(module_config, engine_api, self.eth_api_builder);
// in dev mode we generate 20 random dev-signer accounts
if config.dev.dev {
registry.eth_api().with_dev_accounts();
}
let mut registry = RpcRegistry { registry };
let ctx = RpcContext {
node: node.clone(),
config,
registry: &mut registry,
modules: &mut modules,
auth_module: &mut auth_module,
};
let RpcHooks { on_rpc_started, extend_rpc_modules } = self.hooks;
extend_rpc_modules.extend_rpc_modules(ctx)?;
let server_config = config.rpc.rpc_server_config();
let cloned_modules = modules.clone();
let launch_rpc = server_config.start(&cloned_modules).map_ok(|handle| {
if let Some(path) = handle.ipc_endpoint() {
info!(target: "reth::cli", %path, "RPC IPC server started");
}
if let Some(addr) = handle.http_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC HTTP server started");
}
if let Some(addr) = handle.ws_local_addr() {
info!(target: "reth::cli", url=%addr, "RPC WS server started");
}
handle
});
let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
let addr = handle.local_addr();
if let Some(ipc_endpoint) = handle.ipc_endpoint() {
info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint,"RPC auth server started");
} else {
info!(target: "reth::cli", url=%addr, "RPC auth server started");
}
handle
});
// launch servers concurrently
let (rpc, auth) = futures::future::try_join(launch_rpc, launch_auth).await?;
let handles = RethRpcServerHandles { rpc, auth };
let ctx = RpcContext {
node: node.clone(),
config,
registry: &mut registry,
modules: &mut modules,
auth_module: &mut auth_module,
};
on_rpc_started.on_rpc_started(ctx, handles.clone())?;
Ok(RpcHandle { rpc_server_handles: handles, rpc_registry: registry })
}
}
/// Helper trait implemented for add-ons producing [`RpcHandle`]. Used by common node launcher
/// implementations.
pub trait RethRpcAddOns<N: FullNodeComponents>:
NodeAddOns<N, Handle = RpcHandle<N, Self::EthApi>>
{
/// eth API implementation.
type EthApi: EthApiTypes;
/// Returns a mutable reference to RPC hooks.
fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi>;
}
impl<N: FullNodeComponents, EthApi: EthApiTypes> RethRpcAddOns<N> for RpcAddOns<N, EthApi>
where
Self: NodeAddOns<N, Handle = RpcHandle<N, EthApi>>,
{
type EthApi = EthApi;
fn hooks_mut(&mut self) -> &mut RpcHooks<N, Self::EthApi> {
&mut self.hooks
}
}
/// A `EthApi` that knows how to build itself from [`EthApiBuilderCtx`].
pub trait EthApiBuilder<N: FullNodeComponents>: 'static {
/// Builds the `EthApi` from the given context.
fn build(ctx: &EthApiBuilderCtx<N>) -> Self;
}
impl<N: FullNodeComponents> EthApiBuilder<N> for EthApi<N::Provider, N::Pool, N::Network, N::Evm> {
fn build(ctx: &EthApiBuilderCtx<N>) -> Self {
Self::with_spawner(ctx)
}
}