From 8f1bc8a799d6f9fe81db42f5948b3507cd2cd2cd Mon Sep 17 00:00:00 2001
From: Alexey Shekhirin
Date: Wed, 2 Aug 2023 16:06:57 +0100
Subject: [PATCH] feat(pruner): storage history (#4006)

---
 crates/prune/src/pruner.rs        | 222 +++++++++++++++++++++++++++++-
 crates/rpc/rpc-builder/src/lib.rs |   2 +-
 2 files changed, 220 insertions(+), 4 deletions(-)
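Note for reviewers: the shard walk in `prune_storage_history` is the subtle part of this
change, and it is easiest to see on a plain map. Below is a minimal, self-contained model
of the same two rules. Everything in it is a hypothetical stand-in — an in-memory
`BTreeMap`, `(address, slot, highest_block)` tuple keys, plain `Vec<u64>` block lists —
rather than the real `StorageShardedKey`/`BlockNumberList` types and cursor API, and the
cursor seeks that skip redundant shards are omitted:

    use std::collections::BTreeMap;

    // Hypothetical stand-in for StorageShardedKey: (address id, slot id, highest block
    // in shard). u64::MAX marks the open-ended "last" shard, as in the sharded-key
    // scheme this patch builds on.
    type ShardKey = (u64, u64, u64);

    fn prune_shards(shards: &mut BTreeMap<ShardKey, Vec<u64>>, to_block: u64) {
        // Snapshot the keys up front so we can mutate the map while walking it.
        for key in shards.keys().copied().collect::<Vec<_>>() {
            let (addr, slot, highest) = key;
            if highest <= to_block {
                // Rule 1: every block in the shard is at or below the target,
                // so the whole shard goes away.
                shards.remove(&key);
                continue;
            }
            // Rule 2: the shard straddles the target; keep only blocks above it.
            let shard = shards.get_mut(&key).unwrap();
            shard.retain(|block| *block > to_block);
            if !shard.is_empty() {
                continue;
            }
            // Empty shards are not allowed, so drop the emptied shard.
            shards.remove(&key);
            if highest == u64::MAX {
                // The open-ended last shard emptied: if a previous shard for the same
                // (address, slot) survives, re-key it as the new last shard, mirroring
                // the delete_current + upsert pair in the patch below.
                let prev = shards.range(..(addr, slot, u64::MAX)).next_back().map(|(k, _)| *k);
                if let Some(prev_key) = prev {
                    if prev_key.0 == addr && prev_key.1 == slot {
                        let blocks = shards.remove(&prev_key).unwrap();
                        shards.insert((addr, slot, u64::MAX), blocks);
                    }
                }
            }
        }
    }

Rule 1 is sound because shards partition each slot's block list in ascending order: a
shard whose `highest_block_number` is at or below the target can only contain prunable
block numbers, so it can be dropped wholesale without inspecting its contents.
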
diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs
index 57775390e..cc71421bf 100644
--- a/crates/prune/src/pruner.rs
+++ b/crates/prune/src/pruner.rs
@@ -5,7 +5,7 @@ use rayon::prelude::*;
 use reth_db::{
     abstraction::cursor::{DbCursorRO, DbCursorRW},
     database::Database,
-    models::ShardedKey,
+    models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
     tables,
     transaction::DbTxMut,
     BlockNumberList,
@@ -31,6 +31,7 @@ pub struct BatchSizes {
     transaction_lookup: usize,
     transaction_senders: usize,
     account_history: usize,
+    storage_history: usize,
 }
 
 impl Default for BatchSizes {
@@ -40,6 +41,7 @@ impl Default for BatchSizes {
             transaction_lookup: 10000,
             transaction_senders: 10000,
             account_history: 10000,
+            storage_history: 10000,
         }
     }
 }
@@ -122,6 +124,12 @@ impl<DB: Database> Pruner<DB> {
             self.prune_account_history(&provider, to_block, prune_mode)?;
         }
 
+        if let Some((to_block, prune_mode)) =
+            self.modes.prune_target_block_storage_history(tip_block_number)?
+        {
+            self.prune_storage_history(&provider, to_block, prune_mode)?;
+        }
+
         provider.commit()?;
         self.last_pruned_block_number = Some(tip_block_number);
 
@@ -446,6 +454,120 @@ impl<DB: Database> Pruner<DB> {
 
         Ok(())
     }
+
+    /// Prune storage history up to the provided block, inclusive.
+    #[instrument(level = "trace", skip(self, provider), target = "pruner")]
+    fn prune_storage_history(
+        &self,
+        provider: &DatabaseProviderRW<'_, DB>,
+        to_block: BlockNumber,
+        prune_mode: PruneMode,
+    ) -> PrunerResult {
+        let from_block = provider
+            .get_prune_checkpoint(PrunePart::StorageHistory)?
+            .map(|checkpoint| checkpoint.block_number + 1)
+            .unwrap_or_default();
+        let block_range = from_block..=to_block;
+        let total = block_range.clone().count();
+        let range = BlockNumberAddress::range(block_range);
+
+        let mut processed = 0;
+        provider.prune_table_with_range_in_batches::<tables::StorageChangeSet>(
+            range,
+            self.batch_sizes.storage_history,
+            |entries| {
+                processed += entries;
+                trace!(
+                    target: "pruner",
+                    %entries,
+                    progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
+                    "Pruned storage history (changesets)"
+                );
+            },
+        )?;
+
+        let mut cursor = provider.tx_ref().cursor_write::<tables::StorageHistory>()?;
+        // Prune `StorageHistory` table:
+        // 1. If the shard has `highest_block_number` less than or equal to the target block
+        //    number for pruning, delete the shard completely.
+        // 2. If the shard has `highest_block_number` greater than the target block number for
+        //    pruning, filter out the block numbers inside the shard that are at or below the
+        //    target block number for pruning.
+        while let Some(result) = cursor.next()? {
+            let (key, blocks): (StorageShardedKey, BlockNumberList) = result;
+
+            if key.sharded_key.highest_block_number <= to_block {
+                // If the shard consists only of block numbers at or below the target one,
+                // delete the shard completely.
+                cursor.delete_current()?;
+                if key.sharded_key.highest_block_number == to_block {
+                    // Shard contains only block numbers up to the target one, so we can skip to
+                    // the next storage slot for this address. It is guaranteed that further
+                    // shards for this address and storage slot will not contain the target
+                    // block number, as it's in this shard.
+                    cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+                }
+            } else {
+                // Shard contains block numbers that are higher than the target one, so we need
+                // to filter it. It is guaranteed that further shards for this address and
+                // storage slot will not contain the target block number, as it's in this shard.
+                let blocks = blocks
+                    .iter(0)
+                    .skip_while(|block| *block <= to_block as usize)
+                    .collect::<Vec<_>>();
+                if blocks.is_empty() {
+                    // If there are no more blocks in this shard, we need to remove it, as empty
+                    // shards are not allowed.
+                    if key.sharded_key.highest_block_number == u64::MAX {
+                        // If the current shard is the last shard for this address and storage
+                        // slot, replace it with the previous shard.
+                        if let Some((prev_key, prev_value)) = cursor.prev()? {
+                            if prev_key.address == key.address &&
+                                prev_key.sharded_key.key == key.sharded_key.key
+                            {
+                                cursor.delete_current()?;
+                                // Upsert will replace the last shard for this address and
+                                // storage slot with the previous value.
+                                cursor.upsert(key.clone(), prev_value)?;
+                            }
+                        }
+                    } else {
+                        // If the current shard is not the last shard for this address and
+                        // storage slot, just delete it.
+                        cursor.delete_current()?;
+                    }
+                } else {
+                    cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
+                }
+
+                // Jump to the next address/storage slot.
+                cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
+            }
+
+            processed += 1;
+            if processed % self.batch_sizes.storage_history == 0 {
+                trace!(
+                    target: "pruner",
+                    entries = self.batch_sizes.storage_history,
+                    "Pruned storage history (indices)"
+                );
+            }
+        }
+
+        if processed % self.batch_sizes.storage_history != 0 {
+            trace!(
+                target: "pruner",
+                entries = processed % self.batch_sizes.storage_history,
+                "Pruned storage history (indices)"
+            );
+        }
+
+        provider.save_prune_checkpoint(
+            PrunePart::StorageHistory,
+            PruneCheckpoint { block_number: to_block, prune_mode },
+        )?;
+
+        Ok(())
+    }
 }
 
 #[cfg(test)]
@@ -690,7 +812,7 @@ mod tests {
         let (changesets, _) = random_changeset_range(
             &mut rng,
             blocks.iter(),
-            accounts.clone().into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
+            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
             0..0,
             0..0,
         );
@@ -698,7 +820,7 @@ mod tests {
         tx.insert_history(changesets.clone(), None).expect("insert history");
 
         let account_occurrences = tx.table::<tables::AccountHistory>().unwrap().into_iter().fold(
-            BTreeMap::<Address, usize>::new(),
+            BTreeMap::<_, usize>::new(),
             |mut map, (key, _)| {
                 map.entry(key.key).or_default().add_assign(1);
                 map
@@ -764,4 +886,98 @@ mod tests {
         // ended last time
         test_prune(4500);
     }
+
+    #[test]
+    fn prune_storage_history() {
+        let tx = TestTransaction::default();
+        let mut rng = generators::rng();
+
+        let block_num = 7000;
+        let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1);
+        tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
+
+        let accounts =
+            random_eoa_account_range(&mut rng, 0..3).into_iter().collect::<BTreeMap<_, _>>();
+
+        let (changesets, _) = random_changeset_range(
+            &mut rng,
+            blocks.iter(),
+            accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
+            1..2,
+            1..2,
+        );
+        tx.insert_changesets(changesets.clone(), None).expect("insert changesets");
+        tx.insert_history(changesets.clone(), None).expect("insert history");
+
+        let storage_occurrences = tx.table::<tables::StorageHistory>().unwrap().into_iter().fold(
+            BTreeMap::<_, usize>::new(),
+            |mut map, (key, _)| {
+                map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
+                map
+            },
+        );
+        assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
+
+        assert_eq!(
+            tx.table::<tables::StorageChangeSet>().unwrap().len(),
+            changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
+        );
+
+        let original_shards = tx.table::<tables::StorageHistory>().unwrap();
+
+        let test_prune = |to_block: BlockNumber| {
+            let prune_mode = PruneMode::Before(to_block);
+            let pruner = Pruner::new(
+                tx.inner_raw(),
+                MAINNET.clone(),
+                5,
+                PruneModes { storage_history: Some(prune_mode), ..Default::default() },
+                BatchSizes {
+                    // Less than total amount of blocks to prune to test the batching logic
+                    storage_history: 10,
+                    ..Default::default()
+                },
+            );
+
+            let provider = tx.inner_rw();
+            assert_matches!(pruner.prune_storage_history(&provider, to_block, prune_mode), Ok(()));
+            provider.commit().expect("commit");
+
+            assert_eq!(
+                tx.table::<tables::StorageChangeSet>().unwrap().len(),
+                changesets[to_block as usize + 1..]
+                    .iter()
+                    .flatten()
+                    .flat_map(|(_, _, entries)| entries)
+                    .count()
+            );
+
+            let actual_shards = tx.table::<tables::StorageHistory>().unwrap();
+
+            let expected_shards = original_shards
+                .iter()
+                .filter(|(key, _)| key.sharded_key.highest_block_number > to_block)
+                .map(|(key, blocks)| {
+                    let new_blocks = blocks
+                        .iter(0)
+                        .skip_while(|block| *block <= to_block as usize)
+                        .collect::<Vec<_>>();
+                    (key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
+                })
+                .collect::<Vec<_>>();
+
+            assert_eq!(actual_shards, expected_shards);
+
+            assert_eq!(
+                tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(),
+                Some(PruneCheckpoint { block_number: to_block, prune_mode })
+            );
+        };
+
+        // Prune first time: no previous checkpoint is present
+        test_prune(3000);
+        // Prune second time: previous checkpoint is present, should continue pruning from where
+        // ended last time
+        test_prune(4500);
+    }
 }
diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs
index dadccb1c7..6b2f7461c 100644
--- a/crates/rpc/rpc-builder/src/lib.rs
+++ b/crates/rpc/rpc-builder/src/lib.rs
@@ -1530,7 +1530,7 @@ impl TransportRpcModules<()> {
         let other = other.into();
         self.merge_http(other.clone())?;
         self.merge_ws(other.clone())?;
-        self.merge_ipc(other.clone())?;
+        self.merge_ipc(other)?;
         Ok(())
     }
 
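
Wiring note: storage-history pruning is enabled purely through `PruneModes`, the same way
as account history. A sketch patterned on the test setup above; `db`, `tip_block_number`,
and the `run` entry point are assumptions for illustration, not a verified API surface:

    // Hedged sketch: construct a pruner with storage-history pruning enabled.
    let mut pruner = Pruner::new(
        db,                        // any type implementing reth_db::database::Database
        MAINNET.clone(),
        5,                         // minimum block interval between pruner runs
        PruneModes {
            // Prune storage history for blocks before block 4_000_000.
            storage_history: Some(PruneMode::Before(4_000_000)),
            ..Default::default()
        },
        BatchSizes::default(),     // storage_history batches of 10000 entries
    );
    // On each new canonical tip, let the pruner decide whether there is work to do;
    // the checkpoint saved by prune_storage_history makes repeated runs incremental.
    pruner.run(tip_block_number)?;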