chore(rpc): rpc and auth server cleanup (#2272)

This commit is contained in:
Matthias Seitz
2023-04-16 22:49:39 +02:00
committed by GitHub
parent 581e9ffcc4
commit f1fc9ecafb
5 changed files with 309 additions and 42 deletions

View File

@ -2,24 +2,26 @@
use crate::dirs::{JwtSecretPath, PlatformPath};
use clap::Args;
use futures::FutureExt;
use jsonrpsee::server::ServerHandle;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{
BlockProvider, CanonStateSubscriptions, EvmEnvProvider, HeaderProvider, StateProviderFactory,
};
use reth_rpc::{JwtError, JwtSecret};
use reth_rpc_builder::{
constants, error::RpcError, IpcServerBuilder, RethRpcModule, RpcModuleSelection,
RpcServerConfig, RpcServerHandle, ServerBuilder, TransportRpcModuleConfig,
auth::AuthServerConfig, constants, error::RpcError, IpcServerBuilder, RethRpcModule,
RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle, ServerBuilder,
TransportRpcModuleConfig,
};
use reth_rpc_engine_api::EngineApi;
use reth_rpc_engine_api::{EngineApi, EngineApiServer};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
path::Path,
};
use tracing::info;
/// Parameters for configuring the rpc more granularity via CLI
#[derive(Debug, Args, PartialEq, Default)]
@ -114,8 +116,62 @@ impl RpcServerArgs {
}
}
/// Configures and launches _all_ servers.
///
/// Returns the handles for the launched regular RPC server(s) (if any) and the server handle
/// for the auth server that handles the `engine_` API that's accessed by the consensus
/// layer.
pub async fn start_servers<Client, Pool, Network, Tasks, Events, Engine>(
&self,
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
engine_api: Engine,
) -> Result<(RpcServerHandle, ServerHandle), RpcError>
where
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Engine: EngineApiServer,
{
let auth_config = self.auth_server_config()?;
let (rpc_modules, auth_module) = RpcModuleBuilder::default()
.with_client(client)
.with_pool(pool)
.with_network(network)
.with_events(events)
.with_executor(executor)
.build_with_auth_server(self.transport_rpc_module_config(), engine_api);
let server_config = self.rpc_server_config();
let has_server = server_config.has_server();
let launch_rpc = rpc_modules.start_server(server_config).inspect(|_| {
if has_server {
info!(target: "reth::cli", "Started RPC server");
}
});
let launch_auth = auth_module.start_server(auth_config).inspect(|_| {
info!(target: "reth::cli", "Started Auth server");
});
// launch servers concurrently
futures::future::try_join(launch_rpc, launch_auth).await
}
/// Convenience function for starting a rpc server with configs which extracted from cli args.
pub(crate) async fn start_rpc_server<Client, Pool, Network, Tasks, Events>(
pub async fn start_rpc_server<Client, Pool, Network, Tasks, Events>(
&self,
client: Client,
pool: Pool,
@ -149,7 +205,7 @@ impl RpcServerArgs {
}
/// Create Engine API server.
pub(crate) async fn start_auth_server<Client, Pool, Network, Tasks>(
pub async fn start_auth_server<Client, Pool, Network, Tasks>(
&self,
client: Client,
pool: Pool,
@ -235,6 +291,17 @@ impl RpcServerArgs {
config
}
/// Creates the [AuthServerConfig] from cli args.
fn auth_server_config(&self) -> Result<AuthServerConfig, RpcError> {
let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?;
let address = SocketAddr::new(
self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT),
);
Ok(AuthServerConfig::builder(secret).socket_addr(address).build())
}
}
#[cfg(test)]

View File

@ -11,7 +11,7 @@ use crate::{
use clap::{crate_version, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, FutureExt, StreamExt};
use futures::{pin_mut, stream::select as stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus};
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine, BeaconEngineMessage};
@ -317,35 +317,18 @@ impl Command {
);
info!(target: "reth::cli", "Engine API handler initialized");
let launch_rpc = self
// Start RPC servers
let (_rpc_server, _auth_server) = self
.rpc
.start_rpc_server(
.start_servers(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
blockchain_tree,
)
.inspect(|_| {
info!(target: "reth::cli", "Started RPC server");
});
let launch_auth = self
.rpc
.start_auth_server(
shareable_db.clone(),
transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
engine_api,
)
.inspect(|_| {
info!(target: "reth::cli", "Started Auth server");
});
// launch servers
let (_rpc_server, _auth_server) =
futures::future::try_join(launch_rpc, launch_auth).await?;
.await?;
// Run consensus engine to completion
let (rx, tx) = oneshot::channel();

View File

@ -1,25 +1,27 @@
use crate::error::{RpcError, ServerKind};
use crate::{
constants,
error::{RpcError, ServerKind},
};
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::server::{RpcModule, ServerHandle};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{
eth::cache::EthStateCache, AuthLayer, EngineApi, EthApi, EthFilter, JwtAuthValidator, JwtSecret,
eth::cache::EthStateCache, AuthLayer, EthApi, EthFilter, JwtAuthValidator, JwtSecret,
};
use reth_rpc_api::servers::*;
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::net::SocketAddr;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
/// Configure and launch an auth server with `engine` and a _new_ `eth` namespace.
/// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Client, Pool, Network, Tasks>(
pub async fn launch<Client, Pool, Network, Tasks, EngineApi>(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
engine_api: EngineApi<Client>,
engine_api: EngineApi,
socket_addr: SocketAddr,
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
@ -34,6 +36,7 @@ where
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
EngineApi: EngineApiServer,
{
// spawn a new cache task
let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor);
@ -42,11 +45,11 @@ where
launch_with_eth_api(eth_api, eth_filter, engine_api, socket_addr, secret).await
}
/// Configure and launch an auth server with existing EthApi implementation.
pub async fn launch_with_eth_api<Client, Pool, Network>(
/// Configure and launch a _standalone_ auth server with existing EthApi implementation.
pub async fn launch_with_eth_api<Client, Pool, Network, EngineApi>(
eth_api: EthApi<Client, Pool, Network>,
eth_filter: EthFilter<Client, Pool>,
engine_api: EngineApi<Client>,
engine_api: EngineApi,
socket_addr: SocketAddr,
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
@ -60,6 +63,7 @@ where
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
EngineApi: EngineApiServer,
{
// Configure the module and start the server.
let mut module = RpcModule::new(());
@ -80,3 +84,96 @@ where
Ok(server.start(module)?)
}
/// Server configuration for the auth server.
#[derive(Clone, Debug)]
pub struct AuthServerConfig {
/// Where the server should listen.
pub(crate) socket_addr: SocketAddr,
/// The secrete for the auth layer of the server.
pub(crate) secret: JwtSecret,
}
// === impl AuthServerConfig ===
impl AuthServerConfig {
/// Convenience function to create a new `AuthServerConfig`.
pub fn builder(secret: JwtSecret) -> AuthServerConfigBuilder {
AuthServerConfigBuilder::new(secret)
}
/// Convenience function to start a server in one step.
pub async fn start(self, module: AuthRpcModule) -> Result<ServerHandle, RpcError> {
let Self { socket_addr, secret } = self;
// Create auth middleware.
let middleware =
tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
// By default, both http and ws are enabled.
let server =
ServerBuilder::new().set_middleware(middleware).build(socket_addr).await.map_err(
|err| RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)),
)?;
Ok(server.start(module.inner)?)
}
}
/// Builder type for configuring an `AuthServerConfig`.
#[derive(Clone, Debug)]
pub struct AuthServerConfigBuilder {
socket_addr: Option<SocketAddr>,
secret: JwtSecret,
}
// === impl AuthServerConfigBuilder ===
impl AuthServerConfigBuilder {
/// Create a new `AuthServerConfigBuilder` with the given `secret`.
pub fn new(secret: JwtSecret) -> Self {
Self { socket_addr: None, secret }
}
/// Set the socket address for the server.
pub fn socket_addr(mut self, socket_addr: SocketAddr) -> Self {
self.socket_addr = Some(socket_addr);
self
}
/// Set the socket address for the server.
pub fn maybe_socket_addr(mut self, socket_addr: Option<SocketAddr>) -> Self {
self.socket_addr = socket_addr;
self
}
/// Set the secret for the server.
pub fn secret(mut self, secret: JwtSecret) -> Self {
self.secret = secret;
self
}
/// Build the `AuthServerConfig`.
pub fn build(self) -> AuthServerConfig {
AuthServerConfig {
socket_addr: self.socket_addr.unwrap_or_else(|| {
SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), constants::DEFAULT_AUTH_PORT)
}),
secret: self.secret,
}
}
}
/// Holds installed modules for the auth server.
#[derive(Debug)]
pub struct AuthRpcModule {
pub(crate) inner: RpcModule<()>,
}
// === impl TransportRpcModules ===
impl AuthRpcModule {
/// Convenience function for starting a server
pub async fn start_server(self, config: AuthServerConfig) -> Result<ServerHandle, RpcError> {
config.start(self).await
}
}

View File

@ -51,6 +51,51 @@
//! .unwrap();
//! }
//! ```
//!
//! Configure a http and ws server with a separate auth server that handles the `engine_` API
//!
//!
//! ```
//! use tokio::try_join;
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockProvider, CanonStateSubscriptions, StateProviderFactory, EvmEnvProvider};
//! use reth_rpc::JwtSecret;
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::TransactionPool;
//! use reth_rpc_api::EngineApiServer;
//! use reth_rpc_builder::auth::AuthServerConfig;
//! pub async fn launch<Client, Pool, Network, Events, EngineApi>(client: Client, pool: Pool, network: Network, events: Events, engine_api: EngineApi)
//! where
//! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events: CanonStateSubscriptions + Clone + 'static,
//! EngineApi: EngineApiServer
//! {
//! // configure the rpc module per transport
//! let transports = TransportRpcModuleConfig::default().with_http(vec![
//! RethRpcModule::Admin,
//! RethRpcModule::Debug,
//! RethRpcModule::Eth,
//! RethRpcModule::Web3,
//! ]);
//! let builder = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default(), events);
//!
//! // configure the server modules
//! let (modules, auth_module) = builder.build_with_auth_server(transports, engine_api);
//!
//! // start the servers
//! let auth_config = AuthServerConfig::builder(JwtSecret::random()).build();
//! let config = RpcServerConfig::default();
//!
//! let (_rpc_handle, _auth_handle) = try_join!(
//! modules.start_server(config),
//! auth_module.start_server(auth_config),
//! ).unwrap();
//!
//! }
//! ```
use constants::*;
use error::{RpcError, ServerKind};
@ -66,7 +111,7 @@ use reth_rpc::{
eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub,
EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_api::{servers::*, EngineApiServer};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize, Serializer};
@ -77,7 +122,6 @@ use std::{
str::FromStr,
};
use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames};
use tower::layer::util::{Identity, Stack};
use tower_http::cors::CorsLayer;
use tracing::{instrument, trace};
@ -98,6 +142,7 @@ mod eth;
pub mod constants;
// re-export for convenience
use crate::auth::AuthRpcModule;
pub use crate::eth::{EthConfig, EthHandlers};
pub use jsonrpsee::server::ServerBuilder;
pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
@ -130,7 +175,7 @@ where
/// A builder type to configure the RPC module: See [RpcModule]
///
/// This is the main entrypoint for up RPC servers.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct RpcModuleBuilder<Client, Pool, Network, Tasks, Events> {
/// The Client type to when creating all rpc handlers
client: Client,
@ -212,6 +257,42 @@ where
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
{
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
/// used to start the transport server(s).
///
/// And also configures the auth server, which also exposes the `eth_` namespace.
pub fn build_with_auth_server<EngineApi>(
self,
module_config: TransportRpcModuleConfig,
engine: EngineApi,
) -> (TransportRpcModules<()>, AuthRpcModule)
where
EngineApi: EngineApiServer,
{
let mut modules = TransportRpcModules::default();
let Self { client, pool, network, executor, events } = self;
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config;
let mut registry = RethModuleRegistry::new(
client,
pool,
network,
executor,
events,
config.unwrap_or_default(),
);
modules.http = registry.maybe_module(http.as_ref());
modules.ws = registry.maybe_module(ws.as_ref());
modules.ipc = registry.maybe_module(ipc.as_ref());
let auth_module = registry.create_auth_module(engine);
(modules, auth_module)
}
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
/// used to start the transport server(s).
///
@ -548,6 +629,28 @@ where
self
}
/// Configures the auth module that includes the
/// * `engine_` namespace
/// * `api_` namespace
///
/// Note: This does _not_ register the `engine_` in this registry.
pub fn create_auth_module<EngineApi>(&mut self, engine_api: EngineApi) -> AuthRpcModule
where
EngineApi: EngineApiServer,
{
let eth_handlers = self.eth_handlers();
let mut module = RpcModule::new(());
module.merge(engine_api.into_rpc()).expect("No conflicting methods");
// also merge all `eth_` handlers
module.merge(eth_handlers.api.into_rpc()).expect("No conflicting methods");
module.merge(eth_handlers.filter.into_rpc()).expect("No conflicting methods");
module.merge(eth_handlers.pubsub.into_rpc()).expect("No conflicting methods");
AuthRpcModule { inner: module }
}
/// Register Net Namespace
pub fn register_net(&mut self) -> &mut Self {
let eth_api = self.eth_api();
@ -668,6 +771,11 @@ where
f(self.eth.as_ref().expect("exists; qed"))
}
/// Returns the configured [EthHandlers] or creates it if it does not exist yet
fn eth_handlers(&mut self) -> EthHandlers<Client, Pool, Network, Events> {
self.with_eth(|handlers| handlers.clone())
}
/// Returns the configured [EthApi] or creates it if it does not exist yet
fn eth_api(&mut self) -> EthApi<Client, Pool, Network> {
self.with_eth(|handlers| handlers.api.clone())
@ -827,6 +935,15 @@ impl RpcServerConfig {
self
}
/// Returns true if any server is configured.
///
/// If no server is configured, no server will be be launched on [RpcServerConfig::start].
pub fn has_server(&self) -> bool {
self.http_server_config.is_some() ||
self.ws_server_config.is_some() ||
self.ipc_server_config.is_some()
}
/// Returns the [SocketAddr] of the http server
pub fn http_address(&self) -> Option<SocketAddr> {
self.http_addr

View File

@ -20,3 +20,6 @@ mod error;
pub use engine_api::{EngineApi, EngineApiSender};
pub use error::*;
pub use message::EngineApiMessageVersion;
// re-export server trait for convenience
pub use reth_rpc_api::EngineApiServer;