From caa26833a595de99c1285007c13f5f571968194e Mon Sep 17 00:00:00 2001 From: prames <134806363+0xprames@users.noreply.github.com> Date: Wed, 26 Jul 2023 09:17:36 -0400 Subject: [PATCH] rpc server metrics impl (#3913) Co-authored-by: Matthias Seitz --- Cargo.lock | 1 + crates/rpc/rpc-builder/Cargo.toml | 1 + crates/rpc/rpc-builder/src/lib.rs | 17 ++++-- crates/rpc/rpc-builder/src/metrics.rs | 84 +++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 crates/rpc/rpc-builder/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index 6d2141a46..1d5a1f6b2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5746,6 +5746,7 @@ dependencies = [ "reth-beacon-consensus", "reth-interfaces", "reth-ipc", + "reth-metrics", "reth-network-api", "reth-payload-builder", "reth-primitives", diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index c0078969d..07753d133 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -21,6 +21,7 @@ reth-rpc-engine-api = { path = "../rpc-engine-api" } reth-rpc-types = { workspace = true } reth-tasks = { workspace = true } reth-transaction-pool = { workspace = true } +reth-metrics = { workspace = true, features = ["common"] } # rpc/net jsonrpsee = { version = "0.18", features = ["server"] } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index afa7a37bf..4e86181b9 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -103,7 +103,7 @@ //! } //! ``` -use crate::{auth::AuthRpcModule, error::WsHttpSamePortError}; +use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcServerMetrics}; use constants::*; use error::{RpcError, ServerKind}; use jsonrpsee::{ @@ -157,6 +157,9 @@ pub mod constants; /// Additional support for tracing related rpc calls pub mod tracing_pool; +// Rpc server metrics +mod metrics; + // re-export for convenience pub use crate::eth::{EthConfig, EthHandlers}; pub use jsonrpsee::server::ServerBuilder; @@ -1232,7 +1235,7 @@ impl RpcServerConfig { let ws_socket_addr = self .ws_addr .unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_RPC_PORT))); - + let metrics = RpcServerMetrics::default(); // If both are configured on the same port, we combine them into one server. if self.http_addr == self.ws_addr && self.http_server_config.is_some() && @@ -1264,6 +1267,7 @@ impl RpcServerConfig { http_socket_addr, cors, ServerKind::WsHttp(http_socket_addr), + metrics.clone(), ) .await?; return Ok(WsHttpServer { @@ -1285,6 +1289,7 @@ impl RpcServerConfig { ws_socket_addr, self.ws_cors_domains.take(), ServerKind::WS(ws_socket_addr), + metrics.clone(), ) .await?; ws_local_addr = Some(addr); @@ -1298,6 +1303,7 @@ impl RpcServerConfig { http_socket_addr, self.http_cors_domains.take(), ServerKind::Http(http_socket_addr), + metrics.clone(), ) .await?; http_local_addr = Some(addr); @@ -1529,9 +1535,9 @@ impl Default for WsHttpServers { /// Http Servers Enum enum WsHttpServerKind { /// Http server - Plain(Server), + Plain(Server), /// Http server with cors - WithCors(Server>), + WithCors(Server, RpcServerMetrics>), } // === impl WsHttpServerKind === @@ -1551,12 +1557,14 @@ impl WsHttpServerKind { socket_addr: SocketAddr, cors_domains: Option, server_kind: ServerKind, + metrics: RpcServerMetrics, ) -> Result<(Self, SocketAddr), RpcError> { if let Some(cors) = cors_domains.as_deref().map(cors::create_cors_layer) { let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?; let middleware = tower::ServiceBuilder::new().layer(cors); let server = builder .set_middleware(middleware) + .set_logger(metrics) .build(socket_addr) .await .map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?; @@ -1565,6 +1573,7 @@ impl WsHttpServerKind { Ok((server, local_addr)) } else { let server = builder + .set_logger(metrics) .build(socket_addr) .await .map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?; diff --git a/crates/rpc/rpc-builder/src/metrics.rs b/crates/rpc/rpc-builder/src/metrics.rs new file mode 100644 index 000000000..e40e2c232 --- /dev/null +++ b/crates/rpc/rpc-builder/src/metrics.rs @@ -0,0 +1,84 @@ +use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol}; +use reth_metrics::{ + metrics::{self, Counter, Histogram}, + Metrics, +}; +use std::{net::SocketAddr, time::Instant}; + +/// Metrics for the rpc server +#[derive(Metrics, Clone)] +#[metrics(scope = "rpc_server")] +pub(crate) struct RpcServerMetrics { + /// The number of calls started + calls_started: Counter, + /// The number of successful calls + successful_calls: Counter, + /// The number of failed calls + failed_calls: Counter, + /// The number of requests started + requests_started: Counter, + /// The number of requests finished + requests_finished: Counter, + /// The number of ws sessions opened + ws_session_opened: Counter, + /// The number of ws sessions closed + ws_session_closed: Counter, + /// Latency for a single request/response pair + request_latency: Histogram, + /// Latency for a single call + call_latency: Histogram, +} + +impl Logger for RpcServerMetrics { + type Instant = Instant; + fn on_connect( + &self, + _remote_addr: SocketAddr, + _request: &HttpRequest, + transport: TransportProtocol, + ) { + match transport { + TransportProtocol::Http => {} + TransportProtocol::WebSocket => self.ws_session_opened.increment(1), + } + } + fn on_request(&self, _transport: TransportProtocol) -> Self::Instant { + self.requests_started.increment(1); + Instant::now() + } + fn on_call( + &self, + _method_name: &str, + _params: Params<'_>, + _kind: MethodKind, + _transport: TransportProtocol, + ) { + self.calls_started.increment(1); + } + fn on_result( + &self, + _method_name: &str, + success: bool, + started_at: Self::Instant, + _transport: TransportProtocol, + ) { + // capture call duration + self.call_latency.record(started_at.elapsed().as_millis() as f64); + if !success { + self.failed_calls.increment(1); + } else { + self.successful_calls.increment(1); + } + } + fn on_response(&self, _result: &str, started_at: Self::Instant, _transport: TransportProtocol) { + // capture request latency for this request/response pair + self.request_latency.record(started_at.elapsed().as_millis() as f64); + self.requests_finished.increment(1); + } + fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) { + match transport { + TransportProtocol::Http => {} + TransportProtocol::WebSocket => self.ws_session_closed.increment(1), + } + } +}