chore: move receipts pruning to provider (#13886)

This commit is contained in:
Arsenii Kulikov
2025-01-22 17:41:13 +04:00
committed by GitHub
parent cd44fc3d09
commit 23ebf96188
8 changed files with 75 additions and 250 deletions

1
Cargo.lock generated
View File

@ -9276,6 +9276,7 @@ dependencies = [
"derive_more", "derive_more",
"reth-fs-util", "reth-fs-util",
"reth-primitives-traits", "reth-primitives-traits",
"reth-prune-types",
"reth-static-file-types", "reth-static-file-types",
"thiserror 2.0.11", "thiserror 2.0.11",
] ]

View File

@ -232,6 +232,11 @@ impl<T> ExecutionOutcome<T> {
self.first_block self.first_block
} }
/// Return last block of the execution outcome
pub fn last_block(&self) -> BlockNumber {
(self.first_block + self.len() as u64).saturating_sub(1)
}
/// Revert the state to the given block number. /// Revert the state to the given block number.
/// ///
/// Returns false if the block number is not in the bundle state. /// Returns false if the block number is not in the bundle state.

View File

@ -42,7 +42,7 @@ pub enum PruneSegment {
} }
impl PruneSegment { impl PruneSegment {
/// Returns minimum number of blocks to left in the database for this segment. /// Returns minimum number of blocks to keep in the database for this segment.
pub const fn min_blocks(&self, purpose: PrunePurpose) -> u64 { pub const fn min_blocks(&self, purpose: PrunePurpose) -> u64 {
match self { match self {
Self::SenderRecovery | Self::TransactionLookup | Self::Headers | Self::Transactions => { Self::SenderRecovery | Self::TransactionLookup | Self::Headers | Self::Transactions => {
@ -84,9 +84,6 @@ pub enum PruneSegmentError {
/// Invalid configuration of a prune segment. /// Invalid configuration of a prune segment.
#[error("the configuration provided for {0} is invalid")] #[error("the configuration provided for {0} is invalid")]
Configuration(PruneSegment), Configuration(PruneSegment),
/// Receipts have been pruned
#[error("receipts have been pruned")]
ReceiptsPruned,
} }
#[cfg(test)] #[cfg(test)]

View File

@ -3,11 +3,11 @@
use alloc::vec::Vec; use alloc::vec::Vec;
use alloy_eips::eip7685::Requests; use alloy_eips::eip7685::Requests;
use alloy_primitives::{map::HashSet, Address, BlockNumber, Log}; use alloy_primitives::{BlockNumber, Log};
use reth_execution_errors::{BlockExecutionError, InternalBlockExecutionError}; use reth_execution_errors::BlockExecutionError;
use reth_primitives::Receipts; use reth_primitives::Receipts;
use reth_primitives_traits::Receipt; use reth_primitives_traits::Receipt;
use reth_prune_types::{PruneMode, PruneModes, PruneSegmentError, MINIMUM_PRUNING_DISTANCE}; use reth_prune_types::PruneModes;
use revm::db::states::bundle_state::BundleRetention; use revm::db::states::bundle_state::BundleRetention;
/// Takes care of: /// Takes care of:
@ -31,11 +31,6 @@ pub struct BlockBatchRecord<T = reth_primitives::Receipt> {
/// A transaction may have zero or more requests, so the length of the inner vector is not /// A transaction may have zero or more requests, so the length of the inner vector is not
/// guaranteed to be the same as the number of transactions. /// guaranteed to be the same as the number of transactions.
requests: Vec<Requests>, requests: Vec<Requests>,
/// Memoized address pruning filter.
///
/// Empty implies that there is going to be addresses to include in the filter in a future
/// block. None means there isn't any kind of configuration.
pruning_address_filter: Option<(u64, HashSet<Address>)>,
/// First block will be initialized to `None` /// First block will be initialized to `None`
/// and be set to the block number of first block executed. /// and be set to the block number of first block executed.
first_block: Option<BlockNumber>, first_block: Option<BlockNumber>,
@ -49,7 +44,6 @@ impl<T> Default for BlockBatchRecord<T> {
prune_modes: Default::default(), prune_modes: Default::default(),
receipts: Default::default(), receipts: Default::default(),
requests: Default::default(), requests: Default::default(),
pruning_address_filter: Default::default(),
first_block: Default::default(), first_block: Default::default(),
tip: Default::default(), tip: Default::default(),
} }
@ -58,10 +52,7 @@ impl<T> Default for BlockBatchRecord<T> {
impl<T> BlockBatchRecord<T> { impl<T> BlockBatchRecord<T> {
/// Create a new receipts recorder with the given pruning configuration. /// Create a new receipts recorder with the given pruning configuration.
pub fn new(prune_modes: PruneModes) -> Self pub fn new(prune_modes: PruneModes) -> Self {
where
T: Default,
{
Self { prune_modes, ..Default::default() } Self { prune_modes, ..Default::default() }
} }
@ -133,64 +124,12 @@ impl<T> BlockBatchRecord<T> {
where where
T: Receipt<Log = Log>, T: Receipt<Log = Log>,
{ {
let mut receipts = receipts.into_iter().map(Some).collect(); let receipts = receipts.into_iter().map(Some).collect();
// Prune receipts if necessary.
self.prune_receipts(&mut receipts).map_err(InternalBlockExecutionError::from)?;
// Save receipts. // Save receipts.
self.receipts.push(receipts); self.receipts.push(receipts);
Ok(()) Ok(())
} }
/// Prune receipts according to the pruning configuration.
fn prune_receipts(&mut self, receipts: &mut Vec<Option<T>>) -> Result<(), PruneSegmentError>
where
T: Receipt<Log = Log>,
{
let (Some(first_block), Some(tip)) = (self.first_block, self.tip) else { return Ok(()) };
let block_number = first_block + self.receipts.len() as u64;
// Block receipts should not be retained
if self.prune_modes.receipts == Some(PruneMode::Full) ||
// [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`]
self.prune_modes.receipts.is_some_and(|mode| mode.should_prune(block_number, tip))
{
receipts.clear();
return Ok(())
}
// All receipts from the last 128 blocks are required for blockchain tree, even with
// [`PruneSegment::ContractLogs`].
let prunable_receipts =
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(block_number, tip);
if !prunable_receipts {
return Ok(())
}
let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
if !contract_log_pruner.is_empty() {
let (prev_block, filter) =
self.pruning_address_filter.get_or_insert_with(|| (0, Default::default()));
for (_, addresses) in contract_log_pruner.range(*prev_block..=block_number) {
filter.extend(addresses.iter().copied());
}
}
if let Some((_, filter)) = &self.pruning_address_filter {
for receipt in receipts.iter_mut() {
// If there is an address_filter, it does not contain any of the
// contract addresses, then remove this receipt.
let inner_receipt = receipt.as_ref().expect("receipts have not been pruned");
if !inner_receipt.logs().iter().any(|log| filter.contains(&log.address)) {
receipt.take();
}
}
}
Ok(())
}
/// Save EIP-7685 requests to the executor. /// Save EIP-7685 requests to the executor.
pub fn save_requests(&mut self, requests: Requests) { pub fn save_requests(&mut self, requests: Requests) {
self.requests.push(requests); self.requests.push(requests);
@ -200,10 +139,6 @@ impl<T> BlockBatchRecord<T> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use alloc::collections::BTreeMap;
use alloy_primitives::Address;
use reth_primitives::{Log, Receipt};
use reth_prune_types::{PruneMode, ReceiptsLogPruneConfig};
#[test] #[test]
fn test_save_receipts_empty() { fn test_save_receipts_empty() {
@ -216,170 +151,4 @@ mod tests {
// Verify that the saved receipts are equal to a nested empty vector // Verify that the saved receipts are equal to a nested empty vector
assert_eq!(*recorder.receipts(), vec![vec![]].into()); assert_eq!(*recorder.receipts(), vec![vec![]].into());
} }
#[test]
fn test_save_receipts_non_empty_no_pruning() {
let mut recorder = BlockBatchRecord::default();
let receipts = vec![Receipt::default()];
// Verify that saving receipts completes without error
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that there is one block of receipts
assert_eq!(recorder.receipts().len(), 1);
// Verify that the first block contains one receipt
assert_eq!(recorder.receipts()[0].len(), 1);
// Verify that the saved receipt is the default receipt
assert_eq!(recorder.receipts()[0][0], Some(Receipt::default()));
}
#[test]
fn test_save_receipts_with_pruning_no_prunable_receipts() {
let mut recorder = BlockBatchRecord::default();
// Set the first block number
recorder.set_first_block(1);
// Set the tip (highest known block)
recorder.set_tip(130);
// Create a vector of receipts with a default receipt
let receipts = vec![Receipt::default()];
// Verify that saving receipts completes without error
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that there is one block of receipts
assert_eq!(recorder.receipts().len(), 1);
// Verify that the first block contains one receipt
assert_eq!(recorder.receipts()[0].len(), 1);
// Verify that the saved receipt is the default receipt
assert_eq!(recorder.receipts()[0][0], Some(Receipt::default()));
}
#[test]
fn test_save_receipts_with_pruning_no_tip() {
// Create a PruneModes with receipts set to PruneMode::Full
let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() };
let mut recorder = BlockBatchRecord::new(prune_modes);
// Set the first block number
recorder.set_first_block(1);
// Create a vector of receipts with a default receipt
let receipts = vec![Receipt::default()];
// Verify that saving receipts completes without error
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that there is one block of receipts
assert_eq!(recorder.receipts().len(), 1);
// Verify that the first block contains one receipt
assert_eq!(recorder.receipts()[0].len(), 1);
// Verify that the saved receipt is the default receipt
assert_eq!(recorder.receipts()[0][0], Some(Receipt::default()));
}
#[test]
fn test_save_receipts_with_pruning_no_block_number() {
// Create a PruneModes with receipts set to PruneMode::Full
let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() };
// Create a BlockBatchRecord with the prune_modes
let mut recorder = BlockBatchRecord::new(prune_modes);
// Set the tip (highest known block)
recorder.set_tip(130);
// Create a vector of receipts with a default receipt
let receipts = vec![Receipt::default()];
// Verify that saving receipts completes without error
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that there is one block of receipts
assert_eq!(recorder.receipts().len(), 1);
// Verify that the first block contains one receipt
assert_eq!(recorder.receipts()[0].len(), 1);
// Verify that the saved receipt is the default receipt
assert_eq!(recorder.receipts()[0][0], Some(Receipt::default()));
}
// Test saving receipts with pruning configuration and receipts should be pruned
#[test]
fn test_save_receipts_with_pruning_should_prune() {
// Create a PruneModes with receipts set to PruneMode::Full
let prune_modes = PruneModes { receipts: Some(PruneMode::Full), ..Default::default() };
// Create a BlockBatchRecord with the prune_modes
let mut recorder = BlockBatchRecord::new(prune_modes);
// Set the first block number
recorder.set_first_block(1);
// Set the tip (highest known block)
recorder.set_tip(130);
// Create a vector of receipts with a default receipt
let receipts = vec![Receipt::default()];
// Verify that saving receipts completes without error
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that there is one block of receipts
assert_eq!(recorder.receipts().len(), 1);
// Verify that the receipts are pruned (empty)
assert!(recorder.receipts()[0].is_empty());
}
// Test saving receipts with address filter pruning
#[test]
fn test_save_receipts_with_address_filter_pruning() {
// Create a PruneModes with receipts_log_filter configuration
let prune_modes = PruneModes {
receipts_log_filter: ReceiptsLogPruneConfig(BTreeMap::from([
(Address::with_last_byte(1), PruneMode::Before(1300001)),
(Address::with_last_byte(2), PruneMode::Before(1300002)),
(Address::with_last_byte(3), PruneMode::Distance(1300003)),
])),
..Default::default()
};
// Create a BlockBatchRecord with the prune_modes
let mut recorder = BlockBatchRecord::new(prune_modes);
// Set the first block number
recorder.set_first_block(1);
// Set the tip (highest known block)
recorder.set_tip(1300000);
// With a receipt that should be pruned (address 4 not in the log filter)
let mut receipt = Receipt::default();
receipt.logs.push(Log { address: Address::with_last_byte(4), ..Default::default() });
let receipts = vec![receipt.clone()];
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that the receipts are pruned (empty)
assert_eq!(recorder.receipts().len(), 1);
assert_eq!(recorder.receipts()[0], vec![None]);
// With a receipt that should not be pruned (address 1 in the log filter)
let mut receipt1 = Receipt::default();
receipt1.logs.push(Log { address: Address::with_last_byte(1), ..Default::default() });
let receipts = vec![receipt1.clone()];
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that the second block of receipts contains the receipt
assert_eq!(recorder.receipts().len(), 2);
assert_eq!(recorder.receipts()[1][0], Some(receipt1));
// With a receipt that should not be pruned (address 2 in the log filter)
let mut receipt2 = Receipt::default();
receipt2.logs.push(Log { address: Address::with_last_byte(2), ..Default::default() });
let receipts = vec![receipt2.clone()];
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that the third block of receipts contains the receipt
assert_eq!(recorder.receipts().len(), 3);
assert_eq!(recorder.receipts()[2][0], Some(receipt2));
// With a receipt that should not be pruned (address 3 in the log filter)
let mut receipt3 = Receipt::default();
receipt3.logs.push(Log { address: Address::with_last_byte(3), ..Default::default() });
let receipts = vec![receipt3.clone()];
assert!(recorder.save_receipts(receipts).is_ok());
// Verify that the fourth block of receipts contains the receipt
assert_eq!(recorder.receipts().len(), 4);
assert_eq!(recorder.receipts()[3][0], Some(receipt3));
}
} }

View File

@ -874,8 +874,10 @@ mod tests {
// If there is a pruning configuration, then it's forced to use the database. // If there is a pruning configuration, then it's forced to use the database.
// This way we test both cases. // This way we test both cases.
let modes = [None, Some(PruneModes::none())]; let modes = [None, Some(PruneModes::none())];
let random_filter = let random_filter = ReceiptsLogPruneConfig(BTreeMap::from([(
ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); Address::random(),
PruneMode::Distance(100000),
)]));
// Tests node with database and node with static files // Tests node with database and node with static files
for mut mode in modes { for mut mode in modes {
@ -1011,8 +1013,10 @@ mod tests {
// If there is a pruning configuration, then it's forced to use the database. // If there is a pruning configuration, then it's forced to use the database.
// This way we test both cases. // This way we test both cases.
let modes = [None, Some(PruneModes::none())]; let modes = [None, Some(PruneModes::none())];
let random_filter = let random_filter = ReceiptsLogPruneConfig(BTreeMap::from([(
ReceiptsLogPruneConfig(BTreeMap::from([(Address::random(), PruneMode::Full)])); Address::random(),
PruneMode::Before(100000),
)]));
// Tests node with database and node with static files // Tests node with database and node with static files
for mut mode in modes { for mut mode in modes {

View File

@ -12,8 +12,9 @@ workspace = true
[dependencies] [dependencies]
# reth # reth
reth-primitives-traits.workspace = true
reth-fs-util.workspace = true reth-fs-util.workspace = true
reth-primitives-traits.workspace = true
reth-prune-types.workspace = true
reth-static-file-types.workspace = true reth-static-file-types.workspace = true
# ethereum # ethereum

View File

@ -4,6 +4,7 @@ use alloy_eips::{BlockHashOrNumber, HashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, TxNumber, B256}; use alloy_primitives::{Address, BlockHash, BlockNumber, TxNumber, B256};
use derive_more::Display; use derive_more::Display;
use reth_primitives_traits::GotExpected; use reth_primitives_traits::GotExpected;
use reth_prune_types::PruneSegmentError;
use reth_static_file_types::StaticFileSegment; use reth_static_file_types::StaticFileSegment;
/// Provider result type. /// Provider result type.
@ -15,6 +16,9 @@ pub enum ProviderError {
/// Database error. /// Database error.
#[error(transparent)] #[error(transparent)]
Database(#[from] DatabaseError), Database(#[from] DatabaseError),
/// Pruning error.
#[error(transparent)]
Pruning(#[from] PruneSegmentError),
/// RLP error. /// RLP error.
#[error("{_0}")] #[error("{_0}")]
Rlp(alloy_rlp::Error), Rlp(alloy_rlp::Error),

View File

@ -19,7 +19,7 @@ use crate::{
StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider, StorageReader, StorageTrieWriter, TransactionVariant, TransactionsProvider,
TransactionsProviderExt, TrieWriter, WithdrawalsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
}; };
use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header}; use alloy_consensus::{transaction::TransactionMeta, BlockHeader, Header, TxReceipt};
use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber}; use alloy_eips::{eip2718::Encodable2718, eip4895::Withdrawals, BlockHashOrNumber};
use alloy_primitives::{ use alloy_primitives::{
keccak256, keccak256,
@ -51,7 +51,9 @@ use reth_primitives::{
StaticFileSegment, StorageEntry, StaticFileSegment, StorageEntry,
}; };
use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction}; use reth_primitives_traits::{Block as _, BlockBody as _, SignedTransaction};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment}; use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneModes, PruneSegment, MINIMUM_PRUNING_DISTANCE,
};
use reth_stages_types::{StageCheckpoint, StageId}; use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{ use reth_storage_api::{
BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider, BlockBodyIndicesProvider, BlockBodyReader, NodePrimitivesProvider, OmmersProvider,
@ -1783,9 +1785,11 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
write_receipts_to: StorageLocation, write_receipts_to: StorageLocation,
) -> ProviderResult<()> { ) -> ProviderResult<()> {
let first_block = execution_outcome.first_block(); let first_block = execution_outcome.first_block();
let block_count = execution_outcome.receipts.len() as u64; let block_count = execution_outcome.len() as u64;
let block_range = first_block..=first_block.saturating_add(block_count).saturating_sub(1); let last_block = execution_outcome.last_block();
let last_block = *block_range.end(); let block_range = first_block..=last_block;
let tip = self.last_block_number()?.max(last_block);
let (plain_state, reverts) = let (plain_state, reverts) =
execution_outcome.bundle.to_plain_state_and_reverts(is_value_known); execution_outcome.bundle.to_plain_state_and_reverts(is_value_known);
@ -1827,6 +1831,20 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
.then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)) .then(|| self.static_file_provider.get_writer(first_block, StaticFileSegment::Receipts))
.transpose()?; .transpose()?;
let has_contract_log_filter = !self.prune_modes.receipts_log_filter.is_empty();
let contract_log_pruner = self.prune_modes.receipts_log_filter.group_by_block(tip, None)?;
// All receipts from the last 128 blocks are required for blockchain tree, even with
// [`PruneSegment::ContractLogs`].
let prunable_receipts =
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE).should_prune(first_block, tip);
// Prepare set of addresses which logs should not be pruned.
let mut allowed_addresses: HashSet<Address, _> = HashSet::new();
for (_, addresses) in contract_log_pruner.range(..first_block) {
allowed_addresses.extend(addresses.iter().copied());
}
for (idx, (receipts, first_tx_index)) in for (idx, (receipts, first_tx_index)) in
execution_outcome.receipts.iter().zip(block_indices).enumerate() execution_outcome.receipts.iter().zip(block_indices).enumerate()
{ {
@ -1837,9 +1855,35 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
writer.increment_block(block_number)?; writer.increment_block(block_number)?;
} }
// Skip writing receipts if pruning configuration requires us to.
if prunable_receipts &&
self.prune_modes
.receipts
.is_some_and(|mode| mode.should_prune(block_number, tip))
{
continue
}
// If there are new addresses to retain after this block number, track them
if let Some(new_addresses) = contract_log_pruner.get(&block_number) {
allowed_addresses.extend(new_addresses.iter().copied());
}
for (idx, receipt) in receipts.iter().enumerate() { for (idx, receipt) in receipts.iter().enumerate() {
let receipt_idx = first_tx_index + idx as u64; let receipt_idx = first_tx_index + idx as u64;
if let Some(receipt) = receipt { if let Some(receipt) = receipt {
// Skip writing receipt if log filter is active and it does not have any logs to
// retain
if prunable_receipts &&
has_contract_log_filter &&
!receipt
.logs()
.iter()
.any(|log| allowed_addresses.contains(&log.address))
{
continue
}
if let Some(writer) = &mut receipts_static_writer { if let Some(writer) = &mut receipts_static_writer {
writer.append_receipt(receipt_idx, receipt)?; writer.append_receipt(receipt_idx, receipt)?;
} }