diff --git a/Cargo.lock b/Cargo.lock index 864636b92..80c874264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2413,6 +2413,7 @@ version = "0.1.0" name = "reth-db" version = "0.1.0" dependencies = [ + "async-trait", "bytes", "criterion", "eyre", @@ -2427,6 +2428,7 @@ dependencies = [ "tempfile", "test-fuzz", "thiserror", + "tokio", ] [[package]] diff --git a/crates/db/Cargo.toml b/crates/db/Cargo.toml index c969e0901..7ecb1605a 100644 --- a/crates/db/Cargo.toml +++ b/crates/db/Cargo.toml @@ -30,8 +30,10 @@ tempfile = "3.3.0" test-fuzz = "3.0.4" criterion = "0.4.0" iai = "0.1.1" +tokio = { version = "1.21.2", features = ["full"] } reth-interfaces = { path = "../interfaces",features=["bench"] } +async-trait = "0.1.58" [features] test-utils = ["tempfile"] @@ -44,4 +46,4 @@ harness = false [[bench]] name = "encoding_iai" -harness = false \ No newline at end of file +harness = false diff --git a/crates/db/src/kv/mod.rs b/crates/db/src/kv/mod.rs index 4937d19a4..f75acea1e 100644 --- a/crates/db/src/kv/mod.rs +++ b/crates/db/src/kv/mod.rs @@ -7,7 +7,7 @@ use libmdbx::{ }; use reth_interfaces::db::{ tables::{TableType, TABLES}, - Database, Error, + Database, DatabaseGAT, Error, }; use std::{ops::Deref, path::Path}; @@ -32,15 +32,17 @@ pub struct Env { pub inner: Environment, } -impl Database for Env { - type TX<'a> = tx::Tx<'a, RO, E>; - type TXMut<'a> = tx::Tx<'a, RW, E>; +impl<'a, E: EnvironmentKind> DatabaseGAT<'a> for Env { + type TX = tx::Tx<'a, RO, E>; + type TXMut = tx::Tx<'a, RW, E>; +} - fn tx(&self) -> Result, Error> { +impl Database for Env { + fn tx(&self) -> Result<>::TX, Error> { Ok(Tx::new(self.inner.begin_ro_txn().map_err(|e| Error::Internal(e.into()))?)) } - fn tx_mut(&self) -> Result, Error> { + fn tx_mut(&self) -> Result<>::TXMut, Error> { Ok(Tx::new(self.inner.begin_rw_txn().map_err(|e| Error::Internal(e.into()))?)) } } @@ -204,3 +206,36 @@ mod tests { assert!(result == Some(value)) } } + +#[cfg(test)] +// This ensures that we can use the GATs in the downstream staged exec pipeline. +mod gat_tests { + use super::*; + use reth_interfaces::db::{mock::DatabaseMock, DBContainer}; + + #[async_trait::async_trait] + trait Stage { + async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> (); + } + + struct MyStage<'a, DB>(&'a DB); + + #[async_trait::async_trait] + impl<'c, DB: Database> Stage for MyStage<'c, DB> { + async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> () { + let _tx = db.commit().unwrap(); + () + } + } + + #[test] + #[should_panic] // no tokio runtime configured + fn can_spawn() { + let db = DatabaseMock::default(); + tokio::spawn(async move { + let mut container = DBContainer::new(&db).unwrap(); + let mut stage = MyStage(&db); + let _ = stage.run(&mut container); + }); + } +} diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index 6b0e0c506..31c9b8688 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -24,6 +24,7 @@ parity-scale-codec = { version = "3.2.1", features = ["bytes"] } [dev-dependencies] test-fuzz = "3.0.4" +tokio = { version = "1.21.2", features = ["full"] } [features] -bench = [] \ No newline at end of file +bench = [] diff --git a/crates/interfaces/src/db/container.rs b/crates/interfaces/src/db/container.rs new file mode 100644 index 000000000..3f0f57fe2 --- /dev/null +++ b/crates/interfaces/src/db/container.rs @@ -0,0 +1,101 @@ +use crate::db::{Database, DatabaseGAT, DbTx, Error}; + +/// A container for any DB transaction that will open a new inner transaction when the current +/// one is committed. +// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in +// the pipeline that just take a reference will not be able to commit their transaction and let +// the pipeline continue. Is there a better way to do this? +pub struct DBContainer<'this, DB: Database> { + /// A handle to the DB. + pub(crate) db: &'this DB, + tx: Option<>::TXMut>, +} + +impl<'this, DB> DBContainer<'this, DB> +where + DB: Database, +{ + /// Create a new container with the given database handle. + /// + /// A new inner transaction will be opened. + pub fn new(db: &'this DB) -> Result { + Ok(Self { db, tx: Some(db.tx_mut()?) }) + } + + /// Commit the current inner transaction and open a new one. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [DBContainer::close] was called without following up with a call to [DBContainer::open]. + pub fn commit(&mut self) -> Result { + let success = + self.tx.take().expect("Tried committing a non-existent transaction").commit()?; + self.tx = Some(self.db.tx_mut()?); + Ok(success) + } + + /// Get the inner transaction. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [DBContainer::close] was called without following up with a call to [DBContainer::open]. + pub fn get(&self) -> &>::TXMut { + self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction") + } + + /// Get a mutable reference to the inner transaction. + /// + /// # Panics + /// + /// Panics if an inner transaction does not exist. This should never be the case unless + /// [DBContainer::close] was called without following up with a call to [DBContainer::open]. + pub fn get_mut(&mut self) -> &mut >::TXMut { + self.tx.as_mut().expect("Tried getting a mutable reference to a non-existent transaction") + } + + /// Open a new inner transaction. + pub fn open(&mut self) -> Result<(), Error> { + self.tx = Some(self.db.tx_mut()?); + Ok(()) + } + + /// Close the current inner transaction. + pub fn close(&mut self) { + self.tx.take(); + } +} + +#[cfg(test)] +// This ensures that we can use the GATs in the downstream staged exec pipeline. +mod tests { + use super::*; + use crate::db::mock::DatabaseMock; + + #[async_trait::async_trait] + trait Stage { + async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> (); + } + + struct MyStage<'a, DB>(&'a DB); + + #[async_trait::async_trait] + impl<'a, DB: Database> Stage for MyStage<'a, DB> { + async fn run(&mut self, db: &mut DBContainer<'_, DB>) -> () { + let _tx = db.commit().unwrap(); + () + } + } + + #[test] + #[should_panic] // no tokio runtime configured + fn can_spawn() { + let db = DatabaseMock::default(); + tokio::spawn(async move { + let mut container = DBContainer::new(&db).unwrap(); + let mut stage = MyStage(&db); + let _ = stage.run(&mut container); + }); + } +} diff --git a/crates/interfaces/src/db/mock.rs b/crates/interfaces/src/db/mock.rs index 112311204..811f64192 100644 --- a/crates/interfaces/src/db/mock.rs +++ b/crates/interfaces/src/db/mock.rs @@ -2,7 +2,8 @@ use std::collections::BTreeMap; use super::{ - Database, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut, DupSort, Table, + Database, DatabaseGAT, DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW, DbTx, DbTxMut, + DupSort, Table, }; /// Mock database used for testing with inner BTreeMap structure @@ -14,19 +15,21 @@ pub struct DatabaseMock { } impl Database for DatabaseMock { - type TX<'a> = TxMock; - - type TXMut<'a> = TxMock; - - fn tx(&self) -> Result, super::Error> { + fn tx(&self) -> Result<>::TX, super::Error> { Ok(TxMock::default()) } - fn tx_mut(&self) -> Result, super::Error> { + fn tx_mut(&self) -> Result<>::TXMut, super::Error> { Ok(TxMock::default()) } } +impl<'a> DatabaseGAT<'a> for DatabaseMock { + type TX = TxMock; + + type TXMut = TxMock; +} + /// Mock read only tx #[derive(Clone, Default)] pub struct TxMock { diff --git a/crates/interfaces/src/db/mod.rs b/crates/interfaces/src/db/mod.rs index d64cdcc35..745d052d0 100644 --- a/crates/interfaces/src/db/mod.rs +++ b/crates/interfaces/src/db/mod.rs @@ -1,4 +1,5 @@ pub mod codecs; +mod container; mod error; pub mod mock; pub mod models; @@ -8,26 +9,40 @@ pub mod tables; pub use error::Error; pub use table::*; -/// Main Database trait that spawns transactions to be executed. -pub trait Database { +pub use container::DBContainer; + +// Sealed trait helper to prevent misuse of the API. +mod sealed { + pub trait Sealed: Sized {} + pub struct Bounds(T); + impl Sealed for Bounds {} +} +use sealed::{Bounds, Sealed}; + +/// Implements the GAT method from: +/// https://sabrinajewson.org/blog/the-better-alternative-to-lifetime-gats#the-better-gats. +/// +/// Sealed trait which cannot be implemented by 3rd parties, exposed only for implementers +pub trait DatabaseGAT<'a, __ImplicitBounds: Sealed = Bounds<&'a Self>>: Send + Sync { /// RO database transaction - type TX<'a>: DbTx<'a> + Send + Sync - where - Self: 'a; + type TX: DbTx<'a> + Send + Sync; /// RW database transaction - type TXMut<'a>: DbTxMut<'a> + DbTx<'a> + Send + Sync - where - Self: 'a; + type TXMut: DbTxMut<'a> + DbTx<'a> + Send + Sync; +} + +/// Main Database trait that spawns transactions to be executed. +pub trait Database: for<'a> DatabaseGAT<'a> { /// Create read only transaction. - fn tx(&self) -> Result, Error>; + fn tx(&self) -> Result<>::TX, Error>; + /// Create read write transaction only possible if database is open with write access. - fn tx_mut(&self) -> Result, Error>; + fn tx_mut(&self) -> Result<>::TXMut, Error>; /// Takes a function and passes a read-only transaction into it, making sure it's closed in the /// end of the execution. fn view(&self, f: F) -> Result where - F: Fn(&Self::TX<'_>) -> T, + F: Fn(&>::TX) -> T, { let tx = self.tx()?; @@ -41,7 +56,7 @@ pub trait Database { /// the end of the execution. fn update(&self, f: F) -> Result where - F: Fn(&Self::TXMut<'_>) -> T, + F: Fn(&>::TXMut) -> T, { let tx = self.tx_mut()?; diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 7dec12a2e..d193554b3 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -2,7 +2,7 @@ use crate::{ error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, }; -use reth_interfaces::db::{Database, DbTx}; +use reth_interfaces::db::{DBContainer, Database, DbTx}; use reth_primitives::BlockNumber; use std::fmt::{Debug, Formatter}; use tokio::sync::mpsc::Sender; @@ -218,13 +218,14 @@ impl Pipeline { }; // Unwind stages in reverse order of priority (i.e. higher priority = first) - let tx = db.tx_mut()?; + let mut db = DBContainer::new(db)?; + for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() { let stage_id = stage.id(); let span = info_span!("Unwinding", stage = %stage_id); let _enter = span.enter(); - let mut stage_progress = stage_id.get_progress(&tx)?.unwrap_or_default(); + let mut stage_progress = stage_id.get_progress(db.get())?.unwrap_or_default(); if stage_progress < to { debug!(from = %stage_progress, %to, "Unwind point too far for stage"); self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; @@ -236,11 +237,11 @@ impl Pipeline { let input = UnwindInput { stage_progress, unwind_to: to, bad_block }; self.events_sender.send(PipelineEvent::Unwinding { stage_id, input }).await?; - let output = stage.unwind(&tx, input).await; + let output = stage.unwind(&mut db, input).await; match output { Ok(unwind_output) => { stage_progress = unwind_output.stage_progress; - stage_id.save_progress(&tx, stage_progress)?; + stage_id.save_progress(db.get(), stage_progress)?; self.events_sender .send(PipelineEvent::Unwound { stage_id, result: unwind_output }) @@ -254,7 +255,7 @@ impl Pipeline { } } - tx.commit()?; + db.commit()?; Ok(()) } } @@ -288,10 +289,10 @@ impl QueuedStage { return Ok(ControlFlow::Continue) } - let mut tx = db.tx_mut()?; - loop { - let prev_progress = stage_id.get_progress(&tx)?; + let mut db = DBContainer::new(db)?; + + let prev_progress = stage_id.get_progress(db.get())?; let stage_reached_max_block = prev_progress .zip(state.max_block) @@ -312,12 +313,12 @@ impl QueuedStage { match self .stage - .execute(&tx, ExecInput { previous_stage, stage_progress: prev_progress }) + .execute(&mut db, ExecInput { previous_stage, stage_progress: prev_progress }) .await { Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { debug!(stage = %stage_id, %stage_progress, %done, "Stage made progress"); - stage_id.save_progress(&tx, stage_progress)?; + stage_id.save_progress(db.get(), stage_progress)?; state .events_sender @@ -325,9 +326,7 @@ impl QueuedStage { .await?; // TODO: Make the commit interval configurable - tx.commit()?; - // create new mut transaction. - tx = db.tx_mut()?; + db.commit()?; state.record_progress_outliers(stage_progress); state.set_reached_tip(reached_tip); @@ -755,9 +754,9 @@ mod tests { self.id } - async fn execute<'db>( + async fn execute( &mut self, - _: &DB::TXMut<'db>, + _: &mut DBContainer<'_, DB>, _input: ExecInput, ) -> Result { self.exec_outputs @@ -765,9 +764,9 @@ mod tests { .unwrap_or_else(|| panic!("Test stage {} executed too many times.", self.id)) } - async fn unwind<'db>( + async fn unwind( &mut self, - _: &DB::TXMut<'db>, + _: &mut DBContainer<'_, DB>, _input: UnwindInput, ) -> Result> { self.unwind_outputs diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index c88342a18..d962d4e55 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,6 +1,6 @@ use crate::{error::StageError, id::StageId}; use async_trait::async_trait; -use reth_interfaces::db::Database; +use reth_interfaces::db::{DBContainer, Database}; use reth_primitives::BlockNumber; /// Stage execution input, see [Stage::execute]. @@ -50,6 +50,9 @@ pub struct UnwindOutput { /// ([Stage::execute]) and a way to "roll back" ([Stage::unwind]). /// /// Stages are executed as part of a pipeline where they are executed serially. +/// +/// Stages receive a [`DBContainer`] which manages the lifecycle of a transaction, such +/// as when to commit / reopen a new one etc. #[async_trait] pub trait Stage: Send + Sync { /// Get the ID of the stage. @@ -58,16 +61,16 @@ pub trait Stage: Send + Sync { fn id(&self) -> StageId; /// Execute the stage. - async fn execute<'db>( + async fn execute( &mut self, - tx: &DB::TXMut<'db>, + db: &mut DBContainer<'_, DB>, input: ExecInput, ) -> Result; /// Unwind the stage. - async fn unwind<'db>( + async fn unwind( &mut self, - tx: &DB::TXMut<'db>, + db: &mut DBContainer<'_, DB>, input: UnwindInput, ) -> Result>; }