mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(pruner): storage history (#4006)
This commit is contained in:
@ -5,7 +5,7 @@ use rayon::prelude::*;
|
|||||||
use reth_db::{
|
use reth_db::{
|
||||||
abstraction::cursor::{DbCursorRO, DbCursorRW},
|
abstraction::cursor::{DbCursorRO, DbCursorRW},
|
||||||
database::Database,
|
database::Database,
|
||||||
models::ShardedKey,
|
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey},
|
||||||
tables,
|
tables,
|
||||||
transaction::DbTxMut,
|
transaction::DbTxMut,
|
||||||
BlockNumberList,
|
BlockNumberList,
|
||||||
@ -31,6 +31,7 @@ pub struct BatchSizes {
|
|||||||
transaction_lookup: usize,
|
transaction_lookup: usize,
|
||||||
transaction_senders: usize,
|
transaction_senders: usize,
|
||||||
account_history: usize,
|
account_history: usize,
|
||||||
|
storage_history: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for BatchSizes {
|
impl Default for BatchSizes {
|
||||||
@ -40,6 +41,7 @@ impl Default for BatchSizes {
|
|||||||
transaction_lookup: 10000,
|
transaction_lookup: 10000,
|
||||||
transaction_senders: 10000,
|
transaction_senders: 10000,
|
||||||
account_history: 10000,
|
account_history: 10000,
|
||||||
|
storage_history: 10000,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -122,6 +124,12 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
self.prune_account_history(&provider, to_block, prune_mode)?;
|
self.prune_account_history(&provider, to_block, prune_mode)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some((to_block, prune_mode)) =
|
||||||
|
self.modes.prune_target_block_storage_history(tip_block_number)?
|
||||||
|
{
|
||||||
|
self.prune_storage_history(&provider, to_block, prune_mode)?;
|
||||||
|
}
|
||||||
|
|
||||||
provider.commit()?;
|
provider.commit()?;
|
||||||
self.last_pruned_block_number = Some(tip_block_number);
|
self.last_pruned_block_number = Some(tip_block_number);
|
||||||
|
|
||||||
@ -446,6 +454,120 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Prune storage history up to the provided block, inclusive.
|
||||||
|
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
|
||||||
|
fn prune_storage_history(
|
||||||
|
&self,
|
||||||
|
provider: &DatabaseProviderRW<'_, DB>,
|
||||||
|
to_block: BlockNumber,
|
||||||
|
prune_mode: PruneMode,
|
||||||
|
) -> PrunerResult {
|
||||||
|
let from_block = provider
|
||||||
|
.get_prune_checkpoint(PrunePart::StorageHistory)?
|
||||||
|
.map(|checkpoint| checkpoint.block_number + 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
let block_range = from_block..=to_block;
|
||||||
|
let total = block_range.clone().count();
|
||||||
|
let range = BlockNumberAddress::range(block_range);
|
||||||
|
|
||||||
|
let mut processed = 0;
|
||||||
|
provider.prune_table_with_range_in_batches::<tables::StorageChangeSet>(
|
||||||
|
range,
|
||||||
|
self.batch_sizes.storage_history,
|
||||||
|
|entries| {
|
||||||
|
processed += entries;
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
%entries,
|
||||||
|
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
|
||||||
|
"Pruned storage history (changesets)"
|
||||||
|
);
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut cursor = provider.tx_ref().cursor_write::<tables::StorageHistory>()?;
|
||||||
|
// Prune `StorageHistory` table:
|
||||||
|
// 1. If the shard has `highest_block_number` less than or equal to the target block number
|
||||||
|
// for pruning, delete the shard completely.
|
||||||
|
// 2. If the shard has `highest_block_number` greater than the target block number for
|
||||||
|
// pruning, filter block numbers inside the shard which are less than the target
|
||||||
|
// block number for pruning.
|
||||||
|
while let Some(result) = cursor.next()? {
|
||||||
|
let (key, blocks): (StorageShardedKey, BlockNumberList) = result;
|
||||||
|
|
||||||
|
if key.sharded_key.highest_block_number <= to_block {
|
||||||
|
// If shard consists only of block numbers less than the target one, delete shard
|
||||||
|
// completely.
|
||||||
|
cursor.delete_current()?;
|
||||||
|
if key.sharded_key.highest_block_number == to_block {
|
||||||
|
// Shard contains only block numbers up to the target one, so we can skip to the
|
||||||
|
// next storage slot for this address. It is guaranteed that further shards for
|
||||||
|
// this address and storage slot will not contain the target block number, as
|
||||||
|
// it's in this shard.
|
||||||
|
cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Shard contains block numbers that are higher than the target one, so we need to
|
||||||
|
// filter it. It is guaranteed that further shards for this address and storage slot
|
||||||
|
// will not contain the target block number, as it's in this shard.
|
||||||
|
let blocks = blocks
|
||||||
|
.iter(0)
|
||||||
|
.skip_while(|block| *block <= to_block as usize)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if blocks.is_empty() {
|
||||||
|
// If there are no more blocks in this shard, we need to remove it, as empty
|
||||||
|
// shards are not allowed.
|
||||||
|
if key.sharded_key.highest_block_number == u64::MAX {
|
||||||
|
// If current shard is the last shard for this address and storage slot,
|
||||||
|
// replace it with the previous shard.
|
||||||
|
if let Some((prev_key, prev_value)) = cursor.prev()? {
|
||||||
|
if prev_key.address == key.address &&
|
||||||
|
prev_key.sharded_key.key == key.sharded_key.key
|
||||||
|
{
|
||||||
|
cursor.delete_current()?;
|
||||||
|
// Upsert will replace the last shard for this address and storage
|
||||||
|
// slot with the previous value
|
||||||
|
cursor.upsert(key.clone(), prev_value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If current shard is not the last shard for this address, just delete it.
|
||||||
|
cursor.delete_current()?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Jump to the next address
|
||||||
|
cursor.seek_exact(StorageShardedKey::last(key.address, key.sharded_key.key))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
processed += 1;
|
||||||
|
if processed % self.batch_sizes.storage_history == 0 {
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
entries = self.batch_sizes.storage_history,
|
||||||
|
"Pruned storage history (indices)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if processed % self.batch_sizes.storage_history != 0 {
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
entries = processed % self.batch_sizes.storage_history,
|
||||||
|
"Pruned storage history (indices)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
provider.save_prune_checkpoint(
|
||||||
|
PrunePart::StorageHistory,
|
||||||
|
PruneCheckpoint { block_number: to_block, prune_mode },
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -690,7 +812,7 @@ mod tests {
|
|||||||
let (changesets, _) = random_changeset_range(
|
let (changesets, _) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter(),
|
blocks.iter(),
|
||||||
accounts.clone().into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
0..0,
|
0..0,
|
||||||
0..0,
|
0..0,
|
||||||
);
|
);
|
||||||
@ -698,7 +820,7 @@ mod tests {
|
|||||||
tx.insert_history(changesets.clone(), None).expect("insert history");
|
tx.insert_history(changesets.clone(), None).expect("insert history");
|
||||||
|
|
||||||
let account_occurrences = tx.table::<tables::AccountHistory>().unwrap().into_iter().fold(
|
let account_occurrences = tx.table::<tables::AccountHistory>().unwrap().into_iter().fold(
|
||||||
BTreeMap::<Address, usize>::new(),
|
BTreeMap::<_, usize>::new(),
|
||||||
|mut map, (key, _)| {
|
|mut map, (key, _)| {
|
||||||
map.entry(key.key).or_default().add_assign(1);
|
map.entry(key.key).or_default().add_assign(1);
|
||||||
map
|
map
|
||||||
@ -764,4 +886,98 @@ mod tests {
|
|||||||
// ended last time
|
// ended last time
|
||||||
test_prune(4500);
|
test_prune(4500);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn prune_storage_history() {
|
||||||
|
let tx = TestTransaction::default();
|
||||||
|
let mut rng = generators::rng();
|
||||||
|
|
||||||
|
let block_num = 7000;
|
||||||
|
let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1);
|
||||||
|
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
|
||||||
|
|
||||||
|
let accounts =
|
||||||
|
random_eoa_account_range(&mut rng, 0..3).into_iter().collect::<BTreeMap<_, _>>();
|
||||||
|
|
||||||
|
let (changesets, _) = random_changeset_range(
|
||||||
|
&mut rng,
|
||||||
|
blocks.iter(),
|
||||||
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
|
1..2,
|
||||||
|
1..2,
|
||||||
|
);
|
||||||
|
tx.insert_changesets(changesets.clone(), None).expect("insert changesets");
|
||||||
|
tx.insert_history(changesets.clone(), None).expect("insert history");
|
||||||
|
|
||||||
|
let storage_occurences = tx.table::<tables::StorageHistory>().unwrap().into_iter().fold(
|
||||||
|
BTreeMap::<_, usize>::new(),
|
||||||
|
|mut map, (key, _)| {
|
||||||
|
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
|
||||||
|
map
|
||||||
|
},
|
||||||
|
);
|
||||||
|
assert!(storage_occurences.into_iter().any(|(_, occurrences)| occurrences > 1));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.table::<tables::StorageChangeSet>().unwrap().len(),
|
||||||
|
changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
|
||||||
|
);
|
||||||
|
|
||||||
|
let original_shards = tx.table::<tables::StorageHistory>().unwrap();
|
||||||
|
|
||||||
|
let test_prune = |to_block: BlockNumber| {
|
||||||
|
let prune_mode = PruneMode::Before(to_block);
|
||||||
|
let pruner = Pruner::new(
|
||||||
|
tx.inner_raw(),
|
||||||
|
MAINNET.clone(),
|
||||||
|
5,
|
||||||
|
PruneModes { storage_history: Some(prune_mode), ..Default::default() },
|
||||||
|
BatchSizes {
|
||||||
|
// Less than total amount of blocks to prune to test the batching logic
|
||||||
|
storage_history: 10,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let provider = tx.inner_rw();
|
||||||
|
assert_matches!(pruner.prune_storage_history(&provider, to_block, prune_mode), Ok(()));
|
||||||
|
provider.commit().expect("commit");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.table::<tables::StorageChangeSet>().unwrap().len(),
|
||||||
|
changesets[to_block as usize + 1..]
|
||||||
|
.iter()
|
||||||
|
.flatten()
|
||||||
|
.flat_map(|(_, _, entries)| entries)
|
||||||
|
.count()
|
||||||
|
);
|
||||||
|
|
||||||
|
let actual_shards = tx.table::<tables::StorageHistory>().unwrap();
|
||||||
|
|
||||||
|
let expected_shards = original_shards
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| key.sharded_key.highest_block_number > to_block)
|
||||||
|
.map(|(key, blocks)| {
|
||||||
|
let new_blocks = blocks
|
||||||
|
.iter(0)
|
||||||
|
.skip_while(|block| *block <= to_block as usize)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
assert_eq!(actual_shards, expected_shards);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(),
|
||||||
|
Some(PruneCheckpoint { block_number: to_block, prune_mode })
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Prune first time: no previous checkpoint is present
|
||||||
|
test_prune(3000);
|
||||||
|
// Prune second time: previous checkpoint is present, should continue pruning from where
|
||||||
|
// ended last time
|
||||||
|
test_prune(4500);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1530,7 +1530,7 @@ impl TransportRpcModules<()> {
|
|||||||
let other = other.into();
|
let other = other.into();
|
||||||
self.merge_http(other.clone())?;
|
self.merge_http(other.clone())?;
|
||||||
self.merge_ws(other.clone())?;
|
self.merge_ws(other.clone())?;
|
||||||
self.merge_ipc(other.clone())?;
|
self.merge_ipc(other)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user