refactor(prune): impl Segment for other prune segments (#4899)

This commit is contained in:
Alexey Shekhirin
2023-10-12 10:13:01 +03:00
committed by GitHub
parent 65cc314ab5
commit 18311976a1
14 changed files with 1460 additions and 1471 deletions

View File

@ -163,8 +163,8 @@ impl NodeState {
fn handle_pruner_event(&self, event: PrunerEvent) {
match event {
PrunerEvent::Finished { tip_block_number, elapsed, segments } => {
info!(tip_block_number, ?elapsed, ?segments, "Pruner finished");
PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
info!(tip_block_number, ?elapsed, ?stats, "Pruner finished");
}
}
}

View File

@ -110,9 +110,4 @@ impl PruneProgress {
Self::HasMoreData
}
}
/// Returns `true` if pruning has been finished.
pub fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}

View File

@ -8,6 +8,6 @@ pub enum PrunerEvent {
Finished {
tip_block_number: BlockNumber,
elapsed: Duration,
segments: BTreeMap<PruneSegment, (PruneProgress, usize)>,
stats: BTreeMap<PruneSegment, (PruneProgress, usize)>,
},
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,226 @@
use crate::{
segments::{
history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, Segment,
},
PrunerError,
};
use reth_db::{database::Database, models::ShardedKey, tables};
use reth_primitives::PruneSegment;
use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace};
/// Prune segment for account history. Its [Segment] implementation prunes rows of
/// [tables::AccountChangeSet] and then the index shards stored in [tables::AccountHistory].
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct AccountHistory;
impl<DB: Database> Segment<DB> for AccountHistory {
    /// Identifies the data pruned by this type as [PruneSegment::AccountHistory].
    fn segment(&self) -> PruneSegment {
        PruneSegment::AccountHistory
    }

    /// Prunes account changesets for the next block range of `input`, then prunes the account
    /// history indices up to the block recorded in the returned checkpoint.
    ///
    /// `input.delete_limit / 2` is handed to the changeset pruning as its limit. The returned
    /// `pruned` count is the sum of pruned changesets and pruned history index entries, and the
    /// checkpoint only carries a block number (no transaction number).
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(
        &self,
        provider: &DatabaseProviderRW<'_, DB>,
        input: PruneInput,
    ) -> Result<PruneOutput, PrunerError> {
        let Some(block_range) = input.get_next_block_range() else {
            trace!(target: "pruner", "No account history to prune");
            return Ok(PruneOutput::done())
        };
        // Used as the checkpoint block when the pruning callback never fires.
        let fallback_block = *block_range.end();

        let mut newest_pruned_block = None;
        let (pruned_changesets, done) = provider
            .prune_table_with_range::<tables::AccountChangeSet>(
                block_range,
                input.delete_limit / 2,
                |_| false,
                |row| newest_pruned_block = Some(row.0),
            )?;
        trace!(target: "pruner", pruned = %pruned_changesets, %done, "Pruned account history (changesets)");

        let checkpoint_block = match newest_pruned_block {
            Some(block_number) if done => block_number,
            // If there are more account changesets to prune, set the checkpoint block number
            // to the previous block, so we can finish pruning its changesets on the next run.
            Some(block_number) => block_number.saturating_sub(1),
            None => fallback_block,
        };

        let (processed, pruned_indices) = prune_history_indices::<DB, tables::AccountHistory, _>(
            provider,
            checkpoint_block,
            |a, b| a.key == b.key,
            |key| ShardedKey::last(key.key),
        )?;
        trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)" );

        let checkpoint =
            PruneOutputCheckpoint { block_number: Some(checkpoint_block), tx_number: None };
        Ok(PruneOutput {
            done,
            pruned: pruned_changesets + pruned_indices,
            checkpoint: Some(checkpoint),
        })
    }
}
#[cfg(test)]
mod tests {
use crate::segments::{AccountHistory, PruneInput, PruneOutput, Segment};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_changeset_range, random_eoa_account_range},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::{collections::BTreeMap, ops::AddAssign};
/// Runs [AccountHistory] pruning three times over randomly generated changesets and, after
/// each run, checks the remaining changesets, the remaining history shards and the saved
/// prune checkpoint.
#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
// Test data: blocks 1..=5000 plus changesets and history entries generated for them.
let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
0..0,
0..0,
);
tx.insert_changesets(changesets.clone(), None).expect("insert changesets");
tx.insert_history(changesets.clone(), None).expect("insert history");
// Count `AccountHistory` rows per account key and require that at least one account has
// more than one row.
let account_occurrences = tx.table::<tables::AccountHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry(key.key).or_default().add_assign(1);
map
},
);
assert!(account_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
// Sanity check: every generated change ended up in the changeset table.
assert_eq!(
tx.table::<tables::AccountChangeSet>().unwrap().len(),
changesets.iter().flatten().count()
);
// Snapshot of the history shards before any pruning, used to derive expected shards.
let original_shards = tx.table::<tables::AccountHistory>().unwrap();
// Performs one pruning run up to `to_block` and verifies the outcome.
// `run` is the ordinal of the run (1, 2, 3 below) and scales the number of changesets
// expected to be gone; `expected_result` is the expected `(done, pruned)` pair.
let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
// The previous checkpoint is read from the database and passed in through the input.
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
delete_limit: 2000,
};
let segment = AccountHistory::default();
let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
// Persist the checkpoint returned by the segment, then commit the transaction.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Flatten the generated changesets into `(index, change)` pairs, where the index is
// the position of the changeset in the generated list and is used as the block number.
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
// Position of the first flattened entry that is not skipped, i.e. the first one that
// is either beyond `delete_limit / 2 * run` entries or beyond `to_block`.
#[allow(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < input.delete_limit / 2 * run && *block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
// Mirrors the segment's checkpoint rule: step back one block when the run is not done.
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.done {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
// Group the remaining entries by block number; their total count is compared with the
// number of rows left in the changeset table.
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
|mut acc, (block_number, change)| {
acc.entry(block_number).or_default().push(change);
acc
},
);
assert_eq!(
tx.table::<tables::AccountChangeSet>().unwrap().len(),
pruned_changesets.values().flatten().count()
);
// Expected shards: drop shards whose highest block is at or below the last pruned block
// and strip block numbers at or below it from the remaining shards.
let actual_shards = tx.table::<tables::AccountHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks = blocks
.iter(0)
.skip_while(|block| *block <= last_pruned_block_number as usize)
.collect::<Vec<_>>();
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
// The checkpoint stored in the database must match the computed last pruned block.
assert_eq!(
tx.inner().get_prune_checkpoint(PruneSegment::AccountHistory).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
// Two runs towards block 998 (the first one is expected to be unfinished), then one run
// towards block 1400.
test_prune(998, 1, (false, 1000));
test_prune(998, 2, (true, 998));
test_prune(1400, 3, (true, 804));
}
}

View File

@ -14,16 +14,18 @@ use tracing::{instrument, trace};
#[non_exhaustive]
pub(crate) struct Headers;
impl Segment for Headers {
const SEGMENT: PruneSegment = PruneSegment::Headers;
impl<DB: Database> Segment<DB> for Headers {
fn segment(&self) -> PruneSegment {
PruneSegment::Headers
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune<DB: Database>(
fn prune(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let block_range = match input.get_next_block_range(provider, Self::SEGMENT)? {
let block_range = match input.get_next_block_range() {
Some(range) => range,
None => {
trace!(target: "pruner", "No headers to prune");
@ -101,7 +103,7 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_header_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, B256};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
@ -119,12 +121,19 @@ mod tests {
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput { to_block, delete_limit: 10 };
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::Headers)
.unwrap(),
to_block,
delete_limit: 10,
};
let segment = Headers::default();
let next_block_number_to_prune = tx
.inner()
.get_prune_checkpoint(Headers::SEGMENT)
.get_prune_checkpoint(PruneSegment::Headers)
.unwrap()
.and_then(|checkpoint| checkpoint.block_number)
.map(|block_number| block_number + 1)
@ -161,7 +170,7 @@ mod tests {
headers.len() - (last_pruned_block_number + 1) as usize
);
assert_eq!(
tx.inner().get_prune_checkpoint(Headers::SEGMENT).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::Headers).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
@ -179,6 +188,7 @@ mod tests {
let tx = TestTransaction::default();
let input = PruneInput {
previous_checkpoint: None,
to_block: 1,
// Less than total number of tables for `Headers` segment
delete_limit: 2,

View File

@ -0,0 +1,112 @@
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
models::ShardedKey,
table::Table,
transaction::DbTxMut,
BlockNumberList,
};
use reth_interfaces::db::DatabaseError;
use reth_primitives::BlockNumber;
use reth_provider::DatabaseProviderRW;
/// Prune history indices up to the provided block, inclusive.
///
/// Walks table `T` with a write cursor obtained from `provider`. `key_matches` decides whether
/// the row preceding a last shard belongs to the same sharded key, and `last_key` builds the key
/// of the last shard for a given key so the cursor can jump to it.
///
/// Returns total number of processed (walked) and deleted entities.
pub(crate) fn prune_history_indices<DB, T, SK>(
provider: &DatabaseProviderRW<'_, DB>,
to_block: BlockNumber,
key_matches: impl Fn(&T::Key, &T::Key) -> bool,
last_key: impl Fn(&T::Key) -> T::Key,
) -> Result<(usize, usize), DatabaseError>
where
DB: Database,
T: Table<Value = BlockNumberList>,
T::Key: AsRef<ShardedKey<SK>>,
{
let mut processed = 0;
let mut deleted = 0;
let mut cursor = provider.tx_ref().cursor_write::<T>()?;
// Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
// 2. If the shard has `highest_block_number` greater than the target block number for
// pruning, filter out block numbers inside the shard which are less than or equal to the
// target block number for pruning.
while let Some(result) = cursor.next()? {
let (key, blocks): (T::Key, BlockNumberList) = result;
// If the shard's highest block number is less than or equal to the target one, delete the
// shard completely.
if key.as_ref().highest_block_number <= to_block {
cursor.delete_current()?;
deleted += 1;
if key.as_ref().highest_block_number == to_block {
// Shard contains only block numbers up to the target one, so we can skip to
// the last shard for this key. It is guaranteed that further shards for this
// sharded key will not contain the target block number, as it's in this shard.
cursor.seek_exact(last_key(&key))?;
}
}
// Shard contains block numbers that are higher than the target one, so we need to
// filter it. It is guaranteed that further shards for this sharded key will not
// contain the target block number, as it's in this shard.
else {
// Keep only block numbers strictly greater than the target block.
let new_blocks =
blocks.iter(0).skip_while(|block| *block <= to_block as usize).collect::<Vec<_>>();
// If there were blocks less than or equal to the target one
// (so the shard has changed), update the shard.
if blocks.len() != new_blocks.len() {
// If there are no more blocks in this shard, we need to remove it, as empty
// shards are not allowed.
if new_blocks.is_empty() {
// A `highest_block_number` of `u64::MAX` identifies the last shard for a key.
if key.as_ref().highest_block_number == u64::MAX {
let prev_row = cursor.prev()?;
match prev_row {
// If current shard is the last shard for the sharded key that
// has previous shards, replace it with the previous shard.
Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => {
// The cursor now points at the previous shard, which is the row
// being deleted here.
cursor.delete_current()?;
deleted += 1;
// Upsert will replace the last shard for this sharded key with
// the previous value.
cursor.upsert(key.clone(), prev_value)?;
}
// If there's no previous shard for this sharded key,
// just delete last shard completely.
_ => {
// If we successfully moved the cursor to a previous row,
// jump to the original last shard.
if prev_row.is_some() {
cursor.next()?;
}
// Delete shard.
cursor.delete_current()?;
deleted += 1;
}
}
}
// If current shard is not the last shard for this sharded key,
// just delete it.
else {
cursor.delete_current()?;
deleted += 1;
}
} else {
// Some blocks remain: overwrite the shard with the filtered block list.
cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?;
}
}
// Jump to the last shard for this key, if current key isn't already the last shard.
if key.as_ref().highest_block_number != u64::MAX {
cursor.seek_exact(last_key(&key))?;
}
}
// Every row visited by the loop counts as processed, whether or not it was changed.
processed += 1;
}
Ok((processed, deleted))
}

View File

@ -1,18 +1,27 @@
mod account_history;
mod headers;
mod history;
mod receipts;
mod receipts_by_logs;
mod sender_recovery;
mod storage_history;
mod transaction_lookup;
mod transactions;
pub(crate) use account_history::AccountHistory;
pub(crate) use headers::Headers;
pub(crate) use receipts::Receipts;
pub(crate) use receipts_by_logs::ReceiptsByLogs;
pub(crate) use sender_recovery::SenderRecovery;
pub(crate) use storage_history::StorageHistory;
pub(crate) use transaction_lookup::TransactionLookup;
pub(crate) use transactions::Transactions;
use crate::PrunerError;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter};
use std::ops::RangeInclusive;
use tracing::error;
@ -23,30 +32,30 @@ use tracing::error;
/// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call
/// [Segment::save_checkpoint].
/// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput].
pub(crate) trait Segment {
/// Segment of the data that's pruned.
const SEGMENT: PruneSegment;
pub(crate) trait Segment<DB: Database> {
fn segment(&self) -> PruneSegment;
/// Prune data for [Self::SEGMENT] using the provided input.
fn prune<DB: Database>(
/// Prune data for [Self::segment] using the provided input.
fn prune(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError>;
/// Save checkpoint for [Self::SEGMENT] to the database.
fn save_checkpoint<DB: Database>(
/// Save checkpoint for [Self::segment] to the database.
fn save_checkpoint(
&self,
provider: &DatabaseProviderRW<'_, DB>,
checkpoint: PruneCheckpoint,
) -> RethResult<()> {
provider.save_prune_checkpoint(Self::SEGMENT, checkpoint)
provider.save_prune_checkpoint(self.segment(), checkpoint)
}
}
/// Segment pruning input, see [Segment::prune].
#[derive(Debug, Clone, Copy)]
pub(crate) struct PruneInput {
pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
/// Target block up to which the pruning needs to be done, inclusive.
pub(crate) to_block: BlockNumber,
/// Maximum entries to delete from the database.
@ -62,18 +71,16 @@ impl PruneInput {
/// 2. If checkpoint doesn't exist, return 0.
///
/// To get the range end: get last tx number for `to_block`.
pub(crate) fn get_next_tx_num_range_from_checkpoint<DB: Database>(
pub(crate) fn get_next_tx_num_range<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
segment: PruneSegment,
) -> RethResult<Option<RangeInclusive<TxNumber>>> {
let from_tx_number = provider
.get_prune_checkpoint(segment)?
let from_tx_number = self.previous_checkpoint
// Checkpoint exists, prune from the next transaction after the highest pruned one
.and_then(|checkpoint| match checkpoint.tx_number {
Some(tx_number) => Some(tx_number + 1),
_ => {
error!(target: "pruner", %segment, ?checkpoint, "Expected transaction number in prune checkpoint, found None");
error!(target: "pruner", ?checkpoint, "Expected transaction number in prune checkpoint, found None");
None
},
})
@ -102,13 +109,9 @@ impl PruneInput {
/// 2. If checkpoint doesn't exist, use block 0.
///
/// To get the range end: use block `to_block`.
pub(crate) fn get_next_block_range<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
segment: PruneSegment,
) -> RethResult<Option<RangeInclusive<BlockNumber>>> {
let from_block = provider
.get_prune_checkpoint(segment)?
pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
let from_block = self
.previous_checkpoint
.and_then(|checkpoint| checkpoint.block_number)
// Checkpoint exists, prune from the next block after the highest pruned one
.map(|block_number| block_number + 1)
@ -117,10 +120,10 @@ impl PruneInput {
let range = from_block..=self.to_block;
if range.is_empty() {
return Ok(None)
return None
}
Ok(Some(range))
Some(range)
}
}

View File

@ -12,16 +12,18 @@ use tracing::{instrument, trace};
#[non_exhaustive]
pub(crate) struct Receipts;
impl Segment for Receipts {
const SEGMENT: PruneSegment = PruneSegment::Receipts;
impl<DB: Database> Segment<DB> for Receipts {
fn segment(&self) -> PruneSegment {
PruneSegment::Receipts
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune<DB: Database>(
fn prune(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No receipts to prune");
@ -56,14 +58,14 @@ impl Segment for Receipts {
})
}
fn save_checkpoint<DB: Database>(
fn save_checkpoint(
&self,
provider: &DatabaseProviderRW<'_, DB>,
checkpoint: PruneCheckpoint,
) -> RethResult<()> {
provider.save_prune_checkpoint(Self::SEGMENT, checkpoint)?;
provider.save_prune_checkpoint(PruneSegment::Receipts, checkpoint)?;
// `PruneSegment::Receipts` overrides `PruneSegmnt::ContractLogs`, so we can preemptively
// `PruneSegment::Receipts` overrides `PruneSegment::ContractLogs`, so we can preemptively
// limit their pruning start point.
provider.save_prune_checkpoint(PruneSegment::ContractLogs, checkpoint)?;
@ -84,7 +86,7 @@ mod tests {
generators,
generators::{random_block_range, random_receipt},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::ops::Sub;
@ -117,12 +119,19 @@ mod tests {
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput { to_block, delete_limit: 10 };
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::Receipts)
.unwrap(),
to_block,
delete_limit: 10,
};
let segment = Receipts::default();
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(Receipts::SEGMENT)
.get_prune_checkpoint(PruneSegment::Receipts)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
@ -171,7 +180,7 @@ mod tests {
receipts.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(Receipts::SEGMENT).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::Receipts).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),

View File

@ -0,0 +1,305 @@
use crate::{segments::PruneOutput, PrunerError};
use reth_db::{database::Database, tables};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig,
MINIMUM_PRUNING_DISTANCE,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
TransactionsProvider,
};
use tracing::{instrument, trace};
/// Pruner for receipts filtered by contract logs. Its `prune` method deletes rows of
/// [tables::Receipts] and tracks progress under the [PruneSegment::ContractLogs] checkpoint.
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct ReceiptsByLogs;
impl ReceiptsByLogs {
/// Prune receipts up to the provided block, inclusive, by filtering logs. Works as an inclusion
/// list, and removes every receipt not belonging to it. Respects the batch size.
///
/// The last pruned block is read from the [PruneSegment::ContractLogs] checkpoint. The new
/// checkpoint is saved by this method itself, and the returned [PruneOutput] carries
/// `checkpoint: None`. `pruned` in the output is `delete_limit` minus the limit left over
/// after all ranges were processed.
#[instrument(level = "trace", skip(self, provider), target = "pruner")]
pub(crate) fn prune<DB: Database>(
&self,
provider: &DatabaseProviderRW<'_, DB>,
receipts_log_filter: &ReceiptsLogPruneConfig,
tip_block_number: BlockNumber,
delete_limit: usize,
) -> Result<PruneOutput, PrunerError> {
// Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`.
let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
.prune_target_block(
tip_block_number,
MINIMUM_PRUNING_DISTANCE,
PruneSegment::ContractLogs,
)?
.map(|(bn, _)| bn)
.unwrap_or_default();
// Get status checkpoint from latest run
let mut last_pruned_block = provider
.get_prune_checkpoint(PruneSegment::ContractLogs)?
.and_then(|checkpoint| checkpoint.block_number);
let initial_last_pruned_block = last_pruned_block;
// First transaction to look at: the one after the last transaction of the previously pruned
// block, or 0 if there is no checkpoint or no body indices for that block.
let mut from_tx_number = match initial_last_pruned_block {
Some(block) => provider
.block_body_indices(block)?
.map(|block| block.last_tx_num() + 1)
.unwrap_or(0),
None => 0,
};
// Figure out what receipts have already been pruned, so we can have an accurate
// `address_filter`
let address_filter =
receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?;
// Splits all transactions in different block ranges. Each block range will have its own
// filter address list and will check it while going through the table
//
// Example:
// For an `address_filter` such as:
// { block9: [a1, a2], block20: [a3, a4, a5] }
//
// The following structures will be created in the exact order as showed:
// `block_ranges`: [
// (block0, block8, 0 addresses),
// (block9, block19, 2 addresses),
// (block20, to_block, 5 addresses)
// ]
// `filtered_addresses`: [a1, a2, a3, a4, a5]
//
// The first range will delete all receipts between block0 - block8
// The second range will delete all receipts between block9 - 19, except the ones with
// emitter logs from these addresses: [a1, a2].
// The third range will delete all receipts between block20 - to_block, except the ones with
// emitter logs from these addresses: [a1, a2, a3, a4, a5]
let mut block_ranges = vec![];
let mut blocks_iter = address_filter.iter().peekable();
let mut filtered_addresses = vec![];
while let Some((start_block, addresses)) = blocks_iter.next() {
filtered_addresses.extend_from_slice(addresses);
// This will clear all receipts before the first appearance of a contract log or since
// the block after the last pruned one.
if block_ranges.is_empty() {
let init = last_pruned_block.map(|b| b + 1).unwrap_or_default();
if init < *start_block {
block_ranges.push((init, *start_block - 1, 0));
}
}
// A range ends right before the next filter entry starts, or at `to_block` for the last
// entry.
let end_block =
blocks_iter.peek().map(|(next_block, _)| *next_block - 1).unwrap_or(to_block);
// Addresses in lower block ranges, are still included in the inclusion list for future
// ranges.
block_ranges.push((*start_block, end_block, filtered_addresses.len()));
}
trace!(
target: "pruner",
?block_ranges,
?filtered_addresses,
"Calculated block ranges and filtered addresses",
);
// `limit` is the remaining deletion budget, shared across all block ranges.
let mut limit = delete_limit;
let mut done = true;
let mut last_pruned_transaction = None;
for (start_block, end_block, num_addresses) in block_ranges {
let block_range = start_block..=end_block;
// Calculate the transaction range from this block range
let tx_range_end = match provider.block_body_indices(end_block)? {
Some(body) => body.last_tx_num(),
None => {
trace!(
target: "pruner",
?block_range,
"No receipts to prune."
);
continue
}
};
let tx_range = from_tx_number..=tx_range_end;
// Delete receipts, except the ones in the inclusion list
let mut last_skipped_transaction = 0;
let deleted;
(deleted, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
limit,
|(tx_num, receipt)| {
// Only the first `num_addresses` entries of `filtered_addresses` apply to this
// range; a receipt is kept if any of its logs was emitted by one of them.
let skip = num_addresses > 0 &&
receipt.logs.iter().any(|log| {
filtered_addresses[..num_addresses].contains(&&log.address)
});
if skip {
last_skipped_transaction = *tx_num;
}
skip
},
|row| last_pruned_transaction = Some(row.0),
)?;
trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
limit = limit.saturating_sub(deleted);
// For accurate checkpoints we need to know that we have checked every transaction.
// Example: we reached the end of the range, and the last receipt is supposed to skip
// its deletion.
last_pruned_transaction =
Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction));
last_pruned_block = Some(
provider
.transaction_block(last_pruned_transaction.expect("qed"))?
.ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?
// If there's more receipts to prune, set the checkpoint block number to
// previous, so we could finish pruning its receipts on the
// next run.
.saturating_sub(if done { 0 } else { 1 }),
);
// Budget exhausted: the run only counts as done if this range was also the last one
// (ending at `to_block`).
if limit == 0 {
done &= end_block == to_block;
break
}
from_tx_number = last_pruned_transaction.expect("qed") + 1;
}
// If there are contracts using `PruneMode::Distance(_)` there will be receipts before
// `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is
// not actually `to_block`, but the `lowest_block_with_distance` from any contract.
// This ensures that in future pruner runs we can prune all these receipts between the
// previous `lowest_block_with_distance` and the new one using
// `get_next_tx_num_range_from_checkpoint`.
// NOTE(review): that helper name looks stale (it appears to have been renamed to
// `get_next_tx_num_range`), and this method does not call it - confirm and update.
//
// Only applies if we were able to prune everything intended for this run, otherwise the
// checkpoint is the `last_pruned_block`.
let prune_mode_block = receipts_log_filter
.lowest_block_with_distance(tip_block_number, initial_last_pruned_block)?
.unwrap_or(to_block);
// The saved block number is the smaller of `prune_mode_block` and `last_pruned_block`
// (when the latter is set).
provider.save_prune_checkpoint(
PruneSegment::ContractLogs,
PruneCheckpoint {
block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))),
tx_number: last_pruned_transaction,
prune_mode: PruneMode::Before(prune_mode_block),
},
)?;
Ok(PruneOutput { done, pruned: delete_limit - limit, checkpoint: None })
}
}
#[cfg(test)]
mod tests {
use crate::segments::receipts_by_logs::ReceiptsByLogs;
use assert_matches::assert_matches;
use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_eoa_account, random_log, random_receipt},
};
use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::TestTransaction;
use std::collections::BTreeMap;
/// Repeatedly runs [ReceiptsByLogs] pruning with a single-address log filter until it reports
/// `done`, checking the receipt count after each run and the surviving receipts at the end.
#[test]
fn prune_receipts_by_logs() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let tip = 20000;
// Three consecutive block ranges covering 0..=tip, generated with different parameters for
// the first 101 blocks, the middle section and the last 100 blocks.
let blocks = [
random_block_range(&mut rng, 0..=100, B256::ZERO, 1..5),
random_block_range(&mut rng, (100 + 1)..=(tip - 100), B256::ZERO, 0..1),
random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5),
]
.concat();
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let mut receipts = Vec::new();
let (deposit_contract_addr, _) = random_eoa_account(&mut rng);
// One receipt per transaction, numbered sequentially. Each receipt gets an extra log; for
// the last transaction of every block `deposit_contract_addr` is passed to `random_log`
// (presumably as the log's address - confirm against the generator).
for block in &blocks {
for (txi, transaction) in block.body.iter().enumerate() {
let mut receipt = random_receipt(&mut rng, transaction, Some(1));
receipt.logs.push(random_log(
&mut rng,
if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None },
Some(1),
));
receipts.push((receipts.len() as u64, receipt));
}
}
tx.insert_receipts(receipts).expect("insert receipts");
// Sanity checks: one transaction row per block body entry, one receipt per transaction.
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::Receipts>().unwrap().len()
);
// Executes a single pruning run with a delete limit of 10, verifies the number of receipts
// left in the table and returns whether the pruner reported that it is done.
let run_prune = || {
let provider = tx.inner_rw();
let prune_before_block: usize = 20;
let prune_mode = PruneMode::Before(prune_before_block as u64);
let receipts_log_filter =
ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));
let result = ReceiptsByLogs::default().prune(&provider, &receipts_log_filter, tip, 10);
provider.commit().expect("commit");
assert_matches!(result, Ok(_));
let output = result.unwrap();
// Read back the checkpoint that `prune` saved for `PruneSegment::ContractLogs`.
let (pruned_block, pruned_tx) = tx
.inner()
.get_prune_checkpoint(PruneSegment::ContractLogs)
.unwrap()
.map(|checkpoint| (checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap()))
.unwrap_or_default();
// All receipts are in the end of the block
let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1);
assert_eq!(
tx.table::<tables::Receipts>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>() -
((pruned_tx + 1) - unprunable) as usize
);
output.done
};
// Keep pruning until a run reports completion.
while !run_prune() {}
let provider = tx.inner();
let mut cursor = provider.tx_ref().cursor_read::<tables::Receipts>().unwrap();
let walker = cursor.walk(None).unwrap();
for receipt in walker {
let (tx_num, receipt) = receipt.unwrap();
// Either we only find our contract, or the receipt is part of the unprunable receipts
// set by tip - 128
assert!(
receipt.logs.iter().any(|l| l.address == deposit_contract_addr) ||
provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128,
);
}
}
}

View File

@ -0,0 +1,183 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
PrunerError,
};
use reth_db::{database::Database, tables};
use reth_primitives::PruneSegment;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace};
/// Prune segment for recovered transaction senders. Its [Segment] implementation prunes rows of
/// [tables::TxSenders].
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct SenderRecovery;
impl<DB: Database> Segment<DB> for SenderRecovery {
    /// Identifies the data pruned by this type as [PruneSegment::SenderRecovery].
    fn segment(&self) -> PruneSegment {
        PruneSegment::SenderRecovery
    }

    /// Prunes transaction senders for the next transaction number range of `input`, passing
    /// `input.delete_limit` as the deletion limit.
    ///
    /// The returned checkpoint holds the last pruned transaction number together with the block
    /// that transaction belongs to, or the block before it when pruning is not done yet.
    /// Fails with [PrunerError::InconsistentData] if no block is found for that transaction.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(
        &self,
        provider: &DatabaseProviderRW<'_, DB>,
        input: PruneInput,
    ) -> Result<PruneOutput, PrunerError> {
        let Some(tx_range) = input.get_next_tx_num_range(provider)? else {
            trace!(target: "pruner", "No transaction senders to prune");
            return Ok(PruneOutput::done())
        };

        // Starts at the end of the range and is overwritten by the pruning callback.
        let mut last_pruned_transaction = *tx_range.end();
        let (pruned, done) = provider.prune_table_with_range::<tables::TxSenders>(
            tx_range,
            input.delete_limit,
            |_| false,
            |row| last_pruned_transaction = row.0,
        )?;
        trace!(target: "pruner", %pruned, %done, "Pruned transaction senders");

        let block_of_last_tx = provider
            .transaction_block(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
        let last_pruned_block = if done {
            Some(block_of_last_tx)
        } else {
            // If there are more transaction senders to prune, set the checkpoint block number
            // to the previous block, so we can finish pruning its senders on the next run.
            block_of_last_tx.checked_sub(1)
        };

        let checkpoint = PruneOutputCheckpoint {
            block_number: last_pruned_block,
            tx_number: Some(last_pruned_transaction),
        };
        Ok(PruneOutput { done, pruned, checkpoint: Some(checkpoint) })
    }
}
// Tests for the `SenderRecovery` prune segment.
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, SenderRecovery};
use assert_matches::assert_matches;
use itertools::{
FoldWhile::{Continue, Done},
Itertools,
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::ops::Sub;
// Prunes transaction senders over three consecutive runs and checks the run output, the
// remaining `TxSenders` rows and the stored checkpoint after each of them.
#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
// Blocks 1..=10; the `2..3` range presumably yields exactly two transactions per block,
// which is what the expected counts at the bottom of the test add up to.
let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
// One `(transaction number, recovered signer)` row per transaction, numbered from zero in
// block order.
let mut transaction_senders = Vec::new();
for block in &blocks {
for transaction in &block.body {
transaction_senders.push((
transaction_senders.len() as u64,
transaction.recover_signer().expect("recover signer"),
));
}
}
tx.insert_transaction_senders(transaction_senders.clone())
.expect("insert transaction senders");
// Sanity checks: every generated transaction is stored and has a sender row.
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::TxSenders>().unwrap().len()
);
// Runs one prune pass up to `to_block` and asserts that it reports `expected_result`, given
// as `(done, pruned)`.
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::SenderRecovery)
.unwrap(),
to_block,
delete_limit: 10,
};
let segment = SenderRecovery::default();
// Must be read before pruning: the transaction after the stored checkpoint, or zero when
// there is no checkpoint yet.
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(PruneSegment::SenderRecovery)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
// Expected number of the last transaction deleted by this run: all transactions of the first
// `to_block` blocks, unless the delete limit stops the run earlier.
let last_pruned_tx_number = blocks
.iter()
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.sub(1);
// Accumulate transaction counts block by block to find the block that contains
// `last_pruned_tx_number`.
let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();
if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0;
let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
// Persist the checkpoint so that the next run continues from it.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Mirrors the segment: an unfinished run checkpoints the previous block.
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
// Every sender row up to and including `last_pruned_tx_number` must be gone.
assert_eq!(
tx.table::<tables::TxSenders>().unwrap().len(),
transaction_senders.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(PruneSegment::SenderRecovery).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
prune_mode
})
);
};
// First run hits the delete limit of 10, the second one removes the remainder up to block 6,
// the third one prunes the rest up to block 10.
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
}
}

View File

@ -0,0 +1,232 @@
use crate::{
segments::{
history::prune_history_indices, PruneInput, PruneOutput, PruneOutputCheckpoint, Segment,
},
PrunerError,
};
use reth_db::{
database::Database,
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
};
use reth_primitives::PruneSegment;
use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace};
/// Prune segment for storage history.
///
/// Its `Segment` implementation deletes rows from the `StorageChangeSet` table and then prunes
/// the `StorageHistory` indices up to the last pruned block.
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct StorageHistory;
impl<DB: Database> Segment<DB> for StorageHistory {
    /// Reports [`PruneSegment::StorageHistory`] as the segment handled by this type.
    fn segment(&self) -> PruneSegment {
        PruneSegment::StorageHistory
    }

    /// Prunes storage history in two steps: first rows of [`tables::StorageChangeSet`] for the
    /// next block range derived from `input`, then the [`tables::StorageHistory`] indices up to
    /// the block the first step stopped at.
    ///
    /// Half of `input.delete_limit` is passed as the limit for the changeset step. Returns
    /// [`PruneOutput::done`] right away when `input` yields no block range. The reported
    /// `pruned` count is the sum of deleted changesets and deleted history indices.
    ///
    /// # Errors
    ///
    /// Propagates errors from the provider and from `prune_history_indices`.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(
        &self,
        provider: &DatabaseProviderRW<'_, DB>,
        input: PruneInput,
    ) -> Result<PruneOutput, PrunerError> {
        let block_range = if let Some(range) = input.get_next_block_range() {
            range
        } else {
            trace!(target: "pruner", "No storage history to prune");
            return Ok(PruneOutput::done())
        };
        // Used as the checkpoint block when the callback below is never invoked.
        let fallback_block = *block_range.end();

        let mut last_seen_block = None;
        let (changesets_deleted, done) = provider
            .prune_table_with_range::<tables::StorageChangeSet>(
                BlockNumberAddress::range(block_range),
                input.delete_limit / 2,
                |_| false,
                |row| last_seen_block = Some(row.0.block_number()),
            )?;
        trace!(target: "pruner", deleted = %changesets_deleted, %done, "Pruned storage history (changesets)");

        // An unfinished run may have stopped in the middle of a block, so the checkpoint points
        // at the previous block and the remaining changesets are picked up on the next run.
        let checkpoint_block = match last_seen_block {
            Some(block_number) if done => block_number,
            Some(block_number) => block_number.saturating_sub(1),
            None => fallback_block,
        };

        let (processed, indices_deleted) = prune_history_indices::<DB, tables::StorageHistory, _>(
            provider,
            checkpoint_block,
            |lhs, rhs| lhs.address == rhs.address && lhs.sharded_key.key == rhs.sharded_key.key,
            |key| StorageShardedKey::last(key.address, key.sharded_key.key),
        )?;
        trace!(target: "pruner", %processed, deleted = %indices_deleted, %done, "Pruned storage history (history)");

        let checkpoint =
            PruneOutputCheckpoint { block_number: Some(checkpoint_block), tx_number: None };
        Ok(PruneOutput {
            done,
            pruned: changesets_deleted + indices_deleted,
            checkpoint: Some(checkpoint),
        })
    }
}
// Tests for the `StorageHistory` prune segment.
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, StorageHistory};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_changeset_range, random_eoa_account_range},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::{collections::BTreeMap, ops::AddAssign};
// Prunes storage changesets and history indices over three consecutive runs and checks the run
// output, the remaining table contents and the stored checkpoint after each of them.
#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
// Random changesets for the generated blocks, with every account starting from empty storage.
let (changesets, _) = random_changeset_range(
&mut rng,
blocks.iter(),
accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))),
2..3,
1..2,
);
tx.insert_changesets(changesets.clone(), None).expect("insert changesets");
tx.insert_history(changesets.clone(), None).expect("insert history");
// Count the history rows per `(address, storage key)` pair and require that at least one pair
// has more than one row, i.e. spans several shards.
let storage_occurrences = tx.table::<tables::StorageHistory>().unwrap().into_iter().fold(
BTreeMap::<_, usize>::new(),
|mut map, (key, _)| {
map.entry((key.address, key.sharded_key.key)).or_default().add_assign(1);
map
},
);
assert!(storage_occurrences.into_iter().any(|(_, occurrences)| occurrences > 1));
// Sanity check: one changeset row per generated storage entry.
assert_eq!(
tx.table::<tables::StorageChangeSet>().unwrap().len(),
changesets.iter().flatten().flat_map(|(_, _, entries)| entries).count()
);
// Snapshot of the history shards before pruning, used to derive the expected shards below.
let original_shards = tx.table::<tables::StorageHistory>().unwrap();
// Runs one prune pass up to `to_block` and asserts that it reports `expected_result`, given
// as `(done, pruned)`. `run` is the 1-based number of the pass.
let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
delete_limit: 2000,
};
let segment = StorageHistory::default();
let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
// Persist the checkpoint so that the next run continues from it.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Flatten the generated changesets into `(block number, address, entry)` rows; the position
// in the outer collection is used as the block number.
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().flat_map(move |(address, _, entries)| {
entries.iter().map(move |entry| (block_number, address, entry))
})
})
.collect::<Vec<_>>();
// Index of the first row that should still be present: rows are pruned while they fit into the
// changeset limit accumulated over `run` passes (half of `delete_limit` each, matching the
// segment) and belong to blocks up to `to_block`.
#[allow(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < input.delete_limit / 2 * run && *block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
// The first remaining element is the last pruned row. Mirrors the segment: an unfinished run
// checkpoints the previous block.
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| if result.done {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
// Everything left in the iterator is expected to still be in the table.
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
|mut acc, (block_number, address, entry)| {
acc.entry((block_number, address)).or_default().push(entry);
acc
},
);
assert_eq!(
tx.table::<tables::StorageChangeSet>().unwrap().len(),
pruned_changesets.values().flatten().count()
);
let actual_shards = tx.table::<tables::StorageHistory>().unwrap();
// Expected shards: drop the shards that end at or before the last pruned block and strip the
// pruned block numbers from the ones that are kept.
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks = blocks
.iter(0)
.skip_while(|block| *block <= last_pruned_block_number as usize)
.collect::<Vec<_>>();
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(
tx.inner().get_prune_checkpoint(PruneSegment::StorageHistory).unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
// The first run is expected to stop at the changeset limit, the second one to finish block 998
// and the third one to prune up to block 1400.
test_prune(998, 1, (false, 1000));
test_prune(998, 2, (true, 998));
test_prune(1400, 3, (true, 804));
}
}

View File

@ -0,0 +1,202 @@
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
PrunerError,
};
use rayon::prelude::*;
use reth_db::{database::Database, tables};
use reth_primitives::PruneSegment;
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace};
/// Prune segment for the transaction lookup index.
///
/// Its `Segment` implementation deletes rows from the `TxHashNumber` table.
#[derive(Default)]
#[non_exhaustive]
pub(crate) struct TransactionLookup;
impl<DB: Database> Segment<DB> for TransactionLookup {
    /// Reports [`PruneSegment::TransactionLookup`] as the segment handled by this type.
    fn segment(&self) -> PruneSegment {
        PruneSegment::TransactionLookup
    }

    /// Deletes rows from [`tables::TxHashNumber`] for the next transaction number range derived
    /// from `input`, capped to `input.delete_limit` transactions.
    ///
    /// Returns [`PruneOutput::done`] right away when `input` yields no range. Otherwise the
    /// transactions of the capped range are loaded, their hashes are computed in parallel and
    /// the rows keyed by these hashes are deleted. The run is reported as done when the capped
    /// range reaches the end of the full range.
    ///
    /// # Errors
    ///
    /// Propagates provider errors and returns [`PrunerError::InconsistentData`] when the
    /// number of loaded transactions does not match the range, or when no block is found for
    /// the last pruned transaction.
    #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
    fn prune(
        &self,
        provider: &DatabaseProviderRW<'_, DB>,
        input: PruneInput,
    ) -> Result<PruneOutput, PrunerError> {
        let full_range = if let Some(range) = input.get_next_tx_num_range(provider)? {
            range
        } else {
            trace!(target: "pruner", "No transaction lookup entries to prune");
            return Ok(PruneOutput::done())
        };
        let (start, end) = full_range.into_inner();

        // Never look at more transactions than the delete limit allows in a single run.
        let capped_end = end.min(start + input.delete_limit as u64 - 1);
        let tx_range = start..=capped_end;

        // Load the transactions of the range and hash them in parallel.
        let transactions = provider.transactions_by_tx_range(tx_range.clone())?;
        let hashes: Vec<_> =
            transactions.into_par_iter().map(|transaction| transaction.hash()).collect();

        // The database must have returned exactly one transaction per number in the range.
        let tx_count = tx_range.count();
        if tx_count != hashes.len() {
            return Err(PrunerError::InconsistentData(
                "Unexpected number of transaction hashes retrieved by transaction number range",
            ))
        }

        // Tracks the highest transaction number handed to the delete callback.
        let mut last_pruned_transaction = None;
        let (pruned, _) = provider.prune_table_with_iterator::<tables::TxHashNumber>(
            hashes,
            input.delete_limit,
            |row| {
                let highest = last_pruned_transaction.unwrap_or(row.1).max(row.1);
                last_pruned_transaction = Some(highest);
            },
        )?;

        let done = capped_end == end;
        trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");

        // Fall back to the end of the capped range when the callback was never invoked.
        let last_pruned_transaction = last_pruned_transaction.unwrap_or(capped_end);

        let block_of_last_transaction = provider
            .transaction_block(last_pruned_transaction)?
            .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))?;
        // An unfinished run may have stopped in the middle of a block, so the checkpoint points
        // at the previous block and the remaining lookup entries are picked up on the next run.
        // This is `None` when there is no previous block.
        let last_pruned_block = if done {
            Some(block_of_last_transaction)
        } else {
            block_of_last_transaction.checked_sub(1)
        };

        let checkpoint = PruneOutputCheckpoint {
            block_number: last_pruned_block,
            tx_number: Some(last_pruned_transaction),
        };
        Ok(PruneOutput { done, pruned, checkpoint: Some(checkpoint) })
    }
}
// Tests for the `TransactionLookup` prune segment.
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, TransactionLookup};
use assert_matches::assert_matches;
use itertools::{
FoldWhile::{Continue, Done},
Itertools,
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::ops::Sub;
// Prunes transaction lookup entries over three consecutive runs and checks the run output, the
// remaining `TxHashNumber` rows and the stored checkpoint after each of them.
#[test]
fn prune() {
let tx = TestTransaction::default();
let mut rng = generators::rng();
// Blocks 1..=10; the `2..3` range presumably yields exactly two transactions per block,
// which is what the expected counts at the bottom of the test add up to.
let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
tx.insert_blocks(blocks.iter(), None).expect("insert blocks");
// One `(transaction hash, transaction number)` row per transaction, numbered from zero in
// block order.
let mut tx_hash_numbers = Vec::new();
for block in &blocks {
for transaction in &block.body {
tx_hash_numbers.push((transaction.hash, tx_hash_numbers.len() as u64));
}
}
tx.insert_tx_hash_numbers(tx_hash_numbers.clone()).expect("insert tx hash numbers");
// Sanity checks: every generated transaction is stored and has a lookup row.
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
blocks.iter().map(|block| block.body.len()).sum::<usize>()
);
assert_eq!(
tx.table::<tables::Transactions>().unwrap().len(),
tx.table::<tables::TxHashNumber>().unwrap().len()
);
// Runs one prune pass up to `to_block` and asserts that it reports `expected_result`, given
// as `(done, pruned)`.
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::TransactionLookup)
.unwrap(),
to_block,
delete_limit: 10,
};
let segment = TransactionLookup::default();
// Must be read before pruning: the transaction after the stored checkpoint, or zero when
// there is no checkpoint yet.
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(PruneSegment::TransactionLookup)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
.unwrap_or_default();
// Expected number of the last transaction deleted by this run: all transactions of the first
// `to_block` blocks, unless the delete limit stops the run earlier.
let last_pruned_tx_number = blocks
.iter()
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.sub(1);
// Accumulate transaction counts block by block to find the block that contains
// `last_pruned_tx_number`.
let last_pruned_block_number = blocks
.iter()
.fold_while((0, 0), |(_, mut tx_count), block| {
tx_count += block.body.len();
if tx_count > last_pruned_tx_number {
Done((block.number, tx_count))
} else {
Continue((block.number, tx_count))
}
})
.into_inner()
.0;
let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
// Persist the checkpoint so that the next run continues from it.
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
// Mirrors the segment: an unfinished run checkpoints the previous block.
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
// Every lookup row up to and including `last_pruned_tx_number` must be gone.
assert_eq!(
tx.table::<tables::TxHashNumber>().unwrap().len(),
tx_hash_numbers.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(PruneSegment::TransactionLookup).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),
prune_mode
})
);
};
// First run hits the delete limit of 10, the second one removes the remainder up to block 6,
// the third one prunes the rest up to block 10.
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
}
}

View File

@ -11,16 +11,18 @@ use tracing::{instrument, trace};
#[non_exhaustive]
pub(crate) struct Transactions;
impl Segment for Transactions {
const SEGMENT: PruneSegment = PruneSegment::Transactions;
impl<DB: Database> Segment<DB> for Transactions {
fn segment(&self) -> PruneSegment {
PruneSegment::Transactions
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune<DB: Database>(
fn prune(
&self,
provider: &DatabaseProviderRW<'_, DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? {
let tx_range = match input.get_next_tx_num_range(provider)? {
Some(range) => range,
None => {
trace!(target: "pruner", "No transactions to prune");
@ -65,7 +67,7 @@ mod tests {
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestTransaction;
use std::ops::Sub;
@ -84,12 +86,19 @@ mod tests {
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput { to_block, delete_limit: 10 };
let input = PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::Transactions)
.unwrap(),
to_block,
delete_limit: 10,
};
let segment = Transactions::default();
let next_tx_number_to_prune = tx
.inner()
.get_prune_checkpoint(Transactions::SEGMENT)
.get_prune_checkpoint(PruneSegment::Transactions)
.unwrap()
.and_then(|checkpoint| checkpoint.tx_number)
.map(|tx_number| tx_number + 1)
@ -138,7 +147,7 @@ mod tests {
transactions.len() - (last_pruned_tx_number + 1)
);
assert_eq!(
tx.inner().get_prune_checkpoint(Transactions::SEGMENT).unwrap(),
tx.inner().get_prune_checkpoint(PruneSegment::Transactions).unwrap(),
Some(PruneCheckpoint {
block_number: last_pruned_block_number,
tx_number: Some(last_pruned_tx_number as TxNumber),