From 24632aca6f98cda6c8d425c7a0da4b563787c174 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Fri, 18 Aug 2023 16:58:07 +0100 Subject: [PATCH] feat(pruner): prune receipts based on log emitters during live sync (#4140) --- crates/primitives/src/prune/mod.rs | 47 ++++- crates/prune/src/pruner.rs | 165 ++++++++++++++++++ crates/storage/provider/src/post_state/mod.rs | 2 +- .../src/providers/database/provider.rs | 13 +- 4 files changed, 217 insertions(+), 10 deletions(-) diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index bec2bfa26..5359c3d9c 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -24,24 +24,61 @@ impl ContractLogsPruneConfig { /// Given the `tip` block number, consolidates the structure so it can easily be queried for /// filtering across a range of blocks. /// - /// The [`BlockNumber`] key of the map should be viewed as `PruneMode::Before(block)`. + /// Example: + /// + /// `{ addrA: Before(872), addrB: Before(500), addrC: Distance(128) }` + /// + /// for `tip: 1000`, gets transformed to a map such as: + /// + /// `{ 500: [addrB], 872: [addrA, addrC] }` + /// + /// The [`BlockNumber`] key of the new map should be viewed as `PruneMode::Before(block)`, which + /// makes the previous result equivalent to + /// + /// `{ Before(500): [addrB], Before(872): [addrA, addrC] }` pub fn group_by_block( &self, tip: BlockNumber, + pruned_block: Option, ) -> Result>, PrunePartError> { let mut map = BTreeMap::new(); + let pruned_block = pruned_block.unwrap_or_default(); + for (address, mode) in self.0.iter() { // Getting `None`, means that there is nothing to prune yet, so we need it to include in // the BTreeMap (block = 0), otherwise it will be excluded. // Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all // other receipts. 
- let block = mode - .prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)? - .map(|(block, _)| block) - .unwrap_or_default(); + let block = (pruned_block + 1).max( + mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)? + .map(|(block, _)| block) + .unwrap_or_default(), + ); map.entry(block).or_insert_with(Vec::new).push(address) } Ok(map) } + + /// Returns the lowest block where we start filtering logs which use `PruneMode::Distance(_)`. + pub fn lowest_block_with_distance( + &self, + tip: BlockNumber, + pruned_block: Option, + ) -> Result, PrunePartError> { + let pruned_block = pruned_block.unwrap_or_default(); + let mut lowest = None; + + for (_, mode) in self.0.iter() { + if let PruneMode::Distance(_) = mode { + if let Some((block, _)) = + mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)? + { + lowest = Some(lowest.unwrap_or(u64::MAX).min(block)); + } + } + } + + Ok(lowest.map(|lowest| lowest.max(pruned_block))) + } } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e39cd5b4c..5ae773c12 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -13,6 +13,7 @@ use reth_db::{ }; use reth_primitives::{ BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, + MINIMUM_PRUNING_DISTANCE, }; use reth_provider::{ BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, @@ -102,6 +103,15 @@ impl Pruner { .record(part_start.elapsed()) } + if !self.modes.contract_logs_filter.is_empty() { + let part_start = Instant::now(); + self.prune_receipts_by_logs(&provider, tip_block_number)?; + self.metrics + .get_prune_part_metrics(PrunePart::ContractLogs) + .duration_seconds + .record(part_start.elapsed()) + } + if let Some((to_block, prune_mode)) = self.modes.prune_target_block_transaction_lookup(tip_block_number)? 
{ @@ -251,6 +261,7 @@ impl Pruner { "Pruned receipts" ); }, + |_| false, )?; provider.save_prune_checkpoint( @@ -258,6 +269,160 @@ impl Pruner { PruneCheckpoint { block_number: to_block, prune_mode }, )?; + // `PrunePart::Receipts` overrides `PrunePart::ContractLogs`, so we can preemptively + // limit their pruning start point. + provider.save_prune_checkpoint( + PrunePart::ContractLogs, + PruneCheckpoint { block_number: to_block, prune_mode }, + )?; + + Ok(()) + } + + /// Prune receipts up to the provided block by filtering logs. Works as an inclusion list, and + /// removes every receipt not belonging to it. + #[instrument(level = "trace", skip(self, provider), target = "pruner")] + fn prune_receipts_by_logs( + &self, + provider: &DatabaseProviderRW<'_, DB>, + tip_block_number: BlockNumber, + ) -> PrunerResult { + // Contract log filtering removes every receipt possible except the ones in the list. So, + // for the other receipts it's as if they had a `PruneMode::Distance()` of 128. + let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) + .prune_target_block( + tip_block_number, + MINIMUM_PRUNING_DISTANCE, + PrunePart::ContractLogs, + )? + .map(|(bn, _)| bn) + .unwrap_or_default(); + + // Figure out what receipts have already been pruned, so we can have an accurate + // `address_filter` + let pruned = provider + .get_prune_checkpoint(PrunePart::ContractLogs)? + .map(|checkpoint| checkpoint.block_number); + + let address_filter = + self.modes.contract_logs_filter.group_by_block(tip_block_number, pruned)?; + + // Splits all transactions in different block ranges. 
Each block range will have its own + // filter address list and will check it while going through the table + // + // Example: + // For an `address_filter` such as: + // { block9: [a1, a2], block20: [a3, a4, a5] } + // + // The following structures will be created in the exact order shown: + // `block_ranges`: [ + // (block0, block8, 0 addresses), + // (block9, block19, 2 addresses), + // (block20, to_block, 5 addresses) + // ] + // `filtered_addresses`: [a1, a2, a3, a4, a5] + // + // The first range will delete all receipts between block0 - block8 + // The second range will delete all receipts between block9 - 19, except the ones with + // emitter logs from these addresses: [a1, a2]. + // The third range will delete all receipts between block20 - to_block, except the ones with + // emitter logs from these addresses: [a1, a2, a3, a4, a5] + let mut block_ranges = vec![]; + let mut blocks_iter = address_filter.iter().peekable(); + let mut filtered_addresses = vec![]; + + while let Some((start_block, addresses)) = blocks_iter.next() { + filtered_addresses.extend_from_slice(addresses); + + // This will clear all receipts before the first appearance of a contract log + if block_ranges.is_empty() { + block_ranges.push((0, *start_block - 1, 0)); + } + + let end_block = + blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block); + + // Addresses in lower block ranges are still included in the inclusion list for future + // ranges. + block_ranges.push((*start_block, end_block, filtered_addresses.len())); + } + + for (start_block, end_block, num_addresses) in block_ranges { + let range = match self.get_next_tx_num_range_from_checkpoint( + provider, + PrunePart::ContractLogs, + end_block, + )? { + Some(range) => range, + None => { + trace!( + target: "pruner", + block_range = format!("{start_block}..={end_block}"), + "No receipts to prune." 
+ ); + continue + } + }; + + let total = range.clone().count(); + let mut processed = 0; + + provider.prune_table_with_iterator_in_batches::( + range, + self.batch_sizes.receipts, + |rows| { + processed += rows; + trace!( + target: "pruner", + %rows, + block_range = format!("{start_block}..={end_block}"), + progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), + "Pruned receipts" + ); + }, + |receipt| { + num_addresses > 0 && + receipt.logs.iter().any(|log| { + filtered_addresses[..num_addresses].contains(&&log.address) + }) + }, + )?; + + // If this is the last block range, avoid writing an unused checkpoint + if end_block != to_block { + // This allows us to query for the transactions in the next block range with + // [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate + // checkpoint, which should be adjusted in the end. + provider.save_prune_checkpoint( + PrunePart::ContractLogs, + PruneCheckpoint { + block_number: end_block, + prune_mode: PruneMode::Before(end_block + 1), + }, + )?; + } + } + + // If there are contracts using `PruneMode::Distance(_)` there will be receipts before + // `to_block` that become eligible to be pruned in future runs. Therefore, our + // checkpoint is not actually `to_block`, but the `lowest_block_with_distance` from any + // contract. This ensures that in future pruner runs we can + // prune all these receipts between the previous `lowest_block_with_distance` and the new + // one using `get_next_tx_num_range_from_checkpoint`. + let checkpoint_block = self + .modes + .contract_logs_filter + .lowest_block_with_distance(tip_block_number, pruned)? 
+ .unwrap_or(to_block); + + provider.save_prune_checkpoint( + PrunePart::ContractLogs, + PruneCheckpoint { + block_number: checkpoint_block - 1, + prune_mode: PruneMode::Before(checkpoint_block), + }, + )?; + Ok(()) } diff --git a/crates/storage/provider/src/post_state/mod.rs b/crates/storage/provider/src/post_state/mod.rs index 4032531ff..aa1cb7489 100644 --- a/crates/storage/provider/src/post_state/mod.rs +++ b/crates/storage/provider/src/post_state/mod.rs @@ -662,7 +662,7 @@ impl PostState { let contract_log_pruner = self .prune_modes .contract_logs_filter - .group_by_block(tip) + .group_by_block(tip, None) .map_err(|e| Error::Custom(e.to_string()))?; // Empty implies that there is going to be diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0ac58a3b7..b013ee697 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -629,27 +629,32 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, keys: impl IntoIterator, ) -> std::result::Result { - self.prune_table_with_iterator_in_batches::(keys, usize::MAX, |_| {}) + self.prune_table_with_iterator_in_batches::(keys, usize::MAX, |_| {}, |_| false) } /// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after /// every `batch_size` pruned rows with number of total rows pruned. /// + /// `skip_filter` can be used to skip pruning certain elements. + /// /// Returns number of rows pruned. pub fn prune_table_with_iterator_in_batches( &self, keys: impl IntoIterator, batch_size: usize, mut batch_callback: impl FnMut(usize), + skip_filter: impl Fn(&T::Value) -> bool, ) -> std::result::Result { let mut cursor = self.tx.cursor_write::()?; let mut deleted = 0; for key in keys { - if cursor.seek_exact(key)?.is_some() { - cursor.delete_current()?; + if let Some((_, value)) = cursor.seek_exact(key)? 
{ + if !skip_filter(&value) { + cursor.delete_current()?; + deleted += 1; + } } - deleted += 1; if deleted % batch_size == 0 { batch_callback(deleted);