chore: split db abstraction into new crate (#8594)

Oliver
2024-06-04 23:45:57 +02:00
committed by GitHub
parent a8095740fc
commit 51a28f22da
183 changed files with 825 additions and 755 deletions

View File

@@ -0,0 +1,71 @@
[package]
name = "reth-db-api"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Database abstraction used in reth."

[lints]
workspace = true

[dependencies]
# reth
reth-codecs.workspace = true
reth-primitives.workspace = true
reth-storage-errors.workspace = true

# codecs
modular-bitfield.workspace = true
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
serde = { workspace = true, default-features = false }

# metrics
metrics.workspace = true

# misc
derive_more.workspace = true
bytes.workspace = true

# arbitrary utils
arbitrary = { workspace = true, features = ["derive"], optional = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }

[dev-dependencies]
# reth libs with arbitrary
reth-primitives = { workspace = true, features = ["arbitrary"] }
reth-codecs.workspace = true
rand.workspace = true
serde_json.workspace = true
test-fuzz.workspace = true
pprof = { workspace = true, features = [
    "flamegraph",
    "frame-pointer",
    "criterion",
] }
criterion.workspace = true
iai-callgrind = "0.10.2"
arbitrary = { workspace = true, features = ["derive"] }
proptest.workspace = true
proptest-derive.workspace = true
paste.workspace = true
assert_matches.workspace = true

[features]
test-utils = ["arbitrary"]
arbitrary = [
    "reth-primitives/arbitrary",
    "dep:arbitrary",
    "dep:proptest",
    "dep:proptest-derive",
]
optimism = []

View File

@@ -0,0 +1,19 @@
use crate::{table::*, DatabaseError};
/// A key-value pair for table `T`.
pub type KeyValue<T> = (<T as Table>::Key, <T as Table>::Value);
/// A fallible key-value pair that may or may not exist.
///
/// The `Result` represents that the operation might fail, while the `Option` represents whether or
/// not the entry exists.
pub type PairResult<T> = Result<Option<KeyValue<T>>, DatabaseError>;
/// A key-value pair coming from an iterator.
///
/// The `Result` represents that the operation might fail, while the `Option` represents whether or
/// not there is another entry.
pub type IterPairResult<T> = Option<Result<KeyValue<T>, DatabaseError>>;
/// A value-only result for table `T`.
pub type ValueOnlyResult<T> = Result<Option<<T as Table>::Value>, DatabaseError>;

View File

@@ -0,0 +1,373 @@
use std::{
fmt,
ops::{Bound, RangeBounds},
};
use crate::{
common::{IterPairResult, PairResult, ValueOnlyResult},
table::{DupSort, Table, TableRow},
DatabaseError,
};
/// A read-only cursor over table `T`.
pub trait DbCursorRO<T: Table> {
/// Positions the cursor at the first entry in the table, returning it.
fn first(&mut self) -> PairResult<T>;
/// Seeks to the KV pair exactly at `key`.
fn seek_exact(&mut self, key: T::Key) -> PairResult<T>;
/// Seeks to the KV pair whose key is greater than or equal to `key`.
fn seek(&mut self, key: T::Key) -> PairResult<T>;
/// Positions the cursor at the next KV pair, returning it.
#[allow(clippy::should_implement_trait)]
fn next(&mut self) -> PairResult<T>;
/// Positions the cursor at the previous KV pair, returning it.
fn prev(&mut self) -> PairResult<T>;
/// Positions the cursor at the last entry in the table, returning it.
fn last(&mut self) -> PairResult<T>;
/// Get the KV pair at the cursor's current position.
fn current(&mut self) -> PairResult<T>;
/// Get an iterator that walks through the table.
///
/// If `start_key` is `None`, then the walker will start from the first entry of the table,
/// otherwise it starts at the entry greater than or equal to the provided key.
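///
/// A usage sketch (illustrative, not from the original file; `tx` and the
/// `Headers` table are assumed placeholders):
///
/// ```ignore
/// let mut cursor = tx.cursor_read::<Headers>()?;
/// for row in cursor.walk(None)? {
///     let (key, value) = row?;
///     // process the entry...
/// }
/// ```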
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError>
where
Self: Sized;
/// Get an iterator that walks over a range of keys in the table.
fn walk_range(
&mut self,
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError>
where
Self: Sized;
/// Get an iterator that walks through the table in reverse order.
///
/// If `start_key` is `None`, then the walker will start from the last entry of the table,
/// otherwise it starts at the entry greater than or equal to the provided key.
fn walk_back(
&mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError>
where
Self: Sized;
}
/// A read-only cursor over the dup table `T`.
pub trait DbDupCursorRO<T: DupSort> {
/// Positions the cursor at the next duplicate KV pair of the current key, returning it.
fn next_dup(&mut self) -> PairResult<T>;
/// Positions the cursor at the first KV pair of the next key, skipping any remaining duplicates of the current key.
fn next_no_dup(&mut self) -> PairResult<T>;
/// Positions the cursor at the next duplicate value of the current key.
fn next_dup_val(&mut self) -> ValueOnlyResult<T>;
/// Positions the cursor at the entry greater than or equal to the provided key/subkey pair.
///
/// # Note
///
/// The position of the cursor might not correspond to the key/subkey pair if the entry does not
/// exist.
fn seek_by_key_subkey(&mut self, key: T::Key, subkey: T::SubKey) -> ValueOnlyResult<T>;
/// Get an iterator that walks through the dup table.
///
/// The cursor will start at different points in the table depending on the values of `key` and
/// `subkey`:
///
/// | `key` | `subkey` | **Equivalent starting position** |
/// |--------|----------|-----------------------------------------|
/// | `None` | `None` | [`DbCursorRO::first()`] |
/// | `Some` | `None` | [`DbCursorRO::seek()`] |
/// | `None` | `Some` | [`DbDupCursorRO::seek_by_key_subkey()`] |
/// | `Some` | `Some` | [`DbDupCursorRO::seek_by_key_subkey()`] |
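///
/// A usage sketch (illustrative; `tx`, `address`, and the dup table
/// `PlainStorageState` are assumed placeholders):
///
/// ```ignore
/// let mut cursor = tx.cursor_dup_read::<PlainStorageState>()?;
/// // Walk all duplicate entries stored under a single key.
/// for entry in cursor.walk_dup(Some(address), None)? {
///     let (key, value) = entry?;
/// }
/// ```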
fn walk_dup(
&mut self,
key: Option<T::Key>,
subkey: Option<T::SubKey>,
) -> Result<DupWalker<'_, T, Self>, DatabaseError>
where
Self: Sized;
}
/// Read-write cursor over table `T`.
pub trait DbCursorRW<T: Table> {
/// Database operation that will update an existing row if the specified key already
/// exists in the table, and insert a new row if it does not.
fn upsert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
/// Database operation that will insert a row at a given key. If the key is already
/// present, the operation will result in an error.
fn insert(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
/// Appends a `(key, value)` pair to the end of the table.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use
/// [`DbCursorRW::insert`].
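///
/// A sketch (illustrative; the `MyTable` table, keys, and values are placeholders):
///
/// ```ignore
/// let mut cursor = tx.cursor_write::<MyTable>()?;
/// // Keys must arrive in ascending order for `append` to succeed.
/// cursor.append(key_a, value_a)?;
/// cursor.append(key_b, value_b)?; // key_b > key_a
/// ```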
fn append(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
/// Deletes the entry the cursor currently points to.
fn delete_current(&mut self) -> Result<(), DatabaseError>;
}
/// Read-write cursor over the dup-sorted table `T`.
pub trait DbDupCursorRW<T: DupSort> {
/// Deletes all duplicate entries for the current key.
fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError>;
/// Appends a duplicate `(key, value)` pair to the table.
///
/// This is efficient for pre-sorted data. If the data is not pre-sorted, use `insert`.
fn append_dup(&mut self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
}
/// Provides an iterator to `Cursor` when handling `Table`.
///
/// The reason we have two lifetimes is to distinguish between the `'cursor` lifetime
/// and the inherited `'tx` lifetime. If there were only one, Rust would short-circuit
/// the cursor lifetime and it wouldn't be possible to use the walker.
pub struct Walker<'cursor, T: Table, CURSOR: DbCursorRO<T>> {
/// Cursor to be used to walk through the table.
cursor: &'cursor mut CURSOR,
/// `(key, value)` where to start the walk.
start: IterPairResult<T>,
}
impl<T, CURSOR> fmt::Debug for Walker<'_, T, CURSOR>
where
T: Table,
CURSOR: DbCursorRO<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Walker").field("cursor", &self.cursor).field("start", &self.start).finish()
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for Walker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next().transpose()
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Walker<'cursor, T, CURSOR> {
/// Constructs a [`Walker`] from a cursor and an optional starting entry.
pub fn new(cursor: &'cursor mut CURSOR, start: IterPairResult<T>) -> Self {
Self { cursor, start }
}
/// Converts the current [`Walker`] into a [`ReverseWalker`], which iterates in reverse.
pub fn rev(self) -> ReverseWalker<'cursor, T, CURSOR> {
let start = self.cursor.current().transpose();
ReverseWalker::new(self.cursor, start)
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRW<T> + DbCursorRO<T>> Walker<'cursor, T, CURSOR> {
/// Delete current item that walker points to.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.start.take();
self.cursor.delete_current()
}
}
/// Provides a reverse iterator to `Cursor` when handling `Table`.
/// See also [`Walker`].
pub struct ReverseWalker<'cursor, T: Table, CURSOR: DbCursorRO<T>> {
/// Cursor to be used to walk through the table.
cursor: &'cursor mut CURSOR,
/// `(key, value)` where to start the walk.
start: IterPairResult<T>,
}
impl<T, CURSOR> fmt::Debug for ReverseWalker<'_, T, CURSOR>
where
T: Table,
CURSOR: DbCursorRO<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ReverseWalker")
.field("cursor", &self.cursor)
.field("start", &self.start)
.finish()
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> ReverseWalker<'cursor, T, CURSOR> {
/// Constructs a `ReverseWalker` from a cursor and an optional starting entry.
pub fn new(cursor: &'cursor mut CURSOR, start: IterPairResult<T>) -> Self {
Self { cursor, start }
}
/// Converts the current [`ReverseWalker`] into a [`Walker`], which iterates forward.
pub fn forward(self) -> Walker<'cursor, T, CURSOR> {
let start = self.cursor.current().transpose();
Walker::new(self.cursor, start)
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRW<T> + DbCursorRO<T>> ReverseWalker<'cursor, T, CURSOR> {
/// Delete current item that walker points to.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.start.take();
self.cursor.delete_current()
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for ReverseWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.prev().transpose()
}
}
/// Provides a range iterator to `Cursor` when handling `Table`.
/// See also [`Walker`].
pub struct RangeWalker<'cursor, T: Table, CURSOR: DbCursorRO<T>> {
/// Cursor to be used to walk through the table.
cursor: &'cursor mut CURSOR,
/// `(key, value)` where to start the walk.
start: IterPairResult<T>,
/// `key` at which to stop the walk.
end_key: Bound<T::Key>,
/// Whether the walk has reached its end.
is_done: bool,
}
impl<T, CURSOR> fmt::Debug for RangeWalker<'_, T, CURSOR>
where
T: Table,
CURSOR: DbCursorRO<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RangeWalker")
.field("cursor", &self.cursor)
.field("start", &self.start)
.field("end_key", &self.end_key)
.field("is_done", &self.is_done)
.finish()
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> Iterator for RangeWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
if self.is_done {
return None
}
let next_item = self.start.take().or_else(|| self.cursor.next().transpose());
match next_item {
Some(Ok((key, value))) => match &self.end_key {
Bound::Included(end_key) if &key <= end_key => Some(Ok((key, value))),
Bound::Excluded(end_key) if &key < end_key => Some(Ok((key, value))),
Bound::Unbounded => Some(Ok((key, value))),
_ => {
self.is_done = true;
None
}
},
Some(res @ Err(_)) => Some(res),
None => {
self.is_done = matches!(self.end_key, Bound::Unbounded);
None
}
}
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRO<T>> RangeWalker<'cursor, T, CURSOR> {
/// Constructs a `RangeWalker` from a cursor, a starting entry, and an end bound.
pub fn new(
cursor: &'cursor mut CURSOR,
start: IterPairResult<T>,
end_key: Bound<T::Key>,
) -> Self {
// Mark the walker as done up front if the range is empty or there is no starting entry.
let is_done = match start {
Some(Ok((ref start_key, _))) => match &end_key {
Bound::Included(end_key) if start_key > end_key => true,
Bound::Excluded(end_key) if start_key >= end_key => true,
_ => false,
},
None => true,
_ => false,
};
Self { cursor, start, end_key, is_done }
}
}
impl<'cursor, T: Table, CURSOR: DbCursorRW<T> + DbCursorRO<T>> RangeWalker<'cursor, T, CURSOR> {
/// Delete current item that walker points to.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.start.take();
self.cursor.delete_current()
}
}
/// Provides an iterator to `Cursor` when handling a `DupSort` table.
///
/// The reason we have two lifetimes is to distinguish between the `'cursor` lifetime
/// and the inherited `'tx` lifetime. If there were only one, Rust would short-circuit
/// the cursor lifetime and it wouldn't be possible to use the walker.
pub struct DupWalker<'cursor, T: DupSort, CURSOR: DbDupCursorRO<T>> {
/// Cursor to be used to walk through the table.
pub cursor: &'cursor mut CURSOR,
/// `(key, value)` pair where to start the walk.
pub start: IterPairResult<T>,
}
impl<T, CURSOR> fmt::Debug for DupWalker<'_, T, CURSOR>
where
T: DupSort,
CURSOR: DbDupCursorRO<T> + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DupWalker")
.field("cursor", &self.cursor)
.field("start", &self.start)
.finish()
}
}
impl<'cursor, T: DupSort, CURSOR: DbCursorRW<T> + DbDupCursorRO<T>> DupWalker<'cursor, T, CURSOR> {
/// Delete current item that walker points to.
pub fn delete_current(&mut self) -> Result<(), DatabaseError> {
self.start.take();
self.cursor.delete_current()
}
}
impl<'cursor, T: DupSort, CURSOR: DbDupCursorRO<T>> Iterator for DupWalker<'cursor, T, CURSOR> {
type Item = Result<TableRow<T>, DatabaseError>;
fn next(&mut self) -> Option<Self::Item> {
let start = self.start.take();
if start.is_some() {
return start
}
self.cursor.next_dup().transpose()
}
}

View File

@@ -0,0 +1,78 @@
use crate::{
table::TableImporter,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use std::{fmt::Debug, sync::Arc};
/// Main Database trait that can open read-only and read-write transactions.
///
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for consumption.
pub trait Database: Send + Sync {
/// Read-Only database transaction
type TX: DbTx + Send + Sync + Debug + 'static;
/// Read-Write database transaction
type TXMut: DbTxMut + DbTx + TableImporter + Send + Sync + Debug + 'static;
/// Creates a read-only transaction.
#[track_caller]
fn tx(&self) -> Result<Self::TX, DatabaseError>;
/// Creates a read-write transaction; only possible if the database is opened with write access.
#[track_caller]
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError>;
/// Takes a function and passes a read-only transaction into it, making sure it's closed at the
/// end of the execution.
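///
/// A brief sketch (illustrative; `db`, `MyTable`, and `key` are placeholders). The
/// outer `Result` comes from the transaction itself, the inner one from `get`:
///
/// ```ignore
/// let value = db.view(|tx| tx.get::<MyTable>(key))??;
/// ```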
fn view<T, F>(&self, f: F) -> Result<T, DatabaseError>
where
F: FnOnce(&Self::TX) -> T,
{
let tx = self.tx()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
/// Takes a function and passes a read-write transaction into it, making sure it's committed at
/// the end of the execution.
fn update<T, F>(&self, f: F) -> Result<T, DatabaseError>
where
F: FnOnce(&Self::TXMut) -> T,
{
let tx = self.tx_mut()?;
let res = f(&tx);
tx.commit()?;
Ok(res)
}
}
impl<DB: Database> Database for Arc<DB> {
type TX = <DB as Database>::TX;
type TXMut = <DB as Database>::TXMut;
fn tx(&self) -> Result<Self::TX, DatabaseError> {
<DB as Database>::tx(self)
}
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
<DB as Database>::tx_mut(self)
}
}
impl<DB: Database> Database for &DB {
type TX = <DB as Database>::TX;
type TXMut = <DB as Database>::TXMut;
fn tx(&self) -> Result<Self::TX, DatabaseError> {
<DB as Database>::tx(self)
}
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
<DB as Database>::tx_mut(self)
}
}

View File

@@ -0,0 +1,74 @@
use metrics::{counter, gauge, histogram, Label};
use std::sync::Arc;
/// Represents a type that can report metrics, used mainly with the database. The `report_metrics`
/// method can be used as a Prometheus hook.
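///
/// A sketch of a custom implementation (illustrative; the `MyDb` type, its
/// `freelist_size` method, and the metric name are placeholders):
///
/// ```ignore
/// impl DatabaseMetrics for MyDb {
///     fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
///         vec![("db.freelist", self.freelist_size() as f64, Vec::new())]
///     }
/// }
/// ```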
pub trait DatabaseMetrics {
/// Reports metrics for the database.
fn report_metrics(&self) {
for (name, value, labels) in self.gauge_metrics() {
gauge!(name, labels).set(value);
}
for (name, value, labels) in self.counter_metrics() {
counter!(name, labels).increment(value);
}
for (name, value, labels) in self.histogram_metrics() {
histogram!(name, labels).record(value);
}
}
/// Returns a list of [Gauge](metrics::Gauge) metrics for the database.
fn gauge_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}
/// Returns a list of [Counter](metrics::Counter) metrics for the database.
fn counter_metrics(&self) -> Vec<(&'static str, u64, Vec<Label>)> {
vec![]
}
/// Returns a list of [Histogram](metrics::Histogram) metrics for the database.
fn histogram_metrics(&self) -> Vec<(&'static str, f64, Vec<Label>)> {
vec![]
}
}
impl<DB: DatabaseMetrics> DatabaseMetrics for Arc<DB> {
fn report_metrics(&self) {
<DB as DatabaseMetrics>::report_metrics(self)
}
}
/// The type used to store metadata about the database.
#[derive(Debug, Default)]
pub struct DatabaseMetadataValue {
/// The freelist size
freelist_size: Option<usize>,
}
impl DatabaseMetadataValue {
/// Creates a new [`DatabaseMetadataValue`] with the given freelist size.
pub const fn new(freelist_size: Option<usize>) -> Self {
Self { freelist_size }
}
/// Returns the freelist size, if available.
pub const fn freelist_size(&self) -> Option<usize> {
self.freelist_size
}
}
/// Includes a method to return a [`DatabaseMetadataValue`] type, which can be used to dynamically
/// retrieve information about the database.
pub trait DatabaseMetadata {
/// Returns a metadata type, [`DatabaseMetadataValue`] for the database.
fn metadata(&self) -> DatabaseMetadataValue;
}
impl<DB: DatabaseMetadata> DatabaseMetadata for Arc<DB> {
fn metadata(&self) -> DatabaseMetadataValue {
<DB as DatabaseMetadata>::metadata(self)
}
}

View File

@@ -0,0 +1,83 @@
//! reth's database abstraction layer.
//!
//! The database abstraction assumes that the underlying store is a KV store subdivided into tables.
//!
//! One or more changes are tied to a transaction that is atomically committed to the data store at
//! the same time. Strong consistency in what data is written and when is important for reth, so it
//! is not possible to write data to the database outside of using a transaction.
//!
//! Good starting points for this crate are:
//!
//! - [`Database`] for the main database abstraction
//! - [`DbTx`] (RO) and [`DbTxMut`] (RW) for the transaction abstractions.
//! - [`DbCursorRO`] (RO) and [`DbCursorRW`] (RW) for the cursor abstractions (see below).
//!
//! # Cursors and Walkers
//!
//! The crate also defines a couple of helpful abstractions for iterating and writing data:
//!
//! - **Cursors** ([`DbCursorRO`] / [`DbCursorRW`]) for iterating data in a table. Cursors are
//! assumed to resolve data in a sorted manner when iterating from start to finish, and it is safe
//! to assume that they are efficient at doing so.
//! - **Walkers** ([`Walker`] / [`RangeWalker`] / [`ReverseWalker`]) use cursors to walk the entries
//! in a table, either fully from a specific point, or over a range.
//!
//! Dup tables (see below) also have corresponding cursors and walkers (e.g. [`DbDupCursorRO`]).
//! These **should** be preferred when working with dup tables, as they provide additional methods
//! that are optimized for dup tables.
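//!
//! A short sketch of the read flow (illustrative; `db`, `MyTable`, and the key
//! range are placeholders):
//!
//! ```ignore
//! let tx = db.tx()?;
//! let mut cursor = tx.cursor_read::<MyTable>()?;
//! for entry in cursor.walk_range(0..100)? {
//!     let (key, value) = entry?;
//!     // ...
//! }
//! tx.commit()?;
//! ```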
//!
//! # Tables
//!
//! reth has two types of tables: simple KV stores (one key, one value) and dup tables (one key,
//! many values). Dup tables can be efficient for certain types of data.
//!
//! Keys are de/serialized using the [`Encode`] and [`Decode`] traits, and values are de/serialized
//! ("compressed") using the [`Compress`] and [`Decompress`] traits.
//!
//! Tables implement the [`Table`] trait.
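//!
//! A hypothetical table definition under these traits (illustrative only; the
//! name `MyTable` is a placeholder):
//!
//! ```ignore
//! #[derive(Debug)]
//! struct MyTable;
//!
//! impl Table for MyTable {
//!     const NAME: &'static str = "MyTable";
//!     type Key = u64;       // encoded via `Encode`/`Decode`
//!     type Value = Vec<u8>; // compressed via `Compress`/`Decompress`
//! }
//! ```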
//!
//! [`Database`]: crate::database::Database
//! [`DbTx`]: crate::transaction::DbTx
//! [`DbTxMut`]: crate::transaction::DbTxMut
//! [`DbCursorRO`]: crate::cursor::DbCursorRO
//! [`DbCursorRW`]: crate::cursor::DbCursorRW
//! [`Walker`]: crate::cursor::Walker
//! [`RangeWalker`]: crate::cursor::RangeWalker
//! [`ReverseWalker`]: crate::cursor::ReverseWalker
//! [`DbDupCursorRO`]: crate::cursor::DbDupCursorRO
//! [`Encode`]: crate::table::Encode
//! [`Decode`]: crate::table::Decode
//! [`Compress`]: crate::table::Compress
//! [`Decompress`]: crate::table::Decompress
//! [`Table`]: crate::table::Table
#![doc(
html_logo_url = "https://raw.githubusercontent.com/paradigmxyz/reth/main/assets/reth-docs.png",
html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256",
issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/"
)]
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
// TODO: remove when https://github.com/proptest-rs/proptest/pull/427 is merged
#![allow(unknown_lints, non_local_definitions)]
/// Common types used throughout the abstraction.
pub mod common;
/// Cursor database traits.
pub mod cursor;
/// Database traits.
pub mod database;
/// Database metrics trait extensions.
pub mod database_metrics;
/// Mock database for testing.
pub mod mock;
/// Table traits.
pub mod table;
/// Transaction database traits.
pub mod transaction;
/// Re-exports
pub use reth_storage_errors::db::{DatabaseError, DatabaseWriteOperation};
pub mod models;
mod scale;
mod utils;

View File

@@ -0,0 +1,250 @@
//! Mock database
use crate::{
common::{IterPairResult, PairResult, ValueOnlyResult},
cursor::{
DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DupWalker, RangeWalker,
ReverseWalker, Walker,
},
database::Database,
table::{DupSort, Table, TableImporter},
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use core::ops::Bound;
use std::{collections::BTreeMap, ops::RangeBounds};
/// Mock database used for testing, with an inner `BTreeMap` structure.
// TODO
#[derive(Clone, Debug, Default)]
pub struct DatabaseMock {
/// Main data. TODO (Make it table aware)
pub data: BTreeMap<Vec<u8>, Vec<u8>>,
}
impl Database for DatabaseMock {
type TX = TxMock;
type TXMut = TxMock;
fn tx(&self) -> Result<Self::TX, DatabaseError> {
Ok(TxMock::default())
}
fn tx_mut(&self) -> Result<Self::TXMut, DatabaseError> {
Ok(TxMock::default())
}
}
/// Mock read-only transaction.
#[derive(Debug, Clone, Default)]
pub struct TxMock {
/// Table representation
_table: BTreeMap<Vec<u8>, Vec<u8>>,
}
impl DbTx for TxMock {
type Cursor<T: Table> = CursorMock;
type DupCursor<T: DupSort> = CursorMock;
fn get<T: Table>(&self, _key: T::Key) -> Result<Option<T::Value>, DatabaseError> {
Ok(None)
}
fn commit(self) -> Result<bool, DatabaseError> {
Ok(true)
}
fn abort(self) {}
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
fn entries<T: Table>(&self) -> Result<usize, DatabaseError> {
Ok(self._table.len())
}
fn disable_long_read_transaction_safety(&mut self) {}
}
impl DbTxMut for TxMock {
type CursorMut<T: Table> = CursorMock;
type DupCursorMut<T: DupSort> = CursorMock;
fn put<T: Table>(&self, _key: T::Key, _value: T::Value) -> Result<(), DatabaseError> {
Ok(())
}
fn delete<T: Table>(
&self,
_key: T::Key,
_value: Option<T::Value>,
) -> Result<bool, DatabaseError> {
Ok(true)
}
fn clear<T: Table>(&self) -> Result<(), DatabaseError> {
Ok(())
}
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError> {
Ok(CursorMock { _cursor: 0 })
}
}
impl TableImporter for TxMock {}
/// Mock cursor that iterates over a table.
#[derive(Debug)]
pub struct CursorMock {
_cursor: u32,
}
impl<T: Table> DbCursorRO<T> for CursorMock {
fn first(&mut self) -> PairResult<T> {
Ok(None)
}
fn seek_exact(&mut self, _key: T::Key) -> PairResult<T> {
Ok(None)
}
fn seek(&mut self, _key: T::Key) -> PairResult<T> {
Ok(None)
}
fn next(&mut self) -> PairResult<T> {
Ok(None)
}
fn prev(&mut self) -> PairResult<T> {
Ok(None)
}
fn last(&mut self) -> PairResult<T> {
Ok(None)
}
fn current(&mut self) -> PairResult<T> {
Ok(None)
}
fn walk(&mut self, start_key: Option<T::Key>) -> Result<Walker<'_, T, Self>, DatabaseError> {
let start: IterPairResult<T> = match start_key {
Some(key) => <Self as DbCursorRO<T>>::seek(self, key).transpose(),
None => <Self as DbCursorRO<T>>::first(self).transpose(),
};
Ok(Walker::new(self, start))
}
fn walk_range(
&mut self,
range: impl RangeBounds<T::Key>,
) -> Result<RangeWalker<'_, T, Self>, DatabaseError> {
let start_key = match range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => Some((*key).clone()),
Bound::Unbounded => None,
};
let end_key = match range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => Bound::Included((*key).clone()),
Bound::Unbounded => Bound::Unbounded,
};
let start: IterPairResult<T> = match start_key {
Some(key) => <Self as DbCursorRO<T>>::seek(self, key).transpose(),
None => <Self as DbCursorRO<T>>::first(self).transpose(),
};
Ok(RangeWalker::new(self, start, end_key))
}
fn walk_back(
&mut self,
start_key: Option<T::Key>,
) -> Result<ReverseWalker<'_, T, Self>, DatabaseError> {
let start: IterPairResult<T> = match start_key {
Some(key) => <Self as DbCursorRO<T>>::seek(self, key).transpose(),
None => <Self as DbCursorRO<T>>::last(self).transpose(),
};
Ok(ReverseWalker::new(self, start))
}
}
impl<T: DupSort> DbDupCursorRO<T> for CursorMock {
fn next_dup(&mut self) -> PairResult<T> {
Ok(None)
}
fn next_no_dup(&mut self) -> PairResult<T> {
Ok(None)
}
fn next_dup_val(&mut self) -> ValueOnlyResult<T> {
Ok(None)
}
fn seek_by_key_subkey(
&mut self,
_key: <T as Table>::Key,
_subkey: <T as DupSort>::SubKey,
) -> ValueOnlyResult<T> {
Ok(None)
}
fn walk_dup(
&mut self,
_key: Option<<T>::Key>,
_subkey: Option<<T as DupSort>::SubKey>,
) -> Result<DupWalker<'_, T, Self>, DatabaseError> {
Ok(DupWalker { cursor: self, start: None })
}
}
impl<T: Table> DbCursorRW<T> for CursorMock {
fn upsert(
&mut self,
_key: <T as Table>::Key,
_value: <T as Table>::Value,
) -> Result<(), DatabaseError> {
Ok(())
}
fn insert(
&mut self,
_key: <T as Table>::Key,
_value: <T as Table>::Value,
) -> Result<(), DatabaseError> {
Ok(())
}
fn append(
&mut self,
_key: <T as Table>::Key,
_value: <T as Table>::Value,
) -> Result<(), DatabaseError> {
Ok(())
}
fn delete_current(&mut self) -> Result<(), DatabaseError> {
Ok(())
}
}
impl<T: DupSort> DbDupCursorRW<T> for CursorMock {
fn delete_current_duplicates(&mut self) -> Result<(), DatabaseError> {
Ok(())
}
fn append_dup(&mut self, _key: <T>::Key, _value: <T>::Value) -> Result<(), DatabaseError> {
Ok(())
}
}

View File

@@ -0,0 +1,213 @@
//! Account related models and types.
use std::ops::{Range, RangeInclusive};
use crate::{
impl_fixed_arbitrary,
table::{Decode, Encode},
DatabaseError,
};
use reth_codecs::{derive_arbitrary, Compact};
use reth_primitives::{Account, Address, BlockNumber, Buf, StorageKey};
use serde::{Deserialize, Serialize};
/// Account as it is saved in the database.
///
/// [`Address`] is the subkey.
#[derive_arbitrary(compact)]
#[derive(Debug, Default, Clone, Eq, PartialEq, Serialize)]
pub struct AccountBeforeTx {
/// Address for the account. Acts as `DupSort::SubKey`.
pub address: Address,
/// Account state before the transaction.
pub info: Option<Account>,
}
// NOTE: We remove `main_codec` and manually encode the subkey, compressing only the second
// part of the value. If compression were applied over the whole value (even the subkey), it
// would break fetching values with `seek_by_key_subkey`.
impl Compact for AccountBeforeTx {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
// For now, write the full address bytes; compression can be added later.
buf.put_slice(self.address.as_slice());
let mut acc_len = 0;
if let Some(account) = self.info {
acc_len = account.to_compact(buf);
}
acc_len + 20
}
fn from_compact(mut buf: &[u8], len: usize) -> (Self, &[u8]) {
let address = Address::from_slice(&buf[..20]);
buf.advance(20);
let info = if len - 20 > 0 {
let (acc, advanced_buf) = Account::from_compact(buf, len - 20);
buf = advanced_buf;
Some(acc)
} else {
None
};
(Self { address, info }, buf)
}
}
/// [`BlockNumber`] concatenated with [`Address`].
///
/// Since it's used as a key, it isn't compressed when encoding it.
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash,
)]
pub struct BlockNumberAddress(pub (BlockNumber, Address));
impl BlockNumberAddress {
/// Creates a new `Range` of keys from the start block to the end block.
///
/// Note: the end of the range is inclusive.
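///
/// For example, `range(3..=4)` spans keys from `(3, Address::ZERO)` up to, but
/// not including, `(5, Address::ZERO)`.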
pub fn range(range: RangeInclusive<BlockNumber>) -> Range<Self> {
(*range.start(), Address::ZERO).into()..(*range.end() + 1, Address::ZERO).into()
}
/// Return the block number
pub const fn block_number(&self) -> BlockNumber {
self.0 .0
}
/// Return the address
pub const fn address(&self) -> Address {
self.0 .1
}
/// Consumes `self` and returns the ([`BlockNumber`], [`Address`]) pair.
pub const fn take(self) -> (BlockNumber, Address) {
(self.0 .0, self.0 .1)
}
}
impl From<(BlockNumber, Address)> for BlockNumberAddress {
fn from(tpl: (u64, Address)) -> Self {
Self(tpl)
}
}
impl Encode for BlockNumberAddress {
type Encoded = [u8; 28];
fn encode(self) -> Self::Encoded {
let block_number = self.0 .0;
let address = self.0 .1;
let mut buf = [0u8; 28];
buf[..8].copy_from_slice(&block_number.to_be_bytes());
buf[8..].copy_from_slice(address.as_slice());
buf
}
}
impl Decode for BlockNumberAddress {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let value = value.as_ref();
let num = u64::from_be_bytes(value[..8].try_into().map_err(|_| DatabaseError::Decode)?);
let address = Address::from_slice(&value[8..]);
Ok(Self((num, address)))
}
}
/// [`Address`] concatenated with [`StorageKey`]. Used by `reth_etl` and history stages.
///
/// Since it's used as a key, it isn't compressed when encoding it.
#[derive(
Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd, Hash,
)]
pub struct AddressStorageKey(pub (Address, StorageKey));
impl Encode for AddressStorageKey {
type Encoded = [u8; 52];
fn encode(self) -> Self::Encoded {
let address = self.0 .0;
let storage_key = self.0 .1;
let mut buf = [0u8; 52];
buf[..20].copy_from_slice(address.as_slice());
buf[20..].copy_from_slice(storage_key.as_slice());
buf
}
}
impl Decode for AddressStorageKey {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let value = value.as_ref();
let address = Address::from_slice(&value[..20]);
let storage_key = StorageKey::from_slice(&value[20..]);
Ok(Self((address, storage_key)))
}
}
impl_fixed_arbitrary!((BlockNumberAddress, 28), (AddressStorageKey, 52));
#[cfg(test)]
mod tests {
use super::*;
use rand::{thread_rng, Rng};
use std::str::FromStr;
#[test]
fn test_block_number_address() {
let num = 1u64;
let hash = Address::from_str("ba5e000000000000000000000000000000000000").unwrap();
let key = BlockNumberAddress((num, hash));
let mut bytes = [0u8; 28];
bytes[..8].copy_from_slice(&num.to_be_bytes());
bytes[8..].copy_from_slice(hash.as_slice());
let encoded = Encode::encode(key);
assert_eq!(encoded, bytes);
let decoded: BlockNumberAddress = Decode::decode(encoded).unwrap();
assert_eq!(decoded, key);
}
#[test]
fn test_block_number_address_rand() {
let mut bytes = [0u8; 28];
thread_rng().fill(bytes.as_mut_slice());
let key = BlockNumberAddress::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
assert_eq!(bytes, Encode::encode(key));
}
#[test]
fn test_address_storage_key() {
let storage_key = StorageKey::random();
let address = Address::from_str("ba5e000000000000000000000000000000000000").unwrap();
let key = AddressStorageKey((address, storage_key));
let mut bytes = [0u8; 52];
bytes[..20].copy_from_slice(address.as_slice());
bytes[20..].copy_from_slice(storage_key.as_slice());
let encoded = Encode::encode(key);
assert_eq!(encoded, bytes);
let decoded: AddressStorageKey = Decode::decode(encoded).unwrap();
assert_eq!(decoded, key);
}
#[test]
fn test_address_storage_key_rand() {
let mut bytes = [0u8; 52];
thread_rng().fill(bytes.as_mut_slice());
let key = AddressStorageKey::arbitrary(&mut Unstructured::new(&bytes)).unwrap();
assert_eq!(bytes, Encode::encode(key));
}
}

View File

@@ -0,0 +1,117 @@
//! Block related models and types.
use reth_codecs::{main_codec, Compact};
use reth_primitives::{Header, TxNumber, Withdrawals, B256};
use std::ops::Range;
/// Total number of transactions.
pub type NumTransactions = u64;
/// The storage of the block body indices.
///
/// It holds the transaction number of the first
/// transaction in the block and the total number of transactions.
#[derive(Debug, Default, Eq, PartialEq, Clone)]
#[main_codec]
pub struct StoredBlockBodyIndices {
/// The number of the first transaction in this block
///
/// Note: If the block is empty, this is the number of the first transaction
/// in the next non-empty block.
pub first_tx_num: TxNumber,
/// The total number of transactions in the block
///
/// NOTE: The number of transitions is equal to the number of transactions, plus one
/// additional transition for the block change if the block has a block reward or withdrawals.
pub tx_count: NumTransactions,
}
impl StoredBlockBodyIndices {
/// Return the range of transaction ids for this block.
pub const fn tx_num_range(&self) -> Range<TxNumber> {
self.first_tx_num..self.first_tx_num + self.tx_count
}
/// Returns the index of the last transaction in this block. If the block
/// is empty, it refers to the last transaction in the previous
/// non-empty block.
pub const fn last_tx_num(&self) -> TxNumber {
self.first_tx_num.saturating_add(self.tx_count).saturating_sub(1)
}
/// First transaction index.
///
/// Caution: If the block is empty, this is the number of the first transaction
/// in the next non-empty block.
pub const fn first_tx_num(&self) -> TxNumber {
self.first_tx_num
}
/// Return the index of the next transaction after this block.
pub const fn next_tx_num(&self) -> TxNumber {
self.first_tx_num + self.tx_count
}
/// Returns `true` if the block contains no transactions.
pub const fn is_empty(&self) -> bool {
self.tx_count == 0
}
/// Returns the number of transactions in the block.
///
/// NOTE: This is not the same as the number of transitions.
pub const fn tx_count(&self) -> NumTransactions {
self.tx_count
}
}
/// The storage representation of a block's ommers.
///
/// It is stored as the headers of the block's uncles.
#[main_codec]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct StoredBlockOmmers {
/// The block headers of this block's uncles.
pub ommers: Vec<Header>,
}
/// The storage representation of block withdrawals.
#[main_codec]
#[derive(Debug, Default, Eq, PartialEq, Clone)]
pub struct StoredBlockWithdrawals {
/// The block withdrawals.
pub withdrawals: Withdrawals,
}
/// Hash of the block header.
pub type HeaderHash = B256;
#[cfg(test)]
mod tests {
use super::*;
use crate::table::{Compress, Decompress};
#[test]
fn test_ommer() {
let mut ommer = StoredBlockOmmers::default();
ommer.ommers.push(Header::default());
ommer.ommers.push(Header::default());
assert_eq!(
ommer.clone(),
StoredBlockOmmers::decompress::<Vec<_>>(ommer.compress()).unwrap()
);
}
#[test]
fn block_indices() {
let first_tx_num = 10;
let tx_count = 6;
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
assert_eq!(block_indices.first_tx_num(), first_tx_num);
assert_eq!(block_indices.last_tx_num(), first_tx_num + tx_count - 1);
assert_eq!(block_indices.next_tx_num(), first_tx_num + tx_count);
assert_eq!(block_indices.tx_count(), tx_count);
assert_eq!(block_indices.tx_num_range(), first_tx_num..first_tx_num + tx_count);
}
}

View File

@@ -0,0 +1,47 @@
//! Client version model.
use reth_codecs::{derive_arbitrary, Compact};
use serde::{Deserialize, Serialize};
/// Client version that accessed the database.
#[derive_arbitrary(compact)]
#[derive(Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct ClientVersion {
/// Client version
pub version: String,
/// The git commit sha
pub git_sha: String,
/// Build timestamp
pub build_timestamp: String,
}
impl ClientVersion {
/// Returns `true` if no version fields are set.
pub fn is_empty(&self) -> bool {
self.version.is_empty() && self.git_sha.is_empty() && self.build_timestamp.is_empty()
}
}
impl Compact for ClientVersion {
fn to_compact<B>(self, buf: &mut B) -> usize
where
B: bytes::BufMut + AsMut<[u8]>,
{
let Self { version, git_sha, build_timestamp } = self;
version.into_bytes().to_compact(buf);
git_sha.into_bytes().to_compact(buf);
build_timestamp.into_bytes().to_compact(buf)
}
fn from_compact(buf: &[u8], len: usize) -> (Self, &[u8]) {
let (version, buf) = Vec::<u8>::from_compact(buf, len);
let (git_sha, buf) = Vec::<u8>::from_compact(buf, len);
let (build_timestamp, buf) = Vec::<u8>::from_compact(buf, len);
let client_version = Self {
version: unsafe { String::from_utf8_unchecked(version) },
git_sha: unsafe { String::from_utf8_unchecked(git_sha) },
build_timestamp: unsafe { String::from_utf8_unchecked(build_timestamp) },
};
(client_version, buf)
}
}

View File

@@ -0,0 +1,24 @@
//! Implements [`Compress`] and [`Decompress`] for [`IntegerList`]
use crate::{
table::{Compress, Decompress},
DatabaseError,
};
use reth_primitives::IntegerList;
impl Compress for IntegerList {
type Compressed = Vec<u8>;
fn compress(self) -> Self::Compressed {
self.to_bytes()
}
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
self.to_mut_bytes(buf)
}
}
impl Decompress for IntegerList {
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
Self::from_bytes(value.as_ref()).map_err(|_| DatabaseError::Decode)
}
}

View File

@@ -0,0 +1,383 @@
//! Implements data structures specific to the database
use crate::{
table::{Compress, Decode, Decompress, Encode},
DatabaseError,
};
use reth_codecs::{main_codec, Compact};
use reth_primitives::{
stage::StageCheckpoint,
trie::{StoredNibbles, StoredNibblesSubKey, *},
Address, PruneSegment, B256, *,
};
pub mod accounts;
pub mod blocks;
pub mod client_version;
pub mod integer_list;
pub mod sharded_key;
pub mod storage_sharded_key;
pub use accounts::*;
pub use blocks::*;
pub use client_version::ClientVersion;
pub use sharded_key::ShardedKey;
/// Macro that implements [`Encode`] and [`Decode`] for uint types.
macro_rules! impl_uints {
($($name:tt),+) => {
$(
impl Encode for $name {
type Encoded = [u8; std::mem::size_of::<$name>()];
fn encode(self) -> Self::Encoded {
self.to_be_bytes()
}
}
impl Decode for $name {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, $crate::DatabaseError> {
Ok(
$name::from_be_bytes(
value.as_ref().try_into().map_err(|_| $crate::DatabaseError::Decode)?
)
)
}
}
)+
};
}
impl_uints!(u64, u32, u16, u8);
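// Illustrative note (not in the original): with these impls, `1u64.encode()` yields the
// big-endian bytes `[0, 0, 0, 0, 0, 0, 0, 1]`, and `u64::decode` reverses the conversion.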
impl Encode for Vec<u8> {
type Encoded = Self;
fn encode(self) -> Self::Encoded {
self
}
}
impl Decode for Vec<u8> {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
Ok(value.as_ref().to_vec())
}
}
impl Encode for Address {
type Encoded = [u8; 20];
fn encode(self) -> Self::Encoded {
self.0 .0
}
}
impl Decode for Address {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
Ok(Self::from_slice(value.as_ref()))
}
}
impl Encode for B256 {
type Encoded = [u8; 32];
fn encode(self) -> Self::Encoded {
self.0
}
}
impl Decode for B256 {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
Ok(Self::new(value.as_ref().try_into().map_err(|_| DatabaseError::Decode)?))
}
}
impl Encode for String {
type Encoded = Vec<u8>;
fn encode(self) -> Self::Encoded {
self.into_bytes()
}
}
impl Decode for String {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
Self::from_utf8(value.as_ref().to_vec()).map_err(|_| DatabaseError::Decode)
}
}
impl Encode for StoredNibbles {
type Encoded = Vec<u8>;
// Delegate to the Compact implementation
fn encode(self) -> Self::Encoded {
let mut buf = Vec::with_capacity(self.0.len());
self.to_compact(&mut buf);
buf
}
}
impl Decode for StoredNibbles {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
}
}
impl Encode for StoredNibblesSubKey {
type Encoded = Vec<u8>;
// Delegate to the Compact implementation
fn encode(self) -> Self::Encoded {
let mut buf = Vec::with_capacity(65);
self.to_compact(&mut buf);
buf
}
}
impl Decode for StoredNibblesSubKey {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
}
}
impl Encode for PruneSegment {
type Encoded = [u8; 1];
fn encode(self) -> Self::Encoded {
let mut buf = [0u8];
self.to_compact(&mut buf.as_mut());
buf
}
}
impl Decode for PruneSegment {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
}
}
impl Encode for ClientVersion {
type Encoded = Vec<u8>;
// Delegate to the Compact implementation
fn encode(self) -> Self::Encoded {
let mut buf = vec![];
self.to_compact(&mut buf);
buf
}
}
impl Decode for ClientVersion {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let buf = value.as_ref();
Ok(Self::from_compact(buf, buf.len()).0)
}
}
/// Implements compression for `Compact` types.
macro_rules! impl_compression_for_compact {
($($name:tt),+) => {
$(
impl Compress for $name {
type Compressed = Vec<u8>;
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
let _ = Compact::to_compact(self, buf);
}
}
impl Decompress for $name {
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<$name, $crate::DatabaseError> {
let value = value.as_ref();
let (obj, _) = Compact::from_compact(&value, value.len());
Ok(obj)
}
}
)+
};
}
impl_compression_for_compact!(
SealedHeader,
Header,
Account,
Log,
Receipt,
TxType,
StorageEntry,
StoredBranchNode,
StoredNibbles,
StoredNibblesSubKey,
StorageTrieEntry,
StoredBlockBodyIndices,
StoredBlockOmmers,
StoredBlockWithdrawals,
Bytecode,
AccountBeforeTx,
TransactionSignedNoHash,
CompactU256,
StageCheckpoint,
PruneCheckpoint,
ClientVersion,
Requests,
// Non-DB
GenesisAccount
);
macro_rules! impl_compression_fixed_compact {
($($name:tt),+) => {
$(
impl Compress for $name
{
type Compressed = Vec<u8>;
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
let _ = Compact::to_compact(self, buf);
}
fn uncompressable_ref(&self) -> Option<&[u8]> {
Some(self.as_ref())
}
}
impl Decompress for $name
{
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<$name, $crate::DatabaseError> {
let value = value.as_ref();
let (obj, _) = Compact::from_compact(&value, value.len());
Ok(obj)
}
}
)+
};
}
impl_compression_fixed_compact!(B256, Address);
/// Adds wrapper structs for some primitive types so they can use `StructFlags` from Compact, when
/// used as pure table values.
macro_rules! add_wrapper_struct {
($(($name:tt, $wrapper:tt)),+) => {
$(
/// Wrapper struct so it can use StructFlags from Compact, when used as pure table values.
#[main_codec]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct $wrapper(pub $name);
impl From<$name> for $wrapper {
fn from(value: $name) -> Self {
$wrapper(value)
}
}
impl From<$wrapper> for $name {
fn from(value: $wrapper) -> Self {
value.0
}
}
impl std::ops::Deref for $wrapper {
type Target = $name;
fn deref(&self) -> &Self::Target {
&self.0
}
}
)+
};
}
add_wrapper_struct!((U256, CompactU256));
add_wrapper_struct!((u64, CompactU64));
add_wrapper_struct!((ClientVersion, CompactClientVersion));
#[cfg(test)]
mod tests {
use super::*;
use reth_primitives::{
stage::{
AccountHashingCheckpoint, CheckpointBlockRange, EntitiesCheckpoint,
ExecutionCheckpoint, HeadersCheckpoint, IndexHistoryCheckpoint, StageCheckpoint,
StageUnitCheckpoint, StorageHashingCheckpoint,
},
Account, Header, PruneCheckpoint, PruneMode, PruneSegment, Receipt, ReceiptWithBloom,
SealedHeader, TxEip1559, TxEip2930, TxEip4844, TxLegacy, Withdrawals,
};
// each value in the database has an extra field named flags that encodes metadata about other
// fields in the value, e.g. offset and length.
//
// this check is to ensure we do not inadvertently add too many fields to a struct which would
// expand the flags field and break backwards compatibility
#[test]
fn test_ensure_backwards_compatibility() {
#[cfg(not(feature = "optimism"))]
{
assert_eq!(Account::bitflag_encoded_bytes(), 2);
assert_eq!(AccountHashingCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(CheckpointBlockRange::bitflag_encoded_bytes(), 1);
assert_eq!(CompactClientVersion::bitflag_encoded_bytes(), 0);
assert_eq!(CompactU256::bitflag_encoded_bytes(), 1);
assert_eq!(CompactU64::bitflag_encoded_bytes(), 1);
assert_eq!(EntitiesCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(ExecutionCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(Header::bitflag_encoded_bytes(), 4);
assert_eq!(HeadersCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(IndexHistoryCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(PruneCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(PruneMode::bitflag_encoded_bytes(), 1);
assert_eq!(PruneSegment::bitflag_encoded_bytes(), 1);
assert_eq!(Receipt::bitflag_encoded_bytes(), 1);
assert_eq!(ReceiptWithBloom::bitflag_encoded_bytes(), 0);
assert_eq!(SealedHeader::bitflag_encoded_bytes(), 0);
assert_eq!(StageCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(StageUnitCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(StoredBlockBodyIndices::bitflag_encoded_bytes(), 1);
assert_eq!(StoredBlockOmmers::bitflag_encoded_bytes(), 0);
assert_eq!(StoredBlockWithdrawals::bitflag_encoded_bytes(), 0);
assert_eq!(StorageHashingCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(TxEip1559::bitflag_encoded_bytes(), 4);
assert_eq!(TxEip2930::bitflag_encoded_bytes(), 3);
assert_eq!(TxEip4844::bitflag_encoded_bytes(), 5);
assert_eq!(TxLegacy::bitflag_encoded_bytes(), 3);
assert_eq!(Withdrawals::bitflag_encoded_bytes(), 0);
}
#[cfg(feature = "optimism")]
{
assert_eq!(Account::bitflag_encoded_bytes(), 2);
assert_eq!(AccountHashingCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(CheckpointBlockRange::bitflag_encoded_bytes(), 1);
assert_eq!(CompactClientVersion::bitflag_encoded_bytes(), 0);
assert_eq!(CompactU256::bitflag_encoded_bytes(), 1);
assert_eq!(CompactU64::bitflag_encoded_bytes(), 1);
assert_eq!(EntitiesCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(ExecutionCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(Header::bitflag_encoded_bytes(), 4);
assert_eq!(HeadersCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(IndexHistoryCheckpoint::bitflag_encoded_bytes(), 0);
assert_eq!(PruneCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(PruneMode::bitflag_encoded_bytes(), 1);
assert_eq!(PruneSegment::bitflag_encoded_bytes(), 1);
assert_eq!(Receipt::bitflag_encoded_bytes(), 2);
assert_eq!(ReceiptWithBloom::bitflag_encoded_bytes(), 0);
assert_eq!(SealedHeader::bitflag_encoded_bytes(), 0);
assert_eq!(StageCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(StageUnitCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(StoredBlockBodyIndices::bitflag_encoded_bytes(), 1);
assert_eq!(StoredBlockOmmers::bitflag_encoded_bytes(), 0);
assert_eq!(StoredBlockWithdrawals::bitflag_encoded_bytes(), 0);
assert_eq!(StorageHashingCheckpoint::bitflag_encoded_bytes(), 1);
assert_eq!(TxEip1559::bitflag_encoded_bytes(), 4);
assert_eq!(TxEip2930::bitflag_encoded_bytes(), 3);
assert_eq!(TxEip4844::bitflag_encoded_bytes(), 5);
assert_eq!(TxLegacy::bitflag_encoded_bytes(), 3);
assert_eq!(Withdrawals::bitflag_encoded_bytes(), 0);
}
}
}

View File

@@ -0,0 +1,86 @@
//! Sharded key
use crate::{
table::{Decode, Encode},
DatabaseError,
};
use reth_primitives::BlockNumber;
use serde::{Deserialize, Serialize};
use std::hash::Hash;
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
/// Sometimes data can be too big to be saved under a single key. This helps out by dividing the
/// data into different shards. Example:
///
/// `Address | 200` -> data is from block 0 to 200.
///
/// `Address | 300` -> data is from block 201 to 300.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ShardedKey<T> {
/// The key for this type.
pub key: T,
/// Highest block number to which `value` is related.
pub highest_block_number: BlockNumber,
}
impl<T> AsRef<Self> for ShardedKey<T> {
fn as_ref(&self) -> &Self {
self
}
}
impl<T> ShardedKey<T> {
/// Creates a new `ShardedKey<T>`.
pub const fn new(key: T, highest_block_number: BlockNumber) -> Self {
Self { key, highest_block_number }
}
/// Creates a new key with the highest block number set to maximum.
/// This is useful when we want to search the last value for a given key.
pub const fn last(key: T) -> Self {
Self { key, highest_block_number: u64::MAX }
}
}
impl<T> Encode for ShardedKey<T>
where
T: Encode,
Vec<u8>: From<<T as Encode>::Encoded>,
{
type Encoded = Vec<u8>;
fn encode(self) -> Self::Encoded {
let mut buf: Vec<u8> = Encode::encode(self.key).into();
buf.extend_from_slice(&self.highest_block_number.to_be_bytes());
buf
}
}
impl<T> Decode for ShardedKey<T>
where
T: Decode,
{
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let value = value.as_ref();
let block_num_index = value.len() - 8;
let highest_block_number = u64::from_be_bytes(
value[block_num_index..].try_into().map_err(|_| DatabaseError::Decode)?,
);
let key = T::decode(&value[..block_num_index])?;
Ok(Self::new(key, highest_block_number))
}
}
impl<T> Hash for ShardedKey<T>
where
T: Hash,
{
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.key.hash(state);
self.highest_block_number.hash(state);
}
}

View File

@@ -0,0 +1,76 @@
//! Storage sharded key
use crate::{
table::{Decode, Encode},
DatabaseError,
};
use derive_more::AsRef;
use reth_primitives::{Address, BlockNumber, B256};
use serde::{Deserialize, Serialize};
use super::ShardedKey;
/// Number of indices in one shard.
pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000;
/// Sometimes data can be too big to be saved under a single key. This helps out by dividing the
/// data into different shards. Example:
///
/// `Address | StorageKey | 200` -> data is from block 0 to 200.
///
/// `Address | StorageKey | 300` -> data is from block 201 to 300.
#[derive(
Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq, AsRef, Serialize, Deserialize, Hash,
)]
pub struct StorageShardedKey {
/// Storage account address.
pub address: Address,
/// Sharded key over the storage slot, with the highest block number to which the value is related.
#[as_ref]
pub sharded_key: ShardedKey<B256>,
}
impl StorageShardedKey {
/// Creates a new `StorageShardedKey`.
pub const fn new(
address: Address,
storage_key: B256,
highest_block_number: BlockNumber,
) -> Self {
Self { address, sharded_key: ShardedKey { key: storage_key, highest_block_number } }
}
/// Creates a new key with the highest block number set to maximum.
/// This is useful when we want to search the last value for a given key.
pub const fn last(address: Address, storage_key: B256) -> Self {
Self {
address,
sharded_key: ShardedKey { key: storage_key, highest_block_number: u64::MAX },
}
}
}
impl Encode for StorageShardedKey {
type Encoded = Vec<u8>;
fn encode(self) -> Self::Encoded {
let mut buf: Vec<u8> = Encode::encode(self.address).into();
buf.extend_from_slice(&Encode::encode(self.sharded_key.key));
buf.extend_from_slice(&self.sharded_key.highest_block_number.to_be_bytes());
buf
}
}
impl Decode for StorageShardedKey {
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError> {
let value = value.as_ref();
let block_num_index = value.len() - 8;
let highest_block_number = u64::from_be_bytes(
value[block_num_index..].try_into().map_err(|_| DatabaseError::Decode)?,
);
let address = Address::decode(&value[..20])?;
let storage_key = B256::decode(&value[20..52])?;
Ok(Self { address, sharded_key: ShardedKey::new(storage_key, highest_block_number) })
}
}

View File

@@ -0,0 +1,52 @@
use crate::{
table::{Compress, Decompress},
DatabaseError,
};
use reth_primitives::*;
mod sealed {
pub trait Sealed {}
}
/// Marker trait to restrict the blanket SCALE-based [`Compress`] and [`Decompress`] implementations to chosen types.
pub trait ScaleValue: sealed::Sealed {}
impl<T> Compress for T
where
T: ScaleValue + parity_scale_codec::Encode + Sync + Send + std::fmt::Debug,
{
type Compressed = Vec<u8>;
fn compress(self) -> Self::Compressed {
parity_scale_codec::Encode::encode(&self)
}
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B) {
buf.put_slice(&parity_scale_codec::Encode::encode(&self))
}
}
impl<T> Decompress for T
where
T: ScaleValue + parity_scale_codec::Decode + Sync + Send + std::fmt::Debug,
{
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<T, DatabaseError> {
parity_scale_codec::Decode::decode(&mut value.as_ref()).map_err(|_| DatabaseError::Decode)
}
}
/// Implements compression for SCALE types.
macro_rules! impl_compression_for_scale {
($($name:tt),+) => {
$(
impl ScaleValue for $name {}
impl sealed::Sealed for $name {}
)+
};
}
impl ScaleValue for Vec<u8> {}
impl sealed::Sealed for Vec<u8> {}
impl_compression_for_scale!(U256);
impl_compression_for_scale!(u8, u32, u16, u64);

View File

@@ -0,0 +1,163 @@
use crate::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
/// Trait that will transform the data to be saved in the DB in an (ideally) compressed format.
pub trait Compress: Send + Sync + Sized + Debug {
/// Compressed type.
type Compressed: bytes::BufMut
+ AsRef<[u8]>
+ AsMut<[u8]>
+ Into<Vec<u8>>
+ Default
+ Send
+ Sync
+ Debug;
/// If the type cannot be compressed, returns its inner reference as `Some(self.as_ref())`.
fn uncompressable_ref(&self) -> Option<&[u8]> {
None
}
/// Compresses data going into the database.
fn compress(self) -> Self::Compressed {
let mut buf = Self::Compressed::default();
self.compress_to_buf(&mut buf);
buf
}
/// Compresses data to a given buffer.
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(self, buf: &mut B);
}
/// Trait that will transform the data to be read from the DB.
pub trait Decompress: Send + Sync + Sized + Debug {
/// Decompresses data coming from the database.
fn decompress<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError>;
/// Decompresses owned data coming from the database.
fn decompress_owned(value: Vec<u8>) -> Result<Self, DatabaseError> {
Self::decompress(value)
}
}
/// Trait that will transform the data to be saved in the DB.
pub trait Encode: Send + Sync + Sized + Debug {
/// Encoded type.
type Encoded: AsRef<[u8]> + Into<Vec<u8>> + Send + Sync + Ord + Debug;
/// Encodes data going into the database.
fn encode(self) -> Self::Encoded;
}
/// Trait that will transform the data to be read from the DB.
pub trait Decode: Send + Sync + Sized + Debug {
/// Decodes data coming from the database.
fn decode<B: AsRef<[u8]>>(value: B) -> Result<Self, DatabaseError>;
}
/// Generic trait that enforces the database key to implement [`Encode`] and [`Decode`].
pub trait Key: Encode + Decode + Ord + Clone + Serialize + for<'a> Deserialize<'a> {}
impl<T> Key for T where T: Encode + Decode + Ord + Clone + Serialize + for<'a> Deserialize<'a> {}
/// Generic trait that enforces the database value to implement [`Compress`] and [`Decompress`].
pub trait Value: Compress + Decompress + Serialize {}
impl<T> Value for T where T: Compress + Decompress + Serialize {}
/// Generic trait that a database table should follow.
///
/// The [`Table::Key`] and [`Table::Value`] types should implement [`Encode`] and
/// [`Decode`] when appropriate. These traits define how the data is stored and read from the
/// database.
///
/// It allows for the use of codecs. See [`crate::models::ShardedKey`] for a custom
/// implementation.
pub trait Table: Send + Sync + Debug + 'static {
/// The table's name.
const NAME: &'static str;
/// Key element of `Table`.
///
/// Sorting should be taken into account when encoding this.
type Key: Key;
/// Value element of `Table`.
type Value: Value;
}
/// Tuple with `T::Key` and `T::Value`.
pub type TableRow<T> = (<T as Table>::Key, <T as Table>::Value);
/// `DupSort` allows for keys to be repeated in the database.
///
/// Upstream docs: <https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48>
pub trait DupSort: Table {
/// The table subkey. This type must implement [`Encode`] and [`Decode`].
///
/// Sorting should be taken into account when encoding this.
///
/// Upstream docs: <https://libmdbx.dqdkfa.ru/usage.html#autotoc_md48>
type SubKey: Key;
}
/// Allows duplicating tables across databases
pub trait TableImporter: DbTxMut {
/// Imports all table data from another transaction.
fn import_table<T: Table, R: DbTx>(&self, source_tx: &R) -> Result<(), DatabaseError> {
let mut destination_cursor = self.cursor_write::<T>()?;
for kv in source_tx.cursor_read::<T>()?.walk(None)? {
let (k, v) = kv?;
destination_cursor.append(k, v)?;
}
Ok(())
}
/// Imports table data from another transaction within a range.
fn import_table_with_range<T: Table, R: DbTx>(
&self,
source_tx: &R,
from: Option<<T as Table>::Key>,
to: <T as Table>::Key,
) -> Result<(), DatabaseError>
where
T::Key: Default,
{
let mut destination_cursor = self.cursor_write::<T>()?;
let mut source_cursor = source_tx.cursor_read::<T>()?;
let source_range = match from {
Some(from) => source_cursor.walk_range(from..=to),
None => source_cursor.walk_range(..=to),
};
for row in source_range? {
let (key, value) = row?;
destination_cursor.append(key, value)?;
}
Ok(())
}
/// Imports all dupsort data from another transaction.
fn import_dupsort<T: DupSort, R: DbTx>(&self, source_tx: &R) -> Result<(), DatabaseError> {
let mut destination_cursor = self.cursor_dup_write::<T>()?;
let mut cursor = source_tx.cursor_dup_read::<T>()?;
while let Some((k, _)) = cursor.next_no_dup()? {
for kv in cursor.walk_dup(Some(k), None)? {
let (k, v) = kv?;
destination_cursor.append_dup(k, v)?;
}
}
Ok(())
}
}

View File

@@ -0,0 +1,54 @@
use crate::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
table::{DupSort, Table},
DatabaseError,
};
/// Read-only transaction.
pub trait DbTx: Send + Sync {
/// Cursor type for this read-only transaction
type Cursor<T: Table>: DbCursorRO<T> + Send + Sync;
/// `DupCursor` type for this read-only transaction
type DupCursor<T: DupSort>: DbDupCursorRO<T> + DbCursorRO<T> + Send + Sync;
/// Gets a value from a table by key.
fn get<T: Table>(&self, key: T::Key) -> Result<Option<T::Value>, DatabaseError>;
/// Commits the transaction. For a read-only transaction this consumes and frees the
/// transaction, allowing memory pages to be reclaimed.
fn commit(self) -> Result<bool, DatabaseError>;
/// Aborts the transaction.
fn abort(self);
/// Creates a read-only cursor over the table.
fn cursor_read<T: Table>(&self) -> Result<Self::Cursor<T>, DatabaseError>;
/// Creates a read-only cursor over the dup-sorted table.
fn cursor_dup_read<T: DupSort>(&self) -> Result<Self::DupCursor<T>, DatabaseError>;
/// Returns number of entries in the table.
fn entries<T: Table>(&self) -> Result<usize, DatabaseError>;
/// Disables long-lived read transaction safety guarantees.
fn disable_long_read_transaction_safety(&mut self);
}
/// Read-write transaction that allows writing to the database.
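///
/// A short write-path sketch (illustrative; `db`, `MyTable`, `key`, and `value`
/// are placeholders):
///
/// ```ignore
/// let tx = db.tx_mut()?;
/// tx.put::<MyTable>(key, value)?;
/// tx.commit()?;
/// ```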
pub trait DbTxMut: Send + Sync {
/// Read-Write Cursor type
type CursorMut<T: Table>: DbCursorRW<T> + DbCursorRO<T> + Send + Sync;
/// Read-Write `DupCursor` type
type DupCursorMut<T: DupSort>: DbDupCursorRW<T>
+ DbCursorRW<T>
+ DbDupCursorRO<T>
+ DbCursorRO<T>
+ Send
+ Sync;
/// Puts a value into a table.
fn put<T: Table>(&self, key: T::Key, value: T::Value) -> Result<(), DatabaseError>;
/// Deletes a value from a table.
fn delete<T: Table>(&self, key: T::Key, value: Option<T::Value>)
-> Result<bool, DatabaseError>;
/// Clears all entries in the table.
fn clear<T: Table>(&self) -> Result<(), DatabaseError>;
/// Creates a read-write cursor over the table.
fn cursor_write<T: Table>(&self) -> Result<Self::CursorMut<T>, DatabaseError>;
/// Creates a read-write cursor over the dup-sorted table.
fn cursor_dup_write<T: DupSort>(&self) -> Result<Self::DupCursorMut<T>, DatabaseError>;
}

View File

@@ -0,0 +1,34 @@
/// Implements the `Arbitrary` trait for types backed by fixed-size byte arrays.
#[macro_export]
macro_rules! impl_fixed_arbitrary {
($(($name:ident, $size:expr)),*) => {
#[cfg(any(test, feature = "arbitrary"))]
use arbitrary::{Arbitrary, Unstructured};
$(
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> Arbitrary<'a> for $name {
fn arbitrary(u: &mut Unstructured<'a>) -> Result<Self, arbitrary::Error> {
let mut buffer = vec![0; $size];
u.fill_buffer(buffer.as_mut_slice())?;
Decode::decode(buffer).map_err(|_| arbitrary::Error::IncorrectFormat)
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl proptest::prelude::Arbitrary for $name {
type Parameters = ();
type Strategy = proptest::strategy::Map<
proptest::collection::VecStrategy<<u8 as proptest::arbitrary::Arbitrary>::Strategy>,
fn(Vec<u8>) -> Self,
>;
fn arbitrary_with(args: Self::Parameters) -> Self::Strategy {
use proptest::strategy::Strategy;
proptest::collection::vec(proptest::arbitrary::any_with::<u8>(args), $size)
.prop_map(move |vec| Decode::decode(vec).unwrap())
}
}
)+
};
}