headers(part1) feat(interfaces): introduce implicit trait bounds (#117)

* feat(ifaces): modify DB GAT to have implicit bounds

https://sabrinajewson.org/blog/the-better-alternative-to-lifetime-gats\#the-better-gats

* test(ifaces): impl new trait for mock

* feat(ifaces): impl db container

we use that downstream to avoid consuming the db transactions
when committing a stage

* fix(test): explicitly define lifetime

* test: ensure can spawn

* test: pls review

* chore: lints/warnings

* feat(db): impl the new gats

* test(db): try to make real db work with gats like in stages

* test(db): make Stage trait take DBContainer

otherwise we cannot call db.commit()
gst

* feat(stages): impl new traits

* chore(db): cleanup tests

* chore: remove unused imports
This commit is contained in:
Georgios Konstantopoulos
2022-10-23 23:10:59 -07:00
committed by GitHub
parent a123a2b6fd
commit a4e505132c
9 changed files with 211 additions and 50 deletions

2
Cargo.lock generated
View File

@ -2413,6 +2413,7 @@ version = "0.1.0"
name = "reth-db"
version = "0.1.0"
dependencies = [
"async-trait",
"bytes",
"criterion",
"eyre",
@ -2427,6 +2428,7 @@ dependencies = [
"tempfile",
"test-fuzz",
"thiserror",
"tokio",
]
[[package]]

View File

@ -30,8 +30,10 @@ tempfile = "3.3.0"
test-fuzz = "3.0.4"
criterion = "0.4.0"
iai = "0.1.1"
tokio = { version = "1.21.2", features = ["full"] }
reth-interfaces = { path = "../interfaces",features=["bench"] }
async-trait = "0.1.58"
[features]
test-utils = ["tempfile"]
@ -44,4 +46,4 @@ harness = false
[[bench]]
name = "encoding_iai"
harness = false
harness = false

View File

@ -7,7 +7,7 @@ use libmdbx::{
};
use reth_interfaces::db::{
tables::{TableType, TABLES},
Database, Error,
Database, DatabaseGAT, Error,
};
use std::{ops::Deref, path::Path};
@ -32,15 +32,17 @@ pub struct Env<E: EnvironmentKind> {
pub inner: Environment<E>,
}
impl<E: EnvironmentKind> Database for Env<E> {
type TX<'a> = tx::Tx<'a, RO, E>;
type TXMut<'a> = tx::Tx<'a, RW, E>;
impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env<E> {
type TX = tx::Tx<'a, RO, E>;
type TXMut = tx::Tx<'a, RW, E>;
}
fn tx(&self) -> Result<Self::TX<'_>, Error> {
impl<E: EnvironmentKind> Database for Env<E> {
fn tx(&self) -> Result<<Self as DatabaseGAT<'_>>::TX, Error> {
Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::Internal(e.into()))?))
}
fn tx_mut(&self) -> Result<Self::TXMut<'_>, Error> {
fn tx_mut(&self) -> Result<<Self as DatabaseGAT<'_>>::TXMut, Error> {
Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::Internal(e.into()))?))
}
}
@ -204,3 +206,36 @@ mod tests {
assert!(result == Some(value))
}
}
#[cfg(test)]
// This ensures that we can use the GATs in the downstream staged exec pipeline.
mod gat_tests {
use super::*;
use reth_interfaces::db::{mock::DatabaseMock, DBContainer};
#[async_trait::async_trait]
trait Stage<DB: Database> {
async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> ();
}
struct MyStage<'a, DB>(&'a DB);
#[async_trait::async_trait]
impl<'c, DB: Database> Stage<DB> for MyStage<'c, DB> {
async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> () {
let _tx = db.commit().unwrap();
()
}
}
#[test]
#[should_panic] // no tokio runtime configured
fn can_spawn() {
let db = DatabaseMock::default();
tokio::spawn(async move {
let mut container = DBContainer::new(&db).unwrap();
let mut stage = MyStage(&db);
let _ = stage.run(&mut container);
});
}
}

View File

@ -24,6 +24,7 @@ parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
[dev-dependencies]
test-fuzz = "3.0.4"
tokio = { version = "1.21.2", features = ["full"] }
[features]
bench = []
bench = []

View File

@ -0,0 +1,101 @@
use crate::db::{Database, DatabaseGAT, DbTx, Error};
/// A container for any DB transaction that will open a new inner transaction when the current
/// one is committed.
// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in
// the pipeline that just take a reference will not be able to commit their transaction and let
// the pipeline continue. Is there a better way to do this?
pub struct DBContainer<'this, DB: Database> {
/// A handle to the DB.
pub(crate) db: &'this DB,
tx: Option<<DB as DatabaseGAT<'this>>::TXMut>,
}
impl<'this, DB> DBContainer<'this, DB>
where
DB: Database,
{
/// Create a new container with the given database handle.
///
/// A new inner transaction will be opened.
pub fn new(db: &'this DB) -> Result<Self, Error> {
Ok(Self { db, tx: Some(db.tx_mut()?) })
}
/// Commit the current inner transaction and open a new one.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [DBContainer::close] was called without following up with a call to [DBContainer::open].
pub fn commit(&mut self) -> Result<bool, Error> {
let success =
self.tx.take().expect("Tried committing a non-existent transaction").commit()?;
self.tx = Some(self.db.tx_mut()?);
Ok(success)
}
/// Get the inner transaction.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [DBContainer::close] was called without following up with a call to [DBContainer::open].
pub fn get(&self) -> &<DB as DatabaseGAT<'this>>::TXMut {
self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
}
/// Get a mutable reference to the inner transaction.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [DBContainer::close] was called without following up with a call to [DBContainer::open].
pub fn get_mut(&mut self) -> &mut <DB as DatabaseGAT<'this>>::TXMut {
self.tx.as_mut().expect("Tried getting a mutable reference to a non-existent transaction")
}
/// Open a new inner transaction.
pub fn open(&mut self) -> Result<(), Error> {
self.tx = Some(self.db.tx_mut()?);
Ok(())
}
/// Close the current inner transaction.
pub fn close(&mut self) {
self.tx.take();
}
}
#[cfg(test)]
// This ensures that we can use the GATs in the downstream staged exec pipeline.
mod tests {
use super::*;
use crate::db::mock::DatabaseMock;
#[async_trait::async_trait]
trait Stage<DB: Database> {
async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> ();
}
struct MyStage<'a, DB>(&'a DB);
#[async_trait::async_trait]
impl<'a, DB: Database> Stage<DB> for MyStage<'a, DB> {
async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> () {
let _tx = db.commit().unwrap();
()
}
}
#[test]
#[should_panic] // no tokio runtime configured
fn can_spawn() {
let db = DatabaseMock::default();
tokio::spawn(async move {
let mut container = DBContainer::new(&db).unwrap();
let mut stage = MyStage(&db);
let _ = stage.run(&mut container);
});
}
}

View File

@ -2,7 +2,8 @@
use std::collections::BTreeMap;
use super::{
Database, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut, DupSort, Table,
Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut,
DupSort, Table,
};
/// Mock database used for testing with inner BTreeMap structure
@ -14,19 +15,21 @@ pub struct DatabaseMock {
}
impl Database for DatabaseMock {
type TX<'a> = TxMock;
type TXMut<'a> = TxMock;
fn tx(&self) -> Result<Self::TX<'_>, super::Error> {
fn tx(&self) -> Result<<Self as DatabaseGAT<'_>>::TX, super::Error> {
Ok(TxMock::default())
}
fn tx_mut(&self) -> Result<Self::TXMut<'_>, super::Error> {
fn tx_mut(&self) -> Result<<Self as DatabaseGAT<'_>>::TXMut, super::Error> {
Ok(TxMock::default())
}
}
impl<'a> DatabaseGAT<'a> for DatabaseMock {
type TX = TxMock;
type TXMut = TxMock;
}
/// Mock read only tx
#[derive(Clone, Default)]
pub struct TxMock {

View File

@ -1,4 +1,5 @@
pub mod codecs;
mod container;
mod error;
pub mod mock;
pub mod models;
@ -8,26 +9,40 @@ pub mod tables;
pub use error::Error;
pub use table::*;
/// Main Database trait that spawns transactions to be executed.
pub trait Database {
pub use container::DBContainer;
// Sealed trait helper to prevent misuse of the API.
mod sealed {
pub trait Sealed: Sized {}
pub struct Bounds<T>(T);
impl<T> Sealed for Bounds<T> {}
}
use sealed::{Bounds, Sealed};
/// Implements the GAT method from:
/// https://sabrinajewson.org/blog/the-better-alternative-to-lifetime-gats#the-better-gats.
///
/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers
pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync {
/// RO database transaction
type TX<'a>: DbTx<'a> + Send + Sync
where
Self: 'a;
type TX: DbTx<'a> + Send + Sync;
/// RW database transaction
type TXMut<'a>: DbTxMut<'a> + DbTx<'a> + Send + Sync
where
Self: 'a;
type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync;
}
/// Main Database trait that spawns transactions to be executed.
pub trait Database: for<'a> DatabaseGAT<'a> {
/// Create read only transaction.
fn tx(&self) -> Result<Self::TX<'_>, Error>;
fn tx(&self) -> Result<<Self as DatabaseGAT<'_>>::TX, Error>;
/// Create read write transaction only possible if database is open with write access.
fn tx_mut(&self) -> Result<Self::TXMut<'_>, Error>;
fn tx_mut(&self) -> Result<<Self as DatabaseGAT<'_>>::TXMut, Error>;
/// Takes a function and passes a read-only transaction into it, making sure it's closed in the
/// end of the execution.
fn view<T, F>(&self, f: F) -> Result<T, Error>
where
F: Fn(&Self::TX<'_>) -> T,
F: Fn(&<Self as DatabaseGAT<'_>>::TX) -> T,
{
let tx = self.tx()?;
@ -41,7 +56,7 @@ pub trait Database {
/// the end of the execution.
fn update<T, F>(&self, f: F) -> Result<T, Error>
where
F: Fn(&Self::TXMut<'_>) -> T,
F: Fn(&<Self as DatabaseGAT<'_>>::TXMut) -> T,
{
let tx = self.tx_mut()?;

View File

@ -2,7 +2,7 @@ use crate::{
error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput,
};
use reth_interfaces::db::{Database, DbTx};
use reth_interfaces::db::{DBContainer, Database, DbTx};
use reth_primitives::BlockNumber;
use std::fmt::{Debug, Formatter};
use tokio::sync::mpsc::Sender;
@ -218,13 +218,14 @@ impl<DB: Database> Pipeline<DB> {
};
// Unwind stages in reverse order of priority (i.e. higher priority = first)
let tx = db.tx_mut()?;
let mut db = DBContainer::new(db)?;
for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() {
let stage_id = stage.id();
let span = info_span!("Unwinding", stage = %stage_id);
let _enter = span.enter();
let mut stage_progress = stage_id.get_progress(&tx)?.unwrap_or_default();
let mut stage_progress = stage_id.get_progress(db.get())?.unwrap_or_default();
if stage_progress < to {
debug!(from = %stage_progress, %to, "Unwind point too far for stage");
self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
@ -236,11 +237,11 @@ impl<DB: Database> Pipeline<DB> {
let input = UnwindInput { stage_progress, unwind_to: to, bad_block };
self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?;
let output = stage.unwind(&tx, input).await;
let output = stage.unwind(&mut db, input).await;
match output {
Ok(unwind_output) => {
stage_progress = unwind_output.stage_progress;
stage_id.save_progress(&tx, stage_progress)?;
stage_id.save_progress(db.get(), stage_progress)?;
self.events_sender
.send(PipelineEvent::Unwound { stage_id, result: unwind_output })
@ -254,7 +255,7 @@ impl<DB: Database> Pipeline<DB> {
}
}
tx.commit()?;
db.commit()?;
Ok(())
}
}
@ -288,10 +289,10 @@ impl<DB: Database> QueuedStage<DB> {
return Ok(ControlFlow::Continue)
}
let mut tx = db.tx_mut()?;
loop {
let prev_progress = stage_id.get_progress(&tx)?;
let mut db = DBContainer::new(db)?;
let prev_progress = stage_id.get_progress(db.get())?;
let stage_reached_max_block = prev_progress
.zip(state.max_block)
@ -312,12 +313,12 @@ impl<DB: Database> QueuedStage<DB> {
match self
.stage
.execute(&tx, ExecInput { previous_stage, stage_progress: prev_progress })
.execute(&mut db, ExecInput { previous_stage, stage_progress: prev_progress })
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress");
stage_id.save_progress(&tx, stage_progress)?;
stage_id.save_progress(db.get(), stage_progress)?;
state
.events_sender
@ -325,9 +326,7 @@ impl<DB: Database> QueuedStage<DB> {
.await?;
// TODO: Make the commit interval configurable
tx.commit()?;
// create new mut transaction.
tx = db.tx_mut()?;
db.commit()?;
state.record_progress_outliers(stage_progress);
state.set_reached_tip(reached_tip);
@ -755,9 +754,9 @@ mod tests {
self.id
}
async fn execute<'db>(
async fn execute(
&mut self,
_: &DB::TXMut<'db>,
_: &mut DBContainer<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
@ -765,9 +764,9 @@ mod tests {
.unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id))
}
async fn unwind<'db>(
async fn unwind(
&mut self,
_: &DB::TXMut<'db>,
_: &mut DBContainer<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
self.unwind_outputs

View File

@ -1,6 +1,6 @@
use crate::{error::StageError, id::StageId};
use async_trait::async_trait;
use reth_interfaces::db::Database;
use reth_interfaces::db::{DBContainer, Database};
use reth_primitives::BlockNumber;
/// Stage execution input, see [Stage::execute].
@ -50,6 +50,9 @@ pub struct UnwindOutput {
/// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]).
///
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive a [`DBContainer`] which manages the lifecycle of a transaction, such
/// as when to commit / reopen a new one etc.
#[async_trait]
pub trait Stage<DB: Database>: Send + Sync {
/// Get the ID of the stage.
@ -58,16 +61,16 @@ pub trait Stage<DB: Database>: Send + Sync {
fn id(&self) -> StageId;
/// Execute the stage.
async fn execute<'db>(
async fn execute(
&mut self,
tx: &DB::TXMut<'db>,
db: &mut DBContainer<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind<'db>(
async fn unwind(
&mut self,
tx: &DB::TXMut<'db>,
db: &mut DBContainer<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}