mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
feat(pruner): account history (#4000)
This commit is contained in:
@ -191,21 +191,22 @@ pub fn random_block_range<R: Rng>(
|
|||||||
blocks
|
blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
type Transition = Vec<(Address, Account, Vec<StorageEntry>)>;
|
/// Collection of account and storage entry changes
|
||||||
|
pub type ChangeSet = Vec<(Address, Account, Vec<StorageEntry>)>;
|
||||||
type AccountState = (Account, Vec<StorageEntry>);
|
type AccountState = (Account, Vec<StorageEntry>);
|
||||||
|
|
||||||
/// Generate a range of transitions for given blocks and accounts.
|
/// Generate a range of changesets for given blocks and accounts.
|
||||||
/// Assumes all accounts start with an empty storage.
|
/// Assumes all accounts start with an empty storage.
|
||||||
///
|
///
|
||||||
/// Returns a Vec of account and storage changes for each transition,
|
/// Returns a Vec of account and storage changes for each block,
|
||||||
/// along with the final state of all accounts and storages.
|
/// along with the final state of all accounts and storages.
|
||||||
pub fn random_transition_range<'a, R: Rng, IBlk, IAcc>(
|
pub fn random_changeset_range<'a, R: Rng, IBlk, IAcc>(
|
||||||
rng: &mut R,
|
rng: &mut R,
|
||||||
blocks: IBlk,
|
blocks: IBlk,
|
||||||
accounts: IAcc,
|
accounts: IAcc,
|
||||||
n_changes: std::ops::Range<u64>,
|
n_storage_changes: std::ops::Range<u64>,
|
||||||
key_range: std::ops::Range<u64>,
|
key_range: std::ops::Range<u64>,
|
||||||
) -> (Vec<Transition>, BTreeMap<Address, AccountState>)
|
) -> (Vec<ChangeSet>, BTreeMap<Address, AccountState>)
|
||||||
where
|
where
|
||||||
IBlk: IntoIterator<Item = &'a SealedBlock>,
|
IBlk: IntoIterator<Item = &'a SealedBlock>,
|
||||||
IAcc: IntoIterator<Item = (Address, (Account, Vec<StorageEntry>))>,
|
IAcc: IntoIterator<Item = (Address, (Account, Vec<StorageEntry>))>,
|
||||||
@ -217,16 +218,20 @@ where
|
|||||||
|
|
||||||
let valid_addresses = state.keys().copied().collect();
|
let valid_addresses = state.keys().copied().collect();
|
||||||
|
|
||||||
let mut transitions = Vec::new();
|
let mut changesets = Vec::new();
|
||||||
|
|
||||||
blocks.into_iter().for_each(|block| {
|
blocks.into_iter().for_each(|block| {
|
||||||
let mut transition = Vec::new();
|
let mut changeset = Vec::new();
|
||||||
let (from, to, mut transfer, new_entries) =
|
let (from, to, mut transfer, new_entries) = random_account_change(
|
||||||
random_account_change(rng, &valid_addresses, n_changes.clone(), key_range.clone());
|
rng,
|
||||||
|
&valid_addresses,
|
||||||
|
n_storage_changes.clone(),
|
||||||
|
key_range.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
// extract from sending account
|
// extract from sending account
|
||||||
let (prev_from, _) = state.get_mut(&from).unwrap();
|
let (prev_from, _) = state.get_mut(&from).unwrap();
|
||||||
transition.push((from, *prev_from, Vec::new()));
|
changeset.push((from, *prev_from, Vec::new()));
|
||||||
|
|
||||||
transfer = max(min(transfer, prev_from.balance), U256::from(1));
|
transfer = max(min(transfer, prev_from.balance), U256::from(1));
|
||||||
prev_from.balance = prev_from.balance.wrapping_sub(transfer);
|
prev_from.balance = prev_from.balance.wrapping_sub(transfer);
|
||||||
@ -250,11 +255,11 @@ where
|
|||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
transition.push((to, *prev_to, old_entries));
|
changeset.push((to, *prev_to, old_entries));
|
||||||
|
|
||||||
prev_to.balance = prev_to.balance.wrapping_add(transfer);
|
prev_to.balance = prev_to.balance.wrapping_add(transfer);
|
||||||
|
|
||||||
transitions.push(transition);
|
changesets.push(changeset);
|
||||||
});
|
});
|
||||||
|
|
||||||
let final_state = state
|
let final_state = state
|
||||||
@ -263,7 +268,7 @@ where
|
|||||||
(addr, (acc, storage.into_iter().map(|v| v.into()).collect()))
|
(addr, (acc, storage.into_iter().map(|v| v.into()).collect()))
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
(transitions, final_state)
|
(changesets, final_state)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate a random account change.
|
/// Generate a random account change.
|
||||||
@ -272,7 +277,7 @@ where
|
|||||||
pub fn random_account_change<R: Rng>(
|
pub fn random_account_change<R: Rng>(
|
||||||
rng: &mut R,
|
rng: &mut R,
|
||||||
valid_addresses: &Vec<Address>,
|
valid_addresses: &Vec<Address>,
|
||||||
n_changes: std::ops::Range<u64>,
|
n_storage_changes: std::ops::Range<u64>,
|
||||||
key_range: std::ops::Range<u64>,
|
key_range: std::ops::Range<u64>,
|
||||||
) -> (Address, Address, U256, Vec<StorageEntry>) {
|
) -> (Address, Address, U256, Vec<StorageEntry>) {
|
||||||
let mut addresses = valid_addresses.choose_multiple(rng, 2).cloned();
|
let mut addresses = valid_addresses.choose_multiple(rng, 2).cloned();
|
||||||
@ -282,9 +287,13 @@ pub fn random_account_change<R: Rng>(
|
|||||||
|
|
||||||
let balance_change = U256::from(rng.gen::<u64>());
|
let balance_change = U256::from(rng.gen::<u64>());
|
||||||
|
|
||||||
let storage_changes = (0..n_changes.sample_single(rng))
|
let storage_changes = if n_storage_changes.is_empty() {
|
||||||
.map(|_| random_storage_entry(rng, key_range.clone()))
|
Vec::new()
|
||||||
.collect();
|
} else {
|
||||||
|
(0..n_storage_changes.sample_single(rng))
|
||||||
|
.map(|_| random_storage_entry(rng, key_range.clone()))
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
|
||||||
(addr_from, addr_to, balance_change, storage_changes)
|
(addr_from, addr_to, balance_change, storage_changes)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -2,9 +2,16 @@
|
|||||||
|
|
||||||
use crate::{Metrics, PrunerError};
|
use crate::{Metrics, PrunerError};
|
||||||
use rayon::prelude::*;
|
use rayon::prelude::*;
|
||||||
use reth_db::{database::Database, tables};
|
use reth_db::{
|
||||||
|
abstraction::cursor::{DbCursorRO, DbCursorRW},
|
||||||
|
database::Database,
|
||||||
|
models::ShardedKey,
|
||||||
|
tables,
|
||||||
|
transaction::DbTxMut,
|
||||||
|
BlockNumberList,
|
||||||
|
};
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
|
Address, BlockNumber, ChainSpec, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber,
|
||||||
};
|
};
|
||||||
use reth_provider::{
|
use reth_provider::{
|
||||||
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
|
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
|
||||||
@ -23,11 +30,17 @@ pub struct BatchSizes {
|
|||||||
receipts: usize,
|
receipts: usize,
|
||||||
transaction_lookup: usize,
|
transaction_lookup: usize,
|
||||||
transaction_senders: usize,
|
transaction_senders: usize,
|
||||||
|
account_history: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for BatchSizes {
|
impl Default for BatchSizes {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
Self { receipts: 10000, transaction_lookup: 10000, transaction_senders: 10000 }
|
Self {
|
||||||
|
receipts: 10000,
|
||||||
|
transaction_lookup: 10000,
|
||||||
|
transaction_senders: 10000,
|
||||||
|
account_history: 10000,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,6 +116,12 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
.record(part_start.elapsed())
|
.record(part_start.elapsed())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if let Some((to_block, prune_mode)) =
|
||||||
|
self.modes.prune_target_block_account_history(tip_block_number)?
|
||||||
|
{
|
||||||
|
self.prune_account_history(&provider, to_block, prune_mode)?;
|
||||||
|
}
|
||||||
|
|
||||||
provider.commit()?;
|
provider.commit()?;
|
||||||
self.last_pruned_block_number = Some(tip_block_number);
|
self.last_pruned_block_number = Some(tip_block_number);
|
||||||
|
|
||||||
@ -188,7 +207,7 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
let total = range.clone().count();
|
let total = range.clone().count();
|
||||||
|
|
||||||
let mut processed = 0;
|
let mut processed = 0;
|
||||||
provider.prune_table_in_batches::<tables::Receipts, _>(
|
provider.prune_table_with_iterator_in_batches::<tables::Receipts>(
|
||||||
range,
|
range,
|
||||||
self.batch_sizes.receipts,
|
self.batch_sizes.receipts,
|
||||||
|entries| {
|
|entries| {
|
||||||
@ -256,7 +275,7 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
// Pre-sort hashes to prune them in order
|
// Pre-sort hashes to prune them in order
|
||||||
hashes.sort_unstable();
|
hashes.sort_unstable();
|
||||||
|
|
||||||
let entries = provider.prune_table::<tables::TxHashNumber, _>(hashes)?;
|
let entries = provider.prune_table_with_iterator::<tables::TxHashNumber>(hashes)?;
|
||||||
processed += entries;
|
processed += entries;
|
||||||
trace!(
|
trace!(
|
||||||
target: "pruner",
|
target: "pruner",
|
||||||
@ -296,7 +315,7 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
let total = range.clone().count();
|
let total = range.clone().count();
|
||||||
|
|
||||||
let mut processed = 0;
|
let mut processed = 0;
|
||||||
provider.prune_table_in_batches::<tables::TxSenders, _>(
|
provider.prune_table_with_range_in_batches::<tables::TxSenders>(
|
||||||
range,
|
range,
|
||||||
self.batch_sizes.transaction_senders,
|
self.batch_sizes.transaction_senders,
|
||||||
|entries| {
|
|entries| {
|
||||||
@ -317,22 +336,135 @@ impl<DB: Database> Pruner<DB> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Prune account history up to the provided block, inclusive.
|
||||||
|
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
|
||||||
|
fn prune_account_history(
|
||||||
|
&self,
|
||||||
|
provider: &DatabaseProviderRW<'_, DB>,
|
||||||
|
to_block: BlockNumber,
|
||||||
|
prune_mode: PruneMode,
|
||||||
|
) -> PrunerResult {
|
||||||
|
let from_block = provider
|
||||||
|
.get_prune_checkpoint(PrunePart::AccountHistory)?
|
||||||
|
.map(|checkpoint| checkpoint.block_number + 1)
|
||||||
|
.unwrap_or_default();
|
||||||
|
let range = from_block..=to_block;
|
||||||
|
let total = range.clone().count();
|
||||||
|
|
||||||
|
let mut processed = 0;
|
||||||
|
provider.prune_table_with_range_in_batches::<tables::AccountChangeSet>(
|
||||||
|
range,
|
||||||
|
self.batch_sizes.account_history,
|
||||||
|
|entries| {
|
||||||
|
processed += entries;
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
%entries,
|
||||||
|
progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64),
|
||||||
|
"Pruned account history (changesets)"
|
||||||
|
);
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let mut cursor = provider.tx_ref().cursor_write::<tables::AccountHistory>()?;
|
||||||
|
// Prune `AccountHistory` table:
|
||||||
|
// 1. If the shard has `highest_block_number` less than or equal to the target block number
|
||||||
|
// for pruning, delete the shard completely.
|
||||||
|
// 2. If the shard has `highest_block_number` greater than the target block number for
|
||||||
|
// pruning, filter block numbers inside the shard which are less than the target
|
||||||
|
// block number for pruning.
|
||||||
|
while let Some(result) = cursor.next()? {
|
||||||
|
let (key, blocks): (ShardedKey<Address>, BlockNumberList) = result;
|
||||||
|
|
||||||
|
if key.highest_block_number <= to_block {
|
||||||
|
// If shard consists only of block numbers less than the target one, delete shard
|
||||||
|
// completely.
|
||||||
|
cursor.delete_current()?;
|
||||||
|
if key.highest_block_number == to_block {
|
||||||
|
// Shard contains only block numbers up to the target one, so we can skip to the
|
||||||
|
// next address. It is guaranteed that further shards for this address will not
|
||||||
|
// contain the target block number, as it's in this shard.
|
||||||
|
cursor.seek_exact(ShardedKey::last(key.key))?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Shard contains block numbers that are higher than the target one, so we need to
|
||||||
|
// filter it. It is guaranteed that further shards for this address will not contain
|
||||||
|
// the target block number, as it's in this shard.
|
||||||
|
let blocks = blocks
|
||||||
|
.iter(0)
|
||||||
|
.skip_while(|block| *block <= to_block as usize)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if blocks.is_empty() {
|
||||||
|
// If there are no more blocks in this shard, we need to remove it, as empty
|
||||||
|
// shards are not allowed.
|
||||||
|
if key.highest_block_number == u64::MAX {
|
||||||
|
// If current shard is the last shard for this address, replace it with the
|
||||||
|
// previous shard.
|
||||||
|
if let Some((prev_key, prev_value)) = cursor.prev()? {
|
||||||
|
if prev_key.key == key.key {
|
||||||
|
cursor.delete_current()?;
|
||||||
|
// Upsert will replace the last shard for this address with the
|
||||||
|
// previous value
|
||||||
|
cursor.upsert(key.clone(), prev_value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// If current shard is not the last shard for this address, just delete it.
|
||||||
|
cursor.delete_current()?;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(blocks))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Jump to the next address
|
||||||
|
cursor.seek_exact(ShardedKey::last(key.key))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
processed += 1;
|
||||||
|
if processed % self.batch_sizes.account_history == 0 {
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
entries = self.batch_sizes.account_history,
|
||||||
|
"Pruned account history (indices)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if processed % self.batch_sizes.account_history != 0 {
|
||||||
|
trace!(
|
||||||
|
target: "pruner",
|
||||||
|
entries = processed % self.batch_sizes.account_history,
|
||||||
|
"Pruned account history (indices)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
provider.save_prune_checkpoint(
|
||||||
|
PrunePart::AccountHistory,
|
||||||
|
PruneCheckpoint { block_number: to_block, prune_mode },
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use crate::{pruner::BatchSizes, Pruner};
|
use crate::{pruner::BatchSizes, Pruner};
|
||||||
use assert_matches::assert_matches;
|
use assert_matches::assert_matches;
|
||||||
use reth_db::{tables, test_utils::create_test_rw_db};
|
use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList};
|
||||||
use reth_interfaces::test_utils::{
|
use reth_interfaces::test_utils::{
|
||||||
generators,
|
generators,
|
||||||
generators::{random_block_range, random_receipt},
|
generators::{
|
||||||
|
random_block_range, random_changeset_range, random_eoa_account_range, random_receipt,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET,
|
Address, BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET,
|
||||||
};
|
};
|
||||||
use reth_provider::PruneCheckpointReader;
|
use reth_provider::PruneCheckpointReader;
|
||||||
use reth_stages::test_utils::TestTransaction;
|
use reth_stages::test_utils::TestTransaction;
|
||||||
|
use std::{collections::BTreeMap, ops::AddAssign};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn is_pruning_needed() {
|
fn is_pruning_needed() {
|
||||||
@ -542,4 +674,94 @@ mod tests {
|
|||||||
// ended last time
|
// ended last time
|
||||||
test_prune(20);
|
test_prune(20);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn prune_account_history() {
|
||||||
|
let tx = TestTransaction::default();
|
||||||
|
let mut rng = generators::rng();
|
||||||
|
|
||||||
|
let block_num = 7000;
|
||||||
|
let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1);
|
||||||
|
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
|
||||||
|
|
||||||
|
let accounts =
|
||||||
|
random_eoa_account_range(&mut rng, 0..3).into_iter().collect::<BTreeMap<_, _>>();
|
||||||
|
|
||||||
|
let (changesets, _) = random_changeset_range(
|
||||||
|
&mut rng,
|
||||||
|
blocks.iter(),
|
||||||
|
accounts.clone().into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
|
0..0,
|
||||||
|
0..0,
|
||||||
|
);
|
||||||
|
tx.insert_changesets(changesets.clone(), None).expect("insert changesets");
|
||||||
|
tx.insert_history(changesets.clone(), None).expect("insert history");
|
||||||
|
|
||||||
|
let account_occurrences = tx.table::<tables::AccountHistory>().unwrap().into_iter().fold(
|
||||||
|
BTreeMap::<Address, usize>::new(),
|
||||||
|
|mut map, (key, _)| {
|
||||||
|
map.entry(key.key).or_default().add_assign(1);
|
||||||
|
map
|
||||||
|
},
|
||||||
|
);
|
||||||
|
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.table::<tables::AccountChangeSet>().unwrap().len(),
|
||||||
|
changesets.iter().flatten().count()
|
||||||
|
);
|
||||||
|
|
||||||
|
let original_shards = tx.table::<tables::AccountHistory>().unwrap();
|
||||||
|
|
||||||
|
let test_prune = |to_block: BlockNumber| {
|
||||||
|
let prune_mode = PruneMode::Before(to_block);
|
||||||
|
let pruner = Pruner::new(
|
||||||
|
tx.inner_raw(),
|
||||||
|
MAINNET.clone(),
|
||||||
|
5,
|
||||||
|
PruneModes { account_history: Some(prune_mode), ..Default::default() },
|
||||||
|
BatchSizes {
|
||||||
|
// Less than total amount of blocks to prune to test the batching logic
|
||||||
|
account_history: 10,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let provider = tx.inner_rw();
|
||||||
|
assert_matches!(pruner.prune_account_history(&provider, to_block, prune_mode), Ok(()));
|
||||||
|
provider.commit().expect("commit");
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.table::<tables::AccountChangeSet>().unwrap().len(),
|
||||||
|
changesets[to_block as usize + 1..].iter().flatten().count()
|
||||||
|
);
|
||||||
|
|
||||||
|
let actual_shards = tx.table::<tables::AccountHistory>().unwrap();
|
||||||
|
|
||||||
|
let expected_shards = original_shards
|
||||||
|
.iter()
|
||||||
|
.filter(|(key, _)| key.highest_block_number > to_block)
|
||||||
|
.map(|(key, blocks)| {
|
||||||
|
let new_blocks = blocks
|
||||||
|
.iter(0)
|
||||||
|
.skip_while(|block| *block <= to_block as usize)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
assert_eq!(actual_shards, expected_shards);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(),
|
||||||
|
Some(PruneCheckpoint { block_number: to_block, prune_mode })
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Prune first time: no previous checkpoint is present
|
||||||
|
test_prune(3000);
|
||||||
|
// Prune second time: previous checkpoint is present, should continue pruning from where
|
||||||
|
// ended last time
|
||||||
|
test_prune(4500);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,8 +8,8 @@ use reth_db::{
|
|||||||
use reth_interfaces::test_utils::{
|
use reth_interfaces::test_utils::{
|
||||||
generators,
|
generators,
|
||||||
generators::{
|
generators::{
|
||||||
random_block_range, random_contract_account_range, random_eoa_account_range,
|
random_block_range, random_changeset_range, random_contract_account_range,
|
||||||
random_transition_range,
|
random_eoa_account_range,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use reth_primitives::{Account, Address, SealedBlock, H256, MAINNET};
|
use reth_primitives::{Account, Address, SealedBlock, H256, MAINNET};
|
||||||
@ -119,7 +119,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
|
|||||||
|
|
||||||
let mut blocks = random_block_range(&mut rng, 0..=num_blocks, H256::zero(), txs_range);
|
let mut blocks = random_block_range(&mut rng, 0..=num_blocks, H256::zero(), txs_range);
|
||||||
|
|
||||||
let (transitions, start_state) = random_transition_range(
|
let (transitions, start_state) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter().take(2),
|
blocks.iter().take(2),
|
||||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
@ -139,10 +139,10 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
|
|||||||
|
|
||||||
let offset = transitions.len() as u64;
|
let offset = transitions.len() as u64;
|
||||||
|
|
||||||
tx.insert_transitions(transitions, None).unwrap();
|
tx.insert_changesets(transitions, None).unwrap();
|
||||||
tx.commit(|tx| updates.flush(tx)).unwrap();
|
tx.commit(|tx| updates.flush(tx)).unwrap();
|
||||||
|
|
||||||
let (transitions, final_state) = random_transition_range(
|
let (transitions, final_state) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter().skip(2),
|
blocks.iter().skip(2),
|
||||||
start_state,
|
start_state,
|
||||||
@ -150,7 +150,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
|
|||||||
key_range,
|
key_range,
|
||||||
);
|
);
|
||||||
|
|
||||||
tx.insert_transitions(transitions, Some(offset)).unwrap();
|
tx.insert_changesets(transitions, Some(offset)).unwrap();
|
||||||
|
|
||||||
tx.insert_accounts_and_storages(final_state).unwrap();
|
tx.insert_accounts_and_storages(final_state).unwrap();
|
||||||
|
|
||||||
|
|||||||
@ -92,7 +92,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use reth_interfaces::test_utils::{
|
use reth_interfaces::test_utils::{
|
||||||
generators,
|
generators,
|
||||||
generators::{random_block_range, random_contract_account_range, random_transition_range},
|
generators::{random_block_range, random_changeset_range, random_contract_account_range},
|
||||||
};
|
};
|
||||||
use reth_primitives::{hex_literal::hex, Address, BlockNumber, H160, H256, MAINNET};
|
use reth_primitives::{hex_literal::hex, Address, BlockNumber, H160, H256, MAINNET};
|
||||||
|
|
||||||
@ -408,7 +408,7 @@ mod tests {
|
|||||||
|
|
||||||
let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3);
|
let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3);
|
||||||
|
|
||||||
let (transitions, _) = random_transition_range(
|
let (transitions, _) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter(),
|
blocks.iter(),
|
||||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
@ -417,7 +417,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// add block changeset from block 1.
|
// add block changeset from block 1.
|
||||||
self.tx.insert_transitions(transitions, Some(start))?;
|
self.tx.insert_changesets(transitions, Some(start))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -91,7 +91,7 @@ mod tests {
|
|||||||
};
|
};
|
||||||
use reth_interfaces::test_utils::{
|
use reth_interfaces::test_utils::{
|
||||||
generators,
|
generators,
|
||||||
generators::{random_block_range, random_contract_account_range, random_transition_range},
|
generators::{random_block_range, random_changeset_range, random_contract_account_range},
|
||||||
};
|
};
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
hex_literal::hex, Address, BlockNumber, StorageEntry, H160, H256, MAINNET, U256,
|
hex_literal::hex, Address, BlockNumber, StorageEntry, H160, H256, MAINNET, U256,
|
||||||
@ -422,7 +422,7 @@ mod tests {
|
|||||||
|
|
||||||
let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3);
|
let blocks = random_block_range(&mut rng, start..=end, H256::zero(), 0..3);
|
||||||
|
|
||||||
let (transitions, _) = random_transition_range(
|
let (transitions, _) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter(),
|
blocks.iter(),
|
||||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
@ -431,7 +431,7 @@ mod tests {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// add block changeset from block 1.
|
// add block changeset from block 1.
|
||||||
self.tx.insert_transitions(transitions, Some(start))?;
|
self.tx.insert_changesets(transitions, Some(start))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -376,8 +376,7 @@ mod tests {
|
|||||||
use reth_interfaces::test_utils::{
|
use reth_interfaces::test_utils::{
|
||||||
generators,
|
generators,
|
||||||
generators::{
|
generators::{
|
||||||
random_block, random_block_range, random_contract_account_range,
|
random_block, random_block_range, random_changeset_range, random_contract_account_range,
|
||||||
random_transition_range,
|
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
@ -533,7 +532,7 @@ mod tests {
|
|||||||
blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3));
|
blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3));
|
||||||
self.tx.insert_blocks(blocks.iter(), None)?;
|
self.tx.insert_blocks(blocks.iter(), None)?;
|
||||||
|
|
||||||
let (transitions, final_state) = random_transition_range(
|
let (transitions, final_state) = random_changeset_range(
|
||||||
&mut rng,
|
&mut rng,
|
||||||
blocks.iter(),
|
blocks.iter(),
|
||||||
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
|
||||||
@ -541,7 +540,7 @@ mod tests {
|
|||||||
0..256,
|
0..256,
|
||||||
);
|
);
|
||||||
// add block changeset from block 1.
|
// add block changeset from block 1.
|
||||||
self.tx.insert_transitions(transitions, Some(start))?;
|
self.tx.insert_changesets(transitions, Some(start))?;
|
||||||
self.tx.insert_accounts_and_storages(final_state)?;
|
self.tx.insert_accounts_and_storages(final_state)?;
|
||||||
|
|
||||||
// Calculate state root
|
// Calculate state root
|
||||||
|
|||||||
@ -9,11 +9,12 @@ use reth_db::{
|
|||||||
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
|
transaction::{DbTx, DbTxGAT, DbTxMut, DbTxMutGAT},
|
||||||
DatabaseEnv, DatabaseError as DbError,
|
DatabaseEnv, DatabaseError as DbError,
|
||||||
};
|
};
|
||||||
|
use reth_interfaces::test_utils::generators::ChangeSet;
|
||||||
use reth_primitives::{
|
use reth_primitives::{
|
||||||
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
|
keccak256, Account, Address, BlockNumber, Receipt, SealedBlock, SealedHeader, StorageEntry,
|
||||||
TxHash, TxNumber, H256, MAINNET, U256,
|
TxHash, TxNumber, H256, MAINNET, U256,
|
||||||
};
|
};
|
||||||
use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, ProviderFactory};
|
use reth_provider::{DatabaseProviderRO, DatabaseProviderRW, HistoryWriter, ProviderFactory};
|
||||||
use std::{
|
use std::{
|
||||||
borrow::Borrow,
|
borrow::Borrow,
|
||||||
collections::BTreeMap,
|
collections::BTreeMap,
|
||||||
@ -347,35 +348,62 @@ impl TestTransaction {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into
|
/// Insert collection of [ChangeSet] into corresponding tables.
|
||||||
/// corresponding tables.
|
pub fn insert_changesets<I>(
|
||||||
pub fn insert_transitions<I>(
|
|
||||||
&self,
|
&self,
|
||||||
transitions: I,
|
changesets: I,
|
||||||
transition_offset: Option<u64>,
|
block_offset: Option<u64>,
|
||||||
) -> Result<(), DbError>
|
) -> Result<(), DbError>
|
||||||
where
|
where
|
||||||
I: IntoIterator<Item = Vec<(Address, Account, Vec<StorageEntry>)>>,
|
I: IntoIterator<Item = ChangeSet>,
|
||||||
{
|
{
|
||||||
let offset = transition_offset.unwrap_or_default();
|
let offset = block_offset.unwrap_or_default();
|
||||||
self.commit(|tx| {
|
self.commit(|tx| {
|
||||||
transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| {
|
changesets.into_iter().enumerate().try_for_each(|(block, changeset)| {
|
||||||
changes.into_iter().try_for_each(|(address, old_account, old_storage)| {
|
changeset.into_iter().try_for_each(|(address, old_account, old_storage)| {
|
||||||
let tid = offset + transition_id as u64;
|
let block = offset + block as u64;
|
||||||
// Insert into account changeset.
|
// Insert into account changeset.
|
||||||
tx.put::<tables::AccountChangeSet>(
|
tx.put::<tables::AccountChangeSet>(
|
||||||
tid,
|
block,
|
||||||
AccountBeforeTx { address, info: Some(old_account) },
|
AccountBeforeTx { address, info: Some(old_account) },
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let tid_address = (tid, address).into();
|
let block_address = (block, address).into();
|
||||||
|
|
||||||
// Insert into storage changeset.
|
// Insert into storage changeset.
|
||||||
old_storage.into_iter().try_for_each(|entry| {
|
old_storage.into_iter().try_for_each(|entry| {
|
||||||
tx.put::<tables::StorageChangeSet>(tid_address, entry)
|
tx.put::<tables::StorageChangeSet>(block_address, entry)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn insert_history<I>(
|
||||||
|
&self,
|
||||||
|
changesets: I,
|
||||||
|
block_offset: Option<u64>,
|
||||||
|
) -> reth_interfaces::Result<()>
|
||||||
|
where
|
||||||
|
I: IntoIterator<Item = ChangeSet>,
|
||||||
|
{
|
||||||
|
let mut accounts = BTreeMap::<Address, Vec<u64>>::new();
|
||||||
|
let mut storages = BTreeMap::<(Address, H256), Vec<u64>>::new();
|
||||||
|
|
||||||
|
for (block, changeset) in changesets.into_iter().enumerate() {
|
||||||
|
for (address, _, storage_entries) in changeset {
|
||||||
|
accounts.entry(address).or_default().push(block as u64);
|
||||||
|
for storage_entry in storage_entries {
|
||||||
|
storages.entry((address, storage_entry.key)).or_default().push(block as u64);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let provider = self.factory.provider_rw()?;
|
||||||
|
provider.insert_account_history_index(accounts)?;
|
||||||
|
provider.insert_storage_history_index(storages)?;
|
||||||
|
provider.commit()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,7 +17,7 @@ use reth_db::{
|
|||||||
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
|
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
|
||||||
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
|
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
|
||||||
},
|
},
|
||||||
table::{Key, Table},
|
table::Table,
|
||||||
tables,
|
tables,
|
||||||
transaction::{DbTx, DbTxMut},
|
transaction::{DbTx, DbTxMut},
|
||||||
BlockNumberList, DatabaseError,
|
BlockNumberList, DatabaseError,
|
||||||
@ -621,31 +621,23 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
|
|||||||
|
|
||||||
/// Prune the table for the specified pre-sorted key iterator.
|
/// Prune the table for the specified pre-sorted key iterator.
|
||||||
/// Returns number of rows pruned.
|
/// Returns number of rows pruned.
|
||||||
pub fn prune_table<T, K>(
|
pub fn prune_table_with_iterator<T: Table>(
|
||||||
&self,
|
&self,
|
||||||
keys: impl IntoIterator<Item = K>,
|
keys: impl IntoIterator<Item = T::Key>,
|
||||||
) -> std::result::Result<usize, DatabaseError>
|
) -> std::result::Result<usize, DatabaseError> {
|
||||||
where
|
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {})
|
||||||
T: Table<Key = K>,
|
|
||||||
K: Key,
|
|
||||||
{
|
|
||||||
self.prune_table_in_batches::<T, K>(keys, usize::MAX, |_| {})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
|
/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
|
||||||
/// every `batch_size` pruned rows.
|
/// every `batch_size` pruned rows.
|
||||||
///
|
///
|
||||||
/// Returns number of rows pruned.
|
/// Returns number of rows pruned.
|
||||||
pub fn prune_table_in_batches<T, K>(
|
pub fn prune_table_with_iterator_in_batches<T: Table>(
|
||||||
&self,
|
&self,
|
||||||
keys: impl IntoIterator<Item = K>,
|
keys: impl IntoIterator<Item = T::Key>,
|
||||||
batch_size: usize,
|
batch_size: usize,
|
||||||
mut batch_callback: impl FnMut(usize),
|
mut batch_callback: impl FnMut(usize),
|
||||||
) -> std::result::Result<usize, DatabaseError>
|
) -> std::result::Result<usize, DatabaseError> {
|
||||||
where
|
|
||||||
T: Table<Key = K>,
|
|
||||||
K: Key,
|
|
||||||
{
|
|
||||||
let mut cursor = self.tx.cursor_write::<T>()?;
|
let mut cursor = self.tx.cursor_write::<T>()?;
|
||||||
let mut deleted = 0;
|
let mut deleted = 0;
|
||||||
|
|
||||||
@ -667,6 +659,36 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
|
|||||||
Ok(deleted)
|
Ok(deleted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Prune the table for the specified key range, calling `chunk_callback` after every
|
||||||
|
/// `batch_size` pruned rows.
|
||||||
|
///
|
||||||
|
/// Returns number of rows pruned.
|
||||||
|
pub fn prune_table_with_range_in_batches<T: Table>(
|
||||||
|
&self,
|
||||||
|
keys: impl RangeBounds<T::Key>,
|
||||||
|
batch_size: usize,
|
||||||
|
mut batch_callback: impl FnMut(usize),
|
||||||
|
) -> std::result::Result<usize, DatabaseError> {
|
||||||
|
let mut cursor = self.tx.cursor_write::<T>()?;
|
||||||
|
let mut walker = cursor.walk_range(keys)?;
|
||||||
|
let mut deleted = 0;
|
||||||
|
|
||||||
|
while walker.next().transpose()?.is_some() {
|
||||||
|
walker.delete_current()?;
|
||||||
|
deleted += 1;
|
||||||
|
|
||||||
|
if deleted % batch_size == 0 {
|
||||||
|
batch_callback(batch_size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if deleted % batch_size != 0 {
|
||||||
|
batch_callback(deleted % batch_size);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(deleted)
|
||||||
|
}
|
||||||
|
|
||||||
/// Load shard and remove it. If list is empty, last shard was full or
|
/// Load shard and remove it. If list is empty, last shard was full or
|
||||||
/// there are no shards at all.
|
/// there are no shards at all.
|
||||||
fn take_shard<T>(&self, key: T::Key) -> Result<Vec<u64>>
|
fn take_shard<T>(&self, key: T::Key) -> Result<Vec<u64>>
|
||||||
|
|||||||
Reference in New Issue
Block a user