diff --git a/crates/node-core/src/metrics/prometheus_exporter.rs b/crates/node-core/src/metrics/prometheus_exporter.rs index e1d1e378f..4bfcddf0c 100644 --- a/crates/node-core/src/metrics/prometheus_exporter.rs +++ b/crates/node-core/src/metrics/prometheus_exporter.rs @@ -102,6 +102,10 @@ where describe_gauge!("db.table_pages", "The number of database pages for a table"); describe_gauge!("db.table_entries", "The number of entries for a table"); describe_gauge!("db.freelist", "The number of pages on the freelist"); + describe_gauge!( + "db.timed_out_not_aborted_transactions", + "Number of timed out transactions that were not aborted by the user yet" + ); process.describe(); describe_memory_stats(); describe_io_stats(); diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 5a3e01a3e..1cc6f8543 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -123,17 +123,19 @@ impl Database for DatabaseEnv { type TXMut = tx::Tx; fn tx(&self) -> Result { - Ok(Tx::new_with_metrics( + Tx::new_with_metrics( self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - )) + ) + .map_err(|e| DatabaseError::InitTx(e.into())) } fn tx_mut(&self) -> Result { - Ok(Tx::new_with_metrics( + Tx::new_with_metrics( self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?, self.metrics.as_ref().cloned(), - )) + ) + .map_err(|e| DatabaseError::InitTx(e.into())) } } @@ -202,6 +204,12 @@ impl DatabaseMetrics for DatabaseEnv { metrics.push(("db.freelist", freelist as f64, vec![])); } + metrics.push(( + "db.timed_out_not_aborted_transactions", + self.timed_out_not_aborted_transactions() as f64, + vec![], + )); + metrics } } diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index 16d38ead6..798b0b3c0 100644 --- 
a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -53,14 +53,16 @@ impl Tx { pub fn new_with_metrics( inner: Transaction, env_metrics: Option>, - ) -> Self { - let metrics_handler = env_metrics.map(|env_metrics| { - let handler = MetricsHandler::::new(inner.id(), env_metrics); - handler.env_metrics.record_opened_transaction(handler.transaction_mode()); - handler.log_transaction_opened(); - handler - }); - Self::new_inner(inner, metrics_handler) + ) -> reth_libmdbx::Result { + let metrics_handler = env_metrics + .map(|env_metrics| { + let handler = MetricsHandler::::new(inner.id()?, env_metrics); + handler.env_metrics.record_opened_transaction(handler.transaction_mode()); + handler.log_transaction_opened(); + Ok(handler) + }) + .transpose()?; + Ok(Self::new_inner(inner, metrics_handler)) } #[inline] @@ -76,8 +78,8 @@ impl Tx { } /// Gets this transaction ID. - pub fn id(&self) -> u64 { - self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id) + pub fn id(&self) -> reth_libmdbx::Result { + self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id)) } /// Gets a table database handle if it exists, otherwise creates it. 
@@ -437,7 +439,7 @@ mod tests { assert_eq!( tx.get::(0).err(), - Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into())) + Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into())) ); // Transaction is timeout-ed assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed)); // Backtrace is recorded diff --git a/crates/storage/libmdbx-rs/benches/cursor.rs b/crates/storage/libmdbx-rs/benches/cursor.rs index c05cef4b6..a73b4fe27 100644 --- a/crates/storage/libmdbx-rs/benches/cursor.rs +++ b/crates/storage/libmdbx-rs/benches/cursor.rs @@ -78,8 +78,7 @@ fn bench_get_seq_raw(c: &mut Criterion) { let (_dir, env) = setup_bench_db(n); let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi(); - let _txn = env.begin_ro_txn().unwrap(); - let txn = _txn.txn(); + let txn = env.begin_ro_txn().unwrap(); let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; @@ -87,18 +86,21 @@ fn bench_get_seq_raw(c: &mut Criterion) { c.bench_function("bench_get_seq_raw", |b| { b.iter(|| unsafe { - mdbx_cursor_open(txn, dbi, &mut cursor); - let mut i = 0; - let mut count = 0u32; + txn.txn_execute(|txn| { + mdbx_cursor_open(txn, dbi, &mut cursor); + let mut i = 0; + let mut count = 0u32; - while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { - i += key.iov_len + data.iov_len; - count += 1; - } + while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 { + i += key.iov_len + data.iov_len; + count += 1; + } - black_box(i); - assert_eq!(count, n); - mdbx_cursor_close(cursor); + black_box(i); + assert_eq!(count, n); + mdbx_cursor_close(cursor); + }) + .unwrap(); }) }); } diff --git a/crates/storage/libmdbx-rs/benches/transaction.rs b/crates/storage/libmdbx-rs/benches/transaction.rs index 91e2c4404..dc8426c5b 100644 --- a/crates/storage/libmdbx-rs/benches/transaction.rs +++ b/crates/storage/libmdbx-rs/benches/transaction.rs @@ 
-46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) { c.bench_function("bench_get_rand_raw", |b| { b.iter(|| unsafe { - txn.with_raw_tx_ptr(|txn| { + txn.txn_execute(|txn| { let mut i: size_t = 0; for key in &keys { key_val.iov_len = key.len() as size_t; @@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) { i += key_val.iov_len; } black_box(i); - }); + }) + .unwrap(); }) }); } diff --git a/crates/storage/libmdbx-rs/src/cursor.rs b/crates/storage/libmdbx-rs/src/cursor.rs index 1f356a7e5..d9b1c5c42 100644 --- a/crates/storage/libmdbx-rs/src/cursor.rs +++ b/crates/storage/libmdbx-rs/src/cursor.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result}, + error::{mdbx_result, Error, Result}, flags::*, mdbx_try_optional, transaction::{TransactionKind, RW}, @@ -30,26 +30,26 @@ where pub(crate) fn new(txn: Transaction, dbi: ffi::MDBX_dbi) -> Result { let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut(); unsafe { - mdbx_result_with_tx_kind::( - txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)), - txn.txn(), - txn.env().txn_manager(), - )?; + txn.txn_execute(|txn_ptr| { + mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor)) + })??; } Ok(Self { txn, cursor }) } fn new_at_position(other: &Self) -> Result { unsafe { - let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); + other.txn.txn_execute(|_| { + let cursor = ffi::mdbx_cursor_create(ptr::null_mut()); - let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); + let res = ffi::mdbx_cursor_copy(other.cursor(), cursor); - let s = Self { txn: other.txn.clone(), cursor }; + let s = Self { txn: other.txn.clone(), cursor }; - mdbx_result_with_tx_kind::(res, s.txn.txn(), s.txn.env().txn_manager())?; + mdbx_result(res)?; - Ok(s) + Ok(s) + })? 
} } @@ -95,11 +95,12 @@ where let key_ptr = key_val.iov_base; let data_ptr = data_val.iov_base; self.txn.txn_execute(|txn| { - let v = mdbx_result_with_tx_kind::( - ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op), - txn, - self.txn.env().txn_manager(), - )?; + let v = mdbx_result(ffi::mdbx_cursor_get( + self.cursor, + &mut key_val, + &mut data_val, + op, + ))?; assert_ne!(data_ptr, data_val.iov_base); let key_out = { // MDBX wrote in new key @@ -111,7 +112,7 @@ where }; let data_out = Value::decode_val::(txn, data_val)?; Ok((key_out, data_out, v)) - }) + })? } } @@ -444,7 +445,7 @@ impl Cursor { mdbx_result(unsafe { self.txn.txn_execute(|_| { ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits()) - }) + })? })?; Ok(()) @@ -458,7 +459,7 @@ impl Cursor { /// current key, if the database was opened with [DatabaseFlags::DUP_SORT]. pub fn del(&mut self, flags: WriteFlags) -> Result<()> { mdbx_result(unsafe { - self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits())) + self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))? 
})?; Ok(()) @@ -470,7 +471,7 @@ where K: TransactionKind, { fn clone(&self) -> Self { - self.txn.txn_execute(|_| Self::new_at_position(self).unwrap()) + Self::new_at_position(self).unwrap() } } @@ -488,7 +489,7 @@ where K: TransactionKind, { fn drop(&mut self) { - self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }) + let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }); } } @@ -564,7 +565,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - cursor.txn.txn_execute(|txn| { + let result = cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -583,7 +584,11 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }) + }); + match result { + Ok(result) => result, + Err(err) => Some(Err(err)), + } } } Self::Err(err) => err.take().map(Err), @@ -655,7 +660,7 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, *next_op); unsafe { - cursor.txn.txn_execute(|txn| { + let result = cursor.txn.txn_execute(|txn| { match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) { ffi::MDBX_SUCCESS => { let key = match Key::decode_val::(txn, key) { @@ -674,7 +679,11 @@ where ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None, error => Some(Err(Error::from_err_code(error))), } - }) + }); + match result { + Ok(result) => result, + Err(err) => Some(Err(err)), + } } } Iter::Err(err) => err.take().map(Err), @@ -752,17 +761,15 @@ where let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() }; let op = mem::replace(op, ffi::MDBX_NEXT_NODUP); - cursor.txn.txn_execute(|_| { - let err_code = - unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; + let err_code = + unsafe { 
ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) }; - (err_code == ffi::MDBX_SUCCESS).then(|| { - IntoIter::new( - Cursor::new_at_position(&**cursor).unwrap(), - ffi::MDBX_GET_CURRENT, - ffi::MDBX_NEXT_DUP, - ) - }) + (err_code == ffi::MDBX_SUCCESS).then(|| { + IntoIter::new( + Cursor::new_at_position(&**cursor).unwrap(), + ffi::MDBX_GET_CURRENT, + ffi::MDBX_NEXT_DUP, + ) }) } IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))), diff --git a/crates/storage/libmdbx-rs/src/database.rs b/crates/storage/libmdbx-rs/src/database.rs index 09e2e1089..55eb7e0bb 100644 --- a/crates/storage/libmdbx-rs/src/database.rs +++ b/crates/storage/libmdbx-rs/src/database.rs @@ -1,5 +1,5 @@ use crate::{ - error::{mdbx_result_with_tx_kind, Result}, + error::{mdbx_result, Result}, transaction::TransactionKind, Environment, Transaction, }; @@ -31,12 +31,8 @@ impl Database { let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() }; let mut dbi: ffi::MDBX_dbi = 0; txn.txn_execute(|txn_ptr| { - mdbx_result_with_tx_kind::( - unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }, - txn_ptr, - txn.env().txn_manager(), - ) - })?; + mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) }) + })??; Ok(Self::new_from_ptr(dbi, txn.env().clone())) } diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index 91bf80edb..c4ca89113 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -88,6 +88,12 @@ impl Environment { &self.inner.txn_manager } + /// Returns the number of timed out transactions that were not aborted by the user yet. + #[cfg(feature = "read-tx-timeouts")] + pub fn timed_out_not_aborted_transactions(&self) -> usize { + self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0) + } + /// Create a read-only transaction for use with the environment. 
#[inline] pub fn begin_ro_txn(&self) -> Result> { diff --git a/crates/storage/libmdbx-rs/src/error.rs b/crates/storage/libmdbx-rs/src/error.rs index 22e440426..84a6ef361 100644 --- a/crates/storage/libmdbx-rs/src/error.rs +++ b/crates/storage/libmdbx-rs/src/error.rs @@ -1,4 +1,3 @@ -use crate::{txn_manager::TxnManager, TransactionKind}; use libc::c_int; use std::result; @@ -118,8 +117,9 @@ pub enum Error { /// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened. #[error("write transactions are not supported in read-only mode")] WriteTransactionUnsupportedInReadOnlyMode, - #[error("read transaction has been aborted by the transaction manager")] - ReadTransactionAborted, + /// Read transaction has been timed out. + #[error("read transaction has been timed out")] + ReadTransactionTimeout, /// Unknown error code. #[error("unknown error code")] Other(i32), @@ -193,9 +193,10 @@ impl Error { Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL, Error::Access => ffi::MDBX_EACCESS, Error::TooLarge => ffi::MDBX_TOO_LARGE, - Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN, + Error::BadSignature => ffi::MDBX_EBADSIGN, Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS, Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS, + Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code Error::Other(err_code) => *err_code, } } @@ -216,33 +217,6 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result { } } -#[cfg(feature = "read-tx-timeouts")] -#[inline] -pub(crate) fn mdbx_result_with_tx_kind( - err_code: c_int, - txn: *mut ffi::MDBX_txn, - txn_manager: &TxnManager, -) -> Result { - if K::IS_READ_ONLY && - txn_manager.remove_aborted_read_transaction(txn).is_some() && - err_code == ffi::MDBX_EBADSIGN - { - return Err(Error::ReadTransactionAborted) - } - - mdbx_result(err_code) -} - -#[cfg(not(feature = "read-tx-timeouts"))] -#[inline] -pub(crate) fn 
mdbx_result_with_tx_kind( - err_code: c_int, - _txn: *mut ffi::MDBX_txn, - _txn_manager: &TxnManager, -) -> Result { - mdbx_result(err_code) -} - #[macro_export] macro_rules! mdbx_try_optional { ($expr:expr) => {{ diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 9910d9dca..a819f8b9f 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -1,12 +1,12 @@ use crate::{ database::Database, environment::Environment, - error::{mdbx_result, mdbx_result_with_tx_kind, Result}, + error::{mdbx_result, Result}, flags::{DatabaseFlags, WriteFlags}, txn_manager::{TxnManagerMessage, TxnPtr}, Cursor, Error, Stat, TableObject, }; -use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; +use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE}; use indexmap::IndexSet; use libc::{c_uint, c_void}; use parking_lot::Mutex; @@ -71,29 +71,27 @@ where pub(crate) fn new(env: Environment) -> Result { let mut txn: *mut ffi::MDBX_txn = ptr::null_mut(); unsafe { - mdbx_result_with_tx_kind::( - ffi::mdbx_txn_begin_ex( - env.env_ptr(), - ptr::null_mut(), - K::OPEN_FLAGS, - &mut txn, - ptr::null_mut(), - ), - txn, - env.txn_manager(), - )?; + mdbx_result(ffi::mdbx_txn_begin_ex( + env.env_ptr(), + ptr::null_mut(), + K::OPEN_FLAGS, + &mut txn, + ptr::null_mut(), + ))?; Ok(Self::new_from_ptr(env, txn)) } } - pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self { + pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self { + let txn = TransactionPtr::new(txn_ptr); + #[cfg(feature = "read-tx-timeouts")] if K::IS_READ_ONLY { - env.txn_manager().add_active_read_transaction(txn) + env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone()) } let inner = TransactionInner { - txn: TransactionPtr::new(txn), + txn, primed_dbis: Mutex::new(IndexSet::new()), committed: AtomicBool::new(false), env, @@ -108,7 
+106,7 @@ where /// The caller **must** ensure that the pointer is not used after the /// lifetime of the transaction. #[inline] - pub(crate) fn txn_execute(&self, f: F) -> T + pub fn txn_execute(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { @@ -117,32 +115,18 @@ where /// Returns a copy of the raw pointer to the underlying MDBX transaction. #[doc(hidden)] + #[cfg(test)] pub fn txn(&self) -> *mut ffi::MDBX_txn { self.inner.txn.txn } - /// Executes the given closure once - /// - /// This is only intended to be used when accessing mdbx ffi functions directly is required. - /// - /// The caller **must** ensure that the pointer is only used within the closure. - #[inline] - #[doc(hidden)] - pub fn with_raw_tx_ptr(&self, f: F) -> T - where - F: FnOnce(*mut ffi::MDBX_txn) -> T, - { - let _lock = self.inner.txn.lock.lock(); - f(self.inner.txn.txn) - } - /// Returns a raw pointer to the MDBX environment. pub fn env(&self) -> &Environment { &self.inner.env } /// Returns the transaction id. - pub fn id(&self) -> u64 { + pub fn id(&self) -> Result { self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) }) } @@ -168,7 +152,7 @@ where ffi::MDBX_NOTFOUND => Ok(None), err_code => Err(Error::from_err_code(err_code)), } - }) + })? } /// Commits the transaction. 
@@ -191,11 +175,9 @@ where self.env().txn_manager().remove_active_read_transaction(txn); let mut latency = CommitLatency::new(); - mdbx_result_with_tx_kind::( - unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) }, - txn, - self.env().txn_manager(), - ) + mdbx_result(unsafe { + ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) + }) .map(|v| (v, latency)) } else { let (sender, rx) = sync_channel(0); @@ -204,7 +186,7 @@ where .send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender }); rx.recv().unwrap() } - }); + })?; self.inner.set_committed(); result @@ -243,12 +225,8 @@ where let mut flags: c_uint = 0; unsafe { self.txn_execute(|txn| { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()), - txn, - self.env().txn_manager(), - ) - })?; + mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut())) + })??; } // The types are not the same on Windows. Great! @@ -266,12 +244,8 @@ where unsafe { let mut stat = Stat::new(); self.txn_execute(|txn| { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::()), - txn, - self.env().txn_manager(), - ) - })?; + mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::())) + })??; Ok(stat) } } @@ -290,7 +264,7 @@ where #[cfg(feature = "read-tx-timeouts")] pub fn disable_timeout(&self) { if K::IS_READ_ONLY { - self.env().txn_manager().remove_active_read_transaction(self.txn()); + self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn); } } } @@ -342,11 +316,11 @@ where } #[inline] - fn txn_execute(&self, f: F) -> T + fn txn_execute(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { - self.txn.txn_execute(f) + self.txn.txn_execute_fail_on_timeout(f) } } @@ -355,7 +329,9 @@ where K: TransactionKind, { fn drop(&mut self) { - self.txn_execute(|txn| { + // To be able to abort a timed out transaction, we need to renew it first. 
+ // Hence the usage of `txn_execute_renew_on_timeout` here. + let _ = self.txn.txn_execute_renew_on_timeout(|txn| { if !self.has_committed() { if K::IS_READ_ONLY { #[cfg(feature = "read-tx-timeouts")] @@ -372,7 +348,7 @@ where rx.recv().unwrap().unwrap(); } } - }) + }); } } @@ -418,7 +394,7 @@ impl Transaction { ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void }; mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits()) - }))?; + })?)?; Ok(()) } @@ -447,7 +423,7 @@ impl Transaction { &mut data_val, flags.bits() | ffi::MDBX_RESERVE, ) - }))?; + })?)?; Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len)) } } @@ -482,7 +458,7 @@ impl Transaction { } else { unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) } } - }) + })? }) .map(|_| true) .or_else(|e| match e { @@ -493,7 +469,7 @@ impl Transaction { /// Empties the given database. All items will be removed. pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> { - mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?; + mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?; Ok(()) } @@ -504,7 +480,7 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. pub unsafe fn drop_db(&self, db: Database) -> Result<()> { - mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?; + mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?; Ok(()) } @@ -517,11 +493,7 @@ impl Transaction { /// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi /// BEFORE calling this function. 
pub unsafe fn close_db(&self, db: Database) -> Result<()> { - mdbx_result_with_tx_kind::( - ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()), - self.txn(), - self.env().txn_manager(), - )?; + self.txn_execute(|_| mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi())))??; Ok(()) } @@ -542,12 +514,12 @@ impl Transaction { }); rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0)) - }) + })? } } /// A shareable pointer to an MDBX transaction. -#[derive(Clone)] +#[derive(Debug, Clone)] pub(crate) struct TransactionPtr { txn: *mut ffi::MDBX_txn, lock: Arc>, @@ -558,14 +530,52 @@ impl TransactionPtr { Self { txn, lock: Arc::new(Mutex::new(())) } } + // Returns `true` if the transaction is timed out. + // + // When transaction is timed out via `TxnManager`, it's actually reset using + // `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such + // transactions), and sets the `MDBX_TXN_FINISHED` flag. + fn is_timed_out(&self) -> bool { + (unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0 + } + /// Executes the given closure once the lock on the transaction is acquired. + /// + /// Returns the result of the closure or an error if the transaction is timed out. #[inline] - pub(crate) fn txn_execute(&self, f: F) -> T + pub(crate) fn txn_execute_fail_on_timeout(&self, f: F) -> Result where F: FnOnce(*mut ffi::MDBX_txn) -> T, { let _lck = self.lock.lock(); - (f)(self.txn) + + // No race condition with the `TxnManager` timing out the transaction is possible here, + // because we're taking a lock for any actions on the transaction pointer, including a call + // to the `mdbx_txn_reset`. + if self.is_timed_out() { + return Err(Error::ReadTransactionTimeout) + } + + Ok((f)(self.txn)) + } + + /// Executes the given closure once the lock on the transaction is acquired. If the transaction 
+ /// + /// Returns the result of the closure or an error if the transaction renewal fails. + #[inline] + fn txn_execute_renew_on_timeout(&self, f: F) -> Result + where + F: FnOnce(*mut ffi::MDBX_txn) -> T, + { + let _lck = self.lock.lock(); + + // To be able to do any operations on the transaction, we need to renew it first. + if self.is_timed_out() { + mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?; + } + + Ok((f)(self.txn)) } } diff --git a/crates/storage/libmdbx-rs/src/txn_manager.rs b/crates/storage/libmdbx-rs/src/txn_manager.rs index bf46680b8..82d53d908 100644 --- a/crates/storage/libmdbx-rs/src/txn_manager.rs +++ b/crates/storage/libmdbx-rs/src/txn_manager.rs @@ -52,9 +52,6 @@ impl TxnManager { /// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort] /// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex] fn start_message_listener(&self, env: EnvPtr, rx: Receiver) { - #[cfg(feature = "read-tx-timeouts")] - let read_transactions = self.read_transactions.clone(); - std::thread::spawn(move || { #[allow(clippy::redundant_locals)] let env = env; @@ -73,34 +70,12 @@ impl TxnManager { ) }) .map(|_| TxnPtr(txn)); - - #[cfg(feature = "read-tx-timeouts")] - { - use crate::transaction::TransactionKind; - - if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS { - if let Some(read_transactions) = &read_transactions { - read_transactions.add_active(txn); - } - } - } - sender.send(res).unwrap(); } TxnManagerMessage::Abort { tx, sender } => { - #[cfg(feature = "read-tx-timeouts")] - if let Some(read_transactions) = &read_transactions { - read_transactions.remove_active(tx.0); - } - sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap(); } TxnManagerMessage::Commit { tx, sender } => { - #[cfg(feature = "read-tx-timeouts")] - if let Some(read_transactions) = &read_transactions { - read_transactions.remove_active(tx.0); - } - sender .send({ let mut latency = CommitLatency::new(); @@ -125,7 
+100,10 @@ impl TxnManager { #[cfg(feature = "read-tx-timeouts")] mod read_transactions { - use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error}; + use crate::{ + environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr, + txn_manager::TxnManager, + }; use dashmap::{DashMap, DashSet}; use std::{ sync::{mpsc::sync_channel, Arc}, @@ -155,9 +133,13 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) { + pub(crate) fn add_active_read_transaction( + &self, + ptr: *mut ffi::MDBX_txn, + tx: TransactionPtr, + ) { if let Some(read_transactions) = &self.read_transactions { - read_transactions.add_active(ptr); + read_transactions.add_active(ptr, tx); } } @@ -165,16 +147,15 @@ mod read_transactions { pub(crate) fn remove_active_read_transaction( &self, ptr: *mut ffi::MDBX_txn, - ) -> Option<(usize, Instant)> { + ) -> Option<(usize, (TransactionPtr, Instant))> { self.read_transactions.as_ref()?.remove_active(ptr) } - /// Removes a transaction from the list of aborted read transactions. - pub(crate) fn remove_aborted_read_transaction( - &self, - ptr: *mut ffi::MDBX_txn, - ) -> Option { - self.read_transactions.as_ref()?.remove_aborted(ptr) + /// Returns the number of timed out transactions that were not aborted by the user yet. + pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option { + self.read_transactions + .as_ref() + .map(|read_transactions| read_transactions.timed_out_not_aborted()) } } @@ -187,13 +168,10 @@ mod read_transactions { /// /// We store `usize` instead of a raw pointer as a key, because pointers are not /// comparable. The time of transaction opening is stored as a value. - active: DashMap, - /// List of read transactions aborted by the [ReadTransactions::start_monitor]. 
- /// We keep them until user tries to abort the transaction, so we're able to report a nice - /// [Error::ReadTransactionAborted] error. - /// - /// We store `usize` instead of a raw pointer, because pointers are not comparable. - aborted: DashSet, + active: DashMap, + /// List of timed out transactions that were not aborted by the user yet, hence have a + /// dangling read transaction pointer. + timed_out_not_aborted: DashSet, } impl ReadTransactions { @@ -202,59 +180,70 @@ mod read_transactions { } /// Adds a new transaction to the list of active read transactions. - pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) { - let _ = self.active.insert(ptr as usize, Instant::now()); + pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) { + let _ = self.active.insert(ptr as usize, (tx, Instant::now())); } /// Removes a transaction from the list of active read transactions. - pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> { + pub(super) fn remove_active( + &self, + ptr: *mut ffi::MDBX_txn, + ) -> Option<(usize, (TransactionPtr, Instant))> { + self.timed_out_not_aborted.remove(&(ptr as usize)); self.active.remove(&(ptr as usize)) } - /// Adds a new transaction to the list of aborted read transactions. - pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) { - self.aborted.insert(ptr as usize); - } - - /// Removes a transaction from the list of aborted read transactions. - pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option { - self.aborted.remove(&(ptr as usize)) + /// Returns the number of timed out transactions that were not aborted by the user yet. 
+ pub(super) fn timed_out_not_aborted(&self) -> usize { + self.timed_out_not_aborted.len() } /// Spawns a new thread with [std::thread::spawn] that monitors the list of active read - /// transactions and aborts those that are open for longer than + /// transactions and times out those that are open for longer than /// `ReadTransactions.max_duration`. - /// - /// Aborted transaction pointers are placed into the list of aborted read transactions, and - /// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries - /// to use it. pub(super) fn start_monitor(self: Arc) { std::thread::spawn(move || { - let mut aborted_active = Vec::new(); + let mut timed_out_active = Vec::new(); loop { let now = Instant::now(); let mut max_active_transaction_duration = None; - // Iterate through active read transactions and abort those that's open for + // Iterate through active read transactions and time out those that are open for // longer than `self.max_duration`. for entry in self.active.iter() { - let (ptr, start) = entry.pair(); + let (tx, start) = entry.value(); let duration = now - *start; if duration > self.max_duration { - let ptr = *ptr as *mut ffi::MDBX_txn; + let result = tx.txn_execute_fail_on_timeout(|txn_ptr| { + ( + txn_ptr, + duration, + // Time out the transaction. + // + // We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to + // prevent MDBX from reusing the pointer of the aborted + // transaction for new read-only transactions. This is + // important because we store the pointer in the `active` list + // and assume that it is unique. + // + // See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info. + mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }), + ) + }); - // Add the transaction to the list of aborted transactions, so further - // usages report the correct error when the transaction is closed. 
- self.add_aborted(ptr); - - // Abort the transaction - let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) }); - - // Add the transaction to `aborted_active`. We can't remove it instantly - // from the list of active transactions, because we iterate through it. - aborted_active.push((ptr, duration, result.err())); + match result { + Ok((txn_ptr, duration, error)) => { + // Add the transaction to `timed_out_active`. We can't remove it + // instantly from the list of active transactions, because we + // iterate through it. + timed_out_active.push((txn_ptr, duration, error)); + } + Err(err) => { + error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction") + } + } } else { max_active_transaction_duration = Some( duration.max(max_active_transaction_duration.unwrap_or_default()), @@ -262,40 +251,38 @@ mod read_transactions { } } - // Walk through aborted transactions, and delete them from the list of active + // Walk through timed out transactions, and delete them from the list of active // transactions. - for (ptr, open_duration, err) in aborted_active.iter().copied() { + for (ptr, open_duration, err) in timed_out_active.iter().copied() { // Try deleting the transaction from the list of active transactions. let was_in_active = self.remove_active(ptr).is_some(); - if let Some(err) = err { - // If there was an error when aborting the transaction, we need to - // remove it from the list of aborted transactions, because otherwise it - // will stay there forever. - self.remove_aborted(ptr); - if was_in_active && err != Error::BadSignature { - // If the transaction was in the list of active transactions and the - // error code is not `EBADSIGN`, then user didn't abort it. - error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions"); + if let Err(err) = err { + if was_in_active { + // If the transaction was in the list of active transactions, + // then user didn't abort it and we failed to do so. 
+ error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction"); } } else { - // Happy path, the transaction has been aborted by us with no errors. - warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted"); + // Happy path, the transaction has been timed out by us with no errors. + warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out"); + // Add transaction to the list of timed out transactions that were not + // aborted by the user yet. + self.timed_out_not_aborted.insert(ptr as usize); } } - // Clear the list of aborted transactions, but not de-allocate the reserved + // Clear the list of timed out transactions, but not de-allocate the reserved // capacity to save on further pushes. - aborted_active.clear(); + timed_out_active.clear(); - if !self.active.is_empty() || !self.aborted.is_empty() { + if !self.active.is_empty() { trace!( target: "libmdbx", elapsed = ?now.elapsed(), active = ?self.active.iter().map(|entry| { - let (ptr, start) = entry.pair(); - (*ptr, start.elapsed()) + let (tx, start) = entry.value(); + (tx.clone(), start.elapsed()) }).collect::>(), - aborted = ?self.aborted.iter().map(|entry| *entry).collect::>(), "Read transactions" ); } @@ -315,12 +302,10 @@ mod read_transactions { #[cfg(test)] mod tests { use crate::{ - txn_manager::{ - read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr, - }, - Environment, Error, MaxReadTransactionDuration, TransactionKind, RO, + txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error, + MaxReadTransactionDuration, }; - use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration}; + use std::{thread::sleep, time::Duration}; use tempfile::tempdir; #[test] @@ -345,7 +330,6 @@ mod read_transactions { drop(tx); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); } // Create a 
read-only transaction, successfully use it, close it by committing. @@ -358,24 +342,43 @@ mod read_transactions { tx.commit().unwrap(); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); } - // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the - // manager kills it, use it and observe the `Error::ReadTransactionAborted` error. { + // Create a read-only transaction and observe it's in the list of active + // transactions. let tx = env.begin_ro_txn().unwrap(); let tx_ptr = tx.txn() as usize; assert!(read_transactions.active.contains_key(&tx_ptr)); + // Wait until the transaction is timed out by the manager. sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); + // Ensure that the transaction is not in the list of active transactions anymore, + // and is in the list of timed out but not aborted transactions. assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.aborted.contains(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); - assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted)); + // Use the timed out transaction and observe the `Error::ReadTransactionTimeout` + assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout)); assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(!read_transactions.aborted.contains(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); + + assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout)); + assert!(!read_transactions.active.contains_key(&tx_ptr)); + assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr)); + + // Ensure that the transaction pointer is not reused when opening a new read-only + // transaction. 
+ let new_tx = env.begin_ro_txn().unwrap(); + let new_tx_ptr = new_tx.txn() as usize; + assert!(read_transactions.active.contains_key(&new_tx_ptr)); + assert_ne!(tx_ptr, new_tx_ptr); + + // Drop the transaction and ensure that it's not in the list of timed out but not + // aborted transactions anymore. + drop(tx); + assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr)); } } @@ -393,64 +396,5 @@ mod read_transactions { sleep(READ_TRANSACTIONS_CHECK_INTERVAL); assert!(tx.commit().is_ok()) } - - #[test] - fn txn_manager_begin_read_transaction_via_message_listener() { - const MAX_DURATION: Duration = Duration::from_secs(1); - - let dir = tempdir().unwrap(); - let env = Environment::builder() - .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) - .open(dir.path()) - .unwrap(); - - let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); - - // Create a read-only transaction via the message listener. - let (tx, rx) = sync_channel(0); - env.txn_manager().send_message(TxnManagerMessage::Begin { - parent: TxnPtr(ptr::null_mut()), - flags: RO::OPEN_FLAGS, - sender: tx, - }); - - let txn_ptr = rx.recv().unwrap().unwrap(); - - assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize))); - } - - #[test] - fn txn_manager_reassign_transaction_removes_from_aborted_transactions() { - const MAX_DURATION: Duration = Duration::from_secs(1); - - let dir = tempdir().unwrap(); - let env = Environment::builder() - .set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION)) - .open(dir.path()) - .unwrap(); - - let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap(); - - // Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the - // manager kills it, use it and observe the `Error::ReadTransactionAborted` error. 
- { - let tx = env.begin_ro_txn().unwrap(); - let tx_ptr = tx.txn() as usize; - assert!(read_transactions.active.contains_key(&tx_ptr)); - - sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL); - - assert!(!read_transactions.active.contains_key(&tx_ptr)); - assert!(read_transactions.aborted.contains(&tx_ptr)); - } - - // Create a read-only transaction, ensure this removes it from aborted set if mdbx - // reassigns same recently aborted transaction pointer. - { - let tx = env.begin_ro_txn().unwrap(); - let tx_ptr = tx.txn() as usize; - assert!(!read_transactions.aborted.contains(&tx_ptr)); - } - } } }