From d772344d3725f4325db97f9082a4a21829a78989 Mon Sep 17 00:00:00 2001 From: DaniPopes <57450786+DaniPopes@users.noreply.github.com> Date: Tue, 11 Jun 2024 08:44:29 +0200 Subject: [PATCH] chore(deps): remove libffi dependency (#8736) --- Cargo.lock | 20 --- .../storage/db/src/implementation/mdbx/mod.rs | 78 ++++++------ .../storage/db/src/implementation/mdbx/tx.rs | 4 +- crates/storage/libmdbx-rs/Cargo.toml | 11 +- crates/storage/libmdbx-rs/benches/cursor.rs | 3 +- .../storage/libmdbx-rs/benches/transaction.rs | 3 +- crates/storage/libmdbx-rs/src/environment.rs | 115 ++++++------------ crates/storage/libmdbx-rs/src/lib.rs | 5 +- crates/storage/libmdbx-rs/src/transaction.rs | 28 +++-- 9 files changed, 111 insertions(+), 156 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e6132d05..d660fd913 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4409,25 +4409,6 @@ version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" -[[package]] -name = "libffi" -version = "3.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce826c243048e3d5cec441799724de52e2d42f820468431fc3fceee2341871e2" -dependencies = [ - "libc", - "libffi-sys", -] - -[[package]] -name = "libffi-sys" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f36115160c57e8529781b4183c2bb51fdc1f6d6d1ed345591d84be7703befb3c" -dependencies = [ - "cc", -] - [[package]] name = "libloading" version = "0.8.3" @@ -7099,7 +7080,6 @@ dependencies = [ "derive_more", "indexmap 2.2.6", "libc", - "libffi", "parking_lot 0.12.3", "pprof", "rand 0.8.5", diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index d9d1a8e2a..685ffbc3a 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -17,8 +17,8 @@ use reth_db_api::{ transaction::{DbTx, DbTxMut}, }; use reth_libmdbx::{ - DatabaseFlags, Environment, EnvironmentFlags, Geometry, MaxReadTransactionDuration, Mode, - PageSize, SyncMode, RO, RW, + ffi, DatabaseFlags, Environment, EnvironmentFlags, Geometry, HandleSlowReadersReturnCode, + MaxReadTransactionDuration, Mode, PageSize, SyncMode, RO, RW, }; use reth_storage_errors::db::LogLevel; use reth_tracing::tracing::error; @@ -289,42 +289,52 @@ impl DatabaseEnv { shrink_threshold: Some(0), page_size: Some(PageSize::Set(default_page_size())), }); - #[cfg(not(windows))] - { - fn is_current_process(id: u32) -> bool { - #[cfg(unix)] - { - id == std::os::unix::process::parent_id() || id == std::process::id() - } - #[cfg(not(unix))] - { - id == std::process::id() - } + fn is_current_process(id: u32) -> bool { + #[cfg(unix)] + { + id == std::os::unix::process::parent_id() || id == std::process::id() } - inner_env.set_handle_slow_readers( - |process_id: u32, thread_id: u32, read_txn_id: u64, gap: usize, space: usize, retry: isize| { - if space > MAX_SAFE_READER_SPACE { - let message = if is_current_process(process_id) { - "Current process has a long-lived database transaction that grows the database file." - } else { - "External process has a long-lived database transaction that grows the database file. Use shorter-lived read transactions or shut down the node." - }; - reth_tracing::tracing::warn!( - target: "storage::db::mdbx", - ?process_id, - ?thread_id, - ?read_txn_id, - ?gap, - ?space, - ?retry, - message - ) - } - reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader - }); + #[cfg(not(unix))] + { + id == std::process::id() + } } + + extern "C" fn handle_slow_readers( + _env: *const ffi::MDBX_env, + _txn: *const ffi::MDBX_txn, + process_id: ffi::mdbx_pid_t, + thread_id: ffi::mdbx_tid_t, + read_txn_id: u64, + gap: std::ffi::c_uint, + space: usize, + retry: std::ffi::c_int, + ) -> HandleSlowReadersReturnCode { + if space > MAX_SAFE_READER_SPACE { + let message = if is_current_process(process_id as u32) { + "Current process has a long-lived database transaction that grows the database file." + } else { + "External process has a long-lived database transaction that grows the database file. \ + Use shorter-lived read transactions or shut down the node." + }; + reth_tracing::tracing::warn!( + target: "storage::db::mdbx", + process_id, + thread_id, + read_txn_id, + gap, + space, + retry, + "{message}" + ) + } + + reth_libmdbx::HandleSlowReadersReturnCode::ProceedWithoutKillingReader + } + inner_env.set_handle_slow_readers(handle_slow_readers); + inner_env.set_flags(EnvironmentFlags { mode, // We disable readahead because it improves performance for linear scans, but diff --git a/crates/storage/db/src/implementation/mdbx/tx.rs b/crates/storage/db/src/implementation/mdbx/tx.rs index b95d543f9..8feb6c90a 100644 --- a/crates/storage/db/src/implementation/mdbx/tx.rs +++ b/crates/storage/db/src/implementation/mdbx/tx.rs @@ -10,7 +10,7 @@ use reth_db_api::{ table::{Compress, DupSort, Encode, Table, TableImporter}, transaction::{DbTx, DbTxMut}, }; -use reth_libmdbx::{ffi::DBI, CommitLatency, Transaction, TransactionKind, WriteFlags, RW}; +use reth_libmdbx::{ffi::MDBX_dbi, CommitLatency, Transaction, TransactionKind, WriteFlags, RW}; use reth_storage_errors::db::{DatabaseWriteError, DatabaseWriteOperation}; use reth_tracing::tracing::{debug, trace, warn}; use std::{ @@ -75,7 +75,7 @@ impl Tx { } /// Gets a table database handle if it exists, otherwise creates it. - pub fn get_dbi(&self) -> Result { + pub fn get_dbi(&self) -> Result { self.inner .open_db(Some(T::NAME)) .map(|db| db.dbi()) diff --git a/crates/storage/libmdbx-rs/Cargo.toml b/crates/storage/libmdbx-rs/Cargo.toml index 2042cd896..68576e0d0 100644 --- a/crates/storage/libmdbx-rs/Cargo.toml +++ b/crates/storage/libmdbx-rs/Cargo.toml @@ -25,10 +25,7 @@ thiserror.workspace = true dashmap = { workspace = true, features = ["inline"], optional = true } tracing.workspace = true -ffi = { package = "reth-mdbx-sys", path = "./mdbx-sys" } - -[target.'cfg(not(windows))'.dependencies] -libffi = "3.2.0" +reth-mdbx-sys.workspace = true [features] default = [] @@ -36,7 +33,11 @@ return-borrowed = [] read-tx-timeouts = ["dashmap", "dashmap/inline"] [dev-dependencies] -pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] } +pprof = { workspace = true, features = [ + "flamegraph", + "frame-pointer", + "criterion", +] } criterion.workspace = true rand.workspace = true rand_xorshift = "0.3" diff --git a/crates/storage/libmdbx-rs/benches/cursor.rs b/crates/storage/libmdbx-rs/benches/cursor.rs index a73b4fe27..acd7d9a72 100644 --- a/crates/storage/libmdbx-rs/benches/cursor.rs +++ b/crates/storage/libmdbx-rs/benches/cursor.rs @@ -1,10 +1,9 @@ #![allow(missing_docs)] mod utils; -use ::ffi::*; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use pprof::criterion::{Output, PProfProfiler}; -use reth_libmdbx::*; +use reth_libmdbx::{ffi::*, *}; use std::ptr; use utils::*; diff --git a/crates/storage/libmdbx-rs/benches/transaction.rs b/crates/storage/libmdbx-rs/benches/transaction.rs index 8ec7702f7..8cc84b01f 100644 --- a/crates/storage/libmdbx-rs/benches/transaction.rs +++ b/crates/storage/libmdbx-rs/benches/transaction.rs @@ -2,11 +2,10 @@ mod utils; use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use ffi::*; use libc::size_t; use rand::{prelude::SliceRandom, SeedableRng}; use rand_xorshift::XorShiftRng; -use reth_libmdbx::{ObjectLength, WriteFlags}; +use reth_libmdbx::{ffi::*, ObjectLength, WriteFlags}; use std::ptr; use utils::*; diff --git a/crates/storage/libmdbx-rs/src/environment.rs b/crates/storage/libmdbx-rs/src/environment.rs index 34e64fa18..f1a2cbe14 100644 --- a/crates/storage/libmdbx-rs/src/environment.rs +++ b/crates/storage/libmdbx-rs/src/environment.rs @@ -10,8 +10,7 @@ use byteorder::{ByteOrder, NativeEndian}; use mem::size_of; use std::{ ffi::CString, - fmt, - fmt::Debug, + fmt::{self, Debug}, mem, ops::{Bound, RangeBounds}, path::Path, @@ -51,7 +50,6 @@ impl Environment { geometry: None, log_level: None, kind: Default::default(), - #[cfg(not(windows))] handle_slow_readers: None, #[cfg(feature = "read-tx-timeouts")] max_read_transaction_duration: None, @@ -514,6 +512,7 @@ impl Default for Geometry { /// implement timeout reset logic while waiting for a readers. /// /// # Returns +/// /// A return code that determines the further actions for MDBX and must match the action which /// was executed by the callback: /// * `-2` or less – An error condition and the reader was not killed. @@ -528,46 +527,37 @@ impl Default for Geometry { /// called later. /// * `2` or greater – The reader process was terminated or killed, and MDBX should entirely reset /// reader registration. -pub type HandleSlowReadersCallback = fn( - process_id: u32, - thread_id: u32, - read_txn_id: u64, - gap: usize, +pub type HandleSlowReadersCallback = extern "C" fn( + env: *const ffi::MDBX_env, + txn: *const ffi::MDBX_txn, + pid: ffi::mdbx_pid_t, + tid: ffi::mdbx_tid_t, + laggard: u64, + gap: std::ffi::c_uint, space: usize, - retry: isize, + retry: std::ffi::c_int, ) -> HandleSlowReadersReturnCode; #[derive(Debug)] +#[repr(i32)] pub enum HandleSlowReadersReturnCode { /// An error condition and the reader was not killed. - Error, + Error = -2, /// The callback was unable to solve the problem and agreed on `MDBX_MAP_FULL` error; /// MDBX should increase the database size or return `MDBX_MAP_FULL` error. - ProceedWithoutKillingReader, + ProceedWithoutKillingReader = -1, /// The callback solved the problem or just waited for a while, libmdbx should rescan the /// reader lock table and retry. This also includes a situation when corresponding transaction /// terminated in normal way by `mdbx_txn_abort()` or `mdbx_txn_reset()`, and may be restarted. /// I.e. reader slot isn't needed to be cleaned from transaction. - Success, + Success = 0, /// Transaction aborted asynchronous and reader slot should be cleared immediately, i.e. read /// transaction will not continue but `mdbx_txn_abort()` nor `mdbx_txn_reset()` will be called /// later. - ClearReaderSlot, + ClearReaderSlot = 1, /// The reader process was terminated or killed, and MDBX should entirely reset reader /// registration. - ReaderProcessTerminated, -} - -impl From for i32 { - fn from(value: HandleSlowReadersReturnCode) -> Self { - match value { - HandleSlowReadersReturnCode::Error => -2, - HandleSlowReadersReturnCode::ProceedWithoutKillingReader => -1, - HandleSlowReadersReturnCode::Success => 0, - HandleSlowReadersReturnCode::ClearReaderSlot => 1, - HandleSlowReadersReturnCode::ReaderProcessTerminated => 2, - } - } + ReaderProcessTerminated = 2, } /// Options for opening or creating an environment. @@ -585,7 +575,6 @@ pub struct EnvironmentBuilder { geometry: Option, Option)>>, log_level: Option, kind: EnvironmentKind, - #[cfg(not(windows))] handle_slow_readers: Option, #[cfg(feature = "read-tx-timeouts")] /// The maximum duration of a read transaction. If [None], but the `read-tx-timeout` feature is @@ -671,11 +660,10 @@ impl EnvironmentBuilder { ))?; } - #[cfg(not(windows))] if let Some(handle_slow_readers) = self.handle_slow_readers { mdbx_result(ffi::mdbx_env_set_hsr( env, - handle_slow_readers_callback(handle_slow_readers), + convert_hsr_fn(Some(handle_slow_readers)), ))?; } @@ -834,7 +822,6 @@ impl EnvironmentBuilder { /// Set the Handle-Slow-Readers callback. See [`HandleSlowReadersCallback`] for more /// information. - #[cfg(not(windows))] pub fn set_handle_slow_readers(&mut self, hsr: HandleSlowReadersCallback) -> &mut Self { self.handle_slow_readers = Some(hsr); self @@ -878,42 +865,10 @@ pub(crate) mod read_transactions { } } -/// Creates an instance of `MDBX_hsr_func`. -/// -/// Caution: this leaks the memory for callbacks, so they're alive throughout the program. It's -/// fine, because we also expect the database environment to be alive during this whole time. -#[cfg(not(windows))] -unsafe fn handle_slow_readers_callback(callback: HandleSlowReadersCallback) -> ffi::MDBX_hsr_func { - // Move the callback function to heap and intentionally leak it, so it's not dropped and the - // MDBX env can use it throughout the whole program. - let callback = Box::leak(Box::new(callback)); - - // Wrap the callback into an ffi binding. The callback is needed for a nicer UX with Rust types, - // and without `env` and `txn` arguments that we don't want to expose to the user. Again, - // move the closure to heap and leak. - let hsr = Box::leak(Box::new( - |_env: *const ffi::MDBX_env, - _txn: *const ffi::MDBX_txn, - pid: ffi::mdbx_pid_t, - tid: ffi::mdbx_tid_t, - laggard: u64, - gap: ::libc::c_uint, - space: usize, - retry: ::libc::c_int| - -> i32 { - callback(pid as u32, tid as u32, laggard, gap as usize, space, retry as isize).into() - }, - )); - - // Create a pointer to the C function from the Rust closure, and forcefully forget the original - // closure. - let closure = libffi::high::Closure8::new(hsr); - let closure_ptr = *closure.code_ptr(); - std::mem::forget(closure); - - // Cast the closure to FFI `extern fn` type. - #[allow(clippy::missing_transmute_annotations)] - Some(std::mem::transmute(closure_ptr)) +/// Converts a [`HandleSlowReadersCallback`] to the actual FFI function pointer. +#[allow(clippy::missing_transmute_annotations)] +fn convert_hsr_fn(callback: Option) -> ffi::MDBX_hsr_func { + unsafe { std::mem::transmute(callback) } } #[cfg(test)] @@ -924,11 +879,24 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, }; - #[cfg(not(windows))] #[test] fn test_handle_slow_readers_callback() { static CALLED: AtomicBool = AtomicBool::new(false); + extern "C" fn handle_slow_readers( + _env: *const ffi::MDBX_env, + _txn: *const ffi::MDBX_txn, + _pid: ffi::mdbx_pid_t, + _tid: ffi::mdbx_tid_t, + _laggard: u64, + _gap: std::ffi::c_uint, + _space: usize, + _retry: std::ffi::c_int, + ) -> HandleSlowReadersReturnCode { + CALLED.store(true, Ordering::Relaxed); + HandleSlowReadersReturnCode::ProceedWithoutKillingReader + } + let tempdir = tempfile::tempdir().unwrap(); let env = Environment::builder() .set_geometry(Geometry::> { @@ -936,18 +904,7 @@ mod tests { page_size: Some(PageSize::MinimalAcceptable), // To create as many pages as possible ..Default::default() }) - .set_handle_slow_readers( - |_process_id: u32, - _thread_id: u32, - _read_txn_id: u64, - _gap: usize, - _space: usize, - _retry: isize| { - CALLED.store(true, Ordering::Relaxed); - - HandleSlowReadersReturnCode::ProceedWithoutKillingReader - }, - ) + .set_handle_slow_readers(handle_slow_readers) .open(tempdir.path()) .unwrap(); diff --git a/crates/storage/libmdbx-rs/src/lib.rs b/crates/storage/libmdbx-rs/src/lib.rs index 6ffaf8cb0..6a0fe97f7 100644 --- a/crates/storage/libmdbx-rs/src/lib.rs +++ b/crates/storage/libmdbx-rs/src/lib.rs @@ -8,6 +8,8 @@ #![allow(missing_docs, clippy::needless_pass_by_ref_mut)] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +pub extern crate reth_mdbx_sys as ffi; + pub use crate::{ codec::*, cursor::{Cursor, Iter, IterDup}, @@ -20,9 +22,6 @@ pub use crate::{ flags::*, transaction::{CommitLatency, Transaction, TransactionKind, RO, RW}, }; -pub mod ffi { - pub use ffi::{MDBX_dbi as DBI, MDBX_log_level_t as LogLevel}; -} #[cfg(feature = "read-tx-timeouts")] pub use crate::environment::read_transactions::MaxReadTransactionDuration; diff --git a/crates/storage/libmdbx-rs/src/transaction.rs b/crates/storage/libmdbx-rs/src/transaction.rs index 035246b4f..5e8049bb2 100644 --- a/crates/storage/libmdbx-rs/src/transaction.rs +++ b/crates/storage/libmdbx-rs/src/transaction.rs @@ -523,27 +523,35 @@ impl Transaction { #[derive(Debug, Clone)] pub(crate) struct TransactionPtr { txn: *mut ffi::MDBX_txn, + #[cfg(feature = "read-tx-timeouts")] timed_out: Arc, lock: Arc>, } impl TransactionPtr { fn new(txn: *mut ffi::MDBX_txn) -> Self { - Self { txn, timed_out: Arc::new(AtomicBool::new(false)), lock: Arc::new(Mutex::new(())) } + Self { + txn, + #[cfg(feature = "read-tx-timeouts")] + timed_out: Arc::new(AtomicBool::new(false)), + lock: Arc::new(Mutex::new(())), + } } - // Returns `true` if the transaction is timed out. - // - // When transaction is timed out via `TxnManager`, it's actually reset using - // `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such - // transactions). - // - // Importantly, we can't rely on `MDBX_TXN_FINISHED` flag to check if the transaction is timed - // out using `mdbx_txn_reset`, because MDBX uses it in other cases too. + /// Returns `true` if the transaction is timed out. + /// + /// When transaction is timed out via `TxnManager`, it's actually reset using + /// `mdbx_txn_reset`. It makes the transaction unusable (MDBX fails on any usages of such + /// transactions). + /// + /// Importantly, we can't rely on `MDBX_TXN_FINISHED` flag to check if the transaction is timed + /// out using `mdbx_txn_reset`, because MDBX uses it in other cases too. + #[cfg(feature = "read-tx-timeouts")] fn is_timed_out(&self) -> bool { self.timed_out.load(std::sync::atomic::Ordering::SeqCst) } + #[cfg(feature = "read-tx-timeouts")] pub(crate) fn set_timed_out(&self) { self.timed_out.store(true, std::sync::atomic::Ordering::SeqCst); } @@ -575,6 +583,7 @@ impl TransactionPtr { // No race condition with the `TxnManager` timing out the transaction is possible here, // because we're taking a lock for any actions on the transaction pointer, including a call // to the `mdbx_txn_reset`. + #[cfg(feature = "read-tx-timeouts")] if self.is_timed_out() { return Err(Error::ReadTransactionTimeout) } @@ -594,6 +603,7 @@ impl TransactionPtr { let _lck = self.lock(); // To be able to do any operations on the transaction, we need to renew it first. + #[cfg(feature = "read-tx-timeouts")] if self.is_timed_out() { mdbx_result(unsafe { mdbx_txn_renew(self.txn) })?; }