feat: use DbProvider on UnifiedStorageWriter (#10933)

This commit is contained in:
joshieDo
2024-09-16 14:49:24 +01:00
committed by GitHub
parent 56fb18bf09
commit 66848d98ef
5 changed files with 83 additions and 35 deletions

View File

@ -354,7 +354,7 @@ where
let time = Instant::now();
// write output
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
let mut writer = UnifiedStorageWriter::new(&provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;
let db_write_duration = time.elapsed();

View File

@ -208,7 +208,7 @@ pub fn insert_state<'a, 'b, DB: Database>(
Vec::new(),
);
let mut storage_writer = UnifiedStorageWriter::from_database(provider);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
trace!(target: "reth::cli", "Inserted state");

View File

@ -92,6 +92,12 @@ impl<DB: Database> DerefMut for DatabaseProviderRW<DB> {
}
}
impl<DB: Database> AsRef<DatabaseProvider<<DB as Database>::TXMut>> for DatabaseProviderRW<DB> {
fn as_ref(&self) -> &DatabaseProvider<<DB as Database>::TXMut> {
&self.0
}
}
impl<DB: Database> DatabaseProviderRW<DB> {
/// Commit database transaction and static file if it exists.
pub fn commit(self) -> ProviderResult<bool> {
@ -104,6 +110,12 @@ impl<DB: Database> DatabaseProviderRW<DB> {
}
}
impl<DB: Database> From<DatabaseProviderRW<DB>> for DatabaseProvider<<DB as Database>::TXMut> {
fn from(provider: DatabaseProviderRW<DB>) -> Self {
provider.0
}
}
/// A provider struct that fetches data from the database.
/// Wrapper around [`DbTx`] and [`DbTxMut`]. Example: [`HeaderProvider`] [`BlockHashReader`]
#[derive(Debug)]
@ -142,6 +154,12 @@ impl<TX: DbTxMut> DatabaseProvider<TX> {
}
}
impl<TX> AsRef<Self> for DatabaseProvider<TX> {
fn as_ref(&self) -> &Self {
self
}
}
impl<TX: DbTx + 'static> TryIntoHistoricalStateProvider for DatabaseProvider<TX> {
fn try_into_history_at_block(
self,
@ -3120,7 +3138,7 @@ impl<TX: DbTx> StateReader for DatabaseProvider<TX> {
}
}
impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
impl<TX: DbTxMut + DbTx + 'static> BlockExecutionWriter for DatabaseProvider<TX> {
fn take_block_and_execution_range(
&self,
range: RangeInclusive<BlockNumber>,
@ -3298,7 +3316,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
}
}
impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
impl<TX: DbTxMut + DbTx + 'static> BlockWriter for DatabaseProvider<TX> {
/// Inserts the block into the database, always modifying the following tables:
/// * [`CanonicalHeaders`](tables::CanonicalHeaders)
/// * [`Headers`](tables::Headers)
@ -3591,6 +3609,10 @@ impl<TX: DbTx + 'static> DBProvider for DatabaseProvider<TX> {
fn into_tx(self) -> Self::Tx {
self.tx
}
fn prune_modes_ref(&self) -> &PruneModes {
self.prune_modes_ref()
}
}
/// Helper method to recover senders for any blocks in the db which do not have senders. This

View File

@ -1,8 +1,7 @@
use crate::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter},
writer::static_file::StaticFileWriter,
BlockExecutionWriter, BlockWriter, DatabaseProvider, DatabaseProviderRW, HistoryWriter,
StateChangeWriter, StateWriter, TrieWriter,
BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, TrieWriter,
};
use reth_chain_state::ExecutedBlock;
use reth_db::{
@ -10,7 +9,6 @@ use reth_db::{
models::CompactU256,
tables,
transaction::{DbTx, DbTxMut},
Database,
};
use reth_errors::{ProviderError, ProviderResult};
use reth_execution_types::ExecutionOutcome;
@ -19,7 +17,7 @@ use reth_primitives::{
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{
BlockNumReader, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
DBProvider, HeaderProvider, ReceiptWriter, StageCheckpointWriter, TransactionsProviderExt,
};
use reth_storage_errors::writer::UnifiedStorageWriterError;
use revm::db::OriginalValuesKnown;
@ -38,29 +36,38 @@ enum StorageType<C = (), S = ()> {
/// [`UnifiedStorageWriter`] is responsible for managing the writing to storage with both database
/// and static file providers.
#[derive(Debug)]
pub struct UnifiedStorageWriter<'a, TX, SF> {
database: &'a DatabaseProvider<TX>,
static_file: Option<SF>,
pub struct UnifiedStorageWriter<'a, ProviderDB, ProviderSF> {
database: &'a ProviderDB,
static_file: Option<ProviderSF>,
}
impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> {
impl<'a, ProviderDB, ProviderSF> UnifiedStorageWriter<'a, ProviderDB, ProviderSF> {
/// Creates a new instance of [`UnifiedStorageWriter`].
///
/// # Parameters
/// - `database`: An optional reference to a database provider.
/// - `static_file`: An optional mutable reference to a static file instance.
pub const fn new(database: &'a DatabaseProvider<TX>, static_file: Option<SF>) -> Self {
Self { database, static_file }
pub fn new<P>(database: &'a P, static_file: Option<ProviderSF>) -> Self
where
P: AsRef<ProviderDB>,
{
Self { database: database.as_ref(), static_file }
}
/// Creates a new instance of [`UnifiedStorageWriter`] from a database provider and a static
/// file instance.
pub const fn from(database: &'a DatabaseProvider<TX>, static_file: SF) -> Self {
pub fn from<P>(database: &'a P, static_file: ProviderSF) -> Self
where
P: AsRef<ProviderDB>,
{
Self::new(database, Some(static_file))
}
/// Creates a new instance of [`UnifiedStorageWriter`] from a database provider.
pub const fn from_database(database: &'a DatabaseProvider<TX>) -> Self {
pub fn from_database<P>(database: &'a P) -> Self
where
P: AsRef<ProviderDB>,
{
Self::new(database, None)
}
@ -68,7 +75,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> {
///
/// # Panics
/// If the database provider is not set.
const fn database(&self) -> &DatabaseProvider<TX> {
const fn database(&self) -> &ProviderDB {
self.database
}
@ -76,7 +83,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> {
///
/// # Panics
/// If the static file instance is not set.
fn static_file(&self) -> &SF {
fn static_file(&self) -> &ProviderSF {
self.static_file.as_ref().expect("should exist")
}
@ -84,7 +91,7 @@ impl<'a, TX, SF> UnifiedStorageWriter<'a, TX, SF> {
///
/// # Panics
/// If the static file instance is not set.
fn static_file_mut(&mut self) -> &mut SF {
fn static_file_mut(&mut self) -> &mut ProviderSF {
self.static_file.as_mut().expect("should exist")
}
@ -111,12 +118,15 @@ impl UnifiedStorageWriter<'_, (), ()> {
/// start-up.
///
/// NOTE: If unwinding data from storage, use `commit_unwind` instead!
pub fn commit<DB: Database>(
database: DatabaseProviderRW<DB>,
pub fn commit<P>(
database: impl Into<P> + AsRef<P>,
static_file: StaticFileProvider,
) -> ProviderResult<()> {
) -> ProviderResult<()>
where
P: DBProvider<Tx: DbTxMut>,
{
static_file.commit()?;
database.commit()?;
database.into().into_tx().commit()?;
Ok(())
}
@ -128,19 +138,30 @@ impl UnifiedStorageWriter<'_, (), ()> {
/// checkpoints on the next start-up.
///
/// NOTE: Should only be used after unwinding data from storage!
pub fn commit_unwind<DB: Database>(
database: DatabaseProviderRW<DB>,
pub fn commit_unwind<P>(
database: impl Into<P> + AsRef<P>,
static_file: StaticFileProvider,
) -> ProviderResult<()> {
database.commit()?;
) -> ProviderResult<()>
where
P: DBProvider<Tx: DbTxMut>,
{
database.into().into_tx().commit()?;
static_file.commit()?;
Ok(())
}
}
impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, &'b StaticFileProvider>
impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, &'b StaticFileProvider>
where
TX: DbTxMut + DbTx,
ProviderDB: DBProvider<Tx: DbTx + DbTxMut>
+ BlockWriter
+ TransactionsProviderExt
+ StateChangeWriter
+ TrieWriter
+ HistoryWriter
+ StageCheckpointWriter
+ BlockExecutionWriter
+ AsRef<ProviderDB>,
{
/// Writes executed blocks and receipts to storage.
pub fn save_blocks(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> {
@ -296,9 +317,9 @@ where
}
}
impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>
impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>>
where
TX: DbTx,
ProviderDB: DBProvider<Tx: DbTx> + HeaderProvider,
{
/// Ensures that the static file writer is set and of the right [`StaticFileSegment`] variant.
///
@ -407,9 +428,9 @@ where
}
}
impl<'a, 'b, TX> UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>
impl<'a, 'b, ProviderDB> UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>>
where
TX: DbTxMut + DbTx,
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + HeaderProvider,
{
/// Appends receipts block by block.
///
@ -488,9 +509,10 @@ where
}
}
impl<'a, 'b, TX> StateWriter for UnifiedStorageWriter<'a, TX, StaticFileProviderRWRefMut<'b>>
impl<'a, 'b, ProviderDB> StateWriter
for UnifiedStorageWriter<'a, ProviderDB, StaticFileProviderRWRefMut<'b>>
where
TX: DbTxMut + DbTx,
ProviderDB: DBProvider<Tx: DbTxMut + DbTx> + StateChangeWriter + HeaderProvider,
{
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.

View File

@ -1,4 +1,5 @@
use reth_db_api::{database::Database, transaction::DbTx};
use reth_prune_types::PruneModes;
use reth_storage_errors::provider::ProviderResult;
/// Database provider.
@ -30,6 +31,9 @@ pub trait DBProvider: Send + Sync + Sized + 'static {
fn commit(self) -> ProviderResult<bool> {
Ok(self.into_tx().commit()?)
}
/// Returns a reference to prune modes.
fn prune_modes_ref(&self) -> &PruneModes;
}
/// Database provider factory.