feat(bin, prune): pass prune segments from CLI & refactor modes (#4964)

This commit is contained in:
Alexey Shekhirin
2023-10-12 14:49:28 +03:00
committed by GitHub
parent d2a967d4b5
commit 2dbd142d60
27 changed files with 395 additions and 332 deletions

1
Cargo.lock generated
View File

@ -6047,7 +6047,6 @@ dependencies = [
"modular-bitfield", "modular-bitfield",
"num_enum 0.7.0", "num_enum 0.7.0",
"once_cell", "once_cell",
"paste",
"plain_hasher", "plain_hasher",
"pprof", "pprof",
"proptest", "proptest",

View File

@ -60,9 +60,11 @@ use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions, providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, ProviderFactory, StageCheckpointReader, HeaderProvider, ProviderFactory, StageCheckpointReader,
}; };
use reth_prune::{segments::SegmentSet, Pruner};
use reth_revm::Factory; use reth_revm::Factory;
use reth_revm_inspectors::stack::Hook; use reth_revm_inspectors::stack::Hook;
use reth_rpc_engine_api::EngineApi; use reth_rpc_engine_api::EngineApi;
use reth_snapshot::HighestSnapshotsTracker;
use reth_stages::{ use reth_stages::{
prelude::*, prelude::*,
stages::{ stages::{
@ -455,17 +457,12 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let mut hooks = EngineHooks::new(); let mut hooks = EngineHooks::new();
let pruner_events = if let Some(prune_config) = prune_config { let pruner_events = if let Some(prune_config) = prune_config {
info!(target: "reth::cli", ?prune_config, "Pruner initialized"); let mut pruner = self.build_pruner(&prune_config, db.clone(), highest_snapshots_rx);
let mut pruner = reth_prune::Pruner::new(
db.clone(),
self.chain.clone(),
prune_config.block_interval,
prune_config.segments,
self.chain.prune_delete_limit,
highest_snapshots_rx,
);
let events = pruner.events(); let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
Either::Left(events) Either::Left(events)
} else { } else {
Either::Right(stream::empty()) Either::Right(stream::empty())
@ -878,15 +875,15 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new( .set(TransactionLookupStage::new(
stage_config.transaction_lookup.commit_threshold, stage_config.transaction_lookup.commit_threshold,
prune_modes.clone(), prune_modes.transaction_lookup,
)) ))
.set(IndexAccountHistoryStage::new( .set(IndexAccountHistoryStage::new(
stage_config.index_account_history.commit_threshold, stage_config.index_account_history.commit_threshold,
prune_modes.clone(), prune_modes.account_history,
)) ))
.set(IndexStorageHistoryStage::new( .set(IndexStorageHistoryStage::new(
stage_config.index_storage_history.commit_threshold, stage_config.index_storage_history.commit_threshold,
prune_modes, prune_modes.storage_history,
)), )),
) )
.build(db, self.chain.clone()); .build(db, self.chain.clone());
@ -894,6 +891,45 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Ok(pipeline) Ok(pipeline)
} }
/// Builds a [`Pruner`] from the provided prune configuration.
///
/// A prune segment is registered for every part of `config.segments` that is configured, in
/// this order: receipts, receipts filtered by logs, transaction lookup, sender recovery,
/// account history and storage history. The receipts-by-logs segment is registered only when
/// the log filter is not empty.
///
/// The pruning interval is taken from `config.block_interval`, while the chain spec and the
/// delete limit come from `self.chain`.
fn build_pruner<DB: Database>(
    &self,
    config: &PruneConfig,
    db: DB,
    highest_snapshots_rx: HighestSnapshotsTracker,
) -> Pruner<DB> {
    use reth_prune::segments::{
        AccountHistory, Receipts, ReceiptsByLogs, SenderRecovery, StorageHistory,
        TransactionLookup,
    };

    let modes = &config.segments;
    let set = SegmentSet::new();

    // Each step either registers the segment or hands the set through untouched, so the
    // registration order below is the order of the resulting segments vector.
    let set = match modes.receipts {
        Some(mode) => set.add_segment(Receipts::new(mode)),
        None => set,
    };
    let set = if modes.receipts_log_filter.is_empty() {
        set
    } else {
        set.add_segment(ReceiptsByLogs::new(modes.receipts_log_filter.clone()))
    };
    let set = match modes.transaction_lookup {
        Some(mode) => set.add_segment(TransactionLookup::new(mode)),
        None => set,
    };
    let set = match modes.sender_recovery {
        Some(mode) => set.add_segment(SenderRecovery::new(mode)),
        None => set,
    };
    let set = match modes.account_history {
        Some(mode) => set.add_segment(AccountHistory::new(mode)),
        None => set,
    };
    let set = match modes.storage_history {
        Some(mode) => set.add_segment(StorageHistory::new(mode)),
        None => set,
    };

    let chain_spec = self.chain.clone();
    let delete_limit = self.chain.prune_delete_limit;

    Pruner::new(
        db,
        chain_spec,
        set.into_vec(),
        config.block_interval,
        delete_limit,
        highest_snapshots_rx,
    )
}
/// Change rpc port numbers based on the instance number. /// Change rpc port numbers based on the instance number.
fn adjust_instance_ports(&mut self) { fn adjust_instance_ports(&mut self) {
// auth port is scaled by a factor of instance * 100 // auth port is scaled by a factor of instance * 100

View File

@ -12,7 +12,7 @@ use reth_beacon_consensus::BeaconConsensus;
use reth_config::Config; use reth_config::Config;
use reth_db::init_db; use reth_db::init_db;
use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder;
use reth_primitives::{ChainSpec, PruneModes}; use reth_primitives::ChainSpec;
use reth_provider::{ProviderFactory, StageCheckpointReader}; use reth_provider::{ProviderFactory, StageCheckpointReader};
use reth_stages::{ use reth_stages::{
stages::{ stages::{
@ -211,7 +211,7 @@ impl Command {
) )
} }
StageEnum::TxLookup => { StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, PruneModes::none())), None) (Box::new(TransactionLookupStage::new(batch_size, None)), None)
} }
StageEnum::AccountHashing => { StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None) (Box::new(AccountHashingStage::new(1, batch_size)), None)

View File

@ -515,8 +515,8 @@ where
let pruner = Pruner::new( let pruner = Pruner::new(
db.clone(), db.clone(),
self.base_config.chain_spec.clone(), self.base_config.chain_spec.clone(),
vec![],
5, 5,
PruneModes::none(),
self.base_config.chain_spec.prune_delete_limit, self.base_config.chain_spec.prune_delete_limit,
watch::channel(None).1, watch::channel(None).1,
); );

View File

@ -48,7 +48,6 @@ derive_more = "0.99"
url = "2.3" url = "2.3"
once_cell = "1.17.0" once_cell = "1.17.0"
zstd = { version = "0.12", features = ["experimental"] } zstd = { version = "0.12", features = ["experimental"] }
paste = "1.0"
rayon.workspace = true rayon.workspace = true
tempfile.workspace = true tempfile.workspace = true
sha2 = "0.10.7" sha2 = "0.10.7"

View File

@ -53,7 +53,7 @@ impl ReceiptsLogPruneConfig {
// Reminder, that we increment because the [`BlockNumber`] key of the new map should be // Reminder, that we increment because the [`BlockNumber`] key of the new map should be
// viewed as `PruneMode::Before(block)` // viewed as `PruneMode::Before(block)`
let block = (pruned_block + 1).max( let block = (pruned_block + 1).max(
mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PruneSegment::ContractLogs)? mode.prune_target_block(tip, PruneSegment::ContractLogs)?
.map(|(block, _)| block) .map(|(block, _)| block)
.unwrap_or_default() + .unwrap_or_default() +
1, 1,
@ -75,11 +75,9 @@ impl ReceiptsLogPruneConfig {
for (_, mode) in self.0.iter() { for (_, mode) in self.0.iter() {
if let PruneMode::Distance(_) = mode { if let PruneMode::Distance(_) = mode {
if let Some((block, _)) = mode.prune_target_block( if let Some((block, _)) =
tip, mode.prune_target_block(tip, PruneSegment::ContractLogs)?
MINIMUM_PRUNING_DISTANCE, {
PruneSegment::ContractLogs,
)? {
lowest = Some(lowest.unwrap_or(u64::MAX).min(block)); lowest = Some(lowest.unwrap_or(u64::MAX).min(block));
} }
} }

View File

@ -20,17 +20,16 @@ impl PruneMode {
pub fn prune_target_block( pub fn prune_target_block(
&self, &self,
tip: BlockNumber, tip: BlockNumber,
min_blocks: u64,
segment: PruneSegment, segment: PruneSegment,
) -> Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError> { ) -> Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError> {
let result = match self { let result = match self {
PruneMode::Full if min_blocks == 0 => Some((tip, *self)), PruneMode::Full if segment.min_blocks() == 0 => Some((tip, *self)),
PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet
PruneMode::Distance(distance) if *distance >= min_blocks => { PruneMode::Distance(distance) if *distance >= segment.min_blocks() => {
Some((tip - distance, *self)) Some((tip - distance, *self))
} }
PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, *self)), PruneMode::Before(n) if tip - n >= segment.min_blocks() => Some((n - 1, *self)),
_ => return Err(PruneSegmentError::Configuration(segment)), _ => return Err(PruneSegmentError::Configuration(segment)),
}; };
Ok(result) Ok(result)
@ -72,7 +71,6 @@ mod tests {
#[test] #[test]
fn test_prune_target_block() { fn test_prune_target_block() {
let tip = 20000; let tip = 20000;
let min_blocks = MINIMUM_PRUNING_DISTANCE;
let segment = PruneSegment::Receipts; let segment = PruneSegment::Receipts;
let tests = vec![ let tests = vec![
@ -80,7 +78,10 @@ mod tests {
(PruneMode::Full, Err(PruneSegmentError::Configuration(segment))), (PruneMode::Full, Err(PruneSegmentError::Configuration(segment))),
// Nothing to prune // Nothing to prune
(PruneMode::Distance(tip + 1), Ok(None)), (PruneMode::Distance(tip + 1), Ok(None)),
(PruneMode::Distance(min_blocks + 1), Ok(Some(tip - (min_blocks + 1)))), (
PruneMode::Distance(segment.min_blocks() + 1),
Ok(Some(tip - (segment.min_blocks() + 1))),
),
// Nothing to prune // Nothing to prune
(PruneMode::Before(tip + 1), Ok(None)), (PruneMode::Before(tip + 1), Ok(None)),
( (
@ -96,7 +97,7 @@ mod tests {
for (index, (mode, expected_result)) in tests.into_iter().enumerate() { for (index, (mode, expected_result)) in tests.into_iter().enumerate() {
assert_eq!( assert_eq!(
mode.prune_target_block(tip, min_blocks, segment), mode.prune_target_block(tip, segment),
expected_result.map(|r| r.map(|b| (b, mode))), expected_result.map(|r| r.map(|b| (b, mode))),
"Test {} failed", "Test {} failed",
index + 1, index + 1,
@ -105,7 +106,7 @@ mod tests {
// Test for a scenario where there are no minimum blocks and Full can be used // Test for a scenario where there are no minimum blocks and Full can be used
assert_eq!( assert_eq!(
PruneMode::Full.prune_target_block(tip, 0, segment), PruneMode::Full.prune_target_block(tip, PruneSegment::Transactions),
Ok(Some((tip, PruneMode::Full))), Ok(Some((tip, PruneMode::Full))),
); );
} }

View File

@ -1,3 +1,4 @@
use crate::MINIMUM_PRUNING_DISTANCE;
use derive_more::Display; use derive_more::Display;
use reth_codecs::{main_codec, Compact}; use reth_codecs::{main_codec, Compact};
use thiserror::Error; use thiserror::Error;
@ -24,6 +25,20 @@ pub enum PruneSegment {
Transactions, Transactions,
} }
impl PruneSegment {
    /// Returns the minimum number of blocks that must be left in the database for this segment.
    ///
    /// Receipts, contract logs, account history and storage history require
    /// [`MINIMUM_PRUNING_DISTANCE`] blocks; all other segments have no minimum and return `0`.
    pub fn min_blocks(&self) -> u64 {
        match self {
            // Segments that must retain a minimum distance from the tip.
            Self::Receipts | Self::ContractLogs | Self::AccountHistory | Self::StorageHistory => {
                MINIMUM_PRUNING_DISTANCE
            }
            // Segments without a minimum.
            Self::SenderRecovery | Self::TransactionLookup | Self::Headers | Self::Transactions => {
                0
            }
        }
    }
}
/// PruneSegment error type. /// PruneSegment error type.
#[derive(Debug, Error, PartialEq, Eq, Clone)] #[derive(Debug, Error, PartialEq, Eq, Clone)]
pub enum PruneSegmentError { pub enum PruneSegmentError {

View File

@ -1,8 +1,6 @@
use crate::{ use crate::{
prune::PruneSegmentError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, serde_helper::deserialize_opt_prune_mode_with_min_blocks, PruneMode, ReceiptsLogPruneConfig,
BlockNumber, PruneMode, PruneSegment, ReceiptsLogPruneConfig,
}; };
use paste::paste;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Minimum distance from the tip necessary for the node to work correctly: /// Minimum distance from the tip necessary for the node to work correctly:
@ -44,69 +42,26 @@ pub struct PruneModes {
/// Receipts pruning configuration by retaining only those receipts that contain logs emitted /// Receipts pruning configuration by retaining only those receipts that contain logs emitted
/// by the specified addresses, discarding others. This setting is overridden by `receipts`. /// by the specified addresses, discarding others. This setting is overridden by `receipts`.
/// ///
/// The [`BlockNumber`] represents the starting block from which point onwards the receipts are /// The [BlockNumber](`crate::BlockNumber`) represents the starting block from which point
/// preserved. /// onwards the receipts are preserved.
pub receipts_log_filter: ReceiptsLogPruneConfig, pub receipts_log_filter: ReceiptsLogPruneConfig,
} }
macro_rules! impl_prune_segments {
($(($segment:ident, $variant:ident, $min_blocks:expr)),+) => {
$(
paste! {
#[doc = concat!(
"Check if ",
stringify!($variant),
" should be pruned at the target block according to the provided tip."
)]
pub fn [<should_prune_ $segment>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$segment {
return mode.should_prune(block, tip)
}
false
}
}
)+
$(
paste! {
#[doc = concat!(
"Returns block up to which ",
stringify!($variant),
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $segment>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError> {
match self.$segment {
Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PruneSegment::$variant),
None => Ok(None)
}
}
}
)+
/// Sets pruning to all targets.
pub fn all() -> Self {
Self {
$(
$segment: Some(PruneMode::Full),
)+
receipts_log_filter: Default::default()
}
}
};
}
impl PruneModes { impl PruneModes {
/// Sets pruning to no target. /// Sets pruning to no target.
pub fn none() -> Self { pub fn none() -> Self {
PruneModes::default() PruneModes::default()
} }
impl_prune_segments!( /// Sets pruning to all targets.
(sender_recovery, SenderRecovery, None), pub fn all() -> Self {
(transaction_lookup, TransactionLookup, None), Self {
(receipts, Receipts, Some(MINIMUM_PRUNING_DISTANCE)), sender_recovery: Some(PruneMode::Full),
(account_history, AccountHistory, Some(MINIMUM_PRUNING_DISTANCE)), transaction_lookup: Some(PruneMode::Full),
(storage_history, StorageHistory, Some(MINIMUM_PRUNING_DISTANCE)) receipts: Some(PruneMode::Full),
); account_history: Some(PruneMode::Full),
storage_history: Some(PruneMode::Full),
receipts_log_filter: Default::default(),
}
}
} }

View File

@ -13,7 +13,7 @@ mod error;
mod event; mod event;
mod metrics; mod metrics;
mod pruner; mod pruner;
mod segments; pub mod segments;
use crate::metrics::Metrics; use crate::metrics::Metrics;
pub use error::PrunerError; pub use error::PrunerError;

View File

@ -7,8 +7,7 @@ use crate::{
}; };
use reth_db::database::Database; use reth_db::database::Database;
use reth_primitives::{ use reth_primitives::{
listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneModes, PruneProgress, listener::EventListeners, BlockNumber, ChainSpec, PruneMode, PruneProgress, PruneSegment,
PruneSegment, PruneSegmentError,
}; };
use reth_provider::{ProviderFactory, PruneCheckpointReader}; use reth_provider::{ProviderFactory, PruneCheckpointReader};
use reth_snapshot::HighestSnapshotsTracker; use reth_snapshot::HighestSnapshotsTracker;
@ -22,32 +21,11 @@ pub type PrunerResult = Result<PruneProgress, PrunerError>;
/// The pruner type itself with the result of [Pruner::run] /// The pruner type itself with the result of [Pruner::run]
pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult); pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
type RunnableSegmentGetPruneTargetBlockResult =
Result<Option<(BlockNumber, PruneMode)>, PruneSegmentError>;
struct PrunableSegment<DB: Database>(
Box<dyn Segment<DB>>,
#[allow(clippy::type_complexity)]
Box<dyn Fn(&PruneModes, BlockNumber) -> RunnableSegmentGetPruneTargetBlockResult>,
);
impl<DB: Database> PrunableSegment<DB> {
fn new<
S: Segment<DB> + 'static,
F: Fn(&PruneModes, BlockNumber) -> RunnableSegmentGetPruneTargetBlockResult + 'static,
>(
segment: S,
f: F,
) -> Self {
Self(Box::new(segment), Box::new(f))
}
}
/// Pruning routine. Main pruning logic happens in [Pruner::run]. /// Pruning routine. Main pruning logic happens in [Pruner::run].
#[derive(Debug)] #[derive(Debug)]
pub struct Pruner<DB> { pub struct Pruner<DB> {
metrics: Metrics,
provider_factory: ProviderFactory<DB>, provider_factory: ProviderFactory<DB>,
segments: Vec<Arc<dyn Segment<DB>>>,
/// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed, /// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks. /// pruned, when the chain advances by the specified number of blocks.
min_block_interval: usize, min_block_interval: usize,
@ -55,12 +33,12 @@ pub struct Pruner<DB> {
/// number is updated with the tip block number the pruner was called with. It's used in /// number is updated with the tip block number the pruner was called with. It's used in
/// conjunction with `min_block_interval` to determine when the pruning needs to be initiated. /// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
previous_tip_block_number: Option<BlockNumber>, previous_tip_block_number: Option<BlockNumber>,
modes: PruneModes,
/// Maximum total entries to prune (delete from database) per block. /// Maximum total entries to prune (delete from database) per block.
delete_limit: usize, delete_limit: usize,
listeners: EventListeners<PrunerEvent>,
#[allow(dead_code)] #[allow(dead_code)]
highest_snapshots_tracker: HighestSnapshotsTracker, highest_snapshots_tracker: HighestSnapshotsTracker,
metrics: Metrics,
listeners: EventListeners<PrunerEvent>,
} }
impl<DB: Database> Pruner<DB> { impl<DB: Database> Pruner<DB> {
@ -68,20 +46,20 @@ impl<DB: Database> Pruner<DB> {
pub fn new( pub fn new(
db: DB, db: DB,
chain_spec: Arc<ChainSpec>, chain_spec: Arc<ChainSpec>,
segments: Vec<Arc<dyn Segment<DB>>>,
min_block_interval: usize, min_block_interval: usize,
modes: PruneModes,
delete_limit: usize, delete_limit: usize,
highest_snapshots_tracker: HighestSnapshotsTracker, highest_snapshots_tracker: HighestSnapshotsTracker,
) -> Self { ) -> Self {
Self { Self {
metrics: Metrics::default(),
provider_factory: ProviderFactory::new(db, chain_spec), provider_factory: ProviderFactory::new(db, chain_spec),
segments,
min_block_interval, min_block_interval,
previous_tip_block_number: None, previous_tip_block_number: None,
modes,
delete_limit, delete_limit,
listeners: Default::default(),
highest_snapshots_tracker, highest_snapshots_tracker,
metrics: Metrics::default(),
listeners: Default::default(),
} }
} }
@ -119,37 +97,16 @@ impl<DB: Database> Pruner<DB> {
.map_or(1, |previous_tip_block_number| tip_block_number - previous_tip_block_number) .map_or(1, |previous_tip_block_number| tip_block_number - previous_tip_block_number)
as usize; as usize;
// TODO(alexey): this is cursed, refactor for segment in &self.segments {
let segments: [PrunableSegment<DB>; 5] = [
PrunableSegment::new(
segments::Receipts::default(),
PruneModes::prune_target_block_receipts,
),
PrunableSegment::new(
segments::TransactionLookup::default(),
PruneModes::prune_target_block_transaction_lookup,
),
PrunableSegment::new(
segments::SenderRecovery::default(),
PruneModes::prune_target_block_sender_recovery,
),
PrunableSegment::new(
segments::AccountHistory::default(),
PruneModes::prune_target_block_account_history,
),
PrunableSegment::new(
segments::StorageHistory::default(),
PruneModes::prune_target_block_storage_history,
),
];
for PrunableSegment(segment, get_prune_target_block) in segments {
if delete_limit == 0 { if delete_limit == 0 {
break break
} }
if let Some((to_block, prune_mode)) = if let Some((to_block, prune_mode)) = segment
get_prune_target_block(&self.modes, tip_block_number)? .mode()
.map(|mode| mode.prune_target_block(tip_block_number, segment.segment()))
.transpose()?
.flatten()
{ {
trace!( trace!(
target: "pruner", target: "pruner",
@ -183,30 +140,6 @@ impl<DB: Database> Pruner<DB> {
} }
} }
// TODO(alexey): make it not a special case
if !self.modes.receipts_log_filter.is_empty() && delete_limit > 0 {
let segment_start = Instant::now();
let output = segments::ReceiptsByLogs::default().prune(
&provider,
&self.modes.receipts_log_filter,
tip_block_number,
delete_limit,
)?;
self.metrics
.get_prune_segment_metrics(PruneSegment::ContractLogs)
.duration_seconds
.record(segment_start.elapsed());
done = done && output.done;
delete_limit = delete_limit.saturating_sub(output.pruned);
stats.insert(
PruneSegment::ContractLogs,
(PruneProgress::from_done(output.done), output.pruned),
);
} else {
trace!(target: "pruner", segment = ?PruneSegment::ContractLogs, "No filter to prune");
}
if let Some(snapshots) = highest_snapshots { if let Some(snapshots) = highest_snapshots {
if let (Some(to_block), true) = (snapshots.headers, delete_limit > 0) { if let (Some(to_block), true) = (snapshots.headers, delete_limit > 0) {
let prune_mode = PruneMode::Before(to_block + 1); let prune_mode = PruneMode::Before(to_block + 1);
@ -219,7 +152,7 @@ impl<DB: Database> Pruner<DB> {
); );
let segment_start = Instant::now(); let segment_start = Instant::now();
let segment = segments::Headers::default(); let segment = segments::Headers::new(prune_mode);
let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?;
let output = segment let output = segment
.prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?;
@ -251,7 +184,7 @@ impl<DB: Database> Pruner<DB> {
); );
let segment_start = Instant::now(); let segment_start = Instant::now();
let segment = segments::Transactions::default(); let segment = segments::Transactions::new(prune_mode);
let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?; let previous_checkpoint = provider.get_prune_checkpoint(PruneSegment::Headers)?;
let output = segment let output = segment
.prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?; .prune(&provider, PruneInput { previous_checkpoint, to_block, delete_limit })?;
@ -321,14 +254,13 @@ impl<DB: Database> Pruner<DB> {
mod tests { mod tests {
use crate::Pruner; use crate::Pruner;
use reth_db::test_utils::create_test_rw_db; use reth_db::test_utils::create_test_rw_db;
use reth_primitives::{PruneModes, MAINNET}; use reth_primitives::MAINNET;
use tokio::sync::watch; use tokio::sync::watch;
#[test] #[test]
fn is_pruning_needed() { fn is_pruning_needed() {
let db = create_test_rw_db(); let db = create_test_rw_db();
let mut pruner = let mut pruner = Pruner::new(db, MAINNET.clone(), vec![], 5, 0, watch::channel(None).1);
Pruner::new(db, MAINNET.clone(), 5, PruneModes::none(), 0, watch::channel(None).1);
// No last pruned block number was set before // No last pruned block number was set before
let first_block_number = 1; let first_block_number = 1;

View File

@ -5,19 +5,30 @@ use crate::{
PrunerError, PrunerError,
}; };
use reth_db::{database::Database, models::ShardedKey, tables}; use reth_db::{database::Database, models::ShardedKey, tables};
use reth_primitives::PruneSegment; use reth_primitives::{PruneMode, PruneSegment};
use reth_provider::DatabaseProviderRW; use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct AccountHistory {
pub(crate) struct AccountHistory; mode: PruneMode,
}
impl AccountHistory {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for AccountHistory { impl<DB: Database> Segment<DB> for AccountHistory {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::AccountHistory PruneSegment::AccountHistory
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -129,7 +140,7 @@ mod tests {
to_block, to_block,
delete_limit: 2000, delete_limit: 2000,
}; };
let segment = AccountHistory::default(); let segment = AccountHistory::new(prune_mode);
let provider = tx.inner_rw(); let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap(); let result = segment.prune(&provider, input).unwrap();

View File

@ -5,20 +5,31 @@ use crate::{
use itertools::Itertools; use itertools::Itertools;
use reth_db::{database::Database, table::Table, tables}; use reth_db::{database::Database, table::Table, tables};
use reth_interfaces::RethResult; use reth_interfaces::RethResult;
use reth_primitives::{BlockNumber, PruneSegment}; use reth_primitives::{BlockNumber, PruneMode, PruneSegment};
use reth_provider::DatabaseProviderRW; use reth_provider::DatabaseProviderRW;
use std::ops::RangeInclusive; use std::ops::RangeInclusive;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct Headers {
pub(crate) struct Headers; mode: PruneMode,
}
impl Headers {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for Headers { impl<DB: Database> Segment<DB> for Headers {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::Headers PruneSegment::Headers
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -129,7 +140,7 @@ mod tests {
to_block, to_block,
delete_limit: 10, delete_limit: 10,
}; };
let segment = Headers::default(); let segment = Headers::new(prune_mode);
let next_block_number_to_prune = tx let next_block_number_to_prune = tx
.inner() .inner()
@ -193,7 +204,7 @@ mod tests {
// Less than total number of tables for `Headers` segment // Less than total number of tables for `Headers` segment
delete_limit: 2, delete_limit: 2,
}; };
let segment = Headers::default(); let segment = Headers::new(PruneMode::Full);
let provider = tx.inner_rw(); let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap(); let result = segment.prune(&provider, input).unwrap();

View File

@ -4,18 +4,21 @@ mod history;
mod receipts; mod receipts;
mod receipts_by_logs; mod receipts_by_logs;
mod sender_recovery; mod sender_recovery;
mod set;
mod storage_history; mod storage_history;
mod transaction_lookup; mod transaction_lookup;
mod transactions; mod transactions;
pub(crate) use account_history::AccountHistory; pub use account_history::AccountHistory;
pub(crate) use headers::Headers; pub use headers::Headers;
pub(crate) use receipts::Receipts; pub use receipts::Receipts;
pub(crate) use receipts_by_logs::ReceiptsByLogs; pub use receipts_by_logs::ReceiptsByLogs;
pub(crate) use sender_recovery::SenderRecovery; pub use sender_recovery::SenderRecovery;
pub(crate) use storage_history::StorageHistory; pub use set::SegmentSet;
pub(crate) use transaction_lookup::TransactionLookup; use std::fmt::Debug;
pub(crate) use transactions::Transactions; pub use storage_history::StorageHistory;
pub use transaction_lookup::TransactionLookup;
pub use transactions::Transactions;
use crate::PrunerError; use crate::PrunerError;
use reth_db::database::Database; use reth_db::database::Database;
@ -32,9 +35,13 @@ use tracing::error;
/// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call /// 2. If [Segment::prune] returned a [Some] in `checkpoint` of [PruneOutput], call
/// [Segment::save_checkpoint]. /// [Segment::save_checkpoint].
/// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput]. /// 3. Subtract `pruned` of [PruneOutput] from `delete_limit` of next [PruneInput].
pub(crate) trait Segment<DB: Database> { pub trait Segment<DB: Database>: Debug + Send + Sync {
/// Segment of data that's pruned.
fn segment(&self) -> PruneSegment; fn segment(&self) -> PruneSegment;
/// Prune mode with which the segment was initialized
fn mode(&self) -> Option<PruneMode>;
/// Prune data for [Self::segment] using the provided input. /// Prune data for [Self::segment] using the provided input.
fn prune( fn prune(
&self, &self,
@ -54,7 +61,7 @@ pub(crate) trait Segment<DB: Database> {
/// Segment pruning input, see [Segment::prune]. /// Segment pruning input, see [Segment::prune].
#[derive(Debug, Clone, Copy)] #[derive(Debug, Clone, Copy)]
pub(crate) struct PruneInput { pub struct PruneInput {
pub(crate) previous_checkpoint: Option<PruneCheckpoint>, pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
/// Target block up to which the pruning needs to be done, inclusive. /// Target block up to which the pruning needs to be done, inclusive.
pub(crate) to_block: BlockNumber, pub(crate) to_block: BlockNumber,
@ -129,7 +136,7 @@ impl PruneInput {
/// Segment pruning output, see [Segment::prune]. /// Segment pruning output, see [Segment::prune].
#[derive(Debug, Clone, Copy, Eq, PartialEq)] #[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub(crate) struct PruneOutput { pub struct PruneOutput {
/// `true` if pruning has been completed up to the target block, and `false` if there's more /// `true` if pruning has been completed up to the target block, and `false` if there's more
/// data to prune in further runs. /// data to prune in further runs.
pub(crate) done: bool, pub(crate) done: bool,

View File

@ -4,19 +4,30 @@ use crate::{
}; };
use reth_db::{database::Database, tables}; use reth_db::{database::Database, tables};
use reth_interfaces::RethResult; use reth_interfaces::RethResult;
use reth_primitives::{PruneCheckpoint, PruneSegment}; use reth_primitives::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider}; use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct Receipts {
pub(crate) struct Receipts; mode: PruneMode,
}
impl Receipts {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for Receipts { impl<DB: Database> Segment<DB> for Receipts {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::Receipts PruneSegment::Receipts
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -127,7 +138,7 @@ mod tests {
to_block, to_block,
delete_limit: 10, delete_limit: 10,
}; };
let segment = Receipts::default(); let segment = Receipts::new(prune_mode);
let next_tx_number_to_prune = tx let next_tx_number_to_prune = tx
.inner() .inner()

View File

@ -1,46 +1,51 @@
use crate::{segments::PruneOutput, PrunerError}; use crate::{
segments::{PruneInput, PruneOutput, Segment},
PrunerError,
};
use reth_db::{database::Database, tables}; use reth_db::{database::Database, tables};
use reth_primitives::{ use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig, PruneCheckpoint, PruneMode, PruneSegment, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE,
MINIMUM_PRUNING_DISTANCE,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
TransactionsProvider,
}; };
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct ReceiptsByLogs {
pub(crate) struct ReceiptsByLogs; config: ReceiptsLogPruneConfig,
}
impl ReceiptsByLogs { impl ReceiptsByLogs {
/// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion pub fn new(config: ReceiptsLogPruneConfig) -> Self {
/// list, and removes every receipt not belonging to it. Respects the batch size. Self { config }
#[instrument(level = "trace", skip(self, provider), target = "pruner")] }
pub(crate) fn prune<DB: Database>( }
impl<DB: Database> Segment<DB> for ReceiptsByLogs {
fn segment(&self) -> PruneSegment {
PruneSegment::ContractLogs
}
fn mode(&self) -> Option<PruneMode> {
None
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune(
&self, &self,
provider: &DatabaseProviderRW<'_, DB>, provider: &DatabaseProviderRW<'_, DB>,
receipts_log_filter: &ReceiptsLogPruneConfig, input: PruneInput,
tip_block_number: BlockNumber,
delete_limit: usize,
) -> Result<PruneOutput, PrunerError> { ) -> Result<PruneOutput, PrunerError> {
// Contract log filtering removes every receipt possible except the ones in the list. So, // Contract log filtering removes every receipt possible except the ones in the list. So,
// for the other receipts it's as if they had a `PruneMode::Distance()` of // for the other receipts it's as if they had a `PruneMode::Distance()` of
// `MINIMUM_PRUNING_DISTANCE`. // `MINIMUM_PRUNING_DISTANCE`.
let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE) let to_block = PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)
.prune_target_block( .prune_target_block(input.to_block, PruneSegment::ContractLogs)?
tip_block_number,
MINIMUM_PRUNING_DISTANCE,
PruneSegment::ContractLogs,
)?
.map(|(bn, _)| bn) .map(|(bn, _)| bn)
.unwrap_or_default(); .unwrap_or_default();
// Get status checkpoint from latest run // Get status checkpoint from latest run
let mut last_pruned_block = provider let mut last_pruned_block =
.get_prune_checkpoint(PruneSegment::ContractLogs)? input.previous_checkpoint.and_then(|checkpoint| checkpoint.block_number);
.and_then(|checkpoint| checkpoint.block_number);
let initial_last_pruned_block = last_pruned_block; let initial_last_pruned_block = last_pruned_block;
@ -54,8 +59,7 @@ impl ReceiptsByLogs {
// Figure out what receipts have already been pruned, so we can have an accurate // Figure out what receipts have already been pruned, so we can have an accurate
// `address_filter` // `address_filter`
let address_filter = let address_filter = self.config.group_by_block(input.to_block, last_pruned_block)?;
receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?;
// Splits all transactions in different block ranges. Each block range will have its own // Splits all transactions in different block ranges. Each block range will have its own
// filter address list and will check it while going through the table // filter address list and will check it while going through the table
@ -108,7 +112,7 @@ impl ReceiptsByLogs {
"Calculated block ranges and filtered addresses", "Calculated block ranges and filtered addresses",
); );
let mut limit = delete_limit; let mut limit = input.delete_limit;
let mut done = true; let mut done = true;
let mut last_pruned_transaction = None; let mut last_pruned_transaction = None;
for (start_block, end_block, num_addresses) in block_ranges { for (start_block, end_block, num_addresses) in block_ranges {
@ -183,8 +187,9 @@ impl ReceiptsByLogs {
// //
// Only applies if we were able to prune everything intended for this run, otherwise the // Only applies if we were able to prune everything intended for this run, otherwise the
// checkpoint is the `last_pruned_block`. // checkpoint is the `last_pruned_block`.
let prune_mode_block = receipts_log_filter let prune_mode_block = self
.lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? .config
.lowest_block_with_distance(input.to_block, initial_last_pruned_block)?
.unwrap_or(to_block); .unwrap_or(to_block);
provider.save_prune_checkpoint( provider.save_prune_checkpoint(
@ -196,13 +201,13 @@ impl ReceiptsByLogs {
}, },
)?; )?;
Ok(PruneOutput { done, pruned: delete_limit - limit, checkpoint: None }) Ok(PruneOutput { done, pruned: input.delete_limit - limit, checkpoint: None })
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::segments::receipts_by_logs::ReceiptsByLogs; use crate::segments::{receipts_by_logs::ReceiptsByLogs, PruneInput, Segment};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx}; use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx};
use reth_interfaces::test_utils::{ use reth_interfaces::test_utils::{
@ -261,7 +266,17 @@ mod tests {
let receipts_log_filter = let receipts_log_filter =
ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));
let result = ReceiptsByLogs::default().prune(&provider, &receipts_log_filter, tip, 10); let result = ReceiptsByLogs::new(receipts_log_filter).prune(
&provider,
PruneInput {
previous_checkpoint: tx
.inner()
.get_prune_checkpoint(PruneSegment::ContractLogs)
.unwrap(),
to_block: tip,
delete_limit: 10,
},
);
provider.commit().expect("commit"); provider.commit().expect("commit");
assert_matches!(result, Ok(_)); assert_matches!(result, Ok(_));

View File

@ -3,19 +3,30 @@ use crate::{
PrunerError, PrunerError,
}; };
use reth_db::{database::Database, tables}; use reth_db::{database::Database, tables};
use reth_primitives::PruneSegment; use reth_primitives::{PruneMode, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct SenderRecovery {
pub(crate) struct SenderRecovery; mode: PruneMode,
}
impl SenderRecovery {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for SenderRecovery { impl<DB: Database> Segment<DB> for SenderRecovery {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::SenderRecovery PruneSegment::SenderRecovery
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -112,7 +123,7 @@ mod tests {
to_block, to_block,
delete_limit: 10, delete_limit: 10,
}; };
let segment = SenderRecovery::default(); let segment = SenderRecovery::new(prune_mode);
let next_tx_number_to_prune = tx let next_tx_number_to_prune = tx
.inner() .inner()

View File

@ -0,0 +1,33 @@
use crate::segments::Segment;
use reth_db::database::Database;
use std::sync::Arc;
/// Collection of [Segment]. Thread-safe, allocated on the heap.
#[derive(Debug)]
pub struct SegmentSet<DB: Database> {
inner: Vec<Arc<dyn Segment<DB>>>,
}
impl<DB: Database> SegmentSet<DB> {
/// Returns empty [SegmentSet] collection.
pub fn new() -> Self {
Self::default()
}
/// Adds new [Segment] to collection.
pub fn add_segment<S: Segment<DB> + 'static>(mut self, segment: S) -> Self {
self.inner.push(Arc::new(segment));
self
}
/// Consumes [SegmentSet] and returns a [Vec].
pub fn into_vec(self) -> Vec<Arc<dyn Segment<DB>>> {
self.inner
}
}
impl<DB: Database> Default for SegmentSet<DB> {
fn default() -> Self {
Self { inner: Vec::new() }
}
}

View File

@ -9,19 +9,30 @@ use reth_db::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress}, models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables, tables,
}; };
use reth_primitives::PruneSegment; use reth_primitives::{PruneMode, PruneSegment};
use reth_provider::DatabaseProviderRW; use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct StorageHistory {
pub(crate) struct StorageHistory; mode: PruneMode,
}
impl StorageHistory {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for StorageHistory { impl<DB: Database> Segment<DB> for StorageHistory {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::StorageHistory PruneSegment::StorageHistory
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -133,7 +144,7 @@ mod tests {
to_block, to_block,
delete_limit: 2000, delete_limit: 2000,
}; };
let segment = StorageHistory::default(); let segment = StorageHistory::new(prune_mode);
let provider = tx.inner_rw(); let provider = tx.inner_rw();
let result = segment.prune(&provider, input).unwrap(); let result = segment.prune(&provider, input).unwrap();

View File

@ -4,19 +4,30 @@ use crate::{
}; };
use rayon::prelude::*; use rayon::prelude::*;
use reth_db::{database::Database, tables}; use reth_db::{database::Database, tables};
use reth_primitives::PruneSegment; use reth_primitives::{PruneMode, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct TransactionLookup {
pub(crate) struct TransactionLookup; mode: PruneMode,
}
impl TransactionLookup {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for TransactionLookup { impl<DB: Database> Segment<DB> for TransactionLookup {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::TransactionLookup PruneSegment::TransactionLookup
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -131,7 +142,7 @@ mod tests {
to_block, to_block,
delete_limit: 10, delete_limit: 10,
}; };
let segment = TransactionLookup::default(); let segment = TransactionLookup::new(prune_mode);
let next_tx_number_to_prune = tx let next_tx_number_to_prune = tx
.inner() .inner()

View File

@ -3,19 +3,30 @@ use crate::{
PrunerError, PrunerError,
}; };
use reth_db::{database::Database, tables}; use reth_db::{database::Database, tables};
use reth_primitives::PruneSegment; use reth_primitives::{PruneMode, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider}; use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace}; use tracing::{instrument, trace};
#[derive(Default)] #[derive(Debug)]
#[non_exhaustive] pub struct Transactions {
pub(crate) struct Transactions; mode: PruneMode,
}
impl Transactions {
pub fn new(mode: PruneMode) -> Self {
Self { mode }
}
}
impl<DB: Database> Segment<DB> for Transactions { impl<DB: Database> Segment<DB> for Transactions {
fn segment(&self) -> PruneSegment { fn segment(&self) -> PruneSegment {
PruneSegment::Transactions PruneSegment::Transactions
} }
fn mode(&self) -> Option<PruneMode> {
Some(self.mode)
}
#[instrument(level = "trace", target = "pruner", skip(self, provider), ret)] #[instrument(level = "trace", target = "pruner", skip(self, provider), ret)]
fn prune( fn prune(
&self, &self,
@ -94,7 +105,7 @@ mod tests {
to_block, to_block,
delete_limit: 10, delete_limit: 10,
}; };
let segment = Transactions::default(); let segment = Transactions::new(prune_mode);
let next_tx_number_to_prune = tx let next_tx_number_to_prune = tx
.inner() .inner()

View File

@ -363,8 +363,14 @@ impl<'a> EVMProcessor<'a> {
let time = Instant::now(); let time = Instant::now();
let retention = if self.tip.map_or(true, |tip| { let retention = if self.tip.map_or(true, |tip| {
!self.prune_modes.should_prune_account_history(block.number, tip) && !self
!self.prune_modes.should_prune_storage_history(block.number, tip) .prune_modes
.account_history
.map_or(false, |mode| mode.should_prune(block.number, tip)) &&
!self
.prune_modes
.storage_history
.map_or(false, |mode| mode.should_prune(block.number, tip))
}) { }) {
BundleRetention::Reverts BundleRetention::Reverts
} else { } else {
@ -405,7 +411,7 @@ impl<'a> EVMProcessor<'a> {
// Block receipts should not be retained // Block receipts should not be retained
if self.prune_modes.receipts == Some(PruneMode::Full) || if self.prune_modes.receipts == Some(PruneMode::Full) ||
// [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`] // [`PruneSegment::Receipts`] takes priority over [`PruneSegment::ContractLogs`]
self.prune_modes.should_prune_receipts(block_number, tip) self.prune_modes.receipts.map_or(false, |mode| mode.should_prune(block_number, tip))
{ {
receipts.clear(); receipts.clear();
return Ok(()) return Ok(())

View File

@ -5,7 +5,7 @@ use criterion::{
use pprof::criterion::{Output, PProfProfiler}; use pprof::criterion::{Output, PProfProfiler};
use reth_db::DatabaseEnv; use reth_db::DatabaseEnv;
use reth_interfaces::test_utils::TestConsensus; use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::{stage::StageCheckpoint, PruneModes, MAINNET}; use reth_primitives::{stage::StageCheckpoint, MAINNET};
use reth_provider::ProviderFactory; use reth_provider::ProviderFactory;
use reth_stages::{ use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
@ -62,7 +62,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
group.sample_size(10); group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, PruneModes::none()); let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);
measure_stage( measure_stage(
&mut group, &mut group,

View File

@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}
use reth_db::database::Database; use reth_db::database::Database;
use reth_primitives::{ use reth_primitives::{
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PruneSegment, PruneCheckpoint, PruneMode, PruneSegment,
}; };
use reth_provider::{ use reth_provider::{
AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, AccountExtReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader,
@ -19,19 +19,19 @@ pub struct IndexAccountHistoryStage {
/// flow will be returned to the pipeline for commit. /// flow will be returned to the pipeline for commit.
pub commit_threshold: u64, pub commit_threshold: u64,
/// Pruning configuration. /// Pruning configuration.
pub prune_modes: PruneModes, pub prune_mode: Option<PruneMode>,
} }
impl IndexAccountHistoryStage { impl IndexAccountHistoryStage {
/// Create new instance of [IndexAccountHistoryStage]. /// Create new instance of [IndexAccountHistoryStage].
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_modes } Self { commit_threshold, prune_mode }
} }
} }
impl Default for IndexAccountHistoryStage { impl Default for IndexAccountHistoryStage {
fn default() -> Self { fn default() -> Self {
Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } Self { commit_threshold: 100_000, prune_mode: None }
} }
} }
@ -48,8 +48,11 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
provider: &DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
mut input: ExecInput, mut input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = if let Some((target_prunable_block, prune_mode)) = self
self.prune_modes.prune_target_block_account_history(input.target())? .prune_mode
.map(|mode| mode.prune_target_block(input.target(), PruneSegment::AccountHistory))
.transpose()?
.flatten()
{ {
if target_prunable_block > input.checkpoint().block_number { if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
@ -396,7 +399,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn insert_index_with_prune_modes() { async fn insert_index_with_prune_mode() {
// init // init
let tx = TestTransaction::default(); let tx = TestTransaction::default();
@ -426,10 +429,7 @@ mod tests {
// run // run
let input = ExecInput { target: Some(20000), ..Default::default() }; let input = ExecInput { target: Some(20000), ..Default::default() };
let mut stage = IndexAccountHistoryStage { let mut stage = IndexAccountHistoryStage {
prune_modes: PruneModes { prune_mode: Some(PruneMode::Before(36)),
account_history: Some(PruneMode::Before(36)),
..Default::default()
},
..Default::default() ..Default::default()
}; };
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
@ -455,16 +455,12 @@ mod tests {
struct IndexAccountHistoryTestRunner { struct IndexAccountHistoryTestRunner {
pub(crate) tx: TestTransaction, pub(crate) tx: TestTransaction,
commit_threshold: u64, commit_threshold: u64,
prune_modes: PruneModes, prune_mode: Option<PruneMode>,
} }
impl Default for IndexAccountHistoryTestRunner { impl Default for IndexAccountHistoryTestRunner {
fn default() -> Self { fn default() -> Self {
Self { Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None }
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
} }
} }
@ -476,10 +472,7 @@ mod tests {
} }
fn stage(&self) -> Self::S { fn stage(&self) -> Self::S {
Self::S { Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode }
commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(),
}
} }
} }

View File

@ -2,7 +2,7 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}
use reth_db::{database::Database, models::BlockNumberAddress}; use reth_db::{database::Database, models::BlockNumberAddress};
use reth_primitives::{ use reth_primitives::{
stage::{StageCheckpoint, StageId}, stage::{StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PruneSegment, PruneCheckpoint, PruneMode, PruneSegment,
}; };
use reth_provider::{ use reth_provider::{
DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader, DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, StorageReader,
@ -18,19 +18,19 @@ pub struct IndexStorageHistoryStage {
/// flow will be returned to the pipeline for commit. /// flow will be returned to the pipeline for commit.
pub commit_threshold: u64, pub commit_threshold: u64,
/// Pruning configuration. /// Pruning configuration.
pub prune_modes: PruneModes, pub prune_mode: Option<PruneMode>,
} }
impl IndexStorageHistoryStage { impl IndexStorageHistoryStage {
/// Create new instance of [IndexStorageHistoryStage]. /// Create new instance of [IndexStorageHistoryStage].
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_modes } Self { commit_threshold, prune_mode }
} }
} }
impl Default for IndexStorageHistoryStage { impl Default for IndexStorageHistoryStage {
fn default() -> Self { fn default() -> Self {
Self { commit_threshold: 100_000, prune_modes: PruneModes::none() } Self { commit_threshold: 100_000, prune_mode: None }
} }
} }
@ -47,8 +47,11 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
provider: &DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
mut input: ExecInput, mut input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = if let Some((target_prunable_block, prune_mode)) = self
self.prune_modes.prune_target_block_storage_history(input.target())? .prune_mode
.map(|mode| mode.prune_target_block(input.target(), PruneSegment::StorageHistory))
.transpose()?
.flatten()
{ {
if target_prunable_block > input.checkpoint().block_number { if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
@ -409,7 +412,7 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn insert_index_with_prune_modes() { async fn insert_index_with_prune_mode() {
// init // init
let tx = TestTransaction::default(); let tx = TestTransaction::default();
@ -439,10 +442,7 @@ mod tests {
// run // run
let input = ExecInput { target: Some(20000), ..Default::default() }; let input = ExecInput { target: Some(20000), ..Default::default() };
let mut stage = IndexStorageHistoryStage { let mut stage = IndexStorageHistoryStage {
prune_modes: PruneModes { prune_mode: Some(PruneMode::Before(36)),
storage_history: Some(PruneMode::Before(36)),
..Default::default()
},
..Default::default() ..Default::default()
}; };
let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone()); let factory = ProviderFactory::new(tx.tx.as_ref(), MAINNET.clone());
@ -468,16 +468,12 @@ mod tests {
struct IndexStorageHistoryTestRunner { struct IndexStorageHistoryTestRunner {
pub(crate) tx: TestTransaction, pub(crate) tx: TestTransaction,
commit_threshold: u64, commit_threshold: u64,
prune_modes: PruneModes, prune_mode: Option<PruneMode>,
} }
impl Default for IndexStorageHistoryTestRunner { impl Default for IndexStorageHistoryTestRunner {
fn default() -> Self { fn default() -> Self {
Self { Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None }
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
} }
} }
@ -489,10 +485,7 @@ mod tests {
} }
fn stage(&self) -> Self::S { fn stage(&self) -> Self::S {
Self::S { Self::S { commit_threshold: self.commit_threshold, prune_mode: self.prune_mode }
commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(),
}
} }
} }

View File

@ -156,8 +156,10 @@ mod tests {
); );
// Check AccountHistory // Check AccountHistory
let mut acc_indexing_stage = let mut acc_indexing_stage = IndexAccountHistoryStage {
IndexAccountHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; prune_mode: prune_modes.account_history,
..Default::default()
};
if let Some(PruneMode::Full) = prune_modes.account_history { if let Some(PruneMode::Full) = prune_modes.account_history {
// Full is not supported // Full is not supported
@ -170,8 +172,10 @@ mod tests {
} }
// Check StorageHistory // Check StorageHistory
let mut storage_indexing_stage = let mut storage_indexing_stage = IndexStorageHistoryStage {
IndexStorageHistoryStage { prune_modes: prune_modes.clone(), ..Default::default() }; prune_mode: prune_modes.storage_history,
..Default::default()
};
if let Some(PruneMode::Full) = prune_modes.storage_history { if let Some(PruneMode::Full) = prune_modes.storage_history {
// Full is not supported // Full is not supported

View File

@ -12,7 +12,7 @@ use reth_interfaces::provider::ProviderError;
use reth_primitives::{ use reth_primitives::{
keccak256, keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PruneCheckpoint, PruneModes, PruneSegment, TransactionSignedNoHash, TxNumber, B256, PruneCheckpoint, PruneMode, PruneSegment, TransactionSignedNoHash, TxNumber, B256,
}; };
use reth_provider::{ use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter,
@ -29,19 +29,19 @@ use tracing::*;
pub struct TransactionLookupStage { pub struct TransactionLookupStage {
/// The number of lookup entries to commit at once /// The number of lookup entries to commit at once
commit_threshold: u64, commit_threshold: u64,
prune_modes: PruneModes, prune_mode: Option<PruneMode>,
} }
impl Default for TransactionLookupStage { impl Default for TransactionLookupStage {
fn default() -> Self { fn default() -> Self {
Self { commit_threshold: 5_000_000, prune_modes: PruneModes::none() } Self { commit_threshold: 5_000_000, prune_mode: None }
} }
} }
impl TransactionLookupStage { impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage]. /// Create new instance of [TransactionLookupStage].
pub fn new(commit_threshold: u64, prune_modes: PruneModes) -> Self { pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_modes } Self { commit_threshold, prune_mode }
} }
} }
@ -58,8 +58,11 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
provider: &DatabaseProviderRW<'_, &DB>, provider: &DatabaseProviderRW<'_, &DB>,
mut input: ExecInput, mut input: ExecInput,
) -> Result<ExecOutput, StageError> { ) -> Result<ExecOutput, StageError> {
if let Some((target_prunable_block, prune_mode)) = if let Some((target_prunable_block, prune_mode)) = self
self.prune_modes.prune_target_block_transaction_lookup(input.target())? .prune_mode
.map(|mode| mode.prune_target_block(input.target(), PruneSegment::TransactionLookup))
.transpose()?
.flatten()
{ {
if target_prunable_block > input.checkpoint().block_number { if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));
@ -382,10 +385,7 @@ mod tests {
random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..2); random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..2);
runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution"); runner.tx.insert_blocks(seed.iter(), None).expect("failed to seed execution");
runner.set_prune_modes(PruneModes { runner.set_prune_mode(PruneMode::Before(prune_target));
transaction_lookup: Some(PruneMode::Before(prune_target)),
..Default::default()
});
let rx = runner.execute(input); let rx = runner.execute(input);
@ -469,16 +469,12 @@ mod tests {
struct TransactionLookupTestRunner { struct TransactionLookupTestRunner {
tx: TestTransaction, tx: TestTransaction,
commit_threshold: u64, commit_threshold: u64,
prune_modes: PruneModes, prune_mode: Option<PruneMode>,
} }
impl Default for TransactionLookupTestRunner { impl Default for TransactionLookupTestRunner {
fn default() -> Self { fn default() -> Self {
Self { Self { tx: TestTransaction::default(), commit_threshold: 1000, prune_mode: None }
tx: TestTransaction::default(),
commit_threshold: 1000,
prune_modes: PruneModes::none(),
}
} }
} }
@ -487,8 +483,8 @@ mod tests {
self.commit_threshold = threshold; self.commit_threshold = threshold;
} }
fn set_prune_modes(&mut self, prune_modes: PruneModes) { fn set_prune_mode(&mut self, prune_mode: PruneMode) {
self.prune_modes = prune_modes; self.prune_mode = Some(prune_mode);
} }
/// # Panics /// # Panics
@ -528,7 +524,7 @@ mod tests {
fn stage(&self) -> Self::S { fn stage(&self) -> Self::S {
TransactionLookupStage { TransactionLookupStage {
commit_threshold: self.commit_threshold, commit_threshold: self.commit_threshold,
prune_modes: self.prune_modes.clone(), prune_mode: self.prune_mode,
} }
} }
} }
@ -556,9 +552,13 @@ mod tests {
let provider = self.tx.inner(); let provider = self.tx.inner();
if let Some((target_prunable_block, _)) = self if let Some((target_prunable_block, _)) = self
.prune_modes .prune_mode
.prune_target_block_transaction_lookup(input.target()) .map(|mode| {
mode.prune_target_block(input.target(), PruneSegment::TransactionLookup)
})
.transpose()
.expect("prune target block for transaction lookup") .expect("prune target block for transaction lookup")
.flatten()
{ {
if target_prunable_block > input.checkpoint().block_number { if target_prunable_block > input.checkpoint().block_number {
input.checkpoint = Some(StageCheckpoint::new(target_prunable_block)); input.checkpoint = Some(StageCheckpoint::new(target_prunable_block));