feat: track persistence metrics (#10250)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
This commit is contained in:
malik
2024-08-20 10:29:07 +01:00
committed by GitHub
parent 9926c6d38f
commit ec46fccece
4 changed files with 70 additions and 36 deletions

View File

@ -1,4 +1,7 @@
use reth_metrics::{metrics::Gauge, Metrics};
use reth_metrics::{
metrics::{Gauge, Histogram},
Metrics,
};
/// Metrics for the `BasicBlockDownloader`.
#[derive(Metrics)]
@ -7,3 +10,15 @@ pub(crate) struct BlockDownloaderMetrics {
/// How many blocks are currently being downloaded.
pub(crate) active_block_downloads: Gauge,
}
/// Metrics for the `PersistenceService`
///
/// Holds one [`Histogram`] per kind of persistence operation (removing blocks above a tip,
/// saving blocks, pruning). The struct is declared with the `consensus.engine.persistence`
/// metrics scope, and callers record a `Duration` measured with `Instant::elapsed` into the
/// matching field.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.persistence")]
pub(crate) struct PersistenceMetrics {
// NOTE(review): the `Metrics` derive may use the field doc comments below as the exported
// metric descriptions — confirm before rewording them.
/// How long it took for blocks to be removed
pub(crate) remove_blocks_above_duration_seconds: Histogram,
/// How long it took for blocks to be saved
pub(crate) save_blocks_duration_seconds: Histogram,
/// How long it took for blocks to be pruned
pub(crate) prune_before_duration_seconds: Histogram,
}

View File

@ -1,12 +1,16 @@
#![allow(dead_code)]
use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
use reth_primitives::B256;
use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_prune::{Pruner, PrunerError, PrunerOutput};
use std::sync::mpsc::{Receiver, SendError, Sender};
use std::{
sync::mpsc::{Receiver, SendError, Sender},
time::Instant,
};
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{debug, error};
@ -26,24 +30,29 @@ pub struct PersistenceService<DB> {
incoming: Receiver<PersistenceAction>,
/// The pruner
pruner: Pruner<DB, ProviderFactory<DB>>,
/// Metrics tracking the duration of persistence operations
metrics: PersistenceMetrics,
}
impl<DB: Database> PersistenceService<DB> {
/// Create a new persistence service
pub const fn new(
pub fn new(
provider: ProviderFactory<DB>,
incoming: Receiver<PersistenceAction>,
pruner: Pruner<DB, ProviderFactory<DB>>,
) -> Self {
Self { provider, incoming, pruner }
Self { provider, incoming, pruner, metrics: PersistenceMetrics::default() }
}
/// Prunes block data before the given block number according to the configured prune
/// configuration.
fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
debug!(target: "tree::persistence", ?block_num, "Running pruner");
let start_time = Instant::now();
// TODO: doing this properly depends on pruner segment changes
self.pruner.run(block_num)
let result = self.pruner.run(block_num);
self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
result
}
}
@ -58,33 +67,14 @@ where
while let Ok(action) = self.incoming.recv() {
match action {
PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
let provider_rw = self.provider.provider_rw()?;
let sf_provider = self.provider.static_file_provider();
UnifiedStorageWriter::from(&provider_rw, &sf_provider)
.remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
self.on_remove_blocks_above(new_tip_num)?;
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(());
}
PersistenceAction::SaveBlocks(blocks, sender) => {
let Some(last_block) = blocks.last() else {
let _ = sender.send(None);
continue
};
let last_block_hash = last_block.block().hash();
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();
UnifiedStorageWriter::from(&provider_rw, &static_file_provider)
.save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
let result = self.on_save_blocks(blocks)?;
// we ignore the error because the caller may or may not care about the result
let _ = sender.send(Some(last_block_hash));
let _ = sender.send(result);
}
PersistenceAction::PruneBefore(block_num, sender) => {
let res = self.prune_before(block_num)?;
@ -96,6 +86,33 @@ where
}
Ok(())
}
/// Handles a request to remove blocks above `new_tip_num`.
///
/// Opens a read-write provider and the static file provider, runs
/// `remove_blocks_above(new_tip_num)` through a `UnifiedStorageWriter`, and commits the
/// result with `commit_unwind`.
///
/// The elapsed time is recorded in `remove_blocks_above_duration_seconds` only when every
/// step succeeds; any error returns early through `?` before the metric is touched.
fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<(), PersistenceError> {
    let started_at = Instant::now();

    let rw_provider = self.provider.provider_rw()?;
    let static_files = self.provider.static_file_provider();

    // Stage the removal first, then hand both providers over to be committed as an unwind.
    UnifiedStorageWriter::from(&rw_provider, &static_files)
        .remove_blocks_above(new_tip_num)?;
    UnifiedStorageWriter::commit_unwind(rw_provider, static_files)?;

    let elapsed = started_at.elapsed();
    self.metrics.remove_blocks_above_duration_seconds.record(elapsed);
    Ok(())
}
/// Handles a request to save `blocks`.
///
/// Returns the hash of the last block in `blocks`, or `None` when `blocks` is empty. For a
/// non-empty batch the blocks are written through a `UnifiedStorageWriter` and committed;
/// for an empty batch no provider is opened.
///
/// The elapsed time is recorded in `save_blocks_duration_seconds` whenever the function
/// returns `Ok`, including the empty-batch case. Errors return early through `?` without
/// recording.
fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
    let started_at = Instant::now();
    let tip_hash = blocks.last().map(|last| last.block().hash());

    // `tip_hash` is `Some` exactly when there is at least one block to write.
    if !blocks.is_empty() {
        let rw_provider = self.provider.provider_rw()?;
        let static_files = self.provider.static_file_provider();
        UnifiedStorageWriter::from(&rw_provider, &static_files).save_blocks(&blocks)?;
        UnifiedStorageWriter::commit(rw_provider, static_files)?;
    }

    let elapsed = started_at.elapsed();
    self.metrics.save_blocks_duration_seconds.record(elapsed);
    Ok(tip_hash)
}
}
/// One of the errors that can happen when using the persistence service.

View File

@ -1,5 +1,5 @@
use reth_metrics::{
metrics::{Counter, Gauge},
metrics::{Counter, Gauge, Histogram},
Metrics,
};
@ -15,5 +15,7 @@ pub(crate) struct EngineApiMetrics {
pub(crate) forkchoice_updated_messages: Counter,
/// The total count of new payload messages received.
pub(crate) new_payload_messages: Counter,
/// Histogram of persistence operation durations (in seconds)
pub(crate) persistence_duration: Histogram,
// TODO add latency metrics
}

View File

@ -858,18 +858,18 @@ where
}
if self.persistence_state.in_progress() {
let mut rx = self
let (mut rx, start_time) = self
.persistence_state
.rx
.take()
.expect("if a persistence task is in progress Receiver must be Some");
// Check if persistence has completed
// Check if persistence has completed
match rx.try_recv() {
Ok(last_persisted_block_hash) => {
self.metrics.persistence_duration.record(start_time.elapsed());
let Some(last_persisted_block_hash) = last_persisted_block_hash else {
// if this happened, then we persisted no blocks because we sent an empty
// vec of blocks
// if this happened, then we persisted no blocks because we sent an
// empty vec of blocks
warn!(target: "engine", "Persistence task completed but did not persist any blocks");
return Ok(())
};
@ -883,7 +883,7 @@ where
}
}
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
Err(TryRecvError::Empty) => self.persistence_state.rx = Some(rx),
Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
}
}
Ok(())
@ -1993,7 +1993,7 @@ pub struct PersistenceState {
last_persisted_block_hash: B256,
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
rx: Option<oneshot::Receiver<Option<B256>>>,
rx: Option<(oneshot::Receiver<Option<B256>>, Instant)>,
/// The last persisted block number.
///
/// This tracks the chain height that is persisted on disk
@ -2009,7 +2009,7 @@ impl PersistenceState {
/// Sets state for a started persistence task.
fn start(&mut self, rx: oneshot::Receiver<Option<B256>>) {
self.rx = Some(rx);
self.rx = Some((rx, Instant::now()));
}
/// Sets state for a finished persistence task.