feat(pruner): respect batch size per run (#4246)

Co-authored-by: joshieDo <ranriver@protonmail.com>
This commit is contained in:
Alexey Shekhirin
2023-08-23 18:23:25 +01:00
committed by GitHub
parent 1343644955
commit 312cf724bc
11 changed files with 874 additions and 431 deletions

55
Cargo.lock generated
View File

@ -119,9 +119,9 @@ dependencies = [
[[package]]
name = "aho-corasick"
version = "1.0.4"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a"
checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c"
dependencies = [
"memchr",
]
@ -1899,9 +1899,9 @@ dependencies = [
[[package]]
name = "ed25519"
version = "2.2.2"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d"
checksum = "5fb04eee5d9d907f29e80ee6b0e78f7e2c82342c63e3580d8c4f69d9d5aad963"
dependencies = [
"pkcs8",
"signature",
@ -2505,9 +2505,9 @@ dependencies = [
[[package]]
name = "flate2"
version = "1.0.27"
version = "1.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010"
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
dependencies = [
"crc32fast",
"miniz_oxide",
@ -3008,9 +3008,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]]
name = "httpdate"
version = "1.0.3"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "human_bytes"
@ -3105,7 +3105,7 @@ dependencies = [
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows 0.48.0",
"windows",
]
[[package]]
@ -3925,9 +3925,9 @@ dependencies = [
[[package]]
name = "metrics-process"
version = "1.0.12"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c93f6ad342d3f7bc14724147e2dbc6eb6fdbe5a832ace16ea23b73618e8cc17"
checksum = "006271a8019ad7a9a28cfac2cc40e3ee104d54be763c4a0901e228a63f49d706"
dependencies = [
"libproc",
"mach2",
@ -3935,7 +3935,7 @@ dependencies = [
"once_cell",
"procfs",
"rlimit",
"windows 0.51.1",
"windows",
]
[[package]]
@ -4135,9 +4135,9 @@ dependencies = [
[[package]]
name = "num-complex"
version = "0.4.4"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214"
checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d"
dependencies = [
"num-traits",
]
@ -5098,7 +5098,7 @@ version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a"
dependencies = [
"aho-corasick 1.0.4",
"aho-corasick 1.0.3",
"memchr",
"regex-automata 0.3.6",
"regex-syntax 0.7.4",
@ -5119,7 +5119,7 @@ version = "0.3.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69"
dependencies = [
"aho-corasick 1.0.4",
"aho-corasick 1.0.3",
"memchr",
"regex-syntax 0.7.4",
]
@ -6308,9 +6308,9 @@ dependencies = [
[[package]]
name = "rlimit"
version = "0.10.1"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3560f70f30a0f16d11d01ed078a07740fe6b489667abc7c7b029155d9f21c3d8"
checksum = "f8a29d87a652dc4d43c586328706bb5cdff211f3f39a530f240b53f7221dab8e"
dependencies = [
"libc",
]
@ -8320,25 +8320,6 @@ dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9"
dependencies = [
"windows-core",
"windows-targets 0.48.5",
]
[[package]]
name = "windows-core"
version = "0.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64"
dependencies = [
"windows-targets 0.48.5",
]
[[package]]
name = "windows-sys"
version = "0.45.0"

View File

@ -3,7 +3,7 @@
use futures::FutureExt;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_prune::{Pruner, PrunerResult, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
@ -116,7 +116,7 @@ pub(crate) enum EnginePruneEvent {
/// If this is returned, the pruner is idle.
Finished {
/// Final result of the pruner run.
result: Result<(), PrunerError>,
result: PrunerResult,
},
/// Pruner task was dropped after it was started, unable to receive it because channel
/// closed. This would indicate a panicked pruner task

View File

@ -239,7 +239,7 @@ where
// deposit in receiving account and update storage
let (prev_to, storage): &mut (Account, BTreeMap<H256, U256>) = state.get_mut(&to).unwrap();
let old_entries = new_entries
let mut old_entries: Vec<_> = new_entries
.into_iter()
.filter_map(|entry| {
let old = if entry.value != U256::ZERO {
@ -254,9 +254,12 @@ where
Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry })
})
.collect();
old_entries.sort_by_key(|entry| entry.key);
changeset.push((to, *prev_to, old_entries));
changeset.sort_by_key(|(address, _, _)| *address);
prev_to.balance = prev_to.balance.wrapping_add(transfer);
changesets.push(changeset);

View File

@ -1,4 +1,4 @@
use crate::{prune::PruneMode, BlockNumber};
use crate::{prune::PruneMode, BlockNumber, TxNumber};
use reth_codecs::{main_codec, Compact};
/// Saves the pruning progress of a stage.
@ -7,7 +7,10 @@ use reth_codecs::{main_codec, Compact};
#[cfg_attr(test, derive(Default))]
pub struct PruneCheckpoint {
/// Highest pruned block number.
pub block_number: BlockNumber,
/// If it's [None], the pruning for block `0` is not finished yet.
pub block_number: Option<BlockNumber>,
/// Highest pruned transaction number, if applicable.
pub tx_number: Option<TxNumber>,
/// Prune mode.
pub prune_mode: PruneMode,
}

View File

@ -49,10 +49,14 @@ impl ReceiptsLogPruneConfig {
// the BTreeMap (block = 0), otherwise it will be excluded.
// Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
// other receipts.
//
// Reminder, that we increment because the [`BlockNumber`] key of the new map should be
// viewed as `PruneMode::Before(block)`
let block = (pruned_block + 1).max(
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
.map(|(block, _)| block)
.unwrap_or_default(),
.unwrap_or_default() +
1,
);
map.entry(block).or_insert_with(Vec::new).push(address)

File diff suppressed because it is too large Load Diff

View File

@ -251,6 +251,13 @@ where
// append gas used
cumulative_gas_used += result.gas_used();
tracing::trace!(
target: "revm::executor",
hash = ?transaction.hash,
gas_used = result.gas_used(),
"transaction executed"
);
// Push transaction changeset and calculate header bloom filter for receipt.
post_state.add_receipt(
block.number,

View File

@ -212,11 +212,7 @@ fn stage_checkpoint<DB: Database>(
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::SenderRecovery)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.and_then(|checkpoint| checkpoint.tx_number)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
// If `TxSenders` table was pruned, we will have a number of entries in it not matching
@ -409,7 +405,13 @@ mod tests {
.save_prune_checkpoint(
PrunePart::SenderRecovery,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
block_number: Some(max_pruned_block),
tx_number: Some(
blocks[..=max_pruned_block as usize]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
),
prune_mode: PruneMode::Full,
},
)

View File

@ -13,7 +13,7 @@ use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PrunePart, TransactionSignedNoHash, TxNumber, H256,
};
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader};
use reth_provider::{DatabaseProviderRW, PruneCheckpointReader};
use tokio::sync::mpsc;
use tracing::*;
@ -186,11 +186,7 @@ fn stage_checkpoint<DB: Database>(
) -> Result<EntitiesCheckpoint, StageError> {
let pruned_entries = provider
.get_prune_checkpoint(PrunePart::TransactionLookup)?
.map(|checkpoint| provider.block_body_indices(checkpoint.block_number))
.transpose()?
.flatten()
// +1 is needed because TxNumber is 0-indexed
.map(|body| body.last_tx_num() + 1)
.and_then(|checkpoint| checkpoint.tx_number)
.unwrap_or_default();
Ok(EntitiesCheckpoint {
// If `TxHashNumber` table was pruned, we will have a number of entries in it not matching
@ -365,7 +361,13 @@ mod tests {
.save_prune_checkpoint(
PrunePart::TransactionLookup,
PruneCheckpoint {
block_number: max_pruned_block as BlockNumber,
block_number: Some(max_pruned_block),
tx_number: Some(
blocks[..=max_pruned_block as usize]
.iter()
.map(|block| block.body.len() as u64)
.sum::<u64>(),
),
prune_mode: PruneMode::Full,
},
)

View File

@ -111,14 +111,18 @@ impl<DB: Database> ProviderFactory<DB> {
// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune part.
if let Some(prune_checkpoint) = account_history_prune_checkpoint {
if let Some(prune_checkpoint_block_number) =
account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_account_history_block_number(
prune_checkpoint.block_number + 1,
prune_checkpoint_block_number + 1,
);
}
if let Some(prune_checkpoint) = storage_history_prune_checkpoint {
if let Some(prune_checkpoint_block_number) =
storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number)
{
state_provider = state_provider.with_lowest_available_storage_history_block_number(
prune_checkpoint.block_number + 1,
prune_checkpoint_block_number + 1,
);
}

View File

@ -17,7 +17,7 @@ use reth_db::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals,
},
table::Table,
table::{Table, TableRow},
tables,
transaction::{DbTx, DbTxMut},
BlockNumberList, DatabaseError,
@ -624,85 +624,61 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
/// Prune the table for the specified pre-sorted key iterator.
///
/// Returns number of rows pruned.
pub fn prune_table_with_iterator<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
) -> std::result::Result<usize, DatabaseError> {
self.prune_table_with_iterator_in_batches::<T>(keys, usize::MAX, |_| {}, |_| false)
}
/// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after
/// every `batch_size` pruned rows with number of total rows pruned.
///
/// `skip_filter` can be used to skip pruning certain elements.
///
/// Returns number of rows pruned.
pub fn prune_table_with_iterator_in_batches<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
batch_size: usize,
mut batch_callback: impl FnMut(usize),
skip_filter: impl Fn(&T::Value) -> bool,
) -> std::result::Result<usize, DatabaseError> {
limit: usize,
mut delete_callback: impl FnMut(TableRow<T>),
) -> std::result::Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut deleted = 0;
for key in keys {
if let Some((_, value)) = cursor.seek_exact(key)? {
if !skip_filter(&value) {
let mut keys = keys.into_iter();
for key in &mut keys {
let row = cursor.seek_exact(key.clone())?;
if let Some(row) = row {
cursor.delete_current()?;
deleted += 1;
delete_callback(row);
}
if deleted == limit {
break
}
}
if deleted % batch_size == 0 {
batch_callback(deleted);
}
Ok((deleted, keys.next().is_none()))
}
if deleted % batch_size != 0 {
batch_callback(deleted);
}
Ok(deleted)
}
/// Prune the table for the specified key range, calling `chunk_callback` after every
/// `batch_size` pruned rows with number of total unique keys and total rows pruned. For dupsort
/// tables, these numbers will be different as one key can correspond to multiple rows.
/// Prune the table for the specified key range.
///
/// Returns number of rows pruned.
pub fn prune_table_with_range_in_batches<T: Table>(
/// Returns number of total unique keys and total rows pruned pruned.
pub fn prune_table_with_range<T: Table>(
&self,
keys: impl RangeBounds<T::Key>,
batch_size: usize,
mut batch_callback: impl FnMut(usize, usize),
) -> std::result::Result<(), DatabaseError> {
keys: impl RangeBounds<T::Key> + Clone + Debug,
limit: usize,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> std::result::Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let mut deleted_keys = 0;
let mut deleted_rows = 0;
let mut previous_key = None;
let mut walker = cursor.walk_range(keys.clone())?;
let mut deleted = 0;
while let Some((key, _)) = walker.next().transpose()? {
while let Some(row) = walker.next().transpose()? {
if !skip_filter(&row) {
walker.delete_current()?;
deleted_rows += 1;
if previous_key.as_ref().map(|previous_key| previous_key != &key).unwrap_or(true) {
deleted_keys += 1;
previous_key = Some(key);
deleted += 1;
delete_callback(row);
}
if deleted_rows % batch_size == 0 {
batch_callback(deleted_keys, deleted_rows);
if deleted == limit {
break
}
}
if deleted_rows % batch_size != 0 {
batch_callback(deleted_keys, deleted_rows);
}
Ok(())
Ok((deleted, walker.next().transpose()?.is_none()))
}
/// Load shard and remove it. If list is empty, last shard was full or