diff --git a/Cargo.lock b/Cargo.lock
index 5515adabb..62727689e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -8170,6 +8170,7 @@ dependencies = [
  "reth-testing-utils",
  "reth-tokio-util",
  "reth-tracing",
+ "rustc-hash 2.0.0",
  "thiserror",
  "tokio",
  "tracing",
diff --git a/crates/prune/prune/Cargo.toml b/crates/prune/prune/Cargo.toml
index b5d9059c9..2f2a37d5b 100644
--- a/crates/prune/prune/Cargo.toml
+++ b/crates/prune/prune/Cargo.toml
@@ -35,6 +35,7 @@ thiserror.workspace = true
 itertools.workspace = true
 rayon.workspace = true
 tokio.workspace = true
+rustc-hash.workspace = true
 
 [dev-dependencies]
 # reth
diff --git a/crates/prune/prune/src/segments/account_history.rs b/crates/prune/prune/src/segments/account_history.rs
index ab2800a31..28e448560 100644
--- a/crates/prune/prune/src/segments/account_history.rs
+++ b/crates/prune/prune/src/segments/account_history.rs
@@ -4,10 +4,12 @@ use crate::{
     },
     PrunerError,
 };
+use itertools::Itertools;
 use reth_db::tables;
 use reth_db_api::{database::Database, models::ShardedKey};
 use reth_provider::DatabaseProviderRW;
 use reth_prune_types::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
+use rustc_hash::FxHashMap;
 use tracing::{instrument, trace};
 
 /// Number of account history tables to prune in one step.
@@ -64,34 +66,53 @@ impl<DB: Database> Segment<DB> for AccountHistory {
         }
 
         let mut last_changeset_pruned_block = None;
+        // Deleted account changeset keys (account addresses) with the highest block number deleted
+        // for that key.
+        //
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // ACCOUNT_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5
+        // / 2`, so 8750 entries. Each entry is `160 bit + 64 bit`, so the total size should be
+        // up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is additionally limited
+        // by the `max_reorg_depth`, so no OOM is expected here.
+        let mut highest_deleted_accounts = FxHashMap::default();
         let (pruned_changesets, done) = provider
             .prune_table_with_range::<tables::AccountChangeSets>(
                 range,
                 &mut limiter,
                 |_| false,
-                |row| last_changeset_pruned_block = Some(row.0),
+                |(block_number, account)| {
+                    highest_deleted_accounts.insert(account.address, block_number);
+                    last_changeset_pruned_block = Some(block_number);
+                },
             )?;
         trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");
 
         let last_changeset_pruned_block = last_changeset_pruned_block
-            // If there's more account account changesets to prune, set the checkpoint block number
-            // to previous, so we could finish pruning its account changesets on the next run.
+            // If there are more account changesets to prune, set the checkpoint block number to
+            // the previous one, so we can finish pruning its account changesets on the next run.
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);
 
-        let (processed, pruned_indices) = prune_history_indices::<DB, tables::AccountsHistory, _>(
+        // Sort highest deleted block numbers by account address and turn them into sharded keys.
+        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
+        let highest_sharded_keys = highest_deleted_accounts
+            .into_iter()
+            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
+            .map(|(address, block_number)| {
+                ShardedKey::new(address, block_number.min(last_changeset_pruned_block))
+            });
+        let outcomes = prune_history_indices::<DB, tables::AccountsHistory, _>(
             provider,
-            last_changeset_pruned_block,
+            highest_sharded_keys,
             |a, b| a.key == b.key,
-            |key| ShardedKey::last(key.key),
         )?;
-        trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)");
+        trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
 
         let progress = PruneProgress::new(done, &limiter);
 
         Ok(PruneOutput {
             progress,
-            pruned: pruned_changesets + pruned_indices,
+            pruned: pruned_changesets + outcomes.deleted,
             checkpoint: Some(PruneOutputCheckpoint {
                 block_number: Some(last_changeset_pruned_block),
                 tx_number: None,
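For illustration only, and not part of the patch: the account segment now works in two steps. The changeset-pruning callback records the highest deleted block number per address in an `FxHashMap`, and that map is sorted once and capped at the checkpoint block to produce sharded keys. The sketch below mirrors that flow with plain std types; `Address`, the local `ShardedKey` struct and `highest_sharded_keys` are simplified stand-ins (std `HashMap` and `sort_unstable` replace `FxHashMap` and `Itertools::sorted_unstable`), not reth APIs.

```rust
use std::collections::HashMap;

type Address = [u8; 20]; // stand-in for a 160-bit account address
type BlockNumber = u64;

/// Simplified stand-in for `ShardedKey<Address>`: the changeset key plus the highest
/// block number whose index entries must be removed from that key's shards.
#[derive(Debug, PartialEq, Eq)]
struct ShardedKey {
    key: Address,
    highest_block_number: BlockNumber,
}

/// Mirrors the new pruning flow: while walking pruned changeset rows, remember only the
/// highest deleted block per address, then sort once and cap at the checkpoint block.
fn highest_sharded_keys(
    pruned_changesets: impl IntoIterator<Item = (BlockNumber, Address)>,
    last_changeset_pruned_block: BlockNumber,
) -> Vec<ShardedKey> {
    let mut highest_deleted: HashMap<Address, BlockNumber> = HashMap::default();
    for (block_number, address) in pruned_changesets {
        // Rows arrive in ascending block order, so inserting unconditionally keeps the max.
        highest_deleted.insert(address, block_number);
    }

    // One sort over the deduplicated keys so the index table is visited in key order.
    let mut entries: Vec<_> = highest_deleted.into_iter().collect();
    entries.sort_unstable();
    entries
        .into_iter()
        .map(|(key, block_number)| ShardedKey {
            key,
            highest_block_number: block_number.min(last_changeset_pruned_block),
        })
        .collect()
}

fn main() {
    let a = [0xaa; 20];
    let b = [0xbb; 20];
    // Changeset rows are walked in ascending block order, like the pruner's callback.
    let rows = [(100, a), (101, b), (103, a)];
    let keys = highest_sharded_keys(rows, 103);
    assert_eq!(keys[0], ShardedKey { key: a, highest_block_number: 103 });
    assert_eq!(keys[1], ShardedKey { key: b, highest_block_number: 101 });
    println!("{keys:?}");
}
```

Sorting once at the end is the point of the "We did not use `BTreeMap`" comment above: insertions into the hash map stay O(1) while the changesets are walked, and the single sort only touches the bounded number of entries described in the size comment.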
diff --git a/crates/prune/prune/src/segments/history.rs b/crates/prune/prune/src/segments/history.rs
index ee841ef89..ff477a39f 100644
--- a/crates/prune/prune/src/segments/history.rs
+++ b/crates/prune/prune/src/segments/history.rs
@@ -1,5 +1,5 @@
 use alloy_primitives::BlockNumber;
-use reth_db::BlockNumberList;
+use reth_db::{BlockNumberList, RawKey, RawTable, RawValue};
 use reth_db_api::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
@@ -10,103 +10,151 @@ use reth_db_api::{
 };
 use reth_provider::DatabaseProviderRW;
 
-/// Prune history indices up to the provided block, inclusive.
+enum PruneShardOutcome {
+    Deleted,
+    Updated,
+    Unchanged,
+}
+
+#[derive(Debug, Default)]
+pub(crate) struct PrunedIndices {
+    pub(crate) deleted: usize,
+    pub(crate) updated: usize,
+    pub(crate) unchanged: usize,
+}
+
+/// Prune history indices according to the provided list of highest sharded keys.
 ///
-/// Returns total number of processed (walked) and deleted entities.
+/// Returns total number of deleted, updated and unchanged entities.
 pub(crate) fn prune_history_indices<DB, T, SK>(
     provider: &DatabaseProviderRW<DB>,
-    to_block: BlockNumber,
+    highest_sharded_keys: impl IntoIterator<Item = T::Key>,
     key_matches: impl Fn(&T::Key, &T::Key) -> bool,
-    last_key: impl Fn(&T::Key) -> T::Key,
-) -> Result<(usize, usize), DatabaseError>
+) -> Result<PrunedIndices, DatabaseError>
 where
     DB: Database,
     T: Table<Value = BlockNumberList>,
     T::Key: AsRef<ShardedKey<SK>>,
 {
-    let mut processed = 0;
-    let mut deleted = 0;
-    let mut cursor = provider.tx_ref().cursor_write::<T>()?;
+    let mut outcomes = PrunedIndices::default();
+    let mut cursor = provider.tx_ref().cursor_write::<RawTable<T>>()?;
 
-    // Prune history table:
-    // 1. If the shard has `highest_block_number` less than or equal to the target block number
-    //    for pruning, delete the shard completely.
-    // 2. If the shard has `highest_block_number` greater than the target block number for
-    //    pruning, filter block numbers inside the shard which are less than the target
-    //    block number for pruning.
-    while let Some(result) = cursor.next()? {
-        let (key, blocks): (T::Key, BlockNumberList) = result;
+    for sharded_key in highest_sharded_keys {
+        // Seek to the shard that has the key >= the given sharded key
+        // TODO: optimize
+        let mut shard = cursor.seek(RawKey::new(sharded_key.clone()))?;
 
-        // If shard consists only of block numbers less than the target one, delete shard
-        // completely.
-        if key.as_ref().highest_block_number <= to_block {
-            cursor.delete_current()?;
-            deleted += 1;
-            if key.as_ref().highest_block_number == to_block {
-                // Shard contains only block numbers up to the target one, so we can skip to
-                // the last shard for this key. It is guaranteed that further shards for this
-                // sharded key will not contain the target block number, as it's in this shard.
-                cursor.seek_exact(last_key(&key))?;
-            }
-        }
-        // Shard contains block numbers that are higher than the target one, so we need to
-        // filter it. It is guaranteed that further shards for this sharded key will not
-        // contain the target block number, as it's in this shard.
-        else {
-            let higher_blocks =
-                blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
+        // Get the highest block number that needs to be deleted for this sharded key
+        let to_block = sharded_key.as_ref().highest_block_number;
 
-            // If there were blocks less than or equal to the target one
-            // (so the shard has changed), update the shard.
-            if blocks.len() as usize != higher_blocks.len() {
-                // If there will be no more blocks in the shard after pruning blocks below target
-                // block, we need to remove it, as empty shards are not allowed.
-                if higher_blocks.is_empty() {
-                    if key.as_ref().highest_block_number == u64::MAX {
-                        let prev_row = cursor.prev()?;
-                        match prev_row {
-                            // If current shard is the last shard for the sharded key that
-                            // has previous shards, replace it with the previous shard.
-                            Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
-                                cursor.delete_current()?;
-                                deleted += 1;
-                                // Upsert will replace the last shard for this sharded key with
-                                // the previous value.
-                                cursor.upsert(key.clone(), prev_value)?;
-                            }
-                            // If there's no previous shard for this sharded key,
-                            // just delete last shard completely.
-                            _ => {
-                                // If we successfully moved the cursor to a previous row,
-                                // jump to the original last shard.
-                                if prev_row.is_some() {
-                                    cursor.next()?;
-                                }
-                                // Delete shard.
-                                cursor.delete_current()?;
-                                deleted += 1;
-                            }
-                        }
-                    }
-                    // If current shard is not the last shard for this sharded key,
-                    // just delete it.
-                    else {
-                        cursor.delete_current()?;
-                        deleted += 1;
-                    }
-                } else {
-                    cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(higher_blocks))?;
+        'shard: loop {
+            let Some((key, block_nums)) =
+                shard.map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v))).transpose()?
+            else {
+                break
+            };
+
+            if key_matches(&key, &sharded_key) {
+                match prune_shard(&mut cursor, key, block_nums, to_block, &key_matches)? {
+                    PruneShardOutcome::Deleted => outcomes.deleted += 1,
+                    PruneShardOutcome::Updated => outcomes.updated += 1,
+                    PruneShardOutcome::Unchanged => outcomes.unchanged += 1,
                 }
+            } else {
+                // If such shard doesn't exist, skip to the next sharded key
+                break 'shard
             }
-            // Jump to the last shard for this key, if current key isn't already the last shard.
-            if key.as_ref().highest_block_number != u64::MAX {
-                cursor.seek_exact(last_key(&key))?;
-            }
+
+            shard = cursor.next()?;
         }
-
-        processed += 1;
     }
 
-    Ok((processed, deleted))
+    Ok(outcomes)
+}
+
+/// Prunes one shard of a history table.
+///
+/// 1. If the shard has `highest_block_number` less than or equal to the target block number for
+///    pruning, delete the shard completely.
+/// 2. If the shard has `highest_block_number` greater than the target block number for pruning,
+///    filter block numbers inside the shard which are less than the target block number for
+///    pruning.
+fn prune_shard<C, T, SK>(
+    cursor: &mut C,
+    key: T::Key,
+    raw_blocks: RawValue<T::Value>,
+    to_block: BlockNumber,
+    key_matches: impl Fn(&T::Key, &T::Key) -> bool,
+) -> Result<PruneShardOutcome, DatabaseError>
+where
+    C: DbCursorRO<RawTable<T>> + DbCursorRW<RawTable<T>>,
+    T: Table<Value = BlockNumberList>,
+    T::Key: AsRef<ShardedKey<SK>>,
+{
+    // If shard consists only of block numbers less than the target one, delete shard
+    // completely.
+    if key.as_ref().highest_block_number <= to_block {
+        cursor.delete_current()?;
+        Ok(PruneShardOutcome::Deleted)
+    }
+    // Shard contains block numbers that are higher than the target one, so we need to
+    // filter it. It is guaranteed that further shards for this sharded key will not
+    // contain the target block number, as it's in this shard.
+    else {
+        let blocks = raw_blocks.value()?;
+        let higher_blocks =
+            blocks.iter().skip_while(|block| *block <= to_block).collect::<Vec<_>>();
+
+        // If there were blocks less than or equal to the target one
+        // (so the shard has changed), update the shard.
+        if blocks.len() as usize != higher_blocks.len() {
+            // If there will be no more blocks in the shard after pruning blocks below target
+            // block, we need to remove it, as empty shards are not allowed.
+            if higher_blocks.is_empty() {
+                if key.as_ref().highest_block_number == u64::MAX {
+                    let prev_row = cursor
+                        .prev()?
+                        .map(|(k, v)| Result::<_, DatabaseError>::Ok((k.key()?, v)))
+                        .transpose()?;
+                    match prev_row {
+                        // If current shard is the last shard for the sharded key that
+                        // has previous shards, replace it with the previous shard.
+                        Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
+                            cursor.delete_current()?;
+                            // Upsert will replace the last shard for this sharded key with
+                            // the previous value.
+                            cursor.upsert(RawKey::new(key), prev_value)?;
+                            Ok(PruneShardOutcome::Updated)
+                        }
+                        // If there's no previous shard for this sharded key,
+                        // just delete last shard completely.
+                        _ => {
+                            // If we successfully moved the cursor to a previous row,
+                            // jump to the original last shard.
+                            if prev_row.is_some() {
+                                cursor.next()?;
+                            }
+                            // Delete shard.
+                            cursor.delete_current()?;
+                            Ok(PruneShardOutcome::Deleted)
+                        }
+                    }
+                }
+                // If current shard is not the last shard for this sharded key,
+                // just delete it.
+                else {
+                    cursor.delete_current()?;
+                    Ok(PruneShardOutcome::Deleted)
+                }
+            } else {
+                cursor.upsert(
+                    RawKey::new(key),
+                    RawValue::new(BlockNumberList::new_pre_sorted(higher_blocks)),
+                )?;
+                Ok(PruneShardOutcome::Updated)
+            }
+        } else {
+            Ok(PruneShardOutcome::Unchanged)
+        }
+    }
 }
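For illustration only, and not part of the patch: the rewritten `prune_history_indices` seeks to the first shard reachable from each highest sharded key and walks forward while the key still matches, and `prune_shard` classifies every visited shard as deleted, updated or unchanged. The sketch below models that control flow over an in-memory `BTreeMap` standing in for the database table; the `RawTable` cursor and the last-shard re-keying branch of `prune_shard` are deliberately left out, so this is a simplified model rather than reth's implementation.

```rust
use std::collections::{BTreeMap, HashMap};

type Address = u64; // stand-in for a real 160-bit address
type BlockNumber = u64;
/// Stand-in for `ShardedKey<Address>` as it is ordered in the table:
/// `(key, highest_block_number)`, with `u64::MAX` marking the last shard.
type ShardKey = (Address, BlockNumber);

#[derive(Debug, Default, PartialEq)]
struct Outcomes {
    deleted: usize,
    updated: usize,
    unchanged: usize,
}

/// For every changeset key, seek to the first shard that can still contain pruned block
/// numbers and walk forward while the key matches, deleting or filtering each shard.
fn prune_indices(
    shards: &mut BTreeMap<ShardKey, Vec<BlockNumber>>,
    highest_deleted: HashMap<Address, BlockNumber>,
) -> Outcomes {
    let mut outcomes = Outcomes::default();

    // One sort over the deduplicated keys, mirroring `sorted_unstable()` in the diff.
    let mut keys: Vec<_> = highest_deleted.into_iter().collect();
    keys.sort_unstable();

    for (address, to_block) in keys {
        // Equivalent of `cursor.seek(...)` plus "while the key matches": all shards of
        // this address whose `highest_block_number` is >= the target block.
        let affected: Vec<ShardKey> =
            shards.range((address, to_block)..=(address, u64::MAX)).map(|(k, _)| *k).collect();

        for shard_key in affected {
            if shard_key.1 <= to_block {
                // Case 1: the shard only holds blocks at or below the target, drop it whole.
                shards.remove(&shard_key);
                outcomes.deleted += 1;
                continue;
            }
            // Case 2: keep only block numbers above the target block.
            let blocks = shards.get_mut(&shard_key).expect("shard exists");
            let before = blocks.len();
            blocks.retain(|block| *block > to_block);
            let (now_empty, changed) = (blocks.is_empty(), blocks.len() != before);
            if now_empty {
                shards.remove(&shard_key); // empty shards are not allowed
                outcomes.deleted += 1;
            } else if changed {
                outcomes.updated += 1;
            } else {
                outcomes.unchanged += 1;
            }
        }
    }
    outcomes
}

fn main() {
    let mut shards = BTreeMap::from([
        ((1, 200), vec![120, 200]),      // closed shard, entirely at or below the target
        ((1, u64::MAX), vec![250, 300]), // last shard of address 1, nothing to remove
        ((2, u64::MAX), vec![150, 400]), // last shard of address 2, partially pruned
    ]);
    let outcomes = prune_indices(&mut shards, HashMap::from([(1, 200), (2, 200)]));
    assert_eq!(outcomes, Outcomes { deleted: 1, updated: 1, unchanged: 1 });
    assert!(!shards.contains_key(&(1, 200)));
    assert_eq!(shards[&(2, u64::MAX)], vec![400]);
    println!("{outcomes:?} -> {shards:?}");
}
```

Only the `deleted` count feeds into the segment's `pruned` total, which matches how `PruneOutput::pruned` is computed in the account and storage segments (`pruned_changesets + outcomes.deleted`).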
diff --git a/crates/prune/prune/src/segments/storage_history.rs b/crates/prune/prune/src/segments/storage_history.rs
index 3e7ad86a7..95e9afa0a 100644
--- a/crates/prune/prune/src/segments/storage_history.rs
+++ b/crates/prune/prune/src/segments/storage_history.rs
@@ -4,6 +4,7 @@ use crate::{
     },
     PrunerError,
 };
+use itertools::Itertools;
 use reth_db::tables;
 use reth_db_api::{
     database::Database,
@@ -11,6 +12,7 @@ use reth_db_api::{
 };
 use reth_provider::DatabaseProviderRW;
 use reth_prune_types::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
+use rustc_hash::FxHashMap;
 use tracing::{instrument, trace};
 
 /// Number of storage history tables to prune in one step
@@ -67,34 +69,58 @@ impl<DB: Database> Segment<DB> for StorageHistory {
         }
 
         let mut last_changeset_pruned_block = None;
+        // Deleted storage changeset keys (account addresses and storage slots) with the highest
+        // block number deleted for that key.
+        //
+        // The size of this map is limited by `prune_delete_limit * blocks_since_last_run /
+        // STORAGE_HISTORY_TABLES_TO_PRUNE`, and with the current defaults it's usually `3500 * 5
+        // / 2`, so 8750 entries. Each entry is `160 bit + 256 bit + 64 bit`, so the total
+        // size should be up to 0.5MB + some hashmap overhead. `blocks_since_last_run` is
+        // additionally limited by the `max_reorg_depth`, so no OOM is expected here.
+        let mut highest_deleted_storages = FxHashMap::default();
         let (pruned_changesets, done) = provider
             .prune_table_with_range::<tables::StorageChangeSets>(
                 BlockNumberAddress::range(range),
                 &mut limiter,
                 |_| false,
-                |row| last_changeset_pruned_block = Some(row.0.block_number()),
+                |(BlockNumberAddress((block_number, address)), entry)| {
+                    highest_deleted_storages.insert((address, entry.key), block_number);
+                    last_changeset_pruned_block = Some(block_number);
+                },
             )?;
         trace!(target: "pruner", deleted = %pruned_changesets, %done, "Pruned storage history (changesets)");
 
         let last_changeset_pruned_block = last_changeset_pruned_block
-            // If there's more storage storage changesets to prune, set the checkpoint block number
-            // to previous, so we could finish pruning its storage changesets on the next run.
+            // If there are more storage changesets to prune, set the checkpoint block number to
+            // the previous one, so we can finish pruning its storage changesets on the next run.
            .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) })
            .unwrap_or(range_end);
 
-        let (processed, pruned_indices) = prune_history_indices::<DB, tables::StoragesHistory, _>(
+        // Sort highest deleted block numbers by account address and storage key and turn them into
+        // sharded keys.
+        // We did not use `BTreeMap` from the beginning, because it's inefficient for hashes.
+        let highest_sharded_keys = highest_deleted_storages
+            .into_iter()
+            .sorted_unstable() // Unstable is fine because no equal keys exist in the map
+            .map(|((address, storage_key), block_number)| {
+                StorageShardedKey::new(
+                    address,
+                    storage_key,
+                    block_number.min(last_changeset_pruned_block),
+                )
+            });
+        let outcomes = prune_history_indices::<DB, tables::StoragesHistory, _>(
             provider,
-            last_changeset_pruned_block,
+            highest_sharded_keys,
             |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
-            |key| StorageShardedKey::last(key.address, key.sharded_key.key),
         )?;
-        trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)");
+        trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
 
         let progress = PruneProgress::new(done, &limiter);
 
         Ok(PruneOutput {
             progress,
-            pruned: pruned_changesets + pruned_indices,
+            pruned: pruned_changesets + outcomes.deleted,
             checkpoint: Some(PruneOutputCheckpoint {
                 block_number: Some(last_changeset_pruned_block),
                 tx_number: None,
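For illustration only, and not part of the patch: the storage segment follows the same pattern as the account segment; the only structural difference is the composite changeset key of account address and storage slot, which is fed to `StorageShardedKey::new` after sorting. A plain tuple sort already yields the required ordering, address first and then slot, as this minimal std-only snippet (with narrow integer types standing in for the real 160-bit address and 256-bit slot) shows.

```rust
fn main() {
    type Address = u8; // stand-in for a 160-bit address
    type Slot = u8; // stand-in for a 256-bit storage slot
    type BlockNumber = u64;

    // Highest deleted block per (address, slot), as collected by the changeset callback.
    let highest_deleted: std::collections::HashMap<(Address, Slot), BlockNumber> =
        std::collections::HashMap::from([((2, 1), 105), ((1, 7), 103), ((1, 3), 101)]);

    let mut keys: Vec<_> = highest_deleted.into_iter().collect();
    // Tuples sort lexicographically: by address first, then by storage slot.
    keys.sort_unstable();
    assert_eq!(keys, vec![((1, 3), 101), ((1, 7), 103), ((2, 1), 105)]);
    println!("{keys:?}");
}
```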