feat(pruner): shared deletion limit (#4880)

This commit is contained in:
Alexey Shekhirin
2023-10-04 20:12:58 +02:00
committed by GitHub
parent e93edfcdf0
commit af8e3c9ee9
15 changed files with 405 additions and 397 deletions

View File

@ -19,7 +19,7 @@ use reth_interfaces::{
test_utils::{NoopFullBlockClient, TestConsensus},
};
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, ChainSpec, PruneBatchSizes, PruneModes, B256, U256};
use reth_primitives::{BlockNumber, ChainSpec, PruneModes, B256, U256};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockExecutor,
BundleStateWithReceipts, ExecutorFactory, ProviderFactory, PrunableBlockExecutor,
@ -518,7 +518,7 @@ where
self.base_config.chain_spec.clone(),
5,
PruneModes::none(),
PruneBatchSizes::default(),
self.base_config.chain_spec.prune_delete_limit,
watch::channel(None).1,
);

View File

@ -49,7 +49,7 @@ pub enum BlockchainTreeError {
}
/// Result alias for `CanonicalError`
pub type CanonicalResult<T> = std::result::Result<T, CanonicalError>;
pub type CanonicalResult<T> = Result<T, CanonicalError>;
/// Canonical Errors
#[allow(missing_docs)]

View File

@ -39,7 +39,7 @@ pub fn rng() -> StdRng {
/// The headers are assumed to not be correct if validated.
pub fn random_header_range<R: Rng>(
rng: &mut R,
range: std::ops::Range<u64>,
range: Range<u64>,
head: B256,
) -> Vec<SealedHeader> {
let mut headers = Vec::with_capacity(range.end.saturating_sub(range.start) as usize);
@ -204,8 +204,8 @@ pub fn random_changeset_range<'a, R: Rng, IBlk, IAcc>(
rng: &mut R,
blocks: IBlk,
accounts: IAcc,
n_storage_changes: std::ops::Range<u64>,
key_range: std::ops::Range<u64>,
n_storage_changes: Range<u64>,
key_range: Range<u64>,
) -> (Vec<ChangeSet>, BTreeMap<Address, AccountState>)
where
IBlk: IntoIterator<Item = &'a SealedBlock>,
@ -280,8 +280,8 @@ where
pub fn random_account_change<R: Rng>(
rng: &mut R,
valid_addresses: &Vec<Address>,
n_storage_changes: std::ops::Range<u64>,
key_range: std::ops::Range<u64>,
n_storage_changes: Range<u64>,
key_range: Range<u64>,
) -> (Address, Address, U256, Vec<StorageEntry>) {
let mut addresses = valid_addresses.choose_multiple(rng, 2).cloned();
@ -302,7 +302,7 @@ pub fn random_account_change<R: Rng>(
}
/// Generate a random storage change.
pub fn random_storage_entry<R: Rng>(rng: &mut R, key_range: std::ops::Range<u64>) -> StorageEntry {
pub fn random_storage_entry<R: Rng>(rng: &mut R, key_range: Range<u64>) -> StorageEntry {
let key = B256::new({
let n = key_range.sample_single(rng);
let mut m = [0u8; 32];
@ -326,7 +326,7 @@ pub fn random_eoa_account<R: Rng>(rng: &mut R) -> (Address, Account) {
/// Generate random Externally Owned Accounts
pub fn random_eoa_account_range<R: Rng>(
rng: &mut R,
acc_range: std::ops::Range<u64>,
acc_range: Range<u64>,
) -> Vec<(Address, Account)> {
let mut accounts = Vec::with_capacity(acc_range.end.saturating_sub(acc_range.start) as usize);
for _ in acc_range {
@ -338,7 +338,7 @@ pub fn random_eoa_account_range<R: Rng>(
/// Generate random Contract Accounts
pub fn random_contract_account_range<R: Rng>(
rng: &mut R,
acc_range: &mut std::ops::Range<u64>,
acc_range: &mut Range<u64>,
) -> Vec<(Address, Account)> {
let mut accounts = Vec::with_capacity(acc_range.end.saturating_sub(acc_range.start) as usize);
for _ in acc_range {

View File

@ -7,7 +7,7 @@ use crate::{
header::Head,
proofs::genesis_state_root,
Address, BlockNumber, Chain, ForkFilter, ForkHash, ForkId, Genesis, Hardfork, Header,
PruneBatchSizes, SealedHeader, B256, EMPTY_OMMER_ROOT, U256,
SealedHeader, B256, EMPTY_OMMER_ROOT, U256,
};
use once_cell::sync::Lazy;
use revm_primitives::{address, b256};
@ -64,7 +64,7 @@ pub static MAINNET: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
b256!("649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c5"),
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::mainnet(),
prune_delete_limit: 3500,
snapshot_block_interval: 500_000,
}
.into()
@ -107,7 +107,7 @@ pub static GOERLI: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
b256!("649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c5"),
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
prune_delete_limit: 1700,
snapshot_block_interval: 1_000_000,
}
.into()
@ -154,7 +154,7 @@ pub static SEPOLIA: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
b256!("649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c5"),
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
prune_delete_limit: 1700,
snapshot_block_interval: 1_000_000,
}
.into()
@ -196,7 +196,7 @@ pub static HOLESKY: Lazy<Arc<ChainSpec>> = Lazy::new(|| {
b256!("649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c5"),
)),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: PruneBatchSizes::testnet(),
prune_delete_limit: 1700,
snapshot_block_interval: 1_000_000,
}
.into()
@ -302,11 +302,11 @@ pub struct ChainSpec {
/// The parameters that configure how a block's base fee is computed
pub base_fee_params: BaseFeeParams,
/// The batch sizes for pruner, per block. In the actual pruner run it will be multiplied by
/// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by
/// the amount of blocks between pruner runs to account for the difference in amount of new
/// data coming in.
#[serde(default)]
pub prune_batch_sizes: PruneBatchSizes,
pub prune_delete_limit: usize,
/// The block interval for creating snapshots. Each snapshot will have that much blocks in it.
pub snapshot_block_interval: u64,
@ -323,7 +323,7 @@ impl Default for ChainSpec {
hardforks: Default::default(),
deposit_contract: Default::default(),
base_fee_params: BaseFeeParams::ethereum(),
prune_batch_sizes: Default::default(),
prune_delete_limit: MAINNET.prune_delete_limit,
snapshot_block_interval: Default::default(),
}
}

View File

@ -78,7 +78,7 @@ pub use net::{
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{
PruneBatchSizes, PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError,
PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError, PruneProgress,
ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE,
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts};

View File

@ -1,83 +0,0 @@
use paste::paste;
use serde::{Deserialize, Serialize};
/// Batch sizes for configuring the pruner.
/// The batch size for each prune part should be both large enough to prune the data which was
/// generated with each new block, and small enough to not generate an excessive load on the
/// database due to deletion of too many rows at once.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct PruneBatchSizes {
/// Maximum number of receipts to prune, per block.
receipts: usize,
/// Maximum number of transaction lookup entries to prune, per block.
transaction_lookup: usize,
/// Maximum number of transaction senders to prune, per block.
transaction_senders: usize,
/// Maximum number of account history entries to prune, per block.
/// Measured in the number of `AccountChangeSet` table rows.
account_history: usize,
/// Maximum number of storage history entries to prune, per block.
/// Measured in the number of `StorageChangeSet` table rows.
storage_history: usize,
}
macro_rules! impl_prune_batch_size_methods {
($(($human_name:expr, $name:ident)),+) => {
paste! {
impl PruneBatchSizes {
$(
#[doc = concat!("Maximum number of ", $human_name, " to prune, accounting for the block interval.")]
pub fn $name(&self, block_interval: usize) -> usize {
self.$name * block_interval
}
#[doc = concat!("Set the maximum number of ", $human_name, " to prune per block.")]
pub fn [<with_ $name>](mut self, batch_size: usize) -> Self {
self.$name = batch_size;
self
}
)+
}
}
};
}
impl_prune_batch_size_methods!(
("receipts", receipts),
("transaction lookup entries", transaction_lookup),
("transaction senders", transaction_senders),
("account history entries", account_history),
("storage history entries", storage_history)
);
impl PruneBatchSizes {
/// Default prune batch sizes for Ethereum mainnet.
/// These settings are sufficient to prune more data than generated with each new block.
pub const fn mainnet() -> Self {
Self {
receipts: 250,
transaction_lookup: 250,
transaction_senders: 1000,
account_history: 1000,
storage_history: 1000,
}
}
/// Default prune batch sizes for Ethereum testnets.
/// These settings are sufficient to prune more data than generated with each new block.
pub const fn testnet() -> Self {
Self {
receipts: 100,
transaction_lookup: 100,
transaction_senders: 500,
account_history: 500,
storage_history: 500,
}
}
}
impl Default for PruneBatchSizes {
fn default() -> Self {
Self::mainnet()
}
}

View File

@ -1,11 +1,9 @@
mod batch_sizes;
mod checkpoint;
mod mode;
mod part;
mod target;
use crate::{Address, BlockNumber};
pub use batch_sizes::PruneBatchSizes;
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::{PrunePart, PrunePartError};
@ -88,3 +86,31 @@ impl ReceiptsLogPruneConfig {
Ok(lowest.map(|lowest| lowest.max(pruned_block)))
}
}
/// Progress of pruning.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneProgress {
/// There is more data to prune.
HasMoreData,
/// Pruning has been finished.
Finished,
}
impl PruneProgress {
/// Creates new [PruneProgress] from `done` boolean value.
///
/// If `done == true`, returns [PruneProgress::Finished], otherwise [PruneProgress::HasMoreData]
/// is returned.
pub fn from_done(done: bool) -> Self {
if done {
Self::Finished
} else {
Self::HasMoreData
}
}
/// Returns `true` if pruning has been finished.
pub fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}

View File

@ -1,4 +1,4 @@
use reth_primitives::{BlockNumber, PrunePart};
use reth_primitives::{BlockNumber, PrunePart, PruneProgress};
use std::{collections::BTreeMap, time::Duration};
/// An event emitted by a [Pruner][crate::Pruner].
@ -8,7 +8,6 @@ pub enum PrunerEvent {
Finished {
tip_block_number: BlockNumber,
elapsed: Duration,
done: bool,
parts_done: BTreeMap<PrunePart, bool>,
parts: BTreeMap<PrunePart, (PruneProgress, usize)>,
},
}

File diff suppressed because it is too large Load Diff

View File

@ -178,14 +178,14 @@ impl<'this, TX: DbTx<'this>> DatabaseProvider<'this, TX> {
}
/// Return full table as Vec
pub fn table<T: Table>(&self) -> std::result::Result<Vec<KeyValue<T>>, DatabaseError>
pub fn table<T: Table>(&self) -> Result<Vec<KeyValue<T>>, DatabaseError>
where
T::Key: Default + Ord,
{
self.tx
.cursor_read::<T>()?
.walk(Some(T::Key::default()))?
.collect::<std::result::Result<Vec<_>, DatabaseError>>()
.collect::<Result<Vec<_>, DatabaseError>>()
}
}
@ -380,7 +380,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
pub fn get_or_take<T: Table, const TAKE: bool>(
&self,
range: impl RangeBounds<T::Key>,
) -> std::result::Result<Vec<KeyValue<T>>, DatabaseError> {
) -> Result<Vec<KeyValue<T>>, DatabaseError> {
if TAKE {
let mut cursor_write = self.tx.cursor_write::<T>()?;
let mut walker = cursor_write.walk_range(range)?;
@ -391,10 +391,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
Ok(items)
} else {
self.tx
.cursor_read::<T>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()
self.tx.cursor_read::<T>()?.walk_range(range)?.collect::<Result<Vec<_>, _>>()
}
}
@ -625,7 +622,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
///
/// Note: Key is not inclusive and specified key would stay in db.
#[inline]
pub fn unwind_table_by_num<T>(&self, num: u64) -> std::result::Result<usize, DatabaseError>
pub fn unwind_table_by_num<T>(&self, num: u64) -> Result<usize, DatabaseError>
where
T: Table<Key = u64>,
{
@ -640,7 +637,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
&self,
key: u64,
mut selector: F,
) -> std::result::Result<usize, DatabaseError>
) -> Result<usize, DatabaseError>
where
T: Table,
F: FnMut(T::Key) -> u64,
@ -661,10 +658,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
}
/// Unwind a table forward by a [Walker][reth_db::abstraction::cursor::Walker] on another table
pub fn unwind_table_by_walker<T1, T2>(
&self,
start_at: T1::Key,
) -> std::result::Result<(), DatabaseError>
pub fn unwind_table_by_walker<T1, T2>(&self, start_at: T1::Key) -> Result<(), DatabaseError>
where
T1: Table,
T2: Table<Key = T1::Value>,
@ -685,22 +679,22 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
keys: impl IntoIterator<Item = T::Key>,
limit: usize,
mut delete_callback: impl FnMut(TableRow<T>),
) -> std::result::Result<(usize, bool), DatabaseError> {
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut deleted = 0;
let mut keys = keys.into_iter();
for key in &mut keys {
if deleted == limit {
break
}
let row = cursor.seek_exact(key.clone())?;
if let Some(row) = row {
cursor.delete_current()?;
deleted += 1;
delete_callback(row);
}
if deleted == limit {
break
}
}
Ok((deleted, keys.next().is_none()))
@ -708,28 +702,28 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
/// Prune the table for the specified key range.
///
/// Returns number of total unique keys and total rows pruned pruned.
/// Returns number of rows pruned.
pub fn prune_table_with_range<T: Table>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limit: usize,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> std::result::Result<(usize, bool), DatabaseError> {
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let mut deleted = 0;
while let Some(row) = walker.next().transpose()? {
if deleted == limit {
break
}
if !skip_filter(&row) {
walker.delete_current()?;
deleted += 1;
delete_callback(row);
}
if deleted == limit {
break
}
}
Ok((deleted, walker.next().transpose()?.is_none()))
@ -824,7 +818,7 @@ impl<'this, TX: DbTx<'this>> AccountExtReader for DatabaseProvider<'this, TX> {
Ok(iter
.into_iter()
.map(|address| plain_accounts.seek_exact(address).map(|a| (address, a.map(|(_, v)| v))))
.collect::<std::result::Result<Vec<_>, _>>()?)
.collect::<Result<Vec<_>, _>>()?)
}
fn changed_accounts_and_blocks_with_range(
@ -1116,7 +1110,7 @@ impl<'this, TX: DbTx<'this>> BlockReader for DatabaseProvider<'this, TX> {
tx_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<std::result::Result<Vec<_>, _>>()?
.collect::<Result<Vec<_>, _>>()?
};
// If we are past shanghai, then all blocks should have a withdrawal list,
@ -1237,7 +1231,7 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX
let transactions = tx_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<std::result::Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(transactions))
}
}
@ -1262,7 +1256,7 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX
tx_cursor
.walk_range(tx_num_range)?
.map(|result| result.map(|(_, tx)| tx.into()))
.collect::<std::result::Result<Vec<_>, _>>()?,
.collect::<Result<Vec<_>, _>>()?,
);
}
}
@ -1278,7 +1272,7 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX
.cursor_read::<tables::Transactions>()?
.walk_range(range)?
.map(|entry| entry.map(|tx| tx.1))
.collect::<std::result::Result<Vec<_>, _>>()?)
.collect::<Result<Vec<_>, _>>()?)
}
fn senders_by_tx_range(&self, range: impl RangeBounds<TxNumber>) -> RethResult<Vec<Address>> {
@ -1287,7 +1281,7 @@ impl<'this, TX: DbTx<'this>> TransactionsProvider for DatabaseProvider<'this, TX
.cursor_read::<tables::TxSenders>()?
.walk_range(range)?
.map(|entry| entry.map(|sender| sender.1))
.collect::<std::result::Result<Vec<_>, _>>()?)
.collect::<Result<Vec<_>, _>>()?)
}
fn transaction_sender(&self, id: TxNumber) -> RethResult<Option<Address>> {
@ -1319,7 +1313,7 @@ impl<'this, TX: DbTx<'this>> ReceiptProvider for DatabaseProvider<'this, TX> {
let receipts = receipts_cursor
.walk_range(tx_range)?
.map(|result| result.map(|(_, receipt)| receipt))
.collect::<std::result::Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>, _>>()?;
Ok(Some(receipts))
}
}
@ -1612,7 +1606,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
@ -1710,7 +1704,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.rev()
// fold all account to get the old balance/nonces and account that needs to be removed
@ -1813,7 +1807,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>, _>>()?;
let changesets = storage_changesets.len();
let last_indices = storage_changesets
@ -1863,7 +1857,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?;
.collect::<Result<Vec<_>, _>>()?;
let changesets = account_changeset.len();
let last_indices = account_changeset

View File

@ -557,17 +557,14 @@ where
DB: Send + Sync,
Tree: BlockchainTreeEngine,
{
fn buffer_block(
&self,
block: SealedBlockWithSenders,
) -> std::result::Result<(), InsertBlockError> {
fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), InsertBlockError> {
self.tree.buffer_block(block)
}
fn insert_block(
&self,
block: SealedBlockWithSenders,
) -> std::result::Result<InsertPayloadOk, InsertBlockError> {
) -> Result<InsertPayloadOk, InsertBlockError> {
self.tree.insert_block(block)
}
@ -628,7 +625,7 @@ where
self.tree.find_canonical_ancestor(hash)
}
fn is_canonical(&self, hash: BlockHash) -> std::result::Result<bool, RethError> {
fn is_canonical(&self, hash: BlockHash) -> Result<bool, RethError> {
self.tree.is_canonical(hash)
}

View File

@ -119,7 +119,7 @@ mod test {
B256::random(),
);
db.update(|tx| -> std::result::Result<(), DatabaseError> {
db.update(|tx| -> Result<(), DatabaseError> {
let mut td = U256::ZERO;
for header in headers.clone() {
td += header.header.difficulty;

View File

@ -79,8 +79,7 @@ mod tests {
.unwrap();
}
let db_data =
cursor.walk_range(..).unwrap().collect::<std::result::Result<Vec<_>, _>>().unwrap();
let db_data = cursor.walk_range(..).unwrap().collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(db_data[0].0.inner.to_vec(), data[0]);
assert_eq!(db_data[1].0.inner.to_vec(), data[1]);
assert_eq!(db_data[2].0.inner.to_vec(), data[2]);