feat(db): mdbx integration & table skeletons (#15)

* wip

* add table macro

* add simple put get test with Address

* add Env.view and Env.update

* docs

* slightly change the test

* add initial table initialization and placeholders

* lint & some

* replace String with str

* add error.rs

* add docs to encode

* add docs

* clamp

* add source on libmdbx_max_page_size
This commit is contained in:
joshieDo
2022-10-11 03:35:35 +08:00
committed by GitHub
parent 230e9ef179
commit 60d3c64410
11 changed files with 895 additions and 10 deletions

27
Cargo.lock generated
View File

@ -1231,6 +1231,16 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "page_size"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eebde548fbbf1ea81a99b128872779c437752fb99f217c45245e1a61dcd9edcd"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "parity-scale-codec"
version = "3.2.1"
@ -1483,7 +1493,12 @@ version = "0.1.0"
name = "reth-db"
version = "0.1.0"
dependencies = [
"bytes",
"libmdbx",
"page_size",
"reth-primitives",
"tempfile",
"thiserror",
]
[[package]]
@ -2089,9 +2104,9 @@ checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.36"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fce9567bd60a67d08a16488756721ba392f24f29006402881e43b19aac64307"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
@ -2101,9 +2116,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.22"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "11c75893af559bc8e10716548bdef5cb2b983f8e637db9d0e15126b61b484ee2"
checksum = "4017f8f45139870ca7e672686113917c71c7a6e02d4924eda67186083c03081a"
dependencies = [
"proc-macro2",
"quote",
@ -2112,9 +2127,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.29"
version = "0.1.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aeea4303076558a00714b823f9ad67d58a3bbda1df83d8827d21193156e22f7"
checksum = "24eb03ba0eab1fd845050058ce5e616558e8f8d8fca633e6b163fe25c797213a"
dependencies = [
"once_cell",
]

View File

@ -8,4 +8,14 @@ readme = "README.md"
description = "Staged syncing primitives used in reth."
[dependencies]
libmdbx = "0.1.8"
# reth
reth-primitives = { path = "../primitives" }
# misc
bytes = "1.2.1"
libmdbx = "0.1.8"
page_size = "0.4.2"
thiserror = "1.0.37"
[dev-dependencies]
tempfile = "3.3.0"

194
crates/db/src/kv/cursor.rs Normal file
View File

@ -0,0 +1,194 @@
//! Cursor wrapper for libmdbx-sys.
use super::error::KVError;
use crate::{
kv::{Decode, DupSort, Encode, Table},
utils::*,
};
use libmdbx::{self, TransactionKind};
/// Alias type for a `(key, value)` result coming from a cursor.
pub type PairResult<T> = Result<Option<(<T as Table>::Key, <T as Table>::Value)>, KVError>;
/// Alias type for a `(key, value)` result coming from an iterator.
pub type IterPairResult<T> = Option<Result<(<T as Table>::Key, <T as Table>::Value), KVError>>;
/// Alias type for a value result coming from a cursor without its key.
pub type ValueOnlyResult<T> = Result<Option<<T as Table>::Value>, KVError>;
/// Cursor wrapper to access KV items.
#[derive(Debug)]
pub struct Cursor<'tx, K: TransactionKind, T: Table> {
/// Inner `libmdbx` cursor.
pub inner: libmdbx::Cursor<'tx, K>,
/// Table name as is inside the database.
pub table: &'static str,
/// Phantom data to enforce encoding/decoding.
pub _dbi: std::marker::PhantomData<T>,
}
/// Takes `(key, value)` from the database and decodes it appropriately.
#[macro_export]
macro_rules! decode {
($v:expr) => {
$v?.map(decoder::<T>).transpose()
};
}
impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> {
/// Returns the first `(key, value)` pair.
pub fn first(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.first())
}
/// Seeks for a `(key, value)` pair greater or equal than `key`.
pub fn seek(&mut self, key: T::SeekKey) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.set_range(key.encode().as_ref()))
}
/// Seeks for the exact `(key, value)` pair with `key`.
pub fn seek_exact(&mut self, key: T::Key) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.set_key(key.encode().as_ref()))
}
/// Returns the next `(key, value)` pair.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.next())
}
/// Returns the previous `(key, value)` pair.
pub fn prev(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.prev())
}
/// Returns the last `(key, value)` pair.
pub fn last(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.last())
}
/// Returns the current `(key, value)` pair of the cursor.
pub fn current(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.get_current())
}
/// Returns an iterator starting at a key greater or equal than `start_key`.
pub fn walk(
mut self,
start_key: T::Key,
) -> Result<
impl Iterator<Item = Result<(<T as Table>::Key, <T as Table>::Value), KVError>>,
KVError,
>
where
T::Key: Decode,
{
let start = self.inner.set_range(start_key.encode().as_ref())?.map(decoder::<T>);
Ok(Walker { cursor: self, start })
}
}
impl<'txn, K, T> Cursor<'txn, K, T>
where
K: TransactionKind,
T: DupSort,
{
/// Returns the next `(key, value)` pair of a DUPSORT table.
pub fn next_dup(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.next_dup())
}
/// Returns the next `(key, value)` pair skipping the duplicates.
pub fn next_no_dup(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
decode!(self.inner.next_nodup())
}
/// Returns the next `value` of a duplicate `key`.
pub fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner.next_dup()?.map(decode_value::<T>).transpose()
}
/// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table.
pub fn walk_dup(
mut self,
key: T::Key,
subkey: T::SubKey,
) -> Result<impl Iterator<Item = Result<<T as Table>::Value, KVError>>, KVError> {
let start = self
.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())?
.map(decode_one::<T>);
Ok(DupWalker { cursor: self, start })
}
}
/// Provides an iterator to `Cursor` when handling `Table`.
#[derive(Debug)]
pub struct Walker<'a, K: TransactionKind, T: Table> {
/// Cursor to be used to walk through the table.
pub cursor: Cursor<'a, K, T>,
/// `(key, value)` where to start the walk.
pub start: IterPairResult<T>,
}
impl<'tx, K: TransactionKind, T: Table> std::iter::Iterator for Walker<'tx, K, T>
where
T::Key: Decode,
{
type Item = Result<(T::Key, T::Value), KVError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next().transpose()
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
#[derive(Debug)]
pub struct DupWalker<'a, K: TransactionKind, T: DupSort> {
/// Cursor to be used to walk through the table.
pub cursor: Cursor<'a, K, T>,
/// Value where to start the walk.
pub start: Option<Result<T::Value, KVError>>,
}
impl<'tx, K: TransactionKind, T: DupSort> std::iter::Iterator for DupWalker<'tx, K, T> {
type Item = Result<T::Value, KVError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next_dup_val().transpose()
}
}

36
crates/db/src/kv/error.rs Normal file
View File

@ -0,0 +1,36 @@
//! KVError declaration.
use libmdbx::Error;
use thiserror::Error;
/// KV error type.
#[derive(Debug, Error)]
pub enum KVError {
/// Generic MDBX error.
#[error("MDBX error: {0:?}")]
MDBX(#[from] Error),
/// Failed to open MDBX file.
#[error("{0:?}")]
DatabaseLocation(Error),
/// Failed to create a table in database.
#[error("{0:?}")]
TableCreation(Error),
/// Failed to insert a value into a table.
#[error("{0:?}")]
Put(Error),
/// Failed to get a value into a table.
#[error("{0:?}")]
Get(Error),
/// Failed to delete a `(key, vakue)` pair into a table.
#[error("{0:?}")]
Delete(Error),
/// Failed to commit transaction changes into the database.
#[error("{0:?}")]
Commit(Error),
/// Failed to initiate a MDBX transaction.
#[error("{0:?}")]
InitTransaction(Error),
/// Failed to decode or encode a key or value coming from a table..
#[error("{0:?}")]
InvalidValue(Option<String>),
}

215
crates/db/src/kv/mod.rs Normal file
View File

@ -0,0 +1,215 @@
//! Module that interacts with MDBX.
use crate::utils::{default_page_size, TableType};
use libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, EnvironmentKind, Geometry, Mode, PageSize,
SyncMode, RO, RW,
};
use std::{ops::Deref, path::Path};
pub mod table;
use table::{Decode, DupSort, Encode, Table};
pub mod tables;
use tables::TABLES;
pub mod cursor;
pub mod tx;
use tx::Tx;
mod error;
pub use error::KVError;
/// Environment used when opening a MDBX environment. RO/RW.
#[derive(Debug)]
pub enum EnvKind {
/// Read-only MDBX environment.
RO,
/// Read-write MDBX environment.
RW,
}
/// Wrapper for the libmdbx environment.
#[derive(Debug)]
pub struct Env<E: EnvironmentKind> {
/// Libmdbx-sys environment.
pub inner: Environment<E>,
}
impl<E: EnvironmentKind> Env<E> {
/// Opens the database at the specified path with the given `EnvKind`.
///
/// It does not create the tables, for that call [`create_tables`].
pub fn open(path: &Path, kind: EnvKind) -> Result<Env<E>, KVError> {
let mode = match kind {
EnvKind::RO => Mode::ReadOnly,
EnvKind::RW => Mode::ReadWrite { sync_mode: SyncMode::Durable },
};
let env = Env {
inner: Environment::new()
.set_max_dbs(TABLES.len())
.set_geometry(Geometry {
size: Some(0..0x100000), // TODO: reevaluate
growth_step: Some(0x100000), // TODO: reevaluate
shrink_threshold: None,
page_size: Some(PageSize::Set(default_page_size())),
})
.set_flags(EnvironmentFlags {
mode,
no_rdahead: true, // TODO: reevaluate
coalesce: true,
..Default::default()
})
.open(path)
.map_err(KVError::DatabaseLocation)?,
};
Ok(env)
}
/// Creates all the defined tables, if necessary.
pub fn create_tables(&self) -> Result<(), KVError> {
let tx = self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?;
for (table_type, table) in TABLES {
let flags = match table_type {
TableType::Table => DatabaseFlags::default(),
TableType::DupSort => DatabaseFlags::DUP_SORT,
};
tx.create_db(Some(table), flags).map_err(KVError::TableCreation)?;
}
tx.commit()?;
Ok(())
}
}
impl<E: EnvironmentKind> Env<E> {
/// Initiates a read-only transaction. It should be committed or rolled back in the end, so it
/// frees up pages.
pub fn begin_tx(&self) -> Result<Tx<'_, RO, E>, KVError> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(KVError::InitTransaction)?))
}
/// Initiates a read-write transaction. It should be committed or rolled back in the end.
pub fn begin_mut_tx(&self) -> Result<Tx<'_, RW, E>, KVError> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?))
}
/// Takes a function and passes a read-only transaction into it, making sure it's closed in the
/// end of the execution.
pub fn view<T, F>(&self, f: F) -> Result<T, KVError>
where
F: Fn(&Tx<'_, RO, E>) -> T,
{
let tx = self.begin_tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
/// Takes a function and passes a write-read transaction into it, making sure it's committed in
/// the end of the execution.
pub fn update<T, F>(&self, f: F) -> Result<T, KVError>
where
F: Fn(&Tx<'_, RW, E>) -> T,
{
let tx = self.begin_mut_tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
}
impl<E: EnvironmentKind> Deref for Env<E> {
type Target = libmdbx::Environment<E>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
#[cfg(test)]
mod tests {
use super::{tables::PlainState, Env, EnvKind};
use libmdbx::{NoWriteMap, WriteMap};
use reth_primitives::Address;
use std::str::FromStr;
use tempfile::TempDir;
const ERROR_DB_CREATION: &str = "Not able to create the mdbx file.";
const ERROR_DB_OPEN: &str = "Not able to open existing mdbx file.";
const ERROR_TABLE_CREATION: &str = "Not able to create tables in the database.";
const ERROR_PUT: &str = "Not able to insert value into table.";
const ERROR_GET: &str = "Not able to get value from table.";
const ERROR_COMMIT: &str = "Not able to commit transaction.";
const ERROR_RETURN_VALUE: &str = "Mismatching result.";
const ERROR_INIT_TX: &str = "Failed to create a MDBX transaction.";
const ERROR_ETH_ADDRESS: &str = "Invalid address.";
const ERROR_TEMPDIR: &str = "Not able to create a temporary directory.";
#[test]
fn db_creation() {
Env::<NoWriteMap>::open(&TempDir::new().expect(ERROR_TEMPDIR).into_path(), EnvKind::RW)
.expect(ERROR_DB_CREATION);
}
#[test]
fn db_manual_put_get() {
let env =
Env::<NoWriteMap>::open(&TempDir::new().expect(ERROR_TEMPDIR).into_path(), EnvKind::RW)
.expect(ERROR_DB_CREATION);
env.create_tables().expect(ERROR_TABLE_CREATION);
let value = vec![1, 3, 3, 7];
let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
.expect(ERROR_ETH_ADDRESS);
// PUT
let tx = env.begin_mut_tx().expect(ERROR_INIT_TX);
tx.put(PlainState, key, value.clone()).expect(ERROR_PUT);
tx.commit().expect(ERROR_COMMIT);
// GET
let tx = env.begin_tx().expect(ERROR_INIT_TX);
let result = tx.get(PlainState, key).expect(ERROR_GET);
assert!(result.expect(ERROR_RETURN_VALUE) == value);
tx.commit().expect(ERROR_COMMIT);
}
#[test]
fn db_closure_put_get() {
let path = TempDir::new().expect(ERROR_TEMPDIR).into_path();
let value = vec![1, 3, 3, 7];
let key = Address::from_str("0xa2c122be93b0074270ebee7f6b7292c7deb45047")
.expect(ERROR_ETH_ADDRESS);
{
let env = Env::<WriteMap>::open(&path, EnvKind::RW).expect(ERROR_DB_OPEN);
env.create_tables().expect(ERROR_TABLE_CREATION);
// PUT
let result = env.update(|tx| {
tx.put(PlainState, key, value.clone()).expect(ERROR_PUT);
200
});
assert!(result.expect(ERROR_RETURN_VALUE) == 200);
}
let env = Env::<WriteMap>::open(&path, EnvKind::RO).expect(ERROR_DB_CREATION);
// GET
let result = env.view(|tx| tx.get(PlainState, key).expect(ERROR_GET)).expect(ERROR_GET);
assert!(result == Some(value))
}
}

134
crates/db/src/kv/table.rs Normal file
View File

@ -0,0 +1,134 @@
//! Table traits.
use super::KVError;
use bytes::Bytes;
use reth_primitives::{Address, U256};
use std::fmt::Debug;
/// Trait that will transform the data to be saved in the DB.
pub trait Encode: Send + Sync + Sized + Debug {
/// Encoded type.
type Encoded: AsRef<[u8]> + Send + Sync;
/// Decodes data going into the database.
fn encode(self) -> Self::Encoded;
}
/// Trait that will transform the data to be read from the DB.
pub trait Decode: Send + Sync + Sized + Debug {
/// Decodes data coming from the database.
fn decode(value: &[u8]) -> Result<Self, KVError>;
}
/// Generic trait that enforces the database value to implement [`Encode`] and [`Decode`].
pub trait Object: Encode + Decode {}
impl<T> Object for T where T: Encode + Decode {}
/// Generic trait that a database table should follow.
pub trait Table: Send + Sync + Debug + 'static {
/// Key element of `Table`.
type Key: Encode;
/// Value element of `Table`.
type Value: Object;
/// Seek Key element of `Table`.
type SeekKey: Encode;
/// Return name as it is present inside the MDBX.
fn name(&self) -> &'static str;
}
/// DupSort allows for keys not to be repeated in the database,
/// for more check: https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48
pub trait DupSort: Table {
/// Subkey type. For more check https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48
type SubKey: Object;
}
impl Encode for Vec<u8> {
type Encoded = Self;
fn encode(self) -> Self::Encoded {
self
}
}
impl Decode for Vec<u8> {
fn decode(value: &[u8]) -> Result<Self, KVError> {
Ok(value.to_vec())
}
}
impl Encode for Bytes {
type Encoded = Self;
fn encode(self) -> Self::Encoded {
self
}
}
impl Decode for Bytes {
fn decode(value: &[u8]) -> Result<Self, KVError> {
Ok(value.to_vec().into())
}
}
impl Encode for Address {
type Encoded = [u8; 20];
fn encode(self) -> Self::Encoded {
self.0
}
}
impl Decode for Address {
fn decode(value: &[u8]) -> Result<Self, KVError> {
Ok(Address::from_slice(value))
}
}
impl Encode for u16 {
type Encoded = [u8; 2];
fn encode(self) -> Self::Encoded {
self.to_be_bytes()
}
}
impl Decode for u16 {
fn decode(value: &[u8]) -> Result<Self, KVError> {
unsafe { Ok(u16::from_be_bytes(*(value.as_ptr() as *const [_; 2]))) }
}
}
impl Encode for u64 {
type Encoded = [u8; 8];
fn encode(self) -> Self::Encoded {
self.to_be_bytes()
}
}
impl Decode for u64 {
fn decode(value: &[u8]) -> Result<Self, KVError> {
unsafe { Ok(u64::from_be_bytes(*(value.as_ptr() as *const [_; 8]))) }
}
}
impl Encode for U256 {
type Encoded = [u8; 32];
fn encode(self) -> Self::Encoded {
let mut result = [0; 32];
self.to_big_endian(&mut result);
result
}
}
impl Decode for U256 {
fn decode(value: &[u8]) -> Result<Self, KVError> {
let mut result = [0; 32];
result.copy_from_slice(value);
Ok(Self::from_big_endian(&result))
}
}

115
crates/db/src/kv/tables.rs Normal file
View File

@ -0,0 +1,115 @@
//! Declaration of all MDBX tables.
use crate::utils::TableType;
use reth_primitives::{Address, U256};
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 17] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
(TableType::Table, Headers::const_name()),
(TableType::Table, BlockBodies::const_name()),
(TableType::Table, CumulativeTxCount::const_name()),
(TableType::Table, NonCanonicalTransactions::const_name()),
(TableType::Table, Transactions::const_name()),
(TableType::Table, Receipts::const_name()),
(TableType::Table, Logs::const_name()),
(TableType::Table, PlainState::const_name()),
(TableType::Table, AccountHistory::const_name()),
(TableType::Table, StorageHistory::const_name()),
(TableType::DupSort, AccountChangeSet::const_name()),
(TableType::DupSort, StorageChangeSet::const_name()),
(TableType::Table, TxSenders::const_name()),
(TableType::Table, Config::const_name()),
];
#[macro_export]
/// Macro to declare all necessary tables.
macro_rules! table {
($name:ident => $key:ty => $value:ty => $seek:ty) => {
/// $name MDBX table.
#[derive(Clone, Copy, Debug, Default)]
pub struct $name;
impl $crate::kv::table::Table for $name {
type Key = $key;
type Value = $value;
type SeekKey = $seek;
/// Return $name as it is present inside the database.
fn name(&self) -> &'static str {
$name::const_name()
}
}
impl $name {
/// Return $name as it is present inside the database.
pub const fn const_name() -> &'static str {
stringify!($name)
}
}
impl std::fmt::Display for $name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", stringify!($name))
}
}
};
($name:ident => $key:ty => $value:ty) => {
table!($name => $key => $value => $key);
};
}
//
// TABLE DEFINITIONS
//
table!(CanonicalHeaders => BNum => HeaderHash);
table!(HeaderTD => BNum_BHash => RlpTotalDifficulty);
table!(HeaderNumbers => BNum_BHash => BNum);
table!(Headers => BNum_BHash => RlpHeader);
table!(BlockBodies => BNum_BHash => NumTxesInBlock);
table!(CumulativeTxCount => BNum_BHash => u64); // TODO U256?
table!(NonCanonicalTransactions => BNum_BHash_TxId => RlpTxBody);
table!(Transactions => TxId => RlpTxBody); // Canonical only
table!(Receipts => TxId => Receipt); // Canonical only
table!(Logs => TxId => Receipt); // Canonical only
table!(PlainState => PlainStateKey => Vec<u8>);
table!(AccountHistory => Address => TxIdList);
table!(StorageHistory => Address_StorageKey => TxIdList);
table!(AccountChangeSet => TxId => AccountBeforeTx);
table!(StorageChangeSet => TxId => StorageKeyBeforeTx);
table!(TxSenders => TxId => Address); // Is it necessary?
table!(Config => ConfigKey => ConfigValue);
//
// TODO: Temporary types, until they're properly defined alongside with the Encode and Decode Trait
//
type ConfigKey = Vec<u8>;
type ConfigValue = Vec<u8>;
#[allow(non_camel_case_types)]
type BNum_BHash = Vec<u8>;
#[allow(non_camel_case_types)]
type BNum_BHash_TxId = Vec<u8>;
type RlpHeader = Vec<u8>;
type RlpTotalDifficulty = Vec<u8>;
type RlpTxBody = Vec<u8>;
type Receipt = Vec<u8>;
type NumTxesInBlock = u16; // TODO can it be u16
type BNum = u64; // TODO check size
type TxId = u64; // TODO check size
type HeaderHash = U256;
type PlainStateKey = Address; // TODO new type will have to account for address_incarna_skey as well
type TxIdList = Vec<u8>;
#[allow(non_camel_case_types)]
type Address_StorageKey = Vec<u8>;
type AccountBeforeTx = Vec<u8>;
type StorageKeyBeforeTx = Vec<u8>;

109
crates/db/src/kv/tx.rs Normal file
View File

@ -0,0 +1,109 @@
//! Transaction wrapper for libmdbx-sys.
use crate::{
kv::{
cursor::{Cursor, ValueOnlyResult},
table::{Encode, Table},
KVError,
},
utils::decode_one,
};
use libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW};
use std::marker::PhantomData;
/// Wrapper for the libmdbx transaction.
#[derive(Debug)]
pub struct Tx<'a, K: TransactionKind, E: EnvironmentKind> {
/// Libmdbx-sys transaction.
pub inner: Transaction<'a, K, E>,
}
impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
/// Creates new `Tx` object with a `RO` or `RW` transaction.
pub fn new<'a>(inner: Transaction<'a, K, E>) -> Self
where
'a: 'env,
{
Self { inner }
}
/// Gets this transaction ID.
pub fn id(&self) -> u64 {
self.inner.id()
}
/// Open cursor on `table`.
pub fn cursor<'a, T: Table>(&'a self, table: T) -> Result<Cursor<'a, K, T>, KVError>
where
'env: 'a,
T: Table,
{
let table_name = table.name();
Ok(Cursor {
inner: self.inner.cursor(&self.inner.open_db(Some(table_name))?)?,
table: table_name,
_dbi: PhantomData,
})
}
/// Gets value associated with `key` on `table`. If it's a DUPSORT table, then returns the first
/// entry.
pub fn get<T: Table>(&self, table: T, key: T::Key) -> ValueOnlyResult<T> {
self.inner
.get(&self.inner.open_db(Some(table.name()))?, key.encode().as_ref())?
.map(decode_one::<T>)
.transpose()
}
/// Saves all changes and frees up storage memory.
pub fn commit(self) -> Result<bool, KVError> {
self.inner.commit().map_err(KVError::Commit)
}
}
impl<'a, E: EnvironmentKind> Tx<'a, RW, E> {
/// Opens `table` and inserts `(key, value)` pair. If the `key` already exists, it replaces the
/// value it if the table doesn't support DUPSORT.
pub fn put<T>(&self, table: T, k: T::Key, v: T::Value) -> Result<(), KVError>
where
T: Table,
{
self.inner
.put(
&self.inner.open_db(Some(table.name()))?,
&k.encode(),
&v.encode(),
WriteFlags::UPSERT,
)
.map_err(KVError::Put)
}
/// Deletes the `(key, value)` entry on `table`. When `value` is `None`, all entries with `key`
/// are to be deleted. Otherwise, only the item matching that data shall be.
pub fn delete<T>(&self, table: T, key: T::Key, value: Option<T::Value>) -> Result<bool, KVError>
where
T: Table,
{
let mut data = None;
let value = value.map(Encode::encode);
if let Some(value) = &value {
data = Some(value.as_ref());
};
self.inner
.del(&self.inner.open_db(Some(table.name()))?, key.encode(), data)
.map_err(KVError::Delete)
}
/// Empties `table`.
pub fn clear<T>(&self, table: T) -> Result<(), KVError>
where
T: Table,
{
self.inner.clear_db(&self.inner.open_db(Some(table.name()))?)?;
Ok(())
}
}

View File

@ -1,3 +1,5 @@
//! Module that interacts with MDBX.
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
@ -5,10 +7,10 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Database bindings for reth.
// TODO: Actually provide database bindings. For now, this is just a re-export of MDBX.
/// Rust bindings for [MDBX](https://libmdbx.dqdkfa.ru/).
pub mod mdbx {
pub use libmdbx::*;
}
pub mod kv;
mod utils;

0
crates/db/src/static.rs Normal file
View File

55
crates/db/src/utils.rs Normal file
View File

@ -0,0 +1,55 @@
//! Utils crate for `db`.
use crate::kv::{
table::{Decode, Table},
KVError,
};
use std::borrow::Cow;
/// Enum for the type of table present in libmdbx.
#[derive(Debug)]
pub enum TableType {
Table,
DupSort,
}
/// Returns the default page size that can be used in this OS.
pub(crate) fn default_page_size() -> usize {
let os_page_size = page_size::get();
// source: https://gitflic.ru/project/erthink/libmdbx/blob?file=mdbx.h#line-num-821
let libmdbx_max_page_size = 0x10000;
// May lead to errors if it's reduced further because of the potential size of the
// data.
let min_page_size = 4096;
os_page_size.clamp(min_page_size, libmdbx_max_page_size)
}
/// Helper function to decode a `(key, value)` pair.
pub(crate) fn decoder<'a, T>(
kv: (Cow<'a, [u8]>, Cow<'a, [u8]>),
) -> Result<(T::Key, T::Value), KVError>
where
T: Table,
T::Key: Decode,
{
Ok((Decode::decode(&kv.0)?, Decode::decode(&kv.1)?))
}
/// Helper function to decode only a value from a `(key, value)` pair.
pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result<T::Value, KVError>
where
T: Table,
{
Decode::decode(&kv.1)
}
/// Helper function to decode a value. It can be a key or subkey.
pub(crate) fn decode_one<T>(value: Cow<'_, [u8]>) -> Result<T::Value, KVError>
where
T: Table,
{
Decode::decode(&value)
}