diff --git a/Cargo.lock b/Cargo.lock index 4b5ed5ac2..980b6f16d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6265,8 +6265,10 @@ dependencies = [ "async-trait", "jsonrpsee-core", "jsonrpsee-types", + "metrics", "reth-beacon-consensus", "reth-interfaces", + "reth-metrics", "reth-payload-builder", "reth-primitives", "reth-provider", diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index ebd0bfeb7..ad2e876e6 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -22,6 +22,10 @@ reth-rpc-types-compat.workspace = true # async tokio = { workspace = true, features = ["sync"] } +# metrics +reth-metrics.workspace = true +metrics.workspace = true + # misc async-trait.workspace = true thiserror.workspace = true diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 1e7cea097..5218e1b96 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,5 +1,6 @@ use crate::{ - payload::PayloadOrAttributes, EngineApiError, EngineApiMessageVersion, EngineApiResult, + metrics::EngineApiMetrics, payload::PayloadOrAttributes, EngineApiError, + EngineApiMessageVersion, EngineApiResult, }; use async_trait::async_trait; use jsonrpsee_core::RpcResult; @@ -19,7 +20,7 @@ use reth_rpc_types_compat::engine::payload::{ convert_payload_input_v2_to_payload, convert_to_payload_body_v1, }; use reth_tasks::TaskSpawner; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tokio::sync::oneshot; use tracing::trace; @@ -46,6 +47,8 @@ struct EngineApiInner { payload_store: PayloadStore, /// For spawning and executing async tasks task_spawner: Box, + /// The metrics for engine api calls + metrics: EngineApiMetrics, } impl EngineApi @@ -66,6 +69,7 @@ where beacon_consensus, payload_store, task_spawner, + metrics: EngineApiMetrics::default(), }); Self { inner } } @@ -591,14 +595,20 @@ where /// Caution: This should not accept the `withdrawals` field async fn new_payload_v1(&self, payload: ExecutionPayloadV1) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_newPayloadV1"); - Ok(EngineApi::new_payload_v1(self, payload).await?) + let start = Instant::now(); + let res = EngineApi::new_payload_v1(self, payload).await; + self.inner.metrics.new_payload_v1.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_newPayloadV2` /// See also async fn new_payload_v2(&self, payload: ExecutionPayloadInputV2) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_newPayloadV2"); - Ok(EngineApi::new_payload_v2(self, payload).await?) + let start = Instant::now(); + let res = EngineApi::new_payload_v2(self, payload).await; + self.inner.metrics.new_payload_v2.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_newPayloadV3` @@ -610,8 +620,12 @@ where parent_beacon_block_root: B256, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_newPayloadV3"); - Ok(EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root) - .await?) + let start = Instant::now(); + let res = + EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root) + .await; + self.inner.metrics.new_payload_v3.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_forkchoiceUpdatedV1` @@ -624,7 +638,11 @@ where payload_attributes: Option, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV1"); - Ok(EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await?) + let start = Instant::now(); + let res = + EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await; + self.inner.metrics.fork_choice_updated_v1.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_forkchoiceUpdatedV2` @@ -635,7 +653,11 @@ where payload_attributes: Option, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV2"); - Ok(EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await?) + let start = Instant::now(); + let res = + EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await; + self.inner.metrics.fork_choice_updated_v2.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_forkchoiceUpdatedV2` @@ -647,7 +669,11 @@ where payload_attributes: Option, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_forkchoiceUpdatedV3"); - Ok(EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await?) + let start = Instant::now(); + let res = + EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await; + self.inner.metrics.fork_choice_updated_v3.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_getPayloadV1` @@ -663,7 +689,10 @@ where /// > Provider software MAY stop the corresponding build process after serving this call. async fn get_payload_v1(&self, payload_id: PayloadId) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadV1"); - Ok(EngineApi::get_payload_v1(self, payload_id).await?) + let start = Instant::now(); + let res = EngineApi::get_payload_v1(self, payload_id).await; + self.inner.metrics.get_payload_v1.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_getPayloadV2` @@ -677,7 +706,10 @@ where /// > Provider software MAY stop the corresponding build process after serving this call. async fn get_payload_v2(&self, payload_id: PayloadId) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadV2"); - Ok(EngineApi::get_payload_v2(self, payload_id).await?) + let start = Instant::now(); + let res = EngineApi::get_payload_v2(self, payload_id).await; + self.inner.metrics.get_payload_v2.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_getPayloadV3` @@ -691,7 +723,10 @@ where /// > Provider software MAY stop the corresponding build process after serving this call. async fn get_payload_v3(&self, payload_id: PayloadId) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadV3"); - Ok(EngineApi::get_payload_v3(self, payload_id).await?) + let start = Instant::now(); + let res = EngineApi::get_payload_v3(self, payload_id).await; + self.inner.metrics.get_payload_v3.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_getPayloadBodiesByHashV1` @@ -701,7 +736,10 @@ where block_hashes: Vec, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1"); - Ok(EngineApi::get_payload_bodies_by_hash(self, block_hashes)?) + let start = Instant::now(); + let res = EngineApi::get_payload_bodies_by_hash(self, block_hashes); + self.inner.metrics.get_payload_bodies_by_hash_v1.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_getPayloadBodiesByRangeV1` @@ -726,7 +764,10 @@ where count: U64, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1"); - Ok(EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await?) + let start_time = Instant::now(); + let res = EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await; + self.inner.metrics.get_payload_bodies_by_range_v1.record(start_time.elapsed()); + Ok(res?) } /// Handler for `engine_exchangeTransitionConfigurationV1` @@ -736,7 +777,10 @@ where config: TransitionConfiguration, ) -> RpcResult { trace!(target: "rpc::engine", "Serving engine_exchangeTransitionConfigurationV1"); - Ok(EngineApi::exchange_transition_configuration(self, config).await?) + let start = Instant::now(); + let res = EngineApi::exchange_transition_configuration(self, config).await; + self.inner.metrics.exchange_transition_configuration.record(start.elapsed()); + Ok(res?) } /// Handler for `engine_exchangeCapabilitiesV1` diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index c75a34021..d39025d50 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -22,6 +22,9 @@ mod payload; /// Engine API error. mod error; +/// Engine API metrics. +mod metrics; + pub use engine_api::{EngineApi, EngineApiSender}; pub use error::*; pub use message::EngineApiMessageVersion; diff --git a/crates/rpc/rpc-engine-api/src/metrics.rs b/crates/rpc/rpc-engine-api/src/metrics.rs new file mode 100644 index 000000000..9df8ff790 --- /dev/null +++ b/crates/rpc/rpc-engine-api/src/metrics.rs @@ -0,0 +1,32 @@ +use metrics::Histogram; +use reth_metrics::Metrics; + +/// Beacon consensus engine metrics. +#[derive(Metrics)] +#[metrics(scope = "engine.rpc")] +pub(crate) struct EngineApiMetrics { + /// Latency for `engine_newPayloadV1` + pub(crate) new_payload_v1: Histogram, + /// Latency for `engine_newPayloadV2` + pub(crate) new_payload_v2: Histogram, + /// Latency for `engine_newPayloadV3` + pub(crate) new_payload_v3: Histogram, + /// Latency for `engine_forkchoiceUpdatedV1` + pub(crate) fork_choice_updated_v1: Histogram, + /// Latency for `engine_forkchoiceUpdatedV2` + pub(crate) fork_choice_updated_v2: Histogram, + /// Latency for `engine_forkchoiceUpdatedV3` + pub(crate) fork_choice_updated_v3: Histogram, + /// Latency for `engine_getPayloadV1` + pub(crate) get_payload_v1: Histogram, + /// Latency for `engine_getPayloadV2` + pub(crate) get_payload_v2: Histogram, + /// Latency for `engine_getPayloadV3` + pub(crate) get_payload_v3: Histogram, + /// Latency for `engine_getPayloadBodiesByRangeV1` + pub(crate) get_payload_bodies_by_range_v1: Histogram, + /// Latency for `engine_getPayloadBodiesByHashV1` + pub(crate) get_payload_bodies_by_hash_v1: Histogram, + /// Latency for `engine_exchangeTransitionConfigurationV1` + pub(crate) exchange_transition_configuration: Histogram, +}