refactor: move exec metrics into executor (#10426)

Oliver
2024-09-04 03:30:12 +02:00
committed by GitHub
parent fd8ca17076
commit d659cd36d2
12 changed files with 121 additions and 133 deletions

Cargo.lock generated

@ -7089,10 +7089,12 @@ dependencies = [
"alloy-eips",
"auto_impl",
"futures-util",
"metrics",
"parking_lot 0.12.3",
"reth-chainspec",
"reth-execution-errors",
"reth-execution-types",
"reth-metrics",
"reth-primitives",
"reth-prune-types",
"reth-storage-errors",


@ -98,7 +98,7 @@ impl AppendableChain {
block_validation_kind,
)?;
Ok(Self { chain: Chain::new(vec![block], bundle_state, trie_updates) })
Ok(Self::new(Chain::new(vec![block], bundle_state, trie_updates)))
}
/// Create a new chain that forks off of an existing sidechain.
@ -155,7 +155,7 @@ impl AppendableChain {
execution_outcome.set_first_block(block.number);
// If all is okay, return new chain back. Present chain is not modified.
Ok(Self { chain: Chain::from_block(block, execution_outcome, None) })
Ok(Self::new(Chain::from_block(block, execution_outcome, None)))
}
/// Validate and execute the given block that _extends the canonical chain_, validating its


@ -1,12 +1,22 @@
use reth_evm::metrics::ExecutorMetrics;
use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};
/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub(crate) struct EngineApiMetrics {
/// Engine API-specific metrics.
pub(crate) engine: EngineMetrics,
/// Block executor metrics.
pub(crate) executor: ExecutorMetrics,
}
/// Metrics for the `EngineApi`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineApiMetrics {
pub(crate) struct EngineMetrics {
/// How many executed blocks are currently stored.
pub(crate) executed_blocks: Gauge,
/// The number of times the pipeline was run.
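With this split, engine-level counters sit behind the new `engine` field while block execution is timed and gas-metered through the wrapped `ExecutorMetrics`. A minimal sketch of the resulting call-site pattern (not part of the diff; field names are taken from the structs above and the engine tree hunk below):

// Sketch only: assumes this runs where the engine tree holds `metrics`.
let metrics = EngineApiMetrics::default();
// Engine-specific counters and gauges are now nested under `engine`.
metrics.engine.new_payload_messages.increment(1);
metrics.engine.executed_blocks.set(3.0);
// Block execution goes through the wrapped executor metrics instead, e.g.
// `metrics.executor.metered(..)` -- see `crates/evm/src/metrics.rs` below.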


@ -737,7 +737,7 @@ where
cancun_fields: Option<CancunPayloadFields>,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
trace!(target: "engine", "invoked new payload");
self.metrics.new_payload_messages.increment(1);
self.metrics.engine.new_payload_messages.increment(1);
// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
@ -859,7 +859,7 @@ where
attrs: Option<T::PayloadAttributes>,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine", ?attrs, "invoked forkchoice update");
self.metrics.forkchoice_updated_messages.increment(1);
self.metrics.engine.forkchoice_updated_messages.increment(1);
self.canonical_in_memory_state.on_forkchoice_update_received();
if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
@ -1028,7 +1028,7 @@ where
// Check if persistence has complete
match rx.try_recv() {
Ok(last_persisted_block_hash) => {
self.metrics.persistence_duration.record(start_time.elapsed());
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let Some(last_persisted_block_hash) = last_persisted_block_hash else {
// if this happened, then we persisted no blocks because we sent an
// empty vec of blocks
@ -1161,7 +1161,7 @@ where
// We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
// before that
self.state.tree_state.remove_until(backfill_height, Some(backfill_height));
self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64);
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// remove all buffered blocks below the backfill height
self.state.buffer.remove_old_blocks(backfill_height);
@ -1271,7 +1271,7 @@ where
}
self.backfill_sync_state = BackfillSyncState::Pending;
self.metrics.pipeline_runs.increment(1);
self.metrics.engine.pipeline_runs.increment(1);
debug!(target: "engine", "emitting backfill action event");
}
@ -1945,7 +1945,10 @@ where
let block = block.unseal();
let exec_time = Instant::now();
let output = executor.execute((&block, U256::MAX).into())?;
let output = self
.metrics
.executor
.metered((&block, U256::MAX).into(), |input| executor.execute(input))?;
debug!(target: "engine", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");
if let Err(err) = self.consensus.validate_block_post_execution(
@ -1998,7 +2001,7 @@ where
}
self.state.tree_state.insert_executed(executed);
self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64);
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// emit insert event
let engine_event = if self.state.tree_state.is_fork(block_hash) {


@ -17,6 +17,7 @@ reth-execution-errors.workspace = true
reth-primitives.workspace = true
revm-primitives.workspace = true
reth-prune-types.workspace = true
reth-metrics.workspace = true
reth-storage-errors.workspace = true
reth-execution-types.workspace = true
@ -24,6 +25,7 @@ revm.workspace = true
alloy-eips.workspace = true
auto_impl.workspace = true
futures-util.workspace = true
metrics.workspace = true
parking_lot = { workspace = true, optional = true }
[dev-dependencies]


@ -25,6 +25,8 @@ use revm_primitives::{
pub mod builder;
pub mod either;
pub mod execute;
#[cfg(feature = "std")]
pub mod metrics;
pub mod noop;
pub mod provider;
pub mod system_calls;

crates/evm/src/metrics.rs Normal file

@ -0,0 +1,42 @@
//! Executor metrics.
//!
//! Block processing related to syncing should take care to update the metrics by using e.g.
//! [`ExecutorMetrics::metered`].
use std::time::Instant;
use metrics::{Counter, Gauge};
use reth_execution_types::BlockExecutionInput;
use reth_metrics::Metrics;
use reth_primitives::BlockWithSenders;
/// Executor metrics.
// TODO(onbjerg): add sload/sstore, acc load/acc change, bytecode metrics
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.execution")]
pub struct ExecutorMetrics {
/// The total amount of gas processed.
pub gas_processed_total: Counter,
/// The instantaneous amount of gas processed per second.
pub gas_per_second: Gauge,
}
impl ExecutorMetrics {
/// Execute the given block and update metrics for the execution.
pub fn metered<F, R>(&self, input: BlockExecutionInput<'_, BlockWithSenders>, f: F) -> R
where
F: FnOnce(BlockExecutionInput<'_, BlockWithSenders>) -> R,
{
let gas_used = input.block.gas_used;
// Execute the block and record the elapsed time.
let execute_start = Instant::now();
let output = f(input);
let execution_duration = execute_start.elapsed().as_secs_f64();
// Update gas metrics.
self.gas_processed_total.increment(gas_used);
self.gas_per_second.set(gas_used as f64 / execution_duration);
output
}
}
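For reference, a minimal usage sketch of the new helper (not part of the diff). It assumes an `executor` whose `execute` takes a `BlockExecutionInput` and a recovered `block: BlockWithSenders` in scope, mirroring the call sites changed in this commit:

use reth_evm::metrics::ExecutorMetrics;
use reth_primitives::U256;

// Sketch only: `U256::MAX` stands in for the total difficulty, as at the
// engine tree call site above.
let metrics = ExecutorMetrics::default();
let result = metrics.metered((&block, U256::MAX).into(), |input| executor.execute(input));
// `metered` times the closure, then increments `gas_processed_total` by the
// block's `gas_used` and sets `gas_per_second` to `gas_used / elapsed_seconds`.

For example, a block using 15,000,000 gas that executes in 0.1 s sets `gas_per_second` to 150,000,000; the dashboard change below points its gas panel at this gauge and reports plain gas/s instead of Mgas/s.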


@ -102,7 +102,7 @@ where
let pipeline = builder
.with_tip_sender(tip_tx)
.with_metrics_tx(metrics_tx.clone())
.with_metrics_tx(metrics_tx)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
@ -114,16 +114,13 @@ where
stage_config.clone(),
prune_modes.clone(),
)
.set(
ExecutionStage::new(
executor,
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
prune_modes,
exex_manager_handle,
)
.with_metrics_tx(metrics_tx),
),
.set(ExecutionStage::new(
executor,
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
prune_modes,
exex_manager_handle,
)),
)
.build(provider_factory, static_file_producer);


@ -1,6 +1,5 @@
use crate::{metrics::SyncMetrics, StageCheckpoint, StageId};
use alloy_primitives::BlockNumber;
use reth_primitives_traits::constants::MEGAGAS;
use std::{
future::Future,
pin::Pin,
@ -30,11 +29,6 @@ pub enum MetricEvent {
/// If specified, `entities_total` metric is updated.
max_block_number: Option<BlockNumber>,
},
/// Execution stage processed some amount of gas.
ExecutionStageGas {
/// Gas processed.
gas: u64,
},
}
/// Metrics routine that listens to new metric events on the `events_rx` receiver.
@ -82,9 +76,6 @@ impl MetricsListener {
stage_metrics.entities_total.set(total as f64);
}
}
MetricEvent::ExecutionStageGas { gas } => {
self.sync_metrics.execution_stage.mgas_processed_total.increment(gas / MEGAGAS)
}
}
}
}


@ -1,14 +1,10 @@
use crate::StageId;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use reth_metrics::{metrics::Gauge, Metrics};
use std::collections::HashMap;
#[derive(Debug, Default)]
pub(crate) struct SyncMetrics {
pub(crate) stages: HashMap<StageId, StageMetrics>,
pub(crate) execution_stage: ExecutionStageMetrics,
}
impl SyncMetrics {
@ -31,11 +27,3 @@ pub(crate) struct StageMetrics {
/// The number of total entities of the last commit for a stage, if applicable.
pub(crate) entities_total: Gauge,
}
/// Execution stage metrics.
#[derive(Metrics)]
#[metrics(scope = "sync.execution")]
pub(crate) struct ExecutionStageMetrics {
/// The total amount of gas processed (in millions)
pub(crate) mgas_processed_total: Counter,
}


@ -3,7 +3,10 @@ use num_traits::Zero;
use reth_config::config::ExecutionConfig;
use reth_db::{static_file::HeaderMask, tables};
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_evm::{
execute::{BatchExecutor, BlockExecutorProvider},
metrics::ExecutorMetrics,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_primitives::{BlockNumber, Header, StaticFileSegment};
@ -18,8 +21,8 @@ use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
ExecutionCheckpoint, ExecutionStageThresholds, MetricEvent, MetricEventsSender, Stage,
StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
ExecutionCheckpoint, ExecutionStageThresholds, Stage, StageCheckpoint, StageError, StageId,
UnwindInput, UnwindOutput,
};
use std::{
cmp::Ordering,
@ -61,7 +64,6 @@ use tracing::*;
// false positive, we cannot derive it if !DB: Debug.
#[allow(missing_debug_implementations)]
pub struct ExecutionStage<E> {
metrics_tx: Option<MetricEventsSender>,
/// The stage's internal block executor
executor_provider: E,
/// The commit thresholds of the execution stage.
@ -83,11 +85,13 @@ pub struct ExecutionStage<E> {
post_unwind_commit_input: Option<Chain>,
/// Handle to communicate with `ExEx` manager.
exex_manager_handle: ExExManagerHandle,
/// Executor metrics.
metrics: ExecutorMetrics,
}
impl<E> ExecutionStage<E> {
/// Create new execution stage with specified config.
pub const fn new(
pub fn new(
executor_provider: E,
thresholds: ExecutionStageThresholds,
external_clean_threshold: u64,
@ -95,7 +99,6 @@ impl<E> ExecutionStage<E> {
exex_manager_handle: ExExManagerHandle,
) -> Self {
Self {
metrics_tx: None,
external_clean_threshold,
executor_provider,
thresholds,
@ -103,6 +106,7 @@ impl<E> ExecutionStage<E> {
post_execute_commit_input: None,
post_unwind_commit_input: None,
exex_manager_handle,
metrics: ExecutorMetrics::default(),
}
}
@ -135,12 +139,6 @@ impl<E> ExecutionStage<E> {
)
}
/// Set the metric events sender.
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx);
self
}
/// Adjusts the prune modes related to changesets.
///
/// This function verifies whether the [`super::MerkleStage`] or Hashing stages will run from
@ -272,12 +270,13 @@ where
// Execute the block
let execute_start = Instant::now();
executor.execute_and_verify_one((&block, td).into()).map_err(|error| {
StageError::Block {
self.metrics.metered((&block, td).into(), |input| {
executor.execute_and_verify_one(input).map_err(|error| StageError::Block {
block: Box::new(block.header.clone().seal_slow()),
error: BlockErrorKind::Execution(error),
}
})
})?;
execution_duration += execute_start.elapsed();
// Log execution throughput
@ -296,12 +295,6 @@ where
last_log_instant = Instant::now();
}
// Gas metrics
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ =
metrics_tx.send(MetricEvent::ExecutionStageGas { gas: block.header.gas_used });
}
stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used;


@ -147,9 +147,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -217,9 +215,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -287,9 +283,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -357,9 +351,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -427,9 +419,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -497,9 +487,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -579,9 +567,7 @@
"minVizWidth": 75,
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -649,9 +635,7 @@
"namePlacement": "auto",
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"last"
],
"calcs": ["last"],
"fields": "",
"values": false
},
@ -751,9 +735,7 @@
"orientation": "auto",
"percentChangeColorMode": "standard",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -1632,22 +1614,16 @@
},
"id": 48,
"options": {
"displayLabels": [
"name"
],
"displayLabels": ["name"],
"legend": {
"displayMode": "table",
"placement": "right",
"showLegend": true,
"values": [
"value"
]
"values": ["value"]
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -1806,15 +1782,11 @@
"displayMode": "table",
"placement": "right",
"showLegend": true,
"values": [
"value"
]
"values": ["value"]
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -1974,9 +1946,7 @@
"footer": {
"countRows": false,
"fields": "",
"reducer": [
"sum"
],
"reducer": ["sum"],
"show": false
},
"showHeader": true
@ -2142,22 +2112,16 @@
},
"id": 202,
"options": {
"displayLabels": [
"name"
],
"displayLabels": ["name"],
"legend": {
"displayMode": "table",
"placement": "right",
"showLegend": true,
"values": [
"value"
]
"values": ["value"]
},
"pieType": "pie",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@ -2307,9 +2271,7 @@
"footer": {
"countRows": false,
"fields": "",
"reducer": [
"sum"
],
"reducer": ["sum"],
"show": false
},
"showHeader": true
@ -2457,9 +2419,7 @@
"footer": {
"countRows": false,
"fields": "",
"reducer": [
"sum"
],
"reducer": ["sum"],
"show": false
},
"showHeader": true
@ -2696,7 +2656,7 @@
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "The amount of gas processed by the execution stage in millions per second.\n\nNote: For mainnet, the block range 2,383,397-2,620,384 will be slow because of the 2016 DoS attack.",
"description": "The amount of gas processed by the executor per second.\n\nNote: For mainnet, the block range 2,383,397-2,620,384 will be slow because of the 2016 DoS attack.",
"fieldConfig": {
"defaults": {
"color": {
@ -2743,7 +2703,7 @@
}
]
},
"unit": "Mgas/s",
"unit": "gas/s",
"unitScale": true
},
"overrides": []
@ -2774,7 +2734,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "rate(reth_sync_execution_mgas_processed_total{instance=~\"$instance\"}[30s])",
"expr": "rate(reth_sync_execution_gas_per_second{instance=~\"$instance\"}[30s])",
"legendFormat": "Gas/s (30s)",
"range": true,
"refId": "A"
@ -2785,7 +2745,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_sync_execution_mgas_processed_total{instance=~\"$instance\"}[1m])",
"expr": "rate(reth_sync_execution_gas_per_second{instance=~\"$instance\"}[1m])",
"hide": false,
"legendFormat": "Gas/s (1m)",
"range": true,
@ -2797,7 +2757,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_sync_execution_mgas_processed_total{instance=~\"$instance\"}[5m])",
"expr": "rate(reth_sync_execution_gas_per_second{instance=~\"$instance\"}[5m])",
"hide": false,
"legendFormat": "Gas/s (5m)",
"range": true,
@ -2809,7 +2769,7 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "builder",
"expr": "rate(reth_sync_execution_mgas_processed_total{instance=~\"$instance\"}[10m])",
"expr": "rate(reth_sync_execution_gas_per_second{instance=~\"$instance\"}[10m])",
"hide": false,
"legendFormat": "Gas/s (10m)",
"range": true,
@ -8099,9 +8059,7 @@
"justifyMode": "auto",
"orientation": "auto",
"reduceOptions": {
"calcs": [
"lastNotNull"
],
"calcs": ["lastNotNull"],
"fields": "",
"values": false
},