feat(storage): handle-slow-readers callback for MDBX (#5941)

This commit is contained in:
Alexey Shekhirin
2024-01-08 17:54:47 +00:00
committed by GitHub
parent 2287392405
commit 408d1c7e9e
7 changed files with 278 additions and 9 deletions

21
Cargo.lock generated
View File

@ -4069,6 +4069,25 @@ version = "0.2.151"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
[[package]]
name = "libffi"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce826c243048e3d5cec441799724de52e2d42f820468431fc3fceee2341871e2"
dependencies = [
"libc",
"libffi-sys",
]
[[package]]
name = "libffi-sys"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36115160c57e8529781b4183c2bb51fdc1f6d6d1ed345591d84be7703befb3c"
dependencies = [
"cc",
]
[[package]]
name = "libloading"
version = "0.8.1"
@ -5876,6 +5895,7 @@ dependencies = [
"itertools 0.12.0",
"metrics 0.21.1",
"modular-bitfield",
"once_cell",
"page_size",
"parity-scale-codec",
"parking_lot 0.12.1",
@ -6123,6 +6143,7 @@ dependencies = [
"derive_more",
"indexmap 2.1.0",
"libc",
"libffi",
"parking_lot 0.12.1",
"pprof",
"rand 0.8.5",

View File

@ -55,6 +55,7 @@ itertools.workspace = true
arbitrary = { workspace = true, features = ["derive"], optional = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
once_cell = "1.19.0"
[dev-dependencies]
# reth libs with arbitrary

View File

@ -9,11 +9,13 @@ use crate::{
};
use eyre::Context;
use metrics::{gauge, Label};
use once_cell::sync::Lazy;
use reth_interfaces::db::LogLevel;
use reth_libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, Geometry, Mode, PageSize, SyncMode, RO, RW,
DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode, Mode,
PageSize, SyncMode, RO, RW,
};
use reth_tracing::tracing::error;
use reth_tracing::tracing::{error, warn};
use std::{ops::Deref, path::Path};
use tx::Tx;
@ -26,6 +28,13 @@ const TERABYTE: usize = GIGABYTE * 1024;
/// MDBX allows up to 32767 readers (`MDBX_READERS_LIMIT`), but we limit it to slightly below that
const DEFAULT_MAX_READERS: u64 = 32_000;
/// Space that a read-only transaction can occupy until the warning is emitted.
/// See [reth_libmdbx::EnvironmentBuilder::set_handle_slow_readers] for more information.
const MAX_SAFE_READER_SPACE: usize = 10 * GIGABYTE;
static PROCESS_ID: Lazy<u32> =
Lazy::new(|| if cfg!(unix) { std::os::unix::process::parent_id() } else { std::process::id() });
/// Environment used when opening a MDBX environment. RO/RW.
#[derive(Debug)]
pub enum DatabaseEnvKind {
@ -168,6 +177,29 @@ impl DatabaseEnv {
shrink_threshold: None,
page_size: Some(PageSize::Set(default_page_size())),
});
let _ = *PROCESS_ID; // Initialize the process ID at the time of environment opening
inner_env.set_handle_slow_readers(
|process_id: u32, thread_id: u32, read_txn_id: u64, gap: usize, space: usize, retry: isize| {
if space > MAX_SAFE_READER_SPACE {
let message = if process_id == *PROCESS_ID {
"Current process has a long-lived database transaction that grows the database file."
} else {
"External process has a long-lived database transaction that grows the database file. Use shorter-lived read transactions or shut down the node."
};
warn!(
target: "storage::db::mdbx",
?process_id,
?thread_id,
?read_txn_id,
?gap,
?space,
?retry,
message
)
}
HandleSlowReadersReturnCode::ProceedWithoutKillingReader
});
inner_env.set_flags(EnvironmentFlags {
mode,
// We disable readahead because it improves performance for linear scans, but
@ -176,7 +208,7 @@ impl DatabaseEnv {
coalesce: true,
..Default::default()
});
// configure more readers
// Configure more readers
inner_env.set_max_readers(DEFAULT_MAX_READERS);
// This parameter sets the maximum size of the "reclaimed list", and the unit of measurement
// is "pages". Reclaimed list is the list of freed pages that's populated during the

View File

@ -103,7 +103,7 @@ impl<K: TransactionKind> Tx<K> {
) -> R {
if let Some(mut metrics_handler) = self.metrics_handler.take() {
metrics_handler.close_recorded = true;
metrics_handler.log_backtrace_on_long_transaction();
metrics_handler.log_backtrace_on_long_read_transaction();
let start = Instant::now();
let (result, commit_latency) = f(self);
@ -135,7 +135,7 @@ impl<K: TransactionKind> Tx<K> {
f: impl FnOnce(&Transaction<K>) -> R,
) -> R {
if let Some(metrics_handler) = &self.metrics_handler {
metrics_handler.log_backtrace_on_long_transaction();
metrics_handler.log_backtrace_on_long_read_transaction();
OperationMetrics::record(T::NAME, operation, value_size, || f(&self.inner))
} else {
f(&self.inner)
@ -153,7 +153,7 @@ struct MetricsHandler<K: TransactionKind> {
/// to do anything on [Drop::drop].
close_recorded: bool,
/// If `true`, the backtrace of transaction has already been recorded and logged.
/// See [MetricsHandler::log_backtrace_on_long_transaction].
/// See [MetricsHandler::log_backtrace_on_long_read_transaction].
backtrace_recorded: AtomicBool,
_marker: PhantomData<K>,
}
@ -183,7 +183,7 @@ impl<K: TransactionKind> MetricsHandler<K> {
///
/// NOTE: Backtrace is recorded using [Backtrace::force_capture], so `RUST_BACKTRACE` env var is
/// not needed.
fn log_backtrace_on_long_transaction(&self) {
fn log_backtrace_on_long_read_transaction(&self) {
if !self.backtrace_recorded.load(Ordering::Relaxed) &&
self.transaction_mode().is_read_only()
{
@ -206,7 +206,7 @@ impl<K: TransactionKind> MetricsHandler<K> {
impl<K: TransactionKind> Drop for MetricsHandler<K> {
fn drop(&mut self) {
if !self.close_recorded {
self.log_backtrace_on_long_transaction();
self.log_backtrace_on_long_read_transaction();
TransactionMetrics::record_close(
self.transaction_mode(),

View File

@ -20,6 +20,7 @@ byteorder = "1"
derive_more = "0.99"
indexmap = "2"
libc = "0.2"
libffi = "3.2.0"
parking_lot.workspace = true
thiserror.workspace = true

View File

@ -6,6 +6,7 @@ use crate::{
Mode, Transaction, TransactionKind,
};
use byteorder::{ByteOrder, NativeEndian};
use ffi::{mdbx_pid_t, mdbx_tid_t, MDBX_env, MDBX_hsr_func, MDBX_txn};
use mem::size_of;
use std::{
ffi::CString,
@ -48,6 +49,7 @@ impl Environment {
geometry: None,
log_level: None,
kind: Default::default(),
handle_slow_readers: None,
}
}
@ -494,6 +496,90 @@ impl<R> Default for Geometry<R> {
}
}
/// Handle-Slow-Readers callback function to resolve database full/overflow issue due to a reader(s)
/// which prevents the old data from being recycled.
///
/// Read transactions prevent reuse of pages freed by newer write transactions, thus the database
/// can grow quickly. This callback will be called when there is not enough space in the database
/// (i.e. before increasing the database size or before `MDBX_MAP_FULL` error) and thus can be
/// used to resolve issues with a "long-lived" read transacttions.
///
/// Depending on the arguments and needs, your implementation may wait,
/// terminate a process or thread that is performing a long read, or perform
/// some other action. In doing so it is important that the returned code always
/// corresponds to the performed action.
///
/// # Arguments
///
/// * `process_id` A proceess id of the reader process.
/// * `thread_id` A thread id of the reader thread.
/// * `read_txn_id` An oldest read transaction number on which stalled.
/// * `gap` A lag from the last committed txn.
/// * `space` A space that actually become available for reuse after this reader finished. The
/// callback function can take this value into account to evaluate the impact that a long-running
/// transaction has.
/// * `retry` A retry number starting from 0. If callback has returned 0 at least once, then at
/// end of current handling loop the callback function will be called additionally with negative
/// `retry` value to notify about the end of loop. The callback function can use this fact to
/// implement timeout reset logic while waiting for a readers.
///
/// # Returns
/// A return code that determines the further actions for MDBX and must match the action which
/// was executed by the callback:
/// * `-2` or less An error condition and the reader was not killed.
/// * `-1` The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error; MDBX
/// should increase the database size or return `MDBX_MAP_FULL` error.
/// * `0` The callback solved the problem or just waited for a while, libmdbx should rescan the
/// reader lock table and retry. This also includes a situation when corresponding transaction
/// terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
/// I.e. reader slot isn't needed to be cleaned from transaction.
/// * `1` Transaction aborted asynchronous and reader slot should be cleared immediately, i.e.
/// read transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be
/// called later.
/// * `2` or greater The reader process was terminated or killed, and MDBX should entirely reset
/// reader registration.
pub type HandleSlowReadersCallback = fn(
process_id: u32,
thread_id: u32,
read_txn_id: u64,
gap: usize,
space: usize,
retry: isize,
) -> HandleSlowReadersReturnCode;
#[derive(Debug)]
pub enum HandleSlowReadersReturnCode {
/// An error condition and the reader was not killed.
Error,
/// The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error;
/// MDBX should increase the database size or return `MDBX_MAP_FULL` error.
ProceedWithoutKillingReader,
/// The callback solved the problem or just waited for a while, libmdbx should rescan the
/// reader lock table and retry. This also includes a situation when corresponding transaction
/// terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
/// I.e. reader slot isn't needed to be cleaned from transaction.
Success,
/// Transaction aborted asynchronous and reader slot should be cleared immediately, i.e. read
/// transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called
/// later.
ClearReaderSlot,
/// The reader process was terminated or killed, and MDBX should entirely reset reader
/// registration.
ReaderProcessTerminated,
}
impl From<HandleSlowReadersReturnCode> for i32 {
fn from(value: HandleSlowReadersReturnCode) -> Self {
match value {
HandleSlowReadersReturnCode::Error => -2,
HandleSlowReadersReturnCode::ProceedWithoutKillingReader => -1,
HandleSlowReadersReturnCode::Success => 0,
HandleSlowReadersReturnCode::ClearReaderSlot => 1,
HandleSlowReadersReturnCode::ReaderProcessTerminated => 2,
}
}
}
/// Options for opening or creating an environment.
#[derive(Debug, Clone)]
pub struct EnvironmentBuilder {
@ -509,6 +595,7 @@ pub struct EnvironmentBuilder {
geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
log_level: Option<ffi::MDBX_log_level_t>,
kind: EnvironmentKind,
handle_slow_readers: Option<HandleSlowReadersCallback>,
}
impl EnvironmentBuilder {
@ -589,6 +676,13 @@ impl EnvironmentBuilder {
))?;
}
if let Some(handle_slow_readers) = self.handle_slow_readers {
mdbx_result(ffi::mdbx_env_set_hsr(
env,
handle_slow_readers_callback(handle_slow_readers),
))?;
}
#[cfg(unix)]
fn path_to_bytes<P: AsRef<Path>>(path: P) -> Vec<u8> {
use std::os::unix::ffi::OsStrExt;
@ -765,8 +859,127 @@ impl EnvironmentBuilder {
self
}
/// Set the Handle-Slow-Readers callback. See [HandleSlowReadersCallback] for more information.
pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
self.handle_slow_readers = Some(hsr);
self
}
pub fn set_log_level(&mut self, log_level: ffi::MDBX_log_level_t) -> &mut Self {
self.log_level = Some(log_level);
self
}
}
/// Creates an instance of `MDBX_hsr_func`.
///
/// Caution: this leaks the memory for callbacks, so they're alive throughout the program. It's
/// fine, because we also expect the database environment to be alive during this whole time.
unsafe fn handle_slow_readers_callback(callback: HandleSlowReadersCallback) -> MDBX_hsr_func {
// Move the callback function to heap and intentionally leak it, so it's not dropped and the
// MDBX env can use it throughout the whole program.
let callback = Box::leak(Box::new(callback));
// Wrap the callback into an ffi binding. The callback is needed for a nicer UX with Rust types,
// and without `env` and `txn` arguments that we don't want to expose to the user. Again,
// move the closure to heap and leak.
let hsr = Box::leak(Box::new(
|_env: *const MDBX_env,
_txn: *const MDBX_txn,
pid: mdbx_pid_t,
tid: mdbx_tid_t,
laggard: u64,
gap: ::libc::c_uint,
space: usize,
retry: ::libc::c_int|
-> i32 {
callback(pid as u32, tid as u32, laggard, gap as usize, space, retry as isize).into()
},
));
// Create a pointer to the C function from the Rust closure, and forcefully forget the original
// closure.
let closure = libffi::high::Closure8::new(hsr);
let closure_ptr = *closure.code_ptr();
std::mem::forget(closure);
// Cast the closure to FFI `extern fn` type.
Some(std::mem::transmute(closure_ptr))
}
#[cfg(test)]
mod tests {
use crate::{Environment, Error, Geometry, HandleSlowReadersReturnCode, PageSize, WriteFlags};
use std::{
ops::RangeInclusive,
sync::atomic::{AtomicBool, Ordering},
};
#[test]
fn test_handle_slow_readers_callback() {
static CALLED: AtomicBool = AtomicBool::new(false);
let tempdir = tempfile::tempdir().unwrap();
let env = Environment::builder()
.set_geometry(Geometry::<RangeInclusive<usize>> {
size: Some(0..=1024 * 1024), // Max 1MB, so we can hit the limit
page_size: Some(PageSize::MinimalAcceptable), // To create as many pages as possible
..Default::default()
})
.set_handle_slow_readers(
|_process_id: u32,
_thread_id: u32,
_read_txn_id: u64,
_gap: usize,
_space: usize,
_retry: isize| {
CALLED.store(true, Ordering::Relaxed);
HandleSlowReadersReturnCode::ProceedWithoutKillingReader
},
)
.open(tempdir.path())
.unwrap();
// Insert some data in the database, so the read transaction can lock on the snapshot of it
{
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
for i in 0usize..1_000 {
tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty()).unwrap()
}
tx.commit().unwrap();
}
// Create a read transaction
let _tx_ro = env.begin_ro_txn().unwrap();
// Change previously inserted data, so the read transaction would use the previous snapshot
{
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
for i in 0usize..1_000 {
tx.put(db.dbi(), i.to_le_bytes(), b"1", WriteFlags::empty()).unwrap();
}
tx.commit().unwrap();
}
// Insert more data in the database, so we hit the DB size limit error, and MDBX tries to
// kick long-lived readers and delete their snapshots
{
let tx = env.begin_rw_txn().unwrap();
let db = tx.open_db(None).unwrap();
for i in 1_000usize..1_000_000 {
match tx.put(db.dbi(), i.to_le_bytes(), b"0", WriteFlags::empty()) {
Ok(_) => continue,
Err(Error::MapFull) => break,
result @ Err(_) => result.unwrap(),
}
}
tx.commit().unwrap();
}
// Expect the HSR to be called
assert!(CALLED.load(Ordering::Relaxed));
}
}

View File

@ -12,7 +12,8 @@ pub use crate::{
cursor::{Cursor, Iter, IterDup},
database::Database,
environment::{
Environment, EnvironmentBuilder, EnvironmentKind, Geometry, Info, PageSize, Stat,
Environment, EnvironmentBuilder, EnvironmentKind, Geometry, HandleSlowReadersCallback,
HandleSlowReadersReturnCode, Info, PageSize, Stat,
},
error::{Error, Result},
flags::*,