From 23ebf96188c9b70e4689972bf22104c6c329db32 Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Wed, 22 Jan 2025 17:41:13 +0400 Subject: [PATCH] chore: move receipts pruning to provider (#13886) --- Cargo.lock | 1 + .../execution-types/src/execution_outcome.rs | 5 + crates/prune/types/src/segment.rs | 5 +- crates/revm/src/batch.rs | 241 +----------------- crates/stages/stages/src/stages/execution.rs | 12 +- crates/storage/errors/Cargo.toml | 3 +- crates/storage/errors/src/provider.rs | 4 + .../src/providers/database/provider.rs | 54 +++- 8 files changed, 75 insertions(+), 250 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5f73af1dc..7e55a7170 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9276,6 +9276,7 @@ dependencies = [ "derive_more", "reth-fs-util", "reth-primitives-traits", + "reth-prune-types", "reth-static-file-types", "thiserror 2.0.11", ] diff --git a/crates/evm/execution-types/src/execution_outcome.rs b/crates/evm/execution-types/src/execution_outcome.rs index 9d735271c..0f225a9c5 100644 --- a/crates/evm/execution-types/src/execution_outcome.rs +++ b/crates/evm/execution-types/src/execution_outcome.rs @@ -232,6 +232,11 @@ impl ExecutionOutcome { self.first_block } + /// Return last block of the execution outcome + pub fn last_block(&self) -> BlockNumber { + (self.first_block + self.len() as u64).saturating_sub(1) + } + /// Revert the state to the given block number. /// /// Returns false if the block number is not in the bundle state. diff --git a/crates/prune/types/src/segment.rs b/crates/prune/types/src/segment.rs index 6d44f88a8..e0b73aab7 100644 --- a/crates/prune/types/src/segment.rs +++ b/crates/prune/types/src/segment.rs @@ -42,7 +42,7 @@ pub enum PruneSegment { } impl PruneSegment { - /// Returns minimum number of blocks to left in the database for this segment. + /// Returns minimum number of blocks to keep in the database for this segment. pub const fn min_blocks(&self, purpose: PrunePurpose) -> u64 { match self { Self::SenderRecovery | Self::TransactionLookup | Self::Headers | Self::Transactions => { @@ -84,9 +84,6 @@ pub enum PruneSegmentError { /// Invalid configuration of a prune segment. #[error("the configuration provided for {0} is invalid")] Configuration(PruneSegment), - /// Receipts have been pruned - #[error("receipts have been pruned")] - ReceiptsPruned, } #[cfg(test)] diff --git a/crates/revm/src/batch.rs b/crates/revm/src/batch.rs index c980bdc98..296e42685 100644 --- a/crates/revm/src/batch.rs +++ b/crates/revm/src/batch.rs @@ -3,11 +3,11 @@ use alloc::vec::Vec; use alloy_eips::eip7685::Requests; -use alloy_primitives::{map::HashSet, Address, BlockNumber, Log}; -use reth_execution_errors::{BlockExecutionError, InternalBlockExecutionError}; +use alloy_primitives::{BlockNumber, Log}; +use reth_execution_errors::BlockExecutionError; use reth_primitives::Receipts; use reth_primitives_traits::Receipt; -use reth_prune_types::{PruneMode, PruneModes, PruneSegmentError, MINIMUM_PRUNING_DISTANCE}; +use reth_prune_types::PruneModes; use revm::db::states::bundle_state::BundleRetention; /// Takes care of: @@ -31,11 +31,6 @@ pub struct BlockBatchRecord { /// A transaction may have zero or more requests, so the length of the inner vector is not /// guaranteed to be the same as the number of transactions. requests: Vec, - /// Memoized address pruning filter. - /// - /// Empty implies that there is going to be addresses to include in the filter in a future - /// block. None means there isn't any kind of configuration. - pruning_address_filter: Option<(u64, HashSet
)>, /// First block will be initialized to `None` /// and be set to the block number of first block executed. first_block: Option, @@ -49,7 +44,6 @@ impl Default for BlockBatchRecord { prune_modes: Default::default(), receipts: Default::default(), requests: Default::default(), - pruning_address_filter: Default::default(), first_block: Default::default(), tip: Default::default(), } @@ -58,10 +52,7 @@ impl Default for BlockBatchRecord { impl BlockBatchRecord { /// Create a new receipts recorder with the given pruning configuration. - pub fn new(prune_modes: PruneModes) -> Self - where - T: Default, - { + pub fn new(prune_modes: PruneModes) -> Self { Self { prune_modes, ..Default::default() } } @@ -133,64 +124,12 @@ impl BlockBatchRecord { where T: Receipt, { - let mut receipts = receipts.into_iter().map(Some).collect(); - // Prune receipts if necessary. - self.prune_receipts(&mut receipts).map_err(InternalBlockExecutionError::from)?; + let receipts = receipts.into_iter().map(Some).collect(); // Save receipts. self.receipts.push(receipts); Ok(()) } - /// Prune receipts according to the pruning configuration. - fn prune_receipts(&mut self, receipts: &mut Vec>) -> Result<(), PruneSegmentError> - where - T: Receipt, - { - let (Some(first_block), Some(tip)) = (self.first_block, self.tip) else { return Ok(()) }; - - let block_number = first_block + self.receipts.len() as u64; - - // Block receipts should not be retained - if self.prune_modes.receipts == Some(PruneMode::Full) || - // [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`] - self.prune_modes.receipts.is_some_and(|mode| mode.should_prune(block_number, tip)) - { - receipts.clear(); - return Ok(()) - } - - // All receipts from the last 128 blocks are required for blockchain tree, even with - // [`PruneSegment::ContractLogs`]. - let prunable_receipts = - PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block_number, tip); - if !prunable_receipts { - return Ok(()) - } - - let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?; - - if !contract_log_pruner.is_empty() { - let (prev_block, filter) = - self.pruning_address_filter.get_or_insert_with(|| (0, Default::default())); - for (_, addresses) in contract_log_pruner.range(*prev_block..=block_number) { - filter.extend(addresses.iter().copied()); - } - } - - if let Some((_, filter)) = &self.pruning_address_filter { - for receipt in receipts.iter_mut() { - // If there is an address_filter, it does not contain any of the - // contract addresses, then remove this receipt. - let inner_receipt = receipt.as_ref().expect("receipts have not been pruned"); - if !inner_receipt.logs().iter().any(|log| filter.contains(&log.address)) { - receipt.take(); - } - } - } - - Ok(()) - } - /// Save EIP-7685 requests to the executor. pub fn save_requests(&mut self, requests: Requests) { self.requests.push(requests); @@ -200,10 +139,6 @@ impl BlockBatchRecord { #[cfg(test)] mod tests { use super::*; - use alloc::collections::BTreeMap; - use alloy_primitives::Address; - use reth_primitives::{Log, Receipt}; - use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig}; #[test] fn test_save_receipts_empty() { @@ -216,170 +151,4 @@ mod tests { // Verify that the saved receipts are equal to a nested empty vector assert_eq!(*recorder.receipts(), vec![vec![]].into()); } - - #[test] - fn test_save_receipts_non_empty_no_pruning() { - let mut recorder = BlockBatchRecord::default(); - let receipts = vec![Receipt::default()]; - - // Verify that saving receipts completes without error - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that there is one block of receipts - assert_eq!(recorder.receipts().len(), 1); - // Verify that the first block contains one receipt - assert_eq!(recorder.receipts()[0].len(), 1); - // Verify that the saved receipt is the default receipt - assert_eq!(recorder.receipts()[0][0], Some(Receipt::default())); - } - - #[test] - fn test_save_receipts_with_pruning_no_prunable_receipts() { - let mut recorder = BlockBatchRecord::default(); - - // Set the first block number - recorder.set_first_block(1); - // Set the tip (highest known block) - recorder.set_tip(130); - - // Create a vector of receipts with a default receipt - let receipts = vec![Receipt::default()]; - - // Verify that saving receipts completes without error - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that there is one block of receipts - assert_eq!(recorder.receipts().len(), 1); - // Verify that the first block contains one receipt - assert_eq!(recorder.receipts()[0].len(), 1); - // Verify that the saved receipt is the default receipt - assert_eq!(recorder.receipts()[0][0], Some(Receipt::default())); - } - - #[test] - fn test_save_receipts_with_pruning_no_tip() { - // Create a PruneModes with receipts set to PruneMode::Full - let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() }; - - let mut recorder = BlockBatchRecord::new(prune_modes); - - // Set the first block number - recorder.set_first_block(1); - // Create a vector of receipts with a default receipt - let receipts = vec![Receipt::default()]; - - // Verify that saving receipts completes without error - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that there is one block of receipts - assert_eq!(recorder.receipts().len(), 1); - // Verify that the first block contains one receipt - assert_eq!(recorder.receipts()[0].len(), 1); - // Verify that the saved receipt is the default receipt - assert_eq!(recorder.receipts()[0][0], Some(Receipt::default())); - } - - #[test] - fn test_save_receipts_with_pruning_no_block_number() { - // Create a PruneModes with receipts set to PruneMode::Full - let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() }; - - // Create a BlockBatchRecord with the prune_modes - let mut recorder = BlockBatchRecord::new(prune_modes); - - // Set the tip (highest known block) - recorder.set_tip(130); - - // Create a vector of receipts with a default receipt - let receipts = vec![Receipt::default()]; - - // Verify that saving receipts completes without error - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that there is one block of receipts - assert_eq!(recorder.receipts().len(), 1); - // Verify that the first block contains one receipt - assert_eq!(recorder.receipts()[0].len(), 1); - // Verify that the saved receipt is the default receipt - assert_eq!(recorder.receipts()[0][0], Some(Receipt::default())); - } - - // Test saving receipts with pruning configuration and receipts should be pruned - #[test] - fn test_save_receipts_with_pruning_should_prune() { - // Create a PruneModes with receipts set to PruneMode::Full - let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() }; - - // Create a BlockBatchRecord with the prune_modes - let mut recorder = BlockBatchRecord::new(prune_modes); - - // Set the first block number - recorder.set_first_block(1); - // Set the tip (highest known block) - recorder.set_tip(130); - - // Create a vector of receipts with a default receipt - let receipts = vec![Receipt::default()]; - - // Verify that saving receipts completes without error - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that there is one block of receipts - assert_eq!(recorder.receipts().len(), 1); - // Verify that the receipts are pruned (empty) - assert!(recorder.receipts()[0].is_empty()); - } - - // Test saving receipts with address filter pruning - #[test] - fn test_save_receipts_with_address_filter_pruning() { - // Create a PruneModes with receipts_log_filter configuration - let prune_modes = PruneModes { - receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([ - (Address::with_last_byte(1), PruneMode::Before(1300001)), - (Address::with_last_byte(2), PruneMode::Before(1300002)), - (Address::with_last_byte(3), PruneMode::Distance(1300003)), - ])), - ..Default::default() - }; - - // Create a BlockBatchRecord with the prune_modes - let mut recorder = BlockBatchRecord::new(prune_modes); - - // Set the first block number - recorder.set_first_block(1); - // Set the tip (highest known block) - recorder.set_tip(1300000); - - // With a receipt that should be pruned (address 4 not in the log filter) - let mut receipt = Receipt::default(); - receipt.logs.push(Log { address: Address::with_last_byte(4), ..Default::default() }); - let receipts = vec![receipt.clone()]; - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that the receipts are pruned (empty) - assert_eq!(recorder.receipts().len(), 1); - assert_eq!(recorder.receipts()[0], vec![None]); - - // With a receipt that should not be pruned (address 1 in the log filter) - let mut receipt1 = Receipt::default(); - receipt1.logs.push(Log { address: Address::with_last_byte(1), ..Default::default() }); - let receipts = vec![receipt1.clone()]; - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that the second block of receipts contains the receipt - assert_eq!(recorder.receipts().len(), 2); - assert_eq!(recorder.receipts()[1][0], Some(receipt1)); - - // With a receipt that should not be pruned (address 2 in the log filter) - let mut receipt2 = Receipt::default(); - receipt2.logs.push(Log { address: Address::with_last_byte(2), ..Default::default() }); - let receipts = vec![receipt2.clone()]; - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that the third block of receipts contains the receipt - assert_eq!(recorder.receipts().len(), 3); - assert_eq!(recorder.receipts()[2][0], Some(receipt2)); - - // With a receipt that should not be pruned (address 3 in the log filter) - let mut receipt3 = Receipt::default(); - receipt3.logs.push(Log { address: Address::with_last_byte(3), ..Default::default() }); - let receipts = vec![receipt3.clone()]; - assert!(recorder.save_receipts(receipts).is_ok()); - // Verify that the fourth block of receipts contains the receipt - assert_eq!(recorder.receipts().len(), 4); - assert_eq!(recorder.receipts()[3][0], Some(receipt3)); - } } diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index afa493b4d..b157ce315 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -874,8 +874,10 @@ mod tests { // If there is a pruning configuration, then it's forced to use the database. // This way we test both cases. let modes = [None, Some(PruneModes::none())]; - let random_filter = - ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); + let random_filter = ReceiptsLogPruneConfig(BTreeMap::from([( + Address::random(), + PruneMode::Distance(100000), + )])); // Tests node with database and node with static files for mut mode in modes { @@ -1011,8 +1013,10 @@ mod tests { // If there is a pruning configuration, then it's forced to use the database. // This way we test both cases. let modes = [None, Some(PruneModes::none())]; - let random_filter = - ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); + let random_filter = ReceiptsLogPruneConfig(BTreeMap::from([( + Address::random(), + PruneMode::Before(100000), + )])); // Tests node with database and node with static files for mut mode in modes { diff --git a/crates/storage/errors/Cargo.toml b/crates/storage/errors/Cargo.toml index 10e4a1446..b37c76ac5 100644 --- a/crates/storage/errors/Cargo.toml +++ b/crates/storage/errors/Cargo.toml @@ -12,8 +12,9 @@ workspace = true [dependencies] # reth -reth-primitives-traits.workspace = true reth-fs-util.workspace = true +reth-primitives-traits.workspace = true +reth-prune-types.workspace = true reth-static-file-types.workspace = true # ethereum diff --git a/crates/storage/errors/src/provider.rs b/crates/storage/errors/src/provider.rs index b06e758e4..671c3d673 100644 --- a/crates/storage/errors/src/provider.rs +++ b/crates/storage/errors/src/provider.rs @@ -4,6 +4,7 @@ use alloy_eips::{BlockHashOrNumber, HashOrNumber}; use alloy_primitives::{Address, BlockHash, BlockNumber, TxNumber, B256}; use derive_more::Display; use reth_primitives_traits::GotExpected; +use reth_prune_types::PruneSegmentError; use reth_static_file_types::StaticFileSegment; /// Provider result type. @@ -15,6 +16,9 @@ pub enum ProviderError { /// Database error. #[error(transparent)] Database(#[from] DatabaseError), + /// Pruning error. + #[error(transparent)] + Pruning(#[from] PruneSegmentError), /// RLP error. #[error("{_0}")] Rlp(alloy_rlp::Error), diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index dc8b46895..6fefa4d40 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -19,7 +19,7 @@ use crate::{ StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider, }; -use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header}; +use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt}; use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber}; use alloy_primitives::{ keccak256, @@ -51,7 +51,9 @@ use reth_primitives::{ StaticFileSegment, StorageEntry, }; use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction}; -use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; +use reth_prune_types::{ + PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE, +}; use reth_stages_types::{StageCheckpoint, StageId}; use reth_storage_api::{ BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider, @@ -1783,9 +1785,11 @@ impl StateWriter write_receipts_to: StorageLocation, ) -> ProviderResult<()> { let first_block = execution_outcome.first_block(); - let block_count = execution_outcome.receipts.len() as u64; - let block_range = first_block..=first_block.saturating_add(block_count).saturating_sub(1); - let last_block = *block_range.end(); + let block_count = execution_outcome.len() as u64; + let last_block = execution_outcome.last_block(); + let block_range = first_block..=last_block; + + let tip = self.last_block_number()?.max(last_block); let (plain_state, reverts) = execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); @@ -1827,6 +1831,20 @@ impl StateWriter .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)) .transpose()?; + let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty(); + let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?; + + // All receipts from the last 128 blocks are required for blockchain tree, even with + // [`PruneSegment::ContractLogs`]. + let prunable_receipts = + PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip); + + // Prepare set of addresses which logs should not be pruned. + let mut allowed_addresses: HashSet = HashSet::new(); + for (_, addresses) in contract_log_pruner.range(..first_block) { + allowed_addresses.extend(addresses.iter().copied()); + } + for (idx, (receipts, first_tx_index)) in execution_outcome.receipts.iter().zip(block_indices).enumerate() { @@ -1837,9 +1855,35 @@ impl StateWriter writer.increment_block(block_number)?; } + // Skip writing receipts if pruning configuration requires us to. + if prunable_receipts && + self.prune_modes + .receipts + .is_some_and(|mode| mode.should_prune(block_number, tip)) + { + continue + } + + // If there are new addresses to retain after this block number, track them + if let Some(new_addresses) = contract_log_pruner.get(&block_number) { + allowed_addresses.extend(new_addresses.iter().copied()); + } + for (idx, receipt) in receipts.iter().enumerate() { let receipt_idx = first_tx_index + idx as u64; if let Some(receipt) = receipt { + // Skip writing receipt if log filter is active and it does not have any logs to + // retain + if prunable_receipts && + has_contract_log_filter && + !receipt + .logs() + .iter() + .any(|log| allowed_addresses.contains(&log.address)) + { + continue + } + if let Some(writer) = &mut receipts_static_writer { writer.append_receipt(receipt_idx, receipt)?; }