From 7a6a725d914cbe35b44b03d890341b2835ebe879 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Wed, 27 Nov 2024 19:31:13 +0400 Subject: [PATCH] feat: add `Receipt` AT to writer traits (#12892) --- .../commands/debug_cmd/in_memory_merkle.rs | 7 +- bin/reth/src/commands/debug_cmd/merkle.rs | 7 +- .../src/commands/debug_cmd/replay_engine.rs | 5 +- .../cli/commands/src/stage/dump/execution.rs | 19 +- crates/cli/commands/src/stage/dump/merkle.rs | 12 +- crates/exex/exex/src/backfill/test_utils.rs | 5 +- crates/node/builder/src/launch/common.rs | 2 + crates/node/builder/src/setup.rs | 14 +- .../cli/src/commands/import_receipts.rs | 11 +- crates/primitives/src/receipt.rs | 7 +- crates/stages/stages/src/stages/execution.rs | 7 +- crates/storage/db-common/src/init.rs | 16 +- crates/storage/db/src/tables/mod.rs | 4 +- .../src/providers/blockchain_provider.rs | 2 +- .../provider/src/providers/consistent.rs | 5 +- .../src/providers/database/provider.rs | 171 +++++++++--------- crates/storage/provider/src/providers/mod.rs | 6 +- .../src/providers/static_file/manager.rs | 6 +- .../src/providers/static_file/writer.rs | 5 +- crates/storage/provider/src/traits/block.rs | 4 +- crates/storage/provider/src/traits/mod.rs | 2 +- crates/storage/provider/src/traits/state.rs | 16 +- crates/storage/provider/src/writer/mod.rs | 36 ++-- crates/trie/parallel/benches/root.rs | 3 +- 24 files changed, 211 insertions(+), 161 deletions(-) diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index d592e956c..870dc1ddf 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -60,7 +60,10 @@ impl> Command { async fn build_network< N: ProviderNodeTypes< ChainSpec = C::ChainSpec, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, >( &self, @@ -178,7 +181,7 @@ impl> Command { .try_seal_with_senders() .map_err(|_| BlockValidationError::SenderRecoveryError)?, )?; - provider_rw.write_to_storage( + provider_rw.write_state( execution_outcome, OriginalValuesKnown::No, StorageLocation::Database, diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index ba6fd12f8..78e32df52 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -59,7 +59,10 @@ impl> Command { async fn build_network< N: ProviderNodeTypes< ChainSpec = C::ChainSpec, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, >( &self, @@ -163,7 +166,7 @@ impl> Command { executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?; let execution_outcome = executor.finalize(); - provider_rw.write_to_storage( + provider_rw.write_state( execution_outcome, OriginalValuesKnown::Yes, StorageLocation::Database, diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 409871673..04d3b5763 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -58,7 +58,10 @@ impl> Command { async fn build_network< N: ProviderNodeTypes< ChainSpec = C::ChainSpec, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, >( &self, diff --git a/crates/cli/commands/src/stage/dump/execution.rs b/crates/cli/commands/src/stage/dump/execution.rs index 4afcdf446..000c1b542 100644 --- a/crates/cli/commands/src/stage/dump/execution.rs +++ b/crates/cli/commands/src/stage/dump/execution.rs @@ -28,7 +28,10 @@ pub(crate) async fn dump_execution_stage( where N: ProviderNodeTypes< DB = Arc, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, E: BlockExecutorProvider, { @@ -136,7 +139,12 @@ fn import_tables_with_range( /// `PlainAccountState` safely. There might be some state dependency from an address /// which hasn't been changed in the given range. fn unwind_and_copy< - N: ProviderNodeTypes>, + N: ProviderNodeTypes< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, >( db_tool: &DbTool, from: u64, @@ -174,7 +182,12 @@ fn dry_run( executor: E, ) -> eyre::Result<()> where - N: ProviderNodeTypes>, + N: ProviderNodeTypes< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, E: BlockExecutorProvider, { info!(target: "reth::cli", "Executing stage. [dry-run]"); diff --git a/crates/cli/commands/src/stage/dump/merkle.rs b/crates/cli/commands/src/stage/dump/merkle.rs index 3fa0c4f07..ce1874372 100644 --- a/crates/cli/commands/src/stage/dump/merkle.rs +++ b/crates/cli/commands/src/stage/dump/merkle.rs @@ -28,7 +28,10 @@ use tracing::info; pub(crate) async fn dump_merkle_stage< N: ProviderNodeTypes< DB = Arc, - Primitives: NodePrimitives, + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, >, >( db_tool: &DbTool, @@ -74,7 +77,12 @@ pub(crate) async fn dump_merkle_stage< /// Dry-run an unwind to FROM block and copy the necessary table data to the new database. fn unwind_and_copy< - N: ProviderNodeTypes>, + N: ProviderNodeTypes< + Primitives: NodePrimitives< + Block = reth_primitives::Block, + Receipt = reth_primitives::Receipt, + >, + >, >( db_tool: &DbTool, range: (u64, u64), diff --git a/crates/exex/exex/src/backfill/test_utils.rs b/crates/exex/exex/src/backfill/test_utils.rs index 861d42f04..6d93314e2 100644 --- a/crates/exex/exex/src/backfill/test_utils.rs +++ b/crates/exex/exex/src/backfill/test_utils.rs @@ -62,6 +62,7 @@ where Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, >, >, { @@ -171,6 +172,7 @@ where Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, >, >, { @@ -194,7 +196,8 @@ pub(crate) fn blocks_and_execution_outcome( ) -> eyre::Result<(Vec, ExecutionOutcome)> where N: ProviderNodeTypes, - N::Primitives: FullNodePrimitives, + N::Primitives: + FullNodePrimitives, { let (block1, block2) = blocks(chain_spec.clone(), key_pair)?; diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 987839912..830909c8c 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -385,6 +385,7 @@ where N::Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, >, { let factory = ProviderFactory::new( @@ -455,6 +456,7 @@ where N::Primitives: FullNodePrimitives< Block = reth_primitives::Block, BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, >, { let factory = self.create_provider_factory().await?; diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index ea0e6b9fe..092c1fdf6 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -41,8 +41,11 @@ where N: ProviderNodeTypes, Client: EthBlockClient + 'static, Executor: BlockExecutorProvider, - N::Primitives: - FullNodePrimitives, + N::Primitives: FullNodePrimitives< + Block = reth_primitives::Block, + BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, + >, { // building network downloaders using the fetch client let header_downloader = ReverseHeadersDownloaderBuilder::new(config.headers) @@ -90,8 +93,11 @@ where H: HeaderDownloader
+ 'static, B: BodyDownloader> + 'static, Executor: BlockExecutorProvider, - N::Primitives: - FullNodePrimitives, + N::Primitives: FullNodePrimitives< + Block = reth_primitives::Block, + BlockBody = reth_primitives::BlockBody, + Receipt = reth_primitives::Receipt, + >, { let mut builder = Pipeline::::builder(); diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs index 59d596685..a5c12a48c 100644 --- a/crates/optimism/cli/src/commands/import_receipts.rs +++ b/crates/optimism/cli/src/commands/import_receipts.rs @@ -15,7 +15,7 @@ use reth_execution_types::ExecutionOutcome; use reth_node_core::version::SHORT_VERSION; use reth_optimism_chainspec::OpChainSpec; use reth_optimism_primitives::bedrock::is_dup_tx; -use reth_primitives::Receipts; +use reth_primitives::{NodePrimitives, Receipts}; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory, OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, @@ -85,7 +85,10 @@ pub async fn import_receipts_from_file( filter: F, ) -> eyre::Result<()> where - N: ProviderNodeTypes, + N: ProviderNodeTypes< + ChainSpec = OpChainSpec, + Primitives: NodePrimitives, + >, P: AsRef, F: FnMut(u64, &mut Receipts) -> usize, { @@ -123,7 +126,7 @@ pub async fn import_receipts_from_reader( mut filter: F, ) -> eyre::Result where - N: ProviderNodeTypes, + N: ProviderNodeTypes>, F: FnMut(u64, &mut Receipts) -> usize, { let static_file_provider = provider_factory.static_file_provider(); @@ -219,7 +222,7 @@ where ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default()); // finally, write the receipts - provider.write_to_storage( + provider.write_state( execution_outcome, OriginalValuesKnown::Yes, StorageLocation::StaticFiles, diff --git a/crates/primitives/src/receipt.rs b/crates/primitives/src/receipt.rs index 77a44dc39..95d707d1b 100644 --- a/crates/primitives/src/receipt.rs +++ b/crates/primitives/src/receipt.rs @@ -131,7 +131,6 @@ impl InMemorySize for Receipt { Debug, PartialEq, Eq, - Default, Serialize, Deserialize, From, @@ -187,6 +186,12 @@ impl From for ReceiptWithBloom { } } +impl Default for Receipts { + fn default() -> Self { + Self { receipt_vec: Vec::new() } + } +} + /// [`Receipt`] with calculated bloom filter. #[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] #[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))] diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 297130c34..ce969f257 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -16,7 +16,7 @@ use reth_primitives_traits::{format_gas_throughput, Block, BlockBody, NodePrimit use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockHashReader, BlockReader, DBProvider, HeaderProvider, LatestStateProviderRef, - OriginalValuesKnown, ProviderError, StateChangeWriter, StateCommitmentProvider, StateWriter, + OriginalValuesKnown, ProviderError, StateCommitmentProvider, StateWriter, StaticFileProviderFactory, StatsReader, StorageLocation, TransactionVariant, }; use reth_prune_types::PruneModes; @@ -262,9 +262,8 @@ where + BlockReader + StaticFileProviderFactory + StatsReader - + StateChangeWriter + BlockHashReader - + StateWriter + + StateWriter + StateCommitmentProvider, { /// Return the id of the stage @@ -432,7 +431,7 @@ where let time = Instant::now(); // write output - provider.write_to_storage(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?; + provider.write_state(state, OriginalValuesKnown::Yes, StorageLocation::StaticFiles)?; let db_write_duration = time.elapsed(); debug!( diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index 367190b58..ec31edd06 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -13,8 +13,8 @@ use reth_provider::{ errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter, BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, - OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter, - StateWriter, StaticFileProviderFactory, StorageLocation, TrieWriter, + OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateWriter, + StaticFileProviderFactory, StorageLocation, TrieWriter, }; use reth_stages_types::{StageCheckpoint, StageId}; use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress}; @@ -75,7 +75,7 @@ where + HistoryWriter + HeaderProvider + HashingWriter - + StateChangeWriter + + StateWriter + StateWriter + AsRef, { @@ -146,7 +146,6 @@ pub fn insert_genesis_state<'a, 'b, Provider>( where Provider: StaticFileProviderFactory + DBProvider - + StateChangeWriter + HeaderProvider + StateWriter + AsRef, @@ -163,7 +162,6 @@ pub fn insert_state<'a, 'b, Provider>( where Provider: StaticFileProviderFactory + DBProvider - + StateChangeWriter + HeaderProvider + StateWriter + AsRef, @@ -233,11 +231,7 @@ where Vec::new(), ); - provider.write_to_storage( - execution_outcome, - OriginalValuesKnown::Yes, - StorageLocation::Database, - )?; + provider.write_state(execution_outcome, OriginalValuesKnown::Yes, StorageLocation::Database)?; trace!(target: "reth::cli", "Inserted state"); @@ -355,7 +349,6 @@ where + HistoryWriter + HeaderProvider + HashingWriter - + StateChangeWriter + TrieWriter + StateWriter + AsRef, @@ -478,7 +471,6 @@ where + HashingWriter + HistoryWriter + StateWriter - + StateChangeWriter + AsRef, { let accounts_len = collector.len(); diff --git a/crates/storage/db/src/tables/mod.rs b/crates/storage/db/src/tables/mod.rs index aafdf606b..a1fea62f0 100644 --- a/crates/storage/db/src/tables/mod.rs +++ b/crates/storage/db/src/tables/mod.rs @@ -346,9 +346,9 @@ tables! { } /// Canonical only Stores transaction receipts. - table Receipts { + table Receipts { type Key = TxNumber; - type Value = Receipt; + type Value = R; } /// Stores all smart contract bytecodes. diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 37d984e67..08f5e4680 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -145,7 +145,7 @@ impl BlockchainProvider2 { pub fn get_state( &self, range: RangeInclusive, - ) -> ProviderResult> { + ) -> ProviderResult>>> { self.consistent_provider()?.get_state(range) } } diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index cf473a1fb..e70f4b4e5 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -1512,7 +1512,10 @@ impl StateReader for ConsistentProvider { /// inconsistent. Currently this can safely be called within the blockchain tree thread, /// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the /// first place. - fn get_state(&self, block: BlockNumber) -> ProviderResult> { + fn get_state( + &self, + block: BlockNumber, + ) -> ProviderResult>> { if let Some(state) = self.head_block.as_ref().and_then(|b| b.block_on_chain(block.into())) { let state = state.block_ref().execution_outcome().clone(); Ok(Some(state)) diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 569492017..05f2501c1 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -15,10 +15,9 @@ use crate::{ HeaderSyncGapProvider, HistoricalStateProvider, HistoricalStateProviderRef, HistoryWriter, LatestStateProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RevertsInit, StageCheckpointReader, - StateChangeWriter, StateCommitmentProvider, StateProviderBox, StateWriter, - StaticFileProviderFactory, StatsReader, StorageLocation, StorageReader, StorageTrieWriter, - TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter, - WithdrawalsProvider, + StateCommitmentProvider, StateProviderBox, StateWriter, StaticFileProviderFactory, StatsReader, + StorageLocation, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, + TransactionsProviderExt, TrieWriter, WithdrawalsProvider, }; use alloy_consensus::Header; use alloy_eips::{ @@ -351,7 +350,7 @@ impl DatabaseProvider ProviderResult<()> { if remove_from.database() { // iterate over block body and remove receipts - self.remove::(from_tx..)?; + self.remove::>>(from_tx..)?; } if remove_from.static_files() && !self.prune_modes.has_receipts_pruning() { @@ -1536,7 +1535,7 @@ impl ReceiptProvider for DatabasePr StaticFileSegment::Receipts, id, |static_file| static_file.receipt(id), - || Ok(self.tx.get::(id)?), + || Ok(self.tx.get::>(id)?), ) } @@ -1573,7 +1572,10 @@ impl ReceiptProvider for DatabasePr StaticFileSegment::Receipts, to_range(range), |static_file, range, _| static_file.receipts_by_tx_range(range), - |range, _| self.cursor_read_collect::(range).map_err(Into::into), + |range, _| { + self.cursor_read_collect::>(range) + .map_err(Into::into) + }, |_| true, ) } @@ -1798,9 +1800,77 @@ impl StorageReader for DatabaseProvider } } -impl StateChangeWriter +impl StateWriter for DatabaseProvider { + type Receipt = ReceiptTy; + + fn write_state( + &self, + execution_outcome: ExecutionOutcome, + is_value_known: OriginalValuesKnown, + write_receipts_to: StorageLocation, + ) -> ProviderResult<()> { + let (plain_state, reverts) = + execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); + + self.write_state_reverts(reverts, execution_outcome.first_block)?; + self.write_state_changes(plain_state)?; + + let mut bodies_cursor = self.tx.cursor_read::()?; + + let has_receipts_pruning = self.prune_modes.has_receipts_pruning() || + execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none()); + + // Prepare receipts cursor if we are going to write receipts to the database + // + // We are writing to database if requested or if there's any kind of receipt pruning + // configured + let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning) + .then(|| self.tx.cursor_write::>()) + .transpose()?; + + // Prepare receipts static writer if we are going to write receipts to static files + // + // We are writing to static files if requested and if there's no receipt pruning configured + let mut receipts_static_writer = (write_receipts_to.static_files() && + !has_receipts_pruning) + .then(|| { + self.static_file_provider + .get_writer(execution_outcome.first_block, StaticFileSegment::Receipts) + }) + .transpose()?; + + for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() { + let block_number = execution_outcome.first_block + idx as u64; + + // Increment block number for receipts static file writer + if let Some(writer) = receipts_static_writer.as_mut() { + writer.increment_block(block_number)?; + } + + let first_tx_index = bodies_cursor + .seek_exact(block_number)? + .map(|(_, indices)| indices.first_tx_num()) + .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?; + + for (idx, receipt) in receipts.into_iter().enumerate() { + let receipt_idx = first_tx_index + idx as u64; + if let Some(receipt) = receipt { + if let Some(writer) = &mut receipts_static_writer { + writer.append_receipt(receipt_idx, &receipt)?; + } + + if let Some(cursor) = &mut receipts_cursor { + cursor.append(receipt_idx, receipt)?; + } + } + } + } + + Ok(()) + } + fn write_state_reverts( &self, reverts: PlainStateReverts, @@ -2089,7 +2159,7 @@ impl StateChangeWriter &self, block: BlockNumber, remove_receipts_from: StorageLocation, - ) -> ProviderResult { + ) -> ProviderResult> { let range = block + 1..=self.last_block_number()?; if range.is_empty() { @@ -2172,7 +2242,7 @@ impl StateChangeWriter }, |range, _| { self.tx - .cursor_read::()? + .cursor_read::>()? .walk_range(range)? .map(|r| r.map_err(Into::into)) .collect() @@ -2709,6 +2779,7 @@ impl BlockWrite for DatabaseProvider { type Block = BlockTy; + type Receipt = ReceiptTy; /// Inserts the block into the database, always modifying the following tables: /// * [`CanonicalHeaders`](tables::CanonicalHeaders) @@ -2976,7 +3047,7 @@ impl BlockWrite fn append_blocks_with_state( &self, blocks: Vec>, - execution_outcome: ExecutionOutcome, + execution_outcome: ExecutionOutcome, hashed_state: HashedPostStateSorted, trie_updates: TrieUpdates, ) -> ProviderResult<()> { @@ -2998,11 +3069,7 @@ impl BlockWrite durations_recorder.record_relative(metrics::Action::InsertBlock); } - self.write_to_storage( - execution_outcome, - OriginalValuesKnown::No, - StorageLocation::Database, - )?; + self.write_state(execution_outcome, OriginalValuesKnown::No, StorageLocation::Database)?; durations_recorder.record_relative(metrics::Action::InsertState); // insert hashes and intermediate merkle nodes @@ -3050,7 +3117,7 @@ impl PruneCheckpointWriter for DatabaseProvider StatsReader for DatabaseProvider { +impl StatsReader for DatabaseProvider { fn count_entries(&self) -> ProviderResult { let db_entries = self.tx.entries::()?; let static_file_entries = match self.static_file_provider.count_entries::() { @@ -3122,73 +3189,3 @@ impl DBProvider for DatabaseProvider self.prune_modes_ref() } } - -impl StateWriter - for DatabaseProvider -{ - fn write_to_storage( - &self, - execution_outcome: ExecutionOutcome, - is_value_known: OriginalValuesKnown, - write_receipts_to: StorageLocation, - ) -> ProviderResult<()> { - let (plain_state, reverts) = - execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); - - self.write_state_reverts(reverts, execution_outcome.first_block)?; - self.write_state_changes(plain_state)?; - - let mut bodies_cursor = self.tx.cursor_read::()?; - - let has_receipts_pruning = self.prune_modes.has_receipts_pruning() || - execution_outcome.receipts.iter().flatten().any(|receipt| receipt.is_none()); - - // Prepare receipts cursor if we are going to write receipts to the database - // - // We are writing to database if requested or if there's any kind of receipt pruning - // configured - let mut receipts_cursor = (write_receipts_to.database() || has_receipts_pruning) - .then(|| self.tx.cursor_write::()) - .transpose()?; - - // Prepare receipts static writer if we are going to write receipts to static files - // - // We are writing to static files if requested and if there's no receipt pruning configured - let mut receipts_static_writer = (write_receipts_to.static_files() && - !has_receipts_pruning) - .then(|| { - self.static_file_provider - .get_writer(execution_outcome.first_block, StaticFileSegment::Receipts) - }) - .transpose()?; - - for (idx, receipts) in execution_outcome.receipts.into_iter().enumerate() { - let block_number = execution_outcome.first_block + idx as u64; - - // Increment block number for receipts static file writer - if let Some(writer) = receipts_static_writer.as_mut() { - writer.increment_block(block_number)?; - } - - let first_tx_index = bodies_cursor - .seek_exact(block_number)? - .map(|(_, indices)| indices.first_tx_num()) - .ok_or(ProviderError::BlockBodyIndicesNotFound(block_number))?; - - for (idx, receipt) in receipts.into_iter().enumerate() { - let receipt_idx = first_tx_index + idx as u64; - if let Some(receipt) = receipt { - if let Some(writer) = &mut receipts_static_writer { - writer.append_receipt(receipt_idx, &receipt)?; - } - - if let Some(cursor) = &mut receipts_cursor { - cursor.append(receipt_idx, receipt)?; - } - } - } - } - - Ok(()) - } -} diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 92c94952a..6631b5b1b 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -79,8 +79,8 @@ where Storage: ChainStorage, Primitives: FullNodePrimitives< SignedTx: Value, + Receipt: Value, BlockHeader = alloy_consensus::Header, - Receipt = reth_primitives::Receipt, >, >, { @@ -92,8 +92,8 @@ impl NodeTypesForProvider for T where Storage: ChainStorage, Primitives: FullNodePrimitives< SignedTx: Value, + Receipt: Value, BlockHeader = alloy_consensus::Header, - Receipt = reth_primitives::Receipt, >, > { @@ -541,7 +541,7 @@ impl ReceiptProvider for BlockchainProvider { } } -impl ReceiptProviderIdExt for BlockchainProvider { +impl ReceiptProviderIdExt for BlockchainProvider { fn receipts_by_block_id(&self, block: BlockId) -> ProviderResult>> { match block { BlockId::Hash(rpc_block_hash) => { diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 2f32edcc2..e19166961 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -38,8 +38,8 @@ use reth_primitives::{ DEFAULT_BLOCKS_PER_STATIC_FILE, }, transaction::recover_signers, - BlockWithSenders, SealedBlockFor, SealedBlockWithSenders, SealedHeader, StaticFileSegment, - TransactionMeta, TransactionSignedNoHash, + BlockWithSenders, Receipt, SealedBlockFor, SealedBlockWithSenders, SealedHeader, + StaticFileSegment, TransactionMeta, TransactionSignedNoHash, }; use reth_primitives_traits::SignedTransaction; use reth_stages_types::{PipelineTarget, StageId}; @@ -1692,7 +1692,7 @@ impl StatsReader for StaticFileProvider { .map(|block| block + 1) .unwrap_or_default() as usize), - tables::Receipts::NAME => Ok(self + tables::Receipts::::NAME => Ok(self .get_highest_static_file_tx(StaticFileSegment::Receipts) .map(|receipts| receipts + 1) .unwrap_or_default() as usize), diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 83954bde3..6f5335ec6 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -585,7 +585,10 @@ impl StaticFileProviderRW { /// empty blocks and this function wouldn't be called. /// /// Returns the current [`TxNumber`] as seen in the static file. - pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &Receipt) -> ProviderResult<()> { + pub fn append_receipt(&mut self, tx_num: TxNumber, receipt: &N::Receipt) -> ProviderResult<()> + where + N::Receipt: Compact, + { let start = Instant::now(); self.ensure_no_queued_prune()?; diff --git a/crates/storage/provider/src/traits/block.rs b/crates/storage/provider/src/traits/block.rs index e7669b0ea..d12f240e6 100644 --- a/crates/storage/provider/src/traits/block.rs +++ b/crates/storage/provider/src/traits/block.rs @@ -95,6 +95,8 @@ pub trait StateReader: Send + Sync { pub trait BlockWriter: Send + Sync { /// The body this writer can write. type Block: reth_primitives_traits::Block; + /// The receipt type for [`ExecutionOutcome`]. + type Receipt: Send + Sync; /// Insert full block and make it canonical. Parent tx num and transition id is taken from /// parent block in database. @@ -154,7 +156,7 @@ pub trait BlockWriter: Send + Sync { fn append_blocks_with_state( &self, blocks: Vec>, - execution_outcome: ExecutionOutcome, + execution_outcome: ExecutionOutcome, hashed_state: HashedPostStateSorted, trie_updates: TrieUpdates, ) -> ProviderResult<()>; diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index a772204d0..d82e97d1d 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -13,7 +13,7 @@ mod header_sync_gap; pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider}; mod state; -pub use state::{StateChangeWriter, StateWriter}; +pub use state::StateWriter; pub use reth_chainspec::ChainSpecProvider; diff --git a/crates/storage/provider/src/traits/state.rs b/crates/storage/provider/src/traits/state.rs index 2e46e2850..2c4ee2cfa 100644 --- a/crates/storage/provider/src/traits/state.rs +++ b/crates/storage/provider/src/traits/state.rs @@ -9,20 +9,20 @@ use revm::db::{ use super::StorageLocation; -/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage. +/// A trait specifically for writing state changes or reverts pub trait StateWriter { - /// Write the data and receipts to the database or static files if `static_file_producer` is + /// Receipt type included into [`ExecutionOutcome`]. + type Receipt; + + /// Write the state and receipts to the database or static files if `static_file_producer` is /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts. - fn write_to_storage( + fn write_state( &self, - execution_outcome: ExecutionOutcome, + execution_outcome: ExecutionOutcome, is_value_known: OriginalValuesKnown, write_receipts_to: StorageLocation, ) -> ProviderResult<()>; -} -/// A trait specifically for writing state changes or reverts -pub trait StateChangeWriter { /// Write state reverts to the database. /// /// NOTE: Reverts will delete all wiped storage from plain state. @@ -52,5 +52,5 @@ pub trait StateChangeWriter { &self, block: BlockNumber, remove_receipts_from: StorageLocation, - ) -> ProviderResult; + ) -> ProviderResult>; } diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index c0eeb64b8..02e912050 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,12 +1,14 @@ use crate::{ providers::{StaticFileProvider, StaticFileWriter as SfWriter}, - BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, - StaticFileProviderFactory, StorageLocation, TrieWriter, + BlockExecutionWriter, BlockWriter, HistoryWriter, StateWriter, StaticFileProviderFactory, + StorageLocation, TrieWriter, }; +use alloy_consensus::BlockHeader; use reth_chain_state::ExecutedBlock; use reth_db::transaction::{DbTx, DbTxMut}; use reth_errors::ProviderResult; -use reth_primitives::StaticFileSegment; +use reth_primitives::{NodePrimitives, StaticFileSegment}; +use reth_primitives_traits::SignedTransaction; use reth_storage_api::{DBProvider, StageCheckpointWriter, TransactionsProviderExt}; use reth_storage_errors::writer::UnifiedStorageWriterError; use revm::db::OriginalValuesKnown; @@ -119,9 +121,8 @@ impl UnifiedStorageWriter<'_, (), ()> { impl UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider> where ProviderDB: DBProvider - + BlockWriter + + BlockWriter + TransactionsProviderExt - + StateChangeWriter + TrieWriter + StateWriter + HistoryWriter @@ -131,7 +132,11 @@ where + StaticFileProviderFactory, { /// Writes executed blocks and receipts to storage. - pub fn save_blocks(&self, blocks: Vec) -> ProviderResult<()> { + pub fn save_blocks(&self, blocks: Vec>) -> ProviderResult<()> + where + N: NodePrimitives, + ProviderDB: BlockWriter + StateWriter, + { if blocks.is_empty() { debug!(target: "provider::storage_writer", "Attempted to write empty block range"); return Ok(()) @@ -139,9 +144,10 @@ where // NOTE: checked non-empty above let first_block = blocks.first().unwrap().block(); + let last_block = blocks.last().unwrap().block(); - let first_number = first_block.number; - let last_block_number = last_block.number; + let first_number = first_block.number(); + let last_block_number = last_block.number(); debug!(target: "provider::storage_writer", block_count = %blocks.len(), "Writing blocks and execution data to storage"); @@ -162,7 +168,7 @@ where // Write state and changesets to the database. // Must be written after blocks because of the receipt lookup. - self.database().write_to_storage( + self.database().write_state( Arc::unwrap_or_clone(execution_output), OriginalValuesKnown::No, StorageLocation::StaticFiles, @@ -490,7 +496,7 @@ mod tests { let outcome = ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); // Check plain storage state @@ -590,7 +596,7 @@ mod tests { let outcome = ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 2, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); assert_eq!( @@ -657,7 +663,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -805,7 +811,7 @@ mod tests { let outcome: ExecutionOutcome = ExecutionOutcome::new(bundle, Receipts::default(), 1, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider @@ -970,7 +976,7 @@ mod tests { let outcome = ExecutionOutcome::new(init_state.take_bundle(), Receipts::default(), 0, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); let mut state = State::builder().with_bundle_update().build(); @@ -1017,7 +1023,7 @@ mod tests { let outcome = ExecutionOutcome::new(state.take_bundle(), Receipts::default(), 1, Vec::new()); provider - .write_to_storage(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) + .write_state(outcome, OriginalValuesKnown::Yes, StorageLocation::Database) .expect("Could not write bundle state to DB"); let mut storage_changeset_cursor = provider diff --git a/crates/trie/parallel/benches/root.rs b/crates/trie/parallel/benches/root.rs index eb5b6575b..a9300efa9 100644 --- a/crates/trie/parallel/benches/root.rs +++ b/crates/trie/parallel/benches/root.rs @@ -5,8 +5,7 @@ use proptest::{prelude::*, strategy::ValueTree, test_runner::TestRunner}; use proptest_arbitrary_interop::arb; use reth_primitives::Account; use reth_provider::{ - providers::ConsistentDbView, test_utils::create_test_provider_factory, StateChangeWriter, - TrieWriter, + providers::ConsistentDbView, test_utils::create_test_provider_factory, StateWriter, TrieWriter, }; use reth_trie::{ hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot,