rpc server metrics impl (#3913)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
prames
2023-07-26 09:17:36 -04:00
committed by GitHub
parent d5ea16815c
commit caa26833a5
4 changed files with 99 additions and 4 deletions

1
Cargo.lock generated
View File

@ -5746,6 +5746,7 @@ dependencies = [
"reth-beacon-consensus",
"reth-interfaces",
"reth-ipc",
"reth-metrics",
"reth-network-api",
"reth-payload-builder",
"reth-primitives",

View File

@ -21,6 +21,7 @@ reth-rpc-engine-api = { path = "../rpc-engine-api" }
reth-rpc-types = { workspace = true }
reth-tasks = { workspace = true }
reth-transaction-pool = { workspace = true }
reth-metrics = { workspace = true, features = ["common"] }
# rpc/net
jsonrpsee = { version = "0.18", features = ["server"] }

View File

@ -103,7 +103,7 @@
//! }
//! ```
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError};
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcServerMetrics};
use constants::*;
use error::{RpcError, ServerKind};
use jsonrpsee::{
@ -157,6 +157,9 @@ pub mod constants;
/// Additional support for tracing related rpc calls
pub mod tracing_pool;
// Rpc server metrics
mod metrics;
// re-export for convenience
pub use crate::eth::{EthConfig, EthHandlers};
pub use jsonrpsee::server::ServerBuilder;
@ -1232,7 +1235,7 @@ impl RpcServerConfig {
let ws_socket_addr = self
.ws_addr
.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_RPC_PORT)));
let metrics = RpcServerMetrics::default();
// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
self.http_server_config.is_some() &&
@ -1264,6 +1267,7 @@ impl RpcServerConfig {
http_socket_addr,
cors,
ServerKind::WsHttp(http_socket_addr),
metrics.clone(),
)
.await?;
return Ok(WsHttpServer {
@ -1285,6 +1289,7 @@ impl RpcServerConfig {
ws_socket_addr,
self.ws_cors_domains.take(),
ServerKind::WS(ws_socket_addr),
metrics.clone(),
)
.await?;
ws_local_addr = Some(addr);
@ -1298,6 +1303,7 @@ impl RpcServerConfig {
http_socket_addr,
self.http_cors_domains.take(),
ServerKind::Http(http_socket_addr),
metrics.clone(),
)
.await?;
http_local_addr = Some(addr);
@ -1529,9 +1535,9 @@ impl Default for WsHttpServers {
/// Http Servers Enum
enum WsHttpServerKind {
/// Http server
Plain(Server),
Plain(Server<Identity, RpcServerMetrics>),
/// Http server with cors
WithCors(Server<Stack<CorsLayer, Identity>>),
WithCors(Server<Stack<CorsLayer, Identity>, RpcServerMetrics>),
}
// === impl WsHttpServerKind ===
@ -1551,12 +1557,14 @@ impl WsHttpServerKind {
socket_addr: SocketAddr,
cors_domains: Option<String>,
server_kind: ServerKind,
metrics: RpcServerMetrics,
) -> Result<(Self, SocketAddr), RpcError> {
if let Some(cors) = cors_domains.as_deref().map(cors::create_cors_layer) {
let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?;
let middleware = tower::ServiceBuilder::new().layer(cors);
let server = builder
.set_middleware(middleware)
.set_logger(metrics)
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;
@ -1565,6 +1573,7 @@ impl WsHttpServerKind {
Ok((server, local_addr))
} else {
let server = builder
.set_logger(metrics)
.build(socket_addr)
.await
.map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?;

View File

@ -0,0 +1,84 @@
use jsonrpsee::server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol};
use reth_metrics::{
metrics::{self, Counter, Histogram},
Metrics,
};
use std::{net::SocketAddr, time::Instant};
/// Metrics for the rpc server
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server")]
pub(crate) struct RpcServerMetrics {
/// The number of calls started
calls_started: Counter,
/// The number of successful calls
successful_calls: Counter,
/// The number of failed calls
failed_calls: Counter,
/// The number of requests started
requests_started: Counter,
/// The number of requests finished
requests_finished: Counter,
/// The number of ws sessions opened
ws_session_opened: Counter,
/// The number of ws sessions closed
ws_session_closed: Counter,
/// Latency for a single request/response pair
request_latency: Histogram,
/// Latency for a single call
call_latency: Histogram,
}
impl Logger for RpcServerMetrics {
type Instant = Instant;
fn on_connect(
&self,
_remote_addr: SocketAddr,
_request: &HttpRequest,
transport: TransportProtocol,
) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_opened.increment(1),
}
}
fn on_request(&self, _transport: TransportProtocol) -> Self::Instant {
self.requests_started.increment(1);
Instant::now()
}
fn on_call(
&self,
_method_name: &str,
_params: Params<'_>,
_kind: MethodKind,
_transport: TransportProtocol,
) {
self.calls_started.increment(1);
}
fn on_result(
&self,
_method_name: &str,
success: bool,
started_at: Self::Instant,
_transport: TransportProtocol,
) {
// capture call duration
self.call_latency.record(started_at.elapsed().as_millis() as f64);
if !success {
self.failed_calls.increment(1);
} else {
self.successful_calls.increment(1);
}
}
fn on_response(&self, _result: &str, started_at: Self::Instant, _transport: TransportProtocol) {
// capture request latency for this request/response pair
self.request_latency.record(started_at.elapsed().as_millis() as f64);
self.requests_finished.increment(1);
}
fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_closed.increment(1),
}
}
}