mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Revert "feat(storage): use mdbx_txn_reset to time out transactions … (#6919)
This commit is contained in:
@ -110,11 +110,6 @@ where
|
|||||||
describe_gauge!("db.table_pages", "The number of database pages for a table");
|
describe_gauge!("db.table_pages", "The number of database pages for a table");
|
||||||
describe_gauge!("db.table_entries", "The number of entries for a table");
|
describe_gauge!("db.table_entries", "The number of entries for a table");
|
||||||
describe_gauge!("db.freelist", "The number of pages on the freelist");
|
describe_gauge!("db.freelist", "The number of pages on the freelist");
|
||||||
describe_gauge!(
|
|
||||||
"db.timed_out_not_aborted_transactions",
|
|
||||||
"Number of timed out transactions that were not aborted by the user yet"
|
|
||||||
);
|
|
||||||
|
|
||||||
describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
|
describe_gauge!("static_files.segment_size", Unit::Bytes, "The size of a static file segment");
|
||||||
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
|
describe_gauge!("static_files.segment_files", "The number of files for a static file segment");
|
||||||
describe_gauge!(
|
describe_gauge!(
|
||||||
|
|||||||
@ -123,19 +123,17 @@ impl Database for DatabaseEnv {
|
|||||||
type TXMut = tx::Tx<RW>;
|
type TXMut = tx::Tx<RW>;
|
||||||
|
|
||||||
fn tx(&self) -> Result<Self::TX, DatabaseError> {
|
fn tx(&self) -> Result<Self::TX, DatabaseError> {
|
||||||
Tx::new_with_metrics(
|
Ok(Tx::new_with_metrics(
|
||||||
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
self.inner.begin_ro_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||||
self.metrics.as_ref().cloned(),
|
self.metrics.as_ref().cloned(),
|
||||||
)
|
))
|
||||||
.map_err(|e| DatabaseError::InitTx(e.into()))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
|
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
|
||||||
Tx::new_with_metrics(
|
Ok(Tx::new_with_metrics(
|
||||||
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
self.inner.begin_rw_txn().map_err(|e| DatabaseError::InitTx(e.into()))?,
|
||||||
self.metrics.as_ref().cloned(),
|
self.metrics.as_ref().cloned(),
|
||||||
)
|
))
|
||||||
.map_err(|e| DatabaseError::InitTx(e.into()))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,12 +202,6 @@ impl DatabaseMetrics for DatabaseEnv {
|
|||||||
metrics.push(("db.freelist", freelist as f64, vec![]));
|
metrics.push(("db.freelist", freelist as f64, vec![]));
|
||||||
}
|
}
|
||||||
|
|
||||||
metrics.push((
|
|
||||||
"db.timed_out_not_aborted_transactions",
|
|
||||||
self.timed_out_not_aborted_transactions() as f64,
|
|
||||||
vec![],
|
|
||||||
));
|
|
||||||
|
|
||||||
metrics
|
metrics
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -53,16 +53,14 @@ impl<K: TransactionKind> Tx<K> {
|
|||||||
pub fn new_with_metrics(
|
pub fn new_with_metrics(
|
||||||
inner: Transaction<K>,
|
inner: Transaction<K>,
|
||||||
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
|
env_metrics: Option<Arc<DatabaseEnvMetrics>>,
|
||||||
) -> reth_libmdbx::Result<Self> {
|
) -> Self {
|
||||||
let metrics_handler = env_metrics
|
let metrics_handler = env_metrics.map(|env_metrics| {
|
||||||
.map(|env_metrics| {
|
let handler = MetricsHandler::<K>::new(inner.id(), env_metrics);
|
||||||
let handler = MetricsHandler::<K>::new(inner.id()?, env_metrics);
|
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
|
||||||
handler.env_metrics.record_opened_transaction(handler.transaction_mode());
|
handler.log_transaction_opened();
|
||||||
handler.log_transaction_opened();
|
handler
|
||||||
Ok(handler)
|
});
|
||||||
})
|
Self::new_inner(inner, metrics_handler)
|
||||||
.transpose()?;
|
|
||||||
Ok(Self::new_inner(inner, metrics_handler))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -78,8 +76,8 @@ impl<K: TransactionKind> Tx<K> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Gets this transaction ID.
|
/// Gets this transaction ID.
|
||||||
pub fn id(&self) -> reth_libmdbx::Result<u64> {
|
pub fn id(&self) -> u64 {
|
||||||
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| Ok(handler.txn_id))
|
self.metrics_handler.as_ref().map_or_else(|| self.inner.id(), |handler| handler.txn_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Gets a table database handle if it exists, otherwise creates it.
|
/// Gets a table database handle if it exists, otherwise creates it.
|
||||||
@ -439,7 +437,7 @@ mod tests {
|
|||||||
|
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
tx.get::<tables::Transactions>(0).err(),
|
tx.get::<tables::Transactions>(0).err(),
|
||||||
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionTimeout.into()))
|
Some(DatabaseError::Open(reth_libmdbx::Error::ReadTransactionAborted.into()))
|
||||||
); // Transaction is timeout-ed
|
); // Transaction is timeout-ed
|
||||||
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
|
assert!(tx.metrics_handler.unwrap().backtrace_recorded.load(Ordering::Relaxed));
|
||||||
// Backtrace is recorded
|
// Backtrace is recorded
|
||||||
|
|||||||
@ -78,7 +78,8 @@ fn bench_get_seq_raw(c: &mut Criterion) {
|
|||||||
let (_dir, env) = setup_bench_db(n);
|
let (_dir, env) = setup_bench_db(n);
|
||||||
|
|
||||||
let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
|
let dbi = env.begin_ro_txn().unwrap().open_db(None).unwrap().dbi();
|
||||||
let txn = env.begin_ro_txn().unwrap();
|
let _txn = env.begin_ro_txn().unwrap();
|
||||||
|
let txn = _txn.txn();
|
||||||
|
|
||||||
let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
let mut key = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||||
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
let mut data = MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||||
@ -86,21 +87,18 @@ fn bench_get_seq_raw(c: &mut Criterion) {
|
|||||||
|
|
||||||
c.bench_function("bench_get_seq_raw", |b| {
|
c.bench_function("bench_get_seq_raw", |b| {
|
||||||
b.iter(|| unsafe {
|
b.iter(|| unsafe {
|
||||||
txn.txn_execute(|txn| {
|
mdbx_cursor_open(txn, dbi, &mut cursor);
|
||||||
mdbx_cursor_open(txn, dbi, &mut cursor);
|
let mut i = 0;
|
||||||
let mut i = 0;
|
let mut count = 0u32;
|
||||||
let mut count = 0u32;
|
|
||||||
|
|
||||||
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
|
while mdbx_cursor_get(cursor, &mut key, &mut data, MDBX_NEXT) == 0 {
|
||||||
i += key.iov_len + data.iov_len;
|
i += key.iov_len + data.iov_len;
|
||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
black_box(i);
|
black_box(i);
|
||||||
assert_eq!(count, n);
|
assert_eq!(count, n);
|
||||||
mdbx_cursor_close(cursor);
|
mdbx_cursor_close(cursor);
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -46,7 +46,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {
|
|||||||
|
|
||||||
c.bench_function("bench_get_rand_raw", |b| {
|
c.bench_function("bench_get_rand_raw", |b| {
|
||||||
b.iter(|| unsafe {
|
b.iter(|| unsafe {
|
||||||
txn.txn_execute(|txn| {
|
txn.with_raw_tx_ptr(|txn| {
|
||||||
let mut i: size_t = 0;
|
let mut i: size_t = 0;
|
||||||
for key in &keys {
|
for key in &keys {
|
||||||
key_val.iov_len = key.len() as size_t;
|
key_val.iov_len = key.len() as size_t;
|
||||||
@ -57,8 +57,7 @@ fn bench_get_rand_raw(c: &mut Criterion) {
|
|||||||
i += key_val.iov_len;
|
i += key_val.iov_len;
|
||||||
}
|
}
|
||||||
black_box(i);
|
black_box(i);
|
||||||
})
|
});
|
||||||
.unwrap();
|
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::{mdbx_result, Error, Result},
|
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
|
||||||
flags::*,
|
flags::*,
|
||||||
mdbx_try_optional,
|
mdbx_try_optional,
|
||||||
transaction::{TransactionKind, RW},
|
transaction::{TransactionKind, RW},
|
||||||
@ -30,26 +30,26 @@ where
|
|||||||
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
|
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
|
||||||
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
|
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
|
||||||
unsafe {
|
unsafe {
|
||||||
txn.txn_execute(|txn_ptr| {
|
mdbx_result_with_tx_kind::<K>(
|
||||||
mdbx_result(ffi::mdbx_cursor_open(txn_ptr, dbi, &mut cursor))
|
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
|
||||||
})??;
|
txn.txn(),
|
||||||
|
txn.env().txn_manager(),
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
Ok(Self { txn, cursor })
|
Ok(Self { txn, cursor })
|
||||||
}
|
}
|
||||||
|
|
||||||
fn new_at_position(other: &Self) -> Result<Self> {
|
fn new_at_position(other: &Self) -> Result<Self> {
|
||||||
unsafe {
|
unsafe {
|
||||||
other.txn.txn_execute(|_| {
|
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
|
||||||
let cursor = ffi::mdbx_cursor_create(ptr::null_mut());
|
|
||||||
|
|
||||||
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
|
let res = ffi::mdbx_cursor_copy(other.cursor(), cursor);
|
||||||
|
|
||||||
let s = Self { txn: other.txn.clone(), cursor };
|
let s = Self { txn: other.txn.clone(), cursor };
|
||||||
|
|
||||||
mdbx_result(res)?;
|
mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
|
||||||
|
|
||||||
Ok(s)
|
Ok(s)
|
||||||
})?
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,12 +95,11 @@ where
|
|||||||
let key_ptr = key_val.iov_base;
|
let key_ptr = key_val.iov_base;
|
||||||
let data_ptr = data_val.iov_base;
|
let data_ptr = data_val.iov_base;
|
||||||
self.txn.txn_execute(|txn| {
|
self.txn.txn_execute(|txn| {
|
||||||
let v = mdbx_result(ffi::mdbx_cursor_get(
|
let v = mdbx_result_with_tx_kind::<K>(
|
||||||
self.cursor,
|
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
|
||||||
&mut key_val,
|
txn,
|
||||||
&mut data_val,
|
self.txn.env().txn_manager(),
|
||||||
op,
|
)?;
|
||||||
))?;
|
|
||||||
assert_ne!(data_ptr, data_val.iov_base);
|
assert_ne!(data_ptr, data_val.iov_base);
|
||||||
let key_out = {
|
let key_out = {
|
||||||
// MDBX wrote in new key
|
// MDBX wrote in new key
|
||||||
@ -112,7 +111,7 @@ where
|
|||||||
};
|
};
|
||||||
let data_out = Value::decode_val::<K>(txn, data_val)?;
|
let data_out = Value::decode_val::<K>(txn, data_val)?;
|
||||||
Ok((key_out, data_out, v))
|
Ok((key_out, data_out, v))
|
||||||
})?
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -445,7 +444,7 @@ impl Cursor<RW> {
|
|||||||
mdbx_result(unsafe {
|
mdbx_result(unsafe {
|
||||||
self.txn.txn_execute(|_| {
|
self.txn.txn_execute(|_| {
|
||||||
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
|
ffi::mdbx_cursor_put(self.cursor, &key_val, &mut data_val, flags.bits())
|
||||||
})?
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -459,7 +458,7 @@ impl Cursor<RW> {
|
|||||||
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
|
/// current key, if the database was opened with [DatabaseFlags::DUP_SORT].
|
||||||
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
|
pub fn del(&mut self, flags: WriteFlags) -> Result<()> {
|
||||||
mdbx_result(unsafe {
|
mdbx_result(unsafe {
|
||||||
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))?
|
self.txn.txn_execute(|_| ffi::mdbx_cursor_del(self.cursor, flags.bits()))
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -471,7 +470,7 @@ where
|
|||||||
K: TransactionKind,
|
K: TransactionKind,
|
||||||
{
|
{
|
||||||
fn clone(&self) -> Self {
|
fn clone(&self) -> Self {
|
||||||
Self::new_at_position(self).unwrap()
|
self.txn.txn_execute(|_| Self::new_at_position(self).unwrap())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,7 +488,7 @@ where
|
|||||||
K: TransactionKind,
|
K: TransactionKind,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
let _ = self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) });
|
self.txn.txn_execute(|_| unsafe { ffi::mdbx_cursor_close(self.cursor) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -565,7 +564,7 @@ where
|
|||||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||||
let op = mem::replace(op, *next_op);
|
let op = mem::replace(op, *next_op);
|
||||||
unsafe {
|
unsafe {
|
||||||
let result = cursor.txn.txn_execute(|txn| {
|
cursor.txn.txn_execute(|txn| {
|
||||||
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
||||||
ffi::MDBX_SUCCESS => {
|
ffi::MDBX_SUCCESS => {
|
||||||
let key = match Key::decode_val::<K>(txn, key) {
|
let key = match Key::decode_val::<K>(txn, key) {
|
||||||
@ -584,11 +583,7 @@ where
|
|||||||
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
||||||
error => Some(Err(Error::from_err_code(error))),
|
error => Some(Err(Error::from_err_code(error))),
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
match result {
|
|
||||||
Ok(result) => result,
|
|
||||||
Err(err) => Some(Err(err)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Self::Err(err) => err.take().map(Err),
|
Self::Err(err) => err.take().map(Err),
|
||||||
@ -660,7 +655,7 @@ where
|
|||||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||||
let op = mem::replace(op, *next_op);
|
let op = mem::replace(op, *next_op);
|
||||||
unsafe {
|
unsafe {
|
||||||
let result = cursor.txn.txn_execute(|txn| {
|
cursor.txn.txn_execute(|txn| {
|
||||||
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
match ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) {
|
||||||
ffi::MDBX_SUCCESS => {
|
ffi::MDBX_SUCCESS => {
|
||||||
let key = match Key::decode_val::<K>(txn, key) {
|
let key = match Key::decode_val::<K>(txn, key) {
|
||||||
@ -679,11 +674,7 @@ where
|
|||||||
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
ffi::MDBX_NOTFOUND | ffi::MDBX_ENODATA => None,
|
||||||
error => Some(Err(Error::from_err_code(error))),
|
error => Some(Err(Error::from_err_code(error))),
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
match result {
|
|
||||||
Ok(result) => result,
|
|
||||||
Err(err) => Some(Err(err)),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Iter::Err(err) => err.take().map(Err),
|
Iter::Err(err) => err.take().map(Err),
|
||||||
@ -761,15 +752,17 @@ where
|
|||||||
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
let mut data = ffi::MDBX_val { iov_len: 0, iov_base: ptr::null_mut() };
|
||||||
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);
|
let op = mem::replace(op, ffi::MDBX_NEXT_NODUP);
|
||||||
|
|
||||||
let err_code =
|
cursor.txn.txn_execute(|_| {
|
||||||
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };
|
let err_code =
|
||||||
|
unsafe { ffi::mdbx_cursor_get(cursor.cursor(), &mut key, &mut data, op) };
|
||||||
|
|
||||||
(err_code == ffi::MDBX_SUCCESS).then(|| {
|
(err_code == ffi::MDBX_SUCCESS).then(|| {
|
||||||
IntoIter::new(
|
IntoIter::new(
|
||||||
Cursor::new_at_position(&**cursor).unwrap(),
|
Cursor::new_at_position(&**cursor).unwrap(),
|
||||||
ffi::MDBX_GET_CURRENT,
|
ffi::MDBX_GET_CURRENT,
|
||||||
ffi::MDBX_NEXT_DUP,
|
ffi::MDBX_NEXT_DUP,
|
||||||
)
|
)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
|
IterDup::Err(err) => err.take().map(|e| IntoIter::Err(Some(e))),
|
||||||
|
|||||||
@ -1,5 +1,5 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::{mdbx_result, Result},
|
error::{mdbx_result_with_tx_kind, Result},
|
||||||
transaction::TransactionKind,
|
transaction::TransactionKind,
|
||||||
Environment, Transaction,
|
Environment, Transaction,
|
||||||
};
|
};
|
||||||
@ -31,8 +31,12 @@ impl Database {
|
|||||||
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
|
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
|
||||||
let mut dbi: ffi::MDBX_dbi = 0;
|
let mut dbi: ffi::MDBX_dbi = 0;
|
||||||
txn.txn_execute(|txn_ptr| {
|
txn.txn_execute(|txn_ptr| {
|
||||||
mdbx_result(unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) })
|
mdbx_result_with_tx_kind::<K>(
|
||||||
})??;
|
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
|
||||||
|
txn_ptr,
|
||||||
|
txn.env().txn_manager(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
|
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -88,12 +88,6 @@ impl Environment {
|
|||||||
&self.inner.txn_manager
|
&self.inner.txn_manager
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
|
||||||
#[cfg(feature = "read-tx-timeouts")]
|
|
||||||
pub fn timed_out_not_aborted_transactions(&self) -> usize {
|
|
||||||
self.inner.txn_manager.timed_out_not_aborted_read_transactions().unwrap_or(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a read-only transaction for use with the environment.
|
/// Create a read-only transaction for use with the environment.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
|
pub fn begin_ro_txn(&self) -> Result<Transaction<RO>> {
|
||||||
|
|||||||
@ -1,3 +1,4 @@
|
|||||||
|
use crate::{txn_manager::TxnManager, TransactionKind};
|
||||||
use libc::c_int;
|
use libc::c_int;
|
||||||
use std::result;
|
use std::result;
|
||||||
|
|
||||||
@ -117,9 +118,8 @@ pub enum Error {
|
|||||||
/// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened.
|
/// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened.
|
||||||
#[error("write transactions are not supported in read-only mode")]
|
#[error("write transactions are not supported in read-only mode")]
|
||||||
WriteTransactionUnsupportedInReadOnlyMode,
|
WriteTransactionUnsupportedInReadOnlyMode,
|
||||||
/// Read transaction has been timed out.
|
#[error("read transaction has been aborted by the transaction manager")]
|
||||||
#[error("read transaction has been timed out")]
|
ReadTransactionAborted,
|
||||||
ReadTransactionTimeout,
|
|
||||||
/// Unknown error code.
|
/// Unknown error code.
|
||||||
#[error("unknown error code")]
|
#[error("unknown error code")]
|
||||||
Other(i32),
|
Other(i32),
|
||||||
@ -193,10 +193,9 @@ impl Error {
|
|||||||
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
|
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
|
||||||
Error::Access => ffi::MDBX_EACCESS,
|
Error::Access => ffi::MDBX_EACCESS,
|
||||||
Error::TooLarge => ffi::MDBX_TOO_LARGE,
|
Error::TooLarge => ffi::MDBX_TOO_LARGE,
|
||||||
Error::BadSignature => ffi::MDBX_EBADSIGN,
|
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
|
||||||
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
|
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
|
||||||
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
|
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
|
||||||
Error::ReadTransactionTimeout => -96000, // Custom non-MDBX error code
|
|
||||||
Error::Other(err_code) => *err_code,
|
Error::Other(err_code) => *err_code,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -217,6 +216,33 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
|
||||||
|
err_code: c_int,
|
||||||
|
txn: *mut ffi::MDBX_txn,
|
||||||
|
txn_manager: &TxnManager,
|
||||||
|
) -> Result<bool> {
|
||||||
|
if K::IS_READ_ONLY &&
|
||||||
|
txn_manager.remove_aborted_read_transaction(txn).is_some() &&
|
||||||
|
err_code == ffi::MDBX_EBADSIGN
|
||||||
|
{
|
||||||
|
return Err(Error::ReadTransactionAborted)
|
||||||
|
}
|
||||||
|
|
||||||
|
mdbx_result(err_code)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(feature = "read-tx-timeouts"))]
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
|
||||||
|
err_code: c_int,
|
||||||
|
_txn: *mut ffi::MDBX_txn,
|
||||||
|
_txn_manager: &TxnManager,
|
||||||
|
) -> Result<bool> {
|
||||||
|
mdbx_result(err_code)
|
||||||
|
}
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! mdbx_try_optional {
|
macro_rules! mdbx_try_optional {
|
||||||
($expr:expr) => {{
|
($expr:expr) => {{
|
||||||
|
|||||||
@ -1,12 +1,12 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
database::Database,
|
database::Database,
|
||||||
environment::Environment,
|
environment::Environment,
|
||||||
error::{mdbx_result, Result},
|
error::{mdbx_result, mdbx_result_with_tx_kind, Result},
|
||||||
flags::{DatabaseFlags, WriteFlags},
|
flags::{DatabaseFlags, WriteFlags},
|
||||||
txn_manager::{TxnManagerMessage, TxnPtr},
|
txn_manager::{TxnManagerMessage, TxnPtr},
|
||||||
Cursor, Error, Stat, TableObject,
|
Cursor, Error, Stat, TableObject,
|
||||||
};
|
};
|
||||||
use ffi::{mdbx_txn_renew, MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
|
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
|
||||||
use indexmap::IndexSet;
|
use indexmap::IndexSet;
|
||||||
use libc::{c_uint, c_void};
|
use libc::{c_uint, c_void};
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
@ -71,27 +71,29 @@ where
|
|||||||
pub(crate) fn new(env: Environment) -> Result<Self> {
|
pub(crate) fn new(env: Environment) -> Result<Self> {
|
||||||
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
|
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
|
||||||
unsafe {
|
unsafe {
|
||||||
mdbx_result(ffi::mdbx_txn_begin_ex(
|
mdbx_result_with_tx_kind::<K>(
|
||||||
env.env_ptr(),
|
ffi::mdbx_txn_begin_ex(
|
||||||
ptr::null_mut(),
|
env.env_ptr(),
|
||||||
K::OPEN_FLAGS,
|
ptr::null_mut(),
|
||||||
&mut txn,
|
K::OPEN_FLAGS,
|
||||||
ptr::null_mut(),
|
&mut txn,
|
||||||
))?;
|
ptr::null_mut(),
|
||||||
|
),
|
||||||
|
txn,
|
||||||
|
env.txn_manager(),
|
||||||
|
)?;
|
||||||
Ok(Self::new_from_ptr(env, txn))
|
Ok(Self::new_from_ptr(env, txn))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn new_from_ptr(env: Environment, txn_ptr: *mut ffi::MDBX_txn) -> Self {
|
pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
|
||||||
let txn = TransactionPtr::new(txn_ptr);
|
|
||||||
|
|
||||||
#[cfg(feature = "read-tx-timeouts")]
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
if K::IS_READ_ONLY {
|
if K::IS_READ_ONLY {
|
||||||
env.txn_manager().add_active_read_transaction(txn_ptr, txn.clone())
|
env.txn_manager().add_active_read_transaction(txn)
|
||||||
}
|
}
|
||||||
|
|
||||||
let inner = TransactionInner {
|
let inner = TransactionInner {
|
||||||
txn,
|
txn: TransactionPtr::new(txn),
|
||||||
primed_dbis: Mutex::new(IndexSet::new()),
|
primed_dbis: Mutex::new(IndexSet::new()),
|
||||||
committed: AtomicBool::new(false),
|
committed: AtomicBool::new(false),
|
||||||
env,
|
env,
|
||||||
@ -106,7 +108,7 @@ where
|
|||||||
/// The caller **must** ensure that the pointer is not used after the
|
/// The caller **must** ensure that the pointer is not used after the
|
||||||
/// lifetime of the transaction.
|
/// lifetime of the transaction.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn txn_execute<F, T>(&self, f: F) -> Result<T>
|
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
|
||||||
where
|
where
|
||||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||||
{
|
{
|
||||||
@ -115,18 +117,32 @@ where
|
|||||||
|
|
||||||
/// Returns a copy of the raw pointer to the underlying MDBX transaction.
|
/// Returns a copy of the raw pointer to the underlying MDBX transaction.
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
#[cfg(test)]
|
|
||||||
pub fn txn(&self) -> *mut ffi::MDBX_txn {
|
pub fn txn(&self) -> *mut ffi::MDBX_txn {
|
||||||
self.inner.txn.txn
|
self.inner.txn.txn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Executes the given closure once
|
||||||
|
///
|
||||||
|
/// This is only intended to be used when accessing mdbx ffi functions directly is required.
|
||||||
|
///
|
||||||
|
/// The caller **must** ensure that the pointer is only used within the closure.
|
||||||
|
#[inline]
|
||||||
|
#[doc(hidden)]
|
||||||
|
pub fn with_raw_tx_ptr<F, T>(&self, f: F) -> T
|
||||||
|
where
|
||||||
|
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||||
|
{
|
||||||
|
let _lock = self.inner.txn.lock.lock();
|
||||||
|
f(self.inner.txn.txn)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a raw pointer to the MDBX environment.
|
/// Returns a raw pointer to the MDBX environment.
|
||||||
pub fn env(&self) -> &Environment {
|
pub fn env(&self) -> &Environment {
|
||||||
&self.inner.env
|
&self.inner.env
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the transaction id.
|
/// Returns the transaction id.
|
||||||
pub fn id(&self) -> Result<u64> {
|
pub fn id(&self) -> u64 {
|
||||||
self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
|
self.txn_execute(|txn| unsafe { ffi::mdbx_txn_id(txn) })
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -152,7 +168,7 @@ where
|
|||||||
ffi::MDBX_NOTFOUND => Ok(None),
|
ffi::MDBX_NOTFOUND => Ok(None),
|
||||||
err_code => Err(Error::from_err_code(err_code)),
|
err_code => Err(Error::from_err_code(err_code)),
|
||||||
}
|
}
|
||||||
})?
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Commits the transaction.
|
/// Commits the transaction.
|
||||||
@ -175,9 +191,11 @@ where
|
|||||||
self.env().txn_manager().remove_active_read_transaction(txn);
|
self.env().txn_manager().remove_active_read_transaction(txn);
|
||||||
|
|
||||||
let mut latency = CommitLatency::new();
|
let mut latency = CommitLatency::new();
|
||||||
mdbx_result(unsafe {
|
mdbx_result_with_tx_kind::<K>(
|
||||||
ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
|
unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) },
|
||||||
})
|
txn,
|
||||||
|
self.env().txn_manager(),
|
||||||
|
)
|
||||||
.map(|v| (v, latency))
|
.map(|v| (v, latency))
|
||||||
} else {
|
} else {
|
||||||
let (sender, rx) = sync_channel(0);
|
let (sender, rx) = sync_channel(0);
|
||||||
@ -186,7 +204,7 @@ where
|
|||||||
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
|
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
|
||||||
rx.recv().unwrap()
|
rx.recv().unwrap()
|
||||||
}
|
}
|
||||||
})?;
|
});
|
||||||
|
|
||||||
self.inner.set_committed();
|
self.inner.set_committed();
|
||||||
result
|
result
|
||||||
@ -225,8 +243,12 @@ where
|
|||||||
let mut flags: c_uint = 0;
|
let mut flags: c_uint = 0;
|
||||||
unsafe {
|
unsafe {
|
||||||
self.txn_execute(|txn| {
|
self.txn_execute(|txn| {
|
||||||
mdbx_result(ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()))
|
mdbx_result_with_tx_kind::<K>(
|
||||||
})??;
|
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()),
|
||||||
|
txn,
|
||||||
|
self.env().txn_manager(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// The types are not the same on Windows. Great!
|
// The types are not the same on Windows. Great!
|
||||||
@ -244,8 +266,12 @@ where
|
|||||||
unsafe {
|
unsafe {
|
||||||
let mut stat = Stat::new();
|
let mut stat = Stat::new();
|
||||||
self.txn_execute(|txn| {
|
self.txn_execute(|txn| {
|
||||||
mdbx_result(ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()))
|
mdbx_result_with_tx_kind::<K>(
|
||||||
})??;
|
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()),
|
||||||
|
txn,
|
||||||
|
self.env().txn_manager(),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
Ok(stat)
|
Ok(stat)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -264,7 +290,7 @@ where
|
|||||||
#[cfg(feature = "read-tx-timeouts")]
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
pub fn disable_timeout(&self) {
|
pub fn disable_timeout(&self) {
|
||||||
if K::IS_READ_ONLY {
|
if K::IS_READ_ONLY {
|
||||||
self.env().txn_manager().remove_active_read_transaction(self.inner.txn.txn);
|
self.env().txn_manager().remove_active_read_transaction(self.txn());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -316,11 +342,11 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
fn txn_execute<F, T>(&self, f: F) -> Result<T>
|
fn txn_execute<F, T>(&self, f: F) -> T
|
||||||
where
|
where
|
||||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||||
{
|
{
|
||||||
self.txn.txn_execute_fail_on_timeout(f)
|
self.txn.txn_execute(f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -329,9 +355,7 @@ where
|
|||||||
K: TransactionKind,
|
K: TransactionKind,
|
||||||
{
|
{
|
||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
// To be able to abort a timed out transaction, we need to renew it first.
|
self.txn_execute(|txn| {
|
||||||
// Hence the usage of `txn_execute_renew_on_timeout` here.
|
|
||||||
let _ = self.txn.txn_execute_renew_on_timeout(|txn| {
|
|
||||||
if !self.has_committed() {
|
if !self.has_committed() {
|
||||||
if K::IS_READ_ONLY {
|
if K::IS_READ_ONLY {
|
||||||
#[cfg(feature = "read-tx-timeouts")]
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
@ -348,7 +372,7 @@ where
|
|||||||
rx.recv().unwrap().unwrap();
|
rx.recv().unwrap().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -394,7 +418,7 @@ impl Transaction<RW> {
|
|||||||
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
|
ffi::MDBX_val { iov_len: data.len(), iov_base: data.as_ptr() as *mut c_void };
|
||||||
mdbx_result(self.txn_execute(|txn| unsafe {
|
mdbx_result(self.txn_execute(|txn| unsafe {
|
||||||
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
|
ffi::mdbx_put(txn, dbi, &key_val, &mut data_val, flags.bits())
|
||||||
})?)?;
|
}))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -423,7 +447,7 @@ impl Transaction<RW> {
|
|||||||
&mut data_val,
|
&mut data_val,
|
||||||
flags.bits() | ffi::MDBX_RESERVE,
|
flags.bits() | ffi::MDBX_RESERVE,
|
||||||
)
|
)
|
||||||
})?)?;
|
}))?;
|
||||||
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
|
Ok(slice::from_raw_parts_mut(data_val.iov_base as *mut u8, data_val.iov_len))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -458,7 +482,7 @@ impl Transaction<RW> {
|
|||||||
} else {
|
} else {
|
||||||
unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
|
unsafe { ffi::mdbx_del(txn, dbi, &key_val, ptr::null()) }
|
||||||
}
|
}
|
||||||
})?
|
})
|
||||||
})
|
})
|
||||||
.map(|_| true)
|
.map(|_| true)
|
||||||
.or_else(|e| match e {
|
.or_else(|e| match e {
|
||||||
@ -469,7 +493,7 @@ impl Transaction<RW> {
|
|||||||
|
|
||||||
/// Empties the given database. All items will be removed.
|
/// Empties the given database. All items will be removed.
|
||||||
pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
|
pub fn clear_db(&self, dbi: ffi::MDBX_dbi) -> Result<()> {
|
||||||
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) })?)?;
|
mdbx_result(self.txn_execute(|txn| unsafe { ffi::mdbx_drop(txn, dbi, false) }))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -480,7 +504,7 @@ impl Transaction<RW> {
|
|||||||
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
||||||
/// BEFORE calling this function.
|
/// BEFORE calling this function.
|
||||||
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
|
pub unsafe fn drop_db(&self, db: Database) -> Result<()> {
|
||||||
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true))?)?;
|
mdbx_result(self.txn_execute(|txn| ffi::mdbx_drop(txn, db.dbi(), true)))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -493,7 +517,11 @@ impl Transaction<RO> {
|
|||||||
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
|
||||||
/// BEFORE calling this function.
|
/// BEFORE calling this function.
|
||||||
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
|
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
|
||||||
self.txn_execute(|_| mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi())))??;
|
mdbx_result_with_tx_kind::<RO>(
|
||||||
|
ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()),
|
||||||
|
self.txn(),
|
||||||
|
self.env().txn_manager(),
|
||||||
|
)?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -514,12 +542,12 @@ impl Transaction<RW> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0))
|
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0))
|
||||||
})?
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A shareable pointer to an MDBX transaction.
|
/// A shareable pointer to an MDBX transaction.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct TransactionPtr {
|
pub(crate) struct TransactionPtr {
|
||||||
txn: *mut ffi::MDBX_txn,
|
txn: *mut ffi::MDBX_txn,
|
||||||
lock: Arc<Mutex<()>>,
|
lock: Arc<Mutex<()>>,
|
||||||
@ -530,52 +558,14 @@ impl TransactionPtr {
|
|||||||
Self { txn, lock: Arc::new(Mutex::new(())) }
|
Self { txn, lock: Arc::new(Mutex::new(())) }
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns `true` if the transaction is timed out.
|
|
||||||
//
|
|
||||||
// When transaction is timed out via `TxnManager`, it's actually reset using
|
|
||||||
// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
|
|
||||||
// transactions), and sets the `MDBX_TXN_FINISHED` flag.
|
|
||||||
fn is_timed_out(&self) -> bool {
|
|
||||||
(unsafe { ffi::mdbx_txn_flags(self.txn) } & ffi::MDBX_TXN_FINISHED) != 0
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Executes the given closure once the lock on the transaction is acquired.
|
/// Executes the given closure once the lock on the transaction is acquired.
|
||||||
///
|
|
||||||
/// Returns the result of the closure or an error if the transaction is timed out.
|
|
||||||
#[inline]
|
#[inline]
|
||||||
pub(crate) fn txn_execute_fail_on_timeout<F, T>(&self, f: F) -> Result<T>
|
pub(crate) fn txn_execute<F, T>(&self, f: F) -> T
|
||||||
where
|
where
|
||||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
||||||
{
|
{
|
||||||
let _lck = self.lock.lock();
|
let _lck = self.lock.lock();
|
||||||
|
(f)(self.txn)
|
||||||
// No race condition with the `TxnManager` timing out the transaction is possible here,
|
|
||||||
// because we're taking a lock for any actions on the transaction pointer, including a call
|
|
||||||
// to the `mdbx_txn_reset`.
|
|
||||||
if self.is_timed_out() {
|
|
||||||
return Err(Error::ReadTransactionTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((f)(self.txn))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Executes the given closure once the lock on the transaction is acquired. If the tranasction
|
|
||||||
/// is timed out, it will be renewed first.
|
|
||||||
///
|
|
||||||
/// Returns the result of the closure or an error if the transaction renewal fails.
|
|
||||||
#[inline]
|
|
||||||
fn txn_execute_renew_on_timeout<F, T>(&self, f: F) -> Result<T>
|
|
||||||
where
|
|
||||||
F: FnOnce(*mut ffi::MDBX_txn) -> T,
|
|
||||||
{
|
|
||||||
let _lck = self.lock.lock();
|
|
||||||
|
|
||||||
// To be able to do any operations on the transaction, we need to renew it first.
|
|
||||||
if self.is_timed_out() {
|
|
||||||
mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((f)(self.txn))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -52,6 +52,9 @@ impl TxnManager {
|
|||||||
/// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort]
|
/// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort]
|
||||||
/// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex]
|
/// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex]
|
||||||
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
|
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
|
||||||
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
|
let read_transactions = self.read_transactions.clone();
|
||||||
|
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
#[allow(clippy::redundant_locals)]
|
#[allow(clippy::redundant_locals)]
|
||||||
let env = env;
|
let env = env;
|
||||||
@ -70,12 +73,34 @@ impl TxnManager {
|
|||||||
)
|
)
|
||||||
})
|
})
|
||||||
.map(|_| TxnPtr(txn));
|
.map(|_| TxnPtr(txn));
|
||||||
|
|
||||||
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
|
{
|
||||||
|
use crate::transaction::TransactionKind;
|
||||||
|
|
||||||
|
if res.is_ok() && flags == crate::transaction::RO::OPEN_FLAGS {
|
||||||
|
if let Some(read_transactions) = &read_transactions {
|
||||||
|
read_transactions.add_active(txn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
sender.send(res).unwrap();
|
sender.send(res).unwrap();
|
||||||
}
|
}
|
||||||
TxnManagerMessage::Abort { tx, sender } => {
|
TxnManagerMessage::Abort { tx, sender } => {
|
||||||
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
|
if let Some(read_transactions) = &read_transactions {
|
||||||
|
read_transactions.remove_active(tx.0);
|
||||||
|
}
|
||||||
|
|
||||||
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
|
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
|
||||||
}
|
}
|
||||||
TxnManagerMessage::Commit { tx, sender } => {
|
TxnManagerMessage::Commit { tx, sender } => {
|
||||||
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
|
if let Some(read_transactions) = &read_transactions {
|
||||||
|
read_transactions.remove_active(tx.0);
|
||||||
|
}
|
||||||
|
|
||||||
sender
|
sender
|
||||||
.send({
|
.send({
|
||||||
let mut latency = CommitLatency::new();
|
let mut latency = CommitLatency::new();
|
||||||
@ -100,10 +125,7 @@ impl TxnManager {
|
|||||||
|
|
||||||
#[cfg(feature = "read-tx-timeouts")]
|
#[cfg(feature = "read-tx-timeouts")]
|
||||||
mod read_transactions {
|
mod read_transactions {
|
||||||
use crate::{
|
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
|
||||||
environment::EnvPtr, error::mdbx_result, transaction::TransactionPtr,
|
|
||||||
txn_manager::TxnManager,
|
|
||||||
};
|
|
||||||
use dashmap::{DashMap, DashSet};
|
use dashmap::{DashMap, DashSet};
|
||||||
use std::{
|
use std::{
|
||||||
sync::{mpsc::sync_channel, Arc},
|
sync::{mpsc::sync_channel, Arc},
|
||||||
@ -133,13 +155,9 @@ mod read_transactions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a new transaction to the list of active read transactions.
|
/// Adds a new transaction to the list of active read transactions.
|
||||||
pub(crate) fn add_active_read_transaction(
|
pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) {
|
||||||
&self,
|
|
||||||
ptr: *mut ffi::MDBX_txn,
|
|
||||||
tx: TransactionPtr,
|
|
||||||
) {
|
|
||||||
if let Some(read_transactions) = &self.read_transactions {
|
if let Some(read_transactions) = &self.read_transactions {
|
||||||
read_transactions.add_active(ptr, tx);
|
read_transactions.add_active(ptr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,15 +165,16 @@ mod read_transactions {
|
|||||||
pub(crate) fn remove_active_read_transaction(
|
pub(crate) fn remove_active_read_transaction(
|
||||||
&self,
|
&self,
|
||||||
ptr: *mut ffi::MDBX_txn,
|
ptr: *mut ffi::MDBX_txn,
|
||||||
) -> Option<(usize, (TransactionPtr, Instant))> {
|
) -> Option<(usize, Instant)> {
|
||||||
self.read_transactions.as_ref()?.remove_active(ptr)
|
self.read_transactions.as_ref()?.remove_active(ptr)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
/// Removes a transaction from the list of aborted read transactions.
|
||||||
pub(crate) fn timed_out_not_aborted_read_transactions(&self) -> Option<usize> {
|
pub(crate) fn remove_aborted_read_transaction(
|
||||||
self.read_transactions
|
&self,
|
||||||
.as_ref()
|
ptr: *mut ffi::MDBX_txn,
|
||||||
.map(|read_transactions| read_transactions.timed_out_not_aborted())
|
) -> Option<usize> {
|
||||||
|
self.read_transactions.as_ref()?.remove_aborted(ptr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,10 +187,13 @@ mod read_transactions {
|
|||||||
///
|
///
|
||||||
/// We store `usize` instead of a raw pointer as a key, because pointers are not
|
/// We store `usize` instead of a raw pointer as a key, because pointers are not
|
||||||
/// comparable. The time of transaction opening is stored as a value.
|
/// comparable. The time of transaction opening is stored as a value.
|
||||||
active: DashMap<usize, (TransactionPtr, Instant)>,
|
active: DashMap<usize, Instant>,
|
||||||
/// List of timed out transactions that were not aborted by the user yet, hence have a
|
/// List of read transactions aborted by the [ReadTransactions::start_monitor].
|
||||||
/// dangling read transaction pointer.
|
/// We keep them until user tries to abort the transaction, so we're able to report a nice
|
||||||
timed_out_not_aborted: DashSet<usize>,
|
/// [Error::ReadTransactionAborted] error.
|
||||||
|
///
|
||||||
|
/// We store `usize` instead of a raw pointer, because pointers are not comparable.
|
||||||
|
aborted: DashSet<usize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ReadTransactions {
|
impl ReadTransactions {
|
||||||
@ -180,70 +202,59 @@ mod read_transactions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Adds a new transaction to the list of active read transactions.
|
/// Adds a new transaction to the list of active read transactions.
|
||||||
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn, tx: TransactionPtr) {
|
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) {
|
||||||
let _ = self.active.insert(ptr as usize, (tx, Instant::now()));
|
let _ = self.active.insert(ptr as usize, Instant::now());
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Removes a transaction from the list of active read transactions.
|
/// Removes a transaction from the list of active read transactions.
|
||||||
pub(super) fn remove_active(
|
pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> {
|
||||||
&self,
|
|
||||||
ptr: *mut ffi::MDBX_txn,
|
|
||||||
) -> Option<(usize, (TransactionPtr, Instant))> {
|
|
||||||
self.timed_out_not_aborted.remove(&(ptr as usize));
|
|
||||||
self.active.remove(&(ptr as usize))
|
self.active.remove(&(ptr as usize))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns the number of timed out transactions that were not aborted by the user yet.
|
/// Adds a new transaction to the list of aborted read transactions.
|
||||||
pub(super) fn timed_out_not_aborted(&self) -> usize {
|
pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) {
|
||||||
self.timed_out_not_aborted.len()
|
self.aborted.insert(ptr as usize);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes a transaction from the list of aborted read transactions.
|
||||||
|
pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option<usize> {
|
||||||
|
self.aborted.remove(&(ptr as usize))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Spawns a new thread with [std::thread::spawn] that monitors the list of active read
|
/// Spawns a new thread with [std::thread::spawn] that monitors the list of active read
|
||||||
/// transactions and timeouts those that are open for longer than
|
/// transactions and aborts those that are open for longer than
|
||||||
/// `ReadTransactions.max_duration`.
|
/// `ReadTransactions.max_duration`.
|
||||||
|
///
|
||||||
|
/// Aborted transaction pointers are placed into the list of aborted read transactions, and
|
||||||
|
/// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries
|
||||||
|
/// to use it.
|
||||||
pub(super) fn start_monitor(self: Arc<Self>) {
|
pub(super) fn start_monitor(self: Arc<Self>) {
|
||||||
std::thread::spawn(move || {
|
std::thread::spawn(move || {
|
||||||
let mut timed_out_active = Vec::new();
|
let mut aborted_active = Vec::new();
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
let mut max_active_transaction_duration = None;
|
let mut max_active_transaction_duration = None;
|
||||||
|
|
||||||
// Iterate through active read transactions and time out those that's open for
|
// Iterate through active read transactions and abort those that's open for
|
||||||
// longer than `self.max_duration`.
|
// longer than `self.max_duration`.
|
||||||
for entry in self.active.iter() {
|
for entry in self.active.iter() {
|
||||||
let (tx, start) = entry.value();
|
let (ptr, start) = entry.pair();
|
||||||
let duration = now - *start;
|
let duration = now - *start;
|
||||||
|
|
||||||
if duration > self.max_duration {
|
if duration > self.max_duration {
|
||||||
let result = tx.txn_execute_fail_on_timeout(|txn_ptr| {
|
let ptr = *ptr as *mut ffi::MDBX_txn;
|
||||||
(
|
|
||||||
txn_ptr,
|
|
||||||
duration,
|
|
||||||
// Time out the transaction.
|
|
||||||
//
|
|
||||||
// We use `mdbx_txn_reset` instead of `mdbx_txn_abort` here to
|
|
||||||
// prevent MDBX from reusing the pointer of the aborted
|
|
||||||
// transaction for new read-only transactions. This is
|
|
||||||
// important because we store the pointer in the `active` list
|
|
||||||
// and assume that it is unique.
|
|
||||||
//
|
|
||||||
// See https://erthink.github.io/libmdbx/group__c__transactions.html#gae9f34737fe60b0ba538d5a09b6a25c8d for more info.
|
|
||||||
mdbx_result(unsafe { ffi::mdbx_txn_reset(txn_ptr) }),
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
match result {
|
// Add the transaction to the list of aborted transactions, so further
|
||||||
Ok((txn_ptr, duration, error)) => {
|
// usages report the correct error when the transaction is closed.
|
||||||
// Add the transaction to `timed_out_active`. We can't remove it
|
self.add_aborted(ptr);
|
||||||
// instantly from the list of active transactions, because we
|
|
||||||
// iterate through it.
|
// Abort the transaction
|
||||||
timed_out_active.push((txn_ptr, duration, error));
|
let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) });
|
||||||
}
|
|
||||||
Err(err) => {
|
// Add the transaction to `aborted_active`. We can't remove it instantly
|
||||||
error!(target: "libmdbx", %err, "Failed to abort the long-lived read transaction")
|
// from the list of active transactions, because we iterate through it.
|
||||||
}
|
aborted_active.push((ptr, duration, result.err()));
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
max_active_transaction_duration = Some(
|
max_active_transaction_duration = Some(
|
||||||
duration.max(max_active_transaction_duration.unwrap_or_default()),
|
duration.max(max_active_transaction_duration.unwrap_or_default()),
|
||||||
@ -251,38 +262,40 @@ mod read_transactions {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Walk through timed out transactions, and delete them from the list of active
|
// Walk through aborted transactions, and delete them from the list of active
|
||||||
// transactions.
|
// transactions.
|
||||||
for (ptr, open_duration, err) in timed_out_active.iter().copied() {
|
for (ptr, open_duration, err) in aborted_active.iter().copied() {
|
||||||
// Try deleting the transaction from the list of active transactions.
|
// Try deleting the transaction from the list of active transactions.
|
||||||
let was_in_active = self.remove_active(ptr).is_some();
|
let was_in_active = self.remove_active(ptr).is_some();
|
||||||
if let Err(err) = err {
|
if let Some(err) = err {
|
||||||
if was_in_active {
|
// If there was an error when aborting the transaction, we need to
|
||||||
// If the transaction was in the list of active transactions,
|
// remove it from the list of aborted transactions, because otherwise it
|
||||||
// then user didn't abort it and we failed to do so.
|
// will stay there forever.
|
||||||
error!(target: "libmdbx", %err, ?open_duration, "Failed to time out the long-lived read transaction");
|
self.remove_aborted(ptr);
|
||||||
|
if was_in_active && err != Error::BadSignature {
|
||||||
|
// If the transaction was in the list of active transactions and the
|
||||||
|
// error code is not `EBADSIGN`, then user didn't abort it.
|
||||||
|
error!(target: "libmdbx", %err, ?open_duration, "Failed to abort the long-lived read transactions");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Happy path, the transaction has been timed out by us with no errors.
|
// Happy path, the transaction has been aborted by us with no errors.
|
||||||
warn!(target: "libmdbx", ?open_duration, "Long-lived read transaction has been timed out");
|
warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted");
|
||||||
// Add transaction to the list of timed out transactions that were not
|
|
||||||
// aborted by the user yet.
|
|
||||||
self.timed_out_not_aborted.insert(ptr as usize);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear the list of timed out transactions, but not de-allocate the reserved
|
// Clear the list of aborted transactions, but not de-allocate the reserved
|
||||||
// capacity to save on further pushes.
|
// capacity to save on further pushes.
|
||||||
timed_out_active.clear();
|
aborted_active.clear();
|
||||||
|
|
||||||
if !self.active.is_empty() {
|
if !self.active.is_empty() || !self.aborted.is_empty() {
|
||||||
trace!(
|
trace!(
|
||||||
target: "libmdbx",
|
target: "libmdbx",
|
||||||
elapsed = ?now.elapsed(),
|
elapsed = ?now.elapsed(),
|
||||||
active = ?self.active.iter().map(|entry| {
|
active = ?self.active.iter().map(|entry| {
|
||||||
let (tx, start) = entry.value();
|
let (ptr, start) = entry.pair();
|
||||||
(tx.clone(), start.elapsed())
|
(*ptr, start.elapsed())
|
||||||
}).collect::<Vec<_>>(),
|
}).collect::<Vec<_>>(),
|
||||||
|
aborted = ?self.aborted.iter().map(|entry| *entry).collect::<Vec<_>>(),
|
||||||
"Read transactions"
|
"Read transactions"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@ -302,10 +315,12 @@ mod read_transactions {
|
|||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::{
|
use crate::{
|
||||||
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
|
txn_manager::{
|
||||||
MaxReadTransactionDuration,
|
read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr,
|
||||||
|
},
|
||||||
|
Environment, Error, MaxReadTransactionDuration, TransactionKind, RO,
|
||||||
};
|
};
|
||||||
use std::{thread::sleep, time::Duration};
|
use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration};
|
||||||
use tempfile::tempdir;
|
use tempfile::tempdir;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
@ -330,6 +345,7 @@ mod read_transactions {
|
|||||||
drop(tx);
|
drop(tx);
|
||||||
|
|
||||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||||
|
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a read-only transaction, successfully use it, close it by committing.
|
// Create a read-only transaction, successfully use it, close it by committing.
|
||||||
@ -342,43 +358,24 @@ mod read_transactions {
|
|||||||
tx.commit().unwrap();
|
tx.commit().unwrap();
|
||||||
|
|
||||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||||
|
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
|
||||||
|
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
|
||||||
{
|
{
|
||||||
// Create a read-only transaction and observe it's in the list of active
|
|
||||||
// transactions.
|
|
||||||
let tx = env.begin_ro_txn().unwrap();
|
let tx = env.begin_ro_txn().unwrap();
|
||||||
let tx_ptr = tx.txn() as usize;
|
let tx_ptr = tx.txn() as usize;
|
||||||
assert!(read_transactions.active.contains_key(&tx_ptr));
|
assert!(read_transactions.active.contains_key(&tx_ptr));
|
||||||
|
|
||||||
// Wait until the transaction is timed out by the manager.
|
|
||||||
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
|
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||||
|
|
||||||
// Ensure that the transaction is not in the list of active transactions anymore,
|
|
||||||
// and is in the list of timed out but not aborted transactions.
|
|
||||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
assert!(read_transactions.aborted.contains(&tx_ptr));
|
||||||
|
|
||||||
// Use the timed out transaction and observe the `Error::ReadTransactionTimeout`
|
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted));
|
||||||
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionTimeout));
|
|
||||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||||
|
|
||||||
assert_eq!(tx.id().err(), Some(Error::ReadTransactionTimeout));
|
|
||||||
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
|
||||||
assert!(read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
|
||||||
|
|
||||||
// Ensure that the transaction pointer is not reused when opening a new read-only
|
|
||||||
// transaction.
|
|
||||||
let new_tx = env.begin_ro_txn().unwrap();
|
|
||||||
let new_tx_ptr = new_tx.txn() as usize;
|
|
||||||
assert!(read_transactions.active.contains_key(&new_tx_ptr));
|
|
||||||
assert_ne!(tx_ptr, new_tx_ptr);
|
|
||||||
|
|
||||||
// Drop the transaction and ensure that it's not in the list of timed out but not
|
|
||||||
// aborted transactions anymore.
|
|
||||||
drop(tx);
|
|
||||||
assert!(!read_transactions.timed_out_not_aborted.contains(&tx_ptr));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -396,5 +393,64 @@ mod read_transactions {
|
|||||||
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
|
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||||
assert!(tx.commit().is_ok())
|
assert!(tx.commit().is_ok())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn txn_manager_begin_read_transaction_via_message_listener() {
|
||||||
|
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let env = Environment::builder()
|
||||||
|
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
|
||||||
|
.open(dir.path())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
|
||||||
|
|
||||||
|
// Create a read-only transaction via the message listener.
|
||||||
|
let (tx, rx) = sync_channel(0);
|
||||||
|
env.txn_manager().send_message(TxnManagerMessage::Begin {
|
||||||
|
parent: TxnPtr(ptr::null_mut()),
|
||||||
|
flags: RO::OPEN_FLAGS,
|
||||||
|
sender: tx,
|
||||||
|
});
|
||||||
|
|
||||||
|
let txn_ptr = rx.recv().unwrap().unwrap();
|
||||||
|
|
||||||
|
assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize)));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn txn_manager_reassign_transaction_removes_from_aborted_transactions() {
|
||||||
|
const MAX_DURATION: Duration = Duration::from_secs(1);
|
||||||
|
|
||||||
|
let dir = tempdir().unwrap();
|
||||||
|
let env = Environment::builder()
|
||||||
|
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
|
||||||
|
.open(dir.path())
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
|
||||||
|
|
||||||
|
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
|
||||||
|
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
|
||||||
|
{
|
||||||
|
let tx = env.begin_ro_txn().unwrap();
|
||||||
|
let tx_ptr = tx.txn() as usize;
|
||||||
|
assert!(read_transactions.active.contains_key(&tx_ptr));
|
||||||
|
|
||||||
|
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
|
||||||
|
|
||||||
|
assert!(!read_transactions.active.contains_key(&tx_ptr));
|
||||||
|
assert!(read_transactions.aborted.contains(&tx_ptr));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a read-only transaction, ensure this removes it from aborted set if mdbx
|
||||||
|
// reassigns same recently aborted transaction pointer.
|
||||||
|
{
|
||||||
|
let tx = env.begin_ro_txn().unwrap();
|
||||||
|
let tx_ptr = tx.txn() as usize;
|
||||||
|
assert!(!read_transactions.aborted.contains(&tx_ptr));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user