From 0a1f652b2f2835b8cad6791fcd6e24b4989d70f7 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 16 Jul 2024 16:45:21 +0200 Subject: [PATCH] feat: add `StorageWriter` standalone type (#9507) --- crates/prune/types/src/target.rs | 5 + crates/storage/errors/src/lib.rs | 3 + crates/storage/errors/src/provider.rs | 3 + crates/storage/errors/src/writer.rs | 15 ++ .../src/bundle_state/execution_outcome.rs | 58 ++----- crates/storage/provider/src/lib.rs | 3 + .../src/providers/database/provider.rs | 5 + .../src/providers/static_file/writer.rs | 3 +- .../storage/provider/src/writer/database.rs | 30 ++++ crates/storage/provider/src/writer/mod.rs | 157 ++++++++++++++++++ .../provider/src/writer/static_file.rs | 26 +++ crates/storage/storage-api/src/receipts.rs | 21 ++- 12 files changed, 278 insertions(+), 51 deletions(-) create mode 100644 crates/storage/errors/src/writer.rs create mode 100644 crates/storage/provider/src/writer/database.rs create mode 100644 crates/storage/provider/src/writer/mod.rs create mode 100644 crates/storage/provider/src/writer/static_file.rs diff --git a/crates/prune/types/src/target.rs b/crates/prune/types/src/target.rs index f2ceb168b..ca355b394 100644 --- a/crates/prune/types/src/target.rs +++ b/crates/prune/types/src/target.rs @@ -63,6 +63,11 @@ impl PruneModes { } } + /// Returns whether there is any kind of receipt pruning configuration. + pub fn has_receipts_pruning(&self) -> bool { + self.receipts.is_some() || !self.receipts_log_filter.is_empty() + } + /// Returns true if all prune modes are set to [`None`]. pub fn is_empty(&self) -> bool { self == &Self::none() diff --git a/crates/storage/errors/src/lib.rs b/crates/storage/errors/src/lib.rs index dc8d24a16..cf1a1a976 100644 --- a/crates/storage/errors/src/lib.rs +++ b/crates/storage/errors/src/lib.rs @@ -20,3 +20,6 @@ pub mod lockfile; /// Provider error pub mod provider; + +/// Writer error +pub mod writer; diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs index db59d671f..c3d47aa0b 100644 --- a/crates/storage/errors/src/provider.rs +++ b/crates/storage/errors/src/provider.rs @@ -139,6 +139,9 @@ pub enum ProviderError { /// Storage lock error. #[error(transparent)] StorageLockError(#[from] crate::lockfile::StorageLockError), + /// Storage writer error. + #[error(transparent)] + StorageWriterError(#[from] crate::writer::StorageWriterError), } impl From for ProviderError { diff --git a/crates/storage/errors/src/writer.rs b/crates/storage/errors/src/writer.rs new file mode 100644 index 000000000..f60198d2f --- /dev/null +++ b/crates/storage/errors/src/writer.rs @@ -0,0 +1,15 @@ +use crate::db::DatabaseError; + +/// `StorageWriter` related errors +#[derive(Clone, Debug, thiserror_no_std::Error, PartialEq, Eq)] +pub enum StorageWriterError { + /// Database writer is missing + #[error("Database writer is missing")] + MissingDatabaseWriter, + /// Static file writer is missing + #[error("Static file writer is missing")] + MissingStaticFileWriter, + /// Database-related errors. + #[error(transparent)] + Database(#[from] DatabaseError), +} diff --git a/crates/storage/provider/src/bundle_state/execution_outcome.rs b/crates/storage/provider/src/bundle_state/execution_outcome.rs index 1305ca497..ebb69201e 100644 --- a/crates/storage/provider/src/bundle_state/execution_outcome.rs +++ b/crates/storage/provider/src/bundle_state/execution_outcome.rs @@ -1,67 +1,28 @@ use crate::{ - providers::StaticFileProviderRWRefMut, DatabaseProviderRW, StateChanges, StateReverts, - StateWriter, -}; -use reth_db::{tables, Database}; -use reth_db_api::{ - cursor::{DbCursorRO, DbCursorRW}, - transaction::{DbTx, DbTxMut}, + providers::StaticFileProviderRWRefMut, writer::StorageWriter, DatabaseProviderRW, StateChanges, + StateReverts, StateWriter, }; +use reth_db::Database; pub use reth_execution_types::*; -use reth_primitives::StaticFileSegment; -use reth_storage_errors::provider::{ProviderError, ProviderResult}; +use reth_storage_errors::provider::ProviderResult; pub use revm::db::states::OriginalValuesKnown; impl StateWriter for ExecutionOutcome { fn write_to_storage( self, provider_rw: &DatabaseProviderRW, - mut static_file_producer: Option>, + static_file_producer: Option>, is_value_known: OriginalValuesKnown, ) -> ProviderResult<()> where DB: Database, { - let tx = provider_rw.tx_ref(); let (plain_state, reverts) = self.bundle.into_plain_state_and_reverts(is_value_known); StateReverts(reverts).write_to_db(provider_rw, self.first_block)?; - // write receipts - let mut bodies_cursor = tx.cursor_read::()?; - let mut receipts_cursor = tx.cursor_write::()?; - - // ATTENTION: Any potential future refactor or change to how this loop works should keep in - // mind that the static file producer must always call `increment_block` even if the block - // has no receipts. Keeping track of the exact block range of the segment is needed for - // consistency, querying and file range segmentation. - let blocks = self.receipts.into_iter().enumerate(); - for (idx, receipts) in blocks { - let block_number = self.first_block + idx as u64; - let first_tx_index = bodies_cursor - .seek_exact(block_number)? - .map(|(_, indices)| indices.first_tx_num()) - .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?; - - if let Some(static_file_producer) = &mut static_file_producer { - // Increment block on static file header. - static_file_producer.increment_block(StaticFileSegment::Receipts, block_number)?; - let receipts = receipts.into_iter().enumerate().map(|(tx_idx, receipt)| { - Ok(( - first_tx_index + tx_idx as u64, - receipt - .expect("receipt should not be filtered when saving to static files."), - )) - }); - static_file_producer.append_receipts(receipts)?; - } else if !receipts.is_empty() { - for (tx_idx, receipt) in receipts.into_iter().enumerate() { - if let Some(receipt) = receipt { - receipts_cursor.append(first_tx_index + tx_idx as u64, receipt)?; - } - } - } - } + StorageWriter::new(Some(provider_rw), static_file_producer) + .append_receipts_from_blocks(self.first_block, self.receipts.into_iter())?; StateChanges(plain_state).write_to_db(provider_rw)?; @@ -73,11 +34,12 @@ impl StateWriter for ExecutionOutcome { mod tests { use super::*; use crate::{test_utils::create_test_provider_factory, AccountReader}; - use reth_db::test_utils::create_test_rw_db; + use reth_db::{tables, test_utils::create_test_rw_db}; use reth_db_api::{ - cursor::DbDupCursorRO, + cursor::{DbCursorRO, DbDupCursorRO}, database::Database, models::{AccountBeforeTx, BlockNumberAddress}, + transaction::{DbTx, DbTxMut}, }; use reth_primitives::{ keccak256, Account, Address, Receipt, Receipts, StorageEntry, B256, U256, diff --git a/crates/storage/provider/src/lib.rs b/crates/storage/provider/src/lib.rs index 17c6af875..a578fa09d 100644 --- a/crates/storage/provider/src/lib.rs +++ b/crates/storage/provider/src/lib.rs @@ -36,6 +36,9 @@ pub use reth_execution_types::*; pub mod bundle_state; pub use bundle_state::{OriginalValuesKnown, StateChanges, StateReverts}; +/// Writer standalone type. +pub mod writer; + pub(crate) fn to_range>(bounds: R) -> std::ops::Range { let start = match bounds.start_bound() { std::ops::Bound::Included(&v) => v, diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 3aace4f86..4c773bdf7 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -113,6 +113,11 @@ impl DatabaseProvider { pub const fn static_file_provider(&self) -> &StaticFileProvider { &self.static_file_provider } + + /// Returns reference to prune modes. + pub const fn prune_modes_ref(&self) -> &PruneModes { + &self.prune_modes + } } impl DatabaseProvider { diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index a84210f4c..df4417ace 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -1,8 +1,7 @@ -use crate::providers::static_file::metrics::StaticFileProviderOperation; - use super::{ manager::StaticFileProviderInner, metrics::StaticFileProviderMetrics, StaticFileProvider, }; +use crate::providers::static_file::metrics::StaticFileProviderOperation; use dashmap::mapref::one::RefMut; use reth_codecs::Compact; use reth_db_api::models::CompactU256; diff --git a/crates/storage/provider/src/writer/database.rs b/crates/storage/provider/src/writer/database.rs new file mode 100644 index 000000000..86da1d6ea --- /dev/null +++ b/crates/storage/provider/src/writer/database.rs @@ -0,0 +1,30 @@ +use reth_db::{ + cursor::{DbCursorRO, DbCursorRW}, + tables, +}; +use reth_errors::ProviderResult; +use reth_primitives::{BlockNumber, Receipt, TxNumber}; +use reth_storage_api::ReceiptWriter; + +pub(crate) struct DatabaseWriter<'a, W>(pub(crate) &'a mut W); + +impl<'a, W> ReceiptWriter for DatabaseWriter<'a, W> +where + W: DbCursorRO + DbCursorRW, +{ + fn append_block_receipts( + &mut self, + first_tx_index: TxNumber, + _: BlockNumber, + receipts: Vec>, + ) -> ProviderResult<()> { + if !receipts.is_empty() { + for (tx_idx, receipt) in receipts.into_iter().enumerate() { + if let Some(receipt) = receipt { + self.0.append(first_tx_index + tx_idx as u64, receipt)?; + } + } + } + Ok(()) + } +} diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs new file mode 100644 index 000000000..9bd8b2a19 --- /dev/null +++ b/crates/storage/provider/src/writer/mod.rs @@ -0,0 +1,157 @@ +use crate::{providers::StaticFileProviderRWRefMut, DatabaseProviderRW}; +use reth_db::{ + cursor::DbCursorRO, + tables, + transaction::{DbTx, DbTxMut}, + Database, +}; +use reth_errors::{ProviderError, ProviderResult}; +use reth_primitives::BlockNumber; +use reth_storage_api::ReceiptWriter; +use reth_storage_errors::writer::StorageWriterError; +use static_file::StaticFileWriter; + +mod database; +mod static_file; +use database::DatabaseWriter; + +enum StorageType { + Database(C), + StaticFile(S), +} + +/// [`StorageWriter`] is responsible for managing the writing to either database, static file or +/// both. +#[derive(Debug)] +pub struct StorageWriter<'a, 'b, DB: Database> { + database_writer: Option<&'a DatabaseProviderRW>, + static_file_writer: Option>, +} + +impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> { + /// Creates a new instance of [`StorageWriter`]. + /// + /// # Parameters + /// - `database_writer`: An optional reference to a database writer. + /// - `static_file_writer`: An optional mutable reference to a static file writer. + pub const fn new( + database_writer: Option<&'a DatabaseProviderRW>, + static_file_writer: Option>, + ) -> Self { + Self { database_writer, static_file_writer } + } + + /// Creates a new instance of [`StorageWriter`] from a database writer. + pub const fn from_database_writer(database_writer: &'a DatabaseProviderRW) -> Self { + Self::new(Some(database_writer), None) + } + + /// Creates a new instance of [`StorageWriter`] from a static file writer. + pub const fn from_static_file_writer( + static_file_writer: StaticFileProviderRWRefMut<'b>, + ) -> Self { + Self::new(None, Some(static_file_writer)) + } + + /// Returns a reference to the database writer. + /// + /// # Panics + /// If the database writer is not set. + fn database_writer(&self) -> &DatabaseProviderRW { + self.database_writer.as_ref().expect("should exist") + } + + /// Returns a mutable reference to the static file writer. + /// + /// # Panics + /// If the static file writer is not set. + fn static_file_writer(&mut self) -> &mut StaticFileProviderRWRefMut<'b> { + self.static_file_writer.as_mut().expect("should exist") + } + + /// Ensures that the database writer is set. + /// + /// # Returns + /// - `Ok(())` if the database writer is set. + /// - `Err(StorageWriterError::MissingDatabaseWriter)` if the database writer is not set. + const fn ensure_database_writer(&self) -> Result<(), StorageWriterError> { + if self.database_writer.is_none() { + return Err(StorageWriterError::MissingDatabaseWriter) + } + Ok(()) + } + + /// Ensures that the static file writer is set. + /// + /// # Returns + /// - `Ok(())` if the static file writer is set. + /// - `Err(StorageWriterError::MissingStaticFileWriter)` if the static file writer is not set. + const fn ensure_static_file_writer(&self) -> Result<(), StorageWriterError> { + if self.static_file_writer.is_none() { + return Err(StorageWriterError::MissingStaticFileWriter) + } + Ok(()) + } + + /// Appends receipts block by block. + /// + /// ATTENTION: If called from [`StorageWriter`] without a static file producer, it will always + /// write them to database. Otherwise, it will look into the pruning configuration to decide. + /// + /// # Parameters + /// - `initial_block_number`: The starting block number. + /// - `blocks`: An iterator over blocks, each block having a vector of optional receipts. If + /// `receipt` is `None`, it has been pruned. + pub fn append_receipts_from_blocks( + mut self, + initial_block_number: BlockNumber, + blocks: impl Iterator>>, + ) -> ProviderResult<()> { + self.ensure_database_writer()?; + let mut bodies_cursor = + self.database_writer().tx_ref().cursor_read::()?; + + // We write receipts to database in two situations: + // * If we are in live sync. In this case, `StorageWriter` is built without a static file + // writer. + // * If there is any kind of receipt pruning + let mut storage_type = if self.static_file_writer.is_none() || + self.database_writer().prune_modes_ref().has_receipts_pruning() + { + StorageType::Database( + self.database_writer().tx_ref().cursor_write::()?, + ) + } else { + self.ensure_static_file_writer()?; + StorageType::StaticFile(self.static_file_writer()) + }; + + for (idx, receipts) in blocks.enumerate() { + let block_number = initial_block_number + idx as u64; + + let first_tx_index = bodies_cursor + .seek_exact(block_number)? + .map(|(_, indices)| indices.first_tx_num()) + .ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(block_number))?; + + match &mut storage_type { + StorageType::Database(cursor) => { + DatabaseWriter(cursor).append_block_receipts( + first_tx_index, + block_number, + receipts, + )?; + } + StorageType::StaticFile(sf) => { + StaticFileWriter(*sf).append_block_receipts( + first_tx_index, + block_number, + receipts, + )?; + } + }; + } + + Ok(()) + } +} diff --git a/crates/storage/provider/src/writer/static_file.rs b/crates/storage/provider/src/writer/static_file.rs new file mode 100644 index 000000000..b31b7dabd --- /dev/null +++ b/crates/storage/provider/src/writer/static_file.rs @@ -0,0 +1,26 @@ +use crate::providers::StaticFileProviderRWRefMut; +use reth_errors::ProviderResult; +use reth_primitives::{BlockNumber, Receipt, StaticFileSegment, TxNumber}; +use reth_storage_api::ReceiptWriter; + +pub(crate) struct StaticFileWriter<'a, W>(pub(crate) &'a mut W); + +impl<'a> ReceiptWriter for StaticFileWriter<'a, StaticFileProviderRWRefMut<'_>> { + fn append_block_receipts( + &mut self, + first_tx_index: TxNumber, + block_number: BlockNumber, + receipts: Vec>, + ) -> ProviderResult<()> { + // Increment block on static file header. + self.0.increment_block(StaticFileSegment::Receipts, block_number)?; + let receipts = receipts.into_iter().enumerate().map(|(tx_idx, receipt)| { + Ok(( + first_tx_index + tx_idx as u64, + receipt.expect("receipt should not be filtered when saving to static files."), + )) + }); + self.0.append_receipts(receipts)?; + Ok(()) + } +} diff --git a/crates/storage/storage-api/src/receipts.rs b/crates/storage/storage-api/src/receipts.rs index 04eb81aad..ec28d8143 100644 --- a/crates/storage/storage-api/src/receipts.rs +++ b/crates/storage/storage-api/src/receipts.rs @@ -1,5 +1,7 @@ use crate::BlockIdReader; -use reth_primitives::{BlockHashOrNumber, BlockId, BlockNumberOrTag, Receipt, TxHash, TxNumber}; +use reth_primitives::{ + BlockHashOrNumber, BlockId, BlockNumber, BlockNumberOrTag, Receipt, TxHash, TxNumber, +}; use reth_storage_errors::provider::ProviderResult; use std::ops::RangeBounds; @@ -66,3 +68,20 @@ pub trait ReceiptProviderIdExt: ReceiptProvider + BlockIdReader { self.receipts_by_block_id(number_or_tag.into()) } } + +/// Writer trait for writing [`Receipt`] data. +pub trait ReceiptWriter { + /// Appends receipts for a block. + /// + /// # Parameters + /// - `first_tx_index`: The transaction number of the first receipt in the block. + /// - `block_number`: The block number to which the receipts belong. + /// - `receipts`: A vector of optional receipts in the block. If `None`, it means they were + /// pruned. + fn append_block_receipts( + &mut self, + first_tx_index: TxNumber, + block_number: BlockNumber, + receipts: Vec>, + ) -> ProviderResult<()>; +}