chore: migrate stages to new db abstractions (#43)

* migrate

* set block as u64

* linter

* BNum -> BlockNumber

* stage id alias
This commit is contained in:
Roman Krasiuk
2022-10-11 20:54:05 +03:00
committed by GitHub
parent 098bfe84b9
commit 5ebfc01338
6 changed files with 54 additions and 69 deletions

View File

@ -1,10 +1,10 @@
//! Declaration of all MDBX tables.
use crate::utils::TableType;
use reth_primitives::{Address, U256};
use reth_primitives::{Address, BlockNumber, U256};
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 17] = [
pub const TABLES: [(TableType, &str); 18] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
@ -22,6 +22,7 @@ pub const TABLES: [(TableType, &str); 17] = [
(TableType::DupSort, StorageChangeSet::const_name()),
(TableType::Table, TxSenders::const_name()),
(TableType::Table, Config::const_name()),
(TableType::Table, SyncStage::const_name()),
];
#[macro_export]
@ -86,6 +87,8 @@ table!(StorageChangeSet => TxId => StorageKeyBeforeTx);
table!(TxSenders => TxId => Address); // Is it necessary?
table!(Config => ConfigKey => ConfigValue);
table!(SyncStage => StageId => BlockNumber);
//
// TODO: Temporary types, until they're properly defined alongside with the Encode and Decode Trait
//
@ -110,3 +113,4 @@ type TxIdList = Vec<u8>;
type Address_StorageKey = Vec<u8>;
type AccountBeforeTx = Vec<u8>;
type StorageKeyBeforeTx = Vec<u8>;
type StageId = Vec<u8>;

View File

@ -1,5 +1,5 @@
use crate::pipeline::PipelineEvent;
use reth_db::mdbx;
use reth_db::kv::KVError;
use reth_primitives::BlockNumber;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -17,7 +17,7 @@ pub enum StageError {
},
/// The stage encountered a database error.
#[error("A database error occurred.")]
Database(#[from] mdbx::Error),
Database(#[from] KVError),
/// The stage encountered an internal error.
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
@ -31,7 +31,7 @@ pub enum PipelineError {
Stage(#[from] StageError),
/// The pipeline encountered a database error.
#[error("A database error occurred.")]
Database(#[from] mdbx::Error),
Database(#[from] KVError),
/// The pipeline encountered an error while trying to send an event.
#[error("The pipeline encountered an error while trying to send an event.")]
Channel(#[from] SendError<PipelineEvent>),

View File

@ -1,4 +1,7 @@
use reth_db::mdbx;
use reth_db::{
kv::{tables::SyncStage, tx::Tx, KVError},
mdbx,
};
use reth_primitives::BlockNumber;
use std::fmt::Display;
@ -18,33 +21,24 @@ impl StageId {
/// Get the last committed progress of this stage.
pub fn get_progress<'db, K, E>(
&self,
tx: &mdbx::Transaction<'db, K, E>,
) -> Result<Option<BlockNumber>, mdbx::Error>
tx: &Tx<'db, K, E>,
) -> Result<Option<BlockNumber>, KVError>
where
K: mdbx::TransactionKind,
E: mdbx::EnvironmentKind,
{
// TODO: Clean up when we get better database abstractions
let bytes: Option<Vec<u8>> = tx.get(&tx.open_db(Some("SyncStage"))?, self.0.as_ref())?;
Ok(bytes.map(|b| BlockNumber::from_be_bytes(b.try_into().expect("Database corrupt"))))
tx.get::<SyncStage>(self.0.as_bytes().to_vec())
}
/// Save the progress of this stage.
pub fn save_progress<'db, E>(
&self,
tx: &mdbx::Transaction<'db, mdbx::RW, E>,
tx: &Tx<'db, mdbx::RW, E>,
block: BlockNumber,
) -> Result<(), mdbx::Error>
) -> Result<(), KVError>
where
E: mdbx::EnvironmentKind,
{
// TODO: Clean up when we get better database abstractions
tx.put(
&tx.open_db(Some("SyncStage"))?,
self.0,
block.to_be_bytes(),
mdbx::WriteFlags::UPSERT,
)
tx.put::<SyncStage>(self.0.as_bytes().to_vec(), block)
}
}

View File

@ -3,7 +3,7 @@ use crate::{
util::{db::TxContainer, opt::MaybeSender},
ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput,
};
use reth_db::mdbx;
use reth_db::{kv::Env, mdbx};
use reth_primitives::BlockNumber;
use std::fmt::{Debug, Formatter};
use tokio::sync::mpsc::Sender;
@ -118,7 +118,7 @@ where
/// Run the pipeline in an infinite loop. Will terminate early if the user has specified
/// a `max_block` in the pipeline.
pub async fn run(&mut self, db: &'db mdbx::Environment<E>) -> Result<(), PipelineError> {
pub async fn run(&mut self, db: &'db Env<E>) -> Result<(), PipelineError> {
let mut state = PipelineState {
events_sender: self.events_sender.clone(),
max_block: self.max_block,
@ -189,7 +189,7 @@ where
/// If the unwind is due to a bad block the number of that block should be specified.
pub async fn unwind(
&mut self,
db: &'db mdbx::Environment<E>,
db: &'db Env<E>,
to: BlockNumber,
bad_block: Option<BlockNumber>,
) -> Result<(), PipelineError> {
@ -202,7 +202,7 @@ where
};
// Unwind stages in reverse order of priority (i.e. higher priority = first)
let mut tx = db.begin_rw_txn()?;
let mut tx = db.begin_mut_tx()?;
for (_, QueuedStage { stage, .. }) in unwind_pipeline.iter_mut() {
let stage_id = stage.id();
let span = info_span!("Unwinding", stage = %stage_id);
@ -355,8 +355,11 @@ where
mod tests {
use super::*;
use crate::{StageId, UnwindOutput};
use reth_db::mdbx;
use tempfile::tempdir;
use reth_db::{
kv::{tx::Tx, EnvKind},
mdbx,
};
use tempfile::TempDir;
use tokio::sync::mpsc::channel;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use utils::TestStage;
@ -652,34 +655,15 @@ mod tests {
mod utils {
use super::*;
use async_trait::async_trait;
use reth_db::kv::KVError;
use std::{collections::VecDeque, error::Error};
// TODO: This is... not great.
pub(crate) fn test_db() -> Result<mdbx::Environment<mdbx::WriteMap>, mdbx::Error> {
const DB_TABLES: usize = 10;
// Build environment
let mut builder = mdbx::Environment::<mdbx::WriteMap>::new();
builder.set_max_dbs(DB_TABLES);
builder.set_geometry(mdbx::Geometry {
size: Some(0..usize::MAX),
growth_step: None,
shrink_threshold: None,
page_size: None,
});
builder.set_rp_augment_limit(16 * 256 * 1024);
// Open
let tempdir = tempdir().unwrap();
let path = tempdir.path();
std::fs::DirBuilder::new().recursive(true).create(path).unwrap();
let db = builder.open(path)?;
// Create tables
let tx = db.begin_rw_txn()?;
tx.create_db(Some("SyncStage"), mdbx::DatabaseFlags::default())?;
tx.commit()?;
pub(crate) fn test_db() -> Result<Env<mdbx::WriteMap>, KVError> {
let path =
TempDir::new().expect("Not able to create a temporary directory.").into_path();
let db = Env::<mdbx::WriteMap>::open(&path, EnvKind::RW)
.expect("Not able to open existing mdbx file.");
db.create_tables()?;
Ok(db)
}
@ -719,7 +703,7 @@ mod tests {
async fn execute<'tx>(
&mut self,
_: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
_: &mut Tx<'tx, mdbx::RW, E>,
_: ExecInput,
) -> Result<ExecOutput, StageError>
where
@ -732,7 +716,7 @@ mod tests {
async fn unwind<'tx>(
&mut self,
_: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
_: &mut Tx<'tx, mdbx::RW, E>,
_: UnwindInput,
) -> Result<UnwindOutput, Box<dyn Error + Send + Sync>>
where

View File

@ -1,6 +1,6 @@
use crate::{error::StageError, id::StageId};
use async_trait::async_trait;
use reth_db::mdbx;
use reth_db::{kv::tx::Tx, mdbx};
use reth_primitives::BlockNumber;
/// Stage execution input, see [Stage::execute].
@ -63,7 +63,7 @@ where
/// Execute the stage.
async fn execute<'tx>(
&mut self,
tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
tx: &mut Tx<'tx, mdbx::RW, E>,
input: ExecInput,
) -> Result<ExecOutput, StageError>
where
@ -72,7 +72,7 @@ where
/// Unwind the stage.
async fn unwind<'tx>(
&mut self,
tx: &mut mdbx::Transaction<'tx, mdbx::RW, E>,
tx: &mut Tx<'tx, mdbx::RW, E>,
input: UnwindInput,
) -> Result<UnwindOutput, Box<dyn std::error::Error + Send + Sync>>
where

View File

@ -63,7 +63,10 @@ pub(crate) mod opt {
}
pub(crate) mod db {
use reth_db::mdbx;
use reth_db::{
kv::{tx::Tx, Env, KVError},
mdbx,
};
/// A container for a MDBX transaction that will open a new inner transaction when the current
/// one is committed.
@ -76,8 +79,8 @@ pub(crate) mod db {
E: mdbx::EnvironmentKind,
{
/// A handle to the MDBX database.
pub(crate) db: &'db mdbx::Environment<E>,
tx: Option<mdbx::Transaction<'tx, mdbx::RW, E>>,
pub(crate) db: &'db Env<E>,
tx: Option<Tx<'tx, mdbx::RW, E>>,
}
impl<'db, 'tx, E> TxContainer<'db, 'tx, E>
@ -88,8 +91,8 @@ pub(crate) mod db {
/// Create a new container with the given database handle.
///
/// A new inner transaction will be opened.
pub(crate) fn new(db: &'db mdbx::Environment<E>) -> Result<Self, mdbx::Error> {
Ok(Self { db, tx: Some(db.begin_rw_txn()?) })
pub(crate) fn new(db: &'db Env<E>) -> Result<Self, KVError> {
Ok(Self { db, tx: Some(Tx::new(db.begin_rw_txn()?)) })
}
/// Commit the current inner transaction and open a new one.
@ -98,10 +101,10 @@ pub(crate) mod db {
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn commit(&mut self) -> Result<bool, mdbx::Error> {
pub(crate) fn commit(&mut self) -> Result<bool, KVError> {
let success =
self.tx.take().expect("Tried committing a non-existent transaction").commit()?;
self.tx = Some(self.db.begin_rw_txn()?);
self.tx = Some(Tx::new(self.db.begin_rw_txn()?));
Ok(success)
}
@ -111,7 +114,7 @@ pub(crate) mod db {
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn get(&self) -> &mdbx::Transaction<'tx, mdbx::RW, E> {
pub(crate) fn get(&self) -> &Tx<'tx, mdbx::RW, E> {
self.tx.as_ref().expect("Tried getting a reference to a non-existent transaction")
}
@ -121,15 +124,15 @@ pub(crate) mod db {
///
/// Panics if an inner transaction does not exist. This should never be the case unless
/// [TxContainer::close] was called without following up with a call to [TxContainer::open].
pub(crate) fn get_mut(&mut self) -> &mut mdbx::Transaction<'tx, mdbx::RW, E> {
pub(crate) fn get_mut(&mut self) -> &mut Tx<'tx, mdbx::RW, E> {
self.tx
.as_mut()
.expect("Tried getting a mutable reference to a non-existent transaction")
}
/// Open a new inner transaction.
pub(crate) fn open(&mut self) -> Result<(), mdbx::Error> {
self.tx = Some(self.db.begin_rw_txn()?);
pub(crate) fn open(&mut self) -> Result<(), KVError> {
self.tx = Some(Tx::new(self.db.begin_rw_txn()?));
Ok(())
}