feat(storage, mdbx): transaction manager (#6126)

This commit is contained in:
Alexey Shekhirin
2024-01-23 12:24:56 +00:00
committed by GitHub
parent 9a5120a883
commit a6f8e449f7
36 changed files with 821 additions and 262 deletions

View File

@ -22,6 +22,8 @@ indexmap = "2"
libc = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
dashmap = { version = "5.5.3", features = ["inline"], optional = true }
tracing = { workspace = true, optional = true }
ffi = { package = "reth-mdbx-sys", path = "./mdbx-sys" }
@ -31,6 +33,7 @@ libffi = "3.2.0"
[features]
default = []
return-borrowed = []
read-tx-timeouts = ["dashmap", "dashmap/inline", "tracing"]
[dev-dependencies]
pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }

View File

@ -41,7 +41,7 @@ impl<'tx> TableObject for Cow<'tx, [u8]> {
#[cfg(not(feature = "return-borrowed"))]
{
let is_dirty = (!K::ONLY_CLEAN) &&
let is_dirty = (!K::IS_READ_ONLY) &&
crate::error::mdbx_result(ffi::mdbx_is_dirty(_txn, data_val.iov_base))?;
Ok(if is_dirty { Cow::Owned(s.to_vec()) } else { Cow::Borrowed(s) })

View File

@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, Error, Result},
error::{mdbx_result, mdbx_result_with_tx_kind, Error, Result},
flags::*,
mdbx_try_optional,
transaction::{TransactionKind, RW},
@ -30,7 +30,11 @@ where
pub(crate) fn new(txn: Transaction<K>, dbi: ffi::MDBX_dbi) -> Result<Self> {
let mut cursor: *mut ffi::MDBX_cursor = ptr::null_mut();
unsafe {
mdbx_result(txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)))?;
mdbx_result_with_tx_kind::<K>(
txn.txn_execute(|txn| ffi::mdbx_cursor_open(txn, dbi, &mut cursor)),
txn.txn(),
txn.env().txn_manager(),
)?;
}
Ok(Self { txn, cursor })
}
@ -43,7 +47,7 @@ where
let s = Self { txn: other.txn.clone(), cursor };
mdbx_result(res)?;
mdbx_result_with_tx_kind::<K>(res, s.txn.txn(), s.txn.env().txn_manager())?;
Ok(s)
}
@ -91,12 +95,11 @@ where
let key_ptr = key_val.iov_base;
let data_ptr = data_val.iov_base;
self.txn.txn_execute(|txn| {
let v = mdbx_result(ffi::mdbx_cursor_get(
self.cursor,
&mut key_val,
&mut data_val,
op,
))?;
let v = mdbx_result_with_tx_kind::<K>(
ffi::mdbx_cursor_get(self.cursor, &mut key_val, &mut data_val, op),
txn,
self.txn.env().txn_manager(),
)?;
assert_ne!(data_ptr, data_val.iov_base);
let key_out = {
// MDBX wrote in new key

View File

@ -1,5 +1,5 @@
use crate::{
error::{mdbx_result, Result},
error::{mdbx_result_with_tx_kind, Result},
transaction::TransactionKind,
Environment, Transaction,
};
@ -30,9 +30,13 @@ impl Database {
let c_name = name.map(|n| CString::new(n).unwrap());
let name_ptr = if let Some(c_name) = &c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDBX_dbi = 0;
mdbx_result(
txn.txn_execute(|txn| unsafe { ffi::mdbx_dbi_open(txn, name_ptr, flags, &mut dbi) }),
)?;
txn.txn_execute(|txn_ptr| {
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_dbi_open(txn_ptr, name_ptr, flags, &mut dbi) },
txn_ptr,
txn.env().txn_manager(),
)
})?;
Ok(Self::new_from_ptr(dbi, txn.env().clone()))
}

View File

@ -2,8 +2,9 @@ use crate::{
database::Database,
error::{mdbx_result, Error, Result},
flags::EnvironmentFlags,
transaction::{CommitLatency, RO, RW},
Mode, Transaction, TransactionKind,
transaction::{RO, RW},
txn_manager::{TxnManager, TxnManagerMessage, TxnPtr},
Transaction, TransactionKind,
};
use byteorder::{ByteOrder, NativeEndian};
use mem::size_of;
@ -15,14 +16,15 @@ use std::{
ops::{Bound, RangeBounds},
path::Path,
ptr,
sync::{
mpsc::{sync_channel, SyncSender},
Arc,
},
sync::{mpsc::sync_channel, Arc},
thread::sleep,
time::Duration,
};
/// The default maximum duration of a read transaction.
#[cfg(feature = "read-tx-timeouts")]
const DEFAULT_MAX_READ_TRANSACTION_DURATION: Duration = Duration::from_secs(5 * 60);
/// An environment supports multiple databases, all residing in the same shared-memory map.
///
/// Accessing the environment is thread-safe.
@ -50,6 +52,8 @@ impl Environment {
kind: Default::default(),
#[cfg(not(windows))]
handle_slow_readers: None,
#[cfg(feature = "read-tx-timeouts")]
max_read_transaction_duration: None,
}
}
@ -65,32 +69,22 @@ impl Environment {
self.inner.env_kind
}
/// Returns true if the environment was opened in [Mode::ReadWrite] mode.
/// Returns true if the environment was opened in [crate::Mode::ReadWrite] mode.
#[inline]
pub fn is_read_write(&self) -> bool {
self.inner.txn_manager.is_some()
self.inner.env_kind.is_write_map()
}
/// Returns true if the environment was opened in [Mode::ReadOnly] mode.
/// Returns true if the environment was opened in [crate::Mode::ReadOnly] mode.
#[inline]
pub fn is_read_only(&self) -> bool {
self.inner.txn_manager.is_none()
!self.inner.env_kind.is_write_map()
}
/// Returns the manager that handles transaction messages.
///
/// Requires [Mode::ReadWrite] and returns None otherwise.
/// Returns the transaction manager.
#[inline]
pub(crate) fn txn_manager(&self) -> Option<&SyncSender<TxnManagerMessage>> {
self.inner.txn_manager.as_ref()
}
/// Returns the manager that handles transaction messages.
///
/// Requires [Mode::ReadWrite] and returns None otherwise.
#[inline]
pub(crate) fn ensure_txn_manager(&self) -> Result<&SyncSender<TxnManagerMessage>> {
self.txn_manager().ok_or(Error::WriteTransactionUnsupportedInReadOnlyMode)
pub(crate) fn txn_manager(&self) -> &TxnManager {
&self.inner.txn_manager
}
/// Create a read-only transaction for use with the environment.
@ -102,16 +96,13 @@ impl Environment {
/// Create a read-write transaction for use with the environment. This method will block while
/// there are any other read-write transactions open on the environment.
pub fn begin_rw_txn(&self) -> Result<Transaction<RW>> {
let sender = self.ensure_txn_manager()?;
let txn = loop {
let (tx, rx) = sync_channel(0);
sender
.send(TxnManagerMessage::Begin {
parent: TxnPtr(ptr::null_mut()),
flags: RW::OPEN_FLAGS,
sender: tx,
})
.unwrap();
self.txn_manager().send_message(TxnManagerMessage::Begin {
parent: TxnPtr(ptr::null_mut()),
flags: RW::OPEN_FLAGS,
sender: tx,
});
let res = rx.recv().unwrap();
if let Err(Error::Busy) = &res {
sleep(Duration::from_millis(250));
@ -235,10 +226,8 @@ struct EnvironmentInner {
env: *mut ffi::MDBX_env,
/// Whether the environment was opened as WRITEMAP.
env_kind: EnvironmentKind,
/// the sender half of the transaction manager channel
///
/// Only set if the environment was opened in [Mode::ReadWrite] mode.
txn_manager: Option<SyncSender<TxnManagerMessage>>,
/// Transaction manager
txn_manager: TxnManager,
}
impl Drop for EnvironmentInner {
@ -265,12 +254,12 @@ pub enum EnvironmentKind {
Default,
/// Open the environment as mdbx-WRITEMAP.
/// Use a writeable memory map unless the environment is opened as MDBX_RDONLY
/// ([Mode::ReadOnly]).
/// ([crate::Mode::ReadOnly]).
///
/// All data will be mapped into memory in the read-write mode [Mode::ReadWrite]. This offers a
/// significant performance benefit, since the data will be modified directly in mapped
/// memory and then flushed to disk by single system call, without any memory management
/// nor copying.
/// All data will be mapped into memory in the read-write mode [crate::Mode::ReadWrite]. This
/// offers a significant performance benefit, since the data will be modified directly in
/// mapped memory and then flushed to disk by single system call, without any memory
/// management nor copying.
///
/// This mode is incompatible with nested transactions.
WriteMap,
@ -292,22 +281,11 @@ impl EnvironmentKind {
}
}
#[derive(Copy, Clone, Debug)]
pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
unsafe impl Send for TxnPtr {}
unsafe impl Sync for TxnPtr {}
#[derive(Copy, Clone, Debug)]
pub(crate) struct EnvPtr(pub(crate) *mut ffi::MDBX_env);
unsafe impl Send for EnvPtr {}
unsafe impl Sync for EnvPtr {}
pub(crate) enum TxnManagerMessage {
Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
}
/// Environment statistics.
///
/// Contains information about the size and layout of an MDBX environment or database.
@ -597,6 +575,10 @@ pub struct EnvironmentBuilder {
kind: EnvironmentKind,
#[cfg(not(windows))]
handle_slow_readers: Option<HandleSlowReadersCallback>,
#[cfg(feature = "read-tx-timeouts")]
/// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is
/// enabled, the default value of [DEFAULT_MAX_READ_TRANSACTION_DURATION] is used.
max_read_transaction_duration: Option<read_transactions::MaxReadTransactionDuration>,
}
impl EnvironmentBuilder {
@ -718,54 +700,24 @@ impl EnvironmentBuilder {
}
}
let mut env = EnvironmentInner { env, txn_manager: None, env_kind: self.kind };
#[cfg(not(feature = "read-tx-timeouts"))]
let txn_manager = TxnManager::new(EnvPtr(env));
if let Mode::ReadWrite { .. } = self.flags.mode {
let (tx, rx) = std::sync::mpsc::sync_channel(0);
let e = EnvPtr(env.env);
std::thread::spawn(move || loop {
match rx.recv() {
Ok(msg) => match msg {
TxnManagerMessage::Begin { parent, flags, sender } => {
#[allow(clippy::redundant_locals)]
let e = e;
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
sender
.send(
mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(
e.0,
parent.0,
flags,
&mut txn,
ptr::null_mut(),
)
})
.map(|_| TxnPtr(txn)),
)
.unwrap()
}
TxnManagerMessage::Abort { tx, sender } => {
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
sender
.send({
let mut latency = CommitLatency::new();
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
})
.map(|v| (v, latency))
})
.unwrap();
}
},
Err(_) => return,
}
});
#[cfg(feature = "read-tx-timeouts")]
let txn_manager = {
let mut txn_manager = TxnManager::new(EnvPtr(env));
if let crate::MaxReadTransactionDuration::Set(duration) = self
.max_read_transaction_duration
.unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
DEFAULT_MAX_READ_TRANSACTION_DURATION,
))
{
txn_manager = txn_manager.with_max_read_transaction_duration(duration);
};
txn_manager
};
env.txn_manager = Some(tx);
}
let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };
Ok(Environment { inner: Arc::new(env) })
}
@ -861,16 +813,53 @@ impl EnvironmentBuilder {
self
}
pub fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
self.log_level = Some(log_level);
self
}
/// Set the Handle-Slow-Readers callback. See [HandleSlowReadersCallback] for more information.
#[cfg(not(windows))]
pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
self.handle_slow_readers = Some(hsr);
self
}
}
pub fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
self.log_level = Some(log_level);
self
#[cfg(feature = "read-tx-timeouts")]
pub(crate) mod read_transactions {
use crate::EnvironmentBuilder;
use std::time::Duration;
/// The maximum duration of a read transaction.
#[derive(Debug, Clone, Copy)]
#[cfg(feature = "read-tx-timeouts")]
pub enum MaxReadTransactionDuration {
/// The maximum duration of a read transaction is unbounded.
Unbounded,
/// The maximum duration of a read transaction is set to the given duration.
Set(Duration),
}
#[cfg(feature = "read-tx-timeouts")]
impl MaxReadTransactionDuration {
pub fn as_duration(&self) -> Option<Duration> {
match self {
MaxReadTransactionDuration::Unbounded => None,
MaxReadTransactionDuration::Set(duration) => Some(*duration),
}
}
}
impl EnvironmentBuilder {
/// Set the maximum time a read-only transaction can be open.
pub fn set_max_read_transaction_duration(
&mut self,
max_read_transaction_duration: MaxReadTransactionDuration,
) -> &mut Self {
self.max_read_transaction_duration = Some(max_read_transaction_duration);
self
}
}
}

View File

@ -1,3 +1,4 @@
use crate::{txn_manager::TxnManager, TransactionKind};
use libc::c_int;
use std::result;
@ -5,7 +6,7 @@ use std::result;
pub type Result<T> = result::Result<T, Error>;
/// An MDBX error kind.
#[derive(Debug, thiserror::Error, Clone, PartialEq, Eq)]
#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)]
pub enum Error {
/// Key/data pair already exists.
#[error("key/data pair already exists")]
@ -117,6 +118,8 @@ pub enum Error {
/// [Mode::ReadOnly](crate::flags::Mode::ReadOnly), write transactions can't be opened.
#[error("write transactions are not supported in read-only mode")]
WriteTransactionUnsupportedInReadOnlyMode,
#[error("read transaction has been aborted by the transaction manager")]
ReadTransactionAborted,
/// Unknown error code.
#[error("unknown error code")]
Other(i32),
@ -190,7 +193,7 @@ impl Error {
Error::DecodeErrorLenDiff | Error::DecodeError => ffi::MDBX_EINVAL,
Error::Access => ffi::MDBX_EACCESS,
Error::TooLarge => ffi::MDBX_TOO_LARGE,
Error::BadSignature => ffi::MDBX_EBADSIGN,
Error::BadSignature | Error::ReadTransactionAborted => ffi::MDBX_EBADSIGN,
Error::WriteTransactionUnsupportedInReadOnlyMode => ffi::MDBX_EACCESS,
Error::NestedTransactionsUnsupportedWithWriteMap => ffi::MDBX_EACCESS,
Error::Other(err_code) => *err_code,
@ -213,6 +216,33 @@ pub(crate) fn mdbx_result(err_code: c_int) -> Result<bool> {
}
}
#[cfg(feature = "read-tx-timeouts")]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
txn: *mut ffi::MDBX_txn,
txn_manager: &TxnManager,
) -> Result<bool> {
if K::IS_READ_ONLY &&
err_code == ffi::MDBX_EBADSIGN &&
txn_manager.remove_aborted_read_transaction(txn).is_some()
{
return Err(Error::ReadTransactionAborted);
}
mdbx_result(err_code)
}
#[cfg(not(feature = "read-tx-timeouts"))]
#[inline]
pub(crate) fn mdbx_result_with_tx_kind<K: TransactionKind>(
err_code: c_int,
_txn: *mut ffi::MDBX_txn,
_txn_manager: &TxnManager,
) -> Result<bool> {
mdbx_result(err_code)
}
#[macro_export]
macro_rules! mdbx_try_optional {
($expr:expr) => {{

View File

@ -23,6 +23,9 @@ pub mod ffi {
pub use ffi::{MDBX_dbi as DBI, MDBX_log_level_t as LogLevel};
}
#[cfg(feature = "read-tx-timeouts")]
pub use crate::environment::read_transactions::MaxReadTransactionDuration;
mod codec;
mod cursor;
mod database;
@ -30,6 +33,7 @@ mod environment;
mod error;
mod flags;
mod transaction;
mod txn_manager;
#[cfg(test)]
mod test_utils {

View File

@ -1,8 +1,9 @@
use crate::{
database::Database,
environment::{Environment, TxnManagerMessage, TxnPtr},
error::{mdbx_result, Result},
environment::Environment,
error::{mdbx_result, mdbx_result_with_tx_kind, Result},
flags::{DatabaseFlags, WriteFlags},
txn_manager::{TxnManagerMessage, TxnPtr},
Cursor, Error, Stat, TableObject,
};
use ffi::{MDBX_txn_flags_t, MDBX_TXN_RDONLY, MDBX_TXN_READWRITE};
@ -28,12 +29,10 @@ mod private {
}
pub trait TransactionKind: private::Sealed + Send + Sync + Debug + 'static {
#[doc(hidden)]
const ONLY_CLEAN: bool;
#[doc(hidden)]
const OPEN_FLAGS: MDBX_txn_flags_t;
/// Convenience flag for distinguishing between read-only and read-write transactions.
#[doc(hidden)]
const IS_READ_ONLY: bool;
}
@ -47,12 +46,10 @@ pub struct RO;
pub struct RW;
impl TransactionKind for RO {
const ONLY_CLEAN: bool = true;
const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_RDONLY;
const IS_READ_ONLY: bool = true;
}
impl TransactionKind for RW {
const ONLY_CLEAN: bool = false;
const OPEN_FLAGS: MDBX_txn_flags_t = MDBX_TXN_READWRITE;
const IS_READ_ONLY: bool = false;
}
@ -74,18 +71,27 @@ where
pub(crate) fn new(env: Environment) -> Result<Self> {
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
unsafe {
mdbx_result(ffi::mdbx_txn_begin_ex(
env.env_ptr(),
ptr::null_mut(),
K::OPEN_FLAGS,
&mut txn,
ptr::null_mut(),
))?;
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_txn_begin_ex(
env.env_ptr(),
ptr::null_mut(),
K::OPEN_FLAGS,
&mut txn,
ptr::null_mut(),
),
txn,
env.txn_manager(),
)?;
Ok(Self::new_from_ptr(env, txn))
}
}
pub(crate) fn new_from_ptr(env: Environment, txn: *mut ffi::MDBX_txn) -> Self {
#[cfg(feature = "read-tx-timeouts")]
if K::IS_READ_ONLY {
env.txn_manager().add_active_read_transaction(txn)
}
let inner = TransactionInner {
txn: TransactionPtr::new(txn),
primed_dbis: Mutex::new(IndexSet::new()),
@ -93,6 +99,7 @@ where
env,
_marker: Default::default(),
};
Self { inner: Arc::new(inner) }
}
@ -179,22 +186,26 @@ where
pub fn commit_and_rebind_open_dbs(self) -> Result<(bool, CommitLatency, Vec<Database>)> {
let result = {
let result = self.txn_execute(|txn| {
if K::ONLY_CLEAN {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env().txn_manager().remove_active_read_transaction(txn);
let mut latency = CommitLatency::new();
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency())
})
mdbx_result_with_tx_kind::<K>(
unsafe { ffi::mdbx_txn_commit_ex(txn, latency.mdb_commit_latency()) },
txn,
self.env().txn_manager(),
)
.map(|v| (v, latency))
} else {
let (sender, rx) = sync_channel(0);
self.env()
.ensure_txn_manager()
.unwrap()
.send(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender })
.unwrap();
.txn_manager()
.send_message(TxnManagerMessage::Commit { tx: TxnPtr(txn), sender });
rx.recv().unwrap()
}
});
self.inner.set_committed();
result
};
@ -231,9 +242,13 @@ where
pub fn db_flags(&self, db: &Database) -> Result<DatabaseFlags> {
let mut flags: c_uint = 0;
unsafe {
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut())
}))?;
self.txn_execute(|txn| {
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_dbi_flags_ex(txn, db.dbi(), &mut flags, ptr::null_mut()),
txn,
self.env().txn_manager(),
)
})?;
}
// The types are not the same on Windows. Great!
@ -250,9 +265,13 @@ where
pub fn db_stat_with_dbi(&self, dbi: ffi::MDBX_dbi) -> Result<Stat> {
unsafe {
let mut stat = Stat::new();
mdbx_result(self.txn_execute(|txn| {
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>())
}))?;
self.txn_execute(|txn| {
mdbx_result_with_tx_kind::<K>(
ffi::mdbx_dbi_stat(txn, dbi, stat.mdb_stat(), size_of::<Stat>()),
txn,
self.env().txn_manager(),
)
})?;
Ok(stat)
}
}
@ -330,17 +349,18 @@ where
fn drop(&mut self) {
self.txn_execute(|txn| {
if !self.has_committed() {
if K::ONLY_CLEAN {
if K::IS_READ_ONLY {
#[cfg(feature = "read-tx-timeouts")]
self.env.txn_manager().remove_active_read_transaction(txn);
unsafe {
ffi::mdbx_txn_abort(txn);
}
} else {
let (sender, rx) = sync_channel(0);
self.env
.ensure_txn_manager()
.unwrap()
.send(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender })
.unwrap();
.txn_manager()
.send_message(TxnManagerMessage::Abort { tx: TxnPtr(txn), sender });
rx.recv().unwrap().unwrap();
}
}
@ -489,7 +509,11 @@ impl Transaction<RO> {
/// Caller must close ALL other [Database] and [Cursor] instances pointing to the same dbi
/// BEFORE calling this function.
pub unsafe fn close_db(&self, db: Database) -> Result<()> {
mdbx_result(ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()))?;
mdbx_result_with_tx_kind::<RO>(
ffi::mdbx_dbi_close(self.env().env_ptr(), db.dbi()),
self.txn(),
self.env().txn_manager(),
)?;
Ok(())
}
@ -503,15 +527,11 @@ impl Transaction<RW> {
}
self.txn_execute(|txn| {
let (tx, rx) = sync_channel(0);
self.env()
.ensure_txn_manager()
.unwrap()
.send(TxnManagerMessage::Begin {
parent: TxnPtr(txn),
flags: RW::OPEN_FLAGS,
sender: tx,
})
.unwrap();
self.env().txn_manager().send_message(TxnManagerMessage::Begin {
parent: TxnPtr(txn),
flags: RW::OPEN_FLAGS,
sender: tx,
});
rx.recv().unwrap().map(|ptr| Transaction::new_from_ptr(self.env().clone(), ptr.0))
})

View File

@ -0,0 +1,389 @@
use crate::{
environment::EnvPtr,
error::{mdbx_result, Result},
CommitLatency,
};
use std::{
ptr,
sync::mpsc::{sync_channel, Receiver, SyncSender},
};
#[derive(Copy, Clone, Debug)]
pub(crate) struct TxnPtr(pub(crate) *mut ffi::MDBX_txn);
unsafe impl Send for TxnPtr {}
unsafe impl Sync for TxnPtr {}
pub(crate) enum TxnManagerMessage {
Begin { parent: TxnPtr, flags: ffi::MDBX_txn_flags_t, sender: SyncSender<Result<TxnPtr>> },
Abort { tx: TxnPtr, sender: SyncSender<Result<bool>> },
Commit { tx: TxnPtr, sender: SyncSender<Result<(bool, CommitLatency)>> },
}
/// Manages transactions by doing two things:
/// - Opening, aborting, and committing transactions using [TxnManager::send_message] with the
/// corresponding [TxnManagerMessage]
/// - Aborting long-lived read transactions (if the `read-tx-timeouts` feature is enabled and
/// `TxnManager::with_max_read_transaction_duration` is called)
#[derive(Debug)]
pub(crate) struct TxnManager {
sender: SyncSender<TxnManagerMessage>,
#[cfg(feature = "read-tx-timeouts")]
read_transactions: Option<std::sync::Arc<read_transactions::ReadTransactions>>,
}
impl TxnManager {
pub(crate) fn new(env: EnvPtr) -> Self {
let (tx, rx) = sync_channel(0);
let txn_manager = Self {
sender: tx,
#[cfg(feature = "read-tx-timeouts")]
read_transactions: None,
};
txn_manager.start_message_listener(env, rx);
txn_manager
}
/// Spawns a new thread with [std::thread::spawn] that listens to incoming [TxnManagerMessage]
/// messages, executes an FFI function, and returns the result on the provided channel.
///
/// - [TxnManagerMessage::Begin] opens a new transaction with [ffi::mdbx_txn_begin_ex]
/// - [TxnManagerMessage::Abort] aborts a transaction with [ffi::mdbx_txn_abort]
/// - [TxnManagerMessage::Commit] commits a transaction with [ffi::mdbx_txn_commit_ex]
fn start_message_listener(&self, env: EnvPtr, rx: Receiver<TxnManagerMessage>) {
let read_transactions = self.read_transactions.clone();
std::thread::spawn(move || {
#[allow(clippy::redundant_locals)]
let env = env;
loop {
match rx.recv() {
Ok(msg) => match msg {
TxnManagerMessage::Begin { parent, flags, sender } => {
let mut txn: *mut ffi::MDBX_txn = ptr::null_mut();
sender
.send(
mdbx_result(unsafe {
ffi::mdbx_txn_begin_ex(
env.0,
parent.0,
flags,
&mut txn,
ptr::null_mut(),
)
})
.map(|_| TxnPtr(txn)),
)
.unwrap();
#[cfg(feature = "read-tx-timeouts")]
{
use crate::transaction::TransactionKind;
if flags == crate::transaction::RO::OPEN_FLAGS {
if let Some(read_transactions) = &read_transactions {
read_transactions.add_active(txn);
}
}
}
}
TxnManagerMessage::Abort { tx, sender } => {
#[cfg(feature = "read-tx-timeouts")]
if let Some(read_transactions) = &read_transactions {
read_transactions.remove_active(tx.0);
}
sender.send(mdbx_result(unsafe { ffi::mdbx_txn_abort(tx.0) })).unwrap();
}
TxnManagerMessage::Commit { tx, sender } => {
#[cfg(feature = "read-tx-timeouts")]
if let Some(read_transactions) = &read_transactions {
read_transactions.remove_active(tx.0);
}
sender
.send({
let mut latency = CommitLatency::new();
mdbx_result(unsafe {
ffi::mdbx_txn_commit_ex(tx.0, latency.mdb_commit_latency())
})
.map(|v| (v, latency))
})
.unwrap();
}
},
Err(_) => return,
}
}
});
}
pub(crate) fn send_message(&self, message: TxnManagerMessage) {
self.sender.send(message).unwrap()
}
}
#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
use crate::{error::mdbx_result, txn_manager::TxnManager, Error};
use dashmap::{DashMap, DashSet};
use std::{
sync::Arc,
time::{Duration, Instant},
};
use tracing::{error, trace, warn};
const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
impl TxnManager {
/// Sets the maximum duration that a read transaction can be open.
pub(crate) fn with_max_read_transaction_duration(
mut self,
duration: Duration,
) -> TxnManager {
let read_transactions = Arc::new(ReadTransactions::new(duration));
read_transactions.clone().start_monitor();
self.read_transactions = Some(read_transactions);
self
}
/// Adds a new transaction to the list of active read transactions.
pub(crate) fn add_active_read_transaction(&self, ptr: *mut ffi::MDBX_txn) {
if let Some(read_transactions) = &self.read_transactions {
read_transactions.add_active(ptr);
}
}
/// Removes a transaction from the list of active read transactions.
pub(crate) fn remove_active_read_transaction(
&self,
ptr: *mut ffi::MDBX_txn,
) -> Option<(usize, Instant)> {
self.read_transactions.as_ref()?.remove_active(ptr)
}
/// Removes a transaction from the list of aborted read transactions.
pub(crate) fn remove_aborted_read_transaction(
&self,
ptr: *mut ffi::MDBX_txn,
) -> Option<usize> {
self.read_transactions.as_ref()?.remove_aborted(ptr)
}
}
#[derive(Debug, Default)]
pub(super) struct ReadTransactions {
/// Maximum duration that a read transaction can be open until the
/// [ReadTransactions::start_monitor] aborts it.
max_duration: Duration,
/// List of currently active read transactions.
///
/// We store `usize` instead of a raw pointer as a key, because pointers are not
/// comparable. The time of transaction opening is stored as a value.
active: DashMap<usize, Instant>,
/// List of read transactions aborted by the [ReadTransactions::start_monitor].
/// We keep them until user tries to abort the transaction, so we're able to report a nice
/// [Error::ReadTransactionAborted] error.
///
/// We store `usize` instead of a raw pointer, because pointers are not comparable.
aborted: DashSet<usize>,
}
impl ReadTransactions {
pub(super) fn new(max_duration: Duration) -> Self {
Self { max_duration, ..Default::default() }
}
/// Adds a new transaction to the list of active read transactions.
pub(super) fn add_active(&self, ptr: *mut ffi::MDBX_txn) {
let _ = self.active.insert(ptr as usize, Instant::now());
}
/// Removes a transaction from the list of active read transactions.
pub(super) fn remove_active(&self, ptr: *mut ffi::MDBX_txn) -> Option<(usize, Instant)> {
self.active.remove(&(ptr as usize))
}
/// Adds a new transaction to the list of aborted read transactions.
pub(super) fn add_aborted(&self, ptr: *mut ffi::MDBX_txn) {
self.aborted.insert(ptr as usize);
}
/// Removes a transaction from the list of aborted read transactions.
pub(super) fn remove_aborted(&self, ptr: *mut ffi::MDBX_txn) -> Option<usize> {
self.aborted.remove(&(ptr as usize))
}
/// Spawns a new thread with [std::thread::spawn] that monitors the list of active read
/// transactions and aborts those that are open for longer than
/// `ReadTransactions.max_duration`.
///
/// Aborted transaction pointers are placed into the list of aborted read transactions, and
/// removed from this list by [crate::error::mdbx_result_with_tx_kind] when the user tries
/// to use it.
pub(super) fn start_monitor(self: Arc<Self>) {
std::thread::spawn(move || {
let mut aborted_active = Vec::new();
loop {
let now = Instant::now();
let mut max_active_transaction_duration = None;
// Iterate through active read transactions and abort those that's open for
// longer than `self.max_duration`.
for entry in self.active.iter() {
let (ptr, start) = entry.pair();
let duration = now - *start;
if duration > self.max_duration {
let ptr = *ptr as *mut ffi::MDBX_txn;
// Add the transaction to the list of aborted transactions, so further
// usages report the correct error when the transaction is closed.
self.add_aborted(ptr);
// Abort the transaction
let result = mdbx_result(unsafe { ffi::mdbx_txn_abort(ptr) });
// Add the transaction to `aborted_active`. We can't remove it instantly
// from the list of active transactions, because we iterate through it.
aborted_active.push((ptr, duration, result.err()));
} else {
max_active_transaction_duration = Some(
duration.max(max_active_transaction_duration.unwrap_or_default()),
);
}
}
// Walk through aborted transactions, and delete them from the list of active
// transactions.
for (ptr, open_duration, err) in aborted_active.iter().copied() {
// Try deleting the transaction from the list of active transactions.
let was_in_active = self.remove_active(ptr).is_some();
if let Some(err) = err {
// If there was an error when aborting the transaction, we need to
// remove it from the list of aborted transactions, because otherwise it
// will stay there forever.
self.remove_aborted(ptr);
if was_in_active && err != Error::BadSignature {
// If the transaction was in the list of active transactions and the
// error code is not `EBADSIGN`, then user didn't abort it.
error!(target: "libmdbx", ?err, ?open_duration, "Failed to abort the long-lived read transactions");
}
} else {
// Happy path, the transaction has been aborted by us with no errors.
warn!(target: "libmdbx", ?open_duration, "Long-lived read transactions has been aborted");
}
}
// Clear the list of aborted transactions, but not de-allocate the reserved
// capacity to save on further pushes.
aborted_active.clear();
if !self.active.is_empty() || !self.aborted.is_empty() {
trace!(
target: "libmdbx",
elapsed = ?now.elapsed(),
active = ?self.active.iter().map(|entry| {
let (ptr, start) = entry.pair();
(*ptr, start.elapsed())
}).collect::<Vec<_>>(),
aborted = ?self.aborted.iter().map(|entry| *entry).collect::<Vec<_>>(),
"Read transactions"
);
}
// Sleep not more than `READ_TRANSACTIONS_CHECK_INTERVAL`, but at least until
// the closest deadline of an active read transaction
let duration_until_closest_deadline =
self.max_duration - max_active_transaction_duration.unwrap_or_default();
std::thread::sleep(
READ_TRANSACTIONS_CHECK_INTERVAL.min(duration_until_closest_deadline),
);
}
});
}
}
#[cfg(test)]
mod tests {
use crate::{
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
MaxReadTransactionDuration,
};
use std::{thread::sleep, time::Duration};
use tempfile::tempdir;
#[test]
fn txn_manager_read_transactions_duration_set() {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
// Create a read-only transaction, successfully use it, close it by dropping.
{
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));
tx.open_db(None).unwrap();
drop(tx);
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
// Create a read-only transaction, successfully use it, close it by committing.
{
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));
tx.open_db(None).unwrap();
tx.commit().unwrap();
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
// Create a read-only transaction, wait until `MAX_DURATION` time is elapsed so the
// manager kills it, use it and observe the `Error::ReadTransactionAborted` error.
{
let tx = env.begin_ro_txn().unwrap();
let tx_ptr = tx.txn() as usize;
assert!(read_transactions.active.contains_key(&tx_ptr));
sleep(MAX_DURATION + READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(read_transactions.aborted.contains(&tx_ptr));
assert_eq!(tx.open_db(None).err(), Some(Error::ReadTransactionAborted));
assert!(!read_transactions.active.contains_key(&tx_ptr));
assert!(!read_transactions.aborted.contains(&tx_ptr));
}
}
#[test]
fn txn_manager_read_transactions_duration_unbounded() {
let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Unbounded)
.open(dir.path())
.unwrap();
assert!(env.txn_manager().read_transactions.is_none());
let tx = env.begin_ro_txn().unwrap();
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(tx.commit().is_ok())
}
}
}