From e823c4082d74ac99e10b194f562aca51f05ae4e3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 9 Oct 2023 16:52:11 +0100 Subject: [PATCH] feat(prune): transactions segment (#4937) --- crates/primitives/src/prune/segment.rs | 2 + crates/prune/src/pruner.rs | 30 +++++ crates/prune/src/segments/mod.rs | 2 + crates/prune/src/segments/transactions.rs | 153 ++++++++++++++++++++++ 4 files changed, 187 insertions(+) create mode 100644 crates/prune/src/segments/transactions.rs diff --git a/crates/primitives/src/prune/segment.rs b/crates/primitives/src/prune/segment.rs index f6f4d02fd..c71c6bcf7 100644 --- a/crates/primitives/src/prune/segment.rs +++ b/crates/primitives/src/prune/segment.rs @@ -20,6 +20,8 @@ pub enum PruneSegment { StorageHistory, /// Prune segment responsible for the `CanonicalHeaders`, `Headers` and `HeaderTD` tables. Headers, + /// Prune segment responsible for the `Transactions` table. + Transactions, } /// PruneSegment error type. diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 19146c5d0..3feedd08a 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -312,6 +312,36 @@ impl Pruner { (PruneProgress::from_done(output.done), output.pruned), ); } + + if let (Some(to_block), true) = (snapshots.transactions, delete_limit > 0) { + let prune_mode = PruneMode::Before(to_block + 1); + trace!( + target: "pruner", + prune_segment = ?PruneSegment::Transactions, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + + let segment_start = Instant::now(); + let segment = segments::Transactions::default(); + let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?; + if let Some(checkpoint) = output.checkpoint { + segment + .save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; + } + self.metrics + .get_prune_segment_metrics(PruneSegment::Transactions) + .duration_seconds + .record(segment_start.elapsed()); + + done = done && output.done; + delete_limit = delete_limit.saturating_sub(output.pruned); + segments.insert( + PruneSegment::Transactions, + (PruneProgress::from_done(output.done), output.pruned), + ); + } } provider.commit()?; diff --git a/crates/prune/src/segments/mod.rs b/crates/prune/src/segments/mod.rs index 04f45c6e0..1446351ae 100644 --- a/crates/prune/src/segments/mod.rs +++ b/crates/prune/src/segments/mod.rs @@ -1,8 +1,10 @@ mod headers; mod receipts; +mod transactions; pub(crate) use headers::Headers; pub(crate) use receipts::Receipts; +pub(crate) use transactions::Transactions; use crate::PrunerError; use reth_db::database::Database; diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs new file mode 100644 index 000000000..ea8b07252 --- /dev/null +++ b/crates/prune/src/segments/transactions.rs @@ -0,0 +1,153 @@ +use crate::{ + segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + PrunerError, +}; +use reth_db::{database::Database, tables}; +use reth_primitives::PruneSegment; +use reth_provider::{DatabaseProviderRW, TransactionsProvider}; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct Transactions; + +impl Segment for Transactions { + const SEGMENT: PruneSegment = PruneSegment::Transactions; + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let tx_range = match input.get_next_tx_num_range_from_checkpoint(provider, Self::SEGMENT)? { + Some(range) => range, + None => { + trace!(target: "pruner", "No transactions to prune"); + return Ok(PruneOutput::done()) + } + }; + + let mut last_pruned_transaction = *tx_range.end(); + let (pruned, done) = provider.prune_table_with_range::( + tx_range, + input.delete_limit, + |_| false, + |row| last_pruned_transaction = row.0, + )?; + trace!(target: "pruner", %pruned, %done, "Pruned transactions"); + + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transactions to prune, set the checkpoint block number to previous, + // so we could finish pruning its transactions on the next run. + .checked_sub(if done { 0 } else { 1 }); + + Ok(PruneOutput { + done, + pruned, + checkpoint: Some(PruneOutputCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + }), + }) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{PruneInput, PruneOutput, Segment, Transactions}; + use assert_matches::assert_matches; + use itertools::{ + FoldWhile::{Continue, Done}, + Itertools, + }; + use reth_db::tables; + use reth_interfaces::test_utils::{generators, generators::random_block_range}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, TxNumber, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + use std::ops::Sub; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let transactions = blocks.iter().flat_map(|block| &block.body).collect::>(); + + assert_eq!(tx.table::().unwrap().len(), transactions.len()); + + let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { to_block, delete_limit: 10 }; + let segment = Transactions::default(); + + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(Transactions::SEGMENT) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let last_pruned_tx_number = blocks + .iter() + .take(to_block as usize) + .map(|block| block.body.len()) + .sum::() + .min(next_tx_number_to_prune as usize + input.delete_limit) + .sub(1); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0 + .checked_sub(if result.done { 0 } else { 1 }); + + assert_eq!( + tx.table::().unwrap().len(), + transactions.len() - (last_pruned_tx_number + 1) + ); + assert_eq!( + tx.inner().get_prune_checkpoint(Transactions::SEGMENT).unwrap(), + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) + ); + }; + + test_prune(6, (false, 10)); + test_prune(6, (true, 2)); + } +}