mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(storage): use mdbx_txn_reset to time out transactions (#6924)
Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
This commit is contained in:
@ -78,8 +78,7 @@ fn bench_get_seq_raw(c: &mut Criterion) {
|
||||
let (_dir, env) = setup_bench_db(n);
|
||||
|
||||
let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
|
||||
let _txn = env.begin_ro_txn().unwrap();
|
||||
let txn = _txn.txn();
|
||||
let txn = env.begin_ro_txn().unwrap();
|
||||
|
||||
let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||
@ -87,18 +86,21 @@ fn bench_get_seq_raw(c: &mut Criterion) {
|
||||
|
||||
c.bench_function("bench_get_seq_raw", |b| {
|
||||
b.iter(|| unsafe {
|
||||
mdbx_cursor_open(txn, dbi, &mut cursor);
|
||||
let mut i = 0;
|
||||
let mut count = 0u32;
|
||||
txn.txn_execute(|txn| {
|
||||
mdbx_cursor_open(txn, dbi, &mut cursor);
|
||||
let mut i = 0;
|
||||
let mut count = 0u32;
|
||||
|
||||
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
|
||||
i += key.iov_len + data.iov_len;
|
||||
count += 1;
|
||||
}
|
||||
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
|
||||
i += key.iov_len + data.iov_len;
|
||||
count += 1;
|
||||
}
|
||||
|
||||
black_box(i);
|
||||
assert_eq!(count, n);
|
||||
mdbx_cursor_close(cursor);
|
||||
black_box(i);
|
||||
assert_eq!(count, n);
|
||||
mdbx_cursor_close(cursor);
|
||||
})
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {
|
||||
|
||||
c.bench_function("bench_get_rand_raw", |b| {
|
||||
b.iter(|| unsafe {
|
||||
txn.with_raw_tx_ptr(|txn| {
|
||||
txn.txn_execute(|txn| {
|
||||
let mut i: size_t = 0;
|
||||
for key in &keys {
|
||||
key_val.iov_len = key.len() as size_t;
|
||||
@ -57,7 +57,8 @@ fn bench_get_rand_raw(c: &mut Criterion) {
|
||||
i += key_val.iov_len;
|
||||
}
|
||||
black_box(i);
|
||||
});
|
||||
})
|
||||
.unwrap();
|
||||
})
|
||||
});
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
|
||||
error::{mdbx_result, Error, Result},
|
||||
flags::*,
|
||||
mdbx_try_optional,
|
||||
transaction::{TransactionKind, RW},
|
||||
@ -30,11 +30,9 @@ where
|
||||
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
|
||||
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
|
||||
unsafe {
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
|
||||
txn.txn(),
|
||||
txn.env().txn_manager(),
|
||||
)?;
|
||||
txn.txn_execute(|txn_ptr| {
|
||||
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
|
||||
})??;
|
||||
}
|
||||
Ok(Self { txn, cursor })
|
||||
}
|
||||
@ -47,7 +45,7 @@ where
|
||||
|
||||
let s = Self { txn: other.txn.clone(), cursor };
|
||||
|
||||
mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
|
||||
mdbx_result(res)?;
|
||||
|
||||
Ok(s)
|
||||
}
|
||||
@ -95,11 +93,12 @@ where
|
||||
let key_ptr = key_val.iov_base;
|
||||
let data_ptr = data_val.iov_base;
|
||||
self.txn.txn_execute(|txn| {
|
||||
let v = mdbx_result_with_tx_kind::<K>(
|
||||
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
|
||||
txn,
|
||||
self.txn.env().txn_manager(),
|
||||
)?;
|
||||
let v = mdbx_result(ffi::mdbx_cursor_get(
|
||||
self.cursor,
|
||||
&mut key_val,
|
||||
&mut data_val,
|
||||
op,
|
||||
))?;
|
||||
assert_ne!(data_ptr, data_val.iov_base);
|
||||
let key_out = {
|
||||
// MDBX wrote in new key
|
||||
@ -111,7 +110,7 @@ where
|
||||
};
|
||||
let data_out = Value::decode_val::<K>(txn, data_val)?;
|
||||
Ok((key_out, data_out, v))
|
||||
})
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
@ -444,7 +443,7 @@ impl Cursor<RW> {
|
||||
mdbx_result(unsafe {
|
||||
self.txn.txn_execute(|_| {
|
||||
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
|
||||
})
|
||||
})?
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
@ -458,7 +457,7 @@ impl Cursor<RW> {
|
||||
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
|
||||
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
|
||||
mdbx_result(unsafe {
|
||||
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
|
||||
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
|
||||
})?;
|
||||
|
||||
Ok(())
|
||||
@ -470,7 +469,7 @@ where
|
||||
K: TransactionKind,
|
||||
{
|
||||
fn clone(&self) -> Self {
|
||||
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
|
||||
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap()).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@ -488,7 +487,7 @@ where
|
||||
K: TransactionKind,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
|
||||
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) }).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@ -564,7 +563,7 @@ where
|
||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||
let op = mem::replace(op, *next_op);
|
||||
unsafe {
|
||||
cursor.txn.txn_execute(|txn| {
|
||||
let result = cursor.txn.txn_execute(|txn| {
|
||||
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
||||
ffi::MDBX_SUCCESS => {
|
||||
let key = match Key::decode_val::<K>(txn, key) {
|
||||
@ -583,7 +582,11 @@ where
|
||||
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
||||
error => Some(Err(Error::from_err_code(error))),
|
||||
}
|
||||
})
|
||||
});
|
||||
match result {
|
||||
Ok(result) => result,
|
||||
Err(err) => Some(Err(err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Self::Err(err) => err.take().map(Err),
|
||||
@ -655,7 +658,7 @@ where
|
||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||
let op = mem::replace(op, *next_op);
|
||||
unsafe {
|
||||
cursor.txn.txn_execute(|txn| {
|
||||
let result = cursor.txn.txn_execute(|txn| {
|
||||
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
||||
ffi::MDBX_SUCCESS => {
|
||||
let key = match Key::decode_val::<K>(txn, key) {
|
||||
@ -674,7 +677,11 @@ where
|
||||
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
||||
error => Some(Err(Error::from_err_code(error))),
|
||||
}
|
||||
})
|
||||
});
|
||||
match result {
|
||||
Ok(result) => result,
|
||||
Err(err) => Some(Err(err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
Iter::Err(err) => err.take().map(Err),
|
||||
@ -752,7 +759,7 @@ where
|
||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);
|
||||
|
||||
cursor.txn.txn_execute(|_| {
|
||||
let result = cursor.txn.txn_execute(|_| {
|
||||
let err_code =
|
||||
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };
|
||||
|
||||
@ -763,7 +770,12 @@ where
|
||||
ffi::MDBX_NEXT_DUP,
|
||||
)
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
match result {
|
||||
Ok(result) => result,
|
||||
Err(err) => Some(IntoIter::Err(Some(err))),
|
||||
}
|
||||
}
|
||||
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
|
||||
}
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
use crate::{
|
||||
error::{mdbx_result_with_tx_kind, Result},
|
||||
error::{mdbx_result, Result},
|
||||
transaction::TransactionKind,
|
||||
Environment, Transaction,
|
||||
};
|
||||
@ -31,12 +31,8 @@ impl Database {
|
||||
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
|
||||
let mut dbi: ffi::MDBX_dbi = 0;
|
||||
txn.txn_execute(|txn_ptr| {
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
|
||||
txn_ptr,
|
||||
txn.env().txn_manager(),
|
||||
)
|
||||
})?;
|
||||
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
|
||||
})??;
|
||||
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
|
||||
}
|
||||
|
||||
|
||||
@ -88,6 +88,12 @@ impl Environment {
|
||||
&self.inner.txn_manager
|
||||
}
|
||||
|
||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
pub fn timed_out_not_aborted_transactions(&self) -> usize {
|
||||
self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
|
||||
}
|
||||
|
||||
/// Create a read-only transaction for use with the environment.
|
||||
#[inline]
|
||||
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
|
||||
|
||||
@ -1,4 +1,3 @@
|
||||
use crate::{txn_manager::TxnManager, TransactionKind};
|
||||
use libc::c_int;
|
||||
use std::result;
|
||||
|
||||
@ -118,8 +117,9 @@ pub enum Error {
|
||||
/// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened.
|
||||
#[error("write transactions are not supported in read-only mode")]
|
||||
WriteTransactionUnsupportedInReadOnlyMode,
|
||||
#[error("read transaction has been aborted by the transaction manager")]
|
||||
ReadTransactionAborted,
|
||||
/// Read transaction has been timed out.
|
||||
#[error("read transaction has been timed out")]
|
||||
ReadTransactionTimeout,
|
||||
/// Unknown error code.
|
||||
#[error("unknown error code")]
|
||||
Other(i32),
|
||||
@ -193,9 +193,10 @@ impl Error {
|
||||
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
|
||||
Error::Access => ffi::MDBX_EACCESS,
|
||||
Error::TooLarge => ffi::MDBX_TOO_LARGE,
|
||||
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
|
||||
Error::BadSignature => ffi::MDBX_EBADSIGN,
|
||||
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
|
||||
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
|
||||
Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code
|
||||
Error::Other(err_code) => *err_code,
|
||||
}
|
||||
}
|
||||
@ -216,33 +217,6 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
#[inline]
|
||||
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
|
||||
err_code: c_int,
|
||||
txn: *mut ffi::MDBX_txn,
|
||||
txn_manager: &TxnManager,
|
||||
) -> Result<bool> {
|
||||
if K::IS_READ_ONLY &&
|
||||
txn_manager.remove_aborted_read_transaction(txn).is_some() &&
|
||||
err_code == ffi::MDBX_EBADSIGN
|
||||
{
|
||||
return Err(Error::ReadTransactionAborted)
|
||||
}
|
||||
|
||||
mdbx_result(err_code)
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "read-tx-timeouts"))]
|
||||
#[inline]
|
||||
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
|
||||
err_code: c_int,
|
||||
_txn: *mut ffi::MDBX_txn,
|
||||
_txn_manager: &TxnManager,
|
||||
) -> Result<bool> {
|
||||
mdbx_result(err_code)
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! mdbx_try_optional {
|
||||
($expr:expr) => {{
|
||||
|
||||
@ -1,18 +1,17 @@
|
||||
use crate::{
|
||||
database::Database,
|
||||
environment::Environment,
|
||||
error::{mdbx_result, mdbx_result_with_tx_kind, Result},
|
||||
error::{mdbx_result, Result},
|
||||
flags::{DatabaseFlags, WriteFlags},
|
||||
txn_manager::{TxnManagerMessage, TxnPtr},
|
||||
Cursor, Error, Stat, TableObject,
|
||||
};
|
||||
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
|
||||
use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
|
||||
use indexmap::IndexSet;
|
||||
use libc::{c_uint, c_void};
|
||||
use parking_lot::Mutex;
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use std::{
|
||||
fmt,
|
||||
fmt::Debug,
|
||||
fmt::{self, Debug},
|
||||
mem::size_of,
|
||||
ptr, slice,
|
||||
sync::{atomic::AtomicBool, mpsc::sync_channel, Arc},
|
||||
@ -71,29 +70,27 @@ where
|
||||
pub(crate) fn new(env: Environment) -> Result<Self> {
|
||||
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||
unsafe {
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
ffi::mdbx_txn_begin_ex(
|
||||
env.env_ptr(),
|
||||
ptr::null_mut(),
|
||||
K::OPEN_FLAGS,
|
||||
&mut txn,
|
||||
ptr::null_mut(),
|
||||
),
|
||||
txn,
|
||||
env.txn_manager(),
|
||||
)?;
|
||||
mdbx_result(ffi::mdbx_txn_begin_ex(
|
||||
env.env_ptr(),
|
||||
ptr::null_mut(),
|
||||
K::OPEN_FLAGS,
|
||||
&mut txn,
|
||||
ptr::null_mut(),
|
||||
))?;
|
||||
Ok(Self::new_from_ptr(env, txn))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
|
||||
pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
|
||||
let txn = TransactionPtr::new(txn_ptr);
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
if K::IS_READ_ONLY {
|
||||
env.txn_manager().add_active_read_transaction(txn)
|
||||
env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
|
||||
}
|
||||
|
||||
let inner = TransactionInner {
|
||||
txn: TransactionPtr::new(txn),
|
||||
txn,
|
||||
primed_dbis: Mutex::new(IndexSet::new()),
|
||||
committed: AtomicBool::new(false),
|
||||
env,
|
||||
@ -108,7 +105,7 @@ where
|
||||
/// The caller **must** ensure that the pointer is not used after the
|
||||
/// lifetime of the transaction.
|
||||
#[inline]
|
||||
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
|
||||
pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
@ -117,32 +114,18 @@ where
|
||||
|
||||
/// Returns a copy of the raw pointer to the underlying MDBX transaction.
|
||||
#[doc(hidden)]
|
||||
#[cfg(test)]
|
||||
pub fn txn(&self) -> *mut ffi::MDBX_txn {
|
||||
self.inner.txn.txn
|
||||
}
|
||||
|
||||
/// Executes the given closure once
|
||||
///
|
||||
/// This is only intended to be used when accessing mdbx ffi functions directly is required.
|
||||
///
|
||||
/// The caller **must** ensure that the pointer is only used within the closure.
|
||||
#[inline]
|
||||
#[doc(hidden)]
|
||||
pub fn with_raw_tx_ptr<F, T>(&self, f: F) -> T
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
let _lock = self.inner.txn.lock.lock();
|
||||
f(self.inner.txn.txn)
|
||||
}
|
||||
|
||||
/// Returns a raw pointer to the MDBX environment.
|
||||
pub fn env(&self) -> &Environment {
|
||||
&self.inner.env
|
||||
}
|
||||
|
||||
/// Returns the transaction id.
|
||||
pub fn id(&self) -> u64 {
|
||||
pub fn id(&self) -> Result<u64> {
|
||||
self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
|
||||
}
|
||||
|
||||
@ -168,7 +151,7 @@ where
|
||||
ffi::MDBX_NOTFOUND => Ok(None),
|
||||
err_code => Err(Error::from_err_code(err_code)),
|
||||
}
|
||||
})
|
||||
})?
|
||||
}
|
||||
|
||||
/// Commits the transaction.
|
||||
@ -191,11 +174,9 @@ where
|
||||
self.env().txn_manager().remove_active_read_transaction(txn);
|
||||
|
||||
let mut latency = CommitLatency::new();
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) },
|
||||
txn,
|
||||
self.env().txn_manager(),
|
||||
)
|
||||
mdbx_result(unsafe {
|
||||
ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
|
||||
})
|
||||
.map(|v| (v, latency))
|
||||
} else {
|
||||
let (sender, rx) = sync_channel(0);
|
||||
@ -204,7 +185,7 @@ where
|
||||
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
|
||||
rx.recv().unwrap()
|
||||
}
|
||||
});
|
||||
})?;
|
||||
|
||||
self.inner.set_committed();
|
||||
result
|
||||
@ -243,12 +224,8 @@ where
|
||||
let mut flags: c_uint = 0;
|
||||
unsafe {
|
||||
self.txn_execute(|txn| {
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()),
|
||||
txn,
|
||||
self.env().txn_manager(),
|
||||
)
|
||||
})?;
|
||||
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
|
||||
})??;
|
||||
}
|
||||
|
||||
// The types are not the same on Windows. Great!
|
||||
@ -266,12 +243,8 @@ where
|
||||
unsafe {
|
||||
let mut stat = Stat::new();
|
||||
self.txn_execute(|txn| {
|
||||
mdbx_result_with_tx_kind::<K>(
|
||||
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()),
|
||||
txn,
|
||||
self.env().txn_manager(),
|
||||
)
|
||||
})?;
|
||||
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
|
||||
})??;
|
||||
Ok(stat)
|
||||
}
|
||||
}
|
||||
@ -290,7 +263,7 @@ where
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
pub fn disable_timeout(&self) {
|
||||
if K::IS_READ_ONLY {
|
||||
self.env().txn_manager().remove_active_read_transaction(self.txn());
|
||||
self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -342,11 +315,11 @@ where
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn txn_execute<F, T>(&self, f: F) -> T
|
||||
fn txn_execute<F, T>(&self, f: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
self.txn.txn_execute(f)
|
||||
self.txn.txn_execute_fail_on_timeout(f)
|
||||
}
|
||||
}
|
||||
|
||||
@ -355,24 +328,28 @@ where
|
||||
K: TransactionKind,
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
self.txn_execute(|txn| {
|
||||
if !self.has_committed() {
|
||||
if K::IS_READ_ONLY {
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
self.env.txn_manager().remove_active_read_transaction(txn);
|
||||
// To be able to abort a timed out transaction, we need to renew it first.
|
||||
// Hence the usage of `txn_execute_renew_on_timeout` here.
|
||||
self.txn
|
||||
.txn_execute_renew_on_timeout(|txn| {
|
||||
if !self.has_committed() {
|
||||
if K::IS_READ_ONLY {
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
self.env.txn_manager().remove_active_read_transaction(txn);
|
||||
|
||||
unsafe {
|
||||
ffi::mdbx_txn_abort(txn);
|
||||
unsafe {
|
||||
ffi::mdbx_txn_abort(txn);
|
||||
}
|
||||
} else {
|
||||
let (sender, rx) = sync_channel(0);
|
||||
self.env
|
||||
.txn_manager()
|
||||
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
|
||||
rx.recv().unwrap().unwrap();
|
||||
}
|
||||
} else {
|
||||
let (sender, rx) = sync_channel(0);
|
||||
self.env
|
||||
.txn_manager()
|
||||
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
|
||||
rx.recv().unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@ -418,7 +395,7 @@ impl Transaction<RW> {
|
||||
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
|
||||
mdbx_result(self.txn_execute(|txn| unsafe {
|
||||
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
|
||||
}))?;
|
||||
})?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -447,7 +424,7 @@ impl Transaction<RW> {
|
||||
&mut data_val,
|
||||
flags.bits() | ffi::MDBX_RESERVE,
|
||||
)
|
||||
}))?;
|
||||
})?)?;
|
||||
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
|
||||
}
|
||||
}
|
||||
@ -482,7 +459,7 @@ impl Transaction<RW> {
|
||||
} else {
|
||||
unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
|
||||
}
|
||||
})
|
||||
})?
|
||||
})
|
||||
.map(|_| true)
|
||||
.or_else(|e| match e {
|
||||
@ -493,7 +470,7 @@ impl Transaction<RW> {
|
||||
|
||||
/// Empties the given database. All items will be removed.
|
||||
pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
|
||||
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
|
||||
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -504,7 +481,7 @@ impl Transaction<RW> {
|
||||
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
||||
/// BEFORE calling this function.
|
||||
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
|
||||
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?;
|
||||
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -517,11 +494,7 @@ impl Transaction<RO> {
|
||||
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
||||
/// BEFORE calling this function.
|
||||
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
|
||||
mdbx_result_with_tx_kind::<RO>(
|
||||
ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()),
|
||||
self.txn(),
|
||||
self.env().txn_manager(),
|
||||
)?;
|
||||
mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -542,30 +515,90 @@ impl Transaction<RW> {
|
||||
});
|
||||
|
||||
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0))
|
||||
})
|
||||
})?
|
||||
}
|
||||
}
|
||||
|
||||
/// A shareable pointer to an MDBX transaction.
|
||||
#[derive(Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct TransactionPtr {
|
||||
txn: *mut ffi::MDBX_txn,
|
||||
timed_out: Arc<AtomicBool>,
|
||||
lock: Arc<Mutex<()>>,
|
||||
}
|
||||
|
||||
impl TransactionPtr {
|
||||
fn new(txn: *mut ffi::MDBX_txn) -> Self {
|
||||
Self { txn, lock: Arc::new(Mutex::new(())) }
|
||||
Self { txn, timed_out: Arc::new(AtomicBool::new(false)), lock: Arc::new(Mutex::new(())) }
|
||||
}
|
||||
|
||||
// Returns `true` if the transaction is timed out.
|
||||
//
|
||||
// When transaction is timed out via `TxnManager`, it's actually reset using
|
||||
// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
|
||||
// transactions).
|
||||
//
|
||||
// Importantly, we can't rely on `MDBX_TXN_FINISHED` flag to check if the transaction is timed
|
||||
// out using `mdbx_txn_reset`, because MDBX uses it in other cases too.
|
||||
fn is_timed_out(&self) -> bool {
|
||||
self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
|
||||
}
|
||||
|
||||
pub(crate) fn set_timed_out(&self) {
|
||||
self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
|
||||
}
|
||||
|
||||
fn lock(&self) -> MutexGuard<'_, ()> {
|
||||
if let Some(lock) = self.lock.try_lock() {
|
||||
lock
|
||||
} else {
|
||||
tracing::debug!(
|
||||
target: "libmdbx",
|
||||
txn = %self.txn as usize,
|
||||
backtrace = %std::backtrace::Backtrace::force_capture(),
|
||||
"Transaction lock is already acquired, blocking..."
|
||||
);
|
||||
self.lock.lock()
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes the given closure once the lock on the transaction is acquired.
|
||||
///
|
||||
/// Returns the result of the closure or an error if the transaction is timed out.
|
||||
#[inline]
|
||||
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
|
||||
pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
let _lck = self.lock.lock();
|
||||
(f)(self.txn)
|
||||
let _lck = self.lock();
|
||||
|
||||
// No race condition with the `TxnManager` timing out the transaction is possible here,
|
||||
// because we're taking a lock for any actions on the transaction pointer, including a call
|
||||
// to the `mdbx_txn_reset`.
|
||||
if self.is_timed_out() {
|
||||
return Err(Error::ReadTransactionTimeout)
|
||||
}
|
||||
|
||||
Ok((f)(self.txn))
|
||||
}
|
||||
|
||||
/// Executes the given closure once the lock on the transaction is acquired. If the tranasction
|
||||
/// is timed out, it will be renewed first.
|
||||
///
|
||||
/// Returns the result of the closure or an error if the transaction renewal fails.
|
||||
#[inline]
|
||||
fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
|
||||
where
|
||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||
{
|
||||
let _lck = self.lock();
|
||||
|
||||
// To be able to do any operations on the transaction, we need to renew it first.
|
||||
if self.is_timed_out() {
|
||||
mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
|
||||
}
|
||||
|
||||
Ok((f)(self.txn))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -52,9 +52,6 @@ impl TxnManager {
|
||||
/// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort]
|
||||
/// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex]
|
||||
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
let read_transactions = self.read_transactions.clone();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
#[allow(clippy::redundant_locals)]
|
||||
let env = env;
|
||||
@ -73,34 +70,12 @@ impl TxnManager {
|
||||
)
|
||||
})
|
||||
.map(|_| TxnPtr(txn));
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
{
|
||||
use crate::transaction::TransactionKind;
|
||||
|
||||
if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS {
|
||||
if let Some(read_transactions) = &read_transactions {
|
||||
read_transactions.add_active(txn);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sender.send(res).unwrap();
|
||||
}
|
||||
TxnManagerMessage::Abort { tx, sender } => {
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
if let Some(read_transactions) = &read_transactions {
|
||||
read_transactions.remove_active(tx.0);
|
||||
}
|
||||
|
||||
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
|
||||
}
|
||||
TxnManagerMessage::Commit { tx, sender } => {
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
if let Some(read_transactions) = &read_transactions {
|
||||
read_transactions.remove_active(tx.0);
|
||||
}
|
||||
|
||||
sender
|
||||
.send({
|
||||
let mut latency = CommitLatency::new();
|
||||
@ -125,7 +100,10 @@ impl TxnManager {
|
||||
|
||||
#[cfg(feature = "read-tx-timeouts")]
|
||||
mod read_transactions {
|
||||
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
|
||||
use crate::{
|
||||
environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
|
||||
txn_manager::TxnManager,
|
||||
};
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use std::{
|
||||
sync::{mpsc::sync_channel, Arc},
|
||||
@ -155,9 +133,13 @@ mod read_transactions {
|
||||
}
|
||||
|
||||
/// Adds a new transaction to the list of active read transactions.
|
||||
pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) {
|
||||
pub(crate) fn add_active_read_transaction(
|
||||
&self,
|
||||
ptr: *mut ffi::MDBX_txn,
|
||||
tx: TransactionPtr,
|
||||
) {
|
||||
if let Some(read_transactions) = &self.read_transactions {
|
||||
read_transactions.add_active(ptr);
|
||||
read_transactions.add_active(ptr, tx);
|
||||
}
|
||||
}
|
||||
|
||||
@ -165,16 +147,15 @@ mod read_transactions {
|
||||
pub(crate) fn remove_active_read_transaction(
|
||||
&self,
|
||||
ptr: *mut ffi::MDBX_txn,
|
||||
) -> Option<(usize, Instant)> {
|
||||
) -> Option<(usize, (TransactionPtr, Instant))> {
|
||||
self.read_transactions.as_ref()?.remove_active(ptr)
|
||||
}
|
||||
|
||||
/// Removes a transaction from the list of aborted read transactions.
|
||||
pub(crate) fn remove_aborted_read_transaction(
|
||||
&self,
|
||||
ptr: *mut ffi::MDBX_txn,
|
||||
) -> Option<usize> {
|
||||
self.read_transactions.as_ref()?.remove_aborted(ptr)
|
||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
||||
pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
|
||||
self.read_transactions
|
||||
.as_ref()
|
||||
.map(|read_transactions| read_transactions.timed_out_not_aborted())
|
||||
}
|
||||
}
|
||||
|
||||
@ -187,13 +168,10 @@ mod read_transactions {
|
||||
///
|
||||
/// We store `usize` instead of a raw pointer as a key, because pointers are not
|
||||
/// comparable. The time of transaction opening is stored as a value.
|
||||
active: DashMap<usize, Instant>,
|
||||
/// List of read transactions aborted by the [ReadTransactions::start_monitor].
|
||||
/// We keep them until user tries to abort the transaction, so we're able to report a nice
|
||||
/// [Error::ReadTransactionAborted] error.
|
||||
///
|
||||
/// We store `usize` instead of a raw pointer, because pointers are not comparable.
|
||||
aborted: DashSet<usize>,
|
||||
active: DashMap<usize, (TransactionPtr, Instant)>,
|
||||
/// List of timed out transactions that were not aborted by the user yet, hence have a
|
||||
/// dangling read transaction pointer.
|
||||
timed_out_not_aborted: DashSet<usize>,
|
||||
}
|
||||
|
||||
impl ReadTransactions {
|
||||
@ -202,59 +180,70 @@ mod read_transactions {
|
||||
}
|
||||
|
||||
/// Adds a new transaction to the list of active read transactions.
|
||||
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) {
|
||||
let _ = self.active.insert(ptr as usize, Instant::now());
|
||||
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
|
||||
let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
|
||||
}
|
||||
|
||||
/// Removes a transaction from the list of active read transactions.
|
||||
pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> {
|
||||
pub(super) fn remove_active(
|
||||
&self,
|
||||
ptr: *mut ffi::MDBX_txn,
|
||||
) -> Option<(usize, (TransactionPtr, Instant))> {
|
||||
self.timed_out_not_aborted.remove(&(ptr as usize));
|
||||
self.active.remove(&(ptr as usize))
|
||||
}
|
||||
|
||||
/// Adds a new transaction to the list of aborted read transactions.
|
||||
pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) {
|
||||
self.aborted.insert(ptr as usize);
|
||||
}
|
||||
|
||||
/// Removes a transaction from the list of aborted read transactions.
|
||||
pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option<usize> {
|
||||
self.aborted.remove(&(ptr as usize))
|
||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
||||
pub(super) fn timed_out_not_aborted(&self) -> usize {
|
||||
self.timed_out_not_aborted.len()
|
||||
}
|
||||
|
||||
/// Spawns a new thread with [std::thread::spawn] that monitors the list of active read
|
||||
/// transactions and aborts those that are open for longer than
|
||||
/// transactions and timeouts those that are open for longer than
|
||||
/// `ReadTransactions.max_duration`.
|
||||
///
|
||||
/// Aborted transaction pointers are placed into the list of aborted read transactions, and
|
||||
/// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries
|
||||
/// to use it.
|
||||
pub(super) fn start_monitor(self: Arc<Self>) {
|
||||
std::thread::spawn(move || {
|
||||
let mut aborted_active = Vec::new();
|
||||
let mut timed_out_active = Vec::new();
|
||||
|
||||
loop {
|
||||
let now = Instant::now();
|
||||
let mut max_active_transaction_duration = None;
|
||||
|
||||
// Iterate through active read transactions and abort those that's open for
|
||||
// Iterate through active read transactions and time out those that's open for
|
||||
// longer than `self.max_duration`.
|
||||
for entry in self.active.iter() {
|
||||
let (ptr, start) = entry.pair();
|
||||
let (tx, start) = entry.value();
|
||||
let duration = now - *start;
|
||||
|
||||
if duration > self.max_duration {
|
||||
let ptr = *ptr as *mut ffi::MDBX_txn;
|
||||
let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
|
||||
// Time out the transaction.
|
||||
//
|
||||
// We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
|
||||
// prevent MDBX from reusing the pointer of the aborted
|
||||
// transaction for new read-only transactions. This is
|
||||
// important because we store the pointer in the `active` list
|
||||
// and assume that it is unique.
|
||||
//
|
||||
// See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
|
||||
let result = mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) });
|
||||
if result.is_ok() {
|
||||
tx.set_timed_out();
|
||||
}
|
||||
(txn_ptr, duration, result)
|
||||
});
|
||||
|
||||
// Add the transaction to the list of aborted transactions, so further
|
||||
// usages report the correct error when the transaction is closed.
|
||||
self.add_aborted(ptr);
|
||||
|
||||
// Abort the transaction
|
||||
let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) });
|
||||
|
||||
// Add the transaction to `aborted_active`. We can't remove it instantly
|
||||
// from the list of active transactions, because we iterate through it.
|
||||
aborted_active.push((ptr, duration, result.err()));
|
||||
match result {
|
||||
Ok((txn_ptr, duration, error)) => {
|
||||
// Add the transaction to `timed_out_active`. We can't remove it
|
||||
// instantly from the list of active transactions, because we
|
||||
// iterate through it.
|
||||
timed_out_active.push((txn_ptr, duration, error));
|
||||
}
|
||||
Err(err) => {
|
||||
error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
max_active_transaction_duration = Some(
|
||||
duration.max(max_active_transaction_duration.unwrap_or_default()),
|
||||
@ -262,40 +251,38 @@ mod read_transactions {
|
||||
}
|
||||
}
|
||||
|
||||
// Walk through aborted transactions, and delete them from the list of active
|
||||
// Walk through timed out transactions, and delete them from the list of active
|
||||
// transactions.
|
||||
for (ptr, open_duration, err) in aborted_active.iter().copied() {
|
||||
for (ptr, open_duration, err) in timed_out_active.iter().copied() {
|
||||
// Try deleting the transaction from the list of active transactions.
|
||||
let was_in_active = self.remove_active(ptr).is_some();
|
||||
if let Some(err) = err {
|
||||
// If there was an error when aborting the transaction, we need to
|
||||
// remove it from the list of aborted transactions, because otherwise it
|
||||
// will stay there forever.
|
||||
self.remove_aborted(ptr);
|
||||
if was_in_active && err != Error::BadSignature {
|
||||
// If the transaction was in the list of active transactions and the
|
||||
// error code is not `EBADSIGN`, then user didn't abort it.
|
||||
error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions");
|
||||
if let Err(err) = err {
|
||||
if was_in_active {
|
||||
// If the transaction was in the list of active transactions,
|
||||
// then user didn't abort it and we failed to do so.
|
||||
error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
|
||||
}
|
||||
} else {
|
||||
// Happy path, the transaction has been aborted by us with no errors.
|
||||
warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted");
|
||||
// Happy path, the transaction has been timed out by us with no errors.
|
||||
warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
|
||||
// Add transaction to the list of timed out transactions that were not
|
||||
// aborted by the user yet.
|
||||
self.timed_out_not_aborted.insert(ptr as usize);
|
||||
}
|
||||
}
|
||||
|
||||
// Clear the list of aborted transactions, but not de-allocate the reserved
|
||||
// Clear the list of timed out transactions, but not de-allocate the reserved
|
||||
// capacity to save on further pushes.
|
||||
aborted_active.clear();
|
||||
timed_out_active.clear();
|
||||
|
||||
if !self.active.is_empty() || !self.aborted.is_empty() {
|
||||
if !self.active.is_empty() {
|
||||
trace!(
|
||||
target: "libmdbx",
|
||||
elapsed = ?now.elapsed(),
|
||||
active = ?self.active.iter().map(|entry| {
|
||||
let (ptr, start) = entry.pair();
|
||||
(*ptr, start.elapsed())
|
||||
let (tx, start) = entry.value();
|
||||
(tx.clone(), start.elapsed())
|
||||
}).collect::<Vec<_>>(),
|
||||
aborted = ?self.aborted.iter().map(|entry| *entry).collect::<Vec<_>>(),
|
||||
"Read transactions"
|
||||
);
|
||||
}
|
||||
@ -315,12 +302,10 @@ mod read_transactions {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{
|
||||
txn_manager::{
|
||||
read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr,
|
||||
},
|
||||
Environment, Error, MaxReadTransactionDuration, TransactionKind, RO,
|
||||
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
|
||||
MaxReadTransactionDuration,
|
||||
};
|
||||
use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration};
|
||||
use std::{thread::sleep, time::Duration};
|
||||
use tempfile::tempdir;
|
||||
|
||||
#[test]
|
||||
@ -345,7 +330,6 @@ mod read_transactions {
|
||||
drop(tx);
|
||||
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||
}
|
||||
|
||||
// Create a read-only transaction, successfully use it, close it by committing.
|
||||
@ -358,24 +342,43 @@ mod read_transactions {
|
||||
tx.commit().unwrap();
|
||||
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||
}
|
||||
|
||||
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
|
||||
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
|
||||
{
|
||||
// Create a read-only transaction and observe it's in the list of active
|
||||
// transactions.
|
||||
let tx = env.begin_ro_txn().unwrap();
|
||||
let tx_ptr = tx.txn() as usize;
|
||||
assert!(read_transactions.active.contains_key(&tx_ptr));
|
||||
|
||||
// Wait until the transaction is timed out by the manager.
|
||||
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||
|
||||
// Ensure that the transaction is not in the list of active transactions anymore,
|
||||
// and is in the list of timed out but not aborted transactions.
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(read_transactions.aborted.contains(&tx_ptr));
|
||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
||||
|
||||
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted));
|
||||
// Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
|
||||
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
||||
|
||||
assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
||||
|
||||
// Ensure that the transaction pointer is not reused when opening a new read-only
|
||||
// transaction.
|
||||
let new_tx = env.begin_ro_txn().unwrap();
|
||||
let new_tx_ptr = new_tx.txn() as usize;
|
||||
assert!(read_transactions.active.contains_key(&new_tx_ptr));
|
||||
assert_ne!(tx_ptr, new_tx_ptr);
|
||||
|
||||
// Drop the transaction and ensure that it's not in the list of timed out but not
|
||||
// aborted transactions anymore.
|
||||
drop(tx);
|
||||
assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
||||
}
|
||||
}
|
||||
|
||||
@ -393,64 +396,5 @@ mod read_transactions {
|
||||
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||
assert!(tx.commit().is_ok())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn txn_manager_begin_read_transaction_via_message_listener() {
|
||||
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder()
|
||||
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
|
||||
.open(dir.path())
|
||||
.unwrap();
|
||||
|
||||
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
|
||||
|
||||
// Create a read-only transaction via the message listener.
|
||||
let (tx, rx) = sync_channel(0);
|
||||
env.txn_manager().send_message(TxnManagerMessage::Begin {
|
||||
parent: TxnPtr(ptr::null_mut()),
|
||||
flags: RO::OPEN_FLAGS,
|
||||
sender: tx,
|
||||
});
|
||||
|
||||
let txn_ptr = rx.recv().unwrap().unwrap();
|
||||
|
||||
assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn txn_manager_reassign_transaction_removes_from_aborted_transactions() {
|
||||
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||
|
||||
let dir = tempdir().unwrap();
|
||||
let env = Environment::builder()
|
||||
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
|
||||
.open(dir.path())
|
||||
.unwrap();
|
||||
|
||||
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
|
||||
|
||||
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
|
||||
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
|
||||
{
|
||||
let tx = env.begin_ro_txn().unwrap();
|
||||
let tx_ptr = tx.txn() as usize;
|
||||
assert!(read_transactions.active.contains_key(&tx_ptr));
|
||||
|
||||
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||
|
||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||
assert!(read_transactions.aborted.contains(&tx_ptr));
|
||||
}
|
||||
|
||||
// Create a read-only transaction, ensure this removes it from aborted set if mdbx
|
||||
// reassigns same recently aborted transaction pointer.
|
||||
{
|
||||
let tx = env.begin_ro_txn().unwrap();
|
||||
let tx_ptr = tx.txn() as usize;
|
||||
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user