mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf(db): introduce environment-level cache for metric handles (#6550)
This commit is contained in:
@ -38,11 +38,14 @@ parking_lot.workspace = true
|
||||
derive_more.workspace = true
|
||||
eyre.workspace = true
|
||||
paste = "1.0"
|
||||
dashmap = "5.5.3"
|
||||
rustc-hash = "1.1.0"
|
||||
|
||||
# arbitrary utils
|
||||
arbitrary = { workspace = true, features = ["derive"], optional = true }
|
||||
proptest = { workspace = true, optional = true }
|
||||
proptest-derive = { workspace = true, optional = true }
|
||||
strum = { workspace = true, features = ["derive"] }
|
||||
once_cell.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
|
||||
@ -6,14 +6,14 @@ use crate::{
|
||||
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
|
||||
ReverseWalker, Walker,
|
||||
},
|
||||
metrics::{Operation, OperationMetrics},
|
||||
metrics::{DatabaseEnvMetrics, Operation},
|
||||
table::{Compress, Decode, Decompress, DupSort, Encode, Table},
|
||||
tables::utils::*,
|
||||
DatabaseError,
|
||||
};
|
||||
use reth_interfaces::db::{DatabaseErrorInfo, DatabaseWriteError, DatabaseWriteOperation};
|
||||
use reth_libmdbx::{self, Error as MDBXError, TransactionKind, WriteFlags, RO, RW};
|
||||
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds};
|
||||
use std::{borrow::Cow, collections::Bound, marker::PhantomData, ops::RangeBounds, sync::Arc};
|
||||
|
||||
/// Read only Cursor.
|
||||
pub type CursorRO<T> = Cursor<RO, T>;
|
||||
@ -27,18 +27,22 @@ pub struct Cursor<K: TransactionKind, T: Table> {
|
||||
pub(crate) inner: reth_libmdbx::Cursor<K>,
|
||||
/// Cache buffer that receives compressed values.
|
||||
buf: Vec<u8>,
|
||||
/// Whether to record metrics or not.
|
||||
with_metrics: bool,
|
||||
/// Reference to metric handles in the DB environment. If `None`, metrics are not recorded.
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
/// Phantom data to enforce encoding/decoding.
|
||||
_dbi: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
pub(crate) fn new_with_metrics(inner: reth_libmdbx::Cursor<K>, with_metrics: bool) -> Self {
|
||||
Self { inner, buf: Vec::new(), with_metrics, _dbi: PhantomData }
|
||||
pub(crate) fn new_with_metrics(
|
||||
inner: reth_libmdbx::Cursor<K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
) -> Self {
|
||||
Self { inner, buf: Vec::new(), metrics, _dbi: PhantomData }
|
||||
}
|
||||
|
||||
/// If `self.with_metrics == true`, record a metric with the provided operation and value size.
|
||||
/// If `self.metrics` is `Some(...)`, record a metric with the provided operation and value
|
||||
/// size.
|
||||
///
|
||||
/// Otherwise, just execute the closure.
|
||||
fn execute_with_operation_metric<R>(
|
||||
@ -47,8 +51,8 @@ impl<K: TransactionKind, T: Table> Cursor<K, T> {
|
||||
value_size: Option<usize>,
|
||||
f: impl FnOnce(&mut Self) -> R,
|
||||
) -> R {
|
||||
if self.with_metrics {
|
||||
OperationMetrics::record(T::NAME, operation, value_size, || f(self))
|
||||
if let Some(metrics) = self.metrics.as_ref().cloned() {
|
||||
metrics.record_operation(T::NAME, operation, value_size, || f(self))
|
||||
} else {
|
||||
f(self)
|
||||
}
|
||||
|
||||
@ -3,6 +3,7 @@
|
||||
use crate::{
|
||||
database::Database,
|
||||
database_metrics::{DatabaseMetadata, DatabaseMetadataValue, DatabaseMetrics},
|
||||
metrics::DatabaseEnvMetrics,
|
||||
tables::{TableType, Tables},
|
||||
utils::default_page_size,
|
||||
DatabaseError,
|
||||
@ -16,7 +17,7 @@ use reth_libmdbx::{
|
||||
PageSize, SyncMode, RO, RW,
|
||||
};
|
||||
use reth_tracing::tracing::error;
|
||||
use std::{ops::Deref, path::Path};
|
||||
use std::{ops::Deref, path::Path, sync::Arc};
|
||||
use tx::Tx;
|
||||
|
||||
pub mod cursor;
|
||||
@ -86,8 +87,8 @@ impl DatabaseArguments {
|
||||
pub struct DatabaseEnv {
|
||||
/// Libmdbx-sys environment.
|
||||
inner: Environment,
|
||||
/// Whether to record metrics or not.
|
||||
with_metrics: bool,
|
||||
/// Cache for metric handles. If `None`, metrics are not recorded.
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
}
|
||||
|
||||
impl Database for DatabaseEnv {
|
||||
@ -97,14 +98,14 @@ impl Database for DatabaseEnv {
|
||||
fn tx(&self) -> Result<Self::TX, DatabaseError> {
|
||||
Ok(Tx::new_with_metrics(
|
||||
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||
self.with_metrics,
|
||||
self.metrics.as_ref().cloned(),
|
||||
))
|
||||
}
|
||||
|
||||
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
|
||||
Ok(Tx::new_with_metrics(
|
||||
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||
self.with_metrics,
|
||||
self.metrics.as_ref().cloned(),
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -309,7 +310,7 @@ impl DatabaseEnv {
|
||||
|
||||
let env = DatabaseEnv {
|
||||
inner: inner_env.open(path).map_err(|e| DatabaseError::Open(e.into()))?,
|
||||
with_metrics: false,
|
||||
metrics: None,
|
||||
};
|
||||
|
||||
Ok(env)
|
||||
@ -317,7 +318,7 @@ impl DatabaseEnv {
|
||||
|
||||
/// Enables metrics on the database.
|
||||
pub fn with_metrics(mut self) -> Self {
|
||||
self.with_metrics = true;
|
||||
self.metrics = Some(DatabaseEnvMetrics::new().into());
|
||||
self
|
||||
}
|
||||
|
||||
|
||||
@ -3,7 +3,7 @@
|
||||
use super::cursor::Cursor;
|
||||
use crate::{
|
||||
metrics::{
|
||||
Operation, OperationMetrics, TransactionMetrics, TransactionMode, TransactionOutcome,
|
||||
DatabaseEnvMetrics, Operation, TransactionMetrics, TransactionMode, TransactionOutcome,
|
||||
},
|
||||
table::{Compress, DupSort, Encode, Table, TableImporter},
|
||||
tables::{utils::decode_one, Tables, NUM_TABLES},
|
||||
@ -50,9 +50,12 @@ impl<K: TransactionKind> Tx<K> {
|
||||
|
||||
/// Creates new `Tx` object with a `RO` or `RW` transaction and optionally enables metrics.
|
||||
#[track_caller]
|
||||
pub fn new_with_metrics(inner: Transaction<K>, with_metrics: bool) -> Self {
|
||||
let metrics_handler = if with_metrics {
|
||||
let handler = MetricsHandler::<K>::new(inner.id());
|
||||
pub fn new_with_metrics(
|
||||
inner: Transaction<K>,
|
||||
metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||
) -> Self {
|
||||
let metrics_handler = if let Some(metrics) = metrics {
|
||||
let handler = MetricsHandler::<K>::new(inner.id(), metrics);
|
||||
TransactionMetrics::record_open(handler.transaction_mode());
|
||||
handler.log_transaction_opened();
|
||||
Some(handler)
|
||||
@ -90,7 +93,10 @@ impl<K: TransactionKind> Tx<K> {
|
||||
.cursor_with_dbi(self.get_dbi::<T>()?)
|
||||
.map_err(|e| DatabaseError::InitCursor(e.into()))?;
|
||||
|
||||
Ok(Cursor::new_with_metrics(inner, self.metrics_handler.is_some()))
|
||||
Ok(Cursor::new_with_metrics(
|
||||
inner,
|
||||
self.metrics_handler.as_ref().map(|h| h.env_metrics.clone()),
|
||||
))
|
||||
}
|
||||
|
||||
/// If `self.metrics_handler == Some(_)`, measure the time it takes to execute the closure and
|
||||
@ -137,7 +143,9 @@ impl<K: TransactionKind> Tx<K> {
|
||||
) -> R {
|
||||
if let Some(metrics_handler) = &self.metrics_handler {
|
||||
metrics_handler.log_backtrace_on_long_read_transaction();
|
||||
OperationMetrics::record(T::NAME, operation, value_size, || f(&self.inner))
|
||||
metrics_handler
|
||||
.env_metrics
|
||||
.record_operation(T::NAME, operation, value_size, || f(&self.inner))
|
||||
} else {
|
||||
f(&self.inner)
|
||||
}
|
||||
@ -159,17 +167,19 @@ struct MetricsHandler<K: TransactionKind> {
|
||||
/// If `true`, the backtrace of transaction has already been recorded and logged.
|
||||
/// See [MetricsHandler::log_backtrace_on_long_read_transaction].
|
||||
backtrace_recorded: AtomicBool,
|
||||
env_metrics: Arc<DatabaseEnvMetrics>,
|
||||
_marker: PhantomData<K>,
|
||||
}
|
||||
|
||||
impl<K: TransactionKind> MetricsHandler<K> {
|
||||
fn new(txn_id: u64) -> Self {
|
||||
fn new(txn_id: u64, env_metrics: Arc<DatabaseEnvMetrics>) -> Self {
|
||||
Self {
|
||||
txn_id,
|
||||
start: Instant::now(),
|
||||
close_recorded: false,
|
||||
record_backtrace: true,
|
||||
backtrace_recorded: AtomicBool::new(false),
|
||||
env_metrics,
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,10 +1,57 @@
|
||||
use crate::{Tables, NUM_TABLES};
|
||||
use dashmap::DashMap;
|
||||
use metrics::{Gauge, Histogram};
|
||||
use reth_libmdbx::CommitLatency;
|
||||
use reth_metrics::{metrics::Counter, Metrics};
|
||||
use std::time::{Duration, Instant};
|
||||
use rustc_hash::FxHasher;
|
||||
use std::{
|
||||
hash::BuildHasherDefault,
|
||||
str::FromStr,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use strum::EnumCount;
|
||||
|
||||
const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
|
||||
|
||||
/// Caches metric handles for database environment to make sure handles are not re-created
|
||||
/// on every operation.
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseEnvMetrics {
|
||||
/// Caches OperationMetrics handles for each table and operation tuple.
|
||||
operations: DashMap<(Tables, Operation), OperationMetrics, BuildHasherDefault<FxHasher>>,
|
||||
}
|
||||
|
||||
impl DatabaseEnvMetrics {
|
||||
pub(crate) fn new() -> Self {
|
||||
Self {
|
||||
operations: DashMap::with_capacity_and_hasher(
|
||||
NUM_TABLES * Operation::COUNT,
|
||||
BuildHasherDefault::<FxHasher>::default(),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a metric for database operation executed in `f`. Panics if the table name is unknown.
|
||||
pub(crate) fn record_operation<R>(
|
||||
&self,
|
||||
table: &'static str,
|
||||
operation: Operation,
|
||||
value_size: Option<usize>,
|
||||
f: impl FnOnce() -> R,
|
||||
) -> R {
|
||||
let handle = self
|
||||
.operations
|
||||
.entry((Tables::from_str(table).expect("unknown table name"), operation))
|
||||
.or_insert_with(|| {
|
||||
OperationMetrics::new_with_labels(&[
|
||||
(Labels::Table.as_str(), table),
|
||||
(Labels::Operation.as_str(), operation.as_str()),
|
||||
])
|
||||
});
|
||||
handle.record(value_size, f)
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction mode for the database, either read-only or read-write.
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
|
||||
pub(crate) enum TransactionMode {
|
||||
@ -51,7 +98,7 @@ impl TransactionOutcome {
|
||||
}
|
||||
|
||||
/// Types of operations conducted on the database: get, put, delete, and various cursor operations.
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount)]
|
||||
pub(crate) enum Operation {
|
||||
/// Database get operation.
|
||||
Get,
|
||||
@ -199,24 +246,15 @@ impl OperationMetrics {
|
||||
///
|
||||
/// The duration it took to execute the closure is recorded only if the provided `value_size` is
|
||||
/// larger than [LARGE_VALUE_THRESHOLD_BYTES].
|
||||
pub(crate) fn record<T>(
|
||||
table: &'static str,
|
||||
operation: Operation,
|
||||
value_size: Option<usize>,
|
||||
f: impl FnOnce() -> T,
|
||||
) -> T {
|
||||
let metrics = Self::new_with_labels(&[
|
||||
(Labels::Table.as_str(), table),
|
||||
(Labels::Operation.as_str(), operation.as_str()),
|
||||
]);
|
||||
metrics.calls_total.increment(1);
|
||||
pub(crate) fn record<R>(&self, value_size: Option<usize>, f: impl FnOnce() -> R) -> R {
|
||||
self.calls_total.increment(1);
|
||||
|
||||
// Record duration only for large values to prevent the performance hit of clock syscall
|
||||
// on small operations
|
||||
if value_size.map_or(false, |size| size > LARGE_VALUE_THRESHOLD_BYTES) {
|
||||
let start = Instant::now();
|
||||
let result = f();
|
||||
metrics.large_value_duration_seconds.record(start.elapsed());
|
||||
self.large_value_duration_seconds.record(start.elapsed());
|
||||
result
|
||||
} else {
|
||||
f()
|
||||
|
||||
@ -108,7 +108,7 @@ macro_rules! tables {
|
||||
(TableType::Table, [$($table:ident),*]),
|
||||
(TableType::DupSort, [$($dupsort:ident),*])
|
||||
]) => {
|
||||
#[derive(Debug, PartialEq, Copy, Clone)]
|
||||
#[derive(Debug, PartialEq, Copy, Clone, Hash, Eq)]
|
||||
/// Default tables that should be present inside database.
|
||||
pub enum Tables {
|
||||
$(
|
||||
|
||||
Reference in New Issue
Block a user