From 9ca0cdcf1e8b67919fa58c8c955d0de000bd4b9d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 9 Oct 2023 15:25:48 +0100 Subject: [PATCH] feat(prune): headers segment triggered by snapshots (#4936) --- crates/primitives/src/prune/segment.rs | 6 +- crates/prune/src/pruner.rs | 34 +++- crates/prune/src/segments/headers.rs | 192 ++++++++++++++++++ crates/prune/src/segments/mod.rs | 44 +++- crates/prune/src/segments/receipts.rs | 30 ++- crates/stages/src/test_utils/test_db.rs | 2 +- .../src/providers/database/provider.rs | 39 ++-- 7 files changed, 307 insertions(+), 40 deletions(-) create mode 100644 crates/prune/src/segments/headers.rs diff --git a/crates/primitives/src/prune/segment.rs b/crates/primitives/src/prune/segment.rs index 834d2efce..f6f4d02fd 100644 --- a/crates/primitives/src/prune/segment.rs +++ b/crates/primitives/src/prune/segment.rs @@ -10,14 +10,16 @@ pub enum PruneSegment { SenderRecovery, /// Prune segment responsible for the `TxHashNumber` table. TransactionLookup, - /// Prune segment responsible for all `Receipts`. + /// Prune segment responsible for all rows in `Receipts` table. Receipts, - /// Prune segment responsible for some `Receipts` filtered by logs. + /// Prune segment responsible for some rows in `Receipts` table filtered by logs. ContractLogs, /// Prune segment responsible for the `AccountChangeSet` and `AccountHistory` tables. AccountHistory, /// Prune segment responsible for the `StorageChangeSet` and `StorageHistory` tables. StorageHistory, + /// Prune segment responsible for the `CanonicalHeaders`, `Headers` and `HeaderTD` tables. + Headers, } /// PruneSegment error type. diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index b12cff48f..19146c5d0 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -103,7 +103,7 @@ impl Pruner { let mut segments = BTreeMap::new(); // TODO(alexey): prune snapshotted segments of data (headers, transactions) - // let highest_snapshots = *self.highest_snapshots_tracker.borrow(); + let highest_snapshots = *self.highest_snapshots_tracker.borrow(); // Multiply `delete_limit` (number of row to delete per block) by number of blocks since // last pruner run. `previous_tip_block_number` is close to `tip_block_number`, usually @@ -282,6 +282,38 @@ impl Pruner { ); } + if let Some(snapshots) = highest_snapshots { + if let (Some(to_block), true) = (snapshots.headers, delete_limit > 0) { + let prune_mode = PruneMode::Before(to_block + 1); + trace!( + target: "pruner", + prune_segment = ?PruneSegment::Headers, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + + let segment_start = Instant::now(); + let segment = segments::Headers::default(); + let output = segment.prune(&provider, PruneInput { to_block, delete_limit })?; + if let Some(checkpoint) = output.checkpoint { + segment + .save_checkpoint(&provider, checkpoint.as_prune_checkpoint(prune_mode))?; + } + self.metrics + .get_prune_segment_metrics(PruneSegment::Headers) + .duration_seconds + .record(segment_start.elapsed()); + + done = done && output.done; + delete_limit = delete_limit.saturating_sub(output.pruned); + segments.insert( + PruneSegment::Headers, + (PruneProgress::from_done(output.done), output.pruned), + ); + } + } + provider.commit()?; self.previous_tip_block_number = Some(tip_block_number); diff --git a/crates/prune/src/segments/headers.rs b/crates/prune/src/segments/headers.rs new file mode 100644 index 000000000..91ae3fa81 --- /dev/null +++ b/crates/prune/src/segments/headers.rs @@ -0,0 +1,192 @@ +use crate::{ + segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, + PrunerError, +}; +use itertools::Itertools; +use reth_db::{database::Database, table::Table, tables}; +use reth_interfaces::RethResult; +use reth_primitives::{BlockNumber, PruneSegment}; +use reth_provider::DatabaseProviderRW; +use std::ops::RangeInclusive; +use tracing::{instrument, trace}; + +#[derive(Default)] +#[non_exhaustive] +pub(crate) struct Headers; + +impl Segment for Headers { + const SEGMENT: PruneSegment = PruneSegment::Headers; + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( + &self, + provider: &DatabaseProviderRW<'_, DB>, + input: PruneInput, + ) -> Result { + let block_range = match input.get_next_block_range(provider, Self::SEGMENT)? { + Some(range) => range, + None => { + trace!(target: "pruner", "No headers to prune"); + return Ok(PruneOutput::done()) + } + }; + + let delete_limit = input.delete_limit / 3; + if delete_limit == 0 { + // Nothing to do, `input.delete_limit` is less than 3, so we can't prune all + // headers-related tables up to the same height + return Ok(PruneOutput::not_done()) + } + + let results = [ + self.prune_table::( + provider, + block_range.clone(), + delete_limit, + )?, + self.prune_table::(provider, block_range.clone(), delete_limit)?, + self.prune_table::(provider, block_range, delete_limit)?, + ]; + + if !results.iter().map(|(_, _, last_pruned_block)| last_pruned_block).all_equal() { + return Err(PrunerError::InconsistentData( + "All headers-related tables should be pruned up to the same height", + )) + } + + let (done, pruned, last_pruned_block) = results.into_iter().fold( + (true, 0, 0), + |(total_done, total_pruned, _), (done, pruned, last_pruned_block)| { + (total_done && done, total_pruned + pruned, last_pruned_block) + }, + ); + + Ok(PruneOutput { + done, + pruned, + checkpoint: Some(PruneOutputCheckpoint { + block_number: Some(last_pruned_block), + tx_number: None, + }), + }) + } +} + +impl Headers { + /// Prune one headers-related table. + /// + /// Returns `done`, number of pruned rows and last pruned block number. + fn prune_table>( + &self, + provider: &DatabaseProviderRW<'_, DB>, + range: RangeInclusive, + delete_limit: usize, + ) -> RethResult<(bool, usize, BlockNumber)> { + let mut last_pruned_block = *range.end(); + let (pruned, done) = provider.prune_table_with_range::( + range, + delete_limit, + |_| false, + |row| last_pruned_block = row.0, + )?; + trace!(target: "pruner", %pruned, %done, table = %T::NAME, "Pruned headers"); + + Ok((done, pruned, last_pruned_block)) + } +} + +#[cfg(test)] +mod tests { + use crate::segments::{Headers, PruneInput, PruneOutput, Segment}; + use assert_matches::assert_matches; + use reth_db::tables; + use reth_interfaces::test_utils::{generators, generators::random_header_range}; + use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, B256}; + use reth_provider::PruneCheckpointReader; + use reth_stages::test_utils::TestTransaction; + + #[test] + fn prune() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let headers = random_header_range(&mut rng, 0..100, B256::ZERO); + tx.insert_headers_with_td(headers.iter()).expect("insert headers"); + + assert_eq!(tx.table::().unwrap().len(), headers.len()); + assert_eq!(tx.table::().unwrap().len(), headers.len()); + assert_eq!(tx.table::().unwrap().len(), headers.len()); + + let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let prune_mode = PruneMode::Before(to_block); + let input = PruneInput { to_block, delete_limit: 10 }; + let segment = Headers::default(); + + let next_block_number_to_prune = tx + .inner() + .get_prune_checkpoint(Headers::SEGMENT) + .unwrap() + .and_then(|checkpoint| checkpoint.block_number) + .map(|block_number| block_number + 1) + .unwrap_or_default(); + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_matches!( + result, + PruneOutput {done, pruned, checkpoint: Some(_)} + if (done, pruned) == expected_result + ); + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let last_pruned_block_number = to_block + .min(next_block_number_to_prune + input.delete_limit as BlockNumber / 3 - 1); + + assert_eq!( + tx.table::().unwrap().len(), + headers.len() - (last_pruned_block_number + 1) as usize + ); + assert_eq!( + tx.table::().unwrap().len(), + headers.len() - (last_pruned_block_number + 1) as usize + ); + assert_eq!( + tx.table::().unwrap().len(), + headers.len() - (last_pruned_block_number + 1) as usize + ); + assert_eq!( + tx.inner().get_prune_checkpoint(Headers::SEGMENT).unwrap(), + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) + ); + }; + + test_prune(3, (false, 9)); + test_prune(3, (true, 3)); + } + + #[test] + fn prune_cannot_be_done() { + let tx = TestTransaction::default(); + + let input = PruneInput { + to_block: 1, + // Less than total number of tables for `Headers` segment + delete_limit: 2, + }; + let segment = Headers::default(); + + let provider = tx.inner_rw(); + let result = segment.prune(&provider, input).unwrap(); + assert_eq!(result, PruneOutput::not_done()); + } +} diff --git a/crates/prune/src/segments/mod.rs b/crates/prune/src/segments/mod.rs index 56e0a1254..04f45c6e0 100644 --- a/crates/prune/src/segments/mod.rs +++ b/crates/prune/src/segments/mod.rs @@ -1,4 +1,7 @@ +mod headers; mod receipts; + +pub(crate) use headers::Headers; pub(crate) use receipts::Receipts; use crate::PrunerError; @@ -88,10 +91,39 @@ impl PruneInput { Ok(Some(range)) } + + /// Get next inclusive block range to prune according to the checkpoint, `to_block` block + /// number and `limit`. + /// + /// To get the range start (`from_block`): + /// 1. If checkpoint exists, use next block. + /// 2. If checkpoint doesn't exist, use block 0. + /// + /// To get the range end: use block `to_block`. + pub(crate) fn get_next_block_range( + &self, + provider: &DatabaseProviderRW<'_, DB>, + segment: PruneSegment, + ) -> RethResult>> { + let from_block = provider + .get_prune_checkpoint(segment)? + .and_then(|checkpoint| checkpoint.block_number) + // Checkpoint exists, prune from the next block after the highest pruned one + .map(|block_number| block_number + 1) + // No checkpoint exists, prune from genesis + .unwrap_or(0); + + let range = from_block..=self.to_block; + if range.is_empty() { + return Ok(None) + } + + Ok(Some(range)) + } } /// Segment pruning output, see [Segment::prune]. -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) struct PruneOutput { /// `true` if pruning has been completed up to the target block, and `false` if there's more /// data to prune in further runs. @@ -105,12 +137,18 @@ pub(crate) struct PruneOutput { impl PruneOutput { /// Returns a [PruneOutput] with `done = true`, `pruned = 0` and `checkpoint = None`. /// Use when no pruning is needed. - pub(crate) fn done() -> Self { + pub(crate) const fn done() -> Self { Self { done: true, pruned: 0, checkpoint: None } } + + /// Returns a [PruneOutput] with `done = false`, `pruned = 0` and `checkpoint = None`. + /// Use when pruning is needed but cannot be done. + pub(crate) const fn not_done() -> Self { + Self { done: false, pruned: 0, checkpoint: None } + } } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) struct PruneOutputCheckpoint { /// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet. pub(crate) block_number: Option, diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index 721a57920..ee63a08c6 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -136,20 +136,6 @@ mod tests { .min(next_tx_number_to_prune as usize + input.delete_limit) .sub(1); - let last_pruned_block_number = blocks - .iter() - .fold_while((0, 0), |(_, mut tx_count), block| { - tx_count += block.body.len(); - - if tx_count > last_pruned_tx_number { - Done((block.number, tx_count)) - } else { - Continue((block.number, tx_count)) - } - }) - .into_inner() - .0; - let provider = tx.inner_rw(); let result = segment.prune(&provider, input).unwrap(); assert_matches!( @@ -165,8 +151,20 @@ mod tests { .unwrap(); provider.commit().expect("commit"); - let last_pruned_block_number = - last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 }); + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0 + .checked_sub(if result.done { 0 } else { 1 }); assert_eq!( tx.table::().unwrap().len(), diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index b0d14e648..6c5cc623b 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -219,7 +219,7 @@ impl TestTransaction { /// Inserts total difficulty of headers into the corresponding tables. /// /// Superset functionality of [TestTransaction::insert_headers]. - pub(crate) fn insert_headers_with_td<'a, I>(&self, headers: I) -> Result<(), DbError> + pub fn insert_headers_with_td<'a, I>(&self, headers: I) -> Result<(), DbError> where I: Iterator, { diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 0204dac34..7beae9b5d 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -684,16 +684,19 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let mut deleted = 0; let mut keys = keys.into_iter(); - for key in &mut keys { - if deleted == limit { - break - } - let row = cursor.seek_exact(key.clone())?; - if let Some(row) = row { - cursor.delete_current()?; - deleted += 1; - delete_callback(row); + if limit != 0 { + for key in &mut keys { + let row = cursor.seek_exact(key.clone())?; + if let Some(row) = row { + cursor.delete_current()?; + deleted += 1; + delete_callback(row); + } + + if deleted == limit { + break + } } } @@ -714,15 +717,17 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let mut walker = cursor.walk_range(keys)?; let mut deleted = 0; - while let Some(row) = walker.next().transpose()? { - if deleted == limit { - break - } + if limit != 0 { + while let Some(row) = walker.next().transpose()? { + if !skip_filter(&row) { + walker.delete_current()?; + deleted += 1; + delete_callback(row); + } - if !skip_filter(&row) { - walker.delete_current()?; - deleted += 1; - delete_callback(row); + if deleted == limit { + break + } } }