Clone read transactions into TxnManager message listener (#6809)

This commit is contained in:
Emilia Hane
2024-02-27 01:59:29 +01:00
committed by GitHub
parent c268c71d44
commit f5ce869c0e
2 changed files with 51 additions and 16 deletions

View File

@ -700,21 +700,23 @@ impl EnvironmentBuilder {
}
}
let env_ptr = EnvPtr(env);
#[cfg(not(feature = "read-tx-timeouts"))]
let txn_manager = TxnManager::new(EnvPtr(env));
let txn_manager = TxnManager::new(env_ptr);
#[cfg(feature = "read-tx-timeouts")]
let txn_manager = {
let mut txn_manager = TxnManager::new(EnvPtr(env));
if let crate::MaxReadTransactionDuration::Set(duration) = self
.max_read_transaction_duration
.unwrap_or(read_transactions::MaxReadTransactionDuration::Set(
DEFAULT_MAX_READ_TRANSACTION_DURATION,
))
{
txn_manager = txn_manager.with_max_read_transaction_duration(duration);
};
txn_manager
TxnManager::new_with_max_read_transaction_duration(env_ptr, duration)
} else {
TxnManager::new(env_ptr)
}
};
let env = EnvironmentInner { env, txn_manager, env_kind: self.kind };

View File

@ -127,10 +127,10 @@ impl TxnManager {
#[cfg(feature = "read-tx-timeouts")]
mod read_transactions {
use crate::{error::mdbx_result, txn_manager::TxnManager, Error};
use crate::{environment::EnvPtr, error::mdbx_result, txn_manager::TxnManager, Error};
use dashmap::{DashMap, DashSet};
use std::{
sync::Arc,
sync::{mpsc::sync_channel, Arc},
time::{Duration, Instant},
};
use tracing::{error, trace, warn};
@ -138,16 +138,22 @@ mod read_transactions {
const READ_TRANSACTIONS_CHECK_INTERVAL: Duration = Duration::from_secs(5);
impl TxnManager {
/// Sets the maximum duration that a read transaction can be open.
pub(crate) fn with_max_read_transaction_duration(
mut self,
/// Returns a new instance for which the maximum duration that a read transaction can be
/// open is set.
pub(crate) fn new_with_max_read_transaction_duration(
env: EnvPtr,
duration: Duration,
) -> TxnManager {
) -> Self {
let read_transactions = Arc::new(ReadTransactions::new(duration));
read_transactions.clone().start_monitor();
self.read_transactions = Some(read_transactions);
self
let (tx, rx) = sync_channel(0);
let txn_manager = Self { sender: tx, read_transactions: Some(read_transactions) };
txn_manager.start_message_listener(env, rx);
txn_manager
}
/// Adds a new transaction to the list of active read transactions.
@ -311,10 +317,12 @@ mod read_transactions {
#[cfg(test)]
mod tests {
use crate::{
txn_manager::read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, Environment, Error,
MaxReadTransactionDuration,
txn_manager::{
read_transactions::READ_TRANSACTIONS_CHECK_INTERVAL, TxnManagerMessage, TxnPtr,
},
Environment, Error, MaxReadTransactionDuration, TransactionKind, RO,
};
use std::{thread::sleep, time::Duration};
use std::{ptr, sync::mpsc::sync_channel, thread::sleep, time::Duration};
use tempfile::tempdir;
#[test]
@ -387,5 +395,30 @@ mod read_transactions {
sleep(READ_TRANSACTIONS_CHECK_INTERVAL);
assert!(tx.commit().is_ok())
}
#[test]
fn txn_manager_begin_read_transaction_via_message_listener() {
const MAX_DURATION: Duration = Duration::from_secs(1);
let dir = tempdir().unwrap();
let env = Environment::builder()
.set_max_read_transaction_duration(MaxReadTransactionDuration::Set(MAX_DURATION))
.open(dir.path())
.unwrap();
let read_transactions = env.txn_manager().read_transactions.as_ref().unwrap();
// Create a read-only transaction via the message listener.
let (tx, rx) = sync_channel(0);
env.txn_manager().send_message(TxnManagerMessage::Begin {
parent: TxnPtr(ptr::null_mut()),
flags: RO::OPEN_FLAGS,
sender: tx,
});
let txn_ptr = rx.recv().unwrap().unwrap();
assert!(read_transactions.active.contains_key(&(txn_ptr.0 as usize)));
}
}
}