mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf(metrics): cache handles for database transaction metrics (#6730)
This commit is contained in:
@ -2,9 +2,7 @@
|
||||
|
||||
use super::cursor::Cursor;
|
||||
use crate::{
|
||||
metrics::{
|
||||
DatabaseEnvMetrics, Operation, TransactionMetrics, TransactionMode, TransactionOutcome,
|
||||
},
|
||||
metrics::{DatabaseEnvMetrics, Operation, TransactionMode, TransactionOutcome},
|
||||
table::{Compress, DupSort, Encode, Table, TableImporter},
|
||||
tables::{utils::decode_one, Tables},
|
||||
transaction::{DbTx, DbTxMut},
|
||||
@ -55,7 +53,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
) -> Self {
|
||||
let metrics_handler = if let Some(metrics) = metrics {
|
||||
let handler = MetricsHandler::<K>::new(inner.id(), metrics);
|
||||
TransactionMetrics::record_open(handler.transaction_mode());
|
||||
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
|
||||
handler.log_transaction_opened();
|
||||
Some(handler)
|
||||
} else {
|
||||
@ -128,7 +126,7 @@ impl<K: TransactionKind> Tx<K> {
|
||||
|
||||
let (result, commit_latency, close_duration) = run(self);
|
||||
let open_duration = metrics_handler.start.elapsed();
|
||||
TransactionMetrics::record_close(
|
||||
metrics_handler.env_metrics.record_closed_transaction(
|
||||
metrics_handler.transaction_mode(),
|
||||
outcome,
|
||||
open_duration,
|
||||
@ -244,8 +242,7 @@ impl<K: TransactionKind> Drop for MetricsHandler<K> {
|
||||
fn drop(&mut self) {
|
||||
if !self.close_recorded {
|
||||
self.log_backtrace_on_long_read_transaction();
|
||||
|
||||
TransactionMetrics::record_close(
|
||||
self.env_metrics.record_closed_transaction(
|
||||
self.transaction_mode(),
|
||||
TransactionOutcome::Drop,
|
||||
self.start.elapsed(),
|
||||
|
||||
@ -2,7 +2,7 @@ use crate::Tables;
|
||||
use metrics::{Gauge, Histogram};
|
||||
use reth_libmdbx::CommitLatency;
|
||||
use reth_metrics::{metrics::Counter, Metrics};
|
||||
use rustc_hash::FxHasher;
|
||||
use rustc_hash::{FxHashMap, FxHasher};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
hash::BuildHasherDefault,
|
||||
@ -20,14 +20,31 @@ const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096;
|
||||
#[derive(Debug)]
|
||||
pub struct DatabaseEnvMetrics {
|
||||
/// Caches OperationMetrics handles for each table and operation tuple.
|
||||
operations: HashMap<(Tables, Operation), OperationMetrics, BuildHasherDefault<FxHasher>>,
|
||||
operations: FxHashMap<(Tables, Operation), OperationMetrics>,
|
||||
/// Caches TransactionMetrics handles for counters grouped by only transaction mode.
|
||||
/// Updated both at tx open and close.
|
||||
transactions: FxHashMap<TransactionMode, TransactionMetrics>,
|
||||
/// Caches TransactionOutcomeMetrics handles for counters grouped by transaction mode and
|
||||
/// outcome. Can only be updated at tx close, as outcome is only known at that point.
|
||||
transaction_outcomes:
|
||||
FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics>,
|
||||
}
|
||||
|
||||
impl DatabaseEnvMetrics {
|
||||
pub(crate) fn new() -> Self {
|
||||
// Pre-populate the map with all possible table and operation combinations
|
||||
// Pre-populate metric handle maps with all possible combinations of labels
|
||||
// to avoid runtime locks on the map when recording metrics.
|
||||
let mut operations = HashMap::with_capacity_and_hasher(
|
||||
Self {
|
||||
operations: Self::generate_operation_handles(),
|
||||
transactions: Self::generate_transaction_handles(),
|
||||
transaction_outcomes: Self::generate_transaction_outcome_handles(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Generate a map of all possible operation handles for each table and operation tuple.
|
||||
/// Used for tracking all operation metrics.
|
||||
fn generate_operation_handles() -> FxHashMap<(Tables, Operation), OperationMetrics> {
|
||||
let mut operations = FxHashMap::with_capacity_and_hasher(
|
||||
Tables::COUNT * Operation::COUNT,
|
||||
BuildHasherDefault::<FxHasher>::default(),
|
||||
);
|
||||
@ -42,7 +59,45 @@ impl DatabaseEnvMetrics {
|
||||
);
|
||||
}
|
||||
}
|
||||
Self { operations }
|
||||
operations
|
||||
}
|
||||
|
||||
/// Generate a map of all possible transaction modes to metric handles.
|
||||
/// Used for tracking a counter of open transactions.
|
||||
fn generate_transaction_handles() -> FxHashMap<TransactionMode, TransactionMetrics> {
|
||||
TransactionMode::iter()
|
||||
.map(|mode| {
|
||||
(
|
||||
mode,
|
||||
TransactionMetrics::new_with_labels(&[(
|
||||
Labels::TransactionMode.as_str(),
|
||||
mode.as_str(),
|
||||
)]),
|
||||
)
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Generate a map of all possible transaction mode and outcome handles.
|
||||
/// Used for tracking various stats for finished transactions (e.g. commit duration).
|
||||
fn generate_transaction_outcome_handles(
|
||||
) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> {
|
||||
let mut transaction_outcomes = HashMap::with_capacity_and_hasher(
|
||||
TransactionMode::COUNT * TransactionOutcome::COUNT,
|
||||
BuildHasherDefault::<FxHasher>::default(),
|
||||
);
|
||||
for mode in TransactionMode::iter() {
|
||||
for outcome in TransactionOutcome::iter() {
|
||||
transaction_outcomes.insert(
|
||||
(mode, outcome),
|
||||
TransactionOutcomeMetrics::new_with_labels(&[
|
||||
(Labels::TransactionMode.as_str(), mode.as_str()),
|
||||
(Labels::TransactionOutcome.as_str(), outcome.as_str()),
|
||||
]),
|
||||
);
|
||||
}
|
||||
}
|
||||
transaction_outcomes
|
||||
}
|
||||
|
||||
/// Record a metric for database operation executed in `f`.
|
||||
@ -59,10 +114,38 @@ impl DatabaseEnvMetrics {
|
||||
.expect("operation & table metric handle not found")
|
||||
.record(value_size, f)
|
||||
}
|
||||
|
||||
/// Record metrics for opening a database transaction.
|
||||
pub(crate) fn record_opened_transaction(&self, mode: TransactionMode) {
|
||||
self.transactions
|
||||
.get(&mode)
|
||||
.expect("transaction mode metric handle not found")
|
||||
.record_open();
|
||||
}
|
||||
|
||||
/// Record metrics for closing a database transactions.
|
||||
pub(crate) fn record_closed_transaction(
|
||||
&self,
|
||||
mode: TransactionMode,
|
||||
outcome: TransactionOutcome,
|
||||
open_duration: Duration,
|
||||
close_duration: Option<Duration>,
|
||||
commit_latency: Option<CommitLatency>,
|
||||
) {
|
||||
self.transactions
|
||||
.get(&mode)
|
||||
.expect("transaction mode metric handle not found")
|
||||
.record_close();
|
||||
|
||||
self.transaction_outcomes
|
||||
.get(&(mode, outcome))
|
||||
.expect("transaction outcome metric handle not found")
|
||||
.record(open_duration, close_duration, commit_latency);
|
||||
}
|
||||
}
|
||||
|
||||
/// Transaction mode for the database, either read-only or read-write.
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
|
||||
pub(crate) enum TransactionMode {
|
||||
/// Read-only transaction mode.
|
||||
ReadOnly,
|
||||
@ -85,7 +168,7 @@ impl TransactionMode {
|
||||
}
|
||||
|
||||
/// Transaction outcome after a database operation - commit, abort, or drop.
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash)]
|
||||
#[derive(Debug, Clone, Copy, Eq, PartialEq, Hash, EnumCount, EnumIter)]
|
||||
pub(crate) enum TransactionOutcome {
|
||||
/// Successful commit of the transaction.
|
||||
Commit,
|
||||
@ -175,6 +258,21 @@ impl Labels {
|
||||
pub(crate) struct TransactionMetrics {
|
||||
/// Total number of currently open database transactions
|
||||
open_total: Gauge,
|
||||
}
|
||||
|
||||
impl TransactionMetrics {
|
||||
pub(crate) fn record_open(&self) {
|
||||
self.open_total.increment(1.0);
|
||||
}
|
||||
|
||||
pub(crate) fn record_close(&self) {
|
||||
self.open_total.decrement(1.0);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Metrics, Clone)]
|
||||
#[metrics(scope = "database.transaction")]
|
||||
pub(crate) struct TransactionOutcomeMetrics {
|
||||
/// The time a database transaction has been open
|
||||
open_duration_seconds: Histogram,
|
||||
/// The time it took to close a database transaction
|
||||
@ -198,44 +296,30 @@ pub(crate) struct TransactionMetrics {
|
||||
commit_gc_cputime_duration_seconds: Histogram,
|
||||
}
|
||||
|
||||
impl TransactionMetrics {
|
||||
/// Record transaction opening.
|
||||
pub(crate) fn record_open(mode: TransactionMode) {
|
||||
let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]);
|
||||
metrics.open_total.increment(1.0);
|
||||
}
|
||||
|
||||
impl TransactionOutcomeMetrics {
|
||||
/// Record transaction closing with the duration it was open and the duration it took to close
|
||||
/// it.
|
||||
pub(crate) fn record_close(
|
||||
mode: TransactionMode,
|
||||
outcome: TransactionOutcome,
|
||||
pub(crate) fn record(
|
||||
&self,
|
||||
open_duration: Duration,
|
||||
close_duration: Option<Duration>,
|
||||
commit_latency: Option<CommitLatency>,
|
||||
) {
|
||||
let metrics = Self::new_with_labels(&[(Labels::TransactionMode.as_str(), mode.as_str())]);
|
||||
metrics.open_total.decrement(1.0);
|
||||
|
||||
let metrics = Self::new_with_labels(&[
|
||||
(Labels::TransactionMode.as_str(), mode.as_str()),
|
||||
(Labels::TransactionOutcome.as_str(), outcome.as_str()),
|
||||
]);
|
||||
metrics.open_duration_seconds.record(open_duration);
|
||||
self.open_duration_seconds.record(open_duration);
|
||||
|
||||
if let Some(close_duration) = close_duration {
|
||||
metrics.close_duration_seconds.record(close_duration)
|
||||
self.close_duration_seconds.record(close_duration)
|
||||
}
|
||||
|
||||
if let Some(commit_latency) = commit_latency {
|
||||
metrics.commit_preparation_duration_seconds.record(commit_latency.preparation());
|
||||
metrics.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
|
||||
metrics.commit_audit_duration_seconds.record(commit_latency.audit());
|
||||
metrics.commit_write_duration_seconds.record(commit_latency.write());
|
||||
metrics.commit_sync_duration_seconds.record(commit_latency.sync());
|
||||
metrics.commit_ending_duration_seconds.record(commit_latency.ending());
|
||||
metrics.commit_whole_duration_seconds.record(commit_latency.whole());
|
||||
metrics.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
|
||||
self.commit_preparation_duration_seconds.record(commit_latency.preparation());
|
||||
self.commit_gc_wallclock_duration_seconds.record(commit_latency.gc_wallclock());
|
||||
self.commit_audit_duration_seconds.record(commit_latency.audit());
|
||||
self.commit_write_duration_seconds.record(commit_latency.write());
|
||||
self.commit_sync_duration_seconds.record(commit_latency.sync());
|
||||
self.commit_ending_duration_seconds.record(commit_latency.ending());
|
||||
self.commit_whole_duration_seconds.record(commit_latency.whole());
|
||||
self.commit_gc_cputime_duration_seconds.record(commit_latency.gc_cputime());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user