diff --git a/Cargo.lock b/Cargo.lock index 9a7f4d4bc..7d74ed3ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6759,6 +6759,7 @@ name = "reth-prune" version = "0.2.0-beta.4" dependencies = [ "assert_matches", + "derive_more", "itertools 0.12.1", "metrics", "rayon", @@ -6770,6 +6771,7 @@ dependencies = [ "reth-provider", "reth-stages", "reth-tokio-util", + "reth-tracing", "thiserror", "tokio-stream", "tracing", diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 55b37f812..7aeb8d746 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -434,6 +434,7 @@ where 5, self.base_config.chain_spec.prune_delete_limit, config.max_reorg_depth() as usize, + None, ); let mut hooks = EngineHooks::new(); diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 04851f42b..913001f6b 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -1515,7 +1515,7 @@ impl PeerMetadata { fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc) -> Self { Self { seen_transactions: LruCache::new( - NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).expect("infallible"), + NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).unwrap(), ), request_tx, version, diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index 72372d15e..ee194386d 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -677,6 +677,7 @@ where let mut pruner = PrunerBuilder::new(prune_config.clone()) .max_reorg_depth(tree_config.max_reorg_depth() as usize) .prune_delete_limit(config.chain.prune_delete_limit) + .timeout(PrunerBuilder::DEFAULT_TIMEOUT) .build(provider_factory.clone()); let pruner_events = pruner.events(); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 8e548a233..8210e1a8c 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -77,8 +77,9 @@ pub use net::{ }; pub use peer::{id2pk, pk2id, AnyNode, PeerId, WithPeerId}; pub use prune::{ - PruneCheckpoint, PruneMode, PruneModes, PruneProgress, PrunePurpose, PruneSegment, - PruneSegmentError, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE, + PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneModes, PruneProgress, + PrunePurpose, PruneSegment, PruneSegmentError, ReceiptsLogPruneConfig, + MINIMUM_PRUNING_DISTANCE, }; pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts}; pub use static_file::StaticFileSegment; diff --git a/crates/primitives/src/peer.rs b/crates/primitives/src/peer.rs index 0f6a814c3..f66361f39 100644 --- a/crates/primitives/src/peer.rs +++ b/crates/primitives/src/peer.rs @@ -93,13 +93,13 @@ impl FromStr for AnyNode { fn from_str(s: &str) -> Result { if let Some(rem) = s.strip_prefix("enode://") { if let Ok(record) = NodeRecord::from_str(s) { - return Ok(AnyNode::NodeRecord(record)); + return Ok(AnyNode::NodeRecord(record)) } // incomplete enode if let Ok(peer_id) = PeerId::from_str(rem) { - return Ok(AnyNode::PeerId(peer_id)); + return Ok(AnyNode::PeerId(peer_id)) } - return Err(format!("invalid public key: {rem}")); + return Err(format!("invalid public key: {rem}")) } if s.starts_with("enr:") { return Enr::from_str(s).map(AnyNode::Enr) diff --git a/crates/primitives/src/prune/limiter.rs b/crates/primitives/src/prune/limiter.rs new file mode 100644 index 000000000..94adc1563 --- /dev/null +++ b/crates/primitives/src/prune/limiter.rs @@ -0,0 +1,122 @@ +use std::{ + num::NonZeroUsize, + time::{Duration, Instant}, +}; + +/// Limits a pruner run by either the number of entries (rows in the database) that can be deleted +/// or the time it can run. +#[derive(Debug, Clone, Default)] +pub struct PruneLimiter { + /// Maximum entries (rows in the database) to delete from the database per block. + deleted_entries_limit: Option, + /// Maximum duration of one prune run. + time_limit: Option, +} + +#[derive(Debug, Clone)] +struct PruneDeletedEntriesLimit { + /// Maximum entries (rows in the database) to delete from the database. + limit: usize, + /// Current number of entries (rows in the database) that have been deleted. + deleted: usize, +} + +impl PruneDeletedEntriesLimit { + fn new(limit: usize) -> Self { + Self { limit, deleted: 0 } + } + + fn is_limit_reached(&self) -> bool { + self.deleted >= self.limit + } +} + +#[derive(Debug, Clone)] +struct PruneTimeLimit { + /// Maximum duration of one prune run. + limit: Duration, + /// Time when the prune run has started. + start: Instant, +} + +impl PruneTimeLimit { + fn new(limit: Duration) -> Self { + Self { limit, start: Instant::now() } + } + + fn is_limit_reached(&self) -> bool { + self.start.elapsed() > self.limit + } +} + +impl PruneLimiter { + /// Sets the limit on the number of deleted entries (rows in the database). + /// If the limit was already set, it will be overwritten. + pub fn set_deleted_entries_limit(mut self, limit: usize) -> Self { + if let Some(deleted_entries_limit) = self.deleted_entries_limit.as_mut() { + deleted_entries_limit.limit = limit; + } else { + self.deleted_entries_limit = Some(PruneDeletedEntriesLimit::new(limit)); + } + + self + } + + /// Sets the limit on the number of deleted entries (rows in the database) to a biggest + /// multiple of the given denominator that is smaller than the existing limit. + /// + /// If the limit wasn't set, does nothing. + pub fn floor_deleted_entries_limit_to_multiple_of(mut self, denominator: NonZeroUsize) -> Self { + if let Some(deleted_entries_limit) = self.deleted_entries_limit.as_mut() { + deleted_entries_limit.limit = + (deleted_entries_limit.limit / denominator) * denominator.get(); + } + + self + } + + /// Returns `true` if the limit on the number of deleted entries (rows in the database) is + /// reached. + pub fn is_deleted_entries_limit_reached(&self) -> bool { + self.deleted_entries_limit.as_ref().map_or(false, |limit| limit.is_limit_reached()) + } + + /// Increments the number of deleted entries by the given number. + pub fn increment_deleted_entries_count_by(&mut self, entries: usize) { + if let Some(limit) = self.deleted_entries_limit.as_mut() { + limit.deleted += entries; + } + } + + /// Increments the number of deleted entries by one. + pub fn increment_deleted_entries_count(&mut self) { + self.increment_deleted_entries_count_by(1) + } + + /// Returns the number of deleted entries left before the limit is reached. + pub fn deleted_entries_limit_left(&self) -> Option { + self.deleted_entries_limit.as_ref().map(|limit| limit.limit - limit.deleted) + } + + /// Returns the limit on the number of deleted entries (rows in the database). + pub fn deleted_entries_limit(&self) -> Option { + self.deleted_entries_limit.as_ref().map(|limit| limit.limit) + } + + /// Sets the time limit. + pub fn set_time_limit(mut self, limit: Duration) -> Self { + self.time_limit = Some(PruneTimeLimit::new(limit)); + + self + } + + /// Returns `true` if time limit is reached. + pub fn is_time_limit_reached(&self) -> bool { + self.time_limit.as_ref().map_or(false, |limit| limit.is_limit_reached()) + } + + /// Returns `true` if any limit is reached. + pub fn is_limit_reached(&self) -> bool { + self.is_deleted_entries_limit_reached() || self.is_time_limit_reached() + } +} diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index b11aef432..07da6132f 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -1,10 +1,12 @@ mod checkpoint; +mod limiter; mod mode; mod segment; mod target; use crate::{Address, BlockNumber}; pub use checkpoint::PruneCheckpoint; +pub use limiter::PruneLimiter; pub use mode::PruneMode; pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError}; use serde::{Deserialize, Serialize}; @@ -91,21 +93,61 @@ impl ReceiptsLogPruneConfig { #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub enum PruneProgress { /// There is more data to prune. - HasMoreData, + HasMoreData(PruneInterruptReason), /// Pruning has been finished. Finished, } +/// Reason for interrupting a prune run. +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum PruneInterruptReason { + /// Prune run timed out. + Timeout, + /// Limit on the number of deleted entries (rows in the database) per prune run was reached. + DeletedEntriesLimitReached, + /// Unknown reason for stopping prune run. + Unknown, +} + +impl PruneInterruptReason { + /// Creates new [PruneInterruptReason] based on the [PruneLimiter]. + pub fn new(limiter: &PruneLimiter) -> Self { + if limiter.is_time_limit_reached() { + Self::Timeout + } else if limiter.is_deleted_entries_limit_reached() { + Self::DeletedEntriesLimitReached + } else { + Self::Unknown + } + } + + /// Returns `true` if the reason is timeout. + pub const fn is_timeout(&self) -> bool { + matches!(self, Self::Timeout) + } + + /// Returns `true` if the reason is reaching the limit on deleted entries. + pub const fn is_entries_limit_reached(&self) -> bool { + matches!(self, Self::DeletedEntriesLimitReached) + } +} + impl PruneProgress { - /// Creates new [PruneProgress] from `done` boolean value. + /// Creates new [PruneProgress]. /// - /// If `done == true`, returns [PruneProgress::Finished], otherwise [PruneProgress::HasMoreData] - /// is returned. - pub fn from_done(done: bool) -> Self { + /// If `done == true`, returns [PruneProgress::Finished], otherwise + /// [PruneProgress::HasMoreData] is returned with [PruneInterruptReason] according to the passed + /// limiter. + pub fn new(done: bool, limiter: &PruneLimiter) -> Self { if done { Self::Finished } else { - Self::HasMoreData + Self::HasMoreData(PruneInterruptReason::new(limiter)) } } + + /// Returns `true` if prune run is finished. + pub const fn is_finished(&self) -> bool { + matches!(self, Self::Finished) + } } diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml index d59da8096..3a8971a66 100644 --- a/crates/prune/Cargo.toml +++ b/crates/prune/Cargo.toml @@ -35,7 +35,9 @@ tokio-stream.workspace = true # reth reth-db = { workspace = true, features = ["test-utils"] } reth-stages = { workspace = true, features = ["test-utils"] } +reth-tracing.workspace = true # misc +derive_more.workspace = true assert_matches.workspace = true diff --git a/crates/prune/src/builder.rs b/crates/prune/src/builder.rs index 5836688bf..377a98664 100644 --- a/crates/prune/src/builder.rs +++ b/crates/prune/src/builder.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::{segments::SegmentSet, Pruner}; use reth_config::PruneConfig; use reth_db::database::Database; @@ -17,9 +19,14 @@ pub struct PrunerBuilder { /// the amount of blocks between pruner runs to account for the difference in amount of new /// data coming in. pub prune_delete_limit: usize, + /// Time a pruner job can run before timing out. + pub timeout: Option, } impl PrunerBuilder { + /// Default timeout for a prune run. + pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(100); + /// Creates a new [PrunerBuilder] from the given [PruneConfig]. pub fn new(pruner_config: PruneConfig) -> Self { PrunerBuilder::default() @@ -51,6 +58,15 @@ impl PrunerBuilder { self } + /// Sets the timeout for pruner, per run. + /// + /// CAUTION: Account and Storage History prune segments treat this timeout as a soft limit, + /// meaning they can go beyond it. + pub fn timeout(mut self, timeout: Duration) -> Self { + self.timeout = Some(timeout); + self + } + /// Builds a [Pruner] from the current configuration. pub fn build(self, provider_factory: ProviderFactory) -> Pruner { let segments = SegmentSet::::from_prune_modes(self.segments); @@ -61,6 +77,7 @@ impl PrunerBuilder { self.block_interval, self.prune_delete_limit, self.max_reorg_depth, + self.timeout, ) } } @@ -72,6 +89,7 @@ impl Default for PrunerBuilder { segments: PruneModes::none(), max_reorg_depth: 64, prune_delete_limit: MAINNET.prune_delete_limit, + timeout: Some(Self::DEFAULT_TIMEOUT), } } } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index b5e0cc1de..750284ad5 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -7,11 +7,15 @@ use crate::{ }; use reth_db::database::Database; use reth_primitives::{ - BlockNumber, PruneMode, PruneProgress, PrunePurpose, PruneSegment, StaticFileSegment, + BlockNumber, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, + StaticFileSegment, }; use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader}; use reth_tokio_util::EventListeners; -use std::{collections::BTreeMap, time::Instant}; +use std::{ + collections::BTreeMap, + time::{Duration, Instant}, +}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::debug; @@ -36,10 +40,13 @@ pub struct Pruner { /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated. previous_tip_block_number: Option, /// Maximum total entries to prune (delete from database) per block. - delete_limit: usize, + delete_limit_per_block: usize, /// Maximum number of blocks to be pruned per run, as an additional restriction to /// `previous_tip_block_number`. prune_max_blocks_per_run: usize, + /// Maximum time for a one pruner run. + timeout: Option, + #[doc(hidden)] metrics: Metrics, listeners: EventListeners, } @@ -52,14 +59,16 @@ impl Pruner { min_block_interval: usize, delete_limit: usize, prune_max_blocks_per_run: usize, + timeout: Option, ) -> Self { Self { provider_factory, segments, min_block_interval, previous_tip_block_number: None, - delete_limit, + delete_limit_per_block: delete_limit, prune_max_blocks_per_run, + timeout, metrics: Metrics::default(), listeners: Default::default(), } @@ -100,11 +109,16 @@ impl Pruner { tip_block_number.saturating_sub(previous_tip_block_number) as usize }) .min(self.prune_max_blocks_per_run); - let delete_limit = self.delete_limit * blocks_since_last_run; + + let mut limiter = PruneLimiter::default() + .set_deleted_entries_limit(self.delete_limit_per_block * blocks_since_last_run); + if let Some(timeout) = self.timeout { + limiter = limiter.set_time_limit(timeout); + }; let provider = self.provider_factory.provider_rw()?; - let (stats, delete_limit, progress) = - self.prune_segments(&provider, tip_block_number, delete_limit)?; + let (stats, deleted_entries, progress) = + self.prune_segments(&provider, tip_block_number, &mut limiter)?; provider.commit()?; self.previous_tip_block_number = Some(tip_block_number); @@ -112,14 +126,20 @@ impl Pruner { let elapsed = start.elapsed(); self.metrics.duration_seconds.record(elapsed); + let message = match progress { + PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune", + PruneProgress::Finished => "Pruner finished", + }; + debug!( target: "pruner", %tip_block_number, ?elapsed, - %delete_limit, + ?deleted_entries, + ?limiter, ?progress, ?stats, - "Pruner finished" + "{message}", ); self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats }); @@ -128,15 +148,15 @@ impl Pruner { } /// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to - /// be pruned according to the highest static_files. + /// be pruned according to the highest static_files. Segments are parts of the database that + /// represent one or more tables. /// - /// Returns [PrunerStats], `delete_limit` that remained after pruning all segments, and - /// [PruneProgress]. + /// Returns [PrunerStats], total number of entries pruned, and [PruneProgress]. fn prune_segments( &mut self, provider: &DatabaseProviderRW, tip_block_number: BlockNumber, - mut delete_limit: usize, + limiter: &mut PruneLimiter, ) -> Result<(PrunerStats, usize, PruneProgress), PrunerError> { let static_file_segments = self.static_file_segments(); let segments = static_file_segments @@ -144,11 +164,12 @@ impl Pruner { .map(|segment| (segment, PrunePurpose::StaticFile)) .chain(self.segments.iter().map(|segment| (segment, PrunePurpose::User))); - let mut done = true; let mut stats = PrunerStats::new(); + let mut pruned = 0; + let mut progress = PruneProgress::Finished; for (segment, purpose) in segments { - if delete_limit == 0 { + if limiter.is_limit_reached() { break } @@ -169,8 +190,10 @@ impl Pruner { let segment_start = Instant::now(); let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?; - let output = segment - .prune(provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; + let output = segment.prune( + provider, + PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() }, + )?; if let Some(checkpoint) = output.checkpoint { segment .save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?; @@ -185,8 +208,7 @@ impl Pruner { .highest_pruned_block .set(to_block as f64); - done = done && output.done; - delete_limit = delete_limit.saturating_sub(output.pruned); + progress = output.progress; debug!( target: "pruner", @@ -199,17 +221,16 @@ impl Pruner { ); if output.pruned > 0 { - stats.insert( - segment.segment(), - (PruneProgress::from_done(output.done), output.pruned), - ); + limiter.increment_deleted_entries_count_by(output.pruned); + pruned += output.pruned; + stats.insert(segment.segment(), (output.progress, output.pruned)); } } else { debug!(target: "pruner", segment = ?segment.segment(), ?purpose, "Nothing to prune for the segment"); } } - Ok((stats, delete_limit, PruneProgress::from_done(done))) + Ok((stats, pruned, progress)) } /// Returns pre-configured segments that needs to be pruned according to the highest @@ -266,6 +287,7 @@ impl Pruner { #[cfg(test)] mod tests { + use crate::Pruner; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; use reth_primitives::MAINNET; @@ -277,7 +299,7 @@ mod tests { let (_static_dir, static_dir_path) = create_test_static_files_dir(); let provider_factory = ProviderFactory::new(db, MAINNET.clone(), static_dir_path) .expect("create provide factory with static_files"); - let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5); + let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5, None); // No last pruned block number was set before let first_block_number = 1; diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index a18897640..c0d929298 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -5,10 +5,16 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, models::ShardedKey, tables}; -use reth_primitives::{PruneMode, PruneSegment}; +use reth_primitives::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment}; use reth_provider::DatabaseProviderRW; use tracing::{instrument, trace}; +/// Number of account history tables to prune in one step. +/// +/// Account History consists of two tables: [tables::AccountChangeSets] and +/// [tables::AccountsHistory]. We want to prune them to the same block number. +const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2; + #[derive(Debug)] pub struct AccountHistory { mode: PruneMode, @@ -44,11 +50,23 @@ impl Segment for AccountHistory { }; let range_end = *range.end(); + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + if limiter.is_limit_reached() { + return Ok(PruneOutput::not_done( + PruneInterruptReason::new(&limiter), + input.previous_checkpoint.map(|checkpoint| checkpoint.into()), + )) + } + let mut last_changeset_pruned_block = None; let (pruned_changesets, done) = provider .prune_table_with_range::( range, - input.delete_limit / 2, + &mut limiter, |_| false, |row| last_changeset_pruned_block = Some(row.0), )?; @@ -66,10 +84,12 @@ impl Segment for AccountHistory { |a, b| a.key == b.key, |key| ShardedKey::last(key.key), )?; - trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)" ); + trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)"); + + let progress = PruneProgress::new(done, &limiter); Ok(PruneOutput { - done, + progress, pruned: pruned_changesets + pruned_indices, checkpoint: Some(PruneOutputCheckpoint { block_number: Some(last_changeset_pruned_block), @@ -81,14 +101,20 @@ impl Segment for AccountHistory { #[cfg(test)] mod tests { - use crate::segments::{AccountHistory, PruneInput, PruneOutput, Segment}; + use crate::segments::{ + account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput, PruneOutput, + Segment, + }; use assert_matches::assert_matches; use reth_db::{tables, BlockNumberList}; use reth_interfaces::test_utils::{ generators, generators::{random_block_range, random_changeset_range, random_eoa_accounts}, }; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::{collections::BTreeMap, ops::AddAssign}; @@ -129,114 +155,126 @@ mod tests { let original_shards = db.table::().unwrap(); - let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| { - let prune_mode = PruneMode::Before(to_block); - let input = PruneInput { - previous_checkpoint: db - .factory - .provider() - .unwrap() - .get_prune_checkpoint(PruneSegment::AccountHistory) - .unwrap(), - to_block, - delete_limit: 2000, - }; - let segment = AccountHistory::new(prune_mode); + let test_prune = + |to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| { + let prune_mode = PruneMode::Before(to_block); + let deleted_entries_limit = 2000; + let mut limiter = + PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit); + let input = PruneInput { + previous_checkpoint: db + .factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::AccountHistory) + .unwrap(), + to_block, + limiter: limiter.clone(), + }; + let segment = AccountHistory::new(prune_mode); - let provider = db.factory.provider_rw().unwrap(); - let result = segment.prune(&provider, input).unwrap(); - assert_matches!( - result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result - ); - segment - .save_checkpoint( - &provider, - result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), - ) - .unwrap(); - provider.commit().expect("commit"); + let provider = db.factory.provider_rw().unwrap(); + let result = segment.prune(&provider, input).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); - let changesets = changesets - .iter() - .enumerate() - .flat_map(|(block_number, changeset)| { - changeset.iter().map(move |change| (block_number, change)) - }) - .collect::>(); + assert_matches!( + result, + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result + ); - #[allow(clippy::skip_while_next)] - let pruned = changesets - .iter() - .enumerate() - .skip_while(|(i, (block_number, _))| { - *i < input.delete_limit / 2 * run && *block_number <= to_block as usize - }) + segment + .save_checkpoint( + &provider, + result.checkpoint.unwrap().as_prune_checkpoint(prune_mode), + ) + .unwrap(); + provider.commit().expect("commit"); + + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.iter().map(move |change| (block_number, change)) + }) + .collect::>(); + + #[allow(clippy::skip_while_next)] + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _))| { + *i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run && + *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) + .unwrap_or_default(); + + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block + // number further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets .next() - .map(|(i, _)| i) - .unwrap_or_default(); - - let mut pruned_changesets = changesets - .iter() - // Skip what we've pruned so far, subtracting one to get last pruned block number - // further down - .skip(pruned.saturating_sub(1)); - - let last_pruned_block_number = pruned_changesets - .next() - .map(|(block_number, _)| if result.done { + .map(|(block_number, _)| if result.progress.is_finished() { *block_number } else { block_number.saturating_sub(1) } as BlockNumber) .unwrap_or(to_block); - let pruned_changesets = pruned_changesets.fold( - BTreeMap::<_, Vec<_>>::new(), - |mut acc, (block_number, change)| { - acc.entry(block_number).or_default().push(change); - acc - }, - ); + let pruned_changesets = pruned_changesets.fold( + BTreeMap::<_, Vec<_>>::new(), + |mut acc, (block_number, change)| { + acc.entry(block_number).or_default().push(change); + acc + }, + ); - assert_eq!( - db.table::().unwrap().len(), - pruned_changesets.values().flatten().count() - ); + assert_eq!( + db.table::().unwrap().len(), + pruned_changesets.values().flatten().count() + ); - let actual_shards = db.table::().unwrap(); + let actual_shards = db.table::().unwrap(); - let expected_shards = original_shards - .iter() - .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) - .map(|(key, blocks)| { - let new_blocks = blocks - .iter() - .skip_while(|block| *block <= last_pruned_block_number) - .collect::>(); - (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) - }) - .collect::>(); + let expected_shards = original_shards + .iter() + .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) + .map(|(key, blocks)| { + let new_blocks = blocks + .iter() + .skip_while(|block| *block <= last_pruned_block_number) + .collect::>(); + (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) + }) + .collect::>(); - assert_eq!(actual_shards, expected_shards); + assert_eq!(actual_shards, expected_shards); - assert_eq!( - db.factory - .provider() - .unwrap() - .get_prune_checkpoint(PruneSegment::AccountHistory) - .unwrap(), - Some(PruneCheckpoint { - block_number: Some(last_pruned_block_number), - tx_number: None, - prune_mode - }) - ); - }; + assert_eq!( + db.factory + .provider() + .unwrap() + .get_prune_checkpoint(PruneSegment::AccountHistory) + .unwrap(), + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) + ); + }; - test_prune(998, 1, (false, 1000)); - test_prune(998, 2, (true, 998)); - test_prune(1400, 3, (true, 804)); + test_prune( + 998, + 1, + (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000), + ); + test_prune(998, 2, (PruneProgress::Finished, 998)); + test_prune(1400, 3, (PruneProgress::Finished, 804)); } } diff --git a/crates/prune/src/segments/headers.rs b/crates/prune/src/segments/headers.rs index 2da191375..64263c881 100644 --- a/crates/prune/src/segments/headers.rs +++ b/crates/prune/src/segments/headers.rs @@ -1,15 +1,24 @@ +use std::num::NonZeroUsize; + use crate::{ segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment}, PrunerError, }; use itertools::Itertools; -use reth_db::{database::Database, table::Table, tables}; -use reth_interfaces::RethResult; -use reth_primitives::{BlockNumber, PruneMode, PruneSegment}; +use reth_db::{ + cursor::{DbCursorRO, RangeWalker}, + database::Database, + tables, + transaction::DbTxMut, +}; + +use reth_primitives::{BlockNumber, PruneLimiter, PruneMode, PruneProgress, PruneSegment}; use reth_provider::DatabaseProviderRW; -use std::ops::RangeInclusive; use tracing::{instrument, trace}; +/// Number of header tables to prune in one step +const HEADER_TABLES_TO_PRUNE: usize = 3; + #[derive(Debug)] pub struct Headers { mode: PruneMode, @@ -36,90 +45,168 @@ impl Segment for Headers { provider: &DatabaseProviderRW, input: PruneInput, ) -> Result { - let block_range = match input.get_next_block_range() { - Some(range) => range, + let (block_range_start, block_range_end) = match input.get_next_block_range() { + Some(range) => (*range.start(), *range.end()), None => { trace!(target: "pruner", "No headers to prune"); return Ok(PruneOutput::done()) } }; - let delete_limit = input.delete_limit / 3; - if delete_limit == 0 { - // Nothing to do, `input.delete_limit` is less than 3, so we can't prune all - // headers-related tables up to the same height - return Ok(PruneOutput::not_done()) - } + let last_pruned_block = + if block_range_start == 0 { None } else { Some(block_range_start - 1) }; - let results = [ - self.prune_table::(provider, block_range.clone(), delete_limit)?, - self.prune_table::( - provider, - block_range.clone(), - delete_limit, - )?, - self.prune_table::(provider, block_range, delete_limit)?, - ]; + let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end; - if !results.iter().map(|(_, _, last_pruned_block)| last_pruned_block).all_equal() { - return Err(PrunerError::InconsistentData( - "All headers-related tables should be pruned up to the same height", - )) - } + let mut headers_cursor = provider.tx_ref().cursor_write::()?; + let mut header_tds_cursor = + provider.tx_ref().cursor_write::()?; + let mut canonical_headers_cursor = + provider.tx_ref().cursor_write::()?; - let (done, pruned, last_pruned_block) = results.into_iter().fold( - (true, 0, 0), - |(total_done, total_pruned, _), (done, pruned, last_pruned_block)| { - (total_done && done, total_pruned + pruned, last_pruned_block) - }, + let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of( + NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(), ); + let tables_iter = HeaderTablesIter::new( + provider, + &mut limiter, + headers_cursor.walk_range(range.clone())?, + header_tds_cursor.walk_range(range.clone())?, + canonical_headers_cursor.walk_range(range)?, + ); + + let mut last_pruned_block: Option = None; + let mut pruned = 0; + for res in tables_iter { + let HeaderTablesIterItem { pruned_block, entries_pruned } = res?; + last_pruned_block = Some(pruned_block); + pruned += entries_pruned; + } + + let done = last_pruned_block.map_or(false, |block| block == block_range_end); + let progress = PruneProgress::new(done, &limiter); + Ok(PruneOutput { - done, + progress, pruned, checkpoint: Some(PruneOutputCheckpoint { - block_number: Some(last_pruned_block), + block_number: last_pruned_block, tx_number: None, }), }) } } -impl Headers { - /// Prune one headers-related table. - /// - /// Returns `done`, number of pruned rows and last pruned block number. - fn prune_table>( - &self, - provider: &DatabaseProviderRW, - range: RangeInclusive, - delete_limit: usize, - ) -> RethResult<(bool, usize, BlockNumber)> { - let mut last_pruned_block = *range.end(); - let (pruned, done) = provider.prune_table_with_range::( - range, - delete_limit, - |_| false, - |row| last_pruned_block = row.0, - )?; - trace!(target: "pruner", %pruned, %done, table = %T::TABLE, "Pruned headers"); +type Walker<'a, DB, T> = RangeWalker<'a, T, <::TXMut as DbTxMut>::CursorMut>; - Ok((done, pruned, last_pruned_block)) +#[allow(missing_debug_implementations)] +struct HeaderTablesIter<'a, DB> +where + DB: Database, +{ + provider: &'a DatabaseProviderRW, + limiter: &'a mut PruneLimiter, + headers_walker: Walker<'a, DB, tables::Headers>, + header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>, + canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>, +} + +struct HeaderTablesIterItem { + pruned_block: BlockNumber, + entries_pruned: usize, +} + +impl<'a, DB> HeaderTablesIter<'a, DB> +where + DB: Database, +{ + fn new( + provider: &'a DatabaseProviderRW, + limiter: &'a mut PruneLimiter, + headers_walker: Walker<'a, DB, tables::Headers>, + header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>, + canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>, + ) -> Self { + Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker } + } +} + +impl<'a, DB> Iterator for HeaderTablesIter<'a, DB> +where + DB: Database, +{ + type Item = Result; + fn next(&mut self) -> Option { + if self.limiter.is_limit_reached() { + return None + } + + let mut pruned_block_headers = None; + let mut pruned_block_td = None; + let mut pruned_block_canonical = None; + + if let Err(err) = self.provider.prune_table_with_range_step( + &mut self.headers_walker, + self.limiter, + &mut |_| false, + &mut |row| pruned_block_headers = Some(row.0), + ) { + return Some(Err(err.into())) + } + + if let Err(err) = self.provider.prune_table_with_range_step( + &mut self.header_tds_walker, + self.limiter, + &mut |_| false, + &mut |row| pruned_block_td = Some(row.0), + ) { + return Some(Err(err.into())) + } + + if let Err(err) = self.provider.prune_table_with_range_step( + &mut self.canonical_headers_walker, + self.limiter, + &mut |_| false, + &mut |row| pruned_block_canonical = Some(row.0), + ) { + return Some(Err(err.into())) + } + + if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() { + return Some(Err(PrunerError::InconsistentData( + "All headers-related tables should be pruned up to the same height", + ))) + } + + pruned_block_headers.map(move |block| { + Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE }) + }) } } #[cfg(test)] mod tests { - use crate::segments::{Headers, PruneInput, PruneOutput, Segment}; use assert_matches::assert_matches; use reth_db::{tables, transaction::DbTx}; use reth_interfaces::test_utils::{generators, generators::random_header_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256, U256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, B256, U256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestStageDB; + use tracing::trace; + + use crate::segments::{ + headers::HEADER_TABLES_TO_PRUNE, Headers, PruneInput, PruneOutput, PruneOutputCheckpoint, + Segment, + }; #[test] fn prune() { + reth_tracing::init_test_tracing(); + let db = TestStageDB::default(); let mut rng = generators::rng(); @@ -134,8 +221,10 @@ mod tests { assert_eq!(db.table::().unwrap().len(), headers.len()); assert_eq!(db.table::().unwrap().len(), headers.len()); - let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let segment = Headers::new(prune_mode); + let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10); let input = PruneInput { previous_checkpoint: db .factory @@ -144,9 +233,8 @@ mod tests { .get_prune_checkpoint(PruneSegment::Headers) .unwrap(), to_block, - delete_limit: 10, + limiter: limiter.clone(), }; - let segment = Headers::new(prune_mode); let next_block_number_to_prune = db .factory @@ -159,11 +247,19 @@ mod tests { .unwrap_or_default(); let provider = db.factory.provider_rw().unwrap(); - let result = segment.prune(&provider, input).unwrap(); + let result = segment.prune(&provider, input.clone()).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + trace!(target: "pruner::test", + expected_prune_progress=?expected_result.0, + expected_pruned=?expected_result.1, + result=?result, + "PruneOutput" + ); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); segment .save_checkpoint( @@ -173,8 +269,11 @@ mod tests { .unwrap(); provider.commit().expect("commit"); - let last_pruned_block_number = to_block - .min(next_block_number_to_prune + input.delete_limit as BlockNumber / 3 - 1); + let last_pruned_block_number = to_block.min( + next_block_number_to_prune + + (input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1) + as u64, + ); assert_eq!( db.table::().unwrap().len(), @@ -198,24 +297,35 @@ mod tests { ); }; - test_prune(3, (false, 9)); - test_prune(3, (true, 3)); + test_prune( + 3, + (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9), + ); + test_prune(3, (PruneProgress::Finished, 3)); } #[test] fn prune_cannot_be_done() { let db = TestStageDB::default(); + let segment = Headers::new(PruneMode::Full); + let limiter = PruneLimiter::default().set_deleted_entries_limit(0); + let input = PruneInput { previous_checkpoint: None, to_block: 1, // Less than total number of tables for `Headers` segment - delete_limit: 2, + limiter, }; - let segment = Headers::new(PruneMode::Full); let provider = db.factory.provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); - assert_eq!(result, PruneOutput::not_done()); + assert_eq!( + result, + PruneOutput::not_done( + PruneInterruptReason::DeletedEntriesLimitReached, + Some(PruneOutputCheckpoint::default()) + ) + ); } } diff --git a/crates/prune/src/segments/mod.rs b/crates/prune/src/segments/mod.rs index 5e644e227..82b95bc07 100644 --- a/crates/prune/src/segments/mod.rs +++ b/crates/prune/src/segments/mod.rs @@ -1,6 +1,6 @@ mod account_history; mod headers; -mod history; +pub(super) mod history; mod receipts; mod receipts_by_logs; mod sender_recovery; @@ -23,7 +23,10 @@ pub use transactions::Transactions; use crate::PrunerError; use reth_db::database::Database; use reth_interfaces::{provider::ProviderResult, RethResult}; -use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber}; +use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, TxNumber, +}; use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter}; use std::ops::RangeInclusive; use tracing::error; @@ -60,13 +63,14 @@ pub trait Segment: Debug + Send + Sync { } /// Segment pruning input, see [Segment::prune]. -#[derive(Debug, Clone, Copy)] +#[derive(Debug)] +#[cfg_attr(test, derive(Clone))] pub struct PruneInput { pub(crate) previous_checkpoint: Option, /// Target block up to which the pruning needs to be done, inclusive. pub(crate) to_block: BlockNumber, - /// Maximum entries to delete from the database. - pub(crate) delete_limit: usize, + /// Limits pruning of a segment. + pub(crate) limiter: PruneLimiter, } impl PruneInput { @@ -125,14 +129,7 @@ impl PruneInput { /// /// To get the range end: use block `to_block`. pub(crate) fn get_next_block_range(&self) -> Option> { - let from_block = self - .previous_checkpoint - .and_then(|checkpoint| checkpoint.block_number) - // Checkpoint exists, prune from the next block after the highest pruned one - .map(|block_number| block_number + 1) - // No checkpoint exists, prune from genesis - .unwrap_or(0); - + let from_block = self.get_start_next_block_range(); let range = from_block..=self.to_block; if range.is_empty() { return None @@ -140,14 +137,25 @@ impl PruneInput { Some(range) } + + /// Returns the start of the next block range. + /// + /// 1. If checkpoint exists, use next block. + /// 2. If checkpoint doesn't exist, use block 0. + pub(crate) fn get_start_next_block_range(&self) -> u64 { + self.previous_checkpoint + .and_then(|checkpoint| checkpoint.block_number) + // Checkpoint exists, prune from the next block after the highest pruned one + .map(|block_number| block_number + 1) + // No checkpoint exists, prune from genesis + .unwrap_or(0) + } } /// Segment pruning output, see [Segment::prune]. #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub struct PruneOutput { - /// `true` if pruning has been completed up to the target block, and `false` if there's more - /// data to prune in further runs. - pub(crate) done: bool, + pub(crate) progress: PruneProgress, /// Number of entries pruned, i.e. deleted from the database. pub(crate) pruned: usize, /// Pruning checkpoint to save to database, if any. @@ -158,17 +166,20 @@ impl PruneOutput { /// Returns a [PruneOutput] with `done = true`, `pruned = 0` and `checkpoint = None`. /// Use when no pruning is needed. pub(crate) const fn done() -> Self { - Self { done: true, pruned: 0, checkpoint: None } + Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None } } /// Returns a [PruneOutput] with `done = false`, `pruned = 0` and `checkpoint = None`. /// Use when pruning is needed but cannot be done. - pub(crate) const fn not_done() -> Self { - Self { done: false, pruned: 0, checkpoint: None } + pub(crate) const fn not_done( + reason: PruneInterruptReason, + checkpoint: Option, + ) -> Self { + Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint } } } -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)] pub(crate) struct PruneOutputCheckpoint { /// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet. pub(crate) block_number: Option, @@ -182,3 +193,9 @@ impl PruneOutputCheckpoint { PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode } } } + +impl From for PruneOutputCheckpoint { + fn from(checkpoint: PruneCheckpoint) -> Self { + Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number } + } +} diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index d1ce5324e..4ae58db3e 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -4,7 +4,7 @@ use crate::{ }; use reth_db::{database::Database, tables}; use reth_interfaces::provider::ProviderResult; -use reth_primitives::{PruneCheckpoint, PruneMode, PruneSegment}; +use reth_primitives::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment}; use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; use tracing::{instrument, trace}; @@ -43,10 +43,12 @@ impl Segment for Receipts { }; let tx_range_end = *tx_range.end(); + let mut limiter = input.limiter; + let mut last_pruned_transaction = tx_range_end; let (pruned, done) = provider.prune_table_with_range::( tx_range, - input.delete_limit, + &mut limiter, |_| false, |row| last_pruned_transaction = row.0, )?; @@ -59,8 +61,10 @@ impl Segment for Receipts { // so we could finish pruning its receipts on the next run. .checked_sub(if done { 0 } else { 1 }); + let progress = PruneProgress::new(done, &limiter); + Ok(PruneOutput { - done, + progress, pruned, checkpoint: Some(PruneOutputCheckpoint { block_number: last_pruned_block, @@ -97,7 +101,10 @@ mod tests { generators, generators::{random_block_range, random_receipt}, }; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, TxNumber, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; @@ -128,8 +135,10 @@ mod tests { db.table::().unwrap().len() ); - let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let segment = Receipts::new(prune_mode); + let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10); let input = PruneInput { previous_checkpoint: db .factory @@ -138,9 +147,8 @@ mod tests { .get_prune_checkpoint(PruneSegment::Receipts) .unwrap(), to_block, - delete_limit: 10, + limiter: limiter.clone(), }; - let segment = Receipts::new(prune_mode); let next_tx_number_to_prune = db .factory @@ -157,16 +165,22 @@ mod tests { .take(to_block as usize) .map(|block| block.body.len()) .sum::() - .min(next_tx_number_to_prune as usize + input.delete_limit) + .min( + next_tx_number_to_prune as usize + + input.limiter.deleted_entries_limit().unwrap(), + ) .sub(1); let provider = db.factory.provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); + segment .save_checkpoint( &provider, @@ -188,7 +202,7 @@ mod tests { }) .into_inner() .0 - .checked_sub(if result.done { 0 } else { 1 }); + .checked_sub(if result.progress.is_finished() { 0 } else { 1 }); assert_eq!( db.table::().unwrap().len(), @@ -208,8 +222,11 @@ mod tests { ); }; - test_prune(6, (false, 10)); - test_prune(6, (true, 2)); - test_prune(10, (true, 8)); + test_prune( + 6, + (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10), + ); + test_prune(6, (PruneProgress::Finished, 2)); + test_prune(10, (PruneProgress::Finished, 8)); } } diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index efcbfe761..c9bf4a799 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -4,7 +4,7 @@ use crate::{ }; use reth_db::{database::Database, tables}; use reth_primitives::{ - PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, + PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE, }; use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; @@ -113,8 +113,10 @@ impl Segment for ReceiptsByLogs { "Calculated block ranges and filtered addresses", ); - let mut limit = input.delete_limit; + let mut limiter = input.limiter; + let mut done = true; + let mut pruned = 0; let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { let block_range = start_block..=end_block; @@ -138,7 +140,7 @@ impl Segment for ReceiptsByLogs { let deleted; (deleted, done) = provider.prune_table_with_range::( tx_range, - limit, + &mut limiter, |(tx_num, receipt)| { let skip = num_addresses > 0 && receipt.logs.iter().any(|log| { @@ -152,9 +154,10 @@ impl Segment for ReceiptsByLogs { }, |row| last_pruned_transaction = Some(row.0), )?; + trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts"); - limit = limit.saturating_sub(deleted); + pruned += deleted; // For accurate checkpoints we need to know that we have checked every transaction. // Example: we reached the end of the range, and the last receipt is supposed to skip @@ -172,7 +175,7 @@ impl Segment for ReceiptsByLogs { .saturating_sub(if done { 0 } else { 1 }), ); - if limit == 0 { + if limiter.is_limit_reached() { done &= end_block == to_block; break } @@ -203,7 +206,9 @@ impl Segment for ReceiptsByLogs { }, )?; - Ok(PruneOutput { done, pruned: input.delete_limit - limit, checkpoint: None }) + let progress = PruneProgress::new(done, &limiter); + + Ok(PruneOutput { progress, pruned, checkpoint: None }) } } @@ -216,13 +221,15 @@ mod tests { generators, generators::{random_block_range, random_eoa_account, random_log, random_receipt}, }; - use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256}; + use reth_primitives::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256}; use reth_provider::{PruneCheckpointReader, TransactionsProvider}; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::collections::BTreeMap; #[test] fn prune_receipts_by_logs() { + reth_tracing::init_test_tracing(); + let db = TestStageDB::default(); let mut rng = generators::rng(); @@ -268,6 +275,8 @@ mod tests { let receipts_log_filter = ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); + let limiter = PruneLimiter::default().set_deleted_entries_limit(10); + let result = ReceiptsByLogs::new(receipts_log_filter).prune( &provider, PruneInput { @@ -278,7 +287,7 @@ mod tests { .get_prune_checkpoint(PruneSegment::ContractLogs) .unwrap(), to_block: tip, - delete_limit: 10, + limiter, }, ); provider.commit().expect("commit"); @@ -304,7 +313,7 @@ mod tests { ((pruned_tx + 1) - unprunable) as usize ); - output.done + output.progress.is_finished() }; while !run_prune() {} diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs index 0684fbd37..355d82f45 100644 --- a/crates/prune/src/segments/sender_recovery.rs +++ b/crates/prune/src/segments/sender_recovery.rs @@ -3,7 +3,7 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, tables}; -use reth_primitives::{PruneMode, PruneSegment}; +use reth_primitives::{PruneMode, PruneProgress, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; @@ -42,10 +42,12 @@ impl Segment for SenderRecovery { }; let tx_range_end = *tx_range.end(); + let mut limiter = input.limiter; + let mut last_pruned_transaction = tx_range_end; let (pruned, done) = provider.prune_table_with_range::( tx_range, - input.delete_limit, + &mut limiter, |_| false, |row| last_pruned_transaction = row.0, )?; @@ -58,8 +60,10 @@ impl Segment for SenderRecovery { // previous, so we could finish pruning its transaction senders on the next run. .checked_sub(if done { 0 } else { 1 }); + let progress = PruneProgress::new(done, &limiter); + Ok(PruneOutput { - done, + progress, pruned, checkpoint: Some(PruneOutputCheckpoint { block_number: last_pruned_block, @@ -79,7 +83,10 @@ mod tests { }; use reth_db::tables; use reth_interfaces::test_utils::{generators, generators::random_block_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment, + TxNumber, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; @@ -113,8 +120,10 @@ mod tests { db.table::().unwrap().len() ); - let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let segment = SenderRecovery::new(prune_mode); + let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10); let input = PruneInput { previous_checkpoint: db .factory @@ -123,9 +132,8 @@ mod tests { .get_prune_checkpoint(PruneSegment::SenderRecovery) .unwrap(), to_block, - delete_limit: 10, + limiter: limiter.clone(), }; - let segment = SenderRecovery::new(prune_mode); let next_tx_number_to_prune = db .factory @@ -142,7 +150,10 @@ mod tests { .take(to_block as usize) .map(|block| block.body.len()) .sum::() - .min(next_tx_number_to_prune as usize + input.delete_limit) + .min( + next_tx_number_to_prune as usize + + input.limiter.deleted_entries_limit().unwrap(), + ) .sub(1); let last_pruned_block_number = blocks @@ -161,11 +172,14 @@ mod tests { let provider = db.factory.provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); + segment .save_checkpoint( &provider, @@ -174,8 +188,8 @@ mod tests { .unwrap(); provider.commit().expect("commit"); - let last_pruned_block_number = - last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 }); + let last_pruned_block_number = last_pruned_block_number + .checked_sub(if result.progress.is_finished() { 0 } else { 1 }); assert_eq!( db.table::().unwrap().len(), @@ -195,8 +209,16 @@ mod tests { ); }; - test_prune(6, (false, 10)); - test_prune(6, (true, 2)); - test_prune(10, (true, 8)); + test_prune( + 6, + ( + PruneProgress::HasMoreData( + reth_primitives::PruneInterruptReason::DeletedEntriesLimitReached, + ), + 10, + ), + ); + test_prune(6, (PruneProgress::Finished, 2)); + test_prune(10, (PruneProgress::Finished, 8)); } } diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index eba8d1724..1ee8d20f4 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -9,10 +9,16 @@ use reth_db::{ models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}, tables, }; -use reth_primitives::{PruneMode, PruneSegment}; +use reth_primitives::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment}; use reth_provider::DatabaseProviderRW; use tracing::{instrument, trace}; +/// Number of storage history tables to prune in one step +/// +/// Storage History consists of two tables: [tables::StorageChangeSets] and +/// [tables::StoragesHistory]. We want to prune them to the same block number. +const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2; + #[derive(Debug)] pub struct StorageHistory { mode: PruneMode, @@ -48,11 +54,23 @@ impl Segment for StorageHistory { }; let range_end = *range.end(); + let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() { + input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE) + } else { + input.limiter + }; + if limiter.is_limit_reached() { + return Ok(PruneOutput::not_done( + PruneInterruptReason::new(&limiter), + input.previous_checkpoint.map(|checkpoint| checkpoint.into()), + )) + } + let mut last_changeset_pruned_block = None; let (pruned_changesets, done) = provider .prune_table_with_range::( BlockNumberAddress::range(range), - input.delete_limit / 2, + &mut limiter, |_| false, |row| last_changeset_pruned_block = Some(row.0.block_number()), )?; @@ -70,10 +88,12 @@ impl Segment for StorageHistory { |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), )?; - trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)" ); + trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)"); + + let progress = PruneProgress::new(done, &limiter); Ok(PruneOutput { - done, + progress, pruned: pruned_changesets + pruned_indices, checkpoint: Some(PruneOutputCheckpoint { block_number: Some(last_changeset_pruned_block), @@ -85,14 +105,19 @@ impl Segment for StorageHistory { #[cfg(test)] mod tests { - use crate::segments::{PruneInput, PruneOutput, Segment, StorageHistory}; + use crate::segments::{ + storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneOutput, Segment, + StorageHistory, + }; use assert_matches::assert_matches; use reth_db::{tables, BlockNumberList}; use reth_interfaces::test_utils::{ generators, generators::{random_block_range, random_changeset_range, random_eoa_accounts}, }; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::{collections::BTreeMap, ops::AddAssign}; @@ -133,8 +158,13 @@ mod tests { let original_shards = db.table::().unwrap(); - let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, + run: usize, + expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let deleted_entries_limit = 1000; + let mut limiter = + PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit); let input = PruneInput { previous_checkpoint: db .factory @@ -143,17 +173,20 @@ mod tests { .get_prune_checkpoint(PruneSegment::StorageHistory) .unwrap(), to_block, - delete_limit: 1000, + limiter: limiter.clone(), }; let segment = StorageHistory::new(prune_mode); let provider = db.factory.provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); + segment .save_checkpoint( &provider, @@ -177,7 +210,8 @@ mod tests { .iter() .enumerate() .skip_while(|(i, (block_number, _, _))| { - *i < input.delete_limit / 2 * run && *block_number <= to_block as usize + *i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run && + *block_number <= to_block as usize }) .next() .map(|(i, _)| i) @@ -191,7 +225,7 @@ mod tests { let last_pruned_block_number = pruned_changesets .next() - .map(|(block_number, _, _)| if result.done { + .map(|(block_number, _, _)| if result.progress.is_finished() { *block_number } else { block_number.saturating_sub(1) @@ -241,8 +275,17 @@ mod tests { ); }; - test_prune(998, 1, (false, 500)); - test_prune(998, 2, (true, 499)); - test_prune(1200, 3, (true, 202)); + test_prune( + 998, + 1, + ( + PruneProgress::HasMoreData( + reth_primitives::PruneInterruptReason::DeletedEntriesLimitReached, + ), + 500, + ), + ); + test_prune(998, 2, (PruneProgress::Finished, 499)); + test_prune(1200, 3, (PruneProgress::Finished, 202)); } } diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs index f0b24ef0a..3203552ba 100644 --- a/crates/prune/src/segments/transaction_lookup.rs +++ b/crates/prune/src/segments/transaction_lookup.rs @@ -4,7 +4,7 @@ use crate::{ }; use rayon::prelude::*; use reth_db::{database::Database, tables}; -use reth_primitives::{PruneMode, PruneSegment}; +use reth_primitives::{PruneMode, PruneProgress, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; @@ -42,7 +42,10 @@ impl Segment for TransactionLookup { } } .into_inner(); - let tx_range = start..=end.min(start + input.delete_limit as u64 - 1); + let tx_range = start..= + Some(end) + .min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1)) + .unwrap(); let tx_range_end = *tx_range.end(); // Retrieve transactions in the range and calculate their hashes in parallel @@ -60,15 +63,18 @@ impl Segment for TransactionLookup { )) } + let mut limiter = input.limiter; + let mut last_pruned_transaction = None; - let (pruned, _) = provider.prune_table_with_iterator::( + let (pruned, done) = provider.prune_table_with_iterator::( hashes, - input.delete_limit, + &mut limiter, |row| { last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1)) }, )?; - let done = tx_range_end == end; + + let done = done && tx_range_end == end; trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup"); let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end); @@ -81,8 +87,10 @@ impl Segment for TransactionLookup { // run. .checked_sub(if done { 0 } else { 1 }); + let progress = PruneProgress::new(done, &limiter); + Ok(PruneOutput { - done, + progress, pruned, checkpoint: Some(PruneOutputCheckpoint { block_number: last_pruned_block, @@ -102,7 +110,10 @@ mod tests { }; use reth_db::tables; use reth_interfaces::test_utils::{generators, generators::random_block_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, TxNumber, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; @@ -132,8 +143,10 @@ mod tests { db.table::().unwrap().len() ); - let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let segment = TransactionLookup::new(prune_mode); + let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10); let input = PruneInput { previous_checkpoint: db .factory @@ -142,9 +155,8 @@ mod tests { .get_prune_checkpoint(PruneSegment::TransactionLookup) .unwrap(), to_block, - delete_limit: 10, + limiter: limiter.clone(), }; - let segment = TransactionLookup::new(prune_mode); let next_tx_number_to_prune = db .factory @@ -161,7 +173,10 @@ mod tests { .take(to_block as usize) .map(|block| block.body.len()) .sum::() - .min(next_tx_number_to_prune as usize + input.delete_limit) + .min( + next_tx_number_to_prune as usize + + input.limiter.deleted_entries_limit().unwrap(), + ) .sub(1); let last_pruned_block_number = blocks @@ -180,11 +195,14 @@ mod tests { let provider = db.factory.provider_rw().unwrap(); let result = segment.prune(&provider, input).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); + segment .save_checkpoint( &provider, @@ -193,8 +211,8 @@ mod tests { .unwrap(); provider.commit().expect("commit"); - let last_pruned_block_number = - last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 }); + let last_pruned_block_number = last_pruned_block_number + .checked_sub(if result.progress.is_finished() { 0 } else { 1 }); assert_eq!( db.table::().unwrap().len(), @@ -214,8 +232,11 @@ mod tests { ); }; - test_prune(6, (false, 10)); - test_prune(6, (true, 2)); - test_prune(10, (true, 8)); + test_prune( + 6, + (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10), + ); + test_prune(6, (PruneProgress::Finished, 2)); + test_prune(10, (PruneProgress::Finished, 8)); } } diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index 3c2ac4255..ed53c0306 100644 --- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -3,7 +3,7 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, tables}; -use reth_primitives::{PruneMode, PruneSegment}; +use reth_primitives::{PruneMode, PruneProgress, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; @@ -41,10 +41,12 @@ impl Segment for Transactions { } }; + let mut limiter = input.limiter; + let mut last_pruned_transaction = *tx_range.end(); let (pruned, done) = provider.prune_table_with_range::( tx_range, - input.delete_limit, + &mut limiter, |_| false, |row| last_pruned_transaction = row.0, )?; @@ -57,8 +59,10 @@ impl Segment for Transactions { // so we could finish pruning its transactions on the next run. .checked_sub(if done { 0 } else { 1 }); + let progress = PruneProgress::new(done, &limiter); + Ok(PruneOutput { - done, + progress, pruned, checkpoint: Some(PruneOutputCheckpoint { block_number: last_pruned_block, @@ -78,7 +82,10 @@ mod tests { }; use reth_db::tables; use reth_interfaces::test_utils::{generators, generators::random_block_range}; - use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; + use reth_primitives::{ + BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, + PruneSegment, TxNumber, B256, + }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; @@ -95,8 +102,10 @@ mod tests { assert_eq!(db.table::().unwrap().len(), transactions.len()); - let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| { + let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| { let prune_mode = PruneMode::Before(to_block); + let segment = Transactions::new(prune_mode); + let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10); let input = PruneInput { previous_checkpoint: db .factory @@ -105,9 +114,8 @@ mod tests { .get_prune_checkpoint(PruneSegment::Transactions) .unwrap(), to_block, - delete_limit: 10, + limiter: limiter.clone(), }; - let segment = Transactions::new(prune_mode); let next_tx_number_to_prune = db .factory @@ -120,12 +128,15 @@ mod tests { .unwrap_or_default(); let provider = db.factory.provider_rw().unwrap(); - let result = segment.prune(&provider, input).unwrap(); + let result = segment.prune(&provider, input.clone()).unwrap(); + limiter.increment_deleted_entries_count_by(result.pruned); + assert_matches!( result, - PruneOutput {done, pruned, checkpoint: Some(_)} - if (done, pruned) == expected_result + PruneOutput {progress, pruned, checkpoint: Some(_)} + if (progress, pruned) == expected_result ); + segment .save_checkpoint( &provider, @@ -139,7 +150,10 @@ mod tests { .take(to_block as usize) .map(|block| block.body.len()) .sum::() - .min(next_tx_number_to_prune as usize + input.delete_limit) + .min( + next_tx_number_to_prune as usize + + input.limiter.deleted_entries_limit().unwrap(), + ) .sub(1); let last_pruned_block_number = blocks @@ -155,7 +169,7 @@ mod tests { }) .into_inner() .0 - .checked_sub(if result.done { 0 } else { 1 }); + .checked_sub(if result.progress.is_finished() { 0 } else { 1 }); assert_eq!( db.table::().unwrap().len(), @@ -175,7 +189,10 @@ mod tests { ); }; - test_prune(6, (false, 10)); - test_prune(6, (true, 2)); + test_prune( + 6, + (PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10), + ); + test_prune(6, (PruneProgress::Finished, 2)); } } diff --git a/crates/rpc/rpc/src/debug.rs b/crates/rpc/rpc/src/debug.rs index 15f0d8a1d..3d2466b4e 100644 --- a/crates/rpc/rpc/src/debug.rs +++ b/crates/rpc/rpc/src/debug.rs @@ -81,7 +81,7 @@ where ) -> EthResult> { if transactions.is_empty() { // nothing to trace - return Ok(Vec::new()); + return Ok(Vec::new()) } // replay all transactions of the block diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 67e804086..6f1981de0 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -1097,7 +1097,7 @@ where if block.body.is_empty() { // nothing to trace - return Ok(Some(Vec::new())); + return Ok(Some(Vec::new())) } // replay all transactions of the block diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index df51b19b8..9ac9785e0 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -15,7 +15,7 @@ use crate::{ use itertools::{izip, Itertools}; use reth_db::{ common::KeyValue, - cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, RangeWalker}, database::Database, models::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, @@ -38,10 +38,10 @@ use reth_primitives::{ stage::{StageCheckpoint, StageId}, trie::Nibbles, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, - ChainInfo, ChainSpec, GotExpected, Head, Header, PruneCheckpoint, PruneModes, PruneSegment, - Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry, - TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash, - TxHash, TxNumber, Withdrawal, Withdrawals, B256, U256, + ChainInfo, ChainSpec, GotExpected, Head, Header, PruneCheckpoint, PruneLimiter, PruneModes, + PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, + StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, + TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256, U256, }; use reth_trie::{ prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets}, @@ -850,30 +850,38 @@ impl DatabaseProvider { pub fn prune_table_with_iterator( &self, keys: impl IntoIterator, - limit: usize, + limiter: &mut PruneLimiter, mut delete_callback: impl FnMut(TableRow), ) -> Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; - let mut deleted = 0; - let mut keys = keys.into_iter(); - if limit != 0 { - for key in &mut keys { - let row = cursor.seek_exact(key.clone())?; - if let Some(row) = row { - cursor.delete_current()?; - deleted += 1; - delete_callback(row); - } + let mut deleted_entries = 0; - if deleted == limit { - break - } + for key in &mut keys { + if limiter.is_limit_reached() { + debug!( + target: "providers::db", + ?limiter, + deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), + time_limit = %limiter.is_time_limit_reached(), + table = %T::NAME, + "Pruning limit reached" + ); + break + } + + let row = cursor.seek_exact(key)?; + if let Some(row) = row { + cursor.delete_current()?; + limiter.increment_deleted_entries_count(); + deleted_entries += 1; + delete_callback(row); } } - Ok((deleted, keys.next().is_none())) + let done = keys.next().is_none(); + Ok((deleted_entries, done)) } /// Prune the table for the specified key range. @@ -882,29 +890,72 @@ impl DatabaseProvider { pub fn prune_table_with_range( &self, keys: impl RangeBounds + Clone + Debug, - limit: usize, + limiter: &mut PruneLimiter, mut skip_filter: impl FnMut(&TableRow) -> bool, mut delete_callback: impl FnMut(TableRow), ) -> Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; let mut walker = cursor.walk_range(keys)?; - let mut deleted = 0; - if limit != 0 { - while let Some(row) = walker.next().transpose()? { - if !skip_filter(&row) { - walker.delete_current()?; - deleted += 1; - delete_callback(row); - } + let mut deleted_entries = 0; - if deleted == limit { - break - } + let done = loop { + // check for time out must be done in this scope since it's not done in + // `prune_table_with_range_step` + if limiter.is_limit_reached() { + debug!( + target: "providers::db", + ?limiter, + deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(), + time_limit = %limiter.is_time_limit_reached(), + table = %T::NAME, + "Pruning limit reached" + ); + break false } + + let done = self.prune_table_with_range_step( + &mut walker, + limiter, + &mut skip_filter, + &mut delete_callback, + )?; + + if done { + break true + } else { + deleted_entries += 1; + } + }; + + Ok((deleted_entries, done)) + } + + /// Steps once with the given walker and prunes the entry in the table. + /// + /// Returns `true` if the walker is finished, `false` if it may have more data to prune. + /// + /// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's + /// pruning different tables concurrently, by letting them step to the same height before + /// timing out. + pub fn prune_table_with_range_step( + &self, + walker: &mut RangeWalker<'_, T, ::CursorMut>, + limiter: &mut PruneLimiter, + skip_filter: &mut impl FnMut(&TableRow) -> bool, + delete_callback: &mut impl FnMut(TableRow), + ) -> Result { + let Some(res) = walker.next() else { return Ok(true) }; + + let row = res?; + + if !skip_filter(&row) { + walker.delete_current()?; + limiter.increment_deleted_entries_count(); + delete_callback(row); } - Ok((deleted, walker.next().transpose()?.is_none())) + Ok(false) } /// Load shard and remove it. If list is empty, last shard was full or