From 83b14cb611fe4460873c944d9f65da3675799ba8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 30 Jun 2023 15:27:01 +0100 Subject: [PATCH] feat(bin, stages): metrics listener (#3483) --- bin/reth/src/node/mod.rs | 12 +++ crates/metrics/Cargo.toml | 2 +- crates/stages/src/lib.rs | 2 + crates/stages/src/metrics/listener.rs | 85 ++++++++++++++++++++++ crates/stages/src/metrics/mod.rs | 5 ++ crates/stages/src/metrics/sync_metrics.rs | 22 ++++++ crates/stages/src/pipeline/builder.rs | 15 +++- crates/stages/src/pipeline/mod.rs | 41 +++++++---- crates/stages/src/pipeline/sync_metrics.rs | 63 ---------------- 9 files changed, 164 insertions(+), 83 deletions(-) create mode 100644 crates/stages/src/metrics/listener.rs create mode 100644 crates/stages/src/metrics/mod.rs create mode 100644 crates/stages/src/metrics/sync_metrics.rs delete mode 100644 crates/stages/src/pipeline/sync_metrics.rs diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index aaecdda51..d8c3cbf5a 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -51,6 +51,7 @@ use reth_stages::{ ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, TotalDifficultyStage, }, + MetricEventsSender, MetricsListener, }; use reth_tasks::TaskExecutor; use reth_transaction_pool::{EthTransactionValidator, TransactionPool}; @@ -275,6 +276,11 @@ impl Command { debug!(target: "reth::cli", "Spawning payload builder service"); ctx.task_executor.spawn_critical("payload builder service", payload_service); + debug!(target: "reth::cli", "Spawning metrics listener task"); + let (metrics_tx, metrics_rx) = unbounded_channel(); + let metrics_listener = MetricsListener::new(metrics_rx); + ctx.task_executor.spawn_critical("metrics listener task", metrics_listener); + // Configure the pipeline let (mut pipeline, client) = if self.auto_mine { let (_, client, mut task) = AutoSealBuilder::new( @@ -293,6 +299,7 @@ impl Command { Arc::clone(&consensus), db.clone(), &ctx.task_executor, + metrics_tx, ) .await?; @@ -310,6 +317,7 @@ impl Command { Arc::clone(&consensus), db.clone(), &ctx.task_executor, + metrics_tx, ) .await?; @@ -421,6 +429,7 @@ impl Command { consensus: Arc, db: DB, task_executor: &TaskExecutor, + metrics_tx: MetricEventsSender, ) -> eyre::Result> where DB: Database + Unpin + Clone + 'static, @@ -452,6 +461,7 @@ impl Command { consensus, max_block, self.debug.continuous, + metrics_tx, ) .await?; @@ -632,6 +642,7 @@ impl Command { consensus: Arc, max_block: Option, continuous: bool, + metrics_tx: MetricEventsSender, ) -> eyre::Result> where DB: Database + Clone + 'static, @@ -670,6 +681,7 @@ impl Command { if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder .with_tip_sender(tip_tx) + .with_metric_events(metrics_tx) .add_stages( DefaultStages::new( header_mode, diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index adbd26cb8..63a14cc83 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -9,7 +9,7 @@ repository.workspace = true description = "reth metrics utilities" [dependencies] -# reth +# reth reth-metrics-derive = { path = "./metrics-derive" } # metrics diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index a1694a6b7..ebfbac6f7 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -64,6 +64,7 @@ //! //! - `test-utils`: Export utilities for testing mod error; +mod metrics; mod pipeline; mod stage; mod util; @@ -81,5 +82,6 @@ pub mod stages; pub mod sets; pub use error::*; +pub use metrics::*; pub use pipeline::*; pub use stage::*; diff --git a/crates/stages/src/metrics/listener.rs b/crates/stages/src/metrics/listener.rs new file mode 100644 index 000000000..f6672a4e6 --- /dev/null +++ b/crates/stages/src/metrics/listener.rs @@ -0,0 +1,85 @@ +use crate::metrics::{StageMetrics, SyncMetrics}; +use reth_primitives::{ + stage::{StageCheckpoint, StageId}, + BlockNumber, +}; +use std::{ + future::Future, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; + +/// Alias type for metric producers to use. +pub type MetricEventsSender = UnboundedSender; + +/// Collection of metric events. +#[derive(Clone, Copy, Debug)] +pub enum MetricEvent { + /// Stage reached new checkpoint. + StageCheckpoint { + /// Stage ID. + stage_id: StageId, + /// Stage checkpoint. + checkpoint: StageCheckpoint, + /// Maximum known block number reachable by this stage. + /// If specified, `entities_total` metric is updated. + max_block_number: Option, + }, +} + +/// Metrics routine that listens to new metric events on the `events_rx` receiver. +/// Upon receiving new event, related metrics are updated. +#[derive(Debug)] +pub struct MetricsListener { + events_rx: UnboundedReceiver, + pub(crate) sync_metrics: SyncMetrics, +} + +impl MetricsListener { + /// Creates a new [MetricsListener] with the provided receiver of [MetricEvent]. + pub fn new(events_rx: UnboundedReceiver) -> Self { + Self { events_rx, sync_metrics: SyncMetrics::default() } + } + + fn handle_event(&mut self, event: MetricEvent) { + match event { + MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => { + let stage_metrics = self.sync_metrics.stages.entry(stage_id).or_insert_with(|| { + StageMetrics::new_with_labels(&[("stage", stage_id.to_string())]) + }); + + stage_metrics.checkpoint.set(checkpoint.block_number as f64); + + let (processed, total) = match checkpoint.entities() { + Some(entities) => (entities.processed, Some(entities.total)), + None => (checkpoint.block_number, max_block_number), + }; + + stage_metrics.entities_processed.set(processed as f64); + + if let Some(total) = total { + stage_metrics.entities_total.set(total as f64); + } + } + } + } +} + +impl Future for MetricsListener { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Loop until we drain the `events_rx` channel + loop { + let Some(event) = ready!(this.events_rx.poll_recv(cx)) else { + // Channel has closed + return Poll::Ready(()) + }; + + this.handle_event(event); + } + } +} diff --git a/crates/stages/src/metrics/mod.rs b/crates/stages/src/metrics/mod.rs new file mode 100644 index 000000000..bed2742c2 --- /dev/null +++ b/crates/stages/src/metrics/mod.rs @@ -0,0 +1,5 @@ +mod listener; +mod sync_metrics; + +pub use listener::{MetricEvent, MetricEventsSender, MetricsListener}; +use sync_metrics::*; diff --git a/crates/stages/src/metrics/sync_metrics.rs b/crates/stages/src/metrics/sync_metrics.rs new file mode 100644 index 000000000..859a7e6d7 --- /dev/null +++ b/crates/stages/src/metrics/sync_metrics.rs @@ -0,0 +1,22 @@ +use reth_metrics::{ + metrics::{self, Gauge}, + Metrics, +}; +use reth_primitives::stage::StageId; +use std::collections::HashMap; + +#[derive(Debug, Default)] +pub(crate) struct SyncMetrics { + pub(crate) stages: HashMap, +} + +#[derive(Metrics)] +#[metrics(scope = "sync")] +pub(crate) struct StageMetrics { + /// The block number of the last commit for a stage. + pub(crate) checkpoint: Gauge, + /// The number of processed entities of the last commit for a stage, if applicable. + pub(crate) entities_processed: Gauge, + /// The number of total entities of the last commit for a stage, if applicable. + pub(crate) entities_total: Gauge, +} diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 3cedb35b0..5e72da6b0 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use crate::{pipeline::BoxedStage, Pipeline, Stage, StageSet}; +use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet}; use reth_db::database::Database; use reth_primitives::{stage::StageId, BlockNumber, ChainSpec, H256}; use tokio::sync::watch; @@ -17,6 +17,7 @@ where max_block: Option, /// A receiver for the current chain tip to sync to. tip_tx: Option>, + metrics_tx: Option, } impl PipelineBuilder @@ -60,11 +61,17 @@ where self } + /// Set the metric events sender. + pub fn with_metric_events(mut self, metrics_tx: MetricEventsSender) -> Self { + self.metrics_tx = Some(metrics_tx); + self + } + /// Builds the final [`Pipeline`] using the given database. /// /// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type. pub fn build(self, db: DB, chain_spec: Arc) -> Pipeline { - let Self { stages, max_block, tip_tx } = self; + let Self { stages, max_block, tip_tx, metrics_tx } = self; Pipeline { db, chain_spec, @@ -73,14 +80,14 @@ where tip_tx, listeners: Default::default(), progress: Default::default(), - metrics: Default::default(), + metrics_tx, } } } impl Default for PipelineBuilder { fn default() -> Self { - Self { stages: Vec::new(), max_block: None, tip_tx: None } + Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None } } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 454fa377b..fa2ec285e 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,4 +1,7 @@ -use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput}; +use crate::{ + error::*, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError, + UnwindInput, +}; use futures_util::Future; use reth_db::database::Database; use reth_interfaces::executor::BlockExecutionError; @@ -17,14 +20,12 @@ mod ctrl; mod event; mod progress; mod set; -mod sync_metrics; pub use crate::pipeline::ctrl::ControlFlow; pub use builder::*; pub use event::*; use progress::*; pub use set::*; -use sync_metrics::*; /// A container for a queued stage. pub(crate) type BoxedStage = Box>; @@ -105,7 +106,7 @@ pub struct Pipeline { progress: PipelineProgress, /// A receiver for the current chain tip to sync to. tip_tx: Option>, - metrics: Metrics, + metrics_tx: Option, } impl Pipeline @@ -138,16 +139,17 @@ where /// Registers progress metrics for each registered stage pub fn register_metrics(&mut self) -> Result<(), PipelineError> { + let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) }; let factory = ProviderFactory::new(&self.db, self.chain_spec.clone()); let provider = factory.provider()?; for stage in &self.stages { let stage_id = stage.id(); - self.metrics.stage_checkpoint( + let _ = metrics_tx.send(MetricEvent::StageCheckpoint { stage_id, - provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(), - None, - ); + checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(), + max_block_number: None, + }); } Ok(()) } @@ -288,12 +290,15 @@ where done = checkpoint.block_number == to, "Stage unwound" ); - self.metrics.stage_checkpoint( - stage_id, checkpoint, - // We assume it was set in the previous execute iteration, so it - // doesn't change when we unwind. - None, - ); + if let Some(metrics_tx) = &mut self.metrics_tx { + let _ = metrics_tx.send(MetricEvent::StageCheckpoint { + stage_id, + checkpoint, + // We assume it was set in the previous execute iteration, so it + // doesn't change when we unwind. + max_block_number: None, + }); + } provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners @@ -372,7 +377,13 @@ where %done, "Stage committed progress" ); - self.metrics.stage_checkpoint(stage_id, checkpoint, target); + if let Some(metrics_tx) = &mut self.metrics_tx { + let _ = metrics_tx.send(MetricEvent::StageCheckpoint { + stage_id, + checkpoint, + max_block_number: target, + }); + } provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners.notify(PipelineEvent::Ran { diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs deleted file mode 100644 index 04a7d6358..000000000 --- a/crates/stages/src/pipeline/sync_metrics.rs +++ /dev/null @@ -1,63 +0,0 @@ -use reth_metrics::{ - metrics::{self, Gauge}, - Metrics, -}; -use reth_primitives::{ - stage::{ - AccountHashingCheckpoint, EntitiesCheckpoint, ExecutionCheckpoint, HeadersCheckpoint, - IndexHistoryCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint, - StorageHashingCheckpoint, - }, - BlockNumber, -}; -use std::collections::HashMap; - -#[derive(Metrics)] -#[metrics(scope = "sync")] -pub(crate) struct StageMetrics { - /// The block number of the last commit for a stage. - checkpoint: Gauge, - /// The number of processed entities of the last commit for a stage, if applicable. - entities_processed: Gauge, - /// The number of total entities of the last commit for a stage, if applicable. - entities_total: Gauge, -} - -#[derive(Default)] -pub(crate) struct Metrics { - stages: HashMap, -} - -impl Metrics { - pub(crate) fn stage_checkpoint( - &mut self, - stage_id: StageId, - checkpoint: StageCheckpoint, - max_block_number: Option, - ) { - let stage_metrics = self - .stages - .entry(stage_id) - .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])); - - stage_metrics.checkpoint.set(checkpoint.block_number as f64); - - let (processed, total) = match checkpoint.stage_checkpoint { - Some( - StageUnitCheckpoint::Account(AccountHashingCheckpoint { progress, .. }) | - StageUnitCheckpoint::Storage(StorageHashingCheckpoint { progress, .. }) | - StageUnitCheckpoint::Entities(progress @ EntitiesCheckpoint { .. }) | - StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress, .. }) | - StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }) | - StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { progress, .. }), - ) => (progress.processed, Some(progress.total)), - None => (checkpoint.block_number, max_block_number), - }; - - stage_metrics.entities_processed.set(processed as f64); - - if let Some(total) = total { - stage_metrics.entities_total.set(total as f64); - } - } -}