perf(pruner): delete history indices by changeset keys (#9312)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
This commit is contained in:
Alexey Shekhirin
2024-07-08 17:46:29 +01:00
committed by GitHub
parent ad403b4671
commit a9ebab4c79
5 changed files with 195 additions and 98 deletions

1
Cargo.lock generated
View File

@ -8170,6 +8170,7 @@ dependencies = [
"reth-testing-utils",
"reth-tokio-util",
"reth-tracing",
"rustc-hash 2.0.0",
"thiserror",
"tokio",
"tracing",

View File

@ -35,6 +35,7 @@ thiserror.workspace = true
itertools.workspace = true
rayon.workspace = true
tokio.workspace = true
rustc-hash.workspace = true
[dev-dependencies]
# reth

View File

@ -4,10 +4,12 @@ use crate::{
},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{database::Database, models::ShardedKey};
use reth_provider::DatabaseProviderRW;
use reth_prune_types::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of account history tables to prune in one step.
@ -64,34 +66,53 @@ impl<DB: Database> Segment<DB> for AccountHistory {
}
let mut last_changeset_pruned_block = None;
// Deleted account changeset keys (account addresses) with the highest block number deleted
// for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_accounts = FxHashMap::default();
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::AccountChangeSets>(
range,
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0),
|(block_number, account)| {
highest_deleted_accounts.insert(account.address, block_number);
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more account account changesets to prune, set the checkpoint block number
// to previous, so we could finish pruning its account changesets on the next run.
// If there's more account changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its account changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
let (processed, pruned_indices) = prune_history_indices::<DB, tables::AccountsHistory, _>(
// Sort highest deleted block numbers by account address and turn them into sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_accounts
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|(address, block_number)| {
ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
});
let outcomes = prune_history_indices::<DB, tables::AccountsHistory, _>(
provider,
last_changeset_pruned_block,
highest_sharded_keys,
|a, b| a.key == b.key,
|key| ShardedKey::last(key.key),
)?;
trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)");
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
progress,
pruned: pruned_changesets + pruned_indices,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,

View File

@ -1,5 +1,5 @@
use alloy_primitives::BlockNumber;
use reth_db::BlockNumberList;
use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@ -10,103 +10,151 @@ use reth_db_api::{
};
use reth_provider::DatabaseProviderRW;
/// Prune history indices up to the provided block, inclusive.
/// Outcome of pruning a single shard of a history index table.
enum PruneShardOutcome {
    /// The shard was removed from the table entirely.
    Deleted,
    /// The shard was rewritten (its block list filtered, or the last shard replaced by the
    /// previous shard's value).
    Updated,
    /// The shard was left untouched because no blocks needed pruning.
    Unchanged,
}
/// Aggregated counts of shard-pruning outcomes accumulated while pruning history indices.
#[derive(Debug, Default)]
pub(crate) struct PrunedIndices {
    /// Number of shards deleted entirely.
    pub(crate) deleted: usize,
    /// Number of shards that were rewritten with a filtered block list.
    pub(crate) updated: usize,
    /// Number of shards that did not need any changes.
    pub(crate) unchanged: usize,
}
/// Prune history indices according to the provided list of highest sharded keys.
///
/// Returns total number of processed (walked) and deleted entities.
/// Returns total number of deleted, updated and unchanged entities.
pub(crate) fn prune_history_indices<DB, T, SK>(
provider: &DatabaseProviderRW<DB>,
to_block: BlockNumber,
highest_sharded_keys: impl IntoIterator<Item = T::Key>,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
last_key: impl Fn(&T::Key) -> T::Key,
) -> Result<(usize, usize), DatabaseError>
) -> Result<PrunedIndices, DatabaseError>
where
DB: Database,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
{
let mut processed = 0;
let mut deleted = 0;
let mut cursor = provider.tx_ref().cursor_write::<T>()?;
let mut outcomes = PrunedIndices::default();
let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
// Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
// 2. If the shard has `highest_block_number` greater than the target block number for
// pruning, filter block numbers inside the shard which are less than the target
// block number for pruning.
while let Some(result) = cursor.next()? {
let (key, blocks): (T::Key, BlockNumberList) = result;
for sharded_key in highest_sharded_keys {
// Seek to the shard that has the key >= the given sharded key
// TODO: optimize
let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
// If shard consists only of block numbers less than the target one, delete shard
// completely.
if key.as_ref().highest_block_number <= to_block {
cursor.delete_current()?;
deleted += 1;
if key.as_ref().highest_block_number == to_block {
// Shard contains only block numbers up to the target one, so we can skip to
// the last shard for this key. It is guaranteed that further shards for this
// sharded key will not contain the target block number, as it's in this shard.
cursor.seek_exact(last_key(&key))?;
}
}
// Shard contains block numbers that are higher than the target one, so we need to
// filter it. It is guaranteed that further shards for this sharded key will not
// contain the target block number, as it's in this shard.
else {
let higher_blocks =
blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
// Get the highest block number that needs to be deleted for this sharded key
let to_block = sharded_key.as_ref().highest_block_number;
// If there were blocks less than or equal to the target one
// (so the shard has changed), update the shard.
if blocks.len() as usize != higher_blocks.len() {
// If there will be no more blocks in the shard after pruning blocks below target
// block, we need to remove it, as empty shards are not allowed.
if higher_blocks.is_empty() {
if key.as_ref().highest_block_number == u64::MAX {
let prev_row = cursor.prev()?;
match prev_row {
// If current shard is the last shard for the sharded key that
// has previous shards, replace it with the previous shard.
Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
cursor.delete_current()?;
deleted += 1;
// Upsert will replace the last shard for this sharded key with
// the previous value.
cursor.upsert(key.clone(), prev_value)?;
}
// If there's no previous shard for this sharded key,
// just delete last shard completely.
_ => {
// If we successfully moved the cursor to a previous row,
// jump to the original last shard.
if prev_row.is_some() {
cursor.next()?;
}
// Delete shard.
cursor.delete_current()?;
deleted += 1;
}
}
}
// If current shard is not the last shard for this sharded key,
// just delete it.
else {
cursor.delete_current()?;
deleted += 1;
}
} else {
cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(higher_blocks))?;
'shard: loop {
let Some((key, block_nums)) =
shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
else {
break
};
if key_matches(&key, &sharded_key) {
match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
PruneShardOutcome::Deleted => outcomes.deleted += 1,
PruneShardOutcome::Updated => outcomes.updated += 1,
PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
}
} else {
// If such shard doesn't exist, skip to the next sharded key
break 'shard
}
// Jump to the last shard for this key, if current key isn't already the last shard.
if key.as_ref().highest_block_number != u64::MAX {
cursor.seek_exact(last_key(&key))?;
}
shard = cursor.next()?;
}
processed += 1;
}
Ok((processed, deleted))
Ok(outcomes)
}
/// Prunes one shard of a history table.
///
/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
///    pruning, delete the shard completely.
/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
///    filter block numbers inside the shard which are less than the target block number for
///    pruning.
///
/// NOTE(review): the caller must have positioned `cursor` on the entry for `key` before calling —
/// `delete_current`/`upsert` below operate on the cursor's current position.
///
/// `raw_blocks` is the still-encoded value for the shard; it is only decoded when the shard has
/// to be filtered (case 2), so fully-prunable shards are deleted without paying decode cost.
fn prune_shard<C, T, SK>(
    cursor: &mut C,
    key: T::Key,
    raw_blocks: RawValue<T::Value>,
    to_block: BlockNumber,
    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
) -> Result<PruneShardOutcome, DatabaseError>
where
    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
    T: Table<Value = BlockNumberList>,
    T::Key: AsRef<ShardedKey<SK>>,
{
    // If shard consists only of block numbers less than the target one, delete shard
    // completely.
    if key.as_ref().highest_block_number <= to_block {
        cursor.delete_current()?;
        Ok(PruneShardOutcome::Deleted)
    }
    // Shard contains block numbers that are higher than the target one, so we need to
    // filter it. It is guaranteed that further shards for this sharded key will not
    // contain the target block number, as it's in this shard.
    else {
        // Decode the block list only now that we know filtering is required.
        let blocks = raw_blocks.value()?;
        // Block numbers are stored sorted, so everything after the first block > `to_block`
        // is kept.
        let higher_blocks =
            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();

        // If there were blocks less than or equal to the target one
        // (so the shard has changed), update the shard.
        if blocks.len() as usize != higher_blocks.len() {
            // If there will be no more blocks in the shard after pruning blocks below target
            // block, we need to remove it, as empty shards are not allowed.
            if higher_blocks.is_empty() {
                // `u64::MAX` marks the last shard for a sharded key; it must keep existing (with
                // the previous shard's contents) as long as any earlier shard remains.
                if key.as_ref().highest_block_number == u64::MAX {
                    let prev_row = cursor
                        .prev()?
                        .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
                        .transpose()?;
                    match prev_row {
                        // If current shard is the last shard for the sharded key that
                        // has previous shards, replace it with the previous shard.
                        Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
                            cursor.delete_current()?;
                            // Upsert will replace the last shard for this sharded key with
                            // the previous value.
                            cursor.upsert(RawKey::new(key), prev_value)?;
                            Ok(PruneShardOutcome::Updated)
                        }
                        // If there's no previous shard for this sharded key,
                        // just delete last shard completely.
                        _ => {
                            // If we successfully moved the cursor to a previous row,
                            // jump to the original last shard.
                            if prev_row.is_some() {
                                cursor.next()?;
                            }
                            // Delete shard.
                            cursor.delete_current()?;
                            Ok(PruneShardOutcome::Deleted)
                        }
                    }
                }
                // If current shard is not the last shard for this sharded key,
                // just delete it.
                else {
                    cursor.delete_current()?;
                    Ok(PruneShardOutcome::Deleted)
                }
            } else {
                // Some blocks survive pruning: rewrite the shard in place with the filtered list.
                cursor.upsert(
                    RawKey::new(key),
                    RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
                )?;
                Ok(PruneShardOutcome::Updated)
            }
        } else {
            Ok(PruneShardOutcome::Unchanged)
        }
    }
}

View File

@ -4,6 +4,7 @@ use crate::{
},
PrunerError,
};
use itertools::Itertools;
use reth_db::tables;
use reth_db_api::{
database::Database,
@ -11,6 +12,7 @@ use reth_db_api::{
};
use reth_provider::DatabaseProviderRW;
use reth_prune_types::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step
@ -67,34 +69,58 @@ impl<DB: Database> Segment<DB> for StorageHistory {
}
let mut last_changeset_pruned_block = None;
// Deleted storage changeset keys (account addresses and storage slots) with the highest
// block number deleted for that key.
//
// The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
// STORAGE_HISTORY_TABLES_TO_PRUNE`, and with current default it's usually `3500 * 5
// / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
// size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
// additionally limited by the `max_reorg_depth`, so no OOM is expected here.
let mut highest_deleted_storages = FxHashMap::default();
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::StorageChangeSets>(
BlockNumberAddress::range(range),
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0.block_number()),
|(BlockNumberAddress((block_number, address)), entry)| {
highest_deleted_storages.insert((address, entry.key), block_number);
last_changeset_pruned_block = Some(block_number);
},
)?;
trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
let last_changeset_pruned_block = last_changeset_pruned_block
// If there's more storage storage changesets to prune, set the checkpoint block number
// to previous, so we could finish pruning its storage changesets on the next run.
// If there's more storage changesets to prune, set the checkpoint block number to
// previous, so we could finish pruning its storage changesets on the next run.
.map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
.unwrap_or(range_end);
let (processed, pruned_indices) = prune_history_indices::<DB, tables::StoragesHistory, _>(
// Sort highest deleted block numbers by account address and storage key and turn them into
// sharded keys.
// We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
let highest_sharded_keys = highest_deleted_storages
.into_iter()
.sorted_unstable() // Unstable is fine because no equal keys exist in the map
.map(|((address, storage_key), block_number)| {
StorageShardedKey::new(
address,
storage_key,
block_number.min(last_changeset_pruned_block),
)
});
let outcomes = prune_history_indices::<DB, tables::StoragesHistory, _>(
provider,
last_changeset_pruned_block,
highest_sharded_keys,
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
|key| StorageShardedKey::last(key.address, key.sharded_key.key),
)?;
trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)");
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
progress,
pruned: pruned_changesets + pruned_indices,
pruned: pruned_changesets + outcomes.deleted,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
tx_number: None,