diff --git a/crates/engine/tree/src/metrics.rs b/crates/engine/tree/src/metrics.rs
index 9579affe6..ecbddc68d 100644
--- a/crates/engine/tree/src/metrics.rs
+++ b/crates/engine/tree/src/metrics.rs
@@ -1,4 +1,7 @@
-use reth_metrics::{metrics::Gauge, Metrics};
+use reth_metrics::{
+    metrics::{Gauge, Histogram},
+    Metrics,
+};
 
 /// Metrics for the `BasicBlockDownloader`.
 #[derive(Metrics)]
@@ -7,3 +10,15 @@ pub(crate) struct BlockDownloaderMetrics {
     /// How many blocks are currently being downloaded.
     pub(crate) active_block_downloads: Gauge,
 }
+
+/// Metrics for the `PersistenceService`
+#[derive(Metrics)]
+#[metrics(scope = "consensus.engine.persistence")]
+pub(crate) struct PersistenceMetrics {
+    /// How long it took for blocks to be removed
+    pub(crate) remove_blocks_above_duration_seconds: Histogram,
+    /// How long it took for blocks to be saved
+    pub(crate) save_blocks_duration_seconds: Histogram,
+    /// How long it took for blocks to be pruned
+    pub(crate) prune_before_duration_seconds: Histogram,
+}
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index d8b2f254a..04943cd9b 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -1,12 +1,16 @@
 #![allow(dead_code)]
 
+use crate::metrics::PersistenceMetrics;
 use reth_chain_state::ExecutedBlock;
 use reth_db::Database;
 use reth_errors::ProviderError;
 use reth_primitives::B256;
 use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
 use reth_prune::{Pruner, PrunerError, PrunerOutput};
-use std::sync::mpsc::{Receiver, SendError, Sender};
+use std::{
+    sync::mpsc::{Receiver, SendError, Sender},
+    time::Instant,
+};
 use thiserror::Error;
 use tokio::sync::oneshot;
 use tracing::{debug, error};
@@ -26,24 +30,29 @@ pub struct PersistenceService<DB> {
     incoming: Receiver<PersistenceAction>,
     /// The pruner
     pruner: Pruner<DB, ProviderFactory<DB>>,
+    /// metrics
+    metrics: PersistenceMetrics,
 }
 
 impl<DB: Database> PersistenceService<DB> {
     /// Create a new persistence service
-    pub const fn new(
+    pub fn new(
         provider: ProviderFactory<DB>,
         incoming: Receiver<PersistenceAction>,
         pruner: Pruner<DB, ProviderFactory<DB>>,
     ) -> Self {
-        Self { provider, incoming, pruner }
+        Self { provider, incoming, pruner, metrics: PersistenceMetrics::default() }
     }
 
     /// Prunes block data before the given block hash according to the configured prune
     /// configuration.
     fn prune_before(&mut self, block_num: u64) -> Result<PrunerOutput, PrunerError> {
         debug!(target: "tree::persistence", ?block_num, "Running pruner");
+        let start_time = Instant::now();
         // TODO: doing this properly depends on pruner segment changes
-        self.pruner.run(block_num)
+        let result = self.pruner.run(block_num);
+        self.metrics.prune_before_duration_seconds.record(start_time.elapsed());
+        result
     }
 }
 
@@ -58,33 +67,14 @@ where
         while let Ok(action) = self.incoming.recv() {
             match action {
                 PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => {
-                    let provider_rw = self.provider.provider_rw()?;
-                    let sf_provider = self.provider.static_file_provider();
-
-                    UnifiedStorageWriter::from(&provider_rw, &sf_provider)
-                        .remove_blocks_above(new_tip_num)?;
-                    UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
-
+                    self.on_remove_blocks_above(new_tip_num)?;
                     // we ignore the error because the caller may or may not care about the result
                     let _ = sender.send(());
                 }
                 PersistenceAction::SaveBlocks(blocks, sender) => {
-                    let Some(last_block) = blocks.last() else {
-                        let _ = sender.send(None);
-                        continue
-                    };
-
-                    let last_block_hash = last_block.block().hash();
-
-                    let provider_rw = self.provider.provider_rw()?;
-                    let static_file_provider = self.provider.static_file_provider();
-
-                    UnifiedStorageWriter::from(&provider_rw, &static_file_provider)
-                        .save_blocks(&blocks)?;
-                    UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
-
+                    let result = self.on_save_blocks(blocks)?;
                     // we ignore the error because the caller may or may not care about the result
-                    let _ = sender.send(Some(last_block_hash));
+                    let _ = sender.send(result);
                 }
                 PersistenceAction::PruneBefore(block_num, sender) => {
                     let res = self.prune_before(block_num)?;
@@ -96,6 +86,33 @@
         }
         Ok(())
     }
+
+    fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<(), PersistenceError> {
+        let start_time = Instant::now();
+        let provider_rw = self.provider.provider_rw()?;
+        let sf_provider = self.provider.static_file_provider();
+
+        UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
+        UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
+
+        self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
+        Ok(())
+    }
+
+    fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
+        let start_time = Instant::now();
+        let last_block_hash = blocks.last().map(|block| block.block().hash());
+
+        if last_block_hash.is_some() {
+            let provider_rw = self.provider.provider_rw()?;
+            let static_file_provider = self.provider.static_file_provider();
+
+            UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
+            UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
+        }
+        self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
+        Ok(last_block_hash)
+    }
 }
 
 /// One of the errors that can happen when using the persistence service.
diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs
index bc6720e50..1a1c2edf2 100644
--- a/crates/engine/tree/src/tree/metrics.rs
+++ b/crates/engine/tree/src/tree/metrics.rs
@@ -1,5 +1,5 @@
 use reth_metrics::{
-    metrics::{Counter, Gauge},
+    metrics::{Counter, Gauge, Histogram},
     Metrics,
 };
 
@@ -15,5 +15,7 @@ pub(crate) struct EngineApiMetrics {
     pub(crate) forkchoice_updated_messages: Counter,
     /// The total count of new payload messages received.
     pub(crate) new_payload_messages: Counter,
+    /// Histogram of persistence operation durations (in seconds)
+    pub(crate) persistence_duration: Histogram,
     // TODO add latency metrics
 }
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index e88f3cb9e..c4e3e9ecf 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -858,18 +858,18 @@ where
         }
 
         if self.persistence_state.in_progress() {
-            let mut rx = self
+            let (mut rx, start_time) = self
                 .persistence_state
                 .rx
                 .take()
                 .expect("if a persistence task is in progress Receiver must be Some");
-
-            // Check if persistence has completed
+            // Check if persistence has complete
             match rx.try_recv() {
                 Ok(last_persisted_block_hash) => {
+                    self.metrics.persistence_duration.record(start_time.elapsed());
                     let Some(last_persisted_block_hash) = last_persisted_block_hash else {
-                        // if this happened, then we persisted no blocks because we sent an empty
-                        // vec of blocks
+                        // if this happened, then we persisted no blocks because we sent an
+                        // empty vec of blocks
                         warn!(target: "engine", "Persistence task completed but did not persist any blocks");
                         return Ok(())
                     };
@@ -883,7 +883,7 @@ where
                     }
                 }
                 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
-                Err(TryRecvError::Empty) => self.persistence_state.rx = Some(rx),
+                Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
             }
         }
         Ok(())
@@ -1993,7 +1993,7 @@ pub struct PersistenceState {
     last_persisted_block_hash: B256,
     /// Receiver end of channel where the result of the persistence task will be
     /// sent when done. A None value means there's no persistence task in progress.
-    rx: Option<oneshot::Receiver<Option<B256>>>,
+    rx: Option<(oneshot::Receiver<Option<B256>>, Instant)>,
     /// The last persisted block number.
     ///
     /// This tracks the chain height that is persisted on disk
@@ -2009,7 +2009,7 @@ impl PersistenceState {
 
     /// Sets state for a started persistence task.
     fn start(&mut self, rx: oneshot::Receiver<Option<B256>>) {
-        self.rx = Some(rx);
+        self.rx = Some((rx, Instant::now()));
     }
 
     /// Sets state for a finished persistence task.
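Every hunk above repeats the same timing shape: capture `Instant::now()` before the storage, pruner, or persistence round trip, then `record()` the elapsed `Duration` into the matching `Histogram` (the `.record(start_time.elapsed())` calls rely on the histogram accepting a `Duration` directly). Below is a minimal standalone sketch of that shape; the `timed` helper and the printed label are illustrative stand-ins and are not part of the diff.

use std::time::{Duration, Instant};

// Illustrative helper: run `op`, hand the elapsed time to `record`, return the result.
// This mirrors what each persistence method now does inline with its Histogram field.
fn timed<T>(record: impl FnOnce(Duration), op: impl FnOnce() -> T) -> T {
    let start = Instant::now();
    let out = op();
    record(start.elapsed());
    out
}

fn main() {
    // Stand-in for e.g. `metrics.save_blocks_duration_seconds.record(elapsed)`.
    let saved = timed(
        |elapsed| println!("save_blocks took {:.6}s", elapsed.as_secs_f64()),
        || {
            // Placeholder for the real block write + commit.
            std::thread::sleep(Duration::from_millis(5));
            Some([0u8; 32]) // pretend last-persisted block hash
        },
    );
    assert!(saved.is_some());
}

Storing the start `Instant` alongside the oneshot receiver in `PersistenceState` (the `(rx, Instant)` tuple) is what lets the engine attribute the full save round trip to `persistence_duration` once the receiver finally yields.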