diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 37da445fb..63dd61a68 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - name: Checkout sources uses: actions/checkout@v3 - name: Install toolchain - uses: dtolnay/rust-toolchain@stable + uses: dtolnay/rust-toolchain@nightly - uses: Swatinem/rust-cache@v2 with: cache-on-failure: true diff --git a/Cargo.lock b/Cargo.lock index efa8ad189..62f8cafd6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2415,11 +2415,13 @@ version = "0.1.0" dependencies = [ "bytes", "criterion", + "eyre", "iai", "libmdbx", "page_size", "parity-scale-codec", "postcard", + "reth-interfaces", "reth-primitives", "serde", "tempfile", @@ -2501,8 +2503,15 @@ version = "0.1.0" dependencies = [ "async-trait", "auto_impl", + "bytes", + "eyre", + "heapless", + "parity-scale-codec", + "postcard", "reth-primitives", "reth-rpc-types", + "serde", + "test-fuzz", "thiserror", "tokio", ] @@ -2608,6 +2617,7 @@ dependencies = [ "aquamarine", "async-trait", "reth-db", + "reth-interfaces", "reth-primitives", "tempfile", "thiserror", diff --git a/README.md b/README.md index b15add4f4..441d6f7e0 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,10 @@ ![Github Actions](https://github.com/foundry-rs/reth/workflows/ci/badge.svg) +# Build + +To build this project we are currently using rust nightly for GAT support, that is planed to release in rust 1.65 (4.nov.2022). GAT's are used for Database trait in reth-interface. + ## Docs -Contributor docs can be found [here](./docs). \ No newline at end of file +Contributor docs can be found [here](./docs). diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index b902f9ef9..c969e0901 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -10,6 +10,7 @@ description = "Staged syncing primitives used in reth." [dependencies] # reth reth-primitives = { path = "../primitives" } +reth-interfaces = { path = "../interfaces" } # codecs serde = { version = "1.0.*", default-features = false } @@ -22,6 +23,7 @@ libmdbx = "0.1.8" page_size = "0.4.2" thiserror = "1.0.37" tempfile = { version = "3.3.0", optional = true } +eyre = "0.6.8" [dev-dependencies] tempfile = "3.3.0" @@ -29,6 +31,8 @@ test-fuzz = "3.0.4" criterion = "0.4.0" iai = "0.1.1" +reth-interfaces = { path = "../interfaces",features=["bench"] } + [features] test-utils = ["tempfile"] bench-postcard = ["bench"] diff --git a/crates/db/benches/encoding_crit.rs b/crates/db/benches/encoding_crit.rs index 27bbe36f7..34272cafa 100644 --- a/crates/db/benches/encoding_crit.rs +++ b/crates/db/benches/encoding_crit.rs @@ -7,10 +7,11 @@ macro_rules! impl_criterion_encoding_benchmark { let mut size = 0; c.bench_function(stringify!($name), |b| { b.iter(|| { - let encoded_size = reth_db::kv::codecs::fuzz::Header::encode_and_decode( - black_box(reth_primitives::Header::default()), - ) - .0; + let encoded_size = + reth_interfaces::db::codecs::fuzz::Header::encode_and_decode(black_box( + reth_primitives::Header::default(), + )) + .0; if size == 0 { size = encoded_size; diff --git a/crates/db/benches/encoding_iai.rs b/crates/db/benches/encoding_iai.rs index b418f597c..1584dc850 100644 --- a/crates/db/benches/encoding_iai.rs +++ b/crates/db/benches/encoding_iai.rs @@ -1,10 +1,11 @@ use iai::{black_box, main}; +use reth_interfaces::db; /// Benchmarks the encoding and decoding of `Header` using iai. macro_rules! impl_iai_encoding_benchmark { ($name:tt) => { fn $name() { - reth_db::kv::codecs::fuzz::Header::encode_and_decode(black_box( + db::codecs::fuzz::Header::encode_and_decode(black_box( reth_primitives::Header::default(), )); } diff --git a/crates/db/src/kv/cursor.rs b/crates/db/src/kv/cursor.rs index 0aaf66b17..681480622 100644 --- a/crates/db/src/kv/cursor.rs +++ b/crates/db/src/kv/cursor.rs @@ -1,18 +1,23 @@ //! Cursor wrapper for libmdbx-sys. -use super::error::KVError; -use crate::{ - kv::{Decode, DupSort, Encode, Table}, - utils::*, +use crate::utils::*; +use libmdbx::{self, TransactionKind, WriteFlags, RO, RW}; +use reth_interfaces::db::{ + DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupSort, DupWalker, Encode, Error, Table, + Walker, }; -use libmdbx::{self, TransactionKind, WriteFlags, RW}; /// Alias type for a `(key, value)` result coming from a cursor. -pub type PairResult = Result::Key, ::Value)>, KVError>; +pub type PairResult = Result::Key, ::Value)>, Error>; /// Alias type for a `(key, value)` result coming from an iterator. -pub type IterPairResult = Option::Key, ::Value), KVError>>; +pub type IterPairResult = Option::Key, ::Value), Error>>; /// Alias type for a value result coming from a cursor without its key. -pub type ValueOnlyResult = Result::Value>, KVError>; +pub type ValueOnlyResult = Result::Value>, Error>; + +/// Read only Cursor. +pub type CursorRO<'tx, T> = Cursor<'tx, RO, T>; +/// Read write cursor. +pub type CursorRW<'tx, T> = Cursor<'tx, RW, T>; /// Cursor wrapper to access KV items. #[derive(Debug)] @@ -29,175 +34,110 @@ pub struct Cursor<'tx, K: TransactionKind, T: Table> { #[macro_export] macro_rules! decode { ($v:expr) => { - $v?.map(decoder::).transpose() + $v.map_err(|e| Error::Decode(e.into()))?.map(decoder::).transpose() }; } -impl<'tx, K: TransactionKind, T: Table> Cursor<'tx, K, T> { - /// Returns the first `(key, value)` pair. - pub fn first(&mut self) -> PairResult - where - T::Key: Decode, - { +impl<'tx, K: TransactionKind, T: Table> DbCursorRO<'tx, T> for Cursor<'tx, K, T> { + fn first(&mut self) -> reth_interfaces::db::PairResult { decode!(self.inner.first()) } - /// Seeks for a `(key, value)` pair greater or equal than `key`. - pub fn seek(&mut self, key: T::SeekKey) -> PairResult - where - T::Key: Decode, - { + fn seek(&mut self, key: ::SeekKey) -> reth_interfaces::db::PairResult { decode!(self.inner.set_range(key.encode().as_ref())) } - /// Seeks for the exact `(key, value)` pair with `key`. - pub fn seek_exact(&mut self, key: T::Key) -> PairResult - where - T::Key: Decode, - { + fn seek_exact(&mut self, key: ::Key) -> reth_interfaces::db::PairResult { decode!(self.inner.set_key(key.encode().as_ref())) } - /// Returns the next `(key, value)` pair. - #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> PairResult - where - T::Key: Decode, - { + fn next(&mut self) -> reth_interfaces::db::PairResult { decode!(self.inner.next()) } - /// Returns the previous `(key, value)` pair. - pub fn prev(&mut self) -> PairResult - where - T::Key: Decode, - { + fn prev(&mut self) -> reth_interfaces::db::PairResult { decode!(self.inner.prev()) } - /// Returns the last `(key, value)` pair. - pub fn last(&mut self) -> PairResult - where - T::Key: Decode, - { + fn last(&mut self) -> reth_interfaces::db::PairResult { decode!(self.inner.last()) } - /// Returns the current `(key, value)` pair of the cursor. - pub fn current(&mut self) -> PairResult - where - T::Key: Decode, - { + fn current(&mut self) -> reth_interfaces::db::PairResult { decode!(self.inner.get_current()) } - /// Returns an iterator starting at a key greater or equal than `start_key`. - pub fn walk( - mut self, - start_key: T::Key, - ) -> Result< - impl Iterator::Key, ::Value), KVError>>, - KVError, - > - where - T::Key: Decode, - { - let start = self.inner.set_range(start_key.encode().as_ref())?.map(decoder::); + fn walk(&'tx mut self, start_key: ::Key) -> Result, Error> { + let start = self + .inner + .set_range(start_key.encode().as_ref()) + .map_err(|e| Error::Internal(e.into()))? + .map(decoder::); - Ok(Walker { cursor: self, start }) + Ok(Walker::<'tx, T> { cursor: self, start }) } } -impl<'tx, T: Table> Cursor<'tx, RW, T> { - /// Inserts a `(key, value)` to the database. Repositions the cursor to the new item - pub fn put(&mut self, k: T::Key, v: T::Value, f: Option) -> Result<(), KVError> { - self.inner - .put(k.encode().as_ref(), v.encode().as_ref(), f.unwrap_or_default()) - .map_err(KVError::Put) - } -} - -impl<'txn, K, T> Cursor<'txn, K, T> -where - K: TransactionKind, - T: DupSort, -{ +impl<'tx, K: TransactionKind, T: DupSort> DbDupCursorRO<'tx, T> for Cursor<'tx, K, T> { /// Returns the next `(key, value)` pair of a DUPSORT table. - pub fn next_dup(&mut self) -> PairResult - where - T::Key: Decode, - { + fn next_dup(&mut self) -> PairResult { decode!(self.inner.next_dup()) } /// Returns the next `(key, value)` pair skipping the duplicates. - pub fn next_no_dup(&mut self) -> PairResult - where - T::Key: Decode, - { + fn next_no_dup(&mut self) -> PairResult { decode!(self.inner.next_nodup()) } /// Returns the next `value` of a duplicate `key`. - pub fn next_dup_val(&mut self) -> ValueOnlyResult { - self.inner.next_dup()?.map(decode_value::).transpose() + fn next_dup_val(&mut self) -> ValueOnlyResult { + self.inner + .next_dup() + .map_err(|e| Error::Internal(e.into()))? + .map(decode_value::) + .transpose() } /// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT table. - pub fn walk_dup( - mut self, - key: T::Key, - subkey: T::SubKey, - ) -> Result::Value, KVError>>, KVError> { + fn walk_dup(&'tx mut self, key: T::Key, subkey: T::SubKey) -> Result, Error> { let start = self .inner - .get_both_range(key.encode().as_ref(), subkey.encode().as_ref())? + .get_both_range(key.encode().as_ref(), subkey.encode().as_ref()) + .map_err(|e| Error::Internal(e.into()))? .map(decode_one::); - Ok(DupWalker { cursor: self, start }) + Ok(DupWalker::<'tx, T> { cursor: self, start }) } } -/// Provides an iterator to `Cursor` when handling `Table`. -#[derive(Debug)] -pub struct Walker<'a, K: TransactionKind, T: Table> { - /// Cursor to be used to walk through the table. - pub cursor: Cursor<'a, K, T>, - /// `(key, value)` where to start the walk. - pub start: IterPairResult, -} +impl<'tx, T: Table> DbCursorRW<'tx, T> for Cursor<'tx, RW, T> { + /// Database operation that will update an existing row if a specified value already + /// exists in a table, and insert a new row if the specified value doesn't already exist + fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> { + self.inner + .put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::UPSERT) + .map_err(|e| Error::Internal(e.into())) + } -impl<'tx, K: TransactionKind, T: Table> std::iter::Iterator for Walker<'tx, K, T> -where - T::Key: Decode, -{ - type Item = Result<(T::Key, T::Value), KVError>; - fn next(&mut self) -> Option { - let start = self.start.take(); - if start.is_some() { - return start - } + fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> { + self.inner + .put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND) + .map_err(|e| Error::Internal(e.into())) + } - self.cursor.next().transpose() + fn delete_current(&mut self) -> Result<(), Error> { + self.inner.del(WriteFlags::CURRENT).map_err(|e| Error::Internal(e.into())) } } -/// Provides an iterator to `Cursor` when handling a `DupSort` table. -#[derive(Debug)] -pub struct DupWalker<'a, K: TransactionKind, T: DupSort> { - /// Cursor to be used to walk through the table. - pub cursor: Cursor<'a, K, T>, - /// Value where to start the walk. - pub start: Option>, -} +impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for Cursor<'tx, RW, T> { + fn delete_current_duplicates(&mut self) -> Result<(), Error> { + self.inner.del(WriteFlags::NO_DUP_DATA).map_err(|e| Error::Internal(e.into())) + } -impl<'tx, K: TransactionKind, T: DupSort> std::iter::Iterator for DupWalker<'tx, K, T> { - type Item = Result; - fn next(&mut self) -> Option { - let start = self.start.take(); - if start.is_some() { - return start - } - self.cursor.next_dup_val().transpose() + fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error> { + self.inner + .put(key.encode().as_ref(), value.encode().as_ref(), WriteFlags::APPEND_DUP) + .map_err(|e| Error::Internal(e.into())) } } diff --git a/crates/db/src/kv/error.rs b/crates/db/src/kv/error.rs deleted file mode 100644 index 096196c96..000000000 --- a/crates/db/src/kv/error.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! KVError declaration. - -use libmdbx::Error; -use thiserror::Error; - -/// KV error type. -#[derive(Debug, Error)] -pub enum KVError { - /// Generic MDBX error. - #[error("MDBX error: {0:?}")] - MDBX(#[from] Error), - /// Failed to open MDBX file. - #[error("{0:?}")] - DatabaseLocation(Error), - /// Failed to create a table in database. - #[error("{0:?}")] - TableCreation(Error), - /// Failed to insert a value into a table. - #[error("{0:?}")] - Put(Error), - /// Failed to get a value into a table. - #[error("{0:?}")] - Get(Error), - /// Failed to delete a `(key, vakue)` pair into a table. - #[error("{0:?}")] - Delete(Error), - /// Failed to commit transaction changes into the database. - #[error("{0:?}")] - Commit(Error), - /// Failed to initiate a MDBX transaction. - #[error("{0:?}")] - InitTransaction(Error), - /// Failed to decode or encode a key or value coming from a table.. - #[error("Error decoding value.")] - InvalidValue, -} diff --git a/crates/db/src/kv/mod.rs b/crates/db/src/kv/mod.rs index 1e2fcab55..4937d19a4 100644 --- a/crates/db/src/kv/mod.rs +++ b/crates/db/src/kv/mod.rs @@ -1,32 +1,21 @@ //! Module that interacts with MDBX. -use crate::utils::{default_page_size, TableType}; +use crate::utils::default_page_size; use libmdbx::{ DatabaseFlags, Environment, EnvironmentFlags, EnvironmentKind, Geometry, Mode, PageSize, SyncMode, RO, RW, }; +use reth_interfaces::db::{ + tables::{TableType, TABLES}, + Database, Error, +}; use std::{ops::Deref, path::Path}; -pub mod table; -use table::{Decode, DupSort, Encode, Table}; - -pub mod tables; -use tables::TABLES; - pub mod cursor; -pub mod models; -pub use models::*; - pub mod tx; use tx::Tx; -mod error; -pub use error::KVError; - -// Made public so `benches` can access it. -pub mod codecs; - /// Environment used when opening a MDBX environment. RO/RW. #[derive(Debug)] pub enum EnvKind { @@ -43,11 +32,24 @@ pub struct Env { pub inner: Environment, } +impl Database for Env { + type TX<'a> = tx::Tx<'a, RO, E>; + type TXMut<'a> = tx::Tx<'a, RW, E>; + + fn tx(&self) -> Result, Error> { + Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::Internal(e.into()))?)) + } + + fn tx_mut(&self) -> Result, Error> { + Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::Internal(e.into()))?)) + } +} + impl Env { /// Opens the database at the specified path with the given `EnvKind`. /// /// It does not create the tables, for that call [`create_tables`]. - pub fn open(path: &Path, kind: EnvKind) -> Result, KVError> { + pub fn open(path: &Path, kind: EnvKind) -> Result, Error> { let mode = match kind { EnvKind::RO => Mode::ReadOnly, EnvKind::RW => Mode::ReadWrite { sync_mode: SyncMode::Durable }, @@ -69,15 +71,15 @@ impl Env { ..Default::default() }) .open(path) - .map_err(KVError::DatabaseLocation)?, + .map_err(|e| Error::Internal(e.into()))?, }; Ok(env) } /// Creates all the defined tables, if necessary. - pub fn create_tables(&self) -> Result<(), KVError> { - let tx = self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?; + pub fn create_tables(&self) -> Result<(), Error> { + let tx = self.inner.begin_rw_txn().map_err(|e| Error::Initialization(e.into()))?; for (table_type, table) in TABLES { let flags = match table_type { @@ -85,56 +87,15 @@ impl Env { TableType::DupSort => DatabaseFlags::DUP_SORT, }; - tx.create_db(Some(table), flags).map_err(KVError::TableCreation)?; + tx.create_db(Some(table), flags).map_err(|e| Error::Initialization(e.into()))?; } - tx.commit()?; + tx.commit().map_err(|e| Error::Initialization(e.into()))?; Ok(()) } } -impl Env { - /// Initiates a read-only transaction. It should be committed or rolled back in the end, so it - /// frees up pages. - pub fn begin_tx(&self) -> Result, KVError> { - Ok(Tx::new(self.inner.begin_ro_txn().map_err(KVError::InitTransaction)?)) - } - - /// Initiates a read-write transaction. It should be committed or rolled back in the end. - pub fn begin_mut_tx(&self) -> Result, KVError> { - Ok(Tx::new(self.inner.begin_rw_txn().map_err(KVError::InitTransaction)?)) - } - - /// Takes a function and passes a read-only transaction into it, making sure it's closed in the - /// end of the execution. - pub fn view(&self, f: F) -> Result - where - F: Fn(&Tx<'_, RO, E>) -> T, - { - let tx = self.begin_tx()?; - - let res = f(&tx); - tx.commit()?; - - Ok(res) - } - - /// Takes a function and passes a write-read transaction into it, making sure it's committed in - /// the end of the execution. - pub fn update(&self, f: F) -> Result - where - F: Fn(&Tx<'_, RW, E>) -> T, - { - let tx = self.begin_mut_tx()?; - - let res = f(&tx); - tx.commit()?; - - Ok(res) - } -} - impl Deref for Env { type Target = libmdbx::Environment; @@ -170,11 +131,12 @@ pub mod test_utils { #[cfg(test)] mod tests { - use super::{ - tables::{Headers, PlainState}, - test_utils, Env, EnvKind, - }; + use super::{test_utils, Env, EnvKind}; use libmdbx::{NoWriteMap, WriteMap}; + use reth_interfaces::db::{ + tables::{Headers, PlainState}, + Database, DbTx, DbTxMut, + }; use reth_primitives::{Account, Address, Header, H256, U256}; use std::str::FromStr; use tempfile::TempDir; @@ -200,12 +162,12 @@ mod tests { let key = (1u64, H256::zero()); // PUT - let tx = env.begin_mut_tx().expect(ERROR_INIT_TX); + let tx = env.tx_mut().expect(ERROR_INIT_TX); tx.put::(key.into(), value.clone()).expect(ERROR_PUT); tx.commit().expect(ERROR_COMMIT); // GET - let tx = env.begin_tx().expect(ERROR_INIT_TX); + let tx = env.tx().expect(ERROR_INIT_TX); let result = tx.get::(key.into()).expect(ERROR_GET); assert!(result.expect(ERROR_RETURN_VALUE) == value); tx.commit().expect(ERROR_COMMIT); diff --git a/crates/db/src/kv/tx.rs b/crates/db/src/kv/tx.rs index 3737c5cad..02495ec3a 100644 --- a/crates/db/src/kv/tx.rs +++ b/crates/db/src/kv/tx.rs @@ -1,14 +1,8 @@ //! Transaction wrapper for libmdbx-sys. -use crate::{ - kv::{ - cursor::{Cursor, ValueOnlyResult}, - table::{Encode, Table}, - KVError, - }, - utils::decode_one, -}; +use crate::{kv::cursor::Cursor, utils::decode_one}; use libmdbx::{EnvironmentKind, Transaction, TransactionKind, WriteFlags, RW}; +use reth_interfaces::db::{DbTx, DbTxMut, DupSort, Encode, Error, Table}; use std::marker::PhantomData; /// Wrapper for the libmdbx transaction. @@ -32,52 +26,67 @@ impl<'env, K: TransactionKind, E: EnvironmentKind> Tx<'env, K, E> { self.inner.id() } - /// Open cursor on `table`. - pub fn cursor<'a, T: Table>(&'a self) -> Result, KVError> - where - 'env: 'a, - T: Table, - { + /// Create db Cursor + pub fn new_cursor(&self) -> Result, Error> { Ok(Cursor { - inner: self.inner.cursor(&self.inner.open_db(Some(T::NAME))?)?, + inner: self + .inner + .cursor(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?) + .map_err(|e| Error::Internal(e.into()))?, table: T::NAME, _dbi: PhantomData, }) } +} - /// Gets value associated with `key` on `table`. If it's a DUPSORT table, then returns the first - /// entry. - pub fn get(&self, key: T::Key) -> ValueOnlyResult { +impl<'env, K: TransactionKind, E: EnvironmentKind> DbTx<'env> for Tx<'env, K, E> { + /// Cursor GAT + type Cursor = Cursor<'env, K, T>; + /// DupCursor GAT + type DupCursor = Cursor<'env, K, T>; + /// Iterate over read only values in database. + fn cursor(&self) -> Result, Error> { + self.new_cursor() + } + + /// Iterate over read only values in database. + fn cursor_dup(&self) -> Result, Error> { + self.new_cursor() + } + + fn commit(self) -> Result { + self.inner.commit().map_err(|e| Error::Internal(e.into())) + } + + fn get(&self, key: T::Key) -> Result::Value>, Error> { self.inner - .get(&self.inner.open_db(Some(T::NAME))?, key.encode().as_ref())? + .get( + &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?, + key.encode().as_ref(), + ) + .map_err(|e| Error::Internal(e.into()))? .map(decode_one::) .transpose() } - - /// Saves all changes and frees up storage memory. - pub fn commit(self) -> Result { - self.inner.commit().map_err(KVError::Commit) - } } -impl<'a, E: EnvironmentKind> Tx<'a, RW, E> { - /// Opens `table` and inserts `(key, value)` pair. If the `key` already exists, it replaces the - /// value it if the table doesn't support DUPSORT. - pub fn put(&self, k: T::Key, v: T::Value) -> Result<(), KVError> - where - T: Table, - { +impl<'env, E: EnvironmentKind> DbTxMut<'env> for Tx<'env, RW, E> { + type CursorMut = Cursor<'env, RW, T>; + + type DupCursorMut = Cursor<'env, RW, T>; + + fn put(&self, key: T::Key, value: T::Value) -> Result<(), Error> { self.inner - .put(&self.inner.open_db(Some(T::NAME))?, &k.encode(), &v.encode(), WriteFlags::UPSERT) - .map_err(KVError::Put) + .put( + &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?, + &key.encode(), + &value.encode(), + WriteFlags::UPSERT, + ) + .map_err(|e| Error::Internal(e.into())) } - /// Deletes the `(key, value)` entry on `table`. When `value` is `None`, all entries with `key` - /// are to be deleted. Otherwise, only the item matching that data shall be. - pub fn delete(&self, key: T::Key, value: Option) -> Result - where - T: Table, - { + fn delete(&self, key: T::Key, value: Option) -> Result { let mut data = None; let value = value.map(Encode::encode); @@ -86,17 +95,27 @@ impl<'a, E: EnvironmentKind> Tx<'a, RW, E> { }; self.inner - .del(&self.inner.open_db(Some(T::NAME))?, key.encode(), data) - .map_err(KVError::Delete) + .del( + &self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?, + key.encode(), + data, + ) + .map_err(|e| Error::Internal(e.into())) } - /// Empties `table`. - pub fn clear(&self) -> Result<(), KVError> - where - T: Table, - { - self.inner.clear_db(&self.inner.open_db(Some(T::NAME))?)?; + fn clear(&self) -> Result<(), Error> { + self.inner + .clear_db(&self.inner.open_db(Some(T::NAME)).map_err(|e| Error::Internal(e.into()))?) + .map_err(|e| Error::Internal(e.into()))?; Ok(()) } + + fn cursor_mut(&self) -> Result, Error> { + self.new_cursor() + } + + fn cursor_dup_mut(&self) -> Result, Error> { + self.new_cursor() + } } diff --git a/crates/db/src/utils.rs b/crates/db/src/utils.rs index e13135843..4d17d5f0d 100644 --- a/crates/db/src/utils.rs +++ b/crates/db/src/utils.rs @@ -1,19 +1,10 @@ //! Utils crate for `db`. -use crate::kv::{ - table::{Decode, Table}, - KVError, -}; +//suse crate::kv::Error; use bytes::Bytes; +use reth_interfaces::db::{Decode, Error, Table}; use std::borrow::Cow; -/// Enum for the type of table present in libmdbx. -#[derive(Debug)] -pub enum TableType { - Table, - DupSort, -} - /// Returns the default page size that can be used in this OS. pub(crate) fn default_page_size() -> usize { let os_page_size = page_size::get(); @@ -31,7 +22,7 @@ pub(crate) fn default_page_size() -> usize { /// Helper function to decode a `(key, value)` pair. pub(crate) fn decoder<'a, T>( kv: (Cow<'a, [u8]>, Cow<'a, [u8]>), -) -> Result<(T::Key, T::Value), KVError> +) -> Result<(T::Key, T::Value), Error> where T: Table, T::Key: Decode, @@ -43,7 +34,7 @@ where } /// Helper function to decode only a value from a `(key, value)` pair. -pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result +pub(crate) fn decode_value<'a, T>(kv: (Cow<'a, [u8]>, Cow<'a, [u8]>)) -> Result where T: Table, { @@ -51,7 +42,7 @@ where } /// Helper function to decode a value. It can be a key or subkey. -pub(crate) fn decode_one(value: Cow<'_, [u8]>) -> Result +pub(crate) fn decode_one(value: Cow<'_, [u8]>) -> Result where T: Table, { diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index e6d88f7da..6b0e0c506 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -11,5 +11,19 @@ reth-primitives = { path = "../primitives" } reth-rpc-types = { path = "../net/rpc-types" } async-trait = "0.1.57" thiserror = "1.0.37" +eyre = "0.6.8" auto_impl = "1.0" tokio = { version = "1.21.2", features = ["sync"] } +bytes = "1.2" + +# codecs +serde = { version = "1.0.*", default-features = false } +postcard = { version = "1.0.2", features = ["alloc"] } +heapless = "0.7.16" +parity-scale-codec = { version = "3.2.1", features = ["bytes"] } + +[dev-dependencies] +test-fuzz = "3.0.4" + +[features] +bench = [] \ No newline at end of file diff --git a/crates/db/src/kv/codecs/fuzz.rs b/crates/interfaces/src/db/codecs/fuzz.rs similarity index 97% rename from crates/db/src/kv/codecs/fuzz.rs rename to crates/interfaces/src/db/codecs/fuzz.rs index e8e50d42f..e4abb5d1d 100644 --- a/crates/db/src/kv/codecs/fuzz.rs +++ b/crates/interfaces/src/db/codecs/fuzz.rs @@ -10,7 +10,7 @@ macro_rules! impl_fuzzer { #[cfg(any(test, feature = "bench"))] pub mod $name { use reth_primitives::$name; - use crate::kv::table; + use crate::db::table; /// Encodes and decodes table types returning its encoded size and the decoded object. pub fn encode_and_decode(obj: $name) -> (usize, $name) { diff --git a/crates/db/src/kv/codecs/mod.rs b/crates/interfaces/src/db/codecs/mod.rs similarity index 100% rename from crates/db/src/kv/codecs/mod.rs rename to crates/interfaces/src/db/codecs/mod.rs diff --git a/crates/db/src/kv/codecs/postcard.rs b/crates/interfaces/src/db/codecs/postcard.rs similarity index 84% rename from crates/db/src/kv/codecs/postcard.rs rename to crates/interfaces/src/db/codecs/postcard.rs index 6e3b141d7..792adaef1 100644 --- a/crates/db/src/kv/codecs/postcard.rs +++ b/crates/interfaces/src/db/codecs/postcard.rs @@ -1,7 +1,7 @@ #![allow(unused)] -use crate::kv::{Decode, Encode, KVError}; -use postcard::{from_bytes, to_allocvec}; +use crate::db::{Decode, Encode, Error}; +use postcard::{from_bytes, to_allocvec, to_vec}; use reth_primitives::*; // Just add `Serialize` and `Deserialize`, and set impl_heapless_postcard!(T, MaxSize(T)) @@ -28,7 +28,7 @@ macro_rules! impl_postcard { impl Decode for $name { fn decode>(value: B) -> Result { - from_bytes(&value.into()).map_err(|_| KVError::InvalidValue) + from_bytes(&value.into()).map_err(|e| Error::Decode(e.into())) } } )+ diff --git a/crates/db/src/kv/codecs/scale.rs b/crates/interfaces/src/db/codecs/scale.rs similarity index 82% rename from crates/db/src/kv/codecs/scale.rs rename to crates/interfaces/src/db/codecs/scale.rs index 499b8c836..3974913a2 100644 --- a/crates/db/src/kv/codecs/scale.rs +++ b/crates/interfaces/src/db/codecs/scale.rs @@ -1,4 +1,4 @@ -use crate::kv::{Decode, Encode, KVError}; +use crate::db::{Decode, Encode, Error}; use parity_scale_codec::decode_from_bytes; use reth_primitives::*; @@ -23,8 +23,8 @@ impl Decode for T where T: ScaleOnly + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug, { - fn decode>(value: B) -> Result { - decode_from_bytes(value.into()).map_err(|_| KVError::InvalidValue) + fn decode>(value: B) -> Result { + decode_from_bytes(value.into()).map_err(|e| Error::Decode(e.into())) } } diff --git a/crates/interfaces/src/db/error.rs b/crates/interfaces/src/db/error.rs new file mode 100644 index 000000000..9f41f6cb0 --- /dev/null +++ b/crates/interfaces/src/db/error.rs @@ -0,0 +1,16 @@ +/// Database Error +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Encode errors. + #[error("A table encoding error:{0}")] + Encode(eyre::Error), + /// Decode errors. + #[error("A table decoding error:{0}")] + Decode(eyre::Error), + /// Initialization database error. + #[error("Initialization database error:{0}")] + Initialization(eyre::Error), + /// Internal DB error. + #[error("A internal database error:{0}")] + Internal(eyre::Error), +} diff --git a/crates/interfaces/src/db/mock.rs b/crates/interfaces/src/db/mock.rs new file mode 100644 index 000000000..112311204 --- /dev/null +++ b/crates/interfaces/src/db/mock.rs @@ -0,0 +1,180 @@ +//! Mock database +use std::collections::BTreeMap; + +use super::{ + Database, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut, DupSort, Table, +}; + +/// Mock database used for testing with inner BTreeMap structure +/// TODO +#[derive(Clone, Default)] +pub struct DatabaseMock { + /// Main data. TODO (Make it table aware) + pub data: BTreeMap, Vec>, +} + +impl Database for DatabaseMock { + type TX<'a> = TxMock; + + type TXMut<'a> = TxMock; + + fn tx(&self) -> Result, super::Error> { + Ok(TxMock::default()) + } + + fn tx_mut(&self) -> Result, super::Error> { + Ok(TxMock::default()) + } +} + +/// Mock read only tx +#[derive(Clone, Default)] +pub struct TxMock { + /// Table representation + _table: BTreeMap, Vec>, +} + +impl<'a> DbTx<'a> for TxMock { + type Cursor = CursorMock; + + type DupCursor = CursorMock; + + fn get(&self, _key: T::Key) -> Result, super::Error> { + todo!() + } + + fn commit(self) -> Result { + todo!() + } + + fn cursor(&self) -> Result, super::Error> { + todo!() + } + + fn cursor_dup(&self) -> Result, super::Error> { + todo!() + } +} + +impl<'a> DbTxMut<'a> for TxMock { + type CursorMut = CursorMock; + + type DupCursorMut = CursorMock; + + fn put(&self, _key: T::Key, _value: T::Value) -> Result<(), super::Error> { + todo!() + } + + fn delete( + &self, + _key: T::Key, + _value: Option, + ) -> Result { + todo!() + } + + fn cursor_mut(&self) -> Result, super::Error> { + todo!() + } + + fn cursor_dup_mut(&self) -> Result, super::Error> { + todo!() + } + + fn clear(&self) -> Result<(), super::Error> { + todo!() + } +} + +/// CUrsor that iterates over table +pub struct CursorMock { + _cursor: u32, +} + +impl<'tx, T: Table> DbCursorRO<'tx, T> for CursorMock { + fn first(&mut self) -> super::PairResult { + todo!() + } + + fn seek(&mut self, _key: T::SeekKey) -> super::PairResult { + todo!() + } + + fn seek_exact(&mut self, _key: T::Key) -> super::PairResult { + todo!() + } + + fn next(&mut self) -> super::PairResult { + todo!() + } + + fn prev(&mut self) -> super::PairResult { + todo!() + } + + fn last(&mut self) -> super::PairResult { + todo!() + } + + fn current(&mut self) -> super::PairResult { + todo!() + } + + fn walk(&'tx mut self, _start_key: T::Key) -> Result, super::Error> { + todo!() + } +} + +impl<'tx, T: DupSort> DbDupCursorRO<'tx, T> for CursorMock { + fn next_dup(&mut self) -> super::PairResult { + todo!() + } + + fn next_no_dup(&mut self) -> super::PairResult { + todo!() + } + + fn next_dup_val(&mut self) -> super::ValueOnlyResult { + todo!() + } + + fn walk_dup( + &'tx mut self, + _key: ::Key, + _subkey: ::SubKey, + ) -> Result, super::Error> { + todo!() + } +} + +impl<'tx, T: Table> DbCursorRW<'tx, T> for CursorMock { + fn upsert( + &mut self, + _key: ::Key, + _value: ::Value, + ) -> Result<(), super::Error> { + todo!() + } + + fn append( + &mut self, + _key: ::Key, + _value: ::Value, + ) -> Result<(), super::Error> { + todo!() + } + + fn delete_current(&mut self) -> Result<(), super::Error> { + todo!() + } +} + +impl<'tx, T: DupSort> DbDupCursorRW<'tx, T> for CursorMock { + fn delete_current_duplicates(&mut self) -> Result<(), super::Error> { + todo!() + } + + fn append_dup(&mut self, _key: ::Key, _value: ::Value) -> Result<(), super::Error> { + todo!() + } +} diff --git a/crates/interfaces/src/db/mod.rs b/crates/interfaces/src/db/mod.rs new file mode 100644 index 000000000..d64cdcc35 --- /dev/null +++ b/crates/interfaces/src/db/mod.rs @@ -0,0 +1,202 @@ +pub mod codecs; +mod error; +pub mod mock; +pub mod models; +mod table; +pub mod tables; + +pub use error::Error; +pub use table::*; + +/// Main Database trait that spawns transactions to be executed. +pub trait Database { + /// RO database transaction + type TX<'a>: DbTx<'a> + Send + Sync + where + Self: 'a; + /// RW database transaction + type TXMut<'a>: DbTxMut<'a> + DbTx<'a> + Send + Sync + where + Self: 'a; + /// Create read only transaction. + fn tx(&self) -> Result, Error>; + /// Create read write transaction only possible if database is open with write access. + fn tx_mut(&self) -> Result, Error>; + + /// Takes a function and passes a read-only transaction into it, making sure it's closed in the + /// end of the execution. + fn view(&self, f: F) -> Result + where + F: Fn(&Self::TX<'_>) -> T, + { + let tx = self.tx()?; + + let res = f(&tx); + tx.commit()?; + + Ok(res) + } + + /// Takes a function and passes a write-read transaction into it, making sure it's committed in + /// the end of the execution. + fn update(&self, f: F) -> Result + where + F: Fn(&Self::TXMut<'_>) -> T, + { + let tx = self.tx_mut()?; + + let res = f(&tx); + tx.commit()?; + + Ok(res) + } +} + +/// Read only transaction +pub trait DbTx<'a> { + /// Cursor GAT + type Cursor: DbCursorRO<'a, T>; + /// DupCursor GAT + type DupCursor: DbDupCursorRO<'a, T> + DbCursorRO<'a, T>; + /// Get value + fn get(&self, key: T::Key) -> Result, Error>; + /// Commit for read only transaction will consume and free transaction and allows + /// freeing of memory pages + fn commit(self) -> Result; + /// Iterate over read only values in table. + fn cursor(&self) -> Result, Error>; + /// Iterate over read only values in dup sorted table. + fn cursor_dup(&self) -> Result, Error>; +} + +/// Read write transaction that allows writing to database +pub trait DbTxMut<'a> { + /// Cursor GAT + type CursorMut: DbCursorRW<'a, T> + DbCursorRO<'a, T>; + /// DupCursor GAT + type DupCursorMut: DbDupCursorRW<'a, T> + + DbCursorRW<'a, T> + + DbDupCursorRO<'a, T> + + DbCursorRO<'a, T>; + /// Put value to database + fn put(&self, key: T::Key, value: T::Value) -> Result<(), Error>; + /// Delete value from database + fn delete(&self, key: T::Key, value: Option) -> Result; + /// Clears database. + fn clear(&self) -> Result<(), Error>; + /// Cursor mut + fn cursor_mut(&self) -> Result, Error>; + /// DupCursor mut. + fn cursor_dup_mut(&self) -> Result, Error>; +} + +/// Alias type for a `(key, value)` result coming from a cursor. +pub type PairResult = Result::Key, ::Value)>, Error>; +/// Alias type for a `(key, value)` result coming from an iterator. +pub type IterPairResult = Option::Key, ::Value), Error>>; +/// Alias type for a value result coming from a cursor without its key. +pub type ValueOnlyResult = Result::Value>, Error>; + +/// Read only cursor over table. +pub trait DbCursorRO<'tx, T: Table> { + /// First item in table + fn first(&mut self) -> PairResult; + + /// Seeks for a `(key, value)` pair greater or equal than `key`. + fn seek(&mut self, key: T::SeekKey) -> PairResult; + + /// Seeks for the exact `(key, value)` pair with `key`. + fn seek_exact(&mut self, key: T::Key) -> PairResult; + + /// Returns the next `(key, value)` pair. + #[allow(clippy::should_implement_trait)] + fn next(&mut self) -> PairResult; + + /// Returns the previous `(key, value)` pair. + fn prev(&mut self) -> PairResult; + + /// Returns the last `(key, value)` pair. + fn last(&mut self) -> PairResult; + + /// Returns the current `(key, value)` pair of the cursor. + fn current(&mut self) -> PairResult; + + /// Returns an iterator starting at a key greater or equal than `start_key`. + fn walk(&'tx mut self, start_key: T::Key) -> Result, Error>; +} + +/// Read only curor over DupSort table. +pub trait DbDupCursorRO<'tx, T: DupSort> { + /// Returns the next `(key, value)` pair of a DUPSORT table. + fn next_dup(&mut self) -> PairResult; + + /// Returns the next `(key, value)` pair skipping the duplicates. + fn next_no_dup(&mut self) -> PairResult; + + /// Returns the next `value` of a duplicate `key`. + fn next_dup_val(&mut self) -> ValueOnlyResult; + + /// Returns an iterator starting at a key greater or equal than `start_key` of a DUPSORT + /// table. + fn walk_dup(&'tx mut self, key: T::Key, subkey: T::SubKey) -> Result, Error>; +} + +/// Read write cursor over table. +pub trait DbCursorRW<'tx, T: Table> { + /// Database operation that will update an existing row if a specified value already + /// exists in a table, and insert a new row if the specified value doesn't already exist + fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; + + /// Append value to next cursor item + fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; + + /// Delete current value that cursor points to + fn delete_current(&mut self) -> Result<(), Error>; +} + +/// Read Write Cursor over DupSorted table. +pub trait DbDupCursorRW<'tx, T: DupSort> { + /// Append value to next cursor item + fn delete_current_duplicates(&mut self) -> Result<(), Error>; + /// Append duplicate value + fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), Error>; +} + +/// Provides an iterator to `Cursor` when handling `Table`. +pub struct Walker<'cursor, T: Table> { + /// Cursor to be used to walk through the table. + pub cursor: &'cursor mut dyn DbCursorRO<'cursor, T>, + /// `(key, value)` where to start the walk. + pub start: IterPairResult, +} + +impl<'cursor, T: Table> std::iter::Iterator for Walker<'cursor, T> { + type Item = Result<(T::Key, T::Value), Error>; + fn next(&mut self) -> Option { + let start = self.start.take(); + if start.is_some() { + return start + } + + self.cursor.next().transpose() + } +} + +/// Provides an iterator to `Cursor` when handling a `DupSort` table. +pub struct DupWalker<'cursor, T: DupSort> { + /// Cursor to be used to walk through the table. + pub cursor: &'cursor mut dyn DbDupCursorRO<'cursor, T>, + /// Value where to start the walk. + pub start: Option>, +} + +impl<'cursor, T: DupSort> std::iter::Iterator for DupWalker<'cursor, T> { + type Item = Result; + fn next(&mut self) -> Option { + let start = self.start.take(); + if start.is_some() { + return start + } + self.cursor.next_dup_val().transpose() + } +} diff --git a/crates/db/src/kv/models/blocks.rs b/crates/interfaces/src/db/models/blocks.rs similarity index 86% rename from crates/db/src/kv/models/blocks.rs rename to crates/interfaces/src/db/models/blocks.rs index 214234060..a23b88721 100644 --- a/crates/db/src/kv/models/blocks.rs +++ b/crates/interfaces/src/db/models/blocks.rs @@ -1,10 +1,11 @@ //! Block related models and types. -use crate::kv::{ +use crate::db::{ table::{Decode, Encode}, - KVError, + Error, }; use bytes::Bytes; +use eyre::eyre; use reth_primitives::{BlockHash, BlockNumber, H256}; /// Total chain number of transactions. Key for [`CumulativeTxCount`]. @@ -53,10 +54,12 @@ impl Encode for BlockNumHash { } impl Decode for BlockNumHash { - fn decode>(value: B) -> Result { + fn decode>(value: B) -> Result { let value: bytes::Bytes = value.into(); - let num = u64::from_be_bytes(value.as_ref().try_into().map_err(|_| KVError::InvalidValue)?); + let num = u64::from_be_bytes( + value.as_ref().try_into().map_err(|_| Error::Decode(eyre!("Into bytes error.")))?, + ); let hash = H256::decode(value.slice(8..))?; Ok(BlockNumHash((num, hash))) diff --git a/crates/db/src/kv/models/mod.rs b/crates/interfaces/src/db/models/mod.rs similarity index 100% rename from crates/db/src/kv/models/mod.rs rename to crates/interfaces/src/db/models/mod.rs diff --git a/crates/db/src/kv/table.rs b/crates/interfaces/src/db/table.rs similarity index 92% rename from crates/db/src/kv/table.rs rename to crates/interfaces/src/db/table.rs index 70f0f70ae..13f477105 100644 --- a/crates/db/src/kv/table.rs +++ b/crates/interfaces/src/db/table.rs @@ -1,11 +1,10 @@ -//! Table traits. - -use super::KVError; +use super::Error; use bytes::Bytes; use std::{ fmt::Debug, marker::{Send, Sync}, }; + /// Trait that will transform the data to be saved in the DB. pub trait Encode: Send + Sync + Sized + Debug { /// Encoded type. @@ -18,7 +17,7 @@ pub trait Encode: Send + Sync + Sized + Debug { /// Trait that will transform the data to be read from the DB. pub trait Decode: Send + Sync + Sized + Debug { /// Decodes data coming from the database. - fn decode>(value: B) -> Result; + fn decode>(value: B) -> Result; } /// Generic trait that enforces the database value to implement [`Encode`] and [`Decode`]. @@ -40,11 +39,11 @@ pub trait Table: Send + Sync + Debug + 'static { /// Key element of `Table`. /// /// Sorting should be taken into account when encoding this. - type Key: Encode; + type Key: Object; /// Value element of `Table`. type Value: Object; /// Seek Key element of `Table`. - type SeekKey: Encode; + type SeekKey: Object; } /// DupSort allows for keys not to be repeated in the database, diff --git a/crates/db/src/kv/tables.rs b/crates/interfaces/src/db/tables.rs similarity index 91% rename from crates/db/src/kv/tables.rs rename to crates/interfaces/src/db/tables.rs index 515376d59..46c42eab3 100644 --- a/crates/db/src/kv/tables.rs +++ b/crates/interfaces/src/db/tables.rs @@ -1,10 +1,16 @@ -//! Declaration of all MDBX tables. -use crate::{ - kv::blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock}, - utils::TableType, -}; +//! Declaration of all Database tables. + +use crate::db::models::blocks::{BlockNumHash, HeaderHash, NumTransactions, NumTxesInBlock}; use reth_primitives::{Account, Address, BlockNumber, Header, Receipt}; +/// Enum for the type of table present in libmdbx. +#[derive(Debug)] +pub enum TableType { + /// key value table + Table, + /// Duplicate key value table + DupSort, +} /// Default tables that should be present inside database. pub const TABLES: [(TableType, &str); 18] = [ (TableType::Table, CanonicalHeaders::const_name()), @@ -35,7 +41,7 @@ macro_rules! table { #[derive(Clone, Copy, Debug, Default)] pub struct $name; - impl $crate::kv::table::Table for $name { + impl $crate::db::table::Table for $name { const NAME: &'static str = $name::const_name(); type Key = $key; type Value = $value; diff --git a/crates/interfaces/src/lib.rs b/crates/interfaces/src/lib.rs index bf9766c68..91d9521bc 100644 --- a/crates/interfaces/src/lib.rs +++ b/crates/interfaces/src/lib.rs @@ -1,4 +1,4 @@ -#![warn(missing_debug_implementations, missing_docs, unreachable_pub)] +#![warn(missing_docs, unreachable_pub)] #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( no_crate_inject, @@ -13,6 +13,8 @@ pub mod executor; /// Consensus traits. pub mod consensus; +/// Database traits. +pub mod db; /// Traits that provide chain access. pub mod provider; diff --git a/crates/primitives/src/transaction/signature.rs b/crates/primitives/src/transaction/signature.rs index 02bcdcc6a..bfbb67495 100644 --- a/crates/primitives/src/transaction/signature.rs +++ b/crates/primitives/src/transaction/signature.rs @@ -15,6 +15,7 @@ pub struct Signature { pub odd_y_parity: bool, } +#[allow(dead_code)] impl Signature { /// Encode the `v`, `r`, `s` values without a RLP header. /// Encodes the `v` value using the legacy scheme without EIP-155. diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 92062abe1..b6c3e879f 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -9,6 +9,7 @@ description = "Staged syncing primitives used in reth." [dependencies] reth-primitives = { path = "../primitives" } +reth-interfaces = { path = "../interfaces" } reth-db = { path = "../db" } async-trait = "0.1.57" thiserror = "1.0.37" diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index 2f501a358..3370d8fdd 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -1,5 +1,5 @@ use crate::pipeline::PipelineEvent; -use reth_db::kv::KVError; +use reth_interfaces::db::Error as DbError; use reth_primitives::BlockNumber; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -17,7 +17,7 @@ pub enum StageError { }, /// The stage encountered a database error. #[error("A database error occurred.")] - Database(#[from] KVError), + Database(#[from] DbError), /// The stage encountered an internal error. #[error(transparent)] Internal(Box), @@ -31,7 +31,7 @@ pub enum PipelineError { Stage(#[from] StageError), /// The pipeline encountered a database error. #[error("A database error occurred.")] - Database(#[from] KVError), + Database(#[from] DbError), /// The pipeline encountered an error while trying to send an event. #[error("The pipeline encountered an error while trying to send an event.")] Channel(#[from] SendError), diff --git a/crates/stages/src/id.rs b/crates/stages/src/id.rs index 938417d7a..ecc920c11 100644 --- a/crates/stages/src/id.rs +++ b/crates/stages/src/id.rs @@ -1,7 +1,4 @@ -use reth_db::{ - kv::{tables::SyncStage, tx::Tx, KVError}, - mdbx, -}; +use reth_interfaces::db::{tables::SyncStage, DbTx, DbTxMut, Error as DbError}; use reth_primitives::BlockNumber; use std::fmt::Display; @@ -19,26 +16,16 @@ impl Display for StageId { impl StageId { /// Get the last committed progress of this stage. - pub fn get_progress<'db, K, E>( - &self, - tx: &Tx<'db, K, E>, - ) -> Result, KVError> - where - K: mdbx::TransactionKind, - E: mdbx::EnvironmentKind, - { + pub fn get_progress<'db>(&self, tx: &impl DbTx<'db>) -> Result, DbError> { tx.get::(self.0.as_bytes().to_vec()) } /// Save the progress of this stage. - pub fn save_progress<'db, E>( + pub fn save_progress<'db>( &self, - tx: &Tx<'db, mdbx::RW, E>, + tx: &impl DbTxMut<'db>, block: BlockNumber, - ) -> Result<(), KVError> - where - E: mdbx::EnvironmentKind, - { + ) -> Result<(), DbError> { tx.put::(self.0.as_bytes().to_vec(), block) } } diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index e64a7be57..7dec12a2e 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -1,9 +1,8 @@ use crate::{ - error::*, - util::{db::TxContainer, opt::MaybeSender}, - ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, + error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId, + UnwindInput, }; -use reth_db::{kv::Env, mdbx}; +use reth_interfaces::db::{Database, DbTx}; use reth_primitives::BlockNumber; use std::fmt::{Debug, Formatter}; use tokio::sync::mpsc::Sender; @@ -70,37 +69,24 @@ use state::*; /// /// The unwind priority is set with [Pipeline::push_with_unwind_priority]. Stages with higher unwind /// priorities are unwound first. -pub struct Pipeline<'db, E> -where - E: mdbx::EnvironmentKind, -{ - stages: Vec>, +pub struct Pipeline { + stages: Vec>, max_block: Option, events_sender: MaybeSender, } -impl<'db, E> Default for Pipeline<'db, E> -where - E: mdbx::EnvironmentKind, -{ +impl Default for Pipeline { fn default() -> Self { Self { stages: Vec::new(), max_block: None, events_sender: MaybeSender::new(None) } } } - -impl<'db, E> Debug for Pipeline<'db, E> -where - E: mdbx::EnvironmentKind, -{ +impl Debug for Pipeline { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Pipeline").field("max_block", &self.max_block).finish() } } -impl<'db, E> Pipeline<'db, E> -where - E: mdbx::EnvironmentKind, -{ +impl Pipeline { /// Create a new pipeline. pub fn new() -> Self { Default::default() @@ -118,7 +104,7 @@ where /// The unwind priority is set to 0. pub fn push(self, stage: S, require_tip: bool) -> Self where - S: Stage<'db, E> + 'static, + S: Stage + 'static, { self.push_with_unwind_priority(stage, require_tip, 0) } @@ -131,7 +117,7 @@ where unwind_priority: usize, ) -> Self where - S: Stage<'db, E> + 'static, + S: Stage + 'static, { self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority }); self @@ -153,7 +139,7 @@ where /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. - pub async fn run(&mut self, db: &'db Env) -> Result<(), PipelineError> { + pub async fn run(&mut self, db: &DB) -> Result<(), PipelineError> { loop { let mut state = PipelineState { events_sender: self.events_sender.clone(), @@ -162,8 +148,7 @@ where minimum_progress: None, reached_tip: true, }; - let mut tx = TxContainer::new(db)?; - let next_action = self.run_loop(&mut state, &mut tx).await?; + let next_action = self.run_loop(&mut state, db).await?; // Terminate the loop early if it's reached the maximum user // configured block. @@ -184,32 +169,29 @@ where /// If any stage is unsuccessful at execution, we proceed to /// unwind. This will undo the progress across the entire pipeline /// up to the block that caused the error. - async fn run_loop<'tx>( + async fn run_loop( &mut self, state: &mut PipelineState, - tx: &mut TxContainer<'db, 'tx, E>, - ) -> Result - where - 'db: 'tx, - { + db: &DB, + ) -> Result { let mut previous_stage = None; for (_, queued_stage) in self.stages.iter_mut().enumerate() { let stage_id = queued_stage.stage.id(); let next = queued_stage - .execute(state, previous_stage, tx) + .execute(state, previous_stage, db) .instrument(info_span!("Running", stage = %stage_id)) .await?; match next { ControlFlow::Continue => { + let tx = db.tx()?; previous_stage = - Some((stage_id, stage_id.get_progress(tx.get())?.unwrap_or_default())); + Some((stage_id, stage_id.get_progress(&tx)?.unwrap_or_default())); + tx.commit()?; } ControlFlow::Unwind { target, bad_block } => { - // TODO: Note on close - tx.close(); - self.unwind(tx.db, target, bad_block).await?; - tx.open()?; + self.unwind(db, target, bad_block).await?; + return Ok(ControlFlow::Unwind { target, bad_block }) } } @@ -223,7 +205,7 @@ where /// If the unwind is due to a bad block the number of that block should be specified. pub async fn unwind( &mut self, - db: &'db Env, + db: &DB, to: BlockNumber, bad_block: Option, ) -> Result<(), PipelineError> { @@ -236,7 +218,7 @@ where }; // Unwind stages in reverse order of priority (i.e. higher priority = first) - let mut tx = db.begin_mut_tx()?; + let tx = db.tx_mut()?; for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() { let stage_id = stage.id(); let span = info_span!("Unwinding", stage = %stage_id); @@ -254,7 +236,7 @@ where let input = UnwindInput { stage_progress, unwind_to: to, bad_block }; self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?; - let output = stage.unwind(&mut tx, input).await; + let output = stage.unwind(&tx, input).await; match output { Ok(unwind_output) => { stage_progress = unwind_output.stage_progress; @@ -278,12 +260,9 @@ where } /// A container for a queued stage. -struct QueuedStage<'db, E> -where - E: mdbx::EnvironmentKind, -{ +struct QueuedStage { /// The actual stage to execute. - stage: Box>, + stage: Box>, /// The unwind priority of the stage. unwind_priority: usize, /// Whether or not this stage can only execute when we reach what we believe to be the tip of @@ -291,20 +270,14 @@ where require_tip: bool, } -impl<'db, E> QueuedStage<'db, E> -where - E: mdbx::EnvironmentKind, -{ +impl QueuedStage { /// Execute the stage. async fn execute<'tx>( &mut self, state: &mut PipelineState, previous_stage: Option<(StageId, BlockNumber)>, - tx: &mut TxContainer<'db, 'tx, E>, - ) -> Result - where - 'db: 'tx, - { + db: &DB, + ) -> Result { let stage_id = self.stage.id(); if self.require_tip && !state.reached_tip() { info!("Tip not reached, skipping."); @@ -315,8 +288,10 @@ where return Ok(ControlFlow::Continue) } + let mut tx = db.tx_mut()?; + loop { - let prev_progress = stage_id.get_progress(tx.get())?; + let prev_progress = stage_id.get_progress(&tx)?; let stage_reached_max_block = prev_progress .zip(state.max_block) @@ -337,12 +312,12 @@ where match self .stage - .execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress }) + .execute(&tx, ExecInput { previous_stage, stage_progress: prev_progress }) .await { Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); - stage_id.save_progress(tx.get_mut(), stage_progress)?; + stage_id.save_progress(&tx, stage_progress)?; state .events_sender @@ -351,6 +326,8 @@ where // TODO: Make the commit interval configurable tx.commit()?; + // create new mut transaction. + tx = db.tx_mut()?; state.record_progress_outliers(stage_progress); state.set_reached_tip(reached_tip); @@ -383,11 +360,12 @@ where #[cfg(test)] mod tests { + use super::*; use crate::{StageId, UnwindOutput}; use reth_db::{ - kv::{test_utils, tx::Tx, EnvKind}, - mdbx, + kv::{test_utils, Env, EnvKind}, + mdbx::{self, WriteMap}, }; use tokio::sync::mpsc::channel; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -401,7 +379,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - Pipeline::::new_with_channel(tx) + Pipeline::>::new_with_channel(tx) .push( TestStage::new(StageId("A")).add_exec(Ok(ExecOutput { stage_progress: 20, @@ -449,7 +427,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - let mut pipeline = Pipeline::::new() + let mut pipeline = Pipeline::>::new() .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { @@ -522,7 +500,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - Pipeline::::new() + Pipeline::>::new() .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { @@ -606,7 +584,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - let mut pipeline = Pipeline::::new() + let mut pipeline = Pipeline::>::new() .push_with_unwind_priority( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { @@ -689,7 +667,7 @@ mod tests { // Run pipeline tokio::spawn(async move { - Pipeline::::new_with_channel(tx) + Pipeline::>::new_with_channel(tx) .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { @@ -772,35 +750,26 @@ mod tests { } #[async_trait] - impl<'db, E> Stage<'db, E> for TestStage - where - E: mdbx::EnvironmentKind, - { + impl Stage for TestStage { fn id(&self) -> StageId { self.id } - async fn execute<'tx>( + async fn execute<'db>( &mut self, - _: &mut Tx<'tx, mdbx::RW, E>, - _: ExecInput, - ) -> Result - where - 'db: 'tx, - { + _: &DB::TXMut<'db>, + _input: ExecInput, + ) -> Result { self.exec_outputs .pop_front() .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) } - async fn unwind<'tx>( + async fn unwind<'db>( &mut self, - _: &mut Tx<'tx, mdbx::RW, E>, - _: UnwindInput, - ) -> Result> - where - 'db: 'tx, - { + _: &DB::TXMut<'db>, + _input: UnwindInput, + ) -> Result> { self.unwind_outputs .pop_front() .unwrap_or_else(|| panic!("Test stage {} unwound too many times.", self.id)) diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 03cd6c965..c88342a18 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,6 +1,6 @@ use crate::{error::StageError, id::StageId}; use async_trait::async_trait; -use reth_db::{kv::tx::Tx, mdbx}; +use reth_interfaces::db::Database; use reth_primitives::BlockNumber; /// Stage execution input, see [Stage::execute]. @@ -51,30 +51,23 @@ pub struct UnwindOutput { /// /// Stages are executed as part of a pipeline where they are executed serially. #[async_trait] -pub trait Stage<'db, E>: Send + Sync -where - E: mdbx::EnvironmentKind, -{ +pub trait Stage: Send + Sync { /// Get the ID of the stage. /// /// Stage IDs must be unique. fn id(&self) -> StageId; /// Execute the stage. - async fn execute<'tx>( + async fn execute<'db>( &mut self, - tx: &mut Tx<'tx, mdbx::RW, E>, + tx: &DB::TXMut<'db>, input: ExecInput, - ) -> Result - where - 'db: 'tx; + ) -> Result; /// Unwind the stage. - async fn unwind<'tx>( + async fn unwind<'db>( &mut self, - tx: &mut Tx<'tx, mdbx::RW, E>, + tx: &DB::TXMut<'db>, input: UnwindInput, - ) -> Result> - where - 'db: 'tx; + ) -> Result>; } diff --git a/crates/stages/src/util.rs b/crates/stages/src/util.rs index c80b53f4f..bb70011da 100644 --- a/crates/stages/src/util.rs +++ b/crates/stages/src/util.rs @@ -61,84 +61,3 @@ pub(crate) mod opt { } } } - -pub(crate) mod db { - use reth_db::{ - kv::{tx::Tx, Env, KVError}, - mdbx, - }; - - /// A container for a MDBX transaction that will open a new inner transaction when the current - /// one is committed. - // NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in - // the pipeline that just take a reference will not be able to commit their transaction and let - // the pipeline continue. Is there a better way to do this? - pub(crate) struct TxContainer<'db, 'tx, E> - where - 'db: 'tx, - E: mdbx::EnvironmentKind, - { - /// A handle to the MDBX database. - pub(crate) db: &'db Env, - tx: Option>, - } - - impl<'db, 'tx, E> TxContainer<'db, 'tx, E> - where - 'db: 'tx, - E: mdbx::EnvironmentKind, - { - /// Create a new container with the given database handle. - /// - /// A new inner transaction will be opened. - pub(crate) fn new(db: &'db Env) -> Result { - Ok(Self { db, tx: Some(Tx::new(db.begin_rw_txn()?)) }) - } - - /// Commit the current inner transaction and open a new one. - /// - /// # Panics - /// - /// Panics if an inner transaction does not exist. This should never be the case unless - /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn commit(&mut self) -> Result { - let success = - self.tx.take().expect("Tried committing a non-existent transaction").commit()?; - self.tx = Some(Tx::new(self.db.begin_rw_txn()?)); - Ok(success) - } - - /// Get the inner transaction. - /// - /// # Panics - /// - /// Panics if an inner transaction does not exist. This should never be the case unless - /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn get(&self) -> &Tx<'tx, mdbx::RW, E> { - self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction") - } - - /// Get a mutable reference to the inner transaction. - /// - /// # Panics - /// - /// Panics if an inner transaction does not exist. This should never be the case unless - /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn get_mut(&mut self) -> &mut Tx<'tx, mdbx::RW, E> { - self.tx - .as_mut() - .expect("Tried getting a mutable reference to a non-existent transaction") - } - - /// Open a new inner transaction. - pub(crate) fn open(&mut self) -> Result<(), KVError> { - self.tx = Some(Tx::new(self.db.begin_rw_txn()?)); - Ok(()) - } - - /// Close the current inner transaction. - pub(crate) fn close(&mut self) { - self.tx.take(); - } - } -} diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 000000000..271800cb2 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,2 @@ +[toolchain] +channel = "nightly" \ No newline at end of file