feat: Add RethCliExt (#3983)

This commit is contained in:
Matthias Seitz
2023-07-31 16:55:11 +02:00
committed by GitHub
parent c0544ed7e1
commit a1e68151d3
5 changed files with 242 additions and 78 deletions

View File

@ -1,6 +1,9 @@
//! clap [Args](clap::Args) for RPC related arguments.
use crate::args::GasPriceOracleArgs;
use crate::{
args::GasPriceOracleArgs,
cli::ext::{NoopArgsExt, RethRpcConfig, RethRpcServerArgsExt},
};
use clap::{
builder::{PossibleValue, RangedU64ValueParser, TypedValueParser},
Arg, Args, Command,
@ -52,9 +55,9 @@ pub(crate) const RPC_DEFAULT_MAX_CONNECTIONS: u32 = 100;
pub(crate) const RPC_DEFAULT_MAX_TRACING_REQUESTS: u32 = 25;
/// Parameters for configuring the rpc more granularity via CLI
#[derive(Debug, Args, PartialEq, Eq)]
#[derive(Debug, Args)]
#[command(next_help_heading = "RPC")]
pub struct RpcServerArgs {
pub struct RpcServerArgs<Ext: RethRpcServerArgsExt = NoopArgsExt> {
/// Enable the HTTP-RPC server
#[arg(long, default_value_if("dev", "true", "true"))]
pub http: bool,
@ -160,9 +163,13 @@ pub struct RpcServerArgs {
/// Maximum number of env cache entries.
#[arg(long, default_value_t = DEFAULT_ENV_CACHE_MAX_LEN)]
pub env_cache_len: u32,
/// Additional arguments for rpc.
#[clap(flatten)]
pub ext: Ext,
}
impl RpcServerArgs {
impl<Ext: RethRpcServerArgsExt> RpcServerArgs<Ext> {
/// Returns the max request size in bytes.
pub fn rpc_max_request_size_bytes(&self) -> u32 {
self.rpc_max_request_size * 1024 * 1024
@ -183,21 +190,6 @@ impl RpcServerArgs {
)
}
/// Extracts the [EthConfig] from the args.
pub fn eth_config(&self) -> EthConfig {
EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests)
.rpc_gas_cap(self.rpc_gas_cap)
.gpo_config(self.gas_price_oracle_config())
}
/// Convenience function that returns whether ipc is enabled
///
/// By default IPC is enabled therefor it is enabled if the `ipcdisable` is false.
fn is_ipc_enabled(&self) -> bool {
!self.ipcdisable
}
/// The execution layer and consensus layer clients SHOULD accept a configuration parameter:
/// jwt-secret, which designates a file containing the hex-encoded 256 bit secret key to be used
/// for verifying/generating JWT tokens.
@ -244,7 +236,7 @@ impl RpcServerArgs {
events: Events,
engine_api: Engine,
jwt_secret: JwtSecret,
) -> Result<(RpcServerHandle, AuthServerHandle), RpcError>
) -> eyre::Result<(RpcServerHandle, AuthServerHandle)>
where
Provider: BlockReaderIdExt
+ HeaderProvider
@ -266,7 +258,7 @@ impl RpcServerArgs {
let module_config = self.transport_rpc_module_config();
debug!(target: "reth::cli", http=?module_config.http(), ws=?module_config.ws(), "Using RPC module config");
let (rpc_modules, auth_module) = RpcModuleBuilder::default()
let (mut rpc_modules, auth_module, mut registry) = RpcModuleBuilder::default()
.with_provider(provider)
.with_pool(pool)
.with_network(network)
@ -274,6 +266,9 @@ impl RpcServerArgs {
.with_executor(executor)
.build_with_auth_server(module_config, engine_api);
// apply configured customization
self.ext.extend_rpc_modules(self, &mut registry, &mut rpc_modules)?;
let server_config = self.rpc_server_config();
let launch_rpc = rpc_modules.start_server(server_config).map_ok(|handle| {
if let Some(url) = handle.ipc_endpoint() {
@ -295,7 +290,7 @@ impl RpcServerArgs {
});
// launch servers concurrently
futures::future::try_join(launch_rpc, launch_auth).await
Ok(futures::future::try_join(launch_rpc, launch_auth).await?)
}
/// Convenience function for starting a rpc server with configs which extracted from cli args.
@ -454,6 +449,20 @@ impl RpcServerArgs {
}
}
impl<Ext: RethRpcServerArgsExt> RethRpcConfig for RpcServerArgs<Ext> {
fn is_ipc_enabled(&self) -> bool {
// By default IPC is enabled therefor it is enabled if the `ipcdisable` is false.
!self.ipcdisable
}
fn eth_config(&self) -> EthConfig {
EthConfig::default()
.max_tracing_requests(self.rpc_max_tracing_requests)
.rpc_gas_cap(self.rpc_gas_cap)
.gpo_config(self.gas_price_oracle_config())
}
}
/// clap value parser for [RpcModuleSelection].
#[derive(Clone, Debug, Default)]
#[non_exhaustive]

94
bin/reth/src/cli/ext.rs Normal file
View File

@ -0,0 +1,94 @@
//! Support for integrating customizations into the CLI.
use clap::Args;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{
BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
StateProviderFactory,
};
use reth_rpc_builder::{EthConfig, RethModuleRegistry, TransportRpcModules};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::fmt;
/// A trait that allows for extending parts of the CLI with additional functionality.
pub trait RethCliExt {
/// Extends the rpc arguments for the node
type RpcExt: RethRpcServerArgsExt;
}
impl RethCliExt for () {
type RpcExt = NoopArgsExt;
}
/// An [Args] extension that does nothing.
#[derive(Debug, Clone, Copy, Default, Args)]
pub struct NoopArgsExt;
/// A trait that provides configured RPC server.
///
/// This provides all basic config values for the RPC server and is implemented by the
/// [RpcServerArgs](crate::args::RpcServerArgs) type.
pub trait RethRpcConfig {
/// Returns whether ipc is enabled.
fn is_ipc_enabled(&self) -> bool;
/// The configured ethereum RPC settings.
fn eth_config(&self) -> EthConfig;
// TODO extract more functions from RpcServerArgs
}
/// A trait that allows further customization of the RPC server via CLI.
pub trait RethRpcServerArgsExt: fmt::Debug + clap::Args {
/// Allows for registering additional RPC modules for the transports.
///
/// This is expected to call the merge functions of [TransportRpcModules], for example
/// [TransportRpcModules::merge_configured]
fn extend_rpc_modules<Conf, Provider, Pool, Network, Tasks, Events>(
&self,
config: &Conf,
registry: &mut RethModuleRegistry<Provider, Pool, Network, Tasks, Events>,
modules: &mut TransportRpcModules<()>,
) -> eyre::Result<()>
where
Conf: RethRpcConfig,
Provider: BlockReaderIdExt
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static;
}
impl RethRpcServerArgsExt for NoopArgsExt {
fn extend_rpc_modules<Conf, Provider, Pool, Network, Tasks, Events>(
&self,
_config: &Conf,
_registry: &mut RethModuleRegistry<Provider, Pool, Network, Tasks, Events>,
_modules: &mut TransportRpcModules<()>,
) -> eyre::Result<()>
where
Conf: RethRpcConfig,
Provider: BlockReaderIdExt
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider
+ ChangeSetReader
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
Ok(())
}
}

View File

@ -1,7 +1,9 @@
//! CLI definition and entrypoint to executable
use crate::{
args::utils::genesis_value_parser,
chain, config, db, debug_cmd,
chain,
cli::ext::RethCliExt,
config, db, debug_cmd,
dirs::{LogsDir, PlatformPath},
node, p2p,
runner::CliRunner,
@ -17,15 +19,17 @@ use reth_tracing::{
};
use std::sync::Arc;
pub mod ext;
/// The main reth cli interface.
///
/// This is the entrypoint to the executable.
#[derive(Debug, Parser)]
#[command(author, version = SHORT_VERSION, long_version = LONG_VERSION, about = "Reth", long_about = None)]
pub struct Cli {
pub struct Cli<Ext: RethCliExt = ()> {
/// The command to run
#[clap(subcommand)]
command: Commands,
command: Commands<Ext>,
/// The chain this node is running.
///
@ -99,10 +103,10 @@ pub fn run() -> eyre::Result<()> {
/// Commands to be executed
#[derive(Debug, Subcommand)]
pub enum Commands {
pub enum Commands<Ext: RethCliExt = ()> {
/// Start the node
#[command(name = "node")]
Node(node::Command),
Node(node::Command<Ext>),
/// Initialize the database from a genesis file.
#[command(name = "init")]
Init(chain::InitCommand),
@ -225,9 +229,9 @@ mod tests {
/// runtime
#[test]
fn test_parse_help_all_subcommands() {
let reth = Cli::command();
let reth = Cli::<()>::command();
for sub_command in reth.get_subcommands() {
let err = Cli::try_parse_from(["reth", sub_command.get_name(), "--help"])
let err = Cli::<()>::try_parse_from(["reth", sub_command.get_name(), "--help"])
.err()
.unwrap_or_else(|| {
panic!("Failed to parse help message {}", sub_command.get_name())
@ -243,13 +247,13 @@ mod tests {
/// name
#[test]
fn parse_logs_path() {
let mut reth = Cli::try_parse_from(["reth", "node", "--log.persistent"]).unwrap();
let mut reth = Cli::<()>::try_parse_from(["reth", "node", "--log.persistent"]).unwrap();
reth.logs.log_directory = reth.logs.log_directory.join(reth.chain.chain.to_string());
let log_dir = reth.logs.log_directory;
assert!(log_dir.as_ref().ends_with("reth/logs/mainnet"), "{:?}", log_dir);
let mut reth =
Cli::try_parse_from(["reth", "node", "--chain", "sepolia", "--log.persistent"])
Cli::<()>::try_parse_from(["reth", "node", "--chain", "sepolia", "--log.persistent"])
.unwrap();
reth.logs.log_directory = reth.logs.log_directory.join(reth.chain.chain.to_string());
let log_dir = reth.logs.log_directory;

View File

@ -2,9 +2,16 @@
//!
//! Starts the client
use crate::{
args::{get_secret_key, DebugArgs, DevArgs, NetworkArgs, RpcServerArgs, TxPoolArgs},
dirs::DataDirPath,
args::{
get_secret_key,
utils::{genesis_value_parser, parse_socket_address},
DatabaseArgs, DebugArgs, DevArgs, NetworkArgs, PayloadBuilderArgs, RpcServerArgs,
TxPoolArgs,
},
cli::ext::RethCliExt,
dirs::{DataDirPath, MaybePlatformPath},
init::init_genesis,
node::cl_events::ConsensusLayerHealthEvents,
prometheus_exporter,
runner::CliContext,
utils::get_single_header,
@ -32,26 +39,30 @@ use reth_interfaces::{
p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader},
either::EitherDownloader,
headers::downloader::HeaderDownloader,
headers::{client::HeadersClient, downloader::HeaderDownloader},
},
};
use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::NetworkInfo;
use reth_payload_builder::PayloadBuilderService;
use reth_primitives::{
stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, Head, SealedHeader, H256,
stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, DisplayHardforks, Head,
SealedHeader, H256,
};
use reth_provider::{
BlockHashReader, BlockReader, CanonStateSubscriptions, HeaderProvider, ProviderFactory,
StageCheckpointReader,
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, ProviderFactory, StageCheckpointReader,
};
use reth_prune::BatchSizes;
use reth_revm::Factory;
use reth_revm_inspectors::stack::Hook;
use reth_rpc_engine_api::EngineApi;
use reth_stages::{
prelude::*,
stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
AccountHashingStage, ExecutionStage, ExecutionStageThresholds, HeaderSyncMode,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TotalDifficultyStage, TransactionLookupStage,
},
MetricEventsSender, MetricsListener,
};
@ -66,30 +77,12 @@ use std::{
use tokio::sync::{mpsc::unbounded_channel, oneshot, watch};
use tracing::*;
use crate::{
args::{
utils::{genesis_value_parser, parse_socket_address},
DatabaseArgs, PayloadBuilderArgs,
},
dirs::MaybePlatformPath,
node::cl_events::ConsensusLayerHealthEvents,
};
use reth_interfaces::p2p::headers::client::HeadersClient;
use reth_payload_builder::PayloadBuilderService;
use reth_primitives::DisplayHardforks;
use reth_provider::providers::BlockchainProvider;
use reth_prune::BatchSizes;
use reth_stages::stages::{
AccountHashingStage, IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage,
StorageHashingStage, TransactionLookupStage,
};
pub mod cl_events;
pub mod events;
/// Start the node
#[derive(Debug, Parser)]
pub struct Command {
pub struct Command<Ext: RethCliExt = ()> {
/// The path to the data dir for all reth files and subdirectories.
///
/// Defaults to the OS-specific data directory:
@ -134,7 +127,7 @@ pub struct Command {
network: NetworkArgs,
#[clap(flatten)]
rpc: RpcServerArgs,
rpc: RpcServerArgs<Ext::RpcExt>,
#[clap(flatten)]
txpool: TxPoolArgs,
@ -820,60 +813,61 @@ async fn run_network_until_shutdown<C>(
#[cfg(test)]
mod tests {
use reth_primitives::DEV;
use super::*;
use reth_primitives::DEV;
use std::{net::IpAddr, path::Path};
#[test]
fn parse_help_node_command() {
let err = Command::try_parse_from(["reth", "--help"]).unwrap_err();
let err = Command::<()>::try_parse_from(["reth", "--help"]).unwrap_err();
assert_eq!(err.kind(), clap::error::ErrorKind::DisplayHelp);
}
#[test]
fn parse_common_node_command_chain_args() {
for chain in ["mainnet", "sepolia", "goerli"] {
let args: Command = Command::parse_from(["reth", "--chain", chain]);
let args: Command = Command::<()>::parse_from(["reth", "--chain", chain]);
assert_eq!(args.chain.chain, chain.parse().unwrap());
}
}
#[test]
fn parse_discovery_port() {
let cmd = Command::try_parse_from(["reth", "--discovery.port", "300"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth", "--discovery.port", "300"]).unwrap();
assert_eq!(cmd.network.discovery.port, Some(300));
}
#[test]
fn parse_port() {
let cmd =
Command::try_parse_from(["reth", "--discovery.port", "300", "--port", "99"]).unwrap();
Command::<()>::try_parse_from(["reth", "--discovery.port", "300", "--port", "99"])
.unwrap();
assert_eq!(cmd.network.discovery.port, Some(300));
assert_eq!(cmd.network.port, Some(99));
}
#[test]
fn parse_metrics_port() {
let cmd = Command::try_parse_from(["reth", "--metrics", "9001"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth", "--metrics", "9001"]).unwrap();
assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001)));
let cmd = Command::try_parse_from(["reth", "--metrics", ":9001"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth", "--metrics", ":9001"]).unwrap();
assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001)));
let cmd = Command::try_parse_from(["reth", "--metrics", "localhost:9001"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth", "--metrics", "localhost:9001"]).unwrap();
assert_eq!(cmd.metrics, Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 9001)));
}
#[test]
fn parse_config_path() {
let cmd = Command::try_parse_from(["reth", "--config", "my/path/to/reth.toml"]).unwrap();
let cmd =
Command::<()>::try_parse_from(["reth", "--config", "my/path/to/reth.toml"]).unwrap();
// always store reth.toml in the data dir, not the chain specific data dir
let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain);
let config_path = cmd.config.unwrap_or(data_dir.config_path());
assert_eq!(config_path, Path::new("my/path/to/reth.toml"));
let cmd = Command::try_parse_from(["reth"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth"]).unwrap();
// always store reth.toml in the data dir, not the chain specific data dir
let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain);
@ -883,12 +877,12 @@ mod tests {
#[test]
fn parse_db_path() {
let cmd = Command::try_parse_from(["reth"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth"]).unwrap();
let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain);
let db_path = data_dir.db_path();
assert!(db_path.ends_with("reth/mainnet/db"), "{:?}", cmd.config);
let cmd = Command::try_parse_from(["reth", "--datadir", "my/custom/path"]).unwrap();
let cmd = Command::<()>::try_parse_from(["reth", "--datadir", "my/custom/path"]).unwrap();
let data_dir = cmd.datadir.unwrap_or_chain_default(cmd.chain.chain);
let db_path = data_dir.db_path();
assert_eq!(db_path, Path::new("my/custom/path/db"));
@ -896,7 +890,7 @@ mod tests {
#[test]
fn parse_dev() {
let cmd = Command::parse_from(["reth", "--dev"]);
let cmd = Command::<()>::parse_from(["reth", "--dev"]);
let chain = DEV.clone();
assert_eq!(cmd.chain.chain, chain.chain);
assert_eq!(cmd.chain.genesis_hash, chain.genesis_hash);

View File

@ -89,7 +89,7 @@
//! let builder = RpcModuleBuilder::new(provider, pool, network, TokioTaskExecutor::default(), events);
//!
//! // configure the server modules
//! let (modules, auth_module) = builder.build_with_auth_server(transports, engine_api);
//! let (modules, auth_module, _registry) = builder.build_with_auth_server(transports, engine_api);
//!
//! // start the servers
//! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build();
@ -343,7 +343,11 @@ where
self,
module_config: TransportRpcModuleConfig,
engine: EngineApi,
) -> (TransportRpcModules<()>, AuthRpcModule)
) -> (
TransportRpcModules<()>,
AuthRpcModule,
RethModuleRegistry<Provider, Pool, Network, Tasks, Events>,
)
where
EngineApi: EngineApiServer,
{
@ -369,7 +373,7 @@ where
let auth_module = registry.create_auth_module(engine);
(modules, auth_module)
(modules, auth_module, registry)
}
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
@ -1026,12 +1030,12 @@ where
}
/// Returns the configured [EthHandlers] or creates it if it does not exist yet
fn eth_handlers(&mut self) -> EthHandlers<Provider, Pool, Network, Events> {
pub fn eth_handlers(&mut self) -> EthHandlers<Provider, Pool, Network, Events> {
self.with_eth(|handlers| handlers.clone())
}
/// Returns the configured [EthApi] or creates it if it does not exist yet
fn eth_api(&mut self) -> EthApi<Provider, Pool, Network> {
pub fn eth_api(&mut self) -> EthApi<Provider, Pool, Network> {
self.with_eth(|handlers| handlers.api.clone())
}
}
@ -1456,6 +1460,65 @@ impl TransportRpcModules<()> {
&self.config
}
/// Merge the given Methods in the configured http methods.
///
/// Fails if any of the methods in other is present already.
///
/// Returns Ok(false) if no http transport is configured.
pub fn merge_http(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
if let Some(ref mut http) = self.http {
return http.merge(other.into()).map(|_| true)
}
Ok(false)
}
/// Merge the given Methods in the configured ws methods.
///
/// Fails if any of the methods in other is present already.
///
/// Returns Ok(false) if no http transport is configured.
pub fn merge_ws(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
if let Some(ref mut ws) = self.ws {
return ws.merge(other.into()).map(|_| true)
}
Ok(false)
}
/// Merge the given Methods in the configured ipc methods.
///
/// Fails if any of the methods in other is present already.
///
/// Returns Ok(false) if no ipc transport is configured.
pub fn merge_ipc(
&mut self,
other: impl Into<Methods>,
) -> Result<bool, jsonrpsee::core::error::Error> {
if let Some(ref mut http) = self.http {
return http.merge(other.into()).map(|_| true)
}
Ok(false)
}
/// Merge the given Methods in all configured methods.
///
/// Fails if any of the methods in other is present already.
pub fn merge_configured(
&mut self,
other: impl Into<Methods>,
) -> Result<(), jsonrpsee::core::error::Error> {
let other = other.into();
self.merge_http(other.clone())?;
self.merge_ws(other.clone())?;
self.merge_ipc(other.clone())?;
Ok(())
}
/// Convenience function for starting a server
pub async fn start_server(self, builder: RpcServerConfig) -> Result<RpcServerHandle, RpcError> {
builder.start(self).await