chore(deps): remove libffi dependency (#8736)

This commit is contained in:
DaniPopes
2024-06-11 08:44:29 +02:00
committed by GitHub
parent bd1f5ba8eb
commit d772344d37
9 changed files with 111 additions and 156 deletions

20
Cargo.lock generated
View File

@ -4409,25 +4409,6 @@ version = "0.2.155"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c"
[[package]]
name = "libffi"
version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce826c243048e3d5cec441799724de52e2d42f820468431fc3fceee2341871e2"
dependencies = [
"libc",
"libffi-sys",
]
[[package]]
name = "libffi-sys"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36115160c57e8529781b4183c2bb51fdc1f6d6d1ed345591d84be7703befb3c"
dependencies = [
"cc",
]
[[package]]
name = "libloading"
version = "0.8.3"
@ -7099,7 +7080,6 @@ dependencies = [
"derive_more",
"indexmap 2.2.6",
"libc",
"libffi",
"parking_lot 0.12.3",
"pprof",
"rand 0.8.5",

View File

@ -17,8 +17,8 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, Geometry, MaxReadTransactionDuration, Mode,
PageSize, SyncMode, RO, RW,
ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode,
MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW,
};
use reth_storage_errors::db::LogLevel;
use reth_tracing::tracing::error;
@ -289,8 +289,7 @@ impl DatabaseEnv {
shrink_threshold: Some(0),
page_size: Some(PageSize::Set(default_page_size())),
});
#[cfg(not(windows))]
{
fn is_current_process(id: u32) -> bool {
#[cfg(unix)]
{
@ -302,29 +301,40 @@ impl DatabaseEnv {
id == std::process::id()
}
}
inner_env.set_handle_slow_readers(
|process_id: u32, thread_id: u32, read_txn_id: u64, gap: usize, space: usize, retry: isize| {
extern "C" fn handle_slow_readers(
_env: *const ffi::MDBX_env,
_txn: *const ffi::MDBX_txn,
process_id: ffi::mdbx_pid_t,
thread_id: ffi::mdbx_tid_t,
read_txn_id: u64,
gap: std::ffi::c_uint,
space: usize,
retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode {
if space > MAX_SAFE_READER_SPACE {
let message = if is_current_process(process_id) {
let message = if is_current_process(process_id as u32) {
"Current process has a long-lived database transaction that grows the database file."
} else {
"External process has a long-lived database transaction that grows the database file. Use shorter-lived read transactions or shut down the node."
"External process has a long-lived database transaction that grows the database file. \
Use shorter-lived read transactions or shut down the node."
};
reth_tracing::tracing::warn!(
target: "storage::db::mdbx",
?process_id,
?thread_id,
?read_txn_id,
?gap,
?space,
?retry,
message
process_id,
thread_id,
read_txn_id,
gap,
space,
retry,
"{message}"
)
}
reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader
});
}
inner_env.set_handle_slow_readers(handle_slow_readers);
inner_env.set_flags(EnvironmentFlags {
mode,
// We disable readahead because it improves performance for linear scans, but

View File

@ -10,7 +10,7 @@ use reth_db_api::{
table::{Compress, DupSort, Encode, Table, TableImporter},
transaction::{DbTx, DbTxMut},
};
use reth_libmdbx::{ffi::DBI, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW};
use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation};
use reth_tracing::tracing::{debug, trace, warn};
use std::{
@ -75,7 +75,7 @@ impl<K: TransactionKind> Tx<K> {
}
/// Gets a table database handle if it exists, otherwise creates it.
pub fn get_dbi<T: Table>(&self) -> Result<DBI, DatabaseError> {
pub fn get_dbi<T: Table>(&self) -> Result<MDBX_dbi, DatabaseError> {
self.inner
.open_db(Some(T::NAME))
.map(|db| db.dbi())

View File

@ -25,10 +25,7 @@ thiserror.workspace = true
dashmap = { workspace = true, features = ["inline"], optional = true }
tracing.workspace = true
ffi = { package = "reth-mdbx-sys", path = "./mdbx-sys" }
[target.'cfg(not(windows))'.dependencies]
libffi = "3.2.0"
reth-mdbx-sys.workspace = true
[features]
default = []
@ -36,7 +33,11 @@ return-borrowed = []
read-tx-timeouts = ["dashmap", "dashmap/inline"]
[dev-dependencies]
pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }
pprof = { workspace = true, features = [
"flamegraph",
"frame-pointer",
"criterion",
] }
criterion.workspace = true
rand.workspace = true
rand_xorshift = "0.3"

View File

@ -1,10 +1,9 @@
#![allow(missing_docs)]
mod utils;
use ::ffi::*;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use pprof::criterion::{Output, PProfProfiler};
use reth_libmdbx::*;
use reth_libmdbx::{ffi::*, *};
use std::ptr;
use utils::*;

View File

@ -2,11 +2,10 @@
mod utils;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use ffi::*;
use libc::size_t;
use rand::{prelude::SliceRandom, SeedableRng};
use rand_xorshift::XorShiftRng;
use reth_libmdbx::{ObjectLength, WriteFlags};
use reth_libmdbx::{ffi::*, ObjectLength, WriteFlags};
use std::ptr;
use utils::*;

View File

@ -10,8 +10,7 @@ use byteorder::{ByteOrder, NativeEndian};
use mem::size_of;
use std::{
ffi::CString,
fmt,
fmt::Debug,
fmt::{self, Debug},
mem,
ops::{Bound, RangeBounds},
path::Path,
@ -51,7 +50,6 @@ impl Environment {
geometry: None,
log_level: None,
kind: Default::default(),
#[cfg(not(windows))]
handle_slow_readers: None,
#[cfg(feature = "read-tx-timeouts")]
max_read_transaction_duration: None,
@ -514,6 +512,7 @@ impl<R> Default for Geometry<R> {
/// implement timeout reset logic while waiting for a readers.
///
/// # Returns
///
/// A return code that determines the further actions for MDBX and must match the action which
/// was executed by the callback:
/// * `-2` or less An error condition and the reader was not killed.
@ -528,46 +527,37 @@ impl<R> Default for Geometry<R> {
/// called later.
/// * `2` or greater The reader process was terminated or killed, and MDBX should entirely reset
/// reader registration.
pub type HandleSlowReadersCallback = fn(
process_id: u32,
thread_id: u32,
read_txn_id: u64,
gap: usize,
pub type HandleSlowReadersCallback = extern "C" fn(
env: *const ffi::MDBX_env,
txn: *const ffi::MDBX_txn,
pid: ffi::mdbx_pid_t,
tid: ffi::mdbx_tid_t,
laggard: u64,
gap: std::ffi::c_uint,
space: usize,
retry: isize,
retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode;
#[derive(Debug)]
#[repr(i32)]
pub enum HandleSlowReadersReturnCode {
/// An error condition and the reader was not killed.
Error,
Error = -2,
/// The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error;
/// MDBX should increase the database size or return `MDBX_MAP_FULL` error.
ProceedWithoutKillingReader,
ProceedWithoutKillingReader = -1,
/// The callback solved the problem or just waited for a while, libmdbx should rescan the
/// reader lock table and retry. This also includes a situation when corresponding transaction
/// terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted.
/// I.e. reader slot isn't needed to be cleaned from transaction.
Success,
Success = 0,
/// Transaction aborted asynchronous and reader slot should be cleared immediately, i.e. read
/// transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called
/// later.
ClearReaderSlot,
ClearReaderSlot = 1,
/// The reader process was terminated or killed, and MDBX should entirely reset reader
/// registration.
ReaderProcessTerminated,
}
impl From<HandleSlowReadersReturnCode> for i32 {
fn from(value: HandleSlowReadersReturnCode) -> Self {
match value {
HandleSlowReadersReturnCode::Error => -2,
HandleSlowReadersReturnCode::ProceedWithoutKillingReader => -1,
HandleSlowReadersReturnCode::Success => 0,
HandleSlowReadersReturnCode::ClearReaderSlot => 1,
HandleSlowReadersReturnCode::ReaderProcessTerminated => 2,
}
}
ReaderProcessTerminated = 2,
}
/// Options for opening or creating an environment.
@ -585,7 +575,6 @@ pub struct EnvironmentBuilder {
geometry: Option<Geometry<(Option<usize>, Option<usize>)>>,
log_level: Option<ffi::MDBX_log_level_t>,
kind: EnvironmentKind,
#[cfg(not(windows))]
handle_slow_readers: Option<HandleSlowReadersCallback>,
#[cfg(feature = "read-tx-timeouts")]
/// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is
@ -671,11 +660,10 @@ impl EnvironmentBuilder {
))?;
}
#[cfg(not(windows))]
if let Some(handle_slow_readers) = self.handle_slow_readers {
mdbx_result(ffi::mdbx_env_set_hsr(
env,
handle_slow_readers_callback(handle_slow_readers),
convert_hsr_fn(Some(handle_slow_readers)),
))?;
}
@ -834,7 +822,6 @@ impl EnvironmentBuilder {
/// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more
/// information.
#[cfg(not(windows))]
pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self {
self.handle_slow_readers = Some(hsr);
self
@ -878,42 +865,10 @@ pub(crate) mod read_transactions {
}
}
/// Creates an instance of `MDBX_hsr_func`.
///
/// Caution: this leaks the memory for callbacks, so they're alive throughout the program. It's
/// fine, because we also expect the database environment to be alive during this whole time.
#[cfg(not(windows))]
unsafe fn handle_slow_readers_callback(callback: HandleSlowReadersCallback) -> ffi::MDBX_hsr_func {
// Move the callback function to heap and intentionally leak it, so it's not dropped and the
// MDBX env can use it throughout the whole program.
let callback = Box::leak(Box::new(callback));
// Wrap the callback into an ffi binding. The callback is needed for a nicer UX with Rust types,
// and without `env` and `txn` arguments that we don't want to expose to the user. Again,
// move the closure to heap and leak.
let hsr = Box::leak(Box::new(
|_env: *const ffi::MDBX_env,
_txn: *const ffi::MDBX_txn,
pid: ffi::mdbx_pid_t,
tid: ffi::mdbx_tid_t,
laggard: u64,
gap: ::libc::c_uint,
space: usize,
retry: ::libc::c_int|
-> i32 {
callback(pid as u32, tid as u32, laggard, gap as usize, space, retry as isize).into()
},
));
// Create a pointer to the C function from the Rust closure, and forcefully forget the original
// closure.
let closure = libffi::high::Closure8::new(hsr);
let closure_ptr = *closure.code_ptr();
std::mem::forget(closure);
// Cast the closure to FFI `extern fn` type.
/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer.
#[allow(clippy::missing_transmute_annotations)]
Some(std::mem::transmute(closure_ptr))
fn convert_hsr_fn(callback: Option<HandleSlowReadersCallback>) -> ffi::MDBX_hsr_func {
unsafe { std::mem::transmute(callback) }
}
#[cfg(test)]
@ -924,11 +879,24 @@ mod tests {
sync::atomic::{AtomicBool, Ordering},
};
#[cfg(not(windows))]
#[test]
fn test_handle_slow_readers_callback() {
static CALLED: AtomicBool = AtomicBool::new(false);
extern "C" fn handle_slow_readers(
_env: *const ffi::MDBX_env,
_txn: *const ffi::MDBX_txn,
_pid: ffi::mdbx_pid_t,
_tid: ffi::mdbx_tid_t,
_laggard: u64,
_gap: std::ffi::c_uint,
_space: usize,
_retry: std::ffi::c_int,
) -> HandleSlowReadersReturnCode {
CALLED.store(true, Ordering::Relaxed);
HandleSlowReadersReturnCode::ProceedWithoutKillingReader
}
let tempdir = tempfile::tempdir().unwrap();
let env = Environment::builder()
.set_geometry(Geometry::<RangeInclusive<usize>> {
@ -936,18 +904,7 @@ mod tests {
page_size: Some(PageSize::MinimalAcceptable), // To create as many pages as possible
..Default::default()
})
.set_handle_slow_readers(
|_process_id: u32,
_thread_id: u32,
_read_txn_id: u64,
_gap: usize,
_space: usize,
_retry: isize| {
CALLED.store(true, Ordering::Relaxed);
HandleSlowReadersReturnCode::ProceedWithoutKillingReader
},
)
.set_handle_slow_readers(handle_slow_readers)
.open(tempdir.path())
.unwrap();

View File

@ -8,6 +8,8 @@
#![allow(missing_docs, clippy::needless_pass_by_ref_mut)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub extern crate reth_mdbx_sys as ffi;
pub use crate::{
codec::*,
cursor::{Cursor, Iter, IterDup},
@ -20,9 +22,6 @@ pub use crate::{
flags::*,
transaction::{CommitLatency, Transaction, TransactionKind, RO, RW},
};
pub mod ffi {
pub use ffi::{MDBX_dbi as DBI, MDBX_log_level_t as LogLevel};
}
#[cfg(feature = "read-tx-timeouts")]
pub use crate::environment::read_transactions::MaxReadTransactionDuration;

View File

@ -523,27 +523,35 @@ impl Transaction<RW> {
#[derive(Debug, Clone)]
pub(crate) struct TransactionPtr {
txn: *mut ffi::MDBX_txn,
#[cfg(feature = "read-tx-timeouts")]
timed_out: Arc<AtomicBool>,
lock: Arc<Mutex<()>>,
}
impl TransactionPtr {
fn new(txn: *mut ffi::MDBX_txn) -> Self {
Self { txn, timed_out: Arc::new(AtomicBool::new(false)), lock: Arc::new(Mutex::new(())) }
Self {
txn,
#[cfg(feature = "read-tx-timeouts")]
timed_out: Arc::new(AtomicBool::new(false)),
lock: Arc::new(Mutex::new(())),
}
}
// Returns `true` if the transaction is timed out.
//
// When transaction is timed out via `TxnManager`, it's actually reset using
// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
// transactions).
//
// Importantly, we can't rely on `MDBX_TXN_FINISHED` flag to check if the transaction is timed
// out using `mdbx_txn_reset`, because MDBX uses it in other cases too.
/// Returns `true` if the transaction is timed out.
///
/// When transaction is timed out via `TxnManager`, it's actually reset using
/// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such
/// transactions).
///
/// Importantly, we can't rely on `MDBX_TXN_FINISHED` flag to check if the transaction is timed
/// out using `mdbx_txn_reset`, because MDBX uses it in other cases too.
#[cfg(feature = "read-tx-timeouts")]
fn is_timed_out(&self) -> bool {
self.timed_out.load(std::sync::atomic::Ordering::SeqCst)
}
#[cfg(feature = "read-tx-timeouts")]
pub(crate) fn set_timed_out(&self) {
self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst);
}
@ -575,6 +583,7 @@ impl TransactionPtr {
// No race condition with the `TxnManager` timing out the transaction is possible here,
// because we're taking a lock for any actions on the transaction pointer, including a call
// to the `mdbx_txn_reset`.
#[cfg(feature = "read-tx-timeouts")]
if self.is_timed_out() {
return Err(Error::ReadTransactionTimeout)
}
@ -594,6 +603,7 @@ impl TransactionPtr {
let _lck = self.lock();
// To be able to do any operations on the transaction, we need to renew it first.
#[cfg(feature = "read-tx-timeouts")]
if self.is_timed_out() {
mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?;
}