diff --git a/Cargo.lock b/Cargo.lock
index 5b605e51d..386cecc67 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -88,9 +88,9 @@ dependencies = [
 
 [[package]]
 name = "ahash"
-version = "0.8.7"
+version = "0.8.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01"
+checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff"
 dependencies = [
  "cfg-if",
  "getrandom 0.2.12",
@@ -5946,6 +5946,7 @@ dependencies = [
  "assert_matches",
  "bytes",
  "criterion",
+ "dashmap",
  "derive_more",
  "eyre",
  "iai",
@@ -5967,8 +5968,10 @@ dependencies = [
  "reth-nippy-jar",
  "reth-primitives",
  "reth-tracing",
+ "rustc-hash",
  "serde",
  "serde_json",
+ "strum 0.26.1",
  "tempfile",
  "test-fuzz 5.0.0",
  "thiserror",
diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml
index 03b99345b..b133db21c 100644
--- a/crates/storage/db/Cargo.toml
+++ b/crates/storage/db/Cargo.toml
@@ -38,11 +38,14 @@ parking_lot.workspace = true
 derive_more.workspace = true
 eyre.workspace = true
 paste = "1.0"
+dashmap = "5.5.3"
+rustc-hash = "1.1.0"
 
 # arbitrary utils
 arbitrary = { workspace = true, features = ["derive"], optional = true }
 proptest = { workspace = true, optional = true }
 proptest-derive = { workspace = true, optional = true }
+strum = { workspace = true, features = ["derive"] }
 once_cell.workspace = true
 
 [dev-dependencies]
diff --git a/crates/storage/db/src/implementation/mdbx/cursor.rs b/crates/storage/db/src/implementation/mdbx/cursor.rs
index 4eec83999..f6d6d988d 100644
--- a/crates/storage/db/src/implementation/mdbx/cursor.rs
+++ b/crates/storage/db/src/implementation/mdbx/cursor.rs
@@ -6,14 +6,14 @@ use crate::{
         DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
         ReverseWalker, Walker,
     },
-    metrics::{Operation, OperationMetrics},
+    metrics::{DatabaseEnvMetrics, Operation},
     table::{Compress, Decode, Decompress, DupSort, Encode, Table},
     tables::utils::*,
     DatabaseError,
 };
 use reth_interfaces::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
 use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
-use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
+use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
 
 /// Read only Cursor.
 pub type CursorRO<T> = Cursor<RO, T>;
@@ -27,18 +27,22 @@ pub struct Cursor<K: TransactionKind, T: Table> {
     pub(crate) inner: reth_libmdbx::Cursor<K>,
     /// Cache buffer that receives compressed values.
     buf: Vec<u8>,
-    /// Whether to record metrics or not.
-    with_metrics: bool,
+    /// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
+    metrics: Option<Arc<DatabaseEnvMetrics>>,
     /// Phantom data to enforce encoding/decoding.
     _dbi: PhantomData<T>,
 }
 
 impl<K: TransactionKind, T: Table> Cursor<K, T> {
-    pub(crate) fn new_with_metrics(inner: reth_libmdbx::Cursor<K>, with_metrics: bool) -> Self {
-        Self { inner, buf: Vec::new(), with_metrics, _dbi: PhantomData }
+    pub(crate) fn new_with_metrics(
+        inner: reth_libmdbx::Cursor<K>,
+        metrics: Option<Arc<DatabaseEnvMetrics>>,
+    ) -> Self {
+        Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
     }
 
-    /// If `self.with_metrics == true`, record a metric with the provided operation and value size.
+    /// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
+    /// size.
     ///
     /// Otherwise, just execute the closure.
     fn execute_with_operation_metric<R>(
@@ -47,8 +51,8 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
         value_size: Option<usize>,
         f: impl FnOnce(&mut Self) -> R,
     ) -> R {
-        if self.with_metrics {
-            OperationMetrics::record(T::NAME, operation, value_size, || f(self))
+        if let Some(metrics) = self.metrics.as_ref().cloned() {
+            metrics.record_operation(T::NAME, operation, value_size, || f(self))
         } else {
             f(self)
         }
diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs
index 821287548..2efd0a748 100644
--- a/crates/storage/db/src/implementation/mdbx/mod.rs
+++ b/crates/storage/db/src/implementation/mdbx/mod.rs
@@ -3,6 +3,7 @@
 use crate::{
     database::Database,
     database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
+    metrics::DatabaseEnvMetrics,
     tables::{TableType, Tables},
     utils::default_page_size,
     DatabaseError,
@@ -16,7 +17,7 @@ use reth_libmdbx::{
     PageSize, SyncMode, RO, RW,
 };
 use reth_tracing::tracing::error;
-use std::{ops::Deref, path::Path};
+use std::{ops::Deref, path::Path, sync::Arc};
 use tx::Tx;
 
 pub mod cursor;
@@ -86,8 +87,8 @@
 pub struct DatabaseEnv {
     /// Libmdbx-sys environment.
     inner: Environment,
-    /// Whether to record metrics or not.
-    with_metrics: bool,
+    /// Cache for metric handles. If `None`, metrics are not recorded.
+    metrics: Option<Arc<DatabaseEnvMetrics>>,
 }
 
 impl Database for DatabaseEnv {
@@ -97,14 +98,14 @@
     fn tx(&self) -> Result<Self::TX, DatabaseError> {
         Ok(Tx::new_with_metrics(
             self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
-            self.with_metrics,
+            self.metrics.as_ref().cloned(),
         ))
     }
 
     fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
         Ok(Tx::new_with_metrics(
             self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
-            self.with_metrics,
+            self.metrics.as_ref().cloned(),
         ))
     }
 }
@@ -309,7 +310,7 @@ impl DatabaseEnv {
 
         let env = DatabaseEnv {
             inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
-            with_metrics: false,
+            metrics: None,
         };
 
         Ok(env)
@@ -317,7 +318,7 @@ impl DatabaseEnv {
 
     /// Enables metrics on the database.
     pub fn with_metrics(mut self) -> Self {
-        self.with_metrics = true;
+        self.metrics = Some(DatabaseEnvMetrics::new().into());
         self
     }
 
diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs
index 63fb2a799..2a4afc350 100644
--- a/crates/storage/db/src/implementation/mdbx/tx.rs
+++ b/crates/storage/db/src/implementation/mdbx/tx.rs
@@ -3,7 +3,7 @@
 use super::cursor::Cursor;
 use crate::{
     metrics::{
-        Operation, OperationMetrics, TransactionMetrics, TransactionMode, TransactionOutcome,
+        DatabaseEnvMetrics, Operation, TransactionMetrics, TransactionMode, TransactionOutcome,
     },
     table::{Compress, DupSort, Encode, Table, TableImporter},
     tables::{utils::decode_one, Tables, NUM_TABLES},
@@ -50,9 +50,12 @@ impl<K: TransactionKind> Tx<K> {
 
     /// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
     #[track_caller]
-    pub fn new_with_metrics(inner: Transaction<K>, with_metrics: bool) -> Self {
-        let metrics_handler = if with_metrics {
-            let handler = MetricsHandler::<K>::new(inner.id());
+    pub fn new_with_metrics(
+        inner: Transaction<K>,
+        metrics: Option<Arc<DatabaseEnvMetrics>>,
+    ) -> Self {
+        let metrics_handler = if let Some(metrics) = metrics {
+            let handler = MetricsHandler::<K>::new(inner.id(), metrics);
             TransactionMetrics::record_open(handler.transaction_mode());
             handler.log_transaction_opened();
             Some(handler)
@@ -90,7 +93,10 @@ impl<K: TransactionKind> Tx<K> {
             .cursor_with_dbi(self.get_dbi::<T>()?)
             .map_err(|e| DatabaseError::InitCursor(e.into()))?;
 
-        Ok(Cursor::new_with_metrics(inner, self.metrics_handler.is_some()))
+        Ok(Cursor::new_with_metrics(
+            inner,
+            self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
+        ))
     }
 
     /// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
@@ -137,7 +143,9 @@ impl<K: TransactionKind> Tx<K> {
     ) -> R {
         if let Some(metrics_handler) = &self.metrics_handler {
             metrics_handler.log_backtrace_on_long_read_transaction();
-            OperationMetrics::record(T::NAME, operation, value_size, || f(&self.inner))
+            metrics_handler
+                .env_metrics
+                .record_operation(T::NAME, operation, value_size, || f(&self.inner))
         } else {
             f(&self.inner)
         }
@@ -159,17 +167,19 @@ struct MetricsHandler<K: TransactionKind> {
     /// If `true`, the backtrace of transaction has already been recorded and logged.
     /// See [MetricsHandler::log_backtrace_on_long_read_transaction].
     backtrace_recorded: AtomicBool,
+    env_metrics: Arc<DatabaseEnvMetrics>,
     _marker: PhantomData<K>,
 }
 
 impl<K: TransactionKind> MetricsHandler<K> {
-    fn new(txn_id: u64) -> Self {
+    fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
         Self {
             txn_id,
             start: Instant::now(),
             close_recorded: false,
             record_backtrace: true,
             backtrace_recorded: AtomicBool::new(false),
+            env_metrics,
             _marker: PhantomData,
         }
     }
diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs
index 971ae194f..ff76a7b65 100644
--- a/crates/storage/db/src/metrics.rs
+++ b/crates/storage/db/src/metrics.rs
@@ -1,10 +1,57 @@
+use crate::{Tables, NUM_TABLES};
+use dashmap::DashMap;
 use metrics::{Gauge, Histogram};
 use reth_libmdbx::CommitLatency;
 use reth_metrics::{metrics::Counter, Metrics};
-use std::time::{Duration, Instant};
+use rustc_hash::FxHasher;
+use std::{
+    hash::BuildHasherDefault,
+    str::FromStr,
+    time::{Duration, Instant},
+};
+use strum::EnumCount;
 
 const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
 
+/// Caches metric handles for database environment to make sure handles are not re-created
+/// on every operation.
+#[derive(Debug)]
+pub struct DatabaseEnvMetrics {
+    /// Caches OperationMetrics handles for each table and operation tuple.
+    operations: DashMap<(Tables, Operation), OperationMetrics, BuildHasherDefault<FxHasher>>,
+}
+
+impl DatabaseEnvMetrics {
+    pub(crate) fn new() -> Self {
+        Self {
+            operations: DashMap::with_capacity_and_hasher(
+                NUM_TABLES * Operation::COUNT,
+                BuildHasherDefault::<FxHasher>::default(),
+            ),
+        }
+    }
+
+    /// Record a metric for database operation executed in `f`. Panics if the table name is unknown.
+    pub(crate) fn record_operation<R>(
+        &self,
+        table: &'static str,
+        operation: Operation,
+        value_size: Option<usize>,
+        f: impl FnOnce() -> R,
+    ) -> R {
+        let handle = self
+            .operations
+            .entry((Tables::from_str(table).expect("unknown table name"), operation))
+            .or_insert_with(|| {
+                OperationMetrics::new_with_labels(&[
+                    (Labels::Table.as_str(), table),
+                    (Labels::Operation.as_str(), operation.as_str()),
+                ])
+            });
+        handle.record(value_size, f)
+    }
+}
+
 /// Transaction mode for the database, either read-only or read-write.
 #[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
 pub(crate) enum TransactionMode {
@@ -51,7 +98,7 @@ impl TransactionOutcome {
 }
 
 /// Types of operations conducted on the database: get, put, delete, and various cursor operations.
-#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount)]
 pub(crate) enum Operation {
     /// Database get operation.
     Get,
@@ -199,24 +246,15 @@ impl OperationMetrics {
     ///
     /// The duration it took to execute the closure is recorded only if the provided `value_size` is
     /// larger than [LARGE_VALUE_THRESHOLD_BYTES].
-    pub(crate) fn record<T>(
-        table: &'static str,
-        operation: Operation,
-        value_size: Option<usize>,
-        f: impl FnOnce() -> T,
-    ) -> T {
-        let metrics = Self::new_with_labels(&[
-            (Labels::Table.as_str(), table),
-            (Labels::Operation.as_str(), operation.as_str()),
-        ]);
-        metrics.calls_total.increment(1);
+    pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
+        self.calls_total.increment(1);
 
         // Record duration only for large values to prevent the performance hit of clock syscall
        // on small operations
         if value_size.map_or(false, |size| size > LARGE_VALUE_THRESHOLD_BYTES) {
             let start = Instant::now();
             let result = f();
-            metrics.large_value_duration_seconds.record(start.elapsed());
+            self.large_value_duration_seconds.record(start.elapsed());
             result
         } else {
             f()
diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs
index 337554866..558151625 100644
--- a/crates/storage/db/src/tables/mod.rs
+++ b/crates/storage/db/src/tables/mod.rs
@@ -108,7 +108,7 @@ macro_rules! tables {
         (TableType::Table, [$($table:ident),*]),
         (TableType::DupSort, [$($dupsort:ident),*])
     ]) => {
-        #[derive(Debug, PartialEq, Copy, Clone)]
+        #[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)]
         /// Default tables that should be present inside database.
         pub enum Tables {
             $(
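
Note: the core of this patch is the `DatabaseEnvMetrics` cache above, which creates one `OperationMetrics` handle per `(table, operation)` pair and reuses it instead of re-registering labels on every database call. The standalone sketch below illustrates only that caching pattern; `EnvMetrics`, its `record_operation`, and the `AtomicU64` call counter are illustrative stand-ins rather than the patch's actual types, and only the `dashmap` and `rustc-hash` crates added in `Cargo.toml` are assumed.

```rust
use std::{
    hash::BuildHasherDefault,
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
};

use dashmap::DashMap;
use rustc_hash::FxHasher;

/// Pre-sized map from a (table, operation) pair to a cached handle, so the
/// handle is created on first use and reused by every later operation.
struct EnvMetrics {
    operations: DashMap<(&'static str, &'static str), Arc<AtomicU64>, BuildHasherDefault<FxHasher>>,
}

impl EnvMetrics {
    fn new(expected_entries: usize) -> Self {
        Self {
            operations: DashMap::with_capacity_and_hasher(
                expected_entries,
                BuildHasherDefault::<FxHasher>::default(),
            ),
        }
    }

    /// Fetch (or lazily create) the handle for this pair, bump it, then run `f`.
    fn record_operation<R>(
        &self,
        table: &'static str,
        operation: &'static str,
        f: impl FnOnce() -> R,
    ) -> R {
        let calls = {
            // The shard guard returned by `entry` is dropped at the end of this
            // block, so the map is not locked while the closure runs.
            let entry = self
                .operations
                .entry((table, operation))
                .or_insert_with(|| Arc::new(AtomicU64::new(0)));
            Arc::clone(&entry)
        };
        calls.fetch_add(1, Ordering::Relaxed);
        f()
    }
}

fn main() {
    let metrics = EnvMetrics::new(16);
    let value = metrics.record_operation("Headers", "get", || 40 + 2);
    assert_eq!(value, 42);
    // A second call with the same key reuses the cached counter.
    metrics.record_operation("Headers", "get", || ());
    let calls = metrics.operations.get(&("Headers", "get")).unwrap().load(Ordering::Relaxed);
    assert_eq!(calls, 2);
}
```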