From 9179e4e8ac1c19442b094f0837b6106f9060783f Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Mon, 22 Apr 2024 12:53:39 -0400 Subject: [PATCH] feat(rpc-engine-api): add engine API response type metrics (#7801) --- crates/rpc/rpc-engine-api/src/engine_api.rs | 32 ++++--- crates/rpc/rpc-engine-api/src/metrics.rs | 100 +++++++++++++++++++- 2 files changed, 116 insertions(+), 16 deletions(-) diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index d84b6ed22..7fc52b21c 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -47,7 +47,7 @@ struct EngineApiInner { payload_store: PayloadStore, /// For spawning and executing async tasks task_spawner: Box, - /// The metrics for engine api calls + /// The latency and response type metrics for engine api calls metrics: EngineApiMetrics, } @@ -491,7 +491,8 @@ where trace!(target: "rpc::engine", "Serving engine_newPayloadV1"); let start = Instant::now(); let res = EngineApi::new_payload_v1(self, payload).await; - self.inner.metrics.new_payload_v1.record(start.elapsed()); + self.inner.metrics.latency.new_payload_v1.record(start.elapsed()); + self.inner.metrics.new_payload_response.update_response_metrics(&res); Ok(res?) } @@ -501,7 +502,8 @@ where trace!(target: "rpc::engine", "Serving engine_newPayloadV2"); let start = Instant::now(); let res = EngineApi::new_payload_v2(self, payload).await; - self.inner.metrics.new_payload_v2.record(start.elapsed()); + self.inner.metrics.latency.new_payload_v2.record(start.elapsed()); + self.inner.metrics.new_payload_response.update_response_metrics(&res); Ok(res?) } @@ -518,7 +520,8 @@ where let res = EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root) .await; - self.inner.metrics.new_payload_v3.record(start.elapsed()); + self.inner.metrics.latency.new_payload_v3.record(start.elapsed()); + self.inner.metrics.new_payload_response.update_response_metrics(&res); Ok(res?) } @@ -535,7 +538,8 @@ where let start = Instant::now(); let res = EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await; - self.inner.metrics.fork_choice_updated_v1.record(start.elapsed()); + self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed()); + self.inner.metrics.fcu_response.update_response_metrics(&res); Ok(res?) } @@ -550,7 +554,8 @@ where let start = Instant::now(); let res = EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await; - self.inner.metrics.fork_choice_updated_v2.record(start.elapsed()); + self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed()); + self.inner.metrics.fcu_response.update_response_metrics(&res); Ok(res?) } @@ -566,7 +571,8 @@ where let start = Instant::now(); let res = EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await; - self.inner.metrics.fork_choice_updated_v3.record(start.elapsed()); + self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed()); + self.inner.metrics.fcu_response.update_response_metrics(&res); Ok(res?) } @@ -588,7 +594,7 @@ where trace!(target: "rpc::engine", "Serving engine_getPayloadV1"); let start = Instant::now(); let res = EngineApi::get_payload_v1(self, payload_id).await; - self.inner.metrics.get_payload_v1.record(start.elapsed()); + self.inner.metrics.latency.get_payload_v1.record(start.elapsed()); Ok(res?) } @@ -608,7 +614,7 @@ where trace!(target: "rpc::engine", "Serving engine_getPayloadV2"); let start = Instant::now(); let res = EngineApi::get_payload_v2(self, payload_id).await; - self.inner.metrics.get_payload_v2.record(start.elapsed()); + self.inner.metrics.latency.get_payload_v2.record(start.elapsed()); Ok(res?) } @@ -628,7 +634,7 @@ where trace!(target: "rpc::engine", "Serving engine_getPayloadV3"); let start = Instant::now(); let res = EngineApi::get_payload_v3(self, payload_id).await; - self.inner.metrics.get_payload_v3.record(start.elapsed()); + self.inner.metrics.latency.get_payload_v3.record(start.elapsed()); Ok(res?) } @@ -641,7 +647,7 @@ where trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1"); let start = Instant::now(); let res = EngineApi::get_payload_bodies_by_hash(self, block_hashes); - self.inner.metrics.get_payload_bodies_by_hash_v1.record(start.elapsed()); + self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed()); Ok(res?) } @@ -669,7 +675,7 @@ where trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1"); let start_time = Instant::now(); let res = EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await; - self.inner.metrics.get_payload_bodies_by_range_v1.record(start_time.elapsed()); + self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed()); Ok(res?) } @@ -682,7 +688,7 @@ where trace!(target: "rpc::engine", "Serving engine_exchangeTransitionConfigurationV1"); let start = Instant::now(); let res = EngineApi::exchange_transition_configuration(self, config).await; - self.inner.metrics.exchange_transition_configuration.record(start.elapsed()); + self.inner.metrics.latency.exchange_transition_configuration.record(start.elapsed()); Ok(res?) } diff --git a/crates/rpc/rpc-engine-api/src/metrics.rs b/crates/rpc/rpc-engine-api/src/metrics.rs index 9df8ff790..d63611f7d 100644 --- a/crates/rpc/rpc-engine-api/src/metrics.rs +++ b/crates/rpc/rpc-engine-api/src/metrics.rs @@ -1,10 +1,23 @@ -use metrics::Histogram; +use crate::EngineApiError; +use metrics::{Counter, Histogram}; use reth_metrics::Metrics; +use reth_rpc_types::engine::{ForkchoiceUpdated, PayloadStatus, PayloadStatusEnum}; -/// Beacon consensus engine metrics. +/// All beacon consensus engine metrics +#[derive(Default)] +pub(crate) struct EngineApiMetrics { + /// Engine API latency metrics + pub(crate) latency: EngineApiLatencyMetrics, + /// Engine API forkchoiceUpdated response type metrics + pub(crate) fcu_response: ForkchoiceUpdatedResponseMetrics, + /// Engine API newPayload response type metrics + pub(crate) new_payload_response: NewPayloadStatusResponseMetrics, +} + +/// Beacon consensus engine latency metrics. #[derive(Metrics)] #[metrics(scope = "engine.rpc")] -pub(crate) struct EngineApiMetrics { +pub(crate) struct EngineApiLatencyMetrics { /// Latency for `engine_newPayloadV1` pub(crate) new_payload_v1: Histogram, /// Latency for `engine_newPayloadV2` @@ -30,3 +43,84 @@ pub(crate) struct EngineApiMetrics { /// Latency for `engine_exchangeTransitionConfigurationV1` pub(crate) exchange_transition_configuration: Histogram, } + +/// Metrics for engine API forkchoiceUpdated responses. +#[derive(Metrics)] +#[metrics(scope = "engine.rpc")] +pub(crate) struct ForkchoiceUpdatedResponseMetrics { + /// The total count of forkchoice updated messages received. + pub(crate) forkchoice_updated_messages: Counter, + /// The total count of forkchoice updated messages that we responded to with + /// [Invalid](reth_rpc_types::engine::PayloadStatusEnum#Invalid). + pub(crate) forkchoice_updated_invalid: Counter, + /// The total count of forkchoice updated messages that we responded to with + /// [Valid](reth_rpc_types::engine::PayloadStatusEnum#Valid). + pub(crate) forkchoice_updated_valid: Counter, + /// The total count of forkchoice updated messages that we responded to with + /// [Syncing](reth_rpc_types::engine::PayloadStatusEnum#Syncing). + pub(crate) forkchoice_updated_syncing: Counter, + /// The total count of forkchoice updated messages that we responded to with + /// [Accepted](reth_rpc_types::engine::PayloadStatusEnum#Accepted). + pub(crate) forkchoice_updated_accepted: Counter, + /// The total count of forkchoice updated messages that were unsuccessful, i.e. we responded + /// with an error type that is not a [PayloadStatusEnum]. + pub(crate) forkchoice_updated_error: Counter, +} + +/// Metrics for engine API newPayload responses. +#[derive(Metrics)] +#[metrics(scope = "engine.rpc")] +pub(crate) struct NewPayloadStatusResponseMetrics { + /// The total count of new payload messages received. + pub(crate) new_payload_messages: Counter, + /// The total count of new payload messages that we responded to with + /// [Invalid](reth_rpc_types::engine::PayloadStatusEnum#Invalid). + pub(crate) new_payload_invalid: Counter, + /// The total count of new payload messages that we responded to with + /// [Valid](reth_rpc_types::engine::PayloadStatusEnum#Valid). + pub(crate) new_payload_valid: Counter, + /// The total count of new payload messages that we responded to with + /// [Syncing](reth_rpc_types::engine::PayloadStatusEnum#Syncing). + pub(crate) new_payload_syncing: Counter, + /// The total count of new payload messages that we responded to with + /// [Accepted](reth_rpc_types::engine::PayloadStatusEnum#Accepted). + pub(crate) new_payload_accepted: Counter, + /// The total count of new payload messages that were unsuccessful, i.e. we responded with an + /// error type that is not a [PayloadStatusEnum]. + pub(crate) new_payload_error: Counter, +} + +impl NewPayloadStatusResponseMetrics { + /// Increment the newPayload counter based on the given rpc result + pub(crate) fn update_response_metrics(&self, result: &Result) { + match result { + Ok(status) => match status.status { + PayloadStatusEnum::Valid => self.new_payload_valid.increment(1), + PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1), + PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1), + PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1), + }, + Err(_) => self.new_payload_error.increment(1), + } + self.new_payload_messages.increment(1); + } +} + +impl ForkchoiceUpdatedResponseMetrics { + /// Increment the forkchoiceUpdated counter based on the given rpc result + pub(crate) fn update_response_metrics( + &self, + result: &Result, + ) { + match result { + Ok(status) => match status.payload_status.status { + PayloadStatusEnum::Valid => self.forkchoice_updated_valid.increment(1), + PayloadStatusEnum::Syncing => self.forkchoice_updated_syncing.increment(1), + PayloadStatusEnum::Accepted => self.forkchoice_updated_accepted.increment(1), + PayloadStatusEnum::Invalid { .. } => self.forkchoice_updated_invalid.increment(1), + }, + Err(_) => self.forkchoice_updated_error.increment(1), + } + self.forkchoice_updated_messages.increment(1); + } +}