feat(rpc-engine-api): add engine API response type metrics (#7801)

This commit is contained in:
Dan Cline
2024-04-22 12:53:39 -04:00
committed by GitHub
parent 9a1d6ea9ca
commit 9179e4e8ac
2 changed files with 116 additions and 16 deletions

View File

@ -47,7 +47,7 @@ struct EngineApiInner<Provider, EngineT: EngineTypes> {
payload_store: PayloadStore<EngineT>,
/// For spawning and executing async tasks
task_spawner: Box<dyn TaskSpawner>,
/// The metrics for engine api calls
/// The latency and response type metrics for engine api calls
metrics: EngineApiMetrics,
}
@ -491,7 +491,8 @@ where
trace!(target: "rpc::engine", "Serving engine_newPayloadV1");
let start = Instant::now();
let res = EngineApi::new_payload_v1(self, payload).await;
self.inner.metrics.new_payload_v1.record(start.elapsed());
self.inner.metrics.latency.new_payload_v1.record(start.elapsed());
self.inner.metrics.new_payload_response.update_response_metrics(&res);
Ok(res?)
}
@ -501,7 +502,8 @@ where
trace!(target: "rpc::engine", "Serving engine_newPayloadV2");
let start = Instant::now();
let res = EngineApi::new_payload_v2(self, payload).await;
self.inner.metrics.new_payload_v2.record(start.elapsed());
self.inner.metrics.latency.new_payload_v2.record(start.elapsed());
self.inner.metrics.new_payload_response.update_response_metrics(&res);
Ok(res?)
}
@ -518,7 +520,8 @@ where
let res =
EngineApi::new_payload_v3(self, payload, versioned_hashes, parent_beacon_block_root)
.await;
self.inner.metrics.new_payload_v3.record(start.elapsed());
self.inner.metrics.latency.new_payload_v3.record(start.elapsed());
self.inner.metrics.new_payload_response.update_response_metrics(&res);
Ok(res?)
}
@ -535,7 +538,8 @@ where
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v1(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v1.record(start.elapsed());
self.inner.metrics.latency.fork_choice_updated_v1.record(start.elapsed());
self.inner.metrics.fcu_response.update_response_metrics(&res);
Ok(res?)
}
@ -550,7 +554,8 @@ where
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v2(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v2.record(start.elapsed());
self.inner.metrics.latency.fork_choice_updated_v2.record(start.elapsed());
self.inner.metrics.fcu_response.update_response_metrics(&res);
Ok(res?)
}
@ -566,7 +571,8 @@ where
let start = Instant::now();
let res =
EngineApi::fork_choice_updated_v3(self, fork_choice_state, payload_attributes).await;
self.inner.metrics.fork_choice_updated_v3.record(start.elapsed());
self.inner.metrics.latency.fork_choice_updated_v3.record(start.elapsed());
self.inner.metrics.fcu_response.update_response_metrics(&res);
Ok(res?)
}
@ -588,7 +594,7 @@ where
trace!(target: "rpc::engine", "Serving engine_getPayloadV1");
let start = Instant::now();
let res = EngineApi::get_payload_v1(self, payload_id).await;
self.inner.metrics.get_payload_v1.record(start.elapsed());
self.inner.metrics.latency.get_payload_v1.record(start.elapsed());
Ok(res?)
}
@ -608,7 +614,7 @@ where
trace!(target: "rpc::engine", "Serving engine_getPayloadV2");
let start = Instant::now();
let res = EngineApi::get_payload_v2(self, payload_id).await;
self.inner.metrics.get_payload_v2.record(start.elapsed());
self.inner.metrics.latency.get_payload_v2.record(start.elapsed());
Ok(res?)
}
@ -628,7 +634,7 @@ where
trace!(target: "rpc::engine", "Serving engine_getPayloadV3");
let start = Instant::now();
let res = EngineApi::get_payload_v3(self, payload_id).await;
self.inner.metrics.get_payload_v3.record(start.elapsed());
self.inner.metrics.latency.get_payload_v3.record(start.elapsed());
Ok(res?)
}
@ -641,7 +647,7 @@ where
trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByHashV1");
let start = Instant::now();
let res = EngineApi::get_payload_bodies_by_hash(self, block_hashes);
self.inner.metrics.get_payload_bodies_by_hash_v1.record(start.elapsed());
self.inner.metrics.latency.get_payload_bodies_by_hash_v1.record(start.elapsed());
Ok(res?)
}
@ -669,7 +675,7 @@ where
trace!(target: "rpc::engine", "Serving engine_getPayloadBodiesByRangeV1");
let start_time = Instant::now();
let res = EngineApi::get_payload_bodies_by_range(self, start.to(), count.to()).await;
self.inner.metrics.get_payload_bodies_by_range_v1.record(start_time.elapsed());
self.inner.metrics.latency.get_payload_bodies_by_range_v1.record(start_time.elapsed());
Ok(res?)
}
@ -682,7 +688,7 @@ where
trace!(target: "rpc::engine", "Serving engine_exchangeTransitionConfigurationV1");
let start = Instant::now();
let res = EngineApi::exchange_transition_configuration(self, config).await;
self.inner.metrics.exchange_transition_configuration.record(start.elapsed());
self.inner.metrics.latency.exchange_transition_configuration.record(start.elapsed());
Ok(res?)
}

View File

@ -1,10 +1,23 @@
use metrics::Histogram;
use crate::EngineApiError;
use metrics::{Counter, Histogram};
use reth_metrics::Metrics;
use reth_rpc_types::engine::{ForkchoiceUpdated, PayloadStatus, PayloadStatusEnum};
/// Beacon consensus engine metrics.
/// All beacon consensus engine metrics
#[derive(Default)]
pub(crate) struct EngineApiMetrics {
/// Engine API latency metrics
pub(crate) latency: EngineApiLatencyMetrics,
/// Engine API forkchoiceUpdated response type metrics
pub(crate) fcu_response: ForkchoiceUpdatedResponseMetrics,
/// Engine API newPayload response type metrics
pub(crate) new_payload_response: NewPayloadStatusResponseMetrics,
}
/// Beacon consensus engine latency metrics.
#[derive(Metrics)]
#[metrics(scope = "engine.rpc")]
pub(crate) struct EngineApiMetrics {
pub(crate) struct EngineApiLatencyMetrics {
/// Latency for `engine_newPayloadV1`
pub(crate) new_payload_v1: Histogram,
/// Latency for `engine_newPayloadV2`
@ -30,3 +43,84 @@ pub(crate) struct EngineApiMetrics {
/// Latency for `engine_exchangeTransitionConfigurationV1`
pub(crate) exchange_transition_configuration: Histogram,
}
/// Metrics for engine API forkchoiceUpdated responses.
#[derive(Metrics)]
#[metrics(scope = "engine.rpc")]
pub(crate) struct ForkchoiceUpdatedResponseMetrics {
/// The total count of forkchoice updated messages received.
pub(crate) forkchoice_updated_messages: Counter,
/// The total count of forkchoice updated messages that we responded to with
/// [Invalid](reth_rpc_types::engine::PayloadStatusEnum#Invalid).
pub(crate) forkchoice_updated_invalid: Counter,
/// The total count of forkchoice updated messages that we responded to with
/// [Valid](reth_rpc_types::engine::PayloadStatusEnum#Valid).
pub(crate) forkchoice_updated_valid: Counter,
/// The total count of forkchoice updated messages that we responded to with
/// [Syncing](reth_rpc_types::engine::PayloadStatusEnum#Syncing).
pub(crate) forkchoice_updated_syncing: Counter,
/// The total count of forkchoice updated messages that we responded to with
/// [Accepted](reth_rpc_types::engine::PayloadStatusEnum#Accepted).
pub(crate) forkchoice_updated_accepted: Counter,
/// The total count of forkchoice updated messages that were unsuccessful, i.e. we responded
/// with an error type that is not a [PayloadStatusEnum].
pub(crate) forkchoice_updated_error: Counter,
}
/// Metrics for engine API newPayload responses.
#[derive(Metrics)]
#[metrics(scope = "engine.rpc")]
pub(crate) struct NewPayloadStatusResponseMetrics {
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// The total count of new payload messages that we responded to with
/// [Invalid](reth_rpc_types::engine::PayloadStatusEnum#Invalid).
pub(crate) new_payload_invalid: Counter,
/// The total count of new payload messages that we responded to with
/// [Valid](reth_rpc_types::engine::PayloadStatusEnum#Valid).
pub(crate) new_payload_valid: Counter,
/// The total count of new payload messages that we responded to with
/// [Syncing](reth_rpc_types::engine::PayloadStatusEnum#Syncing).
pub(crate) new_payload_syncing: Counter,
/// The total count of new payload messages that we responded to with
/// [Accepted](reth_rpc_types::engine::PayloadStatusEnum#Accepted).
pub(crate) new_payload_accepted: Counter,
/// The total count of new payload messages that were unsuccessful, i.e. we responded with an
/// error type that is not a [PayloadStatusEnum].
pub(crate) new_payload_error: Counter,
}
impl NewPayloadStatusResponseMetrics {
/// Increment the newPayload counter based on the given rpc result
pub(crate) fn update_response_metrics(&self, result: &Result<PayloadStatus, EngineApiError>) {
match result {
Ok(status) => match status.status {
PayloadStatusEnum::Valid => self.new_payload_valid.increment(1),
PayloadStatusEnum::Syncing => self.new_payload_syncing.increment(1),
PayloadStatusEnum::Accepted => self.new_payload_accepted.increment(1),
PayloadStatusEnum::Invalid { .. } => self.new_payload_invalid.increment(1),
},
Err(_) => self.new_payload_error.increment(1),
}
self.new_payload_messages.increment(1);
}
}
impl ForkchoiceUpdatedResponseMetrics {
/// Increment the forkchoiceUpdated counter based on the given rpc result
pub(crate) fn update_response_metrics(
&self,
result: &Result<ForkchoiceUpdated, EngineApiError>,
) {
match result {
Ok(status) => match status.payload_status.status {
PayloadStatusEnum::Valid => self.forkchoice_updated_valid.increment(1),
PayloadStatusEnum::Syncing => self.forkchoice_updated_syncing.increment(1),
PayloadStatusEnum::Accepted => self.forkchoice_updated_accepted.increment(1),
PayloadStatusEnum::Invalid { .. } => self.forkchoice_updated_invalid.increment(1),
},
Err(_) => self.forkchoice_updated_error.increment(1),
}
self.forkchoice_updated_messages.increment(1);
}
}