Feat: support for engine api over ipc (#7428)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Nil Medvedev
2024-04-10 21:31:53 +01:00
committed by GitHub
parent 69355affb8
commit dd83c9c4f8
6 changed files with 211 additions and 10 deletions

View File

@ -93,6 +93,14 @@ impl<Node: FullNodeComponents> FullNode<Node> {
pub async fn engine_ws_client(&self) -> impl EngineApiClient<Node::Engine> {
self.auth_server_handle().ws_client().await
}
/// Returns the [EngineApiClient] interface for the authenticated engine API.
///
/// This will send not authenticated IPC requests to the node's auth server.
#[cfg(unix)]
pub async fn engine_ipc_client(&self) -> Option<impl EngineApiClient<Node::Engine>> {
self.auth_server_handle().ipc_client().await
}
}
impl<Node: FullNodeComponents> Clone for FullNode<Node> {

View File

@ -300,7 +300,12 @@ where
let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| {
let addr = handle.local_addr();
info!(target: "reth::cli", url=%addr, "RPC auth server started");
if let Some(ipc_endpoint) = handle.ipc_endpoint() {
let ipc_endpoint = ipc_endpoint.path();
info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint,"RPC auth server started");
} else {
info!(target: "reth::cli", url=%addr, "RPC auth server started");
}
handle
});

View File

@ -176,6 +176,14 @@ pub struct RpcServerArgs {
/// Gas price oracle configuration.
#[command(flatten)]
pub gas_price_oracle: GasPriceOracleArgs,
/// Enable auth engine api over IPC
#[arg(long)]
pub auth_ipc: bool,
/// Filename for auth IPC socket/pipe within the datadir
#[arg(long = "auth-ipc.path", default_value_t = constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string())]
pub auth_ipc_path: String,
}
impl RpcServerArgs {
@ -191,6 +199,12 @@ impl RpcServerArgs {
self
}
/// Enables the Auth IPC
pub fn with_auth_ipc(mut self) -> Self {
self.auth_ipc = true;
self
}
/// Change rpc port numbers based on the instance number.
/// * The `auth_port` is scaled by a factor of `instance * 100`
/// * The `http_port` is scaled by a factor of `-instance`
@ -458,7 +472,11 @@ impl RethRpcConfig for RpcServerArgs {
fn auth_server_config(&self, jwt_secret: JwtSecret) -> Result<AuthServerConfig, RpcError> {
let address = SocketAddr::new(self.auth_addr, self.auth_port);
Ok(AuthServerConfig::builder(jwt_secret).socket_addr(address).build())
let mut builder = AuthServerConfig::builder(jwt_secret).socket_addr(address);
if self.auth_ipc {
builder = builder.ipc_endpoint(self.auth_ipc_path.clone());
}
Ok(builder.build())
}
fn auth_jwt_secret(&self, default_jwt_path: PathBuf) -> Result<JwtSecret, JwtError> {
@ -494,6 +512,8 @@ impl Default for RpcServerArgs {
auth_addr: Ipv4Addr::LOCALHOST.into(),
auth_port: constants::DEFAULT_AUTH_PORT,
auth_jwtsecret: None,
auth_ipc: false,
auth_ipc_path: constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string(),
rpc_jwtsecret: None,
rpc_max_request_size: RPC_DEFAULT_MAX_REQUEST_SIZE_MB.into(),
rpc_max_response_size: RPC_DEFAULT_MAX_RESPONSE_SIZE_MB.into(),

View File

@ -116,6 +116,71 @@ async fn can_run_eth_node() -> eyre::Result<()> {
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn can_run_eth_node_with_auth_engine_api_over_ipc() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let test_suite = TestSuite::new();
// Node setup
let node_config = NodeConfig::test()
.with_chain(test_suite.chain_spec())
.with_rpc(RpcServerArgs::default().with_unused_ports().with_http().with_auth_ipc());
let NodeHandle { mut node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(tasks.executor())
.node(EthereumNode::default())
.launch()
.await?;
// setup engine api events and payload service events
let _notifications = node.provider.canonical_state_stream();
let payload_events = node.payload_builder.subscribe().await?;
let mut payload_event_stream = payload_events.into_stream();
// push tx into pool via RPC server
let eth_api = node.rpc_registry.eth_api();
let (_expected_hash, raw_tx) = test_suite.transfer_tx().await;
eth_api.send_raw_transaction(raw_tx).await?;
// trigger new payload building draining the pool
let eth_attr = eth_payload_attributes();
let _payload_id = node.payload_builder.new_payload(eth_attr.clone()).await?;
// first event is the payload attributes
let first_event = payload_event_stream.next().await.unwrap()?;
if let reth::payload::Events::Attributes(attr) = first_event {
assert_eq!(eth_attr.timestamp, attr.timestamp);
} else {
panic!("Expect first event as payload attributes.")
}
Ok(())
}
#[tokio::test]
#[cfg(unix)]
async fn test_failed_run_eth_node_with_no_auth_engine_api_over_ipc_opts() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
let tasks = TaskManager::current();
let test_suite = TestSuite::new();
// Node setup
let node_config = NodeConfig::test().with_chain(test_suite.chain_spec());
let NodeHandle { node, node_exit_future: _ } = NodeBuilder::new(node_config)
.testing_node(tasks.executor())
.node(EthereumNode::default())
.launch()
.await?;
let client = node.engine_ipc_client().await;
assert!(client.is_none(), "ipc auth should be disabled by default");
Ok(())
}
fn eth_payload_attributes() -> EthPayloadBuilderAttributes {
let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();

View File

@ -4,14 +4,18 @@ use crate::{
error::{RpcError, ServerKind},
EthConfig,
};
use hyper::header::AUTHORIZATION;
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{
core::RegisterMethodError,
http_client::HeaderMap,
server::{AlreadyStoppedError, RpcModule, ServerHandle},
server::{AlreadyStoppedError, RpcModule},
Methods,
};
use reth_ipc::client::IpcClientBuilder;
pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
use reth_network_api::{NetworkInfo, Peers};
use reth_node_api::{ConfigureEvm, EngineTypes};
use reth_provider::{
@ -30,6 +34,7 @@ use reth_rpc_api::servers::*;
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
use reth_transaction_pool::TransactionPool;
use std::{
fmt,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{Duration, SystemTime, UNIX_EPOCH},
};
@ -142,7 +147,8 @@ where
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let handle = server.start(module);
Ok(AuthServerHandle { handle, local_addr, secret })
Ok(AuthServerHandle { handle, local_addr, secret, ipc_endpoint: None, ipc_handle: None })
}
/// Server configuration for the auth server.
@ -154,6 +160,10 @@ pub struct AuthServerConfig {
pub(crate) secret: JwtSecret,
/// Configs for JSON-RPC Http.
pub(crate) server_config: ServerBuilder<Identity, Identity>,
/// Configs for IPC server
pub(crate) ipc_server_config: Option<IpcServerBuilder>,
/// IPC endpoint
pub(crate) ipc_endpoint: Option<String>,
}
// === impl AuthServerConfig ===
@ -171,7 +181,7 @@ impl AuthServerConfig {
/// Convenience function to start a server in one step.
pub async fn start(self, module: AuthRpcModule) -> Result<AuthServerHandle, RpcError> {
let Self { socket_addr, secret, server_config } = self;
let Self { socket_addr, secret, server_config, ipc_server_config, ipc_endpoint } = self;
// Create auth middleware.
let middleware = tower::ServiceBuilder::new()
@ -188,17 +198,45 @@ impl AuthServerConfig {
.local_addr()
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let handle = server.start(module.inner);
Ok(AuthServerHandle { handle, local_addr, secret })
let handle = server.start(module.inner.clone());
let mut ipc_handle: Option<reth_ipc::server::ServerHandle> = None;
if let Some(ipc_server_config) = ipc_server_config {
let ipc_endpoint_str = ipc_endpoint
.clone()
.unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string());
let ipc_path = Endpoint::new(ipc_endpoint_str);
let ipc_server = ipc_server_config.build(ipc_path.path());
let res = ipc_server
.start(module.inner)
.await
.map_err(reth_ipc::server::IpcServerStartError::from)?;
ipc_handle = Some(res);
}
Ok(AuthServerHandle { handle, local_addr, secret, ipc_endpoint, ipc_handle })
}
}
/// Builder type for configuring an `AuthServerConfig`.
#[derive(Debug)]
pub struct AuthServerConfigBuilder {
socket_addr: Option<SocketAddr>,
secret: JwtSecret,
server_config: Option<ServerBuilder<Identity, Identity>>,
ipc_server_config: Option<IpcServerBuilder>,
ipc_endpoint: Option<String>,
}
impl fmt::Debug for AuthServerConfigBuilder {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AuthServerConfig")
.field("socket_addr", &self.socket_addr)
.field("secret", &self.secret)
.field("server_config", &self.server_config)
.field("ipc_server_config", &self.ipc_server_config)
.field("ipc_endpoint", &self.ipc_endpoint)
.finish()
}
}
// === impl AuthServerConfigBuilder ===
@ -206,7 +244,13 @@ pub struct AuthServerConfigBuilder {
impl AuthServerConfigBuilder {
/// Create a new `AuthServerConfigBuilder` with the given `secret`.
pub fn new(secret: JwtSecret) -> Self {
Self { socket_addr: None, secret, server_config: None }
Self {
socket_addr: None,
secret,
server_config: None,
ipc_server_config: None,
ipc_endpoint: None,
}
}
/// Set the socket address for the server.
@ -236,6 +280,20 @@ impl AuthServerConfigBuilder {
self
}
/// Set the ipc endpoint for the server.
pub fn ipc_endpoint(mut self, ipc_endpoint: String) -> Self {
self.ipc_endpoint = Some(ipc_endpoint);
self
}
/// Configures the IPC server
///
/// Note: this always configures an [EthSubscriptionIdProvider]
pub fn with_ipc_config(mut self, config: IpcServerBuilder) -> Self {
self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default()));
self
}
/// Build the `AuthServerConfig`.
pub fn build(self) -> AuthServerConfig {
AuthServerConfig {
@ -259,6 +317,14 @@ impl AuthServerConfigBuilder {
.max_request_body_size(128 * 1024 * 1024)
.set_id_provider(EthSubscriptionIdProvider::default())
}),
ipc_server_config: self.ipc_server_config.map(|ipc_server_config| {
ipc_server_config
.max_response_body_size(750 * 1024 * 1024)
.max_connections(500)
.max_request_body_size(128 * 1024 * 1024)
.set_id_provider(EthSubscriptionIdProvider::default())
}),
ipc_endpoint: self.ipc_endpoint,
}
}
}
@ -315,8 +381,10 @@ impl AuthRpcModule {
#[must_use = "Server stops if dropped"]
pub struct AuthServerHandle {
local_addr: SocketAddr,
handle: ServerHandle,
handle: jsonrpsee::server::ServerHandle,
secret: JwtSecret,
ipc_endpoint: Option<String>,
ipc_handle: Option<reth_ipc::server::ServerHandle>,
}
// === impl AuthServerHandle ===
@ -372,4 +440,31 @@ impl AuthServerHandle {
.await
.expect("Failed to create ws client")
}
/// Returns an ipc client connected to the server.
#[cfg(unix)]
pub async fn ipc_client(&self) -> Option<jsonrpsee::async_client::Client> {
if let Some(ipc_endpoint) = self.ipc_endpoint.clone() {
return Some(
IpcClientBuilder::default()
.build(Endpoint::new(ipc_endpoint).path())
.await
.expect("Failed to create ipc client"),
)
}
None
}
/// Returns an ipc handle
pub fn ipc_handle(&self) -> Option<reth_ipc::server::ServerHandle> {
self.ipc_handle.clone()
}
/// Return an ipc endpoint
pub fn ipc_endpoint(&self) -> Option<Endpoint> {
if let Some(ipc_endpoint) = self.ipc_endpoint.clone() {
return Some(Endpoint::new(ipc_endpoint))
}
None
}
}

View File

@ -38,3 +38,11 @@ pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc";
/// The default IPC endpoint
#[cfg(not(windows))]
pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc";
/// The engine_api IPC endpoint
#[cfg(windows)]
pub const DEFAULT_ENGINE_API_IPC_ENDPOINT: &str = r"\\.\pipe\reth_engine_api.ipc";
/// The engine_api IPC endpoint
#[cfg(not(windows))]
pub const DEFAULT_ENGINE_API_IPC_ENDPOINT: &str = "/tmp/reth_engine_api.ipc";