feat(storage): use mdbx_txn_reset to time out transactions (#6850)

Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
This commit is contained in:
Alexey Shekhirin
2024-02-29 20:44:56 +00:00
committed by GitHub
parent 771951429e
commit 74dc0e36d3
11 changed files with 289 additions and 335 deletions

View File

@ -78,8 +78,7 @@ fn bench_get_seq_raw(c: &mut Criterion) {
let (_dir, env) = setup_bench_db(n);
let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
let _txn = env.begin_ro_txn().unwrap();
let txn = _txn.txn();
let txn = env.begin_ro_txn().unwrap();
let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
@ -87,18 +86,21 @@ fn bench_get_seq_raw(c: &mut Criterion) {
c.bench_function("bench_get_seq_raw", |b| {
b.iter(|| unsafe {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;
txn.txn_execute(|txn| {
mdbx_cursor_open(txn, dbi, &mut cursor);
let mut i = 0;
let mut count = 0u32;
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
i += key.iov_len + data.iov_len;
count += 1;
}
black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
black_box(i);
assert_eq!(count, n);
mdbx_cursor_close(cursor);
})
.unwrap();
})
});
}

View File

@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {
c.bench_function("bench_get_rand_raw", |b| {
b.iter(|| unsafe {
txn.with_raw_tx_ptr(|txn| {
txn.txn_execute(|txn| {
let mut i: size_t = 0;
for key in &keys {
key_val.iov_len = key.len() as size_t;
@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) {
i += key_val.iov_len;
}
black_box(i);
});
})
.unwrap();
})
});
}

View File

@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
error::{mdbx_result, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
@ -30,26 +30,26 @@ where
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
mdbx_result_with_tx_kind::<K>(
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
txn.txn(),
txn.env().txn_manager(),
)?;
txn.txn_execute(|txn_ptr| {
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
})??;
}
Ok(Self { txn, cursor })
}
fn new_at_position(other: &Self) -> Result<Self> {
unsafe {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
other.txn.txn_execute(|_| {
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
let s = Self { txn: other.txn.clone(), cursor };
let s = Self { txn: other.txn.clone(), cursor };
mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
mdbx_result(res)?;
Ok(s)
Ok(s)
})?
}
}
@ -95,11 +95,12 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
self.txn.txn_execute(|txn| {
let v = mdbx_result_with_tx_kind::<K>(
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
txn,
self.txn.env().txn_manager(),
)?;
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
&mut data_val,
op,
))?;
assert_ne!(data_ptr, data_val.iov_base);
let key_out = {
// MDBX wrote in new key
@ -111,7 +112,7 @@ where
};
let data_out = Value::decode_val::<K>(txn, data_val)?;
Ok((key_out, data_out, v))
})
})?
}
}
@ -444,7 +445,7 @@ impl Cursor<RW> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| {
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
})
})?
})?;
Ok(())
@ -458,7 +459,7 @@ impl Cursor<RW> {
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
mdbx_result(unsafe {
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
})?;
Ok(())
@ -470,7 +471,7 @@ where
K: TransactionKind,
{
fn clone(&self) -> Self {
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
Self::new_at_position(self).unwrap()
}
}
@ -488,7 +489,7 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
}
}
@ -564,7 +565,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
@ -583,7 +584,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Self::Err(err) => err.take().map(Err),
@ -655,7 +660,7 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, *next_op);
unsafe {
cursor.txn.txn_execute(|txn| {
let result = cursor.txn.txn_execute(|txn| {
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
ffi::MDBX_SUCCESS => {
let key = match Key::decode_val::<K>(txn, key) {
@ -674,7 +679,11 @@ where
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
error => Some(Err(Error::from_err_code(error))),
}
})
});
match result {
Ok(result) => result,
Err(err) => Some(Err(err)),
}
}
}
Iter::Err(err) => err.take().map(Err),
@ -752,17 +761,15 @@ where
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);
cursor.txn.txn_execute(|_| {
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };
let err_code =
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };
(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
(err_code == ffi::MDBX_SUCCESS).then(|| {
IntoIter::new(
Cursor::new_at_position(&**cursor).unwrap(),
ffi::MDBX_GET_CURRENT,
ffi::MDBX_NEXT_DUP,
)
})
}
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),

View File

@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result_with_tx_kind, Result},
error::{mdbx_result, Result},
transaction::TransactionKind,
Environment, Transaction,
};
@ -31,12 +31,8 @@ impl Database {
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
txn.txn_execute(|txn_ptr| {
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
txn_ptr,
txn.env().txn_manager(),
)
})?;
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
})??;
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
}

View File

@ -88,6 +88,12 @@ impl Environment {
&self.inner.txn_manager
}
/// Returns the number of timed out transactions that were not aborted by the user yet.
#[cfg(feature = "read-tx-timeouts")]
pub fn timed_out_not_aborted_transactions(&self) -> usize {
self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
}
/// Create a read-only transaction for use with the environment.
#[inline]
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {

View File

@ -1,4 +1,3 @@
use crate::{txn_manager::TxnManager, TransactionKind};
use libc::c_int;
use std::result;
@ -118,8 +117,9 @@ pub enum Error {
/// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened.
#[error("write transactions are not supported in read-only mode")]
WriteTransactionUnsupportedInReadOnlyMode,
#[error("read transaction has been aborted by the transaction manager")]
ReadTransactionAborted,
/// Read transaction has been timed out.
#[error("read transaction has been timed out")]
ReadTransactionTimeout,
/// Unknown error code.
#[error("unknown error code")]
Other(i32),
@ -193,9 +193,10 @@ impl Error {
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
Error::Access => ffi::MDBX_EACCESS,
Error::TooLarge => ffi::MDBX_TOO_LARGE,
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
Error::BadSignature => ffi::MDBX_EBADSIGN,
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code
Error::Other(err_code) => *err_code,
}
}
@ -216,33 +217,6 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
}
}
#[cfg(feature = "read-tx-timeouts")]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
txn: *mut ffi::MDBX_txn,
txn_manager: &TxnManager,
) -> Result<bool> {
if K::IS_READ_ONLY &&
txn_manager.remove_aborted_read_transaction(txn).is_some() &&
err_code == ffi::MDBX_EBADSIGN
{
return Err(Error::ReadTransactionAborted)
}
mdbx_result(err_code)
}
#[cfg(not(feature = "read-tx-timeouts"))]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
_txn: *mut ffi::MDBX_txn,
_txn_manager: &TxnManager,
) -> Result<bool> {
mdbx_result(err_code)
}
#[macro_export]
macro_rules! mdbx_try_optional {
($expr:expr) => {{

View File

@ -1,12 +1,12 @@
use crate::{
database::Database,
environment::Environment,
error::{mdbx_result, mdbx_result_with_tx_kind, Result},
error::{mdbx_result, Result},
flags::{DatabaseFlags, WriteFlags},
txn_manager::{TxnManagerMessage, TxnPtr},
Cursor, Error, Stat, TableObject,
};
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
use indexmap::IndexSet;
use libc::{c_uint, c_void};
use parking_lot::Mutex;
@ -71,29 +71,27 @@ where
pub(crate) fn new(env: Environment) -> Result<Self> {
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
unsafe {
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_txn_begin_ex(
env.env_ptr(),
ptr::null_mut(),
K::OPEN_FLAGS,
&mut txn,
ptr::null_mut(),
),
txn,
env.txn_manager(),
)?;
mdbx_result(ffi::mdbx_txn_begin_ex(
env.env_ptr(),
ptr::null_mut(),
K::OPEN_FLAGS,
&mut txn,
ptr::null_mut(),
))?;
Ok(Self::new_from_ptr(env, txn))
}
}
pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
let txn = TransactionPtr::new(txn_ptr);
#[cfg(feature = "read-tx-timeouts")]
if K::IS_READ_ONLY {
env.txn_manager().add_active_read_transaction(txn)
env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
}
let inner = TransactionInner {
txn: TransactionPtr::new(txn),
txn,
primed_dbis: Mutex::new(IndexSet::new()),
committed: AtomicBool::new(false),
env,
@ -108,7 +106,7 @@ where
/// The caller **must** ensure that the pointer is not used after the
/// lifetime of the transaction.
#[inline]
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
@ -117,32 +115,18 @@ where
/// Returns a copy of the raw pointer to the underlying MDBX transaction.
#[doc(hidden)]
#[cfg(test)]
pub fn txn(&self) -> *mut ffi::MDBX_txn {
self.inner.txn.txn
}
/// Executes the given closure once
///
/// This is only intended to be used when accessing mdbx ffi functions directly is required.
///
/// The caller **must** ensure that the pointer is only used within the closure.
#[inline]
#[doc(hidden)]
pub fn with_raw_tx_ptr<F, T>(&self, f: F) -> T
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lock = self.inner.txn.lock.lock();
f(self.inner.txn.txn)
}
/// Returns a raw pointer to the MDBX environment.
pub fn env(&self) -> &Environment {
&self.inner.env
}
/// Returns the transaction id.
pub fn id(&self) -> u64 {
pub fn id(&self) -> Result<u64> {
self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
}
@ -168,7 +152,7 @@ where
ffi::MDBX_NOTFOUND => Ok(None),
err_code => Err(Error::from_err_code(err_code)),
}
})
})?
}
/// Commits the transaction.
@ -191,11 +175,9 @@ where
self.env().txn_manager().remove_active_read_transaction(txn);
let mut latency = CommitLatency::new();
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) },
txn,
self.env().txn_manager(),
)
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
})
.map(|v| (v, latency))
} else {
let (sender, rx) = sync_channel(0);
@ -204,7 +186,7 @@ where
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
rx.recv().unwrap()
}
});
})?;
self.inner.set_committed();
result
@ -243,12 +225,8 @@ where
let mut flags: c_uint = 0;
unsafe {
self.txn_execute(|txn| {
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()),
txn,
self.env().txn_manager(),
)
})?;
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
})??;
}
// The types are not the same on Windows. Great!
@ -266,12 +244,8 @@ where
unsafe {
let mut stat = Stat::new();
self.txn_execute(|txn| {
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()),
txn,
self.env().txn_manager(),
)
})?;
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
})??;
Ok(stat)
}
}
@ -290,7 +264,7 @@ where
#[cfg(feature = "read-tx-timeouts")]
pub fn disable_timeout(&self) {
if K::IS_READ_ONLY {
self.env().txn_manager().remove_active_read_transaction(self.txn());
self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
}
}
}
@ -342,11 +316,11 @@ where
}
#[inline]
fn txn_execute<F, T>(&self, f: F) -> T
fn txn_execute<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
self.txn.txn_execute(f)
self.txn.txn_execute_fail_on_timeout(f)
}
}
@ -355,7 +329,9 @@ where
K: TransactionKind,
{
fn drop(&mut self) {
self.txn_execute(|txn| {
// To be able to abort a timed out transaction, we need to renew it first.
// Hence the usage of `txn_execute_renew_on_timeout` here.
let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
if !self.has_committed() {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
@ -372,7 +348,7 @@ where
rx.recv().unwrap().unwrap();
}
}
})
});
}
}
@ -418,7 +394,7 @@ impl Transaction<RW> {
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
mdbx_result(self.txn_execute(|txn| unsafe {
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
}))?;
})?)?;
Ok(())
}
@ -447,7 +423,7 @@ impl Transaction<RW> {
&mut data_val,
flags.bits() | ffi::MDBX_RESERVE,
)
}))?;
})?)?;
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
}
}
@ -482,7 +458,7 @@ impl Transaction<RW> {
} else {
unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
}
})
})?
})
.map(|_| true)
.or_else(|e| match e {
@ -493,7 +469,7 @@ impl Transaction<RW> {
/// Empties the given database. All items will be removed.
pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
Ok(())
}
@ -504,7 +480,7 @@ impl Transaction<RW> {
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?;
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
Ok(())
}
@ -517,11 +493,7 @@ impl Transaction<RO> {
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
mdbx_result_with_tx_kind::<RO>(
ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()),
self.txn(),
self.env().txn_manager(),
)?;
self.txn_execute(|_| mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi())))??;
Ok(())
}
@ -542,12 +514,12 @@ impl Transaction<RW> {
});
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0))
})
})?
}
}
/// A shareable pointer to an MDBX transaction.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub(crate) struct TransactionPtr {
txn: *mut ffi::MDBX_txn,
lock: Arc<Mutex<()>>,
@ -558,14 +530,52 @@ impl TransactionPtr {
Self { txn, lock: Arc::new(Mutex::new(())) }
}
// Returns `true` if the transaction is timed out.
//
// When transaction is timed out via `TxnManager`, it's actually reset using
// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
// transactions), and sets the `MDBX_TXN_FINISHED` flag.
fn is_timed_out(&self) -> bool {
(unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0
}
/// Executes the given closure once the lock on the transaction is acquired.
///
/// Returns the result of the closure or an error if the transaction is timed out.
#[inline]
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lck = self.lock.lock();
(f)(self.txn)
// No race condition with the `TxnManager` timing out the transaction is possible here,
// because we're taking a lock for any actions on the transaction pointer, including a call
// to the `mdbx_txn_reset`.
if self.is_timed_out() {
return Err(Error::ReadTransactionTimeout)
}
Ok((f)(self.txn))
}
/// Executes the given closure once the lock on the transaction is acquired. If the tranasction
/// is timed out, it will be renewed first.
///
/// Returns the result of the closure or an error if the transaction renewal fails.
#[inline]
fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
where
F: FnOnce(*mut ffi::MDBX_txn) -> T,
{
let _lck = self.lock.lock();
// To be able to do any operations on the transaction, we need to renew it first.
if self.is_timed_out() {
mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
}
Ok((f)(self.txn))
}
}

View File

@ -52,9 +52,6 @@ impl TxnManager {
/// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort]
/// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex]
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
#[cfg(feature = "read-tx-timeouts")]
let read_transactions = self.read_transactions.clone();
std::thread::spawn(move || {
#[allow(clippy::redundant_locals)]
let env = env;
@ -73,34 +70,12 @@ impl TxnManager {
)
})
.map(|_| TxnPtr(txn));
#[cfg(feature = "read-tx-timeouts")]
{
use crate::transaction::TransactionKind;
if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS {
if let Some(read_transactions) = &read_transactions {
read_transactions.add_active(txn);
}
}
}
sender.send(res).unwrap();
}
TxnManagerMessage::Abort { tx, sender } => {
#[cfg(feature = "read-tx-timeouts")]
if let Some(read_transactions) = &read_transactions {
read_transactions.remove_active(tx.0);
}
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
#[cfg(feature = "read-tx-timeouts")]
if let Some(read_transactions) = &read_transactions {
read_transactions.remove_active(tx.0);
}
sender
.send({
let mut latency = CommitLatency::new();
@ -125,7 +100,10 @@ impl TxnManager {
#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
use crate::{
environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
txn_manager::TxnManager,
};
use dashmap::{DashMap, DashSet};
use std::{
sync::{mpsc::sync_channel, Arc},
@ -155,9 +133,13 @@ mod read_transactions {
}
/// Adds a new transaction to the list of active read transactions.
pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) {
pub(crate) fn add_active_read_transaction(
&self,
ptr: *mut ffi::MDBX_txn,
tx: TransactionPtr,
) {
if let Some(read_transactions) = &self.read_transactions {
read_transactions.add_active(ptr);
read_transactions.add_active(ptr, tx);
}
}
@ -165,16 +147,15 @@ mod read_transactions {
pub(crate) fn remove_active_read_transaction(
&self,
ptr: *mut ffi::MDBX_txn,
) -> Option<(usize, Instant)> {
) -> Option<(usize, (TransactionPtr, Instant))> {
self.read_transactions.as_ref()?.remove_active(ptr)
}
/// Removes a transaction from the list of aborted read transactions.
pub(crate) fn remove_aborted_read_transaction(
&self,
ptr: *mut ffi::MDBX_txn,
) -> Option<usize> {
self.read_transactions.as_ref()?.remove_aborted(ptr)
/// Returns the number of timed out transactions that were not aborted by the user yet.
pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
self.read_transactions
.as_ref()
.map(|read_transactions| read_transactions.timed_out_not_aborted())
}
}
@ -187,13 +168,10 @@ mod read_transactions {
///
/// We store `usize` instead of a raw pointer as a key, because pointers are not
/// comparable. The time of transaction opening is stored as a value.
active: DashMap<usize, Instant>,
/// List of read transactions aborted by the [ReadTransactions::start_monitor].
/// We keep them until user tries to abort the transaction, so we're able to report a nice
/// [Error::ReadTransactionAborted] error.
///
/// We store `usize` instead of a raw pointer, because pointers are not comparable.
aborted: DashSet<usize>,
active: DashMap<usize, (TransactionPtr, Instant)>,
/// List of timed out transactions that were not aborted by the user yet, hence have a
/// dangling read transaction pointer.
timed_out_not_aborted: DashSet<usize>,
}
impl ReadTransactions {
@ -202,59 +180,70 @@ mod read_transactions {
}
/// Adds a new transaction to the list of active read transactions.
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) {
let _ = self.active.insert(ptr as usize, Instant::now());
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
}
/// Removes a transaction from the list of active read transactions.
pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> {
pub(super) fn remove_active(
&self,
ptr: *mut ffi::MDBX_txn,
) -> Option<(usize, (TransactionPtr, Instant))> {
self.timed_out_not_aborted.remove(&(ptr as usize));
self.active.remove(&(ptr as usize))
}
/// Adds a new transaction to the list of aborted read transactions.
pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) {
self.aborted.insert(ptr as usize);
}
/// Removes a transaction from the list of aborted read transactions.
pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option<usize> {
self.aborted.remove(&(ptr as usize))
/// Returns the number of timed out transactions that were not aborted by the user yet.
pub(super) fn timed_out_not_aborted(&self) -> usize {
self.timed_out_not_aborted.len()
}
/// Spawns a new thread with [std::thread::spawn] that monitors the list of active read
/// transactions and aborts those that are open for longer than
/// transactions and timeouts those that are open for longer than
/// `ReadTransactions.max_duration`.
///
/// Aborted transaction pointers are placed into the list of aborted read transactions, and
/// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries
/// to use it.
pub(super) fn start_monitor(self: Arc<Self>) {
std::thread::spawn(move || {
let mut aborted_active = Vec::new();
let mut timed_out_active = Vec::new();
loop {
let now = Instant::now();
let mut max_active_transaction_duration = None;
// Iterate through active read transactions and abort those that's open for
// Iterate through active read transactions and time out those that's open for
// longer than `self.max_duration`.
for entry in self.active.iter() {
let (ptr, start) = entry.pair();
let (tx, start) = entry.value();
let duration = now - *start;
if duration > self.max_duration {
let ptr = *ptr as *mut ffi::MDBX_txn;
let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
(
txn_ptr,
duration,
// Time out the transaction.
//
// We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
// prevent MDBX from reusing the pointer of the aborted
// transaction for new read-only transactions. This is
// important because we store the pointer in the `active` list
// and assume that it is unique.
//
// See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }),
)
});
// Add the transaction to the list of aborted transactions, so further
// usages report the correct error when the transaction is closed.
self.add_aborted(ptr);
// Abort the transaction
let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) });
// Add the transaction to `aborted_active`. We can't remove it instantly
// from the list of active transactions, because we iterate through it.
aborted_active.push((ptr, duration, result.err()));
match result {
Ok((txn_ptr, duration, error)) => {
// Add the transaction to `timed_out_active`. We can't remove it
// instantly from the list of active transactions, because we
// iterate through it.
timed_out_active.push((txn_ptr, duration, error));
}
Err(err) => {
error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
}
}
} else {
max_active_transaction_duration = Some(
duration.max(max_active_transaction_duration.unwrap_or_default()),
@ -262,40 +251,38 @@ mod read_transactions {
}
}
// Walk through aborted transactions, and delete them from the list of active
// Walk through timed out transactions, and delete them from the list of active
// transactions.
for (ptr, open_duration, err) in aborted_active.iter().copied() {
for (ptr, open_duration, err) in timed_out_active.iter().copied() {
// Try deleting the transaction from the list of active transactions.
let was_in_active = self.remove_active(ptr).is_some();
if let Some(err) = err {
// If there was an error when aborting the transaction, we need to
// remove it from the list of aborted transactions, because otherwise it
// will stay there forever.
self.remove_aborted(ptr);
if was_in_active && err != Error::BadSignature {
// If the transaction was in the list of active transactions and the
// error code is not `EBADSIGN`, then user didn't abort it.
error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions");
if let Err(err) = err {
if was_in_active {
// If the transaction was in the list of active transactions,
// then user didn't abort it and we failed to do so.
error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
}
} else {
// Happy path, the transaction has been aborted by us with no errors.
warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted");
// Happy path, the transaction has been timed out by us with no errors.
warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
// Add transaction to the list of timed out transactions that were not
// aborted by the user yet.
self.timed_out_not_aborted.insert(ptr as usize);
}
}
// Clear the list of aborted transactions, but not de-allocate the reserved
// Clear the list of timed out transactions, but not de-allocate the reserved
// capacity to save on further pushes.
aborted_active.clear();
timed_out_active.clear();
if !self.active.is_empty() || !self.aborted.is_empty() {
if !self.active.is_empty() {
trace!(
target: "libmdbx",
elapsed = ?now.elapsed(),
active = ?self.active.iter().map(|entry| {
let (ptr, start) = entry.pair();
(*ptr, start.elapsed())
let (tx, start) = entry.value();
(tx.clone(), start.elapsed())
}).collect::<Vec<_>>(),
aborted = ?self.aborted.iter().map(|entry| *entry).collect::<Vec<_>>(),
"Read transactions"
);
}
@ -315,12 +302,10 @@ mod read_transactions {
#[cfg(test)]
mod tests {
use crate::{
txn_manager::{
read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr,
},
Environment, Error, MaxReadTransactionDuration, TransactionKind, RO,
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
MaxReadTransactionDuration,
};
use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration};
use std::{thread::sleep, time::Duration};
use tempfile::tempdir;
#[test]
@ -345,7 +330,6 @@ mod read_transactions {
drop(tx);
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
// Create a read-only transaction, successfully use it, close it by committing.
@ -358,24 +342,43 @@ mod read_transactions {
tx.commit().unwrap();
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
{
// Create a read-only transaction and observe it's in the list of active
// transactions.
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));
// Wait until the transaction is timed out by the manager.
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
// Ensure that the transaction is not in the list of active transactions anymore,
// and is in the list of timed out but not aborted transactions.
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.aborted.contains(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted));
// Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
// Ensure that the transaction pointer is not reused when opening a new read-only
// transaction.
let new_tx = env.begin_ro_txn().unwrap();
let new_tx_ptr = new_tx.txn() as usize;
assert!(read_transactions.active.contains_key(&new_tx_ptr));
assert_ne!(tx_ptr, new_tx_ptr);
// Drop the transaction and ensure that it's not in the list of timed out but not
// aborted transactions anymore.
drop(tx);
assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
}
}
@ -393,64 +396,5 @@ mod read_transactions {
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(tx.commit().is_ok())
}
#[test]
fn txn_manager_begin_read_transaction_via_message_listener() {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
// Create a read-only transaction via the message listener.
let (tx, rx) = sync_channel(0);
env.txn_manager().send_message(TxnManagerMessage::Begin {
parent: TxnPtr(ptr::null_mut()),
flags: RO::OPEN_FLAGS,
sender: tx,
});
let txn_ptr = rx.recv().unwrap().unwrap();
assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize)));
}
#[test]
fn txn_manager_reassign_transaction_removes_from_aborted_transactions() {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
{
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.aborted.contains(&tx_ptr));
}
// Create a read-only transaction, ensure this removes it from aborted set if mdbx
// reassigns same recently aborted transaction pointer.
{
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
}
}
}