feat(bin, stages): metrics listener (#3483)

This commit is contained in:
Alexey Shekhirin
2023-06-30 15:27:01 +01:00
committed by GitHub
parent 10db78618f
commit 83b14cb611
9 changed files with 164 additions and 83 deletions

View File

@ -51,6 +51,7 @@ use reth_stages::{
ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage,
TotalDifficultyStage,
},
MetricEventsSender, MetricsListener,
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{EthTransactionValidator, TransactionPool};
@ -275,6 +276,11 @@ impl Command {
debug!(target: "reth::cli", "Spawning payload builder service");
ctx.task_executor.spawn_critical("payload builder service", payload_service);
debug!(target: "reth::cli", "Spawning metrics listener task");
let (metrics_tx, metrics_rx) = unbounded_channel();
let metrics_listener = MetricsListener::new(metrics_rx);
ctx.task_executor.spawn_critical("metrics listener task", metrics_listener);
// Configure the pipeline
let (mut pipeline, client) = if self.auto_mine {
let (_, client, mut task) = AutoSealBuilder::new(
@ -293,6 +299,7 @@ impl Command {
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
metrics_tx,
)
.await?;
@ -310,6 +317,7 @@ impl Command {
Arc::clone(&consensus),
db.clone(),
&ctx.task_executor,
metrics_tx,
)
.await?;
@ -421,6 +429,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
db: DB,
task_executor: &TaskExecutor,
metrics_tx: MetricEventsSender,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
@ -452,6 +461,7 @@ impl Command {
consensus,
max_block,
self.debug.continuous,
metrics_tx,
)
.await?;
@ -632,6 +642,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
max_block: Option<u64>,
continuous: bool,
metrics_tx: MetricEventsSender,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Clone + 'static,
@ -670,6 +681,7 @@ impl Command {
if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) };
let pipeline = builder
.with_tip_sender(tip_tx)
.with_metric_events(metrics_tx)
.add_stages(
DefaultStages::new(
header_mode,

View File

@ -9,7 +9,7 @@ repository.workspace = true
description = "reth metrics utilities"
[dependencies]
# reth
# reth
reth-metrics-derive = { path = "./metrics-derive" }
# metrics

View File

@ -64,6 +64,7 @@
//!
//! - `test-utils`: Export utilities for testing
mod error;
mod metrics;
mod pipeline;
mod stage;
mod util;
@ -81,5 +82,6 @@ pub mod stages;
pub mod sets;
pub use error::*;
pub use metrics::*;
pub use pipeline::*;
pub use stage::*;

View File

@ -0,0 +1,85 @@
use crate::metrics::{StageMetrics, SyncMetrics};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber,
};
use std::{
future::Future,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// Sending half of the unbounded channel that carries [MetricEvent]s.
///
/// Metric producers use this alias to emit events; the matching receiver is the one passed to
/// [MetricsListener::new].
pub type MetricEventsSender = UnboundedSender<MetricEvent>;
/// Collection of metric events.
///
/// Events are sent over a [MetricEventsSender] and applied to the gauges by [MetricsListener].
#[derive(Clone, Copy, Debug)]
pub enum MetricEvent {
/// Stage reached new checkpoint.
StageCheckpoint {
/// Stage ID.
stage_id: StageId,
/// Stage checkpoint.
checkpoint: StageCheckpoint,
/// Maximum known block number reachable by this stage.
///
/// Only consulted when the checkpoint reports no entities progress: in that case, if
/// specified, it is the value written to the `entities_total` metric. When the checkpoint
/// does report entities progress, the total from that progress is used instead.
max_block_number: Option<BlockNumber>,
},
}
/// Metrics routine that listens to new metric events on the `events_rx` receiver.
/// Upon receiving a new event, the related metrics are updated.
///
/// The listener is driven as a [Future] that resolves once the events channel has closed.
#[derive(Debug)]
pub struct MetricsListener {
/// Receiving half of the metric events channel.
events_rx: UnboundedReceiver<MetricEvent>,
/// Sync metrics updated by the received events.
pub(crate) sync_metrics: SyncMetrics,
}
impl MetricsListener {
    /// Builds a [MetricsListener] that reads [MetricEvent]s from `events_rx`.
    ///
    /// The sync metrics start out in their default state.
    pub fn new(events_rx: UnboundedReceiver<MetricEvent>) -> Self {
        let sync_metrics = SyncMetrics::default();
        Self { events_rx, sync_metrics }
    }

    /// Applies a single metric event to the gauges it concerns.
    fn handle_event(&mut self, event: MetricEvent) {
        match event {
            MetricEvent::StageCheckpoint { stage_id, checkpoint, max_block_number } => {
                // Look up the gauges of this stage, creating them with a `stage` label on the
                // first event seen for the stage.
                let gauges = self.sync_metrics.stages.entry(stage_id).or_insert_with(|| {
                    StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])
                });

                gauges.checkpoint.set(checkpoint.block_number as f64);

                // Prefer the entities progress carried by the checkpoint; without it, fall back
                // to block numbers, where the total is only known if the sender provided it.
                let (done, known_total) = if let Some(progress) = checkpoint.entities() {
                    (progress.processed, Some(progress.total))
                } else {
                    (checkpoint.block_number, max_block_number)
                };

                gauges.entities_processed.set(done as f64);

                // An unknown total leaves the previously reported value untouched.
                if let Some(value) = known_total {
                    gauges.entities_total.set(value as f64);
                }
            }
        }
    }
}
impl Future for MetricsListener {
    type Output = ();

    /// Handles every event currently available on the channel.
    ///
    /// Returns [Poll::Pending] when the channel has no event ready, and [Poll::Ready] once the
    /// channel has closed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let listener = self.get_mut();

        // `ready!` returns `Poll::Pending` from `poll` as soon as no event is ready, so this
        // loop only ends normally when `poll_recv` yields `None`, i.e. the channel has closed.
        while let Some(event) = ready!(listener.events_rx.poll_recv(cx)) {
            listener.handle_event(event);
        }

        Poll::Ready(())
    }
}

View File

@ -0,0 +1,5 @@
mod listener;
mod sync_metrics;
pub use listener::{MetricEvent, MetricEventsSender, MetricsListener};
use sync_metrics::*;

View File

@ -0,0 +1,22 @@
use reth_metrics::{
metrics::{self, Gauge},
Metrics,
};
use reth_primitives::stage::StageId;
use std::collections::HashMap;
/// Container for the sync metrics, grouped by stage.
#[derive(Debug, Default)]
pub(crate) struct SyncMetrics {
/// Metrics of each stage, keyed by its [StageId].
pub(crate) stages: HashMap<StageId, StageMetrics>,
}
/// Gauges describing the progress of a single stage, declared under the `sync` metrics scope.
// NOTE(review): the `Metrics` derive presumably turns the field docs below into the metric
// descriptions — confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "sync")]
pub(crate) struct StageMetrics {
/// The block number of the last commit for a stage.
pub(crate) checkpoint: Gauge,
/// The number of processed entities of the last commit for a stage, if applicable.
pub(crate) entities_processed: Gauge,
/// The number of total entities of the last commit for a stage, if applicable.
pub(crate) entities_total: Gauge,
}

View File

@ -1,6 +1,6 @@
use std::sync::Arc;
use crate::{pipeline::BoxedStage, Pipeline, Stage, StageSet};
use crate::{pipeline::BoxedStage, MetricEventsSender, Pipeline, Stage, StageSet};
use reth_db::database::Database;
use reth_primitives::{stage::StageId, BlockNumber, ChainSpec, H256};
use tokio::sync::watch;
@ -17,6 +17,7 @@ where
max_block: Option<BlockNumber>,
/// A sender for the current chain tip to sync to.
tip_tx: Option<watch::Sender<H256>>,
metrics_tx: Option<MetricEventsSender>,
}
impl<DB> PipelineBuilder<DB>
@ -60,11 +61,17 @@ where
self
}
/// Set the metric events sender.
pub fn with_metric_events(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx);
self
}
/// Builds the final [`Pipeline`] using the given database.
///
/// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type.
pub fn build(self, db: DB, chain_spec: Arc<ChainSpec>) -> Pipeline<DB> {
let Self { stages, max_block, tip_tx } = self;
let Self { stages, max_block, tip_tx, metrics_tx } = self;
Pipeline {
db,
chain_spec,
@ -73,14 +80,14 @@ where
tip_tx,
listeners: Default::default(),
progress: Default::default(),
metrics: Default::default(),
metrics_tx,
}
}
}
impl<DB: Database> Default for PipelineBuilder<DB> {
fn default() -> Self {
Self { stages: Vec::new(), max_block: None, tip_tx: None }
Self { stages: Vec::new(), max_block: None, tip_tx: None, metrics_tx: None }
}
}

View File

@ -1,4 +1,7 @@
use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput};
use crate::{
error::*, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError,
UnwindInput,
};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::executor::BlockExecutionError;
@ -17,14 +20,12 @@ mod ctrl;
mod event;
mod progress;
mod set;
mod sync_metrics;
pub use crate::pipeline::ctrl::ControlFlow;
pub use builder::*;
pub use event::*;
use progress::*;
pub use set::*;
use sync_metrics::*;
/// A container for a queued stage.
pub(crate) type BoxedStage<DB> = Box<dyn Stage<DB>>;
@ -105,7 +106,7 @@ pub struct Pipeline<DB: Database> {
progress: PipelineProgress,
/// A sender for the current chain tip to sync to.
tip_tx: Option<watch::Sender<H256>>,
metrics: Metrics,
metrics_tx: Option<MetricEventsSender>,
}
impl<DB> Pipeline<DB>
@ -138,16 +139,17 @@ where
/// Registers progress metrics for each registered stage
pub fn register_metrics(&mut self) -> Result<(), PipelineError> {
let Some(metrics_tx) = &mut self.metrics_tx else { return Ok(()) };
let factory = ProviderFactory::new(&self.db, self.chain_spec.clone());
let provider = factory.provider()?;
for stage in &self.stages {
let stage_id = stage.id();
self.metrics.stage_checkpoint(
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
None,
);
checkpoint: provider.get_stage_checkpoint(stage_id)?.unwrap_or_default(),
max_block_number: None,
});
}
Ok(())
}
@ -288,12 +290,15 @@ where
done = checkpoint.block_number == to,
"Stage unwound"
);
self.metrics.stage_checkpoint(
stage_id, checkpoint,
// We assume it was set in the previous execute iteration, so it
// doesn't change when we unwind.
None,
);
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
checkpoint,
// We assume it was set in the previous execute iteration, so it
// doesn't change when we unwind.
max_block_number: None,
});
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners
@ -372,7 +377,13 @@ where
%done,
"Stage committed progress"
);
self.metrics.stage_checkpoint(stage_id, checkpoint, target);
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ = metrics_tx.send(MetricEvent::StageCheckpoint {
stage_id,
checkpoint,
max_block_number: target,
});
}
provider_rw.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners.notify(PipelineEvent::Ran {

View File

@ -1,63 +0,0 @@
use reth_metrics::{
metrics::{self, Gauge},
Metrics,
};
use reth_primitives::{
stage::{
AccountHashingCheckpoint, EntitiesCheckpoint, ExecutionCheckpoint, HeadersCheckpoint,
IndexHistoryCheckpoint, StageCheckpoint, StageId, StageUnitCheckpoint,
StorageHashingCheckpoint,
},
BlockNumber,
};
use std::collections::HashMap;
/// Gauges describing the progress of a single stage, declared under the `sync` metrics scope.
// NOTE(review): the `Metrics` derive presumably turns the field docs below into the metric
// descriptions — confirm before rewording them.
#[derive(Metrics)]
#[metrics(scope = "sync")]
pub(crate) struct StageMetrics {
/// The block number of the last commit for a stage.
checkpoint: Gauge,
/// The number of processed entities of the last commit for a stage, if applicable.
entities_processed: Gauge,
/// The number of total entities of the last commit for a stage, if applicable.
entities_total: Gauge,
}
/// Pipeline metrics, grouped by stage.
#[derive(Default)]
pub(crate) struct Metrics {
/// Metrics of each stage, keyed by its [StageId]; entries are added by `stage_checkpoint`.
stages: HashMap<StageId, StageMetrics>,
}
impl Metrics {
    /// Records `checkpoint` in the gauges of the stage identified by `stage_id`.
    ///
    /// The `checkpoint` gauge always receives the checkpoint block number. The entities gauges
    /// take their values from the stage-specific progress when the checkpoint carries one;
    /// otherwise the block number counts as the processed amount and `max_block_number`, if
    /// given, as the total.
    pub(crate) fn stage_checkpoint(
        &mut self,
        stage_id: StageId,
        checkpoint: StageCheckpoint,
        max_block_number: Option<BlockNumber>,
    ) {
        // Gauges of a stage are created, with a `stage` label, the first time it reports.
        let gauges = self.stages.entry(stage_id).or_insert_with(|| {
            StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])
        });

        gauges.checkpoint.set(checkpoint.block_number as f64);

        // Pull the entities progress out of whichever stage-specific checkpoint is present.
        let entities = match checkpoint.stage_checkpoint {
            Some(
                StageUnitCheckpoint::Account(AccountHashingCheckpoint { progress, .. }) |
                StageUnitCheckpoint::Storage(StorageHashingCheckpoint { progress, .. }) |
                StageUnitCheckpoint::Entities(progress @ EntitiesCheckpoint { .. }) |
                StageUnitCheckpoint::Execution(ExecutionCheckpoint { progress, .. }) |
                StageUnitCheckpoint::Headers(HeadersCheckpoint { progress, .. }) |
                StageUnitCheckpoint::IndexHistory(IndexHistoryCheckpoint { progress, .. }),
            ) => Some(progress),
            None => None,
        };

        let (done, known_total) = entities
            .map_or((checkpoint.block_number, max_block_number), |progress| {
                (progress.processed, Some(progress.total))
            });

        gauges.entities_processed.set(done as f64);

        // An unknown total leaves the previously reported value untouched.
        if let Some(value) = known_total {
            gauges.entities_total.set(value as f64);
        }
    }
}