From 755ce9efc2b9c0bca2e715522778e83df91b2fe6 Mon Sep 17 00:00:00 2001 From: Seva Zhidkov Date: Thu, 22 Feb 2024 13:57:47 +0000 Subject: [PATCH] perf(metrics): cache handles for database transaction metrics (#6730) --- .../storage/db/src/implementation/mdbx/tx.rs | 11 +- crates/storage/db/src/metrics.rs | 152 ++++++++++++++---- 2 files changed, 122 insertions(+), 41 deletions(-) diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 2e23e32f8..7688c6031 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -2,9 +2,7 @@ use super::cursor::Cursor; use crate::{ - metrics::{ - DatabaseEnvMetrics, Operation, TransactionMetrics, TransactionMode, TransactionOutcome, - }, + metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome}, table::{Compress, DupSort, Encode, Table, TableImporter}, tables::{utils::decode_one, Tables}, transaction::{DbTx, DbTxMut}, @@ -55,7 +53,7 @@ impl Tx { ) -> Self { let metrics_handler = if let Some(metrics) = metrics { let handler = MetricsHandler::::new(inner.id(), metrics); - TransactionMetrics::record_open(handler.transaction_mode()); + handler.env_metrics.record_opened_transaction(handler.transaction_mode()); handler.log_transaction_opened(); Some(handler) } else { @@ -128,7 +126,7 @@ impl Tx { let (result, commit_latency, close_duration) = run(self); let open_duration = metrics_handler.start.elapsed(); - TransactionMetrics::record_close( + metrics_handler.env_metrics.record_closed_transaction( metrics_handler.transaction_mode(), outcome, open_duration, @@ -244,8 +242,7 @@ impl Drop for MetricsHandler { fn drop(&mut self) { if !self.close_recorded { self.log_backtrace_on_long_read_transaction(); - - TransactionMetrics::record_close( + self.env_metrics.record_closed_transaction( self.transaction_mode(), TransactionOutcome::Drop, self.start.elapsed(), diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs index eef7d40ed..1b673eb17 100644 --- a/crates/storage/db/src/metrics.rs +++ b/crates/storage/db/src/metrics.rs @@ -2,7 +2,7 @@ use crate::Tables; use metrics::{Gauge, Histogram}; use reth_libmdbx::CommitLatency; use reth_metrics::{metrics::Counter, Metrics}; -use rustc_hash::FxHasher; +use rustc_hash::{FxHashMap, FxHasher}; use std::{ collections::HashMap, hash::BuildHasherDefault, @@ -20,14 +20,31 @@ const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096; #[derive(Debug)] pub struct DatabaseEnvMetrics { /// Caches OperationMetrics handles for each table and operation tuple. - operations: HashMap<(Tables, Operation), OperationMetrics, BuildHasherDefault>, + operations: FxHashMap<(Tables, Operation), OperationMetrics>, + /// Caches TransactionMetrics handles for counters grouped by only transaction mode. + /// Updated both at tx open and close. + transactions: FxHashMap, + /// Caches TransactionOutcomeMetrics handles for counters grouped by transaction mode and + /// outcome. Can only be updated at tx close, as outcome is only known at that point. + transaction_outcomes: + FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>, } impl DatabaseEnvMetrics { pub(crate) fn new() -> Self { - // Pre-populate the map with all possible table and operation combinations + // Pre-populate metric handle maps with all possible combinations of labels // to avoid runtime locks on the map when recording metrics. - let mut operations = HashMap::with_capacity_and_hasher( + Self { + operations: Self::generate_operation_handles(), + transactions: Self::generate_transaction_handles(), + transaction_outcomes: Self::generate_transaction_outcome_handles(), + } + } + + /// Generate a map of all possible operation handles for each table and operation tuple. + /// Used for tracking all operation metrics. + fn generate_operation_handles() -> FxHashMap<(Tables, Operation), OperationMetrics> { + let mut operations = FxHashMap::with_capacity_and_hasher( Tables::COUNT * Operation::COUNT, BuildHasherDefault::::default(), ); @@ -42,7 +59,45 @@ impl DatabaseEnvMetrics { ); } } - Self { operations } + operations + } + + /// Generate a map of all possible transaction modes to metric handles. + /// Used for tracking a counter of open transactions. + fn generate_transaction_handles() -> FxHashMap { + TransactionMode::iter() + .map(|mode| { + ( + mode, + TransactionMetrics::new_with_labels(&[( + Labels::TransactionMode.as_str(), + mode.as_str(), + )]), + ) + }) + .collect() + } + + /// Generate a map of all possible transaction mode and outcome handles. + /// Used for tracking various stats for finished transactions (e.g. commit duration). + fn generate_transaction_outcome_handles( + ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> { + let mut transaction_outcomes = HashMap::with_capacity_and_hasher( + TransactionMode::COUNT * TransactionOutcome::COUNT, + BuildHasherDefault::::default(), + ); + for mode in TransactionMode::iter() { + for outcome in TransactionOutcome::iter() { + transaction_outcomes.insert( + (mode, outcome), + TransactionOutcomeMetrics::new_with_labels(&[ + (Labels::TransactionMode.as_str(), mode.as_str()), + (Labels::TransactionOutcome.as_str(), outcome.as_str()), + ]), + ); + } + } + transaction_outcomes } /// Record a metric for database operation executed in `f`. @@ -59,10 +114,38 @@ impl DatabaseEnvMetrics { .expect("operation & table metric handle not found") .record(value_size, f) } + + /// Record metrics for opening a database transaction. + pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) { + self.transactions + .get(&mode) + .expect("transaction mode metric handle not found") + .record_open(); + } + + /// Record metrics for closing a database transactions. + pub(crate) fn record_closed_transaction( + &self, + mode: TransactionMode, + outcome: TransactionOutcome, + open_duration: Duration, + close_duration: Option, + commit_latency: Option, + ) { + self.transactions + .get(&mode) + .expect("transaction mode metric handle not found") + .record_close(); + + self.transaction_outcomes + .get(&(mode, outcome)) + .expect("transaction outcome metric handle not found") + .record(open_duration, close_duration, commit_latency); + } } /// Transaction mode for the database, either read-only or read-write. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)] pub(crate) enum TransactionMode { /// Read-only transaction mode. ReadOnly, @@ -85,7 +168,7 @@ impl TransactionMode { } /// Transaction outcome after a database operation - commit, abort, or drop. -#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)] pub(crate) enum TransactionOutcome { /// Successful commit of the transaction. Commit, @@ -175,6 +258,21 @@ impl Labels { pub(crate) struct TransactionMetrics { /// Total number of currently open database transactions open_total: Gauge, +} + +impl TransactionMetrics { + pub(crate) fn record_open(&self) { + self.open_total.increment(1.0); + } + + pub(crate) fn record_close(&self) { + self.open_total.decrement(1.0); + } +} + +#[derive(Metrics, Clone)] +#[metrics(scope = "database.transaction")] +pub(crate) struct TransactionOutcomeMetrics { /// The time a database transaction has been open open_duration_seconds: Histogram, /// The time it took to close a database transaction @@ -198,44 +296,30 @@ pub(crate) struct TransactionMetrics { commit_gc_cputime_duration_seconds: Histogram, } -impl TransactionMetrics { - /// Record transaction opening. - pub(crate) fn record_open(mode: TransactionMode) { - let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]); - metrics.open_total.increment(1.0); - } - +impl TransactionOutcomeMetrics { /// Record transaction closing with the duration it was open and the duration it took to close /// it. - pub(crate) fn record_close( - mode: TransactionMode, - outcome: TransactionOutcome, + pub(crate) fn record( + &self, open_duration: Duration, close_duration: Option, commit_latency: Option, ) { - let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]); - metrics.open_total.decrement(1.0); - - let metrics = Self::new_with_labels(&[ - (Labels::TransactionMode.as_str(), mode.as_str()), - (Labels::TransactionOutcome.as_str(), outcome.as_str()), - ]); - metrics.open_duration_seconds.record(open_duration); + self.open_duration_seconds.record(open_duration); if let Some(close_duration) = close_duration { - metrics.close_duration_seconds.record(close_duration) + self.close_duration_seconds.record(close_duration) } if let Some(commit_latency) = commit_latency { - metrics.commit_preparation_duration_seconds.record(commit_latency.preparation()); - metrics.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock()); - metrics.commit_audit_duration_seconds.record(commit_latency.audit()); - metrics.commit_write_duration_seconds.record(commit_latency.write()); - metrics.commit_sync_duration_seconds.record(commit_latency.sync()); - metrics.commit_ending_duration_seconds.record(commit_latency.ending()); - metrics.commit_whole_duration_seconds.record(commit_latency.whole()); - metrics.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime()); + self.commit_preparation_duration_seconds.record(commit_latency.preparation()); + self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock()); + self.commit_audit_duration_seconds.record(commit_latency.audit()); + self.commit_write_duration_seconds.record(commit_latency.write()); + self.commit_sync_duration_seconds.record(commit_latency.sync()); + self.commit_ending_duration_seconds.record(commit_latency.ending()); + self.commit_whole_duration_seconds.record(commit_latency.whole()); + self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime()); } } }