diff --git a/Cargo.lock b/Cargo.lock index 3bd9ff743..9e34b6c61 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6686,9 +6686,11 @@ dependencies = [ "async-trait", "bytes", "futures", + "futures-util", "jsonrpsee", "parity-tokio-ipc", "pin-project", + "reth-tracing", "serde_json", "thiserror", "tokio", diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index b23c35cfd..64b69d1e5 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -465,7 +465,6 @@ where #[cfg(test)] mod tests { use super::*; - use crate::EthEvmConfig; use reth_primitives::{ bytes, constants::{BEACON_ROOTS_ADDRESS, SYSTEM_ADDRESS}, diff --git a/crates/node-core/src/args/rpc_server_args.rs b/crates/node-core/src/args/rpc_server_args.rs index b12f2740a..da3095815 100644 --- a/crates/node-core/src/args/rpc_server_args.rs +++ b/crates/node-core/src/args/rpc_server_args.rs @@ -437,7 +437,7 @@ impl RethRpcConfig for RpcServerArgs { .max_subscriptions_per_connection(self.rpc_max_subscriptions_per_connection.get()) } - fn ipc_server_builder(&self) -> IpcServerBuilder { + fn ipc_server_builder(&self) -> IpcServerBuilder { IpcServerBuilder::default() .max_subscriptions_per_connection(self.rpc_max_subscriptions_per_connection.get()) .max_request_body_size(self.rpc_max_request_size_bytes()) diff --git a/crates/node-core/src/cli/config.rs b/crates/node-core/src/cli/config.rs index 988ef34d5..1bce398ef 100644 --- a/crates/node-core/src/cli/config.rs +++ b/crates/node-core/src/cli/config.rs @@ -49,7 +49,7 @@ pub trait RethRpcConfig { fn http_ws_server_builder(&self) -> ServerBuilder; /// Returns the default ipc server builder - fn ipc_server_builder(&self) -> IpcServerBuilder; + fn ipc_server_builder(&self) -> IpcServerBuilder; /// Creates the [RpcServerConfig] from cli args. fn rpc_server_config(&self) -> RpcServerConfig; diff --git a/crates/optimism/node/src/evm/execute.rs b/crates/optimism/node/src/evm/execute.rs index cca13fb7d..f51c6cd3b 100644 --- a/crates/optimism/node/src/evm/execute.rs +++ b/crates/optimism/node/src/evm/execute.rs @@ -545,7 +545,6 @@ mod tests { use revm::L1_BLOCK_CONTRACT; use std::{collections::HashMap, str::FromStr}; - use crate::OptimismEvmConfig; use reth_revm::test_utils::StateProviderTest; fn create_op_state_provider() -> StateProviderTest { diff --git a/crates/rpc/ipc/Cargo.toml b/crates/rpc/ipc/Cargo.toml index 8d93a275c..21b645409 100644 --- a/crates/rpc/ipc/Cargo.toml +++ b/crates/rpc/ipc/Cargo.toml @@ -29,6 +29,8 @@ serde_json.workspace = true tracing.workspace = true bytes.workspace = true thiserror.workspace = true +futures-util = "0.3.30" [dev-dependencies] tokio-stream = { workspace = true, features = ["sync"] } +reth-tracing.workspace = true diff --git a/crates/rpc/ipc/src/server/future.rs b/crates/rpc/ipc/src/server/future.rs index 84df306a5..65aaccc88 100644 --- a/crates/rpc/ipc/src/server/future.rs +++ b/crates/rpc/ipc/src/server/future.rs @@ -84,7 +84,7 @@ where while i < self.futures.len() { if self.futures[i].poll_unpin(cx).is_ready() { - // Using `swap_remove` since we don't care about ordering + // Using `swap_remove` since we don't care about ordering, // but we do care about removing being `O(1)`. // // We don't increment `i` in this branch, since we now diff --git a/crates/rpc/ipc/src/server/ipc.rs b/crates/rpc/ipc/src/server/ipc.rs index 8ce4502a2..1fd600c03 100644 --- a/crates/rpc/ipc/src/server/ipc.rs +++ b/crates/rpc/ipc/src/server/ipc.rs @@ -1,5 +1,7 @@ //! IPC request handling adapted from [`jsonrpsee`] http request handling +use std::sync::Arc; + use futures::{stream::FuturesOrdered, StreamExt}; use jsonrpsee::{ batch_response_error, @@ -8,15 +10,10 @@ use jsonrpsee::{ tracing::server::{rx_log_from_json, tx_log_from_str}, JsonRawValue, }, - server::IdProvider, - types::{ - error::{reject_too_many_subscriptions, ErrorCode}, - ErrorObject, Id, InvalidRequest, Notification, Params, Request, - }, - BatchResponseBuilder, BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodResponse, - MethodSink, Methods, ResponsePayload, SubscriptionState, + server::middleware::rpc::RpcServiceT, + types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Request}, + BatchResponseBuilder, MethodResponse, ResponsePayload, }; -use std::{sync::Arc, time::Instant}; use tokio::sync::OwnedSemaphorePermit; use tokio_util::either::Either; use tracing::instrument; @@ -24,42 +21,33 @@ use tracing::instrument; type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>; #[derive(Debug, Clone)] -pub(crate) struct Batch<'a> { +pub(crate) struct Batch { data: Vec, - call: CallData<'a>, -} - -#[derive(Debug, Clone)] -pub(crate) struct CallData<'a> { - conn_id: usize, - methods: &'a Methods, - id_provider: &'a dyn IdProvider, - sink: &'a MethodSink, - max_response_body_size: u32, - max_log_length: u32, - request_start: Instant, - bounded_subscriptions: BoundedSubscriptions, + rpc_service: S, } // Batch responses must be sent back as a single message so we read the results from each // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. #[instrument(name = "batch", skip(b), level = "TRACE")] -pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option { - let Batch { data, call } = b; +pub(crate) async fn process_batch_request( + b: Batch, + max_response_body_size: usize, +) -> Option +where + for<'a> S: RpcServiceT<'a> + Send, +{ + let Batch { data, rpc_service } = b; if let Ok(batch) = serde_json::from_slice::>(&data) { let mut got_notif = false; - let mut batch_response = - BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize); + let mut batch_response = BatchResponseBuilder::new_with_limit(max_response_body_size); let mut pending_calls: FuturesOrdered<_> = batch .into_iter() .filter_map(|v| { if let Ok(req) = serde_json::from_str::>(v.get()) { - Some(Either::Right(async { - execute_call(req, call.clone()).await.into_response() - })) + Some(Either::Right(rpc_service.call(req))) } else if let Ok(_notif) = serde_json::from_str::>(v.get()) { // notifications should not be answered. got_notif = true; @@ -95,92 +83,32 @@ pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option { } } -pub(crate) async fn process_single_request( +pub(crate) async fn process_single_request( data: Vec, - call: CallData<'_>, -) -> Option { + rpc_service: &S, +) -> Option +where + for<'a> S: RpcServiceT<'a> + Send, +{ if let Ok(req) = serde_json::from_slice::>(&data) { - Some(execute_call_with_tracing(req, call).await) + Some(execute_call_with_tracing(req, rpc_service).await) } else if serde_json::from_slice::>(&data).is_ok() { None } else { let (id, code) = prepare_error(&data); - Some(CallOrSubscription::Call(MethodResponse::error(id, ErrorObject::from(code)))) + Some(MethodResponse::error(id, ErrorObject::from(code))) } } -#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")] -pub(crate) async fn execute_call_with_tracing<'a>( +#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(req, rpc_service), level = "TRACE")] +pub(crate) async fn execute_call_with_tracing<'a, S>( req: Request<'a>, - call: CallData<'_>, -) -> CallOrSubscription { - execute_call(req, call).await -} - -pub(crate) async fn execute_call(req: Request<'_>, call: CallData<'_>) -> CallOrSubscription { - let CallData { - methods, - max_response_body_size, - max_log_length, - conn_id, - id_provider, - sink, - request_start, - bounded_subscriptions, - } = call; - - rx_log_from_json(&req, call.max_log_length); - - let params = Params::new(req.params.as_ref().map(|params| params.get())); - let name = &req.method; - let id = req.id; - - let response = match methods.method_with_name(name) { - None => { - let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)); - CallOrSubscription::Call(response) - } - Some((_name, method)) => match method { - MethodCallback::Sync(callback) => { - let response = (callback)(id, params, max_response_body_size as usize); - CallOrSubscription::Call(response) - } - MethodCallback::Async(callback) => { - let id = id.into_owned(); - let params = params.into_owned(); - let response = - (callback)(id, params, conn_id, max_response_body_size as usize).await; - CallOrSubscription::Call(response) - } - MethodCallback::AsyncWithDetails(_callback) => { - unimplemented!() - } - MethodCallback::Subscription(callback) => { - if let Some(p) = bounded_subscriptions.acquire() { - let conn_state = - SubscriptionState { conn_id, id_provider, subscription_permit: p }; - let response = callback(id, params, sink.clone(), conn_state).await; - CallOrSubscription::Subscription(response) - } else { - let response = MethodResponse::error( - id, - reject_too_many_subscriptions(bounded_subscriptions.max()), - ); - CallOrSubscription::Call(response) - } - } - MethodCallback::Unsubscription(callback) => { - // Don't adhere to any resource or subscription limits; always let unsubscribing - // happen! - let result = callback(id, params, conn_id, max_response_body_size as usize); - CallOrSubscription::Call(result) - } - }, - }; - - tx_log_from_str(response.as_response().as_result(), max_log_length); - let _ = request_start; - response + rpc_service: &S, +) -> MethodResponse +where + for<'b> S: RpcServiceT<'b> + Send, +{ + rpc_service.call(req).await } #[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")] @@ -192,31 +120,15 @@ fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodRespons response } -#[allow(dead_code)] -pub(crate) struct HandleRequest { - pub(crate) methods: Methods, - pub(crate) max_request_body_size: u32, - pub(crate) max_response_body_size: u32, - pub(crate) max_log_length: u32, - pub(crate) batch_requests_supported: bool, - pub(crate) conn: Arc, - pub(crate) bounded_subscriptions: BoundedSubscriptions, - pub(crate) method_sink: MethodSink, - pub(crate) id_provider: Arc, -} - -pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Option { - let HandleRequest { - methods, - max_response_body_size, - max_log_length, - conn, - bounded_subscriptions, - method_sink, - id_provider, - .. - } = input; - +pub(crate) async fn call_with_service( + request: String, + rpc_service: S, + max_response_body_size: usize, + conn: Arc, +) -> Option +where + for<'a> S: RpcServiceT<'a> + Send, +{ enum Kind { Single, Batch, @@ -231,31 +143,23 @@ pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Opt }) .unwrap_or(Kind::Single); - let call = CallData { - conn_id: 0, - methods: &methods, - id_provider: &*id_provider, - sink: &method_sink, - max_response_body_size, - max_log_length, - request_start: Instant::now(), - bounded_subscriptions, - }; - // Single request or notification let res = if matches!(request_kind, Kind::Single) { - let response = process_single_request(request.into_bytes(), call).await; + let response = process_single_request(request.into_bytes(), &rpc_service).await; match response { - Some(CallOrSubscription::Call(response)) => Some(response.to_result()), - Some(CallOrSubscription::Subscription(_)) => { + Some(response) if response.is_method_call() => Some(response.to_result()), + _ => { // subscription responses are sent directly over the sink, return a response here // would lead to duplicate responses for the subscription response None } - None => None, } } else { - process_batch_request(Batch { data: request.into_bytes(), call }).await + process_batch_request( + Batch { data: request.into_bytes(), rpc_service }, + max_response_body_size, + ) + .await }; drop(conn); diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index e6d1a6051..7afb6bb7d 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -7,7 +7,10 @@ use crate::server::{ use futures::{FutureExt, Stream, StreamExt}; use jsonrpsee::{ core::TEN_MB_SIZE_BYTES, - server::{AlreadyStoppedError, IdProvider, RandomIntegerIdProvider}, + server::{ + middleware::rpc::{either::Either, RpcLoggerLayer, RpcServiceT}, + AlreadyStoppedError, IdProvider, RandomIntegerIdProvider, + }, BoundedSubscriptions, MethodSink, Methods, }; use std::{ @@ -21,36 +24,57 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, sync::{oneshot, watch, OwnedSemaphorePermit}, }; -use tower::{layer::util::Identity, Service}; +use tower::{layer::util::Identity, Layer, Service}; use tracing::{debug, trace, warn}; // re-export so can be used during builder setup -use crate::server::connection::IpcConnDriver; +use crate::server::{ + connection::IpcConnDriver, + rpc_service::{RpcService, RpcServiceCfg}, +}; pub use parity_tokio_ipc::Endpoint; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tower::layer::{util::Stack, LayerFn}; mod connection; mod future; mod ipc; +mod rpc_service; /// Ipc Server implementation // This is an adapted `jsonrpsee` Server, but for `Ipc` connections. -pub struct IpcServer { +pub struct IpcServer { /// The endpoint we listen for incoming transactions endpoint: Endpoint, id_provider: Arc, cfg: Settings, - service_builder: tower::ServiceBuilder, + rpc_middleware: RpcServiceBuilder, + http_middleware: tower::ServiceBuilder, } -impl IpcServer { +impl IpcServer { /// Returns the configured [Endpoint] pub fn endpoint(&self) -> &Endpoint { &self.endpoint } +} +impl IpcServer +where + RpcMiddleware: Layer + Clone + Send + 'static, + for<'a> >::Service: RpcServiceT<'a>, + HttpMiddleware: Layer> + Send + 'static, + >>::Service: Send + + Service< + String, + Response = Option, + Error = Box, + >, + <>>::Service as Service>::Future: + Send + Unpin, +{ /// Start responding to connections requests. /// /// This will run on the tokio runtime until the server is stopped or the ServerHandle is @@ -123,7 +147,7 @@ impl IpcServer { let incoming = match self.endpoint.incoming() { Ok(connections) => { #[cfg(windows)] - let connections = Box::pin(connections); + let connections = Box::pin(connections); Incoming::new(connections) } Err(err) => { @@ -154,7 +178,7 @@ impl IpcServer { let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); let method_sink = MethodSink::new_with_limit(tx, max_response_body_size); - let tower_service = TowerService { + let tower_service = TowerServiceNoHttp { inner: ServiceData { methods: methods.clone(), max_request_body_size, @@ -170,9 +194,10 @@ impl IpcServer { ), method_sink, }, + rpc_middleware: self.rpc_middleware.clone(), }; - let service = self.service_builder.service(tower_service); + let service = self.http_middleware.service(tower_service); connections.add(Box::pin(spawn_connection( ipc, service, @@ -244,16 +269,87 @@ pub(crate) struct ServiceData { pub(crate) method_sink: MethodSink, } +/// Similar to [`tower::ServiceBuilder`] but doesn't +/// support any tower middleware implementations. +#[derive(Debug, Clone)] +pub struct RpcServiceBuilder(tower::ServiceBuilder); + +impl Default for RpcServiceBuilder { + fn default() -> Self { + RpcServiceBuilder(tower::ServiceBuilder::new()) + } +} + +impl RpcServiceBuilder { + /// Create a new [`RpcServiceBuilder`]. + pub fn new() -> Self { + Self(tower::ServiceBuilder::new()) + } +} + +impl RpcServiceBuilder { + /// Optionally add a new layer `T` to the [`RpcServiceBuilder`]. + /// + /// See the documentation for [`tower::ServiceBuilder::option_layer`] for more details. + pub fn option_layer( + self, + layer: Option, + ) -> RpcServiceBuilder, L>> { + let layer = if let Some(layer) = layer { + Either::Left(layer) + } else { + Either::Right(Identity::new()) + }; + self.layer(layer) + } + + /// Add a new layer `T` to the [`RpcServiceBuilder`]. + /// + /// See the documentation for [`tower::ServiceBuilder::layer`] for more details. + pub fn layer(self, layer: T) -> RpcServiceBuilder> { + RpcServiceBuilder(self.0.layer(layer)) + } + + /// Add a [`tower::Layer`] built from a function that accepts a service and returns another + /// service. + /// + /// See the documentation for [`tower::ServiceBuilder::layer_fn`] for more details. + pub fn layer_fn(self, f: F) -> RpcServiceBuilder, L>> { + RpcServiceBuilder(self.0.layer_fn(f)) + } + + /// Add a logging layer to [`RpcServiceBuilder`] + /// + /// This logs each request and response for every call. + pub fn rpc_logger(self, max_log_len: u32) -> RpcServiceBuilder> { + RpcServiceBuilder(self.0.layer(RpcLoggerLayer::new(max_log_len))) + } + + /// Wrap the service `S` with the middleware. + pub(crate) fn service(&self, service: S) -> L::Service + where + L: tower::Layer, + { + self.0.service(service) + } +} + /// JsonRPSee service compatible with `tower`. /// /// # Note /// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html). -#[derive(Debug)] -pub struct TowerService { +#[derive(Debug, Clone)] +pub struct TowerServiceNoHttp { inner: ServiceData, + rpc_middleware: RpcServiceBuilder, } -impl Service for TowerService { +impl Service for TowerServiceNoHttp +where + RpcMiddleware: for<'a> Layer, + >::Service: Send + Sync + 'static, + for<'a> >::Service: RpcServiceT<'a>, +{ /// The response of a handled RPC call /// /// This is an `Option` because subscriptions and call responses are handled differently. @@ -273,26 +369,32 @@ impl Service for TowerService { fn call(&mut self, request: String) -> Self::Future { trace!("{:?}", request); - // handle the request - let data = ipc::HandleRequest { - methods: self.inner.methods.clone(), - max_request_body_size: self.inner.max_request_body_size, - max_response_body_size: self.inner.max_response_body_size, - max_log_length: self.inner.max_log_length, - batch_requests_supported: true, - conn: self.inner.conn.clone(), - bounded_subscriptions: self.inner.bounded_subscriptions.clone(), - method_sink: self.inner.method_sink.clone(), + let cfg = RpcServiceCfg::CallsAndSubscriptions { + bounded_subscriptions: BoundedSubscriptions::new( + self.inner.max_subscriptions_per_connection, + ), id_provider: self.inner.id_provider.clone(), + sink: self.inner.method_sink.clone(), }; + let max_response_body_size = self.inner.max_response_body_size as usize; + let rpc_service = self.rpc_middleware.service(RpcService::new( + self.inner.methods.clone(), + max_response_body_size, + self.inner.conn_id as usize, + cfg, + )); + let conn = self.inner.conn.clone(); // an ipc connection needs to handle read+write concurrently // even if the underlying rpc handler spawns the actual work or is does a lot of async any // additional overhead performed by `handle_request` can result in I/O latencies, for // example tracing calls are relatively CPU expensive on serde::serialize alone, moving this // work to a separate task takes the pressure off the connection so all concurrent responses // are also serialized concurrently and the connection can focus on read+write - let f = tokio::task::spawn(async move { ipc::handle_request(request, data).await }); + let f = tokio::task::spawn(async move { + ipc::call_with_service(request, rpc_service, max_response_body_size, conn).await + }); + Box::pin(async move { f.await.map_err(|err| err.into()) }) } } @@ -413,24 +515,26 @@ impl Default for Settings { /// Builder to configure and create a JSON-RPC server #[derive(Debug)] -pub struct Builder { +pub struct Builder { settings: Settings, /// Subscription ID provider. id_provider: Arc, - service_builder: tower::ServiceBuilder, + rpc_middleware: RpcServiceBuilder, + http_middleware: tower::ServiceBuilder, } -impl Default for Builder { +impl Default for Builder { fn default() -> Self { Builder { settings: Settings::default(), id_provider: Arc::new(RandomIntegerIdProvider), - service_builder: tower::ServiceBuilder::new(), + rpc_middleware: RpcServiceBuilder::new(), + http_middleware: tower::ServiceBuilder::new(), } } } -impl Builder { +impl Builder { /// Set the maximum size of a request body in bytes. Default is 10 MiB. pub fn max_request_body_size(mut self, size: u32) -> Self { self.settings.max_request_body_size = size; @@ -529,26 +633,114 @@ impl Builder { /// let builder = tower::ServiceBuilder::new(); /// /// let server = - /// reth_ipc::server::Builder::default().set_middleware(builder).build("/tmp/my-uds"); + /// reth_ipc::server::Builder::default().set_http_middleware(builder).build("/tmp/my-uds"); /// } /// ``` - pub fn set_middleware(self, service_builder: tower::ServiceBuilder) -> Builder { - Builder { settings: self.settings, id_provider: self.id_provider, service_builder } + pub fn set_http_middleware( + self, + service_builder: tower::ServiceBuilder, + ) -> Builder { + Builder { + settings: self.settings, + id_provider: self.id_provider, + http_middleware: service_builder, + rpc_middleware: self.rpc_middleware, + } + } + + /// Enable middleware that is invoked on every JSON-RPC call. + /// + /// The middleware itself is very similar to the `tower middleware` but + /// it has a different service trait which takes &self instead &mut self + /// which means that you can't use built-in middleware from tower. + /// + /// Another consequence of `&self` is that you must wrap any of the middleware state in + /// a type which is Send and provides interior mutability such `Arc`. + /// + /// The builder itself exposes a similar API as the [`tower::ServiceBuilder`] + /// where it is possible to compose layers to the middleware. + /// + /// ``` + /// use std::{ + /// net::SocketAddr, + /// sync::{ + /// atomic::{AtomicUsize, Ordering}, + /// Arc, + /// }, + /// time::Instant, + /// }; + /// + /// use futures_util::future::BoxFuture; + /// use jsonrpsee::{ + /// server::{middleware::rpc::RpcServiceT, ServerBuilder}, + /// types::Request, + /// MethodResponse, + /// }; + /// use reth_ipc::server::{Builder, RpcServiceBuilder}; + /// + /// #[derive(Clone)] + /// struct MyMiddleware { + /// service: S, + /// count: Arc, + /// } + /// + /// impl<'a, S> RpcServiceT<'a> for MyMiddleware + /// where + /// S: RpcServiceT<'a> + Send + Sync + Clone + 'static, + /// { + /// type Future = BoxFuture<'a, MethodResponse>; + /// + /// fn call(&self, req: Request<'a>) -> Self::Future { + /// tracing::info!("MyMiddleware processed call {}", req.method); + /// let count = self.count.clone(); + /// let service = self.service.clone(); + /// + /// Box::pin(async move { + /// let rp = service.call(req).await; + /// // Modify the state. + /// count.fetch_add(1, Ordering::Relaxed); + /// rp + /// }) + /// } + /// } + /// + /// // Create a state per connection + /// // NOTE: The service type can be omitted once `start` is called on the server. + /// let m = RpcServiceBuilder::new().layer_fn(move |service: ()| MyMiddleware { + /// service, + /// count: Arc::new(AtomicUsize::new(0)), + /// }); + /// let builder = Builder::default().set_rpc_middleware(m); + /// ``` + pub fn set_rpc_middleware( + self, + rpc_middleware: RpcServiceBuilder, + ) -> Builder { + Builder { + settings: self.settings, + id_provider: self.id_provider, + rpc_middleware, + http_middleware: self.http_middleware, + } } /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build(self, endpoint: impl AsRef) -> IpcServer { + pub fn build(self, endpoint: impl AsRef) -> IpcServer { let endpoint = Endpoint::new(endpoint.as_ref().to_string()); self.build_with_endpoint(endpoint) } /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build_with_endpoint(self, endpoint: Endpoint) -> IpcServer { + pub fn build_with_endpoint( + self, + endpoint: Endpoint, + ) -> IpcServer { IpcServer { endpoint, cfg: self.settings, id_provider: self.id_provider, - service_builder: self.service_builder, + http_middleware: self.http_middleware, + rpc_middleware: self.rpc_middleware, } } } @@ -589,7 +781,9 @@ mod tests { use futures::future::{select, Either}; use jsonrpsee::{ core::client::{ClientT, Subscription, SubscriptionClientT}, - rpc_params, PendingSubscriptionSink, RpcModule, SubscriptionMessage, + rpc_params, + types::Request, + PendingSubscriptionSink, RpcModule, SubscriptionMessage, }; use parity_tokio_ipc::dummy_endpoint; use tokio::sync::broadcast; @@ -657,6 +851,7 @@ mod tests { #[tokio::test] async fn test_ipc_modules() { + reth_tracing::init_test_tracing(); let endpoint = dummy_endpoint(); let server = Builder::default().build(&endpoint); let mut module = RpcModule::new(()); @@ -703,4 +898,50 @@ mod tests { let items = sub.take(16).collect::>().await; assert_eq!(items.len(), 16); } + + #[tokio::test] + async fn test_rpc_middleware() { + #[derive(Clone)] + struct ModifyRequestIf(S); + + impl<'a, S> RpcServiceT<'a> for ModifyRequestIf + where + S: Send + Sync + RpcServiceT<'a>, + { + type Future = S::Future; + + fn call(&self, mut req: Request<'a>) -> Self::Future { + // Re-direct all calls that isn't `say_hello` to `say_goodbye` + if req.method == "say_hello" { + req.method = "say_goodbye".into(); + } else if req.method == "say_goodbye" { + req.method = "say_hello".into(); + } + + self.0.call(req) + } + } + + reth_tracing::init_test_tracing(); + let endpoint = dummy_endpoint(); + + let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf); + let server = Builder::default().set_rpc_middleware(rpc_middleware).build(&endpoint); + + let mut module = RpcModule::new(()); + let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#; + let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#; + module.register_method("say_hello", move |_, _| hello_msg).unwrap(); + module.register_method("say_goodbye", move |_, _| goodbye_msg).unwrap(); + let handle = server.start(module).await.unwrap(); + tokio::spawn(handle.stopped()); + + let client = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let say_hello_response: String = client.request("say_hello", rpc_params![]).await.unwrap(); + let say_goodbye_response: String = + client.request("say_goodbye", rpc_params![]).await.unwrap(); + + assert_eq!(say_hello_response, goodbye_msg); + assert_eq!(say_goodbye_response, hello_msg); + } } diff --git a/crates/rpc/ipc/src/server/rpc_service.rs b/crates/rpc/ipc/src/server/rpc_service.rs new file mode 100644 index 000000000..94e9ed2aa --- /dev/null +++ b/crates/rpc/ipc/src/server/rpc_service.rs @@ -0,0 +1,138 @@ +//! JSON-RPC service middleware. +use futures_util::future::BoxFuture; +use jsonrpsee::{ + server::{ + middleware::rpc::{ResponseFuture, RpcServiceT}, + IdProvider, + }, + types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request}, + BoundedSubscriptions, ConnectionDetails, MethodCallback, MethodResponse, MethodSink, Methods, + SubscriptionState, +}; +use std::sync::Arc; + +/// JSON-RPC service middleware. +#[derive(Clone, Debug)] +pub struct RpcService { + conn_id: usize, + methods: Methods, + max_response_body_size: usize, + cfg: RpcServiceCfg, +} + +/// Configuration of the RpcService. +#[allow(dead_code)] +#[derive(Clone, Debug)] +pub(crate) enum RpcServiceCfg { + /// The server supports only calls. + OnlyCalls, + /// The server supports both method calls and subscriptions. + CallsAndSubscriptions { + bounded_subscriptions: BoundedSubscriptions, + sink: MethodSink, + id_provider: Arc, + }, +} + +impl RpcService { + /// Create a new service. + pub(crate) fn new( + methods: Methods, + max_response_body_size: usize, + conn_id: usize, + cfg: RpcServiceCfg, + ) -> Self { + Self { methods, max_response_body_size, conn_id, cfg } + } +} + +impl<'a> RpcServiceT<'a> for RpcService { + // The rpc module is already boxing the futures and + // it's used to under the hood by the RpcService. + type Future = ResponseFuture>; + + fn call(&self, req: Request<'a>) -> Self::Future { + let conn_id = self.conn_id; + let max_response_body_size = self.max_response_body_size; + + let params = req.params(); + let name = req.method_name(); + let id = req.id().clone(); + + match self.methods.method_with_name(name) { + None => { + let rp = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)); + ResponseFuture::ready(rp) + } + Some((_name, method)) => match method { + MethodCallback::Async(callback) => { + let params = params.into_owned(); + let id = id.into_owned(); + + let fut = (callback)(id, params, conn_id, max_response_body_size); + ResponseFuture::future(fut) + } + MethodCallback::AsyncWithDetails(callback) => { + let params = params.into_owned(); + let id = id.into_owned(); + + // Note: Add the `Request::extensions` to the connection details when available + // here. + let fut = (callback)( + id, + params, + ConnectionDetails::_new(conn_id), + max_response_body_size, + ); + ResponseFuture::future(fut) + } + MethodCallback::Sync(callback) => { + let rp = (callback)(id, params, max_response_body_size); + ResponseFuture::ready(rp) + } + MethodCallback::Subscription(callback) => { + let RpcServiceCfg::CallsAndSubscriptions { + bounded_subscriptions, + sink, + id_provider, + } = self.cfg.clone() + else { + tracing::warn!("Subscriptions not supported"); + let rp = + MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)); + return ResponseFuture::ready(rp); + }; + + if let Some(p) = bounded_subscriptions.acquire() { + let conn_state = SubscriptionState { + conn_id, + id_provider: &*id_provider.clone(), + subscription_permit: p, + }; + + let fut = callback(id.clone(), params, sink, conn_state); + ResponseFuture::future(fut) + } else { + let max = bounded_subscriptions.max(); + let rp = MethodResponse::error(id, reject_too_many_subscriptions(max)); + ResponseFuture::ready(rp) + } + } + MethodCallback::Unsubscription(callback) => { + // Don't adhere to any resource or subscription limits; always let unsubscribing + // happen! + + let RpcServiceCfg::CallsAndSubscriptions { .. } = self.cfg else { + tracing::warn!("Subscriptions not supported"); + let rp = + MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError)); + return ResponseFuture::ready(rp); + }; + + let rp = callback(id, params, conn_id, max_response_body_size); + ResponseFuture::ready(rp) + } + }, + } + } +} diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 2349c6e85..cd21be271 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -161,7 +161,7 @@ pub struct AuthServerConfig { /// Configs for JSON-RPC Http. pub(crate) server_config: ServerBuilder, /// Configs for IPC server - pub(crate) ipc_server_config: Option, + pub(crate) ipc_server_config: Option>, /// IPC endpoint pub(crate) ipc_endpoint: Option, } @@ -223,7 +223,7 @@ pub struct AuthServerConfigBuilder { socket_addr: Option, secret: JwtSecret, server_config: Option>, - ipc_server_config: Option, + ipc_server_config: Option>, ipc_endpoint: Option, } @@ -289,7 +289,7 @@ impl AuthServerConfigBuilder { /// Configures the IPC server /// /// Note: this always configures an [EthSubscriptionIdProvider] - pub fn with_ipc_config(mut self, config: IpcServerBuilder) -> Self { + pub fn with_ipc_config(mut self, config: IpcServerBuilder) -> Self { self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default())); self } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index ef5b8868c..62f82b8f8 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -171,7 +171,9 @@ use jsonrpsee::{ use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; use reth_ipc::server::IpcServer; -pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; +pub use reth_ipc::server::{ + Builder as IpcServerBuilder, Endpoint, RpcServiceBuilder as IpcRpcServiceBuilder, +}; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_provider::{ AccountReader, BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, @@ -1472,7 +1474,7 @@ pub struct RpcServerConfig { /// Address where to bind the ws server to ws_addr: Option, /// Configs for JSON-RPC IPC server - ipc_server_config: Option, + ipc_server_config: Option>, /// The Endpoint where to launch the ipc server ipc_endpoint: Option, /// JWT secret for authentication @@ -1508,7 +1510,7 @@ impl RpcServerConfig { } /// Creates a new config with only ipc set - pub fn ipc(config: IpcServerBuilder) -> Self { + pub fn ipc(config: IpcServerBuilder) -> Self { Self::default().with_ipc(config) } @@ -1568,7 +1570,7 @@ impl RpcServerConfig { /// /// Note: this always configures an [EthSubscriptionIdProvider] [IdProvider] for convenience. /// To set a custom [IdProvider], please use [Self::with_id_provider]. - pub fn with_ipc(mut self, config: IpcServerBuilder) -> Self { + pub fn with_ipc(mut self, config: IpcServerBuilder) -> Self { self.ipc_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default())); self } @@ -1756,13 +1758,12 @@ impl RpcServerConfig { server.ws_http = self.build_ws_http(modules).await?; if let Some(builder) = self.ipc_server_config { - // let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::new).unwrap_or_default(); + let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); let ipc_path = self .ipc_endpoint .unwrap_or_else(|| Endpoint::new(DEFAULT_IPC_ENDPOINT.to_string())); let ipc = builder - // TODO(mattsse): add metrics middleware for IPC - // .set_middleware(metrics) + .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) .build(ipc_path.path()); server.ipc = Some(ipc); } @@ -2127,7 +2128,7 @@ pub struct RpcServer { /// Configured ws,http servers ws_http: WsHttpServer, /// ipc server - ipc: Option, + ipc: Option>>, } // === impl RpcServer ===