From 5ebfc01338839b23d2b2fcd02b0d2c4290e871ae Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 11 Oct 2022 20:54:05 +0300 Subject: [PATCH] chore: migrate stages to new db abstractions (#43) * migrate * set block as u64 * linter * BNum -> BlockNumber * stage id alias --- crates/db/src/kv/tables.rs | 8 ++++-- crates/stages/src/error.rs | 6 ++-- crates/stages/src/id.rs | 26 +++++++----------- crates/stages/src/pipeline.rs | 52 ++++++++++++----------------------- crates/stages/src/stage.rs | 6 ++-- crates/stages/src/util.rs | 25 +++++++++-------- 6 files changed, 54 insertions(+), 69 deletions(-) diff --git a/crates/db/src/kv/tables.rs b/crates/db/src/kv/tables.rs index 00fe64774..e1652c45d 100644 --- a/crates/db/src/kv/tables.rs +++ b/crates/db/src/kv/tables.rs @@ -1,10 +1,10 @@ //! Declaration of all MDBX tables. use crate::utils::TableType; -use reth_primitives::{Address, U256}; +use reth_primitives::{Address, BlockNumber, U256}; /// Default tables that should be present inside database. -pub const TABLES: [(TableType, &str); 17] = [ +pub const TABLES: [(TableType, &str); 18] = [ (TableType::Table, CanonicalHeaders::const_name()), (TableType::Table, HeaderTD::const_name()), (TableType::Table, HeaderNumbers::const_name()), @@ -22,6 +22,7 @@ pub const TABLES: [(TableType, &str); 17] = [ (TableType::DupSort, StorageChangeSet::const_name()), (TableType::Table, TxSenders::const_name()), (TableType::Table, Config::const_name()), + (TableType::Table, SyncStage::const_name()), ]; #[macro_export] @@ -86,6 +87,8 @@ table!(StorageChangeSet => TxId => StorageKeyBeforeTx); table!(TxSenders => TxId => Address); // Is it necessary? table!(Config => ConfigKey => ConfigValue); +table!(SyncStage => StageId => BlockNumber); + // // TODO: Temporary types, until they're properly defined alongside with the Encode and Decode Trait // @@ -110,3 +113,4 @@ type TxIdList = Vec; type Address_StorageKey = Vec; type AccountBeforeTx = Vec; type StorageKeyBeforeTx = Vec; +type StageId = Vec; diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index bd323fb93..2f501a358 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -1,5 +1,5 @@ use crate::pipeline::PipelineEvent; -use reth_db::mdbx; +use reth_db::kv::KVError; use reth_primitives::BlockNumber; use thiserror::Error; use tokio::sync::mpsc::error::SendError; @@ -17,7 +17,7 @@ pub enum StageError { }, /// The stage encountered a database error. #[error("A database error occurred.")] - Database(#[from] mdbx::Error), + Database(#[from] KVError), /// The stage encountered an internal error. #[error(transparent)] Internal(Box), @@ -31,7 +31,7 @@ pub enum PipelineError { Stage(#[from] StageError), /// The pipeline encountered a database error. #[error("A database error occurred.")] - Database(#[from] mdbx::Error), + Database(#[from] KVError), /// The pipeline encountered an error while trying to send an event. #[error("The pipeline encountered an error while trying to send an event.")] Channel(#[from] SendError), diff --git a/crates/stages/src/id.rs b/crates/stages/src/id.rs index cbc981068..40f962960 100644 --- a/crates/stages/src/id.rs +++ b/crates/stages/src/id.rs @@ -1,4 +1,7 @@ -use reth_db::mdbx; +use reth_db::{ + kv::{tables::SyncStage, tx::Tx, KVError}, + mdbx, +}; use reth_primitives::BlockNumber; use std::fmt::Display; @@ -18,33 +21,24 @@ impl StageId { /// Get the last committed progress of this stage. pub fn get_progress<'db, K, E>( &self, - tx: &mdbx::Transaction<'db, K, E>, - ) -> Result, mdbx::Error> + tx: &Tx<'db, K, E>, + ) -> Result, KVError> where K: mdbx::TransactionKind, E: mdbx::EnvironmentKind, { - // TODO: Clean up when we get better database abstractions - let bytes: Option> = tx.get(&tx.open_db(Some("SyncStage"))?, self.0.as_ref())?; - - Ok(bytes.map(|b| BlockNumber::from_be_bytes(b.try_into().expect("Database corrupt")))) + tx.get::(self.0.as_bytes().to_vec()) } /// Save the progress of this stage. pub fn save_progress<'db, E>( &self, - tx: &mdbx::Transaction<'db, mdbx::RW, E>, + tx: &Tx<'db, mdbx::RW, E>, block: BlockNumber, - ) -> Result<(), mdbx::Error> + ) -> Result<(), KVError> where E: mdbx::EnvironmentKind, { - // TODO: Clean up when we get better database abstractions - tx.put( - &tx.open_db(Some("SyncStage"))?, - self.0, - block.to_be_bytes(), - mdbx::WriteFlags::UPSERT, - ) + tx.put::(self.0.as_bytes().to_vec(), block) } } diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index d3e84e695..6345ca84d 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -3,7 +3,7 @@ use crate::{ util::{db::TxContainer, opt::MaybeSender}, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, }; -use reth_db::mdbx; +use reth_db::{kv::Env, mdbx}; use reth_primitives::BlockNumber; use std::fmt::{Debug, Formatter}; use tokio::sync::mpsc::Sender; @@ -118,7 +118,7 @@ where /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. - pub async fn run(&mut self, db: &'db mdbx::Environment) -> Result<(), PipelineError> { + pub async fn run(&mut self, db: &'db Env) -> Result<(), PipelineError> { let mut state = PipelineState { events_sender: self.events_sender.clone(), max_block: self.max_block, @@ -189,7 +189,7 @@ where /// If the unwind is due to a bad block the number of that block should be specified. pub async fn unwind( &mut self, - db: &'db mdbx::Environment, + db: &'db Env, to: BlockNumber, bad_block: Option, ) -> Result<(), PipelineError> { @@ -202,7 +202,7 @@ where }; // Unwind stages in reverse order of priority (i.e. higher priority = first) - let mut tx = db.begin_rw_txn()?; + let mut tx = db.begin_mut_tx()?; for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() { let stage_id = stage.id(); let span = info_span!("Unwinding", stage = %stage_id); @@ -355,8 +355,11 @@ where mod tests { use super::*; use crate::{StageId, UnwindOutput}; - use reth_db::mdbx; - use tempfile::tempdir; + use reth_db::{ + kv::{tx::Tx, EnvKind}, + mdbx, + }; + use tempfile::TempDir; use tokio::sync::mpsc::channel; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use utils::TestStage; @@ -652,34 +655,15 @@ mod tests { mod utils { use super::*; use async_trait::async_trait; + use reth_db::kv::KVError; use std::{collections::VecDeque, error::Error}; - // TODO: This is... not great. - pub(crate) fn test_db() -> Result, mdbx::Error> { - const DB_TABLES: usize = 10; - - // Build environment - let mut builder = mdbx::Environment::::new(); - builder.set_max_dbs(DB_TABLES); - builder.set_geometry(mdbx::Geometry { - size: Some(0..usize::MAX), - growth_step: None, - shrink_threshold: None, - page_size: None, - }); - builder.set_rp_augment_limit(16 * 256 * 1024); - - // Open - let tempdir = tempdir().unwrap(); - let path = tempdir.path(); - std::fs::DirBuilder::new().recursive(true).create(path).unwrap(); - let db = builder.open(path)?; - - // Create tables - let tx = db.begin_rw_txn()?; - tx.create_db(Some("SyncStage"), mdbx::DatabaseFlags::default())?; - tx.commit()?; - + pub(crate) fn test_db() -> Result, KVError> { + let path = + TempDir::new().expect("Not able to create a temporary directory.").into_path(); + let db = Env::::open(&path, EnvKind::RW) + .expect("Not able to open existing mdbx file."); + db.create_tables()?; Ok(db) } @@ -719,7 +703,7 @@ mod tests { async fn execute<'tx>( &mut self, - _: &mut mdbx::Transaction<'tx, mdbx::RW, E>, + _: &mut Tx<'tx, mdbx::RW, E>, _: ExecInput, ) -> Result where @@ -732,7 +716,7 @@ mod tests { async fn unwind<'tx>( &mut self, - _: &mut mdbx::Transaction<'tx, mdbx::RW, E>, + _: &mut Tx<'tx, mdbx::RW, E>, _: UnwindInput, ) -> Result> where diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 4d2b75962..03cd6c965 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,6 +1,6 @@ use crate::{error::StageError, id::StageId}; use async_trait::async_trait; -use reth_db::mdbx; +use reth_db::{kv::tx::Tx, mdbx}; use reth_primitives::BlockNumber; /// Stage execution input, see [Stage::execute]. @@ -63,7 +63,7 @@ where /// Execute the stage. async fn execute<'tx>( &mut self, - tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>, + tx: &mut Tx<'tx, mdbx::RW, E>, input: ExecInput, ) -> Result where @@ -72,7 +72,7 @@ where /// Unwind the stage. async fn unwind<'tx>( &mut self, - tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>, + tx: &mut Tx<'tx, mdbx::RW, E>, input: UnwindInput, ) -> Result> where diff --git a/crates/stages/src/util.rs b/crates/stages/src/util.rs index e49e39f3a..c80b53f4f 100644 --- a/crates/stages/src/util.rs +++ b/crates/stages/src/util.rs @@ -63,7 +63,10 @@ pub(crate) mod opt { } pub(crate) mod db { - use reth_db::mdbx; + use reth_db::{ + kv::{tx::Tx, Env, KVError}, + mdbx, + }; /// A container for a MDBX transaction that will open a new inner transaction when the current /// one is committed. @@ -76,8 +79,8 @@ pub(crate) mod db { E: mdbx::EnvironmentKind, { /// A handle to the MDBX database. - pub(crate) db: &'db mdbx::Environment, - tx: Option>, + pub(crate) db: &'db Env, + tx: Option>, } impl<'db, 'tx, E> TxContainer<'db, 'tx, E> @@ -88,8 +91,8 @@ pub(crate) mod db { /// Create a new container with the given database handle. /// /// A new inner transaction will be opened. - pub(crate) fn new(db: &'db mdbx::Environment) -> Result { - Ok(Self { db, tx: Some(db.begin_rw_txn()?) }) + pub(crate) fn new(db: &'db Env) -> Result { + Ok(Self { db, tx: Some(Tx::new(db.begin_rw_txn()?)) }) } /// Commit the current inner transaction and open a new one. @@ -98,10 +101,10 @@ pub(crate) mod db { /// /// Panics if an inner transaction does not exist. This should never be the case unless /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn commit(&mut self) -> Result { + pub(crate) fn commit(&mut self) -> Result { let success = self.tx.take().expect("Tried committing a non-existent transaction").commit()?; - self.tx = Some(self.db.begin_rw_txn()?); + self.tx = Some(Tx::new(self.db.begin_rw_txn()?)); Ok(success) } @@ -111,7 +114,7 @@ pub(crate) mod db { /// /// Panics if an inner transaction does not exist. This should never be the case unless /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn get(&self) -> &mdbx::Transaction<'tx, mdbx::RW, E> { + pub(crate) fn get(&self) -> &Tx<'tx, mdbx::RW, E> { self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction") } @@ -121,15 +124,15 @@ pub(crate) mod db { /// /// Panics if an inner transaction does not exist. This should never be the case unless /// [TxContainer::close] was called without following up with a call to [TxContainer::open]. - pub(crate) fn get_mut(&mut self) -> &mut mdbx::Transaction<'tx, mdbx::RW, E> { + pub(crate) fn get_mut(&mut self) -> &mut Tx<'tx, mdbx::RW, E> { self.tx .as_mut() .expect("Tried getting a mutable reference to a non-existent transaction") } /// Open a new inner transaction. - pub(crate) fn open(&mut self) -> Result<(), mdbx::Error> { - self.tx = Some(self.db.begin_rw_txn()?); + pub(crate) fn open(&mut self) -> Result<(), KVError> { + self.tx = Some(Tx::new(self.db.begin_rw_txn()?)); Ok(()) }