diff --git a/Cargo.lock b/Cargo.lock index 0da532e46..04b3833c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6047,7 +6047,6 @@ dependencies = [ "modular-bitfield", "num_enum 0.7.0", "once_cell", - "paste", "plain_hasher", "pprof", "proptest", diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index b5101a2db..e7f0cfae7 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -60,9 +60,11 @@ use reth_provider::{ providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions, HeaderProvider, ProviderFactory, StageCheckpointReader, }; +use reth_prune::{segments::SegmentSet, Pruner}; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; +use reth_snapshot::HighestSnapshotsTracker; use reth_stages::{ prelude::*, stages::{ @@ -455,17 +457,12 @@ impl NodeCommand { let mut hooks = EngineHooks::new(); let pruner_events = if let Some(prune_config) = prune_config { - info!(target: "reth::cli", ?prune_config, "Pruner initialized"); - let mut pruner = reth_prune::Pruner::new( - db.clone(), - self.chain.clone(), - prune_config.block_interval, - prune_config.segments, - self.chain.prune_delete_limit, - highest_snapshots_rx, - ); + let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx); + let events = pruner.events(); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); + + info!(target: "reth::cli", ?prune_config, "Pruner initialized"); Either::Left(events) } else { Either::Right(stream::empty()) @@ -878,15 +875,15 @@ impl NodeCommand { .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( stage_config.transaction_lookup.commit_threshold, - prune_modes.clone(), + prune_modes.transaction_lookup, )) .set(IndexAccountHistoryStage::new( stage_config.index_account_history.commit_threshold, - prune_modes.clone(), + prune_modes.account_history, )) .set(IndexStorageHistoryStage::new( stage_config.index_storage_history.commit_threshold, - prune_modes, + prune_modes.storage_history, )), ) .build(db, self.chain.clone()); @@ -894,6 +891,45 @@ impl NodeCommand { Ok(pipeline) } + fn build_pruner( + &self, + config: &PruneConfig, + db: DB, + highest_snapshots_rx: HighestSnapshotsTracker, + ) -> Pruner { + let mut segments = SegmentSet::new(); + + if let Some(mode) = config.segments.receipts { + segments = segments.add_segment(reth_prune::segments::Receipts::new(mode)); + } + if !config.segments.receipts_log_filter.is_empty() { + segments = segments.add_segment(reth_prune::segments::ReceiptsByLogs::new( + config.segments.receipts_log_filter.clone(), + )); + } + if let Some(mode) = config.segments.transaction_lookup { + segments = segments.add_segment(reth_prune::segments::TransactionLookup::new(mode)); + } + if let Some(mode) = config.segments.sender_recovery { + segments = segments.add_segment(reth_prune::segments::SenderRecovery::new(mode)); + } + if let Some(mode) = config.segments.account_history { + segments = segments.add_segment(reth_prune::segments::AccountHistory::new(mode)); + } + if let Some(mode) = config.segments.storage_history { + segments = segments.add_segment(reth_prune::segments::StorageHistory::new(mode)); + } + + Pruner::new( + db, + self.chain.clone(), + segments.into_vec(), + config.block_interval, + self.chain.prune_delete_limit, + highest_snapshots_rx, + ) + } + /// Change rpc port numbers based on the instance number. fn adjust_instance_ports(&mut self) { // auth port is scaled by a factor of instance * 100 diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 3c58938fb..d08c8835e 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_primitives::{ChainSpec, PruneModes}; +use reth_primitives::ChainSpec; use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_stages::{ stages::{ @@ -211,7 +211,7 @@ impl Command { ) } StageEnum::TxLookup => { - (Box::new(TransactionLookupStage::new(batch_size, PruneModes::none())), None) + (Box::new(TransactionLookupStage::new(batch_size, None)), None) } StageEnum::AccountHashing => { (Box::new(AccountHashingStage::new(1, batch_size)), None) diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 2879d8676..e68272102 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -515,8 +515,8 @@ where let pruner = Pruner::new( db.clone(), self.base_config.chain_spec.clone(), + vec![], 5, - PruneModes::none(), self.base_config.chain_spec.prune_delete_limit, watch::channel(None).1, ); diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 69bf1bfc8..5493cf5fc 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -48,7 +48,6 @@ derive_more = "0.99" url = "2.3" once_cell = "1.17.0" zstd = { version = "0.12", features = ["experimental"] } -paste = "1.0" rayon.workspace = true tempfile.workspace = true sha2 = "0.10.7" diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index f02fa64a6..761440072 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -53,7 +53,7 @@ impl ReceiptsLogPruneConfig { // Reminder, that we increment because the [`BlockNumber`] key of the new map should be // viewed as `PruneMode::Before(block)` let block = (pruned_block + 1).max( - mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PruneSegment::ContractLogs)? + mode.prune_target_block(tip, PruneSegment::ContractLogs)? .map(|(block, _)| block) .unwrap_or_default() + 1, @@ -75,11 +75,9 @@ impl ReceiptsLogPruneConfig { for (_, mode) in self.0.iter() { if let PruneMode::Distance(_) = mode { - if let Some((block, _)) = mode.prune_target_block( - tip, - MINIMUM_PRUNING_DISTANCE, - PruneSegment::ContractLogs, - )? { + if let Some((block, _)) = + mode.prune_target_block(tip, PruneSegment::ContractLogs)? + { lowest = Some(lowest.unwrap_or(u64::MAX).min(block)); } } diff --git a/crates/primitives/src/prune/mode.rs b/crates/primitives/src/prune/mode.rs index 42df3b0e0..2dd044734 100644 --- a/crates/primitives/src/prune/mode.rs +++ b/crates/primitives/src/prune/mode.rs @@ -20,17 +20,16 @@ impl PruneMode { pub fn prune_target_block( &self, tip: BlockNumber, - min_blocks: u64, segment: PruneSegment, ) -> Result, PruneSegmentError> { let result = match self { - PruneMode::Full if min_blocks == 0 => Some((tip, *self)), + PruneMode::Full if segment.min_blocks() == 0 => Some((tip, *self)), PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet - PruneMode::Distance(distance) if *distance >= min_blocks => { + PruneMode::Distance(distance) if *distance >= segment.min_blocks() => { Some((tip - distance, *self)) } PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet - PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, *self)), + PruneMode::Before(n) if tip - n >= segment.min_blocks() => Some((n - 1, *self)), _ => return Err(PruneSegmentError::Configuration(segment)), }; Ok(result) @@ -72,7 +71,6 @@ mod tests { #[test] fn test_prune_target_block() { let tip = 20000; - let min_blocks = MINIMUM_PRUNING_DISTANCE; let segment = PruneSegment::Receipts; let tests = vec![ @@ -80,7 +78,10 @@ mod tests { (PruneMode::Full, Err(PruneSegmentError::Configuration(segment))), // Nothing to prune (PruneMode::Distance(tip + 1), Ok(None)), - (PruneMode::Distance(min_blocks + 1), Ok(Some(tip - (min_blocks + 1)))), + ( + PruneMode::Distance(segment.min_blocks() + 1), + Ok(Some(tip - (segment.min_blocks() + 1))), + ), // Nothing to prune (PruneMode::Before(tip + 1), Ok(None)), ( @@ -96,7 +97,7 @@ mod tests { for (index, (mode, expected_result)) in tests.into_iter().enumerate() { assert_eq!( - mode.prune_target_block(tip, min_blocks, segment), + mode.prune_target_block(tip, segment), expected_result.map(|r| r.map(|b| (b, mode))), "Test {} failed", index + 1, @@ -105,7 +106,7 @@ mod tests { // Test for a scenario where there are no minimum blocks and Full can be used assert_eq!( - PruneMode::Full.prune_target_block(tip, 0, segment), + PruneMode::Full.prune_target_block(tip, PruneSegment::Transactions), Ok(Some((tip, PruneMode::Full))), ); } diff --git a/crates/primitives/src/prune/segment.rs b/crates/primitives/src/prune/segment.rs index c71c6bcf7..4f3e62027 100644 --- a/crates/primitives/src/prune/segment.rs +++ b/crates/primitives/src/prune/segment.rs @@ -1,3 +1,4 @@ +use crate::MINIMUM_PRUNING_DISTANCE; use derive_more::Display; use reth_codecs::{main_codec, Compact}; use thiserror::Error; @@ -24,6 +25,20 @@ pub enum PruneSegment { Transactions, } +impl PruneSegment { + /// Returns minimum number of blocks to left in the database for this segment. + pub fn min_blocks(&self) -> u64 { + match self { + Self::SenderRecovery | Self::TransactionLookup | Self::Headers | Self::Transactions => { + 0 + } + Self::Receipts | Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => { + MINIMUM_PRUNING_DISTANCE + } + } + } +} + /// PruneSegment error type. #[derive(Debug, Error, PartialEq, Eq, Clone)] pub enum PruneSegmentError { diff --git a/crates/primitives/src/prune/target.rs b/crates/primitives/src/prune/target.rs index 917b51b34..2fd9c0f29 100644 --- a/crates/primitives/src/prune/target.rs +++ b/crates/primitives/src/prune/target.rs @@ -1,8 +1,6 @@ use crate::{ - prune::PruneSegmentError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, - BlockNumber, PruneMode, PruneSegment, ReceiptsLogPruneConfig, + serde_helper::deserialize_opt_prune_mode_with_min_blocks, PruneMode, ReceiptsLogPruneConfig, }; -use paste::paste; use serde::{Deserialize, Serialize}; /// Minimum distance from the tip necessary for the node to work correctly: @@ -44,69 +42,26 @@ pub struct PruneModes { /// Receipts pruning configuration by retaining only those receipts that contain logs emitted /// by the specified addresses, discarding others. This setting is overridden by `receipts`. /// - /// The [`BlockNumber`] represents the starting block from which point onwards the receipts are - /// preserved. + /// The [BlockNumber](`crate::BlockNumber`) represents the starting block from which point + /// onwards the receipts are preserved. pub receipts_log_filter: ReceiptsLogPruneConfig, } -macro_rules! impl_prune_segments { - ($(($segment:ident, $variant:ident, $min_blocks:expr)),+) => { - $( - paste! { - #[doc = concat!( - "Check if ", - stringify!($variant), - " should be pruned at the target block according to the provided tip." - )] - pub fn [](&self, block: BlockNumber, tip: BlockNumber) -> bool { - if let Some(mode) = &self.$segment { - return mode.should_prune(block, tip) - } - false - } - } - )+ - - $( - paste! { - #[doc = concat!( - "Returns block up to which ", - stringify!($variant), - " pruning needs to be done, inclusive, according to the provided tip." - )] - pub fn [](&self, tip: BlockNumber) -> Result, PruneSegmentError> { - match self.$segment { - Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PruneSegment::$variant), - None => Ok(None) - } - } - } - )+ - - /// Sets pruning to all targets. - pub fn all() -> Self { - Self { - $( - $segment: Some(PruneMode::Full), - )+ - receipts_log_filter: Default::default() - } - } - - }; -} - impl PruneModes { /// Sets pruning to no target. pub fn none() -> Self { PruneModes::default() } - impl_prune_segments!( - (sender_recovery, SenderRecovery, None), - (transaction_lookup, TransactionLookup, None), - (receipts, Receipts, Some(MINIMUM_PRUNING_DISTANCE)), - (account_history, AccountHistory, Some(MINIMUM_PRUNING_DISTANCE)), - (storage_history, StorageHistory, Some(MINIMUM_PRUNING_DISTANCE)) - ); + /// Sets pruning to all targets. + pub fn all() -> Self { + Self { + sender_recovery: Some(PruneMode::Full), + transaction_lookup: Some(PruneMode::Full), + receipts: Some(PruneMode::Full), + account_history: Some(PruneMode::Full), + storage_history: Some(PruneMode::Full), + receipts_log_filter: Default::default(), + } + } } diff --git a/crates/prune/src/lib.rs b/crates/prune/src/lib.rs index de37d25fc..e632bf8db 100644 --- a/crates/prune/src/lib.rs +++ b/crates/prune/src/lib.rs @@ -13,7 +13,7 @@ mod error; mod event; mod metrics; mod pruner; -mod segments; +pub mod segments; use crate::metrics::Metrics; pub use error::PrunerError; diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e67b5bc5d..0025fd769 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -7,8 +7,7 @@ use crate::{ }; use reth_db::database::Database; use reth_primitives::{ - listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneModes, PruneProgress, - PruneSegment, PruneSegmentError, + listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneProgress, PruneSegment, }; use reth_provider::{ProviderFactory, PruneCheckpointReader}; use reth_snapshot::HighestSnapshotsTracker; @@ -22,32 +21,11 @@ pub type PrunerResult = Result; /// The pruner type itself with the result of [Pruner::run] pub type PrunerWithResult = (Pruner, PrunerResult); -type RunnableSegmentGetPruneTargetBlockResult = - Result, PruneSegmentError>; - -struct PrunableSegment( - Box>, - #[allow(clippy::type_complexity)] - Box RunnableSegmentGetPruneTargetBlockResult>, -); - -impl PrunableSegment { - fn new< - S: Segment + 'static, - F: Fn(&PruneModes, BlockNumber) -> RunnableSegmentGetPruneTargetBlockResult + 'static, - >( - segment: S, - f: F, - ) -> Self { - Self(Box::new(segment), Box::new(f)) - } -} - /// Pruning routine. Main pruning logic happens in [Pruner::run]. #[derive(Debug)] pub struct Pruner { - metrics: Metrics, provider_factory: ProviderFactory, + segments: Vec>>, /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed, /// pruned, when the chain advances by the specified number of blocks. min_block_interval: usize, @@ -55,12 +33,12 @@ pub struct Pruner { /// number is updated with the tip block number the pruner was called with. It's used in /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated. previous_tip_block_number: Option, - modes: PruneModes, /// Maximum total entries to prune (delete from database) per block. delete_limit: usize, - listeners: EventListeners, #[allow(dead_code)] highest_snapshots_tracker: HighestSnapshotsTracker, + metrics: Metrics, + listeners: EventListeners, } impl Pruner { @@ -68,20 +46,20 @@ impl Pruner { pub fn new( db: DB, chain_spec: Arc, + segments: Vec>>, min_block_interval: usize, - modes: PruneModes, delete_limit: usize, highest_snapshots_tracker: HighestSnapshotsTracker, ) -> Self { Self { - metrics: Metrics::default(), provider_factory: ProviderFactory::new(db, chain_spec), + segments, min_block_interval, previous_tip_block_number: None, - modes, delete_limit, - listeners: Default::default(), highest_snapshots_tracker, + metrics: Metrics::default(), + listeners: Default::default(), } } @@ -119,37 +97,16 @@ impl Pruner { .map_or(1, |previous_tip_block_number| tip_block_number - previous_tip_block_number) as usize; - // TODO(alexey): this is cursed, refactor - let segments: [PrunableSegment; 5] = [ - PrunableSegment::new( - segments::Receipts::default(), - PruneModes::prune_target_block_receipts, - ), - PrunableSegment::new( - segments::TransactionLookup::default(), - PruneModes::prune_target_block_transaction_lookup, - ), - PrunableSegment::new( - segments::SenderRecovery::default(), - PruneModes::prune_target_block_sender_recovery, - ), - PrunableSegment::new( - segments::AccountHistory::default(), - PruneModes::prune_target_block_account_history, - ), - PrunableSegment::new( - segments::StorageHistory::default(), - PruneModes::prune_target_block_storage_history, - ), - ]; - - for PrunableSegment(segment, get_prune_target_block) in segments { + for segment in &self.segments { if delete_limit == 0 { break } - if let Some((to_block, prune_mode)) = - get_prune_target_block(&self.modes, tip_block_number)? + if let Some((to_block, prune_mode)) = segment + .mode() + .map(|mode| mode.prune_target_block(tip_block_number, segment.segment())) + .transpose()? + .flatten() { trace!( target: "pruner", @@ -183,30 +140,6 @@ impl Pruner { } } - // TODO(alexey): make it not a special case - if !self.modes.receipts_log_filter.is_empty() && delete_limit > 0 { - let segment_start = Instant::now(); - let output = segments::ReceiptsByLogs::default().prune( - &provider, - &self.modes.receipts_log_filter, - tip_block_number, - delete_limit, - )?; - self.metrics - .get_prune_segment_metrics(PruneSegment::ContractLogs) - .duration_seconds - .record(segment_start.elapsed()); - - done = done && output.done; - delete_limit = delete_limit.saturating_sub(output.pruned); - stats.insert( - PruneSegment::ContractLogs, - (PruneProgress::from_done(output.done), output.pruned), - ); - } else { - trace!(target: "pruner", segment = ?PruneSegment::ContractLogs, "No filter to prune"); - } - if let Some(snapshots) = highest_snapshots { if let (Some(to_block), true) = (snapshots.headers, delete_limit > 0) { let prune_mode = PruneMode::Before(to_block + 1); @@ -219,7 +152,7 @@ impl Pruner { ); let segment_start = Instant::now(); - let segment = segments::Headers::default(); + let segment = segments::Headers::new(prune_mode); let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; let output = segment .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; @@ -251,7 +184,7 @@ impl Pruner { ); let segment_start = Instant::now(); - let segment = segments::Transactions::default(); + let segment = segments::Transactions::new(prune_mode); let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; let output = segment .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; @@ -321,14 +254,13 @@ impl Pruner { mod tests { use crate::Pruner; use reth_db::test_utils::create_test_rw_db; - use reth_primitives::{PruneModes, MAINNET}; + use reth_primitives::MAINNET; use tokio::sync::watch; #[test] fn is_pruning_needed() { let db = create_test_rw_db(); - let mut pruner = - Pruner::new(db, MAINNET.clone(), 5, PruneModes::none(), 0, watch::channel(None).1); + let mut pruner = Pruner::new(db, MAINNET.clone(), vec![], 5, 0, watch::channel(None).1); // No last pruned block number was set before let first_block_number = 1; diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index ba8ffb59e..c1b5ae682 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -5,19 +5,30 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, models::ShardedKey, tables}; -use reth_primitives::PruneSegment; +use reth_primitives::{PruneMode, PruneSegment}; use reth_provider::DatabaseProviderRW; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct AccountHistory; +#[derive(Debug)] +pub struct AccountHistory { + mode: PruneMode, +} + +impl AccountHistory { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for AccountHistory { fn segment(&self) -> PruneSegment { PruneSegment::AccountHistory } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -129,7 +140,7 @@ mod tests { to_block, delete_limit: 2000, }; - let segment = AccountHistory::default(); + let segment = AccountHistory::new(prune_mode); let provider = tx.inner_rw(); let result = segment.prune(&provider, input).unwrap(); diff --git a/crates/prune/src/segments/headers.rs b/crates/prune/src/segments/headers.rs index 3ede8ff58..12ba19416 100644 --- a/crates/prune/src/segments/headers.rs +++ b/crates/prune/src/segments/headers.rs @@ -5,20 +5,31 @@ use crate::{ use itertools::Itertools; use reth_db::{database::Database, table::Table, tables}; use reth_interfaces::RethResult; -use reth_primitives::{BlockNumber, PruneSegment}; +use reth_primitives::{BlockNumber, PruneMode, PruneSegment}; use reth_provider::DatabaseProviderRW; use std::ops::RangeInclusive; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct Headers; +#[derive(Debug)] +pub struct Headers { + mode: PruneMode, +} + +impl Headers { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for Headers { fn segment(&self) -> PruneSegment { PruneSegment::Headers } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -129,7 +140,7 @@ mod tests { to_block, delete_limit: 10, }; - let segment = Headers::default(); + let segment = Headers::new(prune_mode); let next_block_number_to_prune = tx .inner() @@ -193,7 +204,7 @@ mod tests { // Less than total number of tables for `Headers` segment delete_limit: 2, }; - let segment = Headers::default(); + let segment = Headers::new(PruneMode::Full); let provider = tx.inner_rw(); let result = segment.prune(&provider, input).unwrap(); diff --git a/crates/prune/src/segments/mod.rs b/crates/prune/src/segments/mod.rs index 06b4c6988..82a44f6b0 100644 --- a/crates/prune/src/segments/mod.rs +++ b/crates/prune/src/segments/mod.rs @@ -4,18 +4,21 @@ mod history; mod receipts; mod receipts_by_logs; mod sender_recovery; +mod set; mod storage_history; mod transaction_lookup; mod transactions; -pub(crate) use account_history::AccountHistory; -pub(crate) use headers::Headers; -pub(crate) use receipts::Receipts; -pub(crate) use receipts_by_logs::ReceiptsByLogs; -pub(crate) use sender_recovery::SenderRecovery; -pub(crate) use storage_history::StorageHistory; -pub(crate) use transaction_lookup::TransactionLookup; -pub(crate) use transactions::Transactions; +pub use account_history::AccountHistory; +pub use headers::Headers; +pub use receipts::Receipts; +pub use receipts_by_logs::ReceiptsByLogs; +pub use sender_recovery::SenderRecovery; +pub use set::SegmentSet; +use std::fmt::Debug; +pub use storage_history::StorageHistory; +pub use transaction_lookup::TransactionLookup; +pub use transactions::Transactions; use crate::PrunerError; use reth_db::database::Database; @@ -32,9 +35,13 @@ use tracing::error; /// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call /// [Segment::save_checkpoint]. /// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput]. -pub(crate) trait Segment { +pub trait Segment: Debug + Send + Sync { + /// Segment of data that's pruned. fn segment(&self) -> PruneSegment; + /// Prune mode with which the segment was initialized + fn mode(&self) -> Option; + /// Prune data for [Self::segment] using the provided input. fn prune( &self, @@ -54,7 +61,7 @@ pub(crate) trait Segment { /// Segment pruning input, see [Segment::prune]. #[derive(Debug, Clone, Copy)] -pub(crate) struct PruneInput { +pub struct PruneInput { pub(crate) previous_checkpoint: Option, /// Target block up to which the pruning needs to be done, inclusive. pub(crate) to_block: BlockNumber, @@ -129,7 +136,7 @@ impl PruneInput { /// Segment pruning output, see [Segment::prune]. #[derive(Debug, Clone, Copy, Eq, PartialEq)] -pub(crate) struct PruneOutput { +pub struct PruneOutput { /// `true` if pruning has been completed up to the target block, and `false` if there's more /// data to prune in further runs. pub(crate) done: bool, diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index 33f1824b1..b260cf78b 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -4,19 +4,30 @@ use crate::{ }; use reth_db::{database::Database, tables}; use reth_interfaces::RethResult; -use reth_primitives::{PruneCheckpoint, PruneSegment}; +use reth_primitives::{PruneCheckpoint, PruneMode, PruneSegment}; use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct Receipts; +#[derive(Debug)] +pub struct Receipts { + mode: PruneMode, +} + +impl Receipts { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for Receipts { fn segment(&self) -> PruneSegment { PruneSegment::Receipts } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -127,7 +138,7 @@ mod tests { to_block, delete_limit: 10, }; - let segment = Receipts::default(); + let segment = Receipts::new(prune_mode); let next_tx_number_to_prune = tx .inner() diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index 93c775c02..8f9faa4fd 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -1,46 +1,51 @@ -use crate::{segments::PruneOutput, PrunerError}; +use crate::{ + segments::{PruneInput, PruneOutput, Segment}, + PrunerError, +}; use reth_db::{database::Database, tables}; use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig, - MINIMUM_PRUNING_DISTANCE, -}; -use reth_provider::{ - BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, - TransactionsProvider, + PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE, }; +use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct ReceiptsByLogs; +#[derive(Debug)] +pub struct ReceiptsByLogs { + config: ReceiptsLogPruneConfig, +} impl ReceiptsByLogs { - /// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion - /// list, and removes every receipt not belonging to it. Respects the batch size. - #[instrument(level = "trace", skip(self, provider), target = "pruner")] - pub(crate) fn prune( + pub fn new(config: ReceiptsLogPruneConfig) -> Self { + Self { config } + } +} + +impl Segment for ReceiptsByLogs { + fn segment(&self) -> PruneSegment { + PruneSegment::ContractLogs + } + + fn mode(&self) -> Option { + None + } + + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] + fn prune( &self, provider: &DatabaseProviderRW<'_, DB>, - receipts_log_filter: &ReceiptsLogPruneConfig, - tip_block_number: BlockNumber, - delete_limit: usize, + input: PruneInput, ) -> Result { // Contract log filtering removes every receipt possible except the ones in the list. So, // for the other receipts it's as if they had a `PruneMode::Distance()` of // `MINIMUM_PRUNING_DISTANCE`. let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) - .prune_target_block( - tip_block_number, - MINIMUM_PRUNING_DISTANCE, - PruneSegment::ContractLogs, - )? + .prune_target_block(input.to_block, PruneSegment::ContractLogs)? .map(|(bn, _)| bn) .unwrap_or_default(); // Get status checkpoint from latest run - let mut last_pruned_block = provider - .get_prune_checkpoint(PruneSegment::ContractLogs)? - .and_then(|checkpoint| checkpoint.block_number); + let mut last_pruned_block = + input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number); let initial_last_pruned_block = last_pruned_block; @@ -54,8 +59,7 @@ impl ReceiptsByLogs { // Figure out what receipts have already been pruned, so we can have an accurate // `address_filter` - let address_filter = - receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?; + let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?; // Splits all transactions in different block ranges. Each block range will have its own // filter address list and will check it while going through the table @@ -108,7 +112,7 @@ impl ReceiptsByLogs { "Calculated block ranges and filtered addresses", ); - let mut limit = delete_limit; + let mut limit = input.delete_limit; let mut done = true; let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { @@ -183,8 +187,9 @@ impl ReceiptsByLogs { // // Only applies if we were able to prune everything intended for this run, otherwise the // checkpoint is the `last_pruned_block`. - let prune_mode_block = receipts_log_filter - .lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? + let prune_mode_block = self + .config + .lowest_block_with_distance(input.to_block, initial_last_pruned_block)? .unwrap_or(to_block); provider.save_prune_checkpoint( @@ -196,13 +201,13 @@ impl ReceiptsByLogs { }, )?; - Ok(PruneOutput { done, pruned: delete_limit - limit, checkpoint: None }) + Ok(PruneOutput { done, pruned: input.delete_limit - limit, checkpoint: None }) } } #[cfg(test)] mod tests { - use crate::segments::receipts_by_logs::ReceiptsByLogs; + use crate::segments::{receipts_by_logs::ReceiptsByLogs, PruneInput, Segment}; use assert_matches::assert_matches; use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx}; use reth_interfaces::test_utils::{ @@ -261,7 +266,17 @@ mod tests { let receipts_log_filter = ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); - let result = ReceiptsByLogs::default().prune(&provider, &receipts_log_filter, tip, 10); + let result = ReceiptsByLogs::new(receipts_log_filter).prune( + &provider, + PruneInput { + previous_checkpoint: tx + .inner() + .get_prune_checkpoint(PruneSegment::ContractLogs) + .unwrap(), + to_block: tip, + delete_limit: 10, + }, + ); provider.commit().expect("commit"); assert_matches!(result, Ok(_)); diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs index 830f9fd84..aa8d48a2e 100644 --- a/crates/prune/src/segments/sender_recovery.rs +++ b/crates/prune/src/segments/sender_recovery.rs @@ -3,19 +3,30 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, tables}; -use reth_primitives::PruneSegment; +use reth_primitives::{PruneMode, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct SenderRecovery; +#[derive(Debug)] +pub struct SenderRecovery { + mode: PruneMode, +} + +impl SenderRecovery { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for SenderRecovery { fn segment(&self) -> PruneSegment { PruneSegment::SenderRecovery } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -112,7 +123,7 @@ mod tests { to_block, delete_limit: 10, }; - let segment = SenderRecovery::default(); + let segment = SenderRecovery::new(prune_mode); let next_tx_number_to_prune = tx .inner() diff --git a/crates/prune/src/segments/set.rs b/crates/prune/src/segments/set.rs new file mode 100644 index 000000000..92402825e --- /dev/null +++ b/crates/prune/src/segments/set.rs @@ -0,0 +1,33 @@ +use crate::segments::Segment; +use reth_db::database::Database; +use std::sync::Arc; + +/// Collection of [Segment]. Thread-safe, allocated on the heap. +#[derive(Debug)] +pub struct SegmentSet { + inner: Vec>>, +} + +impl SegmentSet { + /// Returns empty [SegmentSet] collection. + pub fn new() -> Self { + Self::default() + } + + /// Adds new [Segment] to collection. + pub fn add_segment + 'static>(mut self, segment: S) -> Self { + self.inner.push(Arc::new(segment)); + self + } + + /// Consumes [SegmentSet] and returns a [Vec]. + pub fn into_vec(self) -> Vec>> { + self.inner + } +} + +impl Default for SegmentSet { + fn default() -> Self { + Self { inner: Vec::new() } + } +} diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index d836da69f..aa68eb714 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -9,19 +9,30 @@ use reth_db::{ models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}, tables, }; -use reth_primitives::PruneSegment; +use reth_primitives::{PruneMode, PruneSegment}; use reth_provider::DatabaseProviderRW; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct StorageHistory; +#[derive(Debug)] +pub struct StorageHistory { + mode: PruneMode, +} + +impl StorageHistory { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for StorageHistory { fn segment(&self) -> PruneSegment { PruneSegment::StorageHistory } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -133,7 +144,7 @@ mod tests { to_block, delete_limit: 2000, }; - let segment = StorageHistory::default(); + let segment = StorageHistory::new(prune_mode); let provider = tx.inner_rw(); let result = segment.prune(&provider, input).unwrap(); diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs index 4a9ca5982..6785a22fc 100644 --- a/crates/prune/src/segments/transaction_lookup.rs +++ b/crates/prune/src/segments/transaction_lookup.rs @@ -4,19 +4,30 @@ use crate::{ }; use rayon::prelude::*; use reth_db::{database::Database, tables}; -use reth_primitives::PruneSegment; +use reth_primitives::{PruneMode, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct TransactionLookup; +#[derive(Debug)] +pub struct TransactionLookup { + mode: PruneMode, +} + +impl TransactionLookup { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for TransactionLookup { fn segment(&self) -> PruneSegment { PruneSegment::TransactionLookup } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -131,7 +142,7 @@ mod tests { to_block, delete_limit: 10, }; - let segment = TransactionLookup::default(); + let segment = TransactionLookup::new(prune_mode); let next_tx_number_to_prune = tx .inner() diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index 2dcfc28b2..d06e97b65 100644 --- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -3,19 +3,30 @@ use crate::{ PrunerError, }; use reth_db::{database::Database, tables}; -use reth_primitives::PruneSegment; +use reth_primitives::{PruneMode, PruneSegment}; use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use tracing::{instrument, trace}; -#[derive(Default)] -#[non_exhaustive] -pub(crate) struct Transactions; +#[derive(Debug)] +pub struct Transactions { + mode: PruneMode, +} + +impl Transactions { + pub fn new(mode: PruneMode) -> Self { + Self { mode } + } +} impl Segment for Transactions { fn segment(&self) -> PruneSegment { PruneSegment::Transactions } + fn mode(&self) -> Option { + Some(self.mode) + } + #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] fn prune( &self, @@ -94,7 +105,7 @@ mod tests { to_block, delete_limit: 10, }; - let segment = Transactions::default(); + let segment = Transactions::new(prune_mode); let next_tx_number_to_prune = tx .inner() diff --git a/crates/revm/src/processor.rs b/crates/revm/src/processor.rs index 373acdd2d..157c8154b 100644 --- a/crates/revm/src/processor.rs +++ b/crates/revm/src/processor.rs @@ -363,8 +363,14 @@ impl<'a> EVMProcessor<'a> { let time = Instant::now(); let retention = if self.tip.map_or(true, |tip| { - !self.prune_modes.should_prune_account_history(block.number, tip) && - !self.prune_modes.should_prune_storage_history(block.number, tip) + !self + .prune_modes + .account_history + .map_or(false, |mode| mode.should_prune(block.number, tip)) && + !self + .prune_modes + .storage_history + .map_or(false, |mode| mode.should_prune(block.number, tip)) }) { BundleRetention::Reverts } else { @@ -405,7 +411,7 @@ impl<'a> EVMProcessor<'a> { // Block receipts should not be retained if self.prune_modes.receipts == Some(PruneMode::Full) || // [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`] - self.prune_modes.should_prune_receipts(block_number, tip) + self.prune_modes.receipts.map_or(false, |mode| mode.should_prune(block_number, tip)) { receipts.clear(); return Ok(()) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index aae96c772..449256106 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -5,7 +5,7 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::DatabaseEnv; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::{stage::StageCheckpoint, PruneModes, MAINNET}; +use reth_primitives::{stage::StageCheckpoint, MAINNET}; use reth_provider::ProviderFactory; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, @@ -62,7 +62,7 @@ fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, PruneModes::none()); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None); measure_stage( &mut group, diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 2b47bfe0a..0945538c3 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput} use reth_db::database::Database; use reth_primitives::{ stage::{StageCheckpoint, StageId}, - PruneCheckpoint, PruneModes, PruneSegment, + PruneCheckpoint, PruneMode, PruneSegment, }; use reth_provider::{ AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, @@ -19,19 +19,19 @@ pub struct IndexAccountHistoryStage { /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, /// Pruning configuration. - pub prune_modes: PruneModes, + pub prune_mode: Option, } impl IndexAccountHistoryStage { /// Create new instance of [IndexAccountHistoryStage]. - pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { - Self { commit_threshold, prune_modes } + pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { + Self { commit_threshold, prune_mode } } } impl Default for IndexAccountHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } + Self { commit_threshold: 100_000, prune_mode: None } } } @@ -48,8 +48,11 @@ impl Stage for IndexAccountHistoryStage { provider: &DatabaseProviderRW<'_, &DB>, mut input: ExecInput, ) -> Result { - if let Some((target_prunable_block, prune_mode)) = - self.prune_modes.prune_target_block_account_history(input.target())? + if let Some((target_prunable_block, prune_mode)) = self + .prune_mode + .map(|mode| mode.prune_target_block(input.target(), PruneSegment::AccountHistory)) + .transpose()? + .flatten() { if target_prunable_block > input.checkpoint().block_number { input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); @@ -396,7 +399,7 @@ mod tests { } #[tokio::test] - async fn insert_index_with_prune_modes() { + async fn insert_index_with_prune_mode() { // init let tx = TestTransaction::default(); @@ -426,10 +429,7 @@ mod tests { // run let input = ExecInput { target: Some(20000), ..Default::default() }; let mut stage = IndexAccountHistoryStage { - prune_modes: PruneModes { - account_history: Some(PruneMode::Before(36)), - ..Default::default() - }, + prune_mode: Some(PruneMode::Before(36)), ..Default::default() }; let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); @@ -455,16 +455,12 @@ mod tests { struct IndexAccountHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, - prune_modes: PruneModes, + prune_mode: Option, } impl Default for IndexAccountHistoryTestRunner { fn default() -> Self { - Self { - tx: TestTransaction::default(), - commit_threshold: 1000, - prune_modes: PruneModes::none(), - } + Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None } } } @@ -476,10 +472,7 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { - commit_threshold: self.commit_threshold, - prune_modes: self.prune_modes.clone(), - } + Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode } } } diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 99eeb3e65..b1e27aed1 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput} use reth_db::{database::Database, models::BlockNumberAddress}; use reth_primitives::{ stage::{StageCheckpoint, StageId}, - PruneCheckpoint, PruneModes, PruneSegment, + PruneCheckpoint, PruneMode, PruneSegment, }; use reth_provider::{ DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader, @@ -18,19 +18,19 @@ pub struct IndexStorageHistoryStage { /// flow will be returned to the pipeline for commit. pub commit_threshold: u64, /// Pruning configuration. - pub prune_modes: PruneModes, + pub prune_mode: Option, } impl IndexStorageHistoryStage { /// Create new instance of [IndexStorageHistoryStage]. - pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { - Self { commit_threshold, prune_modes } + pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { + Self { commit_threshold, prune_mode } } } impl Default for IndexStorageHistoryStage { fn default() -> Self { - Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } + Self { commit_threshold: 100_000, prune_mode: None } } } @@ -47,8 +47,11 @@ impl Stage for IndexStorageHistoryStage { provider: &DatabaseProviderRW<'_, &DB>, mut input: ExecInput, ) -> Result { - if let Some((target_prunable_block, prune_mode)) = - self.prune_modes.prune_target_block_storage_history(input.target())? + if let Some((target_prunable_block, prune_mode)) = self + .prune_mode + .map(|mode| mode.prune_target_block(input.target(), PruneSegment::StorageHistory)) + .transpose()? + .flatten() { if target_prunable_block > input.checkpoint().block_number { input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); @@ -409,7 +412,7 @@ mod tests { } #[tokio::test] - async fn insert_index_with_prune_modes() { + async fn insert_index_with_prune_mode() { // init let tx = TestTransaction::default(); @@ -439,10 +442,7 @@ mod tests { // run let input = ExecInput { target: Some(20000), ..Default::default() }; let mut stage = IndexStorageHistoryStage { - prune_modes: PruneModes { - storage_history: Some(PruneMode::Before(36)), - ..Default::default() - }, + prune_mode: Some(PruneMode::Before(36)), ..Default::default() }; let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); @@ -468,16 +468,12 @@ mod tests { struct IndexStorageHistoryTestRunner { pub(crate) tx: TestTransaction, commit_threshold: u64, - prune_modes: PruneModes, + prune_mode: Option, } impl Default for IndexStorageHistoryTestRunner { fn default() -> Self { - Self { - tx: TestTransaction::default(), - commit_threshold: 1000, - prune_modes: PruneModes::none(), - } + Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None } } } @@ -489,10 +485,7 @@ mod tests { } fn stage(&self) -> Self::S { - Self::S { - commit_threshold: self.commit_threshold, - prune_modes: self.prune_modes.clone(), - } + Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode } } } diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 00a820781..02b51dfa7 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -156,8 +156,10 @@ mod tests { ); // Check AccountHistory - let mut acc_indexing_stage = - IndexAccountHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; + let mut acc_indexing_stage = IndexAccountHistoryStage { + prune_mode: prune_modes.account_history, + ..Default::default() + }; if let Some(PruneMode::Full) = prune_modes.account_history { // Full is not supported @@ -170,8 +172,10 @@ mod tests { } // Check StorageHistory - let mut storage_indexing_stage = - IndexStorageHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; + let mut storage_indexing_stage = IndexStorageHistoryStage { + prune_mode: prune_modes.storage_history, + ..Default::default() + }; if let Some(PruneMode::Full) = prune_modes.storage_history { // Full is not supported diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index f35f28de4..697c18707 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -12,7 +12,7 @@ use reth_interfaces::provider::ProviderError; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - PruneCheckpoint, PruneModes, PruneSegment, TransactionSignedNoHash, TxNumber, B256, + PruneCheckpoint, PruneMode, PruneSegment, TransactionSignedNoHash, TxNumber, B256, }; use reth_provider::{ BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, @@ -29,19 +29,19 @@ use tracing::*; pub struct TransactionLookupStage { /// The number of lookup entries to commit at once commit_threshold: u64, - prune_modes: PruneModes, + prune_mode: Option, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { commit_threshold: 5_000_000, prune_modes: PruneModes::none() } + Self { commit_threshold: 5_000_000, prune_mode: None } } } impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { - Self { commit_threshold, prune_modes } + pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { + Self { commit_threshold, prune_mode } } } @@ -58,8 +58,11 @@ impl Stage for TransactionLookupStage { provider: &DatabaseProviderRW<'_, &DB>, mut input: ExecInput, ) -> Result { - if let Some((target_prunable_block, prune_mode)) = - self.prune_modes.prune_target_block_transaction_lookup(input.target())? + if let Some((target_prunable_block, prune_mode)) = self + .prune_mode + .map(|mode| mode.prune_target_block(input.target(), PruneSegment::TransactionLookup)) + .transpose()? + .flatten() { if target_prunable_block > input.checkpoint().block_number { input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); @@ -382,10 +385,7 @@ mod tests { random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..2); runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution"); - runner.set_prune_modes(PruneModes { - transaction_lookup: Some(PruneMode::Before(prune_target)), - ..Default::default() - }); + runner.set_prune_mode(PruneMode::Before(prune_target)); let rx = runner.execute(input); @@ -469,16 +469,12 @@ mod tests { struct TransactionLookupTestRunner { tx: TestTransaction, commit_threshold: u64, - prune_modes: PruneModes, + prune_mode: Option, } impl Default for TransactionLookupTestRunner { fn default() -> Self { - Self { - tx: TestTransaction::default(), - commit_threshold: 1000, - prune_modes: PruneModes::none(), - } + Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None } } } @@ -487,8 +483,8 @@ mod tests { self.commit_threshold = threshold; } - fn set_prune_modes(&mut self, prune_modes: PruneModes) { - self.prune_modes = prune_modes; + fn set_prune_mode(&mut self, prune_mode: PruneMode) { + self.prune_mode = Some(prune_mode); } /// # Panics @@ -528,7 +524,7 @@ mod tests { fn stage(&self) -> Self::S { TransactionLookupStage { commit_threshold: self.commit_threshold, - prune_modes: self.prune_modes.clone(), + prune_mode: self.prune_mode, } } } @@ -556,9 +552,13 @@ mod tests { let provider = self.tx.inner(); if let Some((target_prunable_block, _)) = self - .prune_modes - .prune_target_block_transaction_lookup(input.target()) + .prune_mode + .map(|mode| { + mode.prune_target_block(input.target(), PruneSegment::TransactionLookup) + }) + .transpose() .expect("prune target block for transaction lookup") + .flatten() { if target_prunable_block > input.checkpoint().block_number { input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));