feat(db): Database Abstraction (#101)

* database tx traits

* wip build passes

* Db and tx results

* nightly and db GAT

* Impl tx, wip cursor

* Move Decode to Table Key, working cursor trait

* wip dupsort

* build all Cursor abstraction

* cleanup

* wip cleanup

* old stages

* codecs moved o interface,stages wip

* resolve db ref, it builds

* Add tx commit after execution

* fmt

* Remove sync send restriction

* Add missing rw cursor functions

* Cleanup, added missing cursor fn. rust toolchain

* fmt

* add nightly to ci

* deny dead_code, remove unwrap

* rm printfn, stages fix, bench fix
This commit is contained in:
rakita
2022-10-20 17:32:13 +02:00
committed by GitHub
parent e9945b565a
commit 483bcdf9ab
32 changed files with 713 additions and 523 deletions

View File

@ -21,7 +21,7 @@ jobs:
- name: Checkout sources
uses: actions/checkout@v3
- name: Install toolchain
uses: dtolnay/rust-toolchain@stable
uses: dtolnay/rust-toolchain@nightly
- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

10
Cargo.lock generated
View File

@ -2415,11 +2415,13 @@ version = "0.1.0"
dependencies = [
"bytes",
"criterion",
"eyre",
"iai",
"libmdbx",
"page_size",
"parity-scale-codec",
"postcard",
"reth-interfaces",
"reth-primitives",
"serde",
"tempfile",
@ -2501,8 +2503,15 @@ version = "0.1.0"
dependencies = [
"async-trait",
"auto_impl",
"bytes",
"eyre",
"heapless",
"parity-scale-codec",
"postcard",
"reth-primitives",
"reth-rpc-types",
"serde",
"test-fuzz",
"thiserror",
"tokio",
]
@ -2608,6 +2617,7 @@ dependencies = [
"aquamarine",
"async-trait",
"reth-db",
"reth-interfaces",
"reth-primitives",
"tempfile",
"thiserror",

View File

@ -4,6 +4,10 @@
![Github Actions](https://github.com/foundry-rs/reth/workflows/ci/badge.svg)
# Build
To build this project we are currently using rust nightly for GAT support, that is planed to release in rust 1.65 (4.nov.2022). GAT's are used for Database trait in reth-interface.
## Docs
Contributor docs can be found [here](./docs).

View File

@ -10,6 +10,7 @@ description = "Staged syncing primitives used in reth."
[dependencies]
# reth
reth-primitives = { path = "../primitives" }
reth-interfaces = { path = "../interfaces" }
# codecs
serde = { version = "1.0.*", default-features = false }
@ -22,6 +23,7 @@ libmdbx = "0.1.8"
page_size = "0.4.2"
thiserror = "1.0.37"
tempfile = { version = "3.3.0", optional = true }
eyre = "0.6.8"
[dev-dependencies]
tempfile = "3.3.0"
@ -29,6 +31,8 @@ test-fuzz = "3.0.4"
criterion = "0.4.0"
iai = "0.1.1"
reth-interfaces = { path = "../interfaces",features=["bench"] }
[features]
test-utils = ["tempfile"]
bench-postcard = ["bench"]

View File

@ -7,10 +7,11 @@ macro_rules! impl_criterion_encoding_benchmark {
let mut size = 0;
c.bench_function(stringify!($name), |b| {
b.iter(|| {
let encoded_size = reth_db::kv::codecs::fuzz::Header::encode_and_decode(
black_box(reth_primitives::Header::default()),
)
.0;
let encoded_size =
reth_interfaces::db::codecs::fuzz::Header::encode_and_decode(black_box(
reth_primitives::Header::default(),
))
.0;
if size == 0 {
size = encoded_size;

View File

@ -1,10 +1,11 @@
use iai::{black_box, main};
use reth_interfaces::db;
/// Benchmarks the encoding and decoding of `Header` using iai.
macro_rules! impl_iai_encoding_benchmark {
($name:tt) => {
fn $name() {
reth_db::kv::codecs::fuzz::Header::encode_and_decode(black_box(
db::codecs::fuzz::Header::encode_and_decode(black_box(
reth_primitives::Header::default(),
));
}

View File

@ -1,18 +1,23 @@
//! Cursor wrapper for libmdbx-sys.
use super::error::KVError;
use crate::{
kv::{Decode, DupSort, Encode, Table},
utils::*,
use crate::utils::*;
use libmdbx::{self, TransactionKind, WriteFlags, RO, RW};
use reth_interfaces::db::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupSort, DupWalker, Encode, Error, Table,
Walker,
};
use libmdbx::{self, TransactionKind, WriteFlags, RW};
/// Alias type for a `(key, value)` result coming from a cursor.
pub type PairResult<T> = Result<Option<(<T as Table>::Key, <T as Table>::Value)>, KVError>;
pub type PairResult<T> = Result<Option<(<T as Table>::Key, <T as Table>::Value)>, Error>;
/// Alias type for a `(key, value)` result coming from an iterator.
pub type IterPairResult<T> = Option<Result<(<T as Table>::Key, <T as Table>::Value), KVError>>;
pub type IterPairResult<T> = Option<Result<(<T as Table>::Key, <T as Table>::Value), Error>>;
/// Alias type for a value result coming from a cursor without its key.
pub type ValueOnlyResult<T> = Result<Option<<T as Table>::Value>, KVError>;
pub type ValueOnlyResult<T> = Result<Option<<T as Table>::Value>, Error>;
/// Read only Cursor.
pub type CursorRO<'tx, T> = Cursor<'tx, RO, T>;
/// Read write cursor.
pub type CursorRW<'tx, T> = Cursor<'tx, RW, T>;
/// Cursor wrapper to access KV items.
#[derive(Debug)]
@ -29,175 +34,110 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> {
#[macro_export]
macro_rules! decode {
($v:expr) => {
$v?.map(decoder::<T>).transpose()
$v.map_err(|e| Error::Decode(e.into()))?.map(decoder::<T>).transpose()
};
}
impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> {
/// Returns the first `(key, value)` pair.
pub fn first(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> {
fn first(&mut self) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.first())
}
/// Seeks for a `(key, value)` pair greater or equal than `key`.
pub fn seek(&mut self, key: T::SeekKey) -> PairResult<T>
where
T::Key: Decode,
{
fn seek(&mut self, key: <T as Table>::SeekKey) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.set_range(key.encode().as_ref()))
}
/// Seeks for the exact `(key, value)` pair with `key`.
pub fn seek_exact(&mut self, key: T::Key) -> PairResult<T>
where
T::Key: Decode,
{
fn seek_exact(&mut self, key: <T as Table>::Key) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.set_key(key.encode().as_ref()))
}
/// Returns the next `(key, value)` pair.
#[allow(clippy::should_implement_trait)]
pub fn next(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn next(&mut self) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.next())
}
/// Returns the previous `(key, value)` pair.
pub fn prev(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn prev(&mut self) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.prev())
}
/// Returns the last `(key, value)` pair.
pub fn last(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn last(&mut self) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.last())
}
/// Returns the current `(key, value)` pair of the cursor.
pub fn current(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn current(&mut self) -> reth_interfaces::db::PairResult<T> {
decode!(self.inner.get_current())
}
/// Returns an iterator starting at a key greater or equal than `start_key`.
pub fn walk(
mut self,
start_key: T::Key,
) -> Result<
impl Iterator<Item = Result<(<T as Table>::Key, <T as Table>::Value), KVError>>,
KVError,
>
where
T::Key: Decode,
{
let start = self.inner.set_range(start_key.encode().as_ref())?.map(decoder::<T>);
fn walk(&'tx mut self, start_key: <T as Table>::Key) -> Result<Walker<'tx, T>, Error> {
let start = self
.inner
.set_range(start_key.encode().as_ref())
.map_err(|e| Error::Internal(e.into()))?
.map(decoder::<T>);
Ok(Walker { cursor: self, start })
Ok(Walker::<'tx, T> { cursor: self, start })
}
}
impl<'tx, T: Table> Cursor<'tx, RW, T> {
/// Inserts a `(key, value)` to the database. Repositions the cursor to the new item
pub fn put(&mut self, k: T::Key, v: T::Value, f: Option<WriteFlags>) -> Result<(), KVError> {
self.inner
.put(k.encode().as_ref(), v.encode().as_ref(), f.unwrap_or_default())
.map_err(KVError::Put)
}
}
impl<'txn, K, T> Cursor<'txn, K, T>
where
K: TransactionKind,
T: DupSort,
{
impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx, K, T> {
/// Returns the next `(key, value)` pair of a DUPSORT table.
pub fn next_dup(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn next_dup(&mut self) -> PairResult<T> {
decode!(self.inner.next_dup())
}
/// Returns the next `(key, value)` pair skipping the duplicates.
pub fn next_no_dup(&mut self) -> PairResult<T>
where
T::Key: Decode,
{
fn next_no_dup(&mut self) -> PairResult<T> {
decode!(self.inner.next_nodup())
}
/// Returns the next `value` of a duplicate `key`.
pub fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner.next_dup()?.map(decode_value::<T>).transpose()
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
self.inner
.next_dup()
.map_err(|e| Error::Internal(e.into()))?
.map(decode_value::<T>)
.transpose()
}
/// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table.
pub fn walk_dup(
mut self,
key: T::Key,
subkey: T::SubKey,
) -> Result<impl Iterator<Item = Result<<T as Table>::Value, KVError>>, KVError> {
fn walk_dup(&'tx mut self, key: T::Key, subkey: T::SubKey) -> Result<DupWalker<'tx, T>, Error> {
let start = self
.inner
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())?
.get_both_range(key.encode().as_ref(), subkey.encode().as_ref())
.map_err(|e| Error::Internal(e.into()))?
.map(decode_one::<T>);
Ok(DupWalker { cursor: self, start })
Ok(DupWalker::<'tx, T> { cursor: self, start })
}
}
/// Provides an iterator to `Cursor` when handling `Table`.
#[derive(Debug)]
pub struct Walker<'a, K: TransactionKind, T: Table> {
/// Cursor to be used to walk through the table.
pub cursor: Cursor<'a, K, T>,
/// `(key, value)` where to start the walk.
pub start: IterPairResult<T>,
}
impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> {
/// Database operation that will update an existing row if a specified value already
/// exists in a table, and insert a new row if the specified value doesn't already exist
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::UPSERT)
.map_err(|e| Error::Internal(e.into()))
}
impl<'tx, K: TransactionKind, T: Table> std::iter::Iterator for Walker<'tx, K, T>
where
T::Key: Decode,
{
type Item = Result<(T::Key, T::Value), KVError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND)
.map_err(|e| Error::Internal(e.into()))
}
self.cursor.next().transpose()
fn delete_current(&mut self) -> Result<(), Error> {
self.inner.del(WriteFlags::CURRENT).map_err(|e| Error::Internal(e.into()))
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
#[derive(Debug)]
pub struct DupWalker<'a, K: TransactionKind, T: DupSort> {
/// Cursor to be used to walk through the table.
pub cursor: Cursor<'a, K, T>,
/// Value where to start the walk.
pub start: Option<Result<T::Value, KVError>>,
}
impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for Cursor<'tx, RW, T> {
fn delete_current_duplicates(&mut self) -> Result<(), Error> {
self.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| Error::Internal(e.into()))
}
impl<'tx, K: TransactionKind, T: DupSort> std::iter::Iterator for DupWalker<'tx, K, T> {
type Item = Result<T::Value, KVError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next_dup_val().transpose()
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND_DUP)
.map_err(|e| Error::Internal(e.into()))
}
}

View File

@ -1,36 +0,0 @@
//! KVError declaration.
use libmdbx::Error;
use thiserror::Error;
/// KV error type.
#[derive(Debug, Error)]
pub enum KVError {
/// Generic MDBX error.
#[error("MDBX error: {0:?}")]
MDBX(#[from] Error),
/// Failed to open MDBX file.
#[error("{0:?}")]
DatabaseLocation(Error),
/// Failed to create a table in database.
#[error("{0:?}")]
TableCreation(Error),
/// Failed to insert a value into a table.
#[error("{0:?}")]
Put(Error),
/// Failed to get a value into a table.
#[error("{0:?}")]
Get(Error),
/// Failed to delete a `(key, vakue)` pair into a table.
#[error("{0:?}")]
Delete(Error),
/// Failed to commit transaction changes into the database.
#[error("{0:?}")]
Commit(Error),
/// Failed to initiate a MDBX transaction.
#[error("{0:?}")]
InitTransaction(Error),
/// Failed to decode or encode a key or value coming from a table..
#[error("Error decoding value.")]
InvalidValue,
}

View File

@ -1,32 +1,21 @@
//! Module that interacts with MDBX.
use crate::utils::{default_page_size, TableType};
use crate::utils::default_page_size;
use libmdbx::{
DatabaseFlags, Environment, EnvironmentFlags, EnvironmentKind, Geometry, Mode, PageSize,
SyncMode, RO, RW,
};
use reth_interfaces::db::{
tables::{TableType, TABLES},
Database, Error,
};
use std::{ops::Deref, path::Path};
pub mod table;
use table::{Decode, DupSort, Encode, Table};
pub mod tables;
use tables::TABLES;
pub mod cursor;
pub mod models;
pub use models::*;
pub mod tx;
use tx::Tx;
mod error;
pub use error::KVError;
// Made public so `benches` can access it.
pub mod codecs;
/// Environment used when opening a MDBX environment. RO/RW.
#[derive(Debug)]
pub enum EnvKind {
@ -43,11 +32,24 @@ pub struct Env<E: EnvironmentKind> {
pub inner: Environment<E>,
}
impl<E: EnvironmentKind> Database for Env<E> {
type TX<'a> = tx::Tx<'a, RO, E>;
type TXMut<'a> = tx::Tx<'a, RW, E>;
fn tx(&self) -> Result<Self::TX<'_>, Error> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::Internal(e.into()))?))
}
fn tx_mut(&self) -> Result<Self::TXMut<'_>, Error> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::Internal(e.into()))?))
}
}
impl<E: EnvironmentKind> Env<E> {
/// Opens the database at the specified path with the given `EnvKind`.
///
/// It does not create the tables, for that call [`create_tables`].
pub fn open(path: &Path, kind: EnvKind) -> Result<Env<E>, KVError> {
pub fn open(path: &Path, kind: EnvKind) -> Result<Env<E>, Error> {
let mode = match kind {
EnvKind::RO => Mode::ReadOnly,
EnvKind::RW => Mode::ReadWrite { sync_mode: SyncMode::Durable },
@ -69,15 +71,15 @@ impl<E: EnvironmentKind> Env<E> {
..Default::default()
})
.open(path)
.map_err(KVError::DatabaseLocation)?,
.map_err(|e| Error::Internal(e.into()))?,
};
Ok(env)
}
/// Creates all the defined tables, if necessary.
pub fn create_tables(&self) -> Result<(), KVError> {
let tx = self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?;
pub fn create_tables(&self) -> Result<(), Error> {
let tx = self.inner.begin_rw_txn().map_err(|e| Error::Initialization(e.into()))?;
for (table_type, table) in TABLES {
let flags = match table_type {
@ -85,56 +87,15 @@ impl<E: EnvironmentKind> Env<E> {
TableType::DupSort => DatabaseFlags::DUP_SORT,
};
tx.create_db(Some(table), flags).map_err(KVError::TableCreation)?;
tx.create_db(Some(table), flags).map_err(|e| Error::Initialization(e.into()))?;
}
tx.commit()?;
tx.commit().map_err(|e| Error::Initialization(e.into()))?;
Ok(())
}
}
impl<E: EnvironmentKind> Env<E> {
/// Initiates a read-only transaction. It should be committed or rolled back in the end, so it
/// frees up pages.
pub fn begin_tx(&self) -> Result<Tx<'_, RO, E>, KVError> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(KVError::InitTransaction)?))
}
/// Initiates a read-write transaction. It should be committed or rolled back in the end.
pub fn begin_mut_tx(&self) -> Result<Tx<'_, RW, E>, KVError> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?))
}
/// Takes a function and passes a read-only transaction into it, making sure it's closed in the
/// end of the execution.
pub fn view<T, F>(&self, f: F) -> Result<T, KVError>
where
F: Fn(&Tx<'_, RO, E>) -> T,
{
let tx = self.begin_tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
/// Takes a function and passes a write-read transaction into it, making sure it's committed in
/// the end of the execution.
pub fn update<T, F>(&self, f: F) -> Result<T, KVError>
where
F: Fn(&Tx<'_, RW, E>) -> T,
{
let tx = self.begin_mut_tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
}
impl<E: EnvironmentKind> Deref for Env<E> {
type Target = libmdbx::Environment<E>;
@ -170,11 +131,12 @@ pub mod test_utils {
#[cfg(test)]
mod tests {
use super::{
tables::{Headers, PlainState},
test_utils, Env, EnvKind,
};
use super::{test_utils, Env, EnvKind};
use libmdbx::{NoWriteMap, WriteMap};
use reth_interfaces::db::{
tables::{Headers, PlainState},
Database, DbTx, DbTxMut,
};
use reth_primitives::{Account, Address, Header, H256, U256};
use std::str::FromStr;
use tempfile::TempDir;
@ -200,12 +162,12 @@ mod tests {
let key = (1u64, H256::zero());
// PUT
let tx = env.begin_mut_tx().expect(ERROR_INIT_TX);
let tx = env.tx_mut().expect(ERROR_INIT_TX);
tx.put::<Headers>(key.into(), value.clone()).expect(ERROR_PUT);
tx.commit().expect(ERROR_COMMIT);
// GET
let tx = env.begin_tx().expect(ERROR_INIT_TX);
let tx = env.tx().expect(ERROR_INIT_TX);
let result = tx.get::<Headers>(key.into()).expect(ERROR_GET);
assert!(result.expect(ERROR_RETURN_VALUE) == value);
tx.commit().expect(ERROR_COMMIT);

View File

@ -1,14 +1,8 @@
//! Transaction wrapper for libmdbx-sys.
use crate::{
kv::{
cursor::{Cursor, ValueOnlyResult},
table::{Encode, Table},
KVError,
},
utils::decode_one,
};
use crate::{kv::cursor::Cursor, utils::decode_one};
use libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW};
use reth_interfaces::db::{DbTx, DbTxMut, DupSort, Encode, Error, Table};
use std::marker::PhantomData;
/// Wrapper for the libmdbx transaction.
@ -32,52 +26,67 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> {
self.inner.id()
}
/// Open cursor on `table`.
pub fn cursor<'a, T: Table>(&'a self) -> Result<Cursor<'a, K, T>, KVError>
where
'env: 'a,
T: Table,
{
/// Create db Cursor
pub fn new_cursor<T: Table>(&self) -> Result<Cursor<'env, K, T>, Error> {
Ok(Cursor {
inner: self.inner.cursor(&self.inner.open_db(Some(T::NAME))?)?,
inner: self
.inner
.cursor(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?)
.map_err(|e| Error::Internal(e.into()))?,
table: T::NAME,
_dbi: PhantomData,
})
}
}
/// Gets value associated with `key` on `table`. If it's a DUPSORT table, then returns the first
/// entry.
pub fn get<T: Table>(&self, key: T::Key) -> ValueOnlyResult<T> {
impl<'env, K: TransactionKind, E: EnvironmentKind> DbTx<'env> for Tx<'env, K, E> {
/// Cursor GAT
type Cursor<T: Table> = Cursor<'env, K, T>;
/// DupCursor GAT
type DupCursor<T: DupSort> = Cursor<'env, K, T>;
/// Iterate over read only values in database.
fn cursor<T: Table>(&self) -> Result<Self::Cursor<T>, Error> {
self.new_cursor()
}
/// Iterate over read only values in database.
fn cursor_dup<T: DupSort>(&self) -> Result<Self::DupCursor<T>, Error> {
self.new_cursor()
}
fn commit(self) -> Result<bool, Error> {
self.inner.commit().map_err(|e| Error::Internal(e.into()))
}
fn get<T: Table>(&self, key: T::Key) -> Result<Option<<T as Table>::Value>, Error> {
self.inner
.get(&self.inner.open_db(Some(T::NAME))?, key.encode().as_ref())?
.get(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
key.encode().as_ref(),
)
.map_err(|e| Error::Internal(e.into()))?
.map(decode_one::<T>)
.transpose()
}
/// Saves all changes and frees up storage memory.
pub fn commit(self) -> Result<bool, KVError> {
self.inner.commit().map_err(KVError::Commit)
}
}
impl<'a, E: EnvironmentKind> Tx<'a, RW, E> {
/// Opens `table` and inserts `(key, value)` pair. If the `key` already exists, it replaces the
/// value it if the table doesn't support DUPSORT.
pub fn put<T>(&self, k: T::Key, v: T::Value) -> Result<(), KVError>
where
T: Table,
{
impl<'env, E: EnvironmentKind> DbTxMut<'env> for Tx<'env, RW, E> {
type CursorMut<T: Table> = Cursor<'env, RW, T>;
type DupCursorMut<T: DupSort> = Cursor<'env, RW, T>;
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), Error> {
self.inner
.put(&self.inner.open_db(Some(T::NAME))?, &k.encode(), &v.encode(), WriteFlags::UPSERT)
.map_err(KVError::Put)
.put(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
&key.encode(),
&value.encode(),
WriteFlags::UPSERT,
)
.map_err(|e| Error::Internal(e.into()))
}
/// Deletes the `(key, value)` entry on `table`. When `value` is `None`, all entries with `key`
/// are to be deleted. Otherwise, only the item matching that data shall be.
pub fn delete<T>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, KVError>
where
T: Table,
{
fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, Error> {
let mut data = None;
let value = value.map(Encode::encode);
@ -86,17 +95,27 @@ impl<'a, E: EnvironmentKind> Tx<'a, RW, E> {
};
self.inner
.del(&self.inner.open_db(Some(T::NAME))?, key.encode(), data)
.map_err(KVError::Delete)
.del(
&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?,
key.encode(),
data,
)
.map_err(|e| Error::Internal(e.into()))
}
/// Empties `table`.
pub fn clear<T>(&self) -> Result<(), KVError>
where
T: Table,
{
self.inner.clear_db(&self.inner.open_db(Some(T::NAME))?)?;
fn clear<T: Table>(&self) -> Result<(), Error> {
self.inner
.clear_db(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?)
.map_err(|e| Error::Internal(e.into()))?;
Ok(())
}
fn cursor_mut<T: Table>(&self) -> Result<Self::CursorMut<T>, Error> {
self.new_cursor()
}
fn cursor_dup_mut<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, Error> {
self.new_cursor()
}
}

View File

@ -1,19 +1,10 @@
//! Utils crate for `db`.
use crate::kv::{
table::{Decode, Table},
KVError,
};
//suse crate::kv::Error;
use bytes::Bytes;
use reth_interfaces::db::{Decode, Error, Table};
use std::borrow::Cow;
/// Enum for the type of table present in libmdbx.
#[derive(Debug)]
pub enum TableType {
Table,
DupSort,
}
/// Returns the default page size that can be used in this OS.
pub(crate) fn default_page_size() -> usize {
let os_page_size = page_size::get();
@ -31,7 +22,7 @@ pub(crate) fn default_page_size() -> usize {
/// Helper function to decode a `(key, value)` pair.
pub(crate) fn decoder<'a, T>(
kv: (Cow<'a, [u8]>, Cow<'a, [u8]>),
) -> Result<(T::Key, T::Value), KVError>
) -> Result<(T::Key, T::Value), Error>
where
T: Table,
T::Key: Decode,
@ -43,7 +34,7 @@ where
}
/// Helper function to decode only a value from a `(key, value)` pair.
pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result<T::Value, KVError>
pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result<T::Value, Error>
where
T: Table,
{
@ -51,7 +42,7 @@ where
}
/// Helper function to decode a value. It can be a key or subkey.
pub(crate) fn decode_one<T>(value: Cow<'_, [u8]>) -> Result<T::Value, KVError>
pub(crate) fn decode_one<T>(value: Cow<'_, [u8]>) -> Result<T::Value, Error>
where
T: Table,
{

View File

@ -11,5 +11,19 @@ reth-primitives = { path = "../primitives" }
reth-rpc-types = { path = "../net/rpc-types" }
async-trait = "0.1.57"
thiserror = "1.0.37"
eyre = "0.6.8"
auto_impl = "1.0"
tokio = { version = "1.21.2", features = ["sync"] }
bytes = "1.2"
# codecs
serde = { version = "1.0.*", default-features = false }
postcard = { version = "1.0.2", features = ["alloc"] }
heapless = "0.7.16"
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
[dev-dependencies]
test-fuzz = "3.0.4"
[features]
bench = []

View File

@ -10,7 +10,7 @@ macro_rules! impl_fuzzer {
#[cfg(any(test, feature = "bench"))]
pub mod $name {
use reth_primitives::$name;
use crate::kv::table;
use crate::db::table;
/// Encodes and decodes table types returning its encoded size and the decoded object.
pub fn encode_and_decode(obj: $name) -> (usize, $name) {

View File

@ -1,7 +1,7 @@
#![allow(unused)]
use crate::kv::{Decode, Encode, KVError};
use postcard::{from_bytes, to_allocvec};
use crate::db::{Decode, Encode, Error};
use postcard::{from_bytes, to_allocvec, to_vec};
use reth_primitives::*;
// Just add `Serialize` and `Deserialize`, and set impl_heapless_postcard!(T, MaxSize(T))
@ -28,7 +28,7 @@ macro_rules! impl_postcard {
impl Decode for $name {
fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<Self, KVError> {
from_bytes(&value.into()).map_err(|_| KVError::InvalidValue)
from_bytes(&value.into()).map_err(|e| Error::Decode(e.into()))
}
}
)+

View File

@ -1,4 +1,4 @@
use crate::kv::{Decode, Encode, KVError};
use crate::db::{Decode, Encode, Error};
use parity_scale_codec::decode_from_bytes;
use reth_primitives::*;
@ -23,8 +23,8 @@ impl<T> Decode for T
where
T: ScaleOnly + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug,
{
fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<T, KVError> {
decode_from_bytes(value.into()).map_err(|_| KVError::InvalidValue)
fn decode<B: Into<bytes::Bytes>>(value: B) -> Result<T, Error> {
decode_from_bytes(value.into()).map_err(|e| Error::Decode(e.into()))
}
}

View File

@ -0,0 +1,16 @@
/// Database Error
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// Encode errors.
#[error("A table encoding error:{0}")]
Encode(eyre::Error),
/// Decode errors.
#[error("A table decoding error:{0}")]
Decode(eyre::Error),
/// Initialization database error.
#[error("Initialization database error:{0}")]
Initialization(eyre::Error),
/// Internal DB error.
#[error("A internal database error:{0}")]
Internal(eyre::Error),
}

View File

@ -0,0 +1,180 @@
//! Mock database
use std::collections::BTreeMap;
use super::{
Database, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut, DupSort, Table,
};
/// Mock database used for testing with inner BTreeMap structure
/// TODO
#[derive(Clone, Default)]
pub struct DatabaseMock {
/// Main data. TODO (Make it table aware)
pub data: BTreeMap<Vec<u8>, Vec<u8>>,
}
impl Database for DatabaseMock {
type TX<'a> = TxMock;
type TXMut<'a> = TxMock;
fn tx(&self) -> Result<Self::TX<'_>, super::Error> {
Ok(TxMock::default())
}
fn tx_mut(&self) -> Result<Self::TXMut<'_>, super::Error> {
Ok(TxMock::default())
}
}
/// Mock read only tx
#[derive(Clone, Default)]
pub struct TxMock {
/// Table representation
_table: BTreeMap<Vec<u8>, Vec<u8>>,
}
impl<'a> DbTx<'a> for TxMock {
type Cursor<T: super::Table> = CursorMock;
type DupCursor<T: super::DupSort> = CursorMock;
fn get<T: super::Table>(&self, _key: T::Key) -> Result<Option<T::Value>, super::Error> {
todo!()
}
fn commit(self) -> Result<bool, super::Error> {
todo!()
}
fn cursor<T: super::Table>(&self) -> Result<Self::Cursor<T>, super::Error> {
todo!()
}
fn cursor_dup<T: super::DupSort>(&self) -> Result<Self::DupCursor<T>, super::Error> {
todo!()
}
}
impl<'a> DbTxMut<'a> for TxMock {
type CursorMut<T: super::Table> = CursorMock;
type DupCursorMut<T: super::DupSort> = CursorMock;
fn put<T: super::Table>(&self, _key: T::Key, _value: T::Value) -> Result<(), super::Error> {
todo!()
}
fn delete<T: super::Table>(
&self,
_key: T::Key,
_value: Option<T::Value>,
) -> Result<bool, super::Error> {
todo!()
}
fn cursor_mut<T: super::Table>(&self) -> Result<Self::CursorMut<T>, super::Error> {
todo!()
}
fn cursor_dup_mut<T: super::DupSort>(&self) -> Result<Self::DupCursorMut<T>, super::Error> {
todo!()
}
fn clear<T: Table>(&self) -> Result<(), super::Error> {
todo!()
}
}
/// CUrsor that iterates over table
pub struct CursorMock {
_cursor: u32,
}
impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock {
fn first(&mut self) -> super::PairResult<T> {
todo!()
}
fn seek(&mut self, _key: T::SeekKey) -> super::PairResult<T> {
todo!()
}
fn seek_exact(&mut self, _key: T::Key) -> super::PairResult<T> {
todo!()
}
fn next(&mut self) -> super::PairResult<T> {
todo!()
}
fn prev(&mut self) -> super::PairResult<T> {
todo!()
}
fn last(&mut self) -> super::PairResult<T> {
todo!()
}
fn current(&mut self) -> super::PairResult<T> {
todo!()
}
fn walk(&'tx mut self, _start_key: T::Key) -> Result<super::Walker<'tx, T>, super::Error> {
todo!()
}
}
impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock {
fn next_dup(&mut self) -> super::PairResult<T> {
todo!()
}
fn next_no_dup(&mut self) -> super::PairResult<T> {
todo!()
}
fn next_dup_val(&mut self) -> super::ValueOnlyResult<T> {
todo!()
}
fn walk_dup(
&'tx mut self,
_key: <T>::Key,
_subkey: <T as DupSort>::SubKey,
) -> Result<super::DupWalker<'tx, T>, super::Error> {
todo!()
}
}
impl<'tx, T: Table> DbCursorRW<'tx, T> for CursorMock {
fn upsert(
&mut self,
_key: <T as Table>::Key,
_value: <T as Table>::Value,
) -> Result<(), super::Error> {
todo!()
}
fn append(
&mut self,
_key: <T as Table>::Key,
_value: <T as Table>::Value,
) -> Result<(), super::Error> {
todo!()
}
fn delete_current(&mut self) -> Result<(), super::Error> {
todo!()
}
}
impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for CursorMock {
fn delete_current_duplicates(&mut self) -> Result<(), super::Error> {
todo!()
}
fn append_dup(&mut self, _key: <T>::Key, _value: <T>::Value) -> Result<(), super::Error> {
todo!()
}
}

View File

@ -0,0 +1,202 @@
pub mod codecs;
mod error;
pub mod mock;
pub mod models;
mod table;
pub mod tables;
pub use error::Error;
pub use table::*;
/// Main Database trait that spawns transactions to be executed.
pub trait Database {
/// RO database transaction
type TX<'a>: DbTx<'a> + Send + Sync
where
Self: 'a;
/// RW database transaction
type TXMut<'a>: DbTxMut<'a> + DbTx<'a> + Send + Sync
where
Self: 'a;
/// Create read only transaction.
fn tx(&self) -> Result<Self::TX<'_>, Error>;
/// Create read write transaction only possible if database is open with write access.
fn tx_mut(&self) -> Result<Self::TXMut<'_>, Error>;
/// Takes a function and passes a read-only transaction into it, making sure it's closed in the
/// end of the execution.
fn view<T, F>(&self, f: F) -> Result<T, Error>
where
F: Fn(&Self::TX<'_>) -> T,
{
let tx = self.tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
/// Takes a function and passes a write-read transaction into it, making sure it's committed in
/// the end of the execution.
fn update<T, F>(&self, f: F) -> Result<T, Error>
where
F: Fn(&Self::TXMut<'_>) -> T,
{
let tx = self.tx_mut()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
}
/// Read only transaction
pub trait DbTx<'a> {
/// Cursor GAT
type Cursor<T: Table>: DbCursorRO<'a, T>;
/// DupCursor GAT
type DupCursor<T: DupSort>: DbDupCursorRO<'a, T> + DbCursorRO<'a, T>;
/// Get value
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, Error>;
/// Commit for read only transaction will consume and free transaction and allows
/// freeing of memory pages
fn commit(self) -> Result<bool, Error>;
/// Iterate over read only values in table.
fn cursor<T: Table>(&self) -> Result<Self::Cursor<T>, Error>;
/// Iterate over read only values in dup sorted table.
fn cursor_dup<T: DupSort>(&self) -> Result<Self::DupCursor<T>, Error>;
}
/// Read write transaction that allows writing to database
pub trait DbTxMut<'a> {
/// Cursor GAT
type CursorMut<T: Table>: DbCursorRW<'a, T> + DbCursorRO<'a, T>;
/// DupCursor GAT
type DupCursorMut<T: DupSort>: DbDupCursorRW<'a, T>
+ DbCursorRW<'a, T>
+ DbDupCursorRO<'a, T>
+ DbCursorRO<'a, T>;
/// Put value to database
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Delete value from database
fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>) -> Result<bool, Error>;
/// Clears database.
fn clear<T: Table>(&self) -> Result<(), Error>;
/// Cursor mut
fn cursor_mut<T: Table>(&self) -> Result<Self::CursorMut<T>, Error>;
/// DupCursor mut.
fn cursor_dup_mut<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, Error>;
}
/// Alias type for a `(key, value)` result coming from a cursor.
pub type PairResult<T> = Result<Option<(<T as Table>::Key, <T as Table>::Value)>, Error>;
/// Alias type for a `(key, value)` result coming from an iterator.
pub type IterPairResult<T> = Option<Result<(<T as Table>::Key, <T as Table>::Value), Error>>;
/// Alias type for a value result coming from a cursor without its key.
pub type ValueOnlyResult<T> = Result<Option<<T as Table>::Value>, Error>;
/// Read only cursor over table.
pub trait DbCursorRO<'tx, T: Table> {
/// First item in table
fn first(&mut self) -> PairResult<T>;
/// Seeks for a `(key, value)` pair greater or equal than `key`.
fn seek(&mut self, key: T::SeekKey) -> PairResult<T>;
/// Seeks for the exact `(key, value)` pair with `key`.
fn seek_exact(&mut self, key: T::Key) -> PairResult<T>;
/// Returns the next `(key, value)` pair.
#[allow(clippy::should_implement_trait)]
fn next(&mut self) -> PairResult<T>;
/// Returns the previous `(key, value)` pair.
fn prev(&mut self) -> PairResult<T>;
/// Returns the last `(key, value)` pair.
fn last(&mut self) -> PairResult<T>;
/// Returns the current `(key, value)` pair of the cursor.
fn current(&mut self) -> PairResult<T>;
/// Returns an iterator starting at a key greater or equal than `start_key`.
fn walk(&'tx mut self, start_key: T::Key) -> Result<Walker<'tx, T>, Error>;
}
/// Read only curor over DupSort table.
pub trait DbDupCursorRO<'tx, T: DupSort> {
/// Returns the next `(key, value)` pair of a DUPSORT table.
fn next_dup(&mut self) -> PairResult<T>;
/// Returns the next `(key, value)` pair skipping the duplicates.
fn next_no_dup(&mut self) -> PairResult<T>;
/// Returns the next `value` of a duplicate `key`.
fn next_dup_val(&mut self) -> ValueOnlyResult<T>;
/// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT
/// table.
fn walk_dup(&'tx mut self, key: T::Key, subkey: T::SubKey) -> Result<DupWalker<'tx, T>, Error>;
}
/// Read write cursor over table.
pub trait DbCursorRW<'tx, T: Table> {
/// Database operation that will update an existing row if a specified value already
/// exists in a table, and insert a new row if the specified value doesn't already exist
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Append value to next cursor item
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
/// Delete current value that cursor points to
fn delete_current(&mut self) -> Result<(), Error>;
}
/// Read Write Cursor over DupSorted table.
pub trait DbDupCursorRW<'tx, T: DupSort> {
/// Append value to next cursor item
fn delete_current_duplicates(&mut self) -> Result<(), Error>;
/// Append duplicate value
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>;
}
/// Provides an iterator to `Cursor` when handling `Table`.
pub struct Walker<'cursor, T: Table> {
/// Cursor to be used to walk through the table.
pub cursor: &'cursor mut dyn DbCursorRO<'cursor, T>,
/// `(key, value)` where to start the walk.
pub start: IterPairResult<T>,
}
impl<'cursor, T: Table> std::iter::Iterator for Walker<'cursor, T> {
type Item = Result<(T::Key, T::Value), Error>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next().transpose()
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
pub struct DupWalker<'cursor, T: DupSort> {
/// Cursor to be used to walk through the table.
pub cursor: &'cursor mut dyn DbDupCursorRO<'cursor, T>,
/// Value where to start the walk.
pub start: Option<Result<T::Value, Error>>,
}
impl<'cursor, T: DupSort> std::iter::Iterator for DupWalker<'cursor, T> {
type Item = Result<T::Value, Error>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next_dup_val().transpose()
}
}

View File

@ -1,10 +1,11 @@
//! Block related models and types.
use crate::kv::{
use crate::db::{
table::{Decode, Encode},
KVError,
Error,
};
use bytes::Bytes;
use eyre::eyre;
use reth_primitives::{BlockHash, BlockNumber, H256};
/// Total chain number of transactions. Key for [`CumulativeTxCount`].
@ -53,10 +54,12 @@ impl Encode for BlockNumHash {
}
impl Decode for BlockNumHash {
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, KVError> {
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error> {
let value: bytes::Bytes = value.into();
let num = u64::from_be_bytes(value.as_ref().try_into().map_err(|_| KVError::InvalidValue)?);
let num = u64::from_be_bytes(
value.as_ref().try_into().map_err(|_| Error::Decode(eyre!("Into bytes error.")))?,
);
let hash = H256::decode(value.slice(8..))?;
Ok(BlockNumHash((num, hash)))

View File

@ -1,11 +1,10 @@
//! Table traits.
use super::KVError;
use super::Error;
use bytes::Bytes;
use std::{
fmt::Debug,
marker::{Send, Sync},
};
/// Trait that will transform the data to be saved in the DB.
pub trait Encode: Send + Sync + Sized + Debug {
/// Encoded type.
@ -18,7 +17,7 @@ pub trait Encode: Send + Sync + Sized + Debug {
/// Trait that will transform the data to be read from the DB.
pub trait Decode: Send + Sync + Sized + Debug {
/// Decodes data coming from the database.
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, KVError>;
fn decode<B: Into<Bytes>>(value: B) -> Result<Self, Error>;
}
/// Generic trait that enforces the database value to implement [`Encode`] and [`Decode`].
@ -40,11 +39,11 @@ pub trait Table: Send + Sync + Debug + 'static {
/// Key element of `Table`.
///
/// Sorting should be taken into account when encoding this.
type Key: Encode;
type Key: Object;
/// Value element of `Table`.
type Value: Object;
/// Seek Key element of `Table`.
type SeekKey: Encode;
type SeekKey: Object;
}
/// DupSort allows for keys not to be repeated in the database,

View File

@ -1,10 +1,16 @@
//! Declaration of all MDBX tables.
use crate::{
kv::blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock},
utils::TableType,
};
//! Declaration of all Database tables.
use crate::db::models::blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock};
use reth_primitives::{Account, Address, BlockNumber, Header, Receipt};
/// Enum for the type of table present in libmdbx.
#[derive(Debug)]
pub enum TableType {
/// key value table
Table,
/// Duplicate key value table
DupSort,
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 18] = [
(TableType::Table, CanonicalHeaders::const_name()),
@ -35,7 +41,7 @@ macro_rules! table {
#[derive(Clone, Copy, Debug, Default)]
pub struct $name;
impl $crate::kv::table::Table for $name {
impl $crate::db::table::Table for $name {
const NAME: &'static str = $name::const_name();
type Key = $key;
type Value = $value;

View File

@ -1,4 +1,4 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub)]
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
@ -13,6 +13,8 @@ pub mod executor;
/// Consensus traits.
pub mod consensus;
/// Database traits.
pub mod db;
/// Traits that provide chain access.
pub mod provider;

View File

@ -15,6 +15,7 @@ pub struct Signature {
pub odd_y_parity: bool,
}
#[allow(dead_code)]
impl Signature {
/// Encode the `v`, `r`, `s` values without a RLP header.
/// Encodes the `v` value using the legacy scheme without EIP-155.

View File

@ -9,6 +9,7 @@ description = "Staged syncing primitives used in reth."
[dependencies]
reth-primitives = { path = "../primitives" }
reth-interfaces = { path = "../interfaces" }
reth-db = { path = "../db" }
async-trait = "0.1.57"
thiserror = "1.0.37"

View File

@ -1,5 +1,5 @@
use crate::pipeline::PipelineEvent;
use reth_db::kv::KVError;
use reth_interfaces::db::Error as DbError;
use reth_primitives::BlockNumber;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -17,7 +17,7 @@ pub enum StageError {
},
/// The stage encountered a database error.
#[error("A database error occurred.")]
Database(#[from] KVError),
Database(#[from] DbError),
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
@ -31,7 +31,7 @@ pub enum PipelineError {
Stage(#[from] StageError),
/// The pipeline encountered a database error.
#[error("A database error occurred.")]
Database(#[from] KVError),
Database(#[from] DbError),
/// The pipeline encountered an error while trying to send an event.
#[error("The pipeline encountered an error while trying to send an event.")]
Channel(#[from] SendError<PipelineEvent>),

View File

@ -1,7 +1,4 @@
use reth_db::{
kv::{tables::SyncStage, tx::Tx, KVError},
mdbx,
};
use reth_interfaces::db::{tables::SyncStage, DbTx, DbTxMut, Error as DbError};
use reth_primitives::BlockNumber;
use std::fmt::Display;
@ -19,26 +16,16 @@ impl Display for StageId {
impl StageId {
/// Get the last committed progress of this stage.
pub fn get_progress<'db, K, E>(
&self,
tx: &Tx<'db, K, E>,
) -> Result<Option<BlockNumber>, KVError>
where
K: mdbx::TransactionKind,
E: mdbx::EnvironmentKind,
{
pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result<Option<BlockNumber>, DbError> {
tx.get::<SyncStage>(self.0.as_bytes().to_vec())
}
/// Save the progress of this stage.
pub fn save_progress<'db, E>(
pub fn save_progress<'db>(
&self,
tx: &Tx<'db, mdbx::RW, E>,
tx: &impl DbTxMut<'db>,
block: BlockNumber,
) -> Result<(), KVError>
where
E: mdbx::EnvironmentKind,
{
) -> Result<(), DbError> {
tx.put::<SyncStage>(self.0.as_bytes().to_vec(), block)
}
}

View File

@ -1,9 +1,8 @@
use crate::{
error::*,
util::{db::TxContainer, opt::MaybeSender},
ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput,
};
use reth_db::{kv::Env, mdbx};
use reth_interfaces::db::{Database, DbTx};
use reth_primitives::BlockNumber;
use std::fmt::{Debug, Formatter};
use tokio::sync::mpsc::Sender;
@ -70,37 +69,24 @@ use state::*;
///
/// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind
/// priorities are unwound first.
pub struct Pipeline<'db, E>
where
E: mdbx::EnvironmentKind,
{
stages: Vec<QueuedStage<'db, E>>,
pub struct Pipeline<DB: Database> {
stages: Vec<QueuedStage<DB>>,
max_block: Option<BlockNumber>,
events_sender: MaybeSender<PipelineEvent>,
}
impl<'db, E> Default for Pipeline<'db, E>
where
E: mdbx::EnvironmentKind,
{
impl<DB: Database> Default for Pipeline<DB> {
fn default() -> Self {
Self { stages: Vec::new(), max_block: None, events_sender: MaybeSender::new(None) }
}
}
impl<'db, E> Debug for Pipeline<'db, E>
where
E: mdbx::EnvironmentKind,
{
impl<DB: Database> Debug for Pipeline<DB> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Pipeline").field("max_block", &self.max_block).finish()
}
}
impl<'db, E> Pipeline<'db, E>
where
E: mdbx::EnvironmentKind,
{
impl<DB: Database> Pipeline<DB> {
/// Create a new pipeline.
pub fn new() -> Self {
Default::default()
@ -118,7 +104,7 @@ where
/// The unwind priority is set to 0.
pub fn push<S>(self, stage: S, require_tip: bool) -> Self
where
S: Stage<'db, E> + 'static,
S: Stage<DB> + 'static,
{
self.push_with_unwind_priority(stage, require_tip, 0)
}
@ -131,7 +117,7 @@ where
unwind_priority: usize,
) -> Self
where
S: Stage<'db, E> + 'static,
S: Stage<DB> + 'static,
{
self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority });
self
@ -153,7 +139,7 @@ where
/// Run the pipeline in an infinite loop. Will terminate early if the user has specified
/// a `max_block` in the pipeline.
pub async fn run(&mut self, db: &'db Env<E>) -> Result<(), PipelineError> {
pub async fn run(&mut self, db: &DB) -> Result<(), PipelineError> {
loop {
let mut state = PipelineState {
events_sender: self.events_sender.clone(),
@ -162,8 +148,7 @@ where
minimum_progress: None,
reached_tip: true,
};
let mut tx = TxContainer::new(db)?;
let next_action = self.run_loop(&mut state, &mut tx).await?;
let next_action = self.run_loop(&mut state, db).await?;
// Terminate the loop early if it's reached the maximum user
// configured block.
@ -184,32 +169,29 @@ where
/// If any stage is unsuccessful at execution, we proceed to
/// unwind. This will undo the progress across the entire pipeline
/// up to the block that caused the error.
async fn run_loop<'tx>(
async fn run_loop(
&mut self,
state: &mut PipelineState,
tx: &mut TxContainer<'db, 'tx, E>,
) -> Result<ControlFlow, PipelineError>
where
'db: 'tx,
{
db: &DB,
) -> Result<ControlFlow, PipelineError> {
let mut previous_stage = None;
for (_, queued_stage) in self.stages.iter_mut().enumerate() {
let stage_id = queued_stage.stage.id();
let next = queued_stage
.execute(state, previous_stage, tx)
.execute(state, previous_stage, db)
.instrument(info_span!("Running", stage = %stage_id))
.await?;
match next {
ControlFlow::Continue => {
let tx = db.tx()?;
previous_stage =
Some((stage_id, stage_id.get_progress(tx.get())?.unwrap_or_default()));
Some((stage_id, stage_id.get_progress(&tx)?.unwrap_or_default()));
tx.commit()?;
}
ControlFlow::Unwind { target, bad_block } => {
// TODO: Note on close
tx.close();
self.unwind(tx.db, target, bad_block).await?;
tx.open()?;
self.unwind(db, target, bad_block).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
}
@ -223,7 +205,7 @@ where
/// If the unwind is due to a bad block the number of that block should be specified.
pub async fn unwind(
&mut self,
db: &'db Env<E>,
db: &DB,
to: BlockNumber,
bad_block: Option<BlockNumber>,
) -> Result<(), PipelineError> {
@ -236,7 +218,7 @@ where
};
// Unwind stages in reverse order of priority (i.e. higher priority = first)
let mut tx = db.begin_mut_tx()?;
let tx = db.tx_mut()?;
for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() {
let stage_id = stage.id();
let span = info_span!("Unwinding", stage = %stage_id);
@ -254,7 +236,7 @@ where
let input = UnwindInput { stage_progress, unwind_to: to, bad_block };
self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?;
let output = stage.unwind(&mut tx, input).await;
let output = stage.unwind(&tx, input).await;
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.stage_progress;
@ -278,12 +260,9 @@ where
}
/// A container for a queued stage.
struct QueuedStage<'db, E>
where
E: mdbx::EnvironmentKind,
{
struct QueuedStage<DB: Database> {
/// The actual stage to execute.
stage: Box<dyn Stage<'db, E>>,
stage: Box<dyn Stage<DB>>,
/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
@ -291,20 +270,14 @@ where
require_tip: bool,
}
impl<'db, E> QueuedStage<'db, E>
where
E: mdbx::EnvironmentKind,
{
impl<DB: Database> QueuedStage<DB> {
/// Execute the stage.
async fn execute<'tx>(
&mut self,
state: &mut PipelineState,
previous_stage: Option<(StageId, BlockNumber)>,
tx: &mut TxContainer<'db, 'tx, E>,
) -> Result<ControlFlow, PipelineError>
where
'db: 'tx,
{
db: &DB,
) -> Result<ControlFlow, PipelineError> {
let stage_id = self.stage.id();
if self.require_tip && !state.reached_tip() {
info!("Tip not reached, skipping.");
@ -315,8 +288,10 @@ where
return Ok(ControlFlow::Continue)
}
let mut tx = db.tx_mut()?;
loop {
let prev_progress = stage_id.get_progress(tx.get())?;
let prev_progress = stage_id.get_progress(&tx)?;
let stage_reached_max_block = prev_progress
.zip(state.max_block)
@ -337,12 +312,12 @@ where
match self
.stage
.execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress })
.execute(&tx, ExecInput { previous_stage, stage_progress: prev_progress })
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
stage_id.save_progress(tx.get_mut(), stage_progress)?;
stage_id.save_progress(&tx, stage_progress)?;
state
.events_sender
@ -351,6 +326,8 @@ where
// TODO: Make the commit interval configurable
tx.commit()?;
// create new mut transaction.
tx = db.tx_mut()?;
state.record_progress_outliers(stage_progress);
state.set_reached_tip(reached_tip);
@ -383,11 +360,12 @@ where
#[cfg(test)]
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use reth_db::{
kv::{test_utils, tx::Tx, EnvKind},
mdbx,
kv::{test_utils, Env, EnvKind},
mdbx::{self, WriteMap},
};
use tokio::sync::mpsc::channel;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
@ -401,7 +379,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
Pipeline::<mdbx::WriteMap>::new_with_channel(tx)
Pipeline::<Env<WriteMap>>::new_with_channel(tx)
.push(
TestStage::new(StageId("A")).add_exec(Ok(ExecOutput {
stage_progress: 20,
@ -449,7 +427,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
let mut pipeline = Pipeline::<mdbx::WriteMap>::new()
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>>::new()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
@ -522,7 +500,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
Pipeline::<mdbx::WriteMap>::new()
Pipeline::<Env<mdbx::WriteMap>>::new()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
@ -606,7 +584,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
let mut pipeline = Pipeline::<mdbx::WriteMap>::new()
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>>::new()
.push_with_unwind_priority(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
@ -689,7 +667,7 @@ mod tests {
// Run pipeline
tokio::spawn(async move {
Pipeline::<mdbx::WriteMap>::new_with_channel(tx)
Pipeline::<Env<mdbx::WriteMap>>::new_with_channel(tx)
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
@ -772,35 +750,26 @@ mod tests {
}
#[async_trait]
impl<'db, E> Stage<'db, E> for TestStage
where
E: mdbx::EnvironmentKind,
{
impl<DB: Database> Stage<DB> for TestStage {
fn id(&self) -> StageId {
self.id
}
async fn execute<'tx>(
async fn execute<'db>(
&mut self,
_: &mut Tx<'tx, mdbx::RW, E>,
_: ExecInput,
) -> Result<ExecOutput, StageError>
where
'db: 'tx,
{
_: &DB::TXMut<'db>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
async fn unwind<'tx>(
async fn unwind<'db>(
&mut self,
_: &mut Tx<'tx, mdbx::RW, E>,
_: UnwindInput,
) -> Result<UnwindOutput, Box<dyn Error + Send + Sync>>
where
'db: 'tx,
{
_: &DB::TXMut<'db>,
_input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
self.unwind_outputs
.pop_front()
.unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id))

View File

@ -1,6 +1,6 @@
use crate::{error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::{kv::tx::Tx, mdbx};
use reth_interfaces::db::Database;
use reth_primitives::BlockNumber;
/// Stage execution input, see [Stage::execute].
@ -51,30 +51,23 @@ pub struct UnwindOutput {
///
/// Stages are executed as part of a pipeline where they are executed serially.
#[async_trait]
pub trait Stage<'db, E>: Send + Sync
where
E: mdbx::EnvironmentKind,
{
pub trait Stage<DB: Database>: Send + Sync {
/// Get the ID of the stage.
///
/// Stage IDs must be unique.
fn id(&self) -> StageId;
/// Execute the stage.
async fn execute<'tx>(
async fn execute<'db>(
&mut self,
tx: &mut Tx<'tx, mdbx::RW, E>,
tx: &DB::TXMut<'db>,
input: ExecInput,
) -> Result<ExecOutput, StageError>
where
'db: 'tx;
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind<'tx>(
async fn unwind<'db>(
&mut self,
tx: &mut Tx<'tx, mdbx::RW, E>,
tx: &DB::TXMut<'db>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>
where
'db: 'tx;
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}

View File

@ -61,84 +61,3 @@ pub(crate) mod opt {
}
}
}
pub(crate) mod db {
use reth_db::{
kv::{tx::Tx, Env, KVError},
mdbx,
};
/// A container for a MDBX transaction that will open a new inner transaction when the current
/// one is committed.
// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in
// the pipeline that just take a reference will not be able to commit their transaction and let
// the pipeline continue. Is there a better way to do this?
pub(crate) struct TxContainer<'db, 'tx, E>
where
'db: 'tx,
E: mdbx::EnvironmentKind,
{
/// A handle to the MDBX database.
pub(crate) db: &'db Env<E>,
tx: Option<Tx<'tx, mdbx::RW, E>>,
}
impl<'db, 'tx, E> TxContainer<'db, 'tx, E>
where
'db: 'tx,
E: mdbx::EnvironmentKind,
{
/// Create a new container with the given database handle.
///
/// A new inner transaction will be opened.
pub(crate) fn new(db: &'db Env<E>) -> Result<Self, KVError> {
Ok(Self { db, tx: Some(Tx::new(db.begin_rw_txn()?)) })
}
/// Commit the current inner transaction and open a new one.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn commit(&mut self) -> Result<bool, KVError> {
let success =
self.tx.take().expect("Tried committing a non-existent transaction").commit()?;
self.tx = Some(Tx::new(self.db.begin_rw_txn()?));
Ok(success)
}
/// Get the inner transaction.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn get(&self) -> &Tx<'tx, mdbx::RW, E> {
self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
}
/// Get a mutable reference to the inner transaction.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn get_mut(&mut self) -> &mut Tx<'tx, mdbx::RW, E> {
self.tx
.as_mut()
.expect("Tried getting a mutable reference to a non-existent transaction")
}
/// Open a new inner transaction.
pub(crate) fn open(&mut self) -> Result<(), KVError> {
self.tx = Some(Tx::new(self.db.begin_rw_txn()?));
Ok(())
}
/// Close the current inner transaction.
pub(crate) fn close(&mut self) {
self.tx.take();
}
}
}

2
rust-toolchain.toml Normal file
View File

@ -0,0 +1,2 @@
[toolchain]
channel = "nightly"