chore: rename StageDB to Transaction (#470)

* chore: rename StageDB to Transaction

* chore: cargo fmt

* chore: add note about potentially removing this in the future
This commit is contained in:
Georgios Konstantopoulos
2022-12-15 18:14:14 +02:00
committed by GitHub
parent c10bdb5830
commit 9cd5824f35
13 changed files with 69 additions and 66 deletions

View File

@ -11,7 +11,7 @@ use reth_primitives::{
keccak256, Account as RethAccount, BigEndianHash, BlockLocked, SealedHeader, StorageEntry, H256,
};
use reth_rlp::Decodable;
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, StageDB};
use reth_stages::{stages::execution::ExecutionStage, ExecInput, Stage, Transaction};
use std::{
ffi::OsStr,
path::{Path, PathBuf},
@ -145,7 +145,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result<()> {
// Call execution stage
let input = ExecInput::default();
stage.execute(&mut StageDB::new(db.as_ref())?, input).await?;
stage.execute(&mut Transaction::new(db.as_ref())?, input).await?;
// Validate post state
//for post in

View File

@ -21,19 +21,22 @@ use crate::{DatabaseIntegrityError, StageError};
// NOTE: This container is needed since `Transaction::commit` takes `mut self`, so methods in
// the pipeline that just take a reference will not be able to commit their transaction and let
// the pipeline continue. Is there a better way to do this?
pub struct StageDB<'this, DB: Database> {
//
// TODO: Re-evaluate if this is actually needed, this was introduced as a way to manage the
// lifetime of the `TXMut` and having a nice API for re-opening a new transaction after `commit`
pub struct Transaction<'this, DB: Database> {
/// A handle to the DB.
pub(crate) db: &'this DB,
tx: Option<<DB as DatabaseGAT<'this>>::TXMut>,
}
impl<'a, DB: Database> Debug for StageDB<'a, DB> {
impl<'a, DB: Database> Debug for Transaction<'a, DB> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StageDB").finish()
f.debug_struct("Transaction").finish()
}
}
impl<'a, DB: Database> Deref for StageDB<'a, DB> {
impl<'a, DB: Database> Deref for Transaction<'a, DB> {
type Target = <DB as DatabaseGAT<'a>>::TXMut;
/// Dereference as the inner transaction.
@ -41,25 +44,25 @@ impl<'a, DB: Database> Deref for StageDB<'a, DB> {
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [StageDB::close] was called without following up with a call to [StageDB::open].
/// [Transaction::close] was called without following up with a call to [Transaction::open].
fn deref(&self) -> &Self::Target {
self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
}
}
impl<'a, DB: Database> DerefMut for StageDB<'a, DB> {
impl<'a, DB: Database> DerefMut for Transaction<'a, DB> {
/// Dereference as a mutable reference to the inner transaction.
///
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [StageDB::close] was called without following up with a call to [StageDB::open].
/// [Transaction::close] was called without following up with a call to [Transaction::open].
fn deref_mut(&mut self) -> &mut Self::Target {
self.tx.as_mut().expect("Tried getting a mutable reference to a non-existent transaction")
}
}
impl<'this, DB> StageDB<'this, DB>
impl<'this, DB> Transaction<'this, DB>
where
DB: Database,
{
@ -80,7 +83,7 @@ where
/// # Panics
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [StageDB::close] was called without following up with a call to [StageDB::open].
/// [Transaction::close] was called without following up with a call to [Transaction::open].
pub fn commit(&mut self) -> Result<bool, Error> {
let success =
self.tx.take().expect("Tried committing a non-existent transaction").commit()?;

View File

@ -27,7 +27,7 @@ mod test_utils;
/// Implementations of stages.
pub mod stages;
pub use db::StageDB;
pub use db::Transaction;
pub use error::*;
pub use id::*;
pub use pipeline::*;

View File

@ -1,5 +1,5 @@
use crate::{
db::StageDB, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError,
db::Transaction, error::*, util::opt::MaybeSender, ExecInput, ExecOutput, Stage, StageError,
StageId, UnwindInput,
};
use reth_db::{database::Database, transaction::DbTx};
@ -224,7 +224,7 @@ impl<DB: Database> Pipeline<DB> {
};
// Unwind stages in reverse order of priority (i.e. higher priority = first)
let mut db = StageDB::new(db)?;
let mut db = Transaction::new(db)?;
for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() {
let stage_id = stage.id();
@ -296,7 +296,7 @@ impl<DB: Database> QueuedStage<DB> {
}
loop {
let mut db = StageDB::new(db)?;
let mut db = Transaction::new(db)?;
let prev_progress = stage_id.get_progress(db.deref())?;
@ -805,7 +805,7 @@ mod tests {
async fn execute(
&mut self,
_: &mut StageDB<'_, DB>,
_: &mut Transaction<'_, DB>,
_input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.exec_outputs
@ -815,7 +815,7 @@ mod tests {
async fn unwind(
&mut self,
_: &mut StageDB<'_, DB>,
_: &mut Transaction<'_, DB>,
_input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
self.unwind_outputs

View File

@ -1,4 +1,4 @@
use crate::{db::StageDB, error::StageError, id::StageId};
use crate::{db::Transaction, error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
@ -58,7 +58,7 @@ pub struct UnwindOutput {
///
/// Stages are executed as part of a pipeline where they are executed serially.
///
/// Stages receive [`StageDB`] which manages the lifecycle of a transaction,
/// Stages receive [`Transaction`] which manages the lifecycle of a transaction,
/// such as when to commit / reopen a new one etc.
// ANCHOR: trait-Stage
#[async_trait]
@ -71,14 +71,14 @@ pub trait Stage<DB: Database>: Send + Sync {
/// Execute the stage.
async fn execute(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError>;
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>;
}

View File

@ -1,5 +1,5 @@
use crate::{
db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
@ -75,7 +75,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
/// header, limited by the stage's batch size.
async fn execute(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let previous_stage_progress = input.previous_stage_progress();
@ -165,7 +165,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let mut tx_count_cursor = db.cursor_mut::<tables::CumulativeTxCount>()?;
@ -495,7 +495,7 @@ mod tests {
use crate::{
stages::bodies::BodyStage,
test_utils::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
},
ExecInput, ExecOutput, UnwindInput,
@ -540,7 +540,7 @@ mod tests {
pub(crate) struct BodyTestRunner {
pub(crate) consensus: Arc<TestConsensus>,
responses: HashMap<H256, DownloadResult<BlockBody>>,
db: TestStageDB,
db: TestTransaction,
batch_size: u64,
}
@ -549,7 +549,7 @@ mod tests {
Self {
consensus: Arc::new(TestConsensus::default()),
responses: HashMap::default(),
db: TestStageDB::default(),
db: TestTransaction::default(),
batch_size: 1000,
}
}
@ -571,7 +571,7 @@ mod tests {
impl StageTestRunner for BodyTestRunner {
type S = BodyStage<TestBodyDownloader, TestConsensus>;
fn db(&self) -> &TestStageDB {
fn db(&self) -> &TestTransaction {
&self.db
}

View File

@ -1,5 +1,5 @@
use crate::{
db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use reth_db::{
@ -80,7 +80,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
/// Execute the stage
async fn execute(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let db_tx = db.deref_mut();
@ -322,7 +322,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
let unwind_from = input.stage_progress;
@ -449,7 +449,7 @@ mod tests {
// TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332
// is merged as it has similar framework
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut db = StageDB::new(state_db.as_ref()).unwrap();
let mut db = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: None,
/// The progress of this stage the last time it was executed.
@ -532,7 +532,7 @@ mod tests {
// is merged as it has similar framework
let state_db = create_test_db::<WriteMap>(EnvKind::RW);
let mut db = StageDB::new(state_db.as_ref()).unwrap();
let mut db = Transaction::new(state_db.as_ref()).unwrap();
let input = ExecInput {
previous_stage: None,
/// The progress of this stage the last time it was executed.

View File

@ -1,5 +1,5 @@
use crate::{
db::StageDB, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
db::Transaction, DatabaseIntegrityError, ExecInput, ExecOutput, Stage, StageError, StageId,
UnwindInput, UnwindOutput,
};
use futures_util::StreamExt;
@ -68,7 +68,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
/// starting from the tip
async fn execute(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
@ -136,7 +136,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// TODO: handle bad block
@ -155,7 +155,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
{
async fn update_head<DB: Database>(
&self,
db: &StageDB<'_, DB>,
db: &Transaction<'_, DB>,
height: BlockNumber,
) -> Result<(), StageError> {
let block_key = db.get_block_numhash(height)?;
@ -195,7 +195,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
/// Write downloaded headers to the database
async fn write_headers<DB: Database>(
&self,
db: &StageDB<'_, DB>,
db: &Transaction<'_, DB>,
headers: Vec<SealedHeader>,
) -> Result<Option<BlockNumber>, StageError> {
let mut cursor_header = db.cursor_mut::<tables::Headers>()?;
@ -226,7 +226,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
/// Iterate over inserted headers and write td entries
fn write_td<DB: Database>(
&self,
db: &StageDB<'_, DB>,
db: &Transaction<'_, DB>,
head: &SealedHeader,
) -> Result<(), StageError> {
// Acquire cursor over total difficulty table
@ -357,7 +357,7 @@ mod tests {
use crate::{
stages::headers::HeaderStage,
test_utils::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction,
UnwindStageTestRunner,
},
ExecInput, ExecOutput, UnwindInput,
@ -379,7 +379,7 @@ mod tests {
pub(crate) client: Arc<TestHeadersClient>,
downloader: Arc<D>,
network_handle: TestStatusUpdater,
db: TestStageDB,
db: TestTransaction,
}
impl Default for HeadersTestRunner<TestHeaderDownloader> {
@ -391,7 +391,7 @@ mod tests {
consensus: consensus.clone(),
downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)),
network_handle: TestStatusUpdater::default(),
db: TestStageDB::default(),
db: TestTransaction::default(),
}
}
}
@ -399,7 +399,7 @@ mod tests {
impl<D: HeaderDownloader + 'static> StageTestRunner for HeadersTestRunner<D> {
type S = HeaderStage<Arc<D>, TestConsensus, TestHeadersClient, TestStatusUpdater>;
fn db(&self) -> &TestStageDB {
fn db(&self) -> &TestTransaction {
&self.db
}
@ -514,7 +514,7 @@ mod tests {
consensus,
downloader,
network_handle: TestStatusUpdater::default(),
db: TestStageDB::default(),
db: TestTransaction::default(),
}
}
}

View File

@ -1,5 +1,5 @@
use crate::{
db::StageDB, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
db::Transaction, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use itertools::Itertools;
use rayon::prelude::*;
@ -55,7 +55,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
/// the [`TxSenders`][reth_interfaces::db::tables::TxSenders] table.
async fn execute(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let stage_progress = input.stage_progress.unwrap_or_default();
@ -113,7 +113,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
/// Unwind the stage.
async fn unwind(
&mut self,
db: &mut StageDB<'_, DB>,
db: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>> {
// Lookup latest tx id that we should unwind to
@ -131,8 +131,8 @@ mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB,
UnwindStageTestRunner, PREV_STAGE_ID,
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
stage_test_suite!(SendersTestRunner);
@ -176,13 +176,13 @@ mod tests {
}
struct SendersTestRunner {
db: TestStageDB,
db: TestTransaction,
threshold: u64,
}
impl Default for SendersTestRunner {
fn default() -> Self {
Self { threshold: 1000, db: TestStageDB::default() }
Self { threshold: 1000, db: TestTransaction::default() }
}
}
@ -195,7 +195,7 @@ mod tests {
impl StageTestRunner for SendersTestRunner {
type S = SendersStage;
fn db(&self) -> &TestStageDB {
fn db(&self) -> &TestTransaction {
&self.db
}

View File

@ -8,7 +8,7 @@ pub(crate) use macros::*;
pub(crate) use runner::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner,
};
pub(crate) use test_db::TestStageDB;
pub(crate) use test_db::TestTransaction;
/// The previous test stage id mock used for testing
pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage");

View File

@ -2,8 +2,8 @@ use reth_db::mdbx::{Env, WriteMap};
use std::borrow::Borrow;
use tokio::sync::oneshot;
use super::TestStageDB;
use crate::{db::StageDB, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use super::TestTransaction;
use crate::{db::Transaction, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
#[derive(thiserror::Error, Debug)]
pub(crate) enum TestRunnerError {
@ -19,7 +19,7 @@ pub(crate) trait StageTestRunner {
type S: Stage<Env<WriteMap>> + 'static;
/// Return a reference to the database.
fn db(&self) -> &TestStageDB;
fn db(&self) -> &TestTransaction;
/// Return an instance of a Stage.
fn stage(&self) -> Self::S;
@ -44,7 +44,7 @@ pub(crate) trait ExecuteStageTestRunner: StageTestRunner {
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner_raw(), self.stage());
tokio::spawn(async move {
let mut db = StageDB::new(db.borrow()).expect("failed to create db container");
let mut db = Transaction::new(db.borrow()).expect("failed to create db container");
let result = stage.execute(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send message")
@ -71,7 +71,7 @@ pub(crate) trait UnwindStageTestRunner: StageTestRunner {
let (tx, rx) = oneshot::channel();
let (db, mut stage) = (self.db().inner_raw(), self.stage());
tokio::spawn(async move {
let mut db = StageDB::new(db.borrow()).expect("failed to create db container");
let mut db = Transaction::new(db.borrow()).expect("failed to create db container");
let result = stage.unwind(&mut db, input).await;
db.commit().expect("failed to commit");
tx.send(result).expect("failed to send result");

View File

@ -10,7 +10,7 @@ use reth_db::{
use reth_primitives::{BlockNumber, SealedHeader, U256};
use std::{borrow::Borrow, sync::Arc};
use crate::db::StageDB;
use crate::db::Transaction;
/// The [StageTestDB] is used as an internal
/// database for testing stage implementation.
@ -19,21 +19,21 @@ use crate::db::StageDB;
/// let db = StageTestDB::default();
/// stage.execute(&mut db.container(), input);
/// ```
pub(crate) struct TestStageDB {
pub(crate) struct TestTransaction {
db: Arc<Env<WriteMap>>,
}
impl Default for TestStageDB {
impl Default for TestTransaction {
/// Create a new instance of [StageTestDB]
fn default() -> Self {
Self { db: create_test_db::<WriteMap>(EnvKind::RW) }
}
}
impl TestStageDB {
/// Return a database wrapped in [StageDB].
pub(crate) fn inner(&self) -> StageDB<'_, Env<WriteMap>> {
StageDB::new(self.db.borrow()).expect("failed to create db container")
impl TestTransaction {
/// Return a database wrapped in [Transaction].
pub(crate) fn inner(&self) -> Transaction<'_, Env<WriteMap>> {
Transaction::new(self.db.borrow()).expect("failed to create db container")
}
/// Get a pointer to an internal database.

View File

@ -3,7 +3,7 @@
## Abstractions
* We created a [Database trait abstraction](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/interfaces/src/db/mod.rs) using Rust Stable GATs which frees us from being bound to a single database implementation. We currently use MDBX, but are exploring [redb](https://github.com/cberner/redb) as an alternative.
* We then iterated on [`StageDB`](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/stages/src/db.rs#L14-L19) as a non-leaky abstraction with helpers for strictly-typed and unit-tested higher-level database abstractions.
* We then iterated on [`Transaction`](https://github.com/paradigmxyz/reth/blob/0d9b9a392d4196793736522f3fc2ac804991b45d/crates/stages/src/db.rs#L14-L19) as a non-leaky abstraction with helpers for strictly-typed and unit-tested higher-level database abstractions.
## Codecs