diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index 6ae29830b..5f50f1aa4 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -11,7 +11,7 @@ use reth_primitives::{ keccak256, Account as RethAccount, BigEndianHash, BlockLocked, SealedHeader, StorageEntry, H256, }; use reth_rlp::Decodable; -use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageDB}; +use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, Transaction}; use std::{ ffi::OsStr, path::{Path, PathBuf}, @@ -145,7 +145,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> { // Call execution stage let input = ExecInput::default(); - stage.execute(&mut StageDB::new(db.as_ref())?, input).await?; + stage.execute(&mut Transaction::new(db.as_ref())?, input).await?; // Validate post state //for post in diff --git a/crates/stages/src/db.rs b/crates/stages/src/db.rs index c65318e64..c4ba2e474 100644 --- a/crates/stages/src/db.rs +++ b/crates/stages/src/db.rs @@ -21,19 +21,22 @@ use crate::{DatabaseIntegrityError, StageError}; // NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in // the pipeline that just take a reference will not be able to commit their transaction and let // the pipeline continue. Is there a better way to do this? -pub struct StageDB<'this, DB: Database> { +// +// TODO: Re-evaluate if this is actually needed, this was introduced as a way to manage the +// lifetime of the `TXMut` and having a nice API for re-opening a new transaction after `commit` +pub struct Transaction<'this, DB: Database> { /// A handle to the DB. pub(crate) db: &'this DB, tx: Option<>::TXMut>, } -impl<'a, DB: Database> Debug for StageDB<'a, DB> { +impl<'a, DB: Database> Debug for Transaction<'a, DB> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("StageDB").finish() + f.debug_struct("Transaction").finish() } } -impl<'a, DB: Database> Deref for StageDB<'a, DB> { +impl<'a, DB: Database> Deref for Transaction<'a, DB> { type Target = >::TXMut; /// Dereference as the inner transaction. @@ -41,25 +44,25 @@ impl<'a, DB: Database> Deref for StageDB<'a, DB> { /// # Panics /// /// Panics if an inner transaction does not exist. This should never be the case unless - /// [StageDB::close] was called without following up with a call to [StageDB::open]. + /// [Transaction::close] was called without following up with a call to [Transaction::open]. fn deref(&self) -> &Self::Target { self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction") } } -impl<'a, DB: Database> DerefMut for StageDB<'a, DB> { +impl<'a, DB: Database> DerefMut for Transaction<'a, DB> { /// Dereference as a mutable reference to the inner transaction. /// /// # Panics /// /// Panics if an inner transaction does not exist. This should never be the case unless - /// [StageDB::close] was called without following up with a call to [StageDB::open]. + /// [Transaction::close] was called without following up with a call to [Transaction::open]. fn deref_mut(&mut self) -> &mut Self::Target { self.tx.as_mut().expect("Tried getting a mutable reference to a non-existent transaction") } } -impl<'this, DB> StageDB<'this, DB> +impl<'this, DB> Transaction<'this, DB> where DB: Database, { @@ -80,7 +83,7 @@ where /// # Panics /// /// Panics if an inner transaction does not exist. This should never be the case unless - /// [StageDB::close] was called without following up with a call to [StageDB::open]. + /// [Transaction::close] was called without following up with a call to [Transaction::open]. pub fn commit(&mut self) -> Result { let success = self.tx.take().expect("Tried committing a non-existent transaction").commit()?; diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 549bba08b..a4353e5ff 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -27,7 +27,7 @@ mod test_utils; /// Implementations of stages. pub mod stages; -pub use db::StageDB; +pub use db::Transaction; pub use error::*; pub use id::*; pub use pipeline::*; diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 0721cd7e5..fe520d2c1 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -1,5 +1,5 @@ use crate::{ - db::StageDB, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, + db::Transaction, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, }; use reth_db::{database::Database, transaction::DbTx}; @@ -224,7 +224,7 @@ impl Pipeline { }; // Unwind stages in reverse order of priority (i.e. higher priority = first) - let mut db = StageDB::new(db)?; + let mut db = Transaction::new(db)?; for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() { let stage_id = stage.id(); @@ -296,7 +296,7 @@ impl QueuedStage { } loop { - let mut db = StageDB::new(db)?; + let mut db = Transaction::new(db)?; let prev_progress = stage_id.get_progress(db.deref())?; @@ -805,7 +805,7 @@ mod tests { async fn execute( &mut self, - _: &mut StageDB<'_, DB>, + _: &mut Transaction<'_, DB>, _input: ExecInput, ) -> Result { self.exec_outputs @@ -815,7 +815,7 @@ mod tests { async fn unwind( &mut self, - _: &mut StageDB<'_, DB>, + _: &mut Transaction<'_, DB>, _input: UnwindInput, ) -> Result> { self.unwind_outputs diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index ef0ae4569..473fdccbd 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,4 +1,4 @@ -use crate::{db::StageDB, error::StageError, id::StageId}; +use crate::{db::Transaction, error::StageError, id::StageId}; use async_trait::async_trait; use reth_db::database::Database; use reth_primitives::BlockNumber; @@ -58,7 +58,7 @@ pub struct UnwindOutput { /// /// Stages are executed as part of a pipeline where they are executed serially. /// -/// Stages receive [`StageDB`] which manages the lifecycle of a transaction, +/// Stages receive [`Transaction`] which manages the lifecycle of a transaction, /// such as when to commit / reopen a new one etc. // ANCHOR: trait-Stage #[async_trait] @@ -71,14 +71,14 @@ pub trait Stage: Send + Sync { /// Execute the stage. async fn execute( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result; /// Unwind the stage. async fn unwind( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result>; } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index bdf642768..ef8c5e35d 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -1,5 +1,5 @@ use crate::{ - db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, + db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use futures_util::StreamExt; @@ -75,7 +75,7 @@ impl Stage for BodyStage, + db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let previous_stage_progress = input.previous_stage_progress(); @@ -165,7 +165,7 @@ impl Stage for BodyStage, + db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { let mut tx_count_cursor = db.cursor_mut::()?; @@ -495,7 +495,7 @@ mod tests { use crate::{ stages::bodies::BodyStage, test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, UnwindStageTestRunner, }, ExecInput, ExecOutput, UnwindInput, @@ -540,7 +540,7 @@ mod tests { pub(crate) struct BodyTestRunner { pub(crate) consensus: Arc, responses: HashMap>, - db: TestStageDB, + db: TestTransaction, batch_size: u64, } @@ -549,7 +549,7 @@ mod tests { Self { consensus: Arc::new(TestConsensus::default()), responses: HashMap::default(), - db: TestStageDB::default(), + db: TestTransaction::default(), batch_size: 1000, } } @@ -571,7 +571,7 @@ mod tests { impl StageTestRunner for BodyTestRunner { type S = BodyStage; - fn db(&self) -> &TestStageDB { + fn db(&self) -> &TestTransaction { &self.db } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 7252aeef4..d39405bee 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,5 +1,5 @@ use crate::{ - db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, + db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use reth_db::{ @@ -80,7 +80,7 @@ impl Stage for ExecutionStage { /// Execute the stage async fn execute( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let db_tx = db.deref_mut(); @@ -322,7 +322,7 @@ impl Stage for ExecutionStage { /// Unwind the stage. async fn unwind( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { let unwind_from = input.stage_progress; @@ -449,7 +449,7 @@ mod tests { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework let state_db = create_test_db::(EnvKind::RW); - let mut db = StageDB::new(state_db.as_ref()).unwrap(); + let mut db = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { previous_stage: None, /// The progress of this stage the last time it was executed. @@ -532,7 +532,7 @@ mod tests { // is merged as it has similar framework let state_db = create_test_db::(EnvKind::RW); - let mut db = StageDB::new(state_db.as_ref()).unwrap(); + let mut db = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { previous_stage: None, /// The progress of this stage the last time it was executed. diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 77e6b889e..b80ed32cf 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -1,5 +1,5 @@ use crate::{ - db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, + db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use futures_util::StreamExt; @@ -68,7 +68,7 @@ impl, + db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let stage_progress = input.stage_progress.unwrap_or_default(); @@ -136,7 +136,7 @@ impl, + db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // TODO: handle bad block @@ -155,7 +155,7 @@ impl { async fn update_head( &self, - db: &StageDB<'_, DB>, + db: &Transaction<'_, DB>, height: BlockNumber, ) -> Result<(), StageError> { let block_key = db.get_block_numhash(height)?; @@ -195,7 +195,7 @@ impl /// Write downloaded headers to the database async fn write_headers( &self, - db: &StageDB<'_, DB>, + db: &Transaction<'_, DB>, headers: Vec, ) -> Result, StageError> { let mut cursor_header = db.cursor_mut::()?; @@ -226,7 +226,7 @@ impl /// Iterate over inserted headers and write td entries fn write_td( &self, - db: &StageDB<'_, DB>, + db: &Transaction<'_, DB>, head: &SealedHeader, ) -> Result<(), StageError> { // Acquire cursor over total difficulty table @@ -357,7 +357,7 @@ mod tests { use crate::{ stages::headers::HeaderStage, test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, UnwindStageTestRunner, }, ExecInput, ExecOutput, UnwindInput, @@ -379,7 +379,7 @@ mod tests { pub(crate) client: Arc, downloader: Arc, network_handle: TestStatusUpdater, - db: TestStageDB, + db: TestTransaction, } impl Default for HeadersTestRunner { @@ -391,7 +391,7 @@ mod tests { consensus: consensus.clone(), downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)), network_handle: TestStatusUpdater::default(), - db: TestStageDB::default(), + db: TestTransaction::default(), } } } @@ -399,7 +399,7 @@ mod tests { impl StageTestRunner for HeadersTestRunner { type S = HeaderStage, TestConsensus, TestHeadersClient, TestStatusUpdater>; - fn db(&self) -> &TestStageDB { + fn db(&self) -> &TestTransaction { &self.db } @@ -514,7 +514,7 @@ mod tests { consensus, downloader, network_handle: TestStatusUpdater::default(), - db: TestStageDB::default(), + db: TestTransaction::default(), } } } diff --git a/crates/stages/src/stages/senders.rs b/crates/stages/src/stages/senders.rs index 26b90f6a5..ead26ae72 100644 --- a/crates/stages/src/stages/senders.rs +++ b/crates/stages/src/stages/senders.rs @@ -1,5 +1,5 @@ use crate::{ - db::StageDB, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, + db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; use itertools::Itertools; use rayon::prelude::*; @@ -55,7 +55,7 @@ impl Stage for SendersStage { /// the [`TxSenders`][reth_interfaces::db::tables::TxSenders] table. async fn execute( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { let stage_progress = input.stage_progress.unwrap_or_default(); @@ -113,7 +113,7 @@ impl Stage for SendersStage { /// Unwind the stage. async fn unwind( &mut self, - db: &mut StageDB<'_, DB>, + db: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result> { // Lookup latest tx id that we should unwind to @@ -131,8 +131,8 @@ mod tests { use super::*; use crate::test_utils::{ - stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, - UnwindStageTestRunner, PREV_STAGE_ID, + stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, + TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, }; stage_test_suite!(SendersTestRunner); @@ -176,13 +176,13 @@ mod tests { } struct SendersTestRunner { - db: TestStageDB, + db: TestTransaction, threshold: u64, } impl Default for SendersTestRunner { fn default() -> Self { - Self { threshold: 1000, db: TestStageDB::default() } + Self { threshold: 1000, db: TestTransaction::default() } } } @@ -195,7 +195,7 @@ mod tests { impl StageTestRunner for SendersTestRunner { type S = SendersStage; - fn db(&self) -> &TestStageDB { + fn db(&self) -> &TestTransaction { &self.db } diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index 3306eca80..4b82df6cd 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -8,7 +8,7 @@ pub(crate) use macros::*; pub(crate) use runner::{ ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, }; -pub(crate) use test_db::TestStageDB; +pub(crate) use test_db::TestTransaction; /// The previous test stage id mock used for testing pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage"); diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index 01723b6d4..f5a75b8ef 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -2,8 +2,8 @@ use reth_db::mdbx::{Env, WriteMap}; use std::borrow::Borrow; use tokio::sync::oneshot; -use super::TestStageDB; -use crate::{db::StageDB, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; +use super::TestTransaction; +use crate::{db::Transaction, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; #[derive(thiserror::Error, Debug)] pub(crate) enum TestRunnerError { @@ -19,7 +19,7 @@ pub(crate) trait StageTestRunner { type S: Stage> + 'static; /// Return a reference to the database. - fn db(&self) -> &TestStageDB; + fn db(&self) -> &TestTransaction; /// Return an instance of a Stage. fn stage(&self) -> Self::S; @@ -44,7 +44,7 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner { let (tx, rx) = oneshot::channel(); let (db, mut stage) = (self.db().inner_raw(), self.stage()); tokio::spawn(async move { - let mut db = StageDB::new(db.borrow()).expect("failed to create db container"); + let mut db = Transaction::new(db.borrow()).expect("failed to create db container"); let result = stage.execute(&mut db, input).await; db.commit().expect("failed to commit"); tx.send(result).expect("failed to send message") @@ -71,7 +71,7 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner { let (tx, rx) = oneshot::channel(); let (db, mut stage) = (self.db().inner_raw(), self.stage()); tokio::spawn(async move { - let mut db = StageDB::new(db.borrow()).expect("failed to create db container"); + let mut db = Transaction::new(db.borrow()).expect("failed to create db container"); let result = stage.unwind(&mut db, input).await; db.commit().expect("failed to commit"); tx.send(result).expect("failed to send result"); diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 7b1aae0a5..05e29431a 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -10,7 +10,7 @@ use reth_db::{ use reth_primitives::{BlockNumber, SealedHeader, U256}; use std::{borrow::Borrow, sync::Arc}; -use crate::db::StageDB; +use crate::db::Transaction; /// The [StageTestDB] is used as an internal /// database for testing stage implementation. @@ -19,21 +19,21 @@ use crate::db::StageDB; /// let db = StageTestDB::default(); /// stage.execute(&mut db.container(), input); /// ``` -pub(crate) struct TestStageDB { +pub(crate) struct TestTransaction { db: Arc>, } -impl Default for TestStageDB { +impl Default for TestTransaction { /// Create a new instance of [StageTestDB] fn default() -> Self { Self { db: create_test_db::(EnvKind::RW) } } } -impl TestStageDB { - /// Return a database wrapped in [StageDB]. - pub(crate) fn inner(&self) -> StageDB<'_, Env> { - StageDB::new(self.db.borrow()).expect("failed to create db container") +impl TestTransaction { + /// Return a database wrapped in [Transaction]. + pub(crate) fn inner(&self) -> Transaction<'_, Env> { + Transaction::new(self.db.borrow()).expect("failed to create db container") } /// Get a pointer to an internal database. diff --git a/docs/design/database.md b/docs/design/database.md index 0735330e4..5a5b092cc 100644 --- a/docs/design/database.md +++ b/docs/design/database.md @@ -3,7 +3,7 @@ ## Abstractions * We created a [Database trait abstraction](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/interfaces/src/db/mod.rs) using Rust Stable GATs which frees us from being bound to a single database implementation. We currently use MDBX, but are exploring [redb](https://github.com/cberner/redb) as an alternative. -* We then iterated on [`StageDB`](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/stages/src/db.rs#L14-L19) as a non-leaky abstraction with helpers for strictly-typed and unit-tested higher-level database abstractions. +* We then iterated on [`Transaction`](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/stages/src/db.rs#L14-L19) as a non-leaky abstraction with helpers for strictly-typed and unit-tested higher-level database abstractions. ## Codecs