mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: Report MDBX commit latency metrics (#5668)
Co-authored-by: Nikita Smirnov <nikita.smirnov.m@gmail.com> Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -12,7 +12,7 @@ use crate::{
|
||||
};
|
||||
use parking_lot::RwLock;
|
||||
use reth_interfaces::db::{DatabaseWriteError, DatabaseWriteOperation};
|
||||
use reth_libmdbx::{ffi::DBI, Transaction, TransactionKind, WriteFlags, RW};
|
||||
use reth_libmdbx::{ffi::DBI, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
|
||||
use reth_tracing::tracing::debug;
|
||||
use std::{
|
||||
backtrace::Backtrace,
|
||||
@ -99,14 +99,14 @@ impl<K: TransactionKind> Tx<K> {
|
||||
fn execute_with_close_transaction_metric<R>(
|
||||
mut self,
|
||||
outcome: TransactionOutcome,
|
||||
f: impl FnOnce(Self) -> R,
|
||||
f: impl FnOnce(Self) -> (R, Option<CommitLatency>),
|
||||
) -> R {
|
||||
if let Some(mut metrics_handler) = self.metrics_handler.take() {
|
||||
metrics_handler.close_recorded = true;
|
||||
metrics_handler.log_backtrace_on_long_transaction();
|
||||
|
||||
let start = Instant::now();
|
||||
let result = f(self);
|
||||
let (result, commit_latency) = f(self);
|
||||
let open_duration = metrics_handler.start.elapsed();
|
||||
let close_duration = start.elapsed();
|
||||
|
||||
@ -115,11 +115,12 @@ impl<K: TransactionKind> Tx<K> {
|
||||
outcome,
|
||||
open_duration,
|
||||
Some(close_duration),
|
||||
commit_latency,
|
||||
);
|
||||
|
||||
result
|
||||
} else {
|
||||
f(self)
|
||||
f(self).0
|
||||
}
|
||||
}
|
||||
|
||||
@ -212,6 +213,7 @@ impl<K: TransactionKind> Drop for MetricsHandler<K> {
|
||||
TransactionOutcome::Drop,
|
||||
self.start.elapsed(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -234,13 +236,16 @@ impl<K: TransactionKind> DbTx for Tx<K> {
|
||||
|
||||
fn commit(self) -> Result<bool, DatabaseError> {
|
||||
self.execute_with_close_transaction_metric(TransactionOutcome::Commit, |this| {
|
||||
this.inner.commit().map_err(|e| DatabaseError::Commit(e.into()))
|
||||
match this.inner.commit().map_err(|e| DatabaseError::Commit(e.into())) {
|
||||
Ok((v, latency)) => (Ok(v), Some(latency)),
|
||||
Err(e) => (Err(e), None),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
fn abort(self) {
|
||||
self.execute_with_close_transaction_metric(TransactionOutcome::Abort, |this| {
|
||||
drop(this.inner)
|
||||
(drop(this.inner), None)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
use metrics::{Gauge, Histogram};
|
||||
use reth_libmdbx::CommitLatency;
|
||||
use reth_metrics::{metrics::Counter, Metrics};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
@ -100,6 +101,23 @@ pub(crate) struct TransactionMetrics {
|
||||
open_duration_seconds: Histogram,
|
||||
/// The time it took to close a database transaction
|
||||
close_duration_seconds: Histogram,
|
||||
/// The time it took to prepare a transaction commit
|
||||
commit_preparation_duration_seconds: Histogram,
|
||||
/// Duration of GC update during transaction commit by wall clock
|
||||
commit_gc_wallclock_duration_seconds: Histogram,
|
||||
/// The time it took to conduct audit of a transaction commit
|
||||
commit_audit_duration_seconds: Histogram,
|
||||
/// The time it took to write dirty/modified data pages to a filesystem during transaction
|
||||
/// commit
|
||||
commit_write_duration_seconds: Histogram,
|
||||
/// The time it took to sync written data to the disk/storage during transaction commit
|
||||
commit_sync_duration_seconds: Histogram,
|
||||
/// The time it took to release resources during transaction commit
|
||||
commit_ending_duration_seconds: Histogram,
|
||||
/// The total duration of a transaction commit
|
||||
commit_whole_duration_seconds: Histogram,
|
||||
/// User-mode CPU time spent on GC update during transaction commit
|
||||
commit_gc_cputime_duration_seconds: Histogram,
|
||||
}
|
||||
|
||||
impl TransactionMetrics {
|
||||
@ -116,6 +134,7 @@ impl TransactionMetrics {
|
||||
outcome: TransactionOutcome,
|
||||
open_duration: Duration,
|
||||
close_duration: Option<Duration>,
|
||||
commit_latency: Option<CommitLatency>,
|
||||
) {
|
||||
let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]);
|
||||
metrics.open_total.decrement(1.0);
|
||||
@ -129,6 +148,17 @@ impl TransactionMetrics {
|
||||
if let Some(close_duration) = close_duration {
|
||||
metrics.close_duration_seconds.record(close_duration)
|
||||
}
|
||||
|
||||
if let Some(commit_latency) = commit_latency {
|
||||
metrics.commit_preparation_duration_seconds.record(commit_latency.preparation());
|
||||
metrics.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
|
||||
metrics.commit_audit_duration_seconds.record(commit_latency.audit());
|
||||
metrics.commit_write_duration_seconds.record(commit_latency.write());
|
||||
metrics.commit_sync_duration_seconds.record(commit_latency.sync());
|
||||
metrics.commit_ending_duration_seconds.record(commit_latency.ending());
|
||||
metrics.commit_whole_duration_seconds.record(commit_latency.whole());
|
||||
metrics.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -68,7 +68,7 @@ fn bench_put_rand(c: &mut Criterion) {
|
||||
let txn = env.begin_ro_txn().unwrap();
|
||||
let db = txn.open_db(None).unwrap();
|
||||
txn.prime_for_permaopen(db);
|
||||
let db = txn.commit_and_rebind_open_dbs().unwrap().1.remove(0);
|
||||
let db = txn.commit_and_rebind_open_dbs().unwrap().2.remove(0);
|
||||
|
||||
let mut items: Vec<(String, String)> = (0..n).map(|n| (get_key(n), get_data(n))).collect();
|
||||
items.shuffle(&mut XorShiftRng::from_seed(Default::default()));
|
||||
|
||||
@ -2,7 +2,7 @@ use crate::{
|
||||
database::Database,
|
||||
error::{mdbx_result, Error, Result},
|
||||
flags::EnvironmentFlags,
|
||||
transaction::{RO, RW},
|
||||
transaction::{CommitLatency, RO, RW},
|
||||
Mode, Transaction, TransactionKind,
|
||||
};
|
||||
use byteorder::{ByteOrder, NativeEndian};
|
||||
@ -303,7 +303,7 @@ unsafe impl Sync for EnvPtr {}
|
||||
pub(crate) enum TxnManagerMessage {
|
||||
Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
|
||||
Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
|
||||
Commit { tx: TxnPtr, sender: SyncSender<Result<bool>> },
|
||||
Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
|
||||
}
|
||||
|
||||
/// Environment statistics.
|
||||
@ -603,9 +603,13 @@ impl EnvironmentBuilder {
|
||||
}
|
||||
TxnManagerMessage::Commit { tx, sender } => {
|
||||
sender
|
||||
.send(mdbx_result(unsafe {
|
||||
ffi::mdbx_txn_commit_ex(tx.0, ptr::null_mut())
|
||||
}))
|
||||
.send({
|
||||
let mut latency = CommitLatency::new();
|
||||
mdbx_result(unsafe {
|
||||
ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
|
||||
})
|
||||
.map(|v| (v, latency))
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
},
|
||||
|
||||
@ -19,7 +19,7 @@ pub use crate::{
|
||||
},
|
||||
error::{Error, Result},
|
||||
flags::*,
|
||||
transaction::{Transaction, TransactionKind, RO, RW},
|
||||
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
|
||||
};
|
||||
pub mod ffi {
|
||||
pub use ffi::{MDBX_dbi as DBI, MDBX_log_level_t as LogLevel};
|
||||
|
||||
@ -15,6 +15,7 @@ use std::{
|
||||
mem::size_of,
|
||||
ptr, slice,
|
||||
sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
mod private {
|
||||
@ -166,8 +167,8 @@ where
|
||||
/// Commits the transaction.
|
||||
///
|
||||
/// Any pending operations will be saved.
|
||||
pub fn commit(self) -> Result<bool> {
|
||||
self.commit_and_rebind_open_dbs().map(|v| v.0)
|
||||
pub fn commit(self) -> Result<(bool, CommitLatency)> {
|
||||
self.commit_and_rebind_open_dbs().map(|v| (v.0, v.1))
|
||||
}
|
||||
|
||||
pub fn prime_for_permaopen(&self, db: Database) {
|
||||
@ -175,11 +176,15 @@ where
|
||||
}
|
||||
|
||||
/// Commits the transaction and returns table handles permanently open until dropped.
|
||||
pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, Vec<Database>)> {
|
||||
pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, CommitLatency, Vec<Database>)> {
|
||||
let result = {
|
||||
let result = self.txn_execute(|txn| {
|
||||
if K::ONLY_CLEAN {
|
||||
mdbx_result(unsafe { ffi::mdbx_txn_commit_ex(txn, ptr::null_mut()) })
|
||||
let mut latency = CommitLatency::new();
|
||||
mdbx_result(unsafe {
|
||||
ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
|
||||
})
|
||||
.map(|v| (v, latency))
|
||||
} else {
|
||||
let (sender, rx) = sync_channel(0);
|
||||
self.env()
|
||||
@ -193,9 +198,10 @@ where
|
||||
self.inner.set_committed();
|
||||
result
|
||||
};
|
||||
result.map(|v| {
|
||||
result.map(|(v, latency)| {
|
||||
(
|
||||
v,
|
||||
latency,
|
||||
self.inner
|
||||
.primed_dbis
|
||||
.lock()
|
||||
@ -535,6 +541,83 @@ impl TransactionPtr {
|
||||
}
|
||||
}
|
||||
|
||||
/// Commit latencies info.
|
||||
///
|
||||
/// Contains information about latency of commit stages.
|
||||
/// Inner struct stores this info in 1/65536 of seconds units.
|
||||
#[repr(transparent)]
|
||||
pub struct CommitLatency(ffi::MDBX_commit_latency);
|
||||
|
||||
impl CommitLatency {
|
||||
/// Create a new CommitLatency with zero'd inner struct `ffi::MDBX_commit_latency`.
|
||||
pub(crate) fn new() -> Self {
|
||||
unsafe { Self(std::mem::zeroed()) }
|
||||
}
|
||||
|
||||
/// Returns a mut pointer to `ffi::MDBX_commit_latency`.
|
||||
pub(crate) fn mdb_commit_latency(&mut self) -> *mut ffi::MDBX_commit_latency {
|
||||
&mut self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl CommitLatency {
|
||||
/// Duration of preparation (commit child transactions, update
|
||||
/// sub-databases records and cursors destroying).
|
||||
#[inline]
|
||||
pub fn preparation(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.preparation)
|
||||
}
|
||||
|
||||
/// Duration of GC update by wall clock.
|
||||
#[inline]
|
||||
pub fn gc_wallclock(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.gc_wallclock)
|
||||
}
|
||||
|
||||
/// Duration of internal audit if enabled.
|
||||
#[inline]
|
||||
pub fn audit(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.audit)
|
||||
}
|
||||
|
||||
/// Duration of writing dirty/modified data pages to a filesystem,
|
||||
/// i.e. the summary duration of a `write()` syscalls during commit.
|
||||
#[inline]
|
||||
pub fn write(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.write)
|
||||
}
|
||||
|
||||
/// Duration of syncing written data to the disk/storage, i.e.
|
||||
/// the duration of a `fdatasync()` or a `msync()` syscall during commit.
|
||||
#[inline]
|
||||
pub fn sync(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.sync)
|
||||
}
|
||||
|
||||
/// Duration of transaction ending (releasing resources).
|
||||
#[inline]
|
||||
pub fn ending(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.ending)
|
||||
}
|
||||
|
||||
/// The total duration of a commit.
|
||||
#[inline]
|
||||
pub fn whole(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.whole)
|
||||
}
|
||||
|
||||
/// User-mode CPU time spent on GC update.
|
||||
#[inline]
|
||||
pub fn gc_cputime(&self) -> Duration {
|
||||
Self::time_to_duration(self.0.gc_cputime)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn time_to_duration(time: u32) -> Duration {
|
||||
Duration::from_nanos(time as u64 * (1_000_000_000 / 65_536))
|
||||
}
|
||||
}
|
||||
|
||||
// SAFETY: Access to the transaction is synchronized by the lock.
|
||||
unsafe impl Send for TransactionPtr {}
|
||||
|
||||
|
||||
@ -109,7 +109,7 @@ fn test_iter() {
|
||||
for (key, data) in &items {
|
||||
txn.put(db.dbi(), key, data, WriteFlags::empty()).unwrap();
|
||||
}
|
||||
assert!(!txn.commit().unwrap());
|
||||
assert!(!txn.commit().unwrap().0);
|
||||
}
|
||||
|
||||
let txn = env.begin_ro_txn().unwrap();
|
||||
|
||||
@ -147,13 +147,13 @@ fn test_clear_db() {
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
txn.put(txn.open_db(None).unwrap().dbi(), b"key", b"val", WriteFlags::empty()).unwrap();
|
||||
assert!(!txn.commit().unwrap());
|
||||
assert!(!txn.commit().unwrap().0);
|
||||
}
|
||||
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
txn.clear_db(txn.open_db(None).unwrap().dbi()).unwrap();
|
||||
assert!(!txn.commit().unwrap());
|
||||
assert!(!txn.commit().unwrap().0);
|
||||
}
|
||||
|
||||
let txn = env.begin_ro_txn().unwrap();
|
||||
@ -177,7 +177,7 @@ fn test_drop_db() {
|
||||
.unwrap();
|
||||
// Workaround for MDBX dbi drop issue
|
||||
txn.create_db(Some("canary"), DatabaseFlags::empty()).unwrap();
|
||||
assert!(!txn.commit().unwrap());
|
||||
assert!(!txn.commit().unwrap().0);
|
||||
}
|
||||
{
|
||||
let txn = env.begin_rw_txn().unwrap();
|
||||
@ -186,7 +186,7 @@ fn test_drop_db() {
|
||||
txn.drop_db(db).unwrap();
|
||||
}
|
||||
assert!(matches!(txn.open_db(Some("test")).unwrap_err(), Error::NotFound));
|
||||
assert!(!txn.commit().unwrap());
|
||||
assert!(!txn.commit().unwrap().0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user