feat(rpc): record call metrics per method (#5338)

This commit is contained in:
Alexey Shekhirin
2023-11-07 19:01:39 +00:00
committed by GitHub
parent fd7896fd7c
commit 47669c5886
4 changed files with 137 additions and 49 deletions

View File

@ -46,7 +46,10 @@ pub struct IpcServer<B = Identity, L = ()> {
service_builder: tower::ServiceBuilder<B>,
}
impl IpcServer {
impl<L> IpcServer<Identity, L>
where
L: Logger,
{
/// Returns the configured [Endpoint]
pub fn endpoint(&self) -> &Endpoint {
&self.endpoint
@ -162,7 +165,7 @@ impl IpcServer {
stop_handle: stop_handle.clone(),
max_subscriptions_per_connection,
conn_id: id,
logger,
logger: logger.clone(),
conn: Arc::new(conn),
bounded_subscriptions: BoundedSubscriptions::new(
max_subscriptions_per_connection,

View File

@ -1342,13 +1342,16 @@ impl RpcServerConfig {
/// Convenience function to do [RpcServerConfig::build] and [RpcServer::start] in one step
pub async fn start(self, modules: TransportRpcModules) -> Result<RpcServerHandle, RpcError> {
self.build().await?.start(modules).await
self.build(&modules).await?.start(modules).await
}
/// Builds the ws and http server(s).
///
/// If both are on the same port, they are combined into one server.
async fn build_ws_http(&mut self) -> Result<WsHttpServer, RpcError> {
async fn build_ws_http(
&mut self,
modules: &TransportRpcModules,
) -> Result<WsHttpServer, RpcError> {
let http_socket_addr = self.http_addr.unwrap_or(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::LOCALHOST,
DEFAULT_HTTP_RPC_PORT,
@ -1358,7 +1361,7 @@ impl RpcServerConfig {
let ws_socket_addr = self
.ws_addr
.unwrap_or(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, DEFAULT_WS_RPC_PORT)));
let metrics = RpcServerMetrics::default();
// If both are configured on the same port, we combine them into one server.
if self.http_addr == self.ws_addr &&
self.http_server_config.is_some() &&
@ -1386,6 +1389,8 @@ impl RpcServerConfig {
// we merge this into one server using the http setup
self.ws_server_config.take();
modules.config.ensure_ws_http_identical()?;
let builder = self.http_server_config.take().expect("is set; qed");
let (server, addr) = WsHttpServerKind::build(
builder,
@ -1393,7 +1398,12 @@ impl RpcServerConfig {
cors,
secret,
ServerKind::WsHttp(http_socket_addr),
metrics.clone(),
modules
.http
.as_ref()
.or(modules.ws.as_ref())
.map(RpcServerMetrics::new)
.unwrap_or_default(),
)
.await?;
return Ok(WsHttpServer {
@ -1417,7 +1427,7 @@ impl RpcServerConfig {
self.ws_cors_domains.take(),
self.jwt_secret.clone(),
ServerKind::WS(ws_socket_addr),
metrics.clone(),
modules.ws.as_ref().map(RpcServerMetrics::new).unwrap_or_default(),
)
.await?;
ws_local_addr = Some(addr);
@ -1432,7 +1442,7 @@ impl RpcServerConfig {
self.http_cors_domains.take(),
self.jwt_secret.clone(),
ServerKind::Http(http_socket_addr),
metrics.clone(),
modules.http.as_ref().map(RpcServerMetrics::new).unwrap_or_default(),
)
.await?;
http_local_addr = Some(addr);
@ -1452,15 +1462,16 @@ impl RpcServerConfig {
/// This consumes the builder and returns a server.
///
/// Note: The server ist not started and does nothing unless polled, See also [RpcServer::start]
pub async fn build(mut self) -> Result<RpcServer, RpcError> {
pub async fn build(mut self, modules: &TransportRpcModules) -> Result<RpcServer, RpcError> {
let mut server = RpcServer::empty();
server.ws_http = self.build_ws_http().await?;
server.ws_http = self.build_ws_http(modules).await?;
if let Some(builder) = self.ipc_server_config {
let metrics = modules.ipc.as_ref().map(RpcServerMetrics::new).unwrap_or_default();
let ipc_path = self
.ipc_endpoint
.unwrap_or_else(|| Endpoint::new(DEFAULT_IPC_ENDPOINT.to_string()));
let ipc = builder.build(ipc_path.path())?;
let ipc = builder.set_logger(metrics).build(ipc_path.path())?;
server.ipc = Some(ipc);
}
@ -1825,7 +1836,7 @@ pub struct RpcServer {
/// Configured ws,http servers
ws_http: WsHttpServer,
/// ipc server
ipc: Option<IpcServer>,
ipc: Option<IpcServer<Identity, RpcServerMetrics>>,
}
// === impl RpcServer ===

View File

@ -1,87 +1,150 @@
use jsonrpsee::{
helpers::MethodResponseResult,
server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol},
RpcModule,
};
use reth_metrics::{
metrics::{Counter, Histogram},
Metrics,
};
use std::{net::SocketAddr, time::Instant};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Instant};
/// Metrics for the rpc server
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server")]
/// Metrics for the RPC server
#[derive(Default, Clone)]
pub(crate) struct RpcServerMetrics {
inner: Arc<RpcServerMetricsInner>,
}
/// Metrics for the RPC server
#[derive(Default, Clone)]
struct RpcServerMetricsInner {
/// Connection metrics per transport type
connection_metrics: ConnectionMetrics,
/// Call metrics per RPC method
call_metrics: HashMap<&'static str, RpcServerCallMetrics>,
}
impl RpcServerMetrics {
pub(crate) fn new(module: &RpcModule<()>) -> Self {
Self {
inner: Arc::new(RpcServerMetricsInner {
connection_metrics: ConnectionMetrics::default(),
call_metrics: HashMap::from_iter(module.method_names().map(|method| {
(method, RpcServerCallMetrics::new_with_labels(&[("method", method)]))
})),
}),
}
}
}
#[derive(Clone)]
struct ConnectionMetrics {
http: RpcServerConnectionMetrics,
ws: RpcServerConnectionMetrics,
}
impl ConnectionMetrics {
fn get_metrics(&self, transport: TransportProtocol) -> &RpcServerConnectionMetrics {
match transport {
TransportProtocol::Http => &self.http,
TransportProtocol::WebSocket => &self.ws,
}
}
}
impl Default for ConnectionMetrics {
fn default() -> Self {
Self {
http: RpcServerConnectionMetrics::new_with_labels(&[("transport", "http")]),
ws: RpcServerConnectionMetrics::new_with_labels(&[("transport", "ws")]),
}
}
}
/// Metrics for the RPC connections
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server.connections")]
struct RpcServerConnectionMetrics {
/// The number of connections opened
connections_opened: Counter,
/// The number of connections closed
connections_closed: Counter,
/// The number of requests started
requests_started: Counter,
/// The number of requests finished
requests_finished: Counter,
/// Latency for a single request/response pair
request_latency: Histogram,
}
/// Metrics for the RPC calls
#[derive(Metrics, Clone)]
#[metrics(scope = "rpc_server.calls")]
struct RpcServerCallMetrics {
/// The number of calls started
calls_started: Counter,
/// The number of successful calls
successful_calls: Counter,
/// The number of failed calls
failed_calls: Counter,
/// The number of requests started
requests_started: Counter,
/// The number of requests finished
requests_finished: Counter,
/// The number of ws sessions opened
ws_session_opened: Counter,
/// The number of ws sessions closed
ws_session_closed: Counter,
/// Latency for a single request/response pair
request_latency: Histogram,
/// Latency for a single call
call_latency: Histogram,
}
impl Logger for RpcServerMetrics {
type Instant = Instant;
fn on_connect(
&self,
_remote_addr: SocketAddr,
_request: &HttpRequest,
transport: TransportProtocol,
) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_opened.increment(1),
}
self.inner.connection_metrics.get_metrics(transport).connections_opened.increment(1)
}
fn on_request(&self, _transport: TransportProtocol) -> Self::Instant {
self.requests_started.increment(1);
fn on_request(&self, transport: TransportProtocol) -> Self::Instant {
self.inner.connection_metrics.get_metrics(transport).requests_started.increment(1);
Instant::now()
}
fn on_call(
&self,
_method_name: &str,
method_name: &str,
_params: Params<'_>,
_kind: MethodKind,
_transport: TransportProtocol,
) {
self.calls_started.increment(1);
let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return };
call_metrics.calls_started.increment(1);
}
fn on_result(
&self,
_method_name: &str,
method_name: &str,
success: MethodResponseResult,
started_at: Self::Instant,
_transport: TransportProtocol,
) {
// capture call duration
self.call_latency.record(started_at.elapsed().as_millis() as f64);
if success.is_error() {
self.failed_calls.increment(1);
let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return };
// capture call latency
call_metrics.call_latency.record(started_at.elapsed().as_millis() as f64);
if success.is_success() {
call_metrics.successful_calls.increment(1);
} else {
self.successful_calls.increment(1);
call_metrics.failed_calls.increment(1);
}
}
fn on_response(&self, _result: &str, started_at: Self::Instant, _transport: TransportProtocol) {
fn on_response(&self, _result: &str, started_at: Self::Instant, transport: TransportProtocol) {
let metrics = self.inner.connection_metrics.get_metrics(transport);
// capture request latency for this request/response pair
self.request_latency.record(started_at.elapsed().as_millis() as f64);
self.requests_finished.increment(1);
metrics.request_latency.record(started_at.elapsed().as_millis() as f64);
metrics.requests_finished.increment(1);
}
fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) {
match transport {
TransportProtocol::Http => {}
TransportProtocol::WebSocket => self.ws_session_closed.increment(1),
}
self.inner.connection_metrics.get_metrics(transport).connections_closed.increment(1)
}
}

View File

@ -52,7 +52,7 @@ use std::{
sync::{mpsc, Arc},
time::{Duration, Instant},
};
use tracing::debug;
use tracing::{debug, warn};
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<'this, DB> = DatabaseProvider<<DB as DatabaseGAT<'this>>::TX>;
@ -2148,7 +2148,18 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
let start = Instant::now();
self.tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
transactions_elapsed += start.elapsed();
let elapsed = start.elapsed();
if elapsed > Duration::from_secs(1) {
warn!(
target: "providers::db",
?block_number,
tx_num = %next_tx_num,
hash = %hash,
?elapsed,
"Transaction insertion took too long"
);
}
transactions_elapsed += elapsed;
if prune_modes
.and_then(|modes| modes.transaction_lookup)