diff --git a/Cargo.lock b/Cargo.lock index 5e285f83d..6e1a22a84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1231,6 +1231,16 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "page_size" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "parity-scale-codec" version = "3.2.1" @@ -1483,7 +1493,12 @@ version = "0.1.0" name = "reth-db" version = "0.1.0" dependencies = [ + "bytes", "libmdbx", + "page_size", + "reth-primitives", + "tempfile", + "thiserror", ] [[package]] @@ -2089,9 +2104,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" [[package]] name = "tracing" -version = "0.1.36" +version = "0.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307" +checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if", "pin-project-lite", @@ -2101,9 +2116,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.22" +version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2" +checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a" dependencies = [ "proc-macro2", "quote", @@ -2112,9 +2127,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.29" +version = "0.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7" +checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a" dependencies = [ "once_cell", ] diff --git 
a/crates/db/Cargo.toml b/crates/db/Cargo.toml index ad11b3c00..16d6ef5bf 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -8,4 +8,14 @@ readme = "README.md" description = "Staged syncing primitives used in reth." [dependencies] -libmdbx = "0.1.8" \ No newline at end of file +# reth +reth-primitives = { path = "../primitives" } + +# misc +bytes = "1.2.1" +libmdbx = "0.1.8" +page_size = "0.4.2" +thiserror = "1.0.37" + +[dev-dependencies] +tempfile = "3.3.0" diff --git a/crates/db/src/kv/cursor.rs b/crates/db/src/kv/cursor.rs new file mode 100644 index 000000000..8ed4f4d61 --- /dev/null +++ b/crates/db/src/kv/cursor.rs @@ -0,0 +1,194 @@ +//! Cursor wrapper for libmdbx-sys. + +use super::error::KVError; +use crate::{ + kv::{Decode, DupSort, Encode, Table}, + utils::*, +}; +use libmdbx::{self, TransactionKind}; + +/// Alias type for a `(key, value)` result coming from a cursor. +pub type PairResult = Result::Key, ::Value)>, KVError>; +/// Alias type for a `(key, value)` result coming from an iterator. +pub type IterPairResult = Option::Key, ::Value), KVError>>; +/// Alias type for a value result coming from a cursor without its key. +pub type ValueOnlyResult = Result::Value>, KVError>; + +/// Cursor wrapper to access KV items. +#[derive(Debug)] +pub struct Cursor<'tx, K: TransactionKind, T: Table> { + /// Inner `libmdbx` cursor. + pub inner: libmdbx::Cursor<'tx, K>, + /// Table name as is inside the database. + pub table: &'static str, + /// Phantom data to enforce encoding/decoding. + pub _dbi: std::marker::PhantomData, +} + +/// Takes `(key, value)` from the database and decodes it appropriately. +#[macro_export] +macro_rules! decode { + ($v:expr) => { + $v?.map(decoder::).transpose() + }; +} + +impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { + /// Returns the first `(key, value)` pair. 
+ pub fn first(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.first()) + } + + /// Seeks for a `(key, value)` pair greater or equal than `key`. + pub fn seek(&mut self, key: T::SeekKey) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.set_range(key.encode().as_ref())) + } + + /// Seeks for the exact `(key, value)` pair with `key`. + pub fn seek_exact(&mut self, key: T::Key) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.set_key(key.encode().as_ref())) + } + + /// Returns the next `(key, value)` pair. + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.next()) + } + + /// Returns the previous `(key, value)` pair. + pub fn prev(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.prev()) + } + + /// Returns the last `(key, value)` pair. + pub fn last(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.last()) + } + + /// Returns the current `(key, value)` pair of the cursor. + pub fn current(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.get_current()) + } + + /// Returns an iterator starting at a key greater or equal than `start_key`. + pub fn walk( + mut self, + start_key: T::Key, + ) -> Result< + impl Iterator::Key, ::Value), KVError>>, + KVError, + > + where + T::Key: Decode, + { + let start = self.inner.set_range(start_key.encode().as_ref())?.map(decoder::); + + Ok(Walker { cursor: self, start }) + } +} + +impl<'txn, K, T> Cursor<'txn, K, T> +where + K: TransactionKind, + T: DupSort, +{ + /// Returns the next `(key, value)` pair of a DUPSORT table. + pub fn next_dup(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.next_dup()) + } + + /// Returns the next `(key, value)` pair skipping the duplicates. 
+ pub fn next_no_dup(&mut self) -> PairResult + where + T::Key: Decode, + { + decode!(self.inner.next_nodup()) + } + + /// Returns the next `value` of a duplicate `key`. + pub fn next_dup_val(&mut self) -> ValueOnlyResult { + self.inner.next_dup()?.map(decode_value::).transpose() + } + + /// Returns an iterator over the DUPSORT values of `key`, starting at the first one >= `subkey`. + pub fn walk_dup( + mut self, + key: T::Key, + subkey: T::SubKey, + ) -> Result::Value, KVError>>, KVError> { + let start = self + .inner + .get_both_range(key.encode().as_ref(), subkey.encode().as_ref())? + .map(decode_one::); + + Ok(DupWalker { cursor: self, start }) + } +} + +/// Provides an iterator to `Cursor` when handling `Table`. +#[derive(Debug)] +pub struct Walker<'a, K: TransactionKind, T: Table> { + /// Cursor to be used to walk through the table. + pub cursor: Cursor<'a, K, T>, + /// First `(key, value)` result to yield; taken on the first call to `next`. + pub start: IterPairResult, +} + +impl<'tx, K: TransactionKind, T: Table> std::iter::Iterator for Walker<'tx, K, T> +where + T::Key: Decode, +{ + type Item = Result<(T::Key, T::Value), KVError>; + fn next(&mut self) -> Option { + let start = self.start.take(); + if start.is_some() { + return start + } + + self.cursor.next().transpose() + } +} + +/// Provides an iterator to `Cursor` when handling a `DupSort` table. +#[derive(Debug)] +pub struct DupWalker<'a, K: TransactionKind, T: DupSort> { + /// Cursor to be used to walk through the table. + pub cursor: Cursor<'a, K, T>, + /// First value result to yield; taken on the first call to `next`. 
+ pub start: Option>, +} + +impl<'tx, K: TransactionKind, T: DupSort> std::iter::Iterator for DupWalker<'tx, K, T> { + type Item = Result; + fn next(&mut self) -> Option { + let start = self.start.take(); + if start.is_some() { + return start + } + self.cursor.next_dup_val().transpose() + } +} diff --git a/crates/db/src/kv/error.rs b/crates/db/src/kv/error.rs new file mode 100644 index 000000000..7d46779d6 --- /dev/null +++ b/crates/db/src/kv/error.rs @@ -0,0 +1,36 @@ +//! KVError declaration. + +use libmdbx::Error; +use thiserror::Error; + +/// KV error type. +#[derive(Debug, Error)] +pub enum KVError { + /// Generic MDBX error. + #[error("MDBX error: {0:?}")] + MDBX(#[from] Error), + /// Failed to open MDBX file. + #[error("{0:?}")] + DatabaseLocation(Error), + /// Failed to create a table in the database. + #[error("{0:?}")] + TableCreation(Error), + /// Failed to insert a value into a table. + #[error("{0:?}")] + Put(Error), + /// Failed to get a value from a table. + #[error("{0:?}")] + Get(Error), + /// Failed to delete a `(key, value)` pair from a table. + #[error("{0:?}")] + Delete(Error), + /// Failed to commit transaction changes into the database. + #[error("{0:?}")] + Commit(Error), + /// Failed to initiate an MDBX transaction. + #[error("{0:?}")] + InitTransaction(Error), + /// Failed to decode or encode a key or value coming from a table. + #[error("{0:?}")] + InvalidValue(Option), +} diff --git a/crates/db/src/kv/mod.rs b/crates/db/src/kv/mod.rs new file mode 100644 index 000000000..d67f21209 --- /dev/null +++ b/crates/db/src/kv/mod.rs @@ -0,0 +1,215 @@ +//! Module that interacts with MDBX. 
+ +use crate::utils::{default_page_size, TableType}; +use libmdbx::{ + DatabaseFlags, Environment, EnvironmentFlags, EnvironmentKind, Geometry, Mode, PageSize, + SyncMode, RO, RW, +}; +use std::{ops::Deref, path::Path}; + +pub mod table; +use table::{Decode, DupSort, Encode, Table}; + +pub mod tables; +use tables::TABLES; + +pub mod cursor; + +pub mod tx; +use tx::Tx; + +mod error; +pub use error::KVError; + +/// Access mode requested when opening an MDBX environment: read-only or read-write. +#[derive(Debug)] +pub enum EnvKind { + /// Read-only MDBX environment. + RO, + /// Read-write MDBX environment. + RW, +} + +/// Wrapper for the libmdbx environment. +#[derive(Debug)] +pub struct Env { + /// Libmdbx-sys environment. + pub inner: Environment, +} + +impl Env { + /// Opens the database at the specified path with the given `EnvKind`. + /// + /// It does not create the tables, for that call [`Env::create_tables`]. + pub fn open(path: &Path, kind: EnvKind) -> Result, KVError> { + let mode = match kind { + EnvKind::RO => Mode::ReadOnly, + EnvKind::RW => Mode::ReadWrite { sync_mode: SyncMode::Durable }, + }; + + let env = Env { + inner: Environment::new() + .set_max_dbs(TABLES.len()) + .set_geometry(Geometry { + size: Some(0..0x100000), // TODO: reevaluate + growth_step: Some(0x100000), // TODO: reevaluate + shrink_threshold: None, + page_size: Some(PageSize::Set(default_page_size())), + }) + .set_flags(EnvironmentFlags { + mode, + no_rdahead: true, // TODO: reevaluate + coalesce: true, + ..Default::default() + }) + .open(path) + .map_err(KVError::DatabaseLocation)?, + }; + + Ok(env) + } + + /// Creates all the defined tables, if necessary. 
+ pub fn create_tables(&self) -> Result<(), KVError> { + let tx = self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?; + + for (table_type, table) in TABLES { + let flags = match table_type { + TableType::Table => DatabaseFlags::default(), + TableType::DupSort => DatabaseFlags::DUP_SORT, + }; + + tx.create_db(Some(table), flags).map_err(KVError::TableCreation)?; + } + + tx.commit().map_err(KVError::Commit)?; + + Ok(()) + } +} + +impl Env { + /// Initiates a read-only transaction. It should be committed or rolled back in the end, so it + /// frees up pages. + pub fn begin_tx(&self) -> Result, KVError> { + Ok(Tx::new(self.inner.begin_ro_txn().map_err(KVError::InitTransaction)?)) + } + + /// Initiates a read-write transaction. It should be committed or rolled back in the end. + pub fn begin_mut_tx(&self) -> Result, KVError> { + Ok(Tx::new(self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?)) + } + + /// Calls `f` once with a read-only transaction, making sure it's closed at the + /// end of the execution. + pub fn view(&self, f: F) -> Result + where + F: FnOnce(&Tx<'_, RO, E>) -> T, + { + let tx = self.begin_tx()?; + + let res = f(&tx); + tx.commit()?; + + Ok(res) + } + + /// Calls `f` once with a read-write transaction, making sure it's committed at + /// the end of the execution. 
+ pub fn update(&self, f: F) -> Result + where + F: FnOnce(&Tx<'_, RW, E>) -> T, + { + let tx = self.begin_mut_tx()?; + + let res = f(&tx); + tx.commit()?; + + Ok(res) + } +} + +impl Deref for Env { + type Target = libmdbx::Environment; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +#[cfg(test)] +mod tests { + use super::{tables::PlainState, Env, EnvKind}; + use libmdbx::{NoWriteMap, WriteMap}; + use reth_primitives::Address; + use std::str::FromStr; + use tempfile::TempDir; + + const ERROR_DB_CREATION: &str = "Not able to create the mdbx file."; + const ERROR_DB_OPEN: &str = "Not able to open existing mdbx file."; + const ERROR_TABLE_CREATION: &str = "Not able to create tables in the database."; + const ERROR_PUT: &str = "Not able to insert value into table."; + const ERROR_GET: &str = "Not able to get value from table."; + const ERROR_COMMIT: &str = "Not able to commit transaction."; + const ERROR_RETURN_VALUE: &str = "Mismatching result."; + const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction."; + const ERROR_ETH_ADDRESS: &str = "Invalid address."; + const ERROR_TEMPDIR: &str = "Not able to create a temporary directory."; + + #[test] + fn db_creation() { + Env::::open(&TempDir::new().expect(ERROR_TEMPDIR).into_path(), EnvKind::RW) + .expect(ERROR_DB_CREATION); + } + + #[test] + fn db_manual_put_get() { + let env = + Env::::open(&TempDir::new().expect(ERROR_TEMPDIR).into_path(), EnvKind::RW) + .expect(ERROR_DB_CREATION); + env.create_tables().expect(ERROR_TABLE_CREATION); + + let value = vec![1, 3, 3, 7]; + let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047") + .expect(ERROR_ETH_ADDRESS); + + // PUT + let tx = env.begin_mut_tx().expect(ERROR_INIT_TX); + tx.put(PlainState, key, value.clone()).expect(ERROR_PUT); + tx.commit().expect(ERROR_COMMIT); + + // GET + let tx = env.begin_tx().expect(ERROR_INIT_TX); + let result = tx.get(PlainState, key).expect(ERROR_GET); + assert!(result.expect(ERROR_RETURN_VALUE) == value); +
tx.commit().expect(ERROR_COMMIT); + } + + #[test] + fn db_closure_put_get() { + let path = TempDir::new().expect(ERROR_TEMPDIR).into_path(); + + let value = vec![1, 3, 3, 7]; + let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047") + .expect(ERROR_ETH_ADDRESS); + + { + let env = Env::::open(&path, EnvKind::RW).expect(ERROR_DB_CREATION); + env.create_tables().expect(ERROR_TABLE_CREATION); + + // PUT + let result = env.update(|tx| { + tx.put(PlainState, key, value.clone()).expect(ERROR_PUT); + 200 + }); + assert!(result.expect(ERROR_RETURN_VALUE) == 200); + } + + let env = Env::::open(&path, EnvKind::RO).expect(ERROR_DB_OPEN); + + // GET + let result = env.view(|tx| tx.get(PlainState, key).expect(ERROR_GET)).expect(ERROR_GET); + + assert!(result == Some(value)) + } +} diff --git a/crates/db/src/kv/table.rs b/crates/db/src/kv/table.rs new file mode 100644 index 000000000..f34c78c9a --- /dev/null +++ b/crates/db/src/kv/table.rs @@ -0,0 +1,148 @@ +//! Table traits. + +use super::KVError; +use bytes::Bytes; +use reth_primitives::{Address, U256}; +use std::fmt::Debug; + +/// Trait that will transform the data to be saved in the DB. +pub trait Encode: Send + Sync + Sized + Debug { + /// Encoded type. + type Encoded: AsRef<[u8]> + Send + Sync; + + /// Encodes data going into the database. + fn encode(self) -> Self::Encoded; +} + +/// Trait that will transform the data to be read from the DB. +pub trait Decode: Send + Sync + Sized + Debug { + /// Decodes data coming from the database. + fn decode(value: &[u8]) -> Result; +} + +/// Generic trait that enforces the database value to implement [`Encode`] and [`Decode`]. +pub trait Object: Encode + Decode {} + +impl Object for T where T: Encode + Decode {} + +/// Generic trait that a database table should follow. +pub trait Table: Send + Sync + Debug + 'static { + /// Key element of `Table`. + type Key: Encode; + /// Value element of `Table`. + type Value: Object; + /// Seek Key element of `Table`. 
+ type SeekKey: Encode; + + /// Return name as it is present inside the MDBX. + fn name(&self) -> &'static str; +} + +/// DupSort allows a key to hold several sorted values without the key being repeated in the +/// database, for more check: https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48 +pub trait DupSort: Table { + /// Subkey type. For more check https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48 + type SubKey: Object; +} + +impl Encode for Vec { + type Encoded = Self; + + fn encode(self) -> Self::Encoded { + self + } +} + +impl Decode for Vec { + fn decode(value: &[u8]) -> Result { + Ok(value.to_vec()) + } +} + +impl Encode for Bytes { + type Encoded = Self; + + fn encode(self) -> Self::Encoded { + self + } +} + +impl Decode for Bytes { + fn decode(value: &[u8]) -> Result { + Ok(value.to_vec().into()) + } +} + +impl Encode for Address { + type Encoded = [u8; 20]; + + fn encode(self) -> Self::Encoded { + self.0 + } +} + +impl Decode for Address { + fn decode(value: &[u8]) -> Result { + // `Address::from_slice` panics on a length mismatch, so reject bad input up front. + if value.len() != 20 { + return Err(KVError::InvalidValue(None)) + } + Ok(Address::from_slice(value)) + } +} + +impl Encode for u16 { + type Encoded = [u8; 2]; + + fn encode(self) -> Self::Encoded { + self.to_be_bytes() + } +} + +impl Decode for u16 { + fn decode(value: &[u8]) -> Result { + // Checked sub-slice: short input becomes an error instead of an out-of-bounds read. + let mut buf = [0; 2]; + buf.copy_from_slice(value.get(..2).ok_or(KVError::InvalidValue(None))?); + Ok(u16::from_be_bytes(buf)) + } +} + +impl Encode for u64 { + type Encoded = [u8; 8]; + + fn encode(self) -> Self::Encoded { + self.to_be_bytes() + } +} + +impl Decode for u64 { + fn decode(value: &[u8]) -> Result { + // Checked sub-slice: short input becomes an error instead of an out-of-bounds read. + let mut buf = [0; 8]; + buf.copy_from_slice(value.get(..8).ok_or(KVError::InvalidValue(None))?); + Ok(u64::from_be_bytes(buf)) + } +} + +impl Encode for U256 { + type Encoded = [u8; 32]; + + fn encode(self) -> Self::Encoded { + let mut result = [0; 32]; + self.to_big_endian(&mut result); + result + } +} + +impl Decode for U256 { + fn decode(value: &[u8]) -> Result { + // `copy_from_slice` panics unless both lengths match, so reject bad input up front. + if value.len() != 32 { + return Err(KVError::InvalidValue(None)) + } + let mut result = [0; 32]; + result.copy_from_slice(value); + Ok(Self::from_big_endian(&result)) + } +} diff --git a/crates/db/src/kv/tables.rs b/crates/db/src/kv/tables.rs new file mode 100644 index 000000000..6b92ab0d0 --- /dev/null +++ 
b/crates/db/src/kv/tables.rs @@ -0,0 +1,115 @@ +//! Declaration of all MDBX tables. + +use crate::utils::TableType; +use reth_primitives::{Address, U256}; + +/// Default tables that should be present inside database. +pub const TABLES: [(TableType, &str); 17] = [ + (TableType::Table, CanonicalHeaders::const_name()), + (TableType::Table, HeaderTD::const_name()), + (TableType::Table, HeaderNumbers::const_name()), + (TableType::Table, Headers::const_name()), + (TableType::Table, BlockBodies::const_name()), + (TableType::Table, CumulativeTxCount::const_name()), + (TableType::Table, NonCanonicalTransactions::const_name()), + (TableType::Table, Transactions::const_name()), + (TableType::Table, Receipts::const_name()), + (TableType::Table, Logs::const_name()), + (TableType::Table, PlainState::const_name()), + (TableType::Table, AccountHistory::const_name()), + (TableType::Table, StorageHistory::const_name()), + (TableType::DupSort, AccountChangeSet::const_name()), + (TableType::DupSort, StorageChangeSet::const_name()), + (TableType::Table, TxSenders::const_name()), + (TableType::Table, Config::const_name()), +]; + +#[macro_export] +/// Macro to declare all necessary tables. +macro_rules! table { + ($name:ident => $key:ty => $value:ty => $seek:ty) => { + /// $name MDBX table. + #[derive(Clone, Copy, Debug, Default)] + pub struct $name; + + impl $crate::kv::table::Table for $name { + type Key = $key; + type Value = $value; + type SeekKey = $seek; + + /// Return $name as it is present inside the database. + fn name(&self) -> &'static str { + $name::const_name() + } + } + + impl $name { + /// Return $name as it is present inside the database. 
+ pub const fn const_name() -> &'static str { + stringify!($name) + } + } + + impl std::fmt::Display for $name { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", stringify!($name)) + } + } + }; + ($name:ident => $key:ty => $value:ty) => { + table!($name => $key => $value => $key); + }; +} + +// +// TABLE DEFINITIONS +// + +table!(CanonicalHeaders => BNum => HeaderHash); +table!(HeaderTD => BNum_BHash => RlpTotalDifficulty); +table!(HeaderNumbers => BNum_BHash => BNum); +table!(Headers => BNum_BHash => RlpHeader); + +table!(BlockBodies => BNum_BHash => NumTxesInBlock); +table!(CumulativeTxCount => BNum_BHash => u64); // TODO U256? + +table!(NonCanonicalTransactions => BNum_BHash_TxId => RlpTxBody); +table!(Transactions => TxId => RlpTxBody); // Canonical only +table!(Receipts => TxId => Receipt); // Canonical only +table!(Logs => TxId => Receipt); // Canonical only + +table!(PlainState => PlainStateKey => Vec); + +table!(AccountHistory => Address => TxIdList); +table!(StorageHistory => Address_StorageKey => TxIdList); + +table!(AccountChangeSet => TxId => AccountBeforeTx); +table!(StorageChangeSet => TxId => StorageKeyBeforeTx); + +table!(TxSenders => TxId => Address); // Is it necessary? 
+table!(Config => ConfigKey => ConfigValue); + +// +// TODO: Temporary types, until they're properly defined alongside with the Encode and Decode Trait +// + +type ConfigKey = Vec; +type ConfigValue = Vec; +#[allow(non_camel_case_types)] +type BNum_BHash = Vec; +#[allow(non_camel_case_types)] +type BNum_BHash_TxId = Vec; +type RlpHeader = Vec; +type RlpTotalDifficulty = Vec; +type RlpTxBody = Vec; +type Receipt = Vec; +type NumTxesInBlock = u16; // TODO can it be u16 +type BNum = u64; // TODO check size +type TxId = u64; // TODO check size +type HeaderHash = U256; +type PlainStateKey = Address; // TODO new type will have to account for address_incarna_skey as well +type TxIdList = Vec; +#[allow(non_camel_case_types)] +type Address_StorageKey = Vec; +type AccountBeforeTx = Vec; +type StorageKeyBeforeTx = Vec; diff --git a/crates/db/src/kv/tx.rs b/crates/db/src/kv/tx.rs new file mode 100644 index 000000000..fdd46529d --- /dev/null +++ b/crates/db/src/kv/tx.rs @@ -0,0 +1,109 @@ +//! Transaction wrapper for libmdbx-sys. + +use crate::{ + kv::{ + cursor::{Cursor, ValueOnlyResult}, + table::{Encode, Table}, + KVError, + }, + utils::decode_one, +}; +use libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW}; +use std::marker::PhantomData; + +/// Wrapper for the libmdbx transaction. +#[derive(Debug)] +pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> { + /// Libmdbx-sys transaction. + pub inner: Transaction<'a, K, E>, +} + +impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { + /// Creates new `Tx` object with a `RO` or `RW` transaction. + pub fn new<'a>(inner: Transaction<'a, K, E>) -> Self + where + 'a: 'env, + { + Self { inner } + } + + /// Gets this transaction ID. + pub fn id(&self) -> u64 { + self.inner.id() + } + + /// Open cursor on `table`. 
+ pub fn cursor<'a, T: Table>(&'a self, table: T) -> Result, KVError> + where + 'env: 'a, + T: Table, + { + let table_name = table.name(); + + Ok(Cursor { + inner: self.inner.cursor(&self.inner.open_db(Some(table_name))?)?, + table: table_name, + _dbi: PhantomData, + }) + } + + /// Gets value associated with `key` on `table`. If it's a DUPSORT table, then returns the first + /// entry. + pub fn get(&self, table: T, key: T::Key) -> ValueOnlyResult { + // Read failures surface as `KVError::Get`, mirroring how `put` and `delete` report theirs. + let dbi = self.inner.open_db(Some(table.name()))?; + let raw = self.inner.get(&dbi, key.encode().as_ref()).map_err(KVError::Get)?; + raw.map(decode_one::<T>).transpose() + } + + /// Saves all changes and frees up storage memory. + pub fn commit(self) -> Result { + self.inner.commit().map_err(KVError::Commit) + } +} + +impl<'a, E: EnvironmentKind> Tx<'a, RW, E> { + /// Opens `table` and inserts `(key, value)` pair. If the `key` already exists, it replaces the + /// value if the table doesn't support DUPSORT. + pub fn put(&self, table: T, k: T::Key, v: T::Value) -> Result<(), KVError> + where + T: Table, + { + self.inner + .put( + &self.inner.open_db(Some(table.name()))?, + &k.encode(), + &v.encode(), + WriteFlags::UPSERT, + ) + .map_err(KVError::Put) + } + + /// Deletes the `(key, value)` entry on `table`. When `value` is `None`, all entries with `key` + /// are to be deleted. Otherwise, only the item matching that data shall be. + pub fn delete(&self, table: T, key: T::Key, value: Option) -> Result + where + T: Table, + { + let mut data = None; + + let value = value.map(Encode::encode); + if let Some(value) = &value { + data = Some(value.as_ref()); + }; + + self.inner + .del(&self.inner.open_db(Some(table.name()))?, key.encode(), data) + .map_err(KVError::Delete) + } + + /// Empties `table`. 
+ pub fn clear(&self, table: T) -> Result<(), KVError> + where + T: Table, + { + self.inner.clear_db(&self.inner.open_db(Some(table.name()))?)?; + + Ok(()) + } +} diff --git a/crates/db/src/lib.rs b/crates/db/src/lib.rs index b78282fb3..89cf267d5 100644 --- a/crates/db/src/lib.rs +++ b/crates/db/src/lib.rs @@ -1,3 +1,5 @@ +//! Module that interacts with MDBX. + #![warn(missing_debug_implementations, missing_docs, unreachable_pub)] #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( @@ -5,10 +7,10 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -//! Database bindings for reth. -// TODO: Actually provide database bindings. For now, this is just a re-export of MDBX. - /// Rust bindings for [MDBX](https://libmdbx.dqdkfa.ru/). pub mod mdbx { pub use libmdbx::*; } + +pub mod kv; +mod utils; diff --git a/crates/db/src/static.rs b/crates/db/src/static.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/db/src/utils.rs b/crates/db/src/utils.rs new file mode 100644 index 000000000..fdcfeb578 --- /dev/null +++ b/crates/db/src/utils.rs @@ -0,0 +1,55 @@ +//! Utils crate for `db`. + +use crate::kv::{ + table::{Decode, Table}, + KVError, +}; +use std::borrow::Cow; + +/// Enum for the type of table present in libmdbx. +#[derive(Debug)] +pub enum TableType { + Table, + DupSort, +} + +/// Returns the default page size that can be used in this OS. +pub(crate) fn default_page_size() -> usize { + let os_page_size = page_size::get(); + + // source: https://gitflic.ru/project/erthink/libmdbx/blob?file=mdbx.h#line-num-821 + let libmdbx_max_page_size = 0x10000; + + // May lead to errors if it's reduced further because of the potential size of the + // data. + let min_page_size = 4096; + + os_page_size.clamp(min_page_size, libmdbx_max_page_size) +} + +/// Helper function to decode a `(key, value)` pair. 
+pub(crate) fn decoder<'a, T>( + kv: (Cow<'a, [u8]>, Cow<'a, [u8]>), +) -> Result<(T::Key, T::Value), KVError> +where + T: Table, + T::Key: Decode, +{ + Ok((Decode::decode(&kv.0)?, Decode::decode(&kv.1)?)) +} + +/// Helper function to decode only a value from a `(key, value)` pair. +pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result +where + T: Table, +{ + Decode::decode(&kv.1) +} + +/// Helper function to decode a value. It can be a key or subkey. +pub(crate) fn decode_one(value: Cow<'_, [u8]>) -> Result +where + T: Table, +{ + Decode::decode(&value) +}