refactor(prune-types/prune): move PruneLimiter to reth-prune (#13182)

This commit is contained in:
Léa Narzis
2024-12-06 21:48:52 +01:00
committed by GitHub
parent a0326e4f86
commit 2183752f8d
15 changed files with 73 additions and 82 deletions

View File

@ -1,12 +1,12 @@
use std::{fmt::Debug, ops::RangeBounds};
use crate::PruneLimiter;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, RangeWalker},
table::{Table, TableRow},
transaction::DbTxMut,
DatabaseError,
};
use reth_prune_types::PruneLimiter;
use tracing::debug;
pub(crate) trait DbTxPruneExt: DbTxMut {

View File

@ -12,6 +12,7 @@
mod builder;
mod db_ext;
mod error;
mod limiter;
mod metrics;
mod pruner;
pub mod segments;
@ -19,6 +20,7 @@ pub mod segments;
use crate::metrics::Metrics;
pub use builder::PrunerBuilder;
pub use error::PrunerError;
pub use limiter::PruneLimiter;
pub use pruner::{Pruner, PrunerResult, PrunerWithFactory, PrunerWithResult};
// Re-export prune types

View File

@ -1,3 +1,4 @@
use reth_prune_types::{PruneInterruptReason, PruneProgress};
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
@ -119,6 +120,30 @@ impl PruneLimiter {
pub fn is_limit_reached(&self) -> bool {
self.is_deleted_entries_limit_reached() || self.is_time_limit_reached()
}
/// Creates new [`PruneInterruptReason`] based on the limiter's state.
pub fn interrupt_reason(&self) -> PruneInterruptReason {
if self.is_time_limit_reached() {
PruneInterruptReason::Timeout
} else if self.is_deleted_entries_limit_reached() {
PruneInterruptReason::DeletedEntriesLimitReached
} else {
PruneInterruptReason::Unknown
}
}
/// Creates new [`PruneProgress`].
///
/// If `done == true`, returns [`PruneProgress::Finished`], otherwise
/// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the
/// limiter's state.
pub fn progress(&self, done: bool) -> PruneProgress {
if done {
PruneProgress::Finished
} else {
PruneProgress::HasMoreData(self.interrupt_reason())
}
}
}
#[cfg(test)]

View File

@ -2,14 +2,14 @@
use crate::{
segments::{PruneInput, Segment},
Metrics, PrunerError, PrunerEvent,
Metrics, PruneLimiter, PrunerError, PrunerEvent,
};
use alloy_primitives::BlockNumber;
use reth_exex_types::FinishedExExHeight;
use reth_provider::{
DBProvider, DatabaseProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
};
use reth_prune_types::{PruneLimiter, PruneProgress, PrunedSegmentInfo, PrunerOutput};
use reth_prune_types::{PruneProgress, PrunedSegmentInfo, PrunerOutput};
use reth_tokio_util::{EventSender, EventStream};
use std::time::{Duration, Instant};
use tokio::sync::watch;

View File

@ -3,12 +3,10 @@ mod set;
mod static_file;
mod user;
use crate::PrunerError;
use crate::{PruneLimiter, PrunerError};
use alloy_primitives::{BlockNumber, TxNumber};
use reth_provider::{errors::provider::ProviderResult, BlockReader, PruneCheckpointWriter};
use reth_prune_types::{
PruneCheckpoint, PruneLimiter, PruneMode, PrunePurpose, PruneSegment, SegmentOutput,
};
use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput};
pub use set::SegmentSet;
pub use static_file::{
Headers as StaticFileHeaders, Receipts as StaticFileReceipts,

View File

@ -12,9 +12,7 @@ use reth_provider::{
errors::provider::ProviderResult, BlockReader, DBProvider, NodePrimitivesProvider,
PruneCheckpointWriter, TransactionsProvider,
};
use reth_prune_types::{
PruneCheckpoint, PruneProgress, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_prune_types::{PruneCheckpoint, PruneSegment, SegmentOutput, SegmentOutputCheckpoint};
use tracing::trace;
pub(crate) fn prune<Provider>(
@ -56,7 +54,7 @@ where
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -83,7 +81,7 @@ pub(crate) fn save_checkpoint(
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, SegmentOutput};
use crate::segments::{PruneInput, PruneLimiter, SegmentOutput};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@ -93,7 +91,7 @@ mod tests {
use reth_db::tables;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{

View File

@ -3,7 +3,7 @@ use std::num::NonZeroUsize;
use crate::{
db_ext::DbTxPruneExt,
segments::{PruneInput, Segment},
PrunerError,
PruneLimiter, PrunerError,
};
use alloy_primitives::BlockNumber;
use itertools::Itertools;
@ -14,8 +14,7 @@ use reth_db::{
};
use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory};
use reth_prune_types::{
PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
@ -92,7 +91,7 @@ impl<Provider: StaticFileProviderFactory + DBProvider<Tx: DbTxMut>> Segment<Prov
}
let done = last_pruned_block == Some(block_range_end);
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -195,7 +194,8 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
static_file::headers::HEADER_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256, U256};
use assert_matches::assert_matches;
@ -206,8 +206,8 @@ mod tests {
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutputCheckpoint,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
SegmentOutputCheckpoint,
};
use reth_stages::test_utils::TestStageDB;
use reth_testing_utils::{generators, generators::random_header_range};

View File

@ -10,7 +10,7 @@ use reth_provider::{
TransactionsProvider,
};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_static_file_types::StaticFileSegment;
use tracing::trace;
@ -76,7 +76,7 @@ where
// so we could finish pruning its transactions on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -91,7 +91,7 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, Segment};
use crate::segments::{PruneInput, PruneLimiter, Segment};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@ -104,8 +104,8 @@ mod tests {
StaticFileProviderFactory,
};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, SegmentOutput,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
SegmentOutput,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

View File

@ -8,8 +8,7 @@ use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::ShardedKey;
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput,
SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@ -65,7 +64,7 @@ where
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
@ -113,7 +112,7 @@ where
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned account history (indices)");
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -130,14 +129,14 @@ where
mod tests {
use crate::segments::{
user::account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput,
Segment, SegmentOutput,
PruneLimiter, Segment, SegmentOutput,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{

View File

@ -10,8 +10,8 @@ use reth_provider::{
BlockReader, DBProvider, NodePrimitivesProvider, PruneCheckpointWriter, TransactionsProvider,
};
use reth_prune_types::{
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
SegmentOutput, MINIMUM_PRUNING_DISTANCE,
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig, SegmentOutput,
MINIMUM_PRUNING_DISTANCE,
};
use tracing::{instrument, trace};
#[derive(Debug)]
@ -219,7 +219,7 @@ where
},
)?;
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput { progress, pruned, checkpoint: None })
}
@ -227,14 +227,14 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, ReceiptsByLogs, Segment};
use crate::segments::{PruneInput, PruneLimiter, ReceiptsByLogs, Segment};
use alloy_primitives::B256;
use assert_matches::assert_matches;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_primitives_traits::InMemorySize;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader, TransactionsProvider};
use reth_prune_types::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig};
use reth_prune_types::{PruneMode, PruneSegment, ReceiptsLogPruneConfig};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
self, random_block_range, random_eoa_account, random_log, random_receipt, BlockRangeParams,

View File

@ -6,7 +6,7 @@ use crate::{
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider, TransactionsProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
PruneMode, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint,
};
use tracing::{instrument, trace};
@ -67,7 +67,7 @@ where
// previous, so we could finish pruning its transaction senders on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -82,7 +82,7 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, Segment, SegmentOutput, SenderRecovery};
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, SenderRecovery};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@ -92,7 +92,7 @@ mod tests {
use reth_db::tables;
use reth_primitives_traits::SignedTransaction;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};
use std::ops::Sub;

View File

@ -7,10 +7,7 @@ use itertools::Itertools;
use reth_db::{tables, transaction::DbTxMut};
use reth_db_api::models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress};
use reth_provider::DBProvider;
use reth_prune_types::{
PruneInterruptReason, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
SegmentOutputCheckpoint,
};
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use rustc_hash::FxHashMap;
use tracing::{instrument, trace};
@ -65,7 +62,7 @@ where
};
if limiter.is_limit_reached() {
return Ok(SegmentOutput::not_done(
PruneInterruptReason::new(&limiter),
limiter.interrupt_reason(),
input.previous_checkpoint.map(SegmentOutputCheckpoint::from_prune_checkpoint),
))
}
@ -118,7 +115,7 @@ where
)?;
trace!(target: "pruner", ?outcomes, %done, "Pruned storage history (indices)");
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -134,14 +131,14 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, Segment, SegmentOutput,
StorageHistory,
user::storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneLimiter, Segment,
SegmentOutput, StorageHistory,
};
use alloy_primitives::{BlockNumber, B256};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_prune_types::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{
self, random_block_range, random_changeset_range, random_eoa_accounts, BlockRangeParams,

View File

@ -7,9 +7,7 @@ use alloy_eips::eip2718::Encodable2718;
use rayon::prelude::*;
use reth_db::{tables, transaction::DbTxMut};
use reth_provider::{BlockReader, DBProvider};
use reth_prune_types::{
PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutputCheckpoint,
};
use reth_prune_types::{PruneMode, PrunePurpose, PruneSegment, SegmentOutputCheckpoint};
use tracing::{instrument, trace};
#[derive(Debug)]
@ -96,7 +94,7 @@ where
// run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
let progress = limiter.progress(done);
Ok(SegmentOutput {
progress,
@ -111,7 +109,7 @@ where
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, Segment, SegmentOutput, TransactionLookup};
use crate::segments::{PruneInput, PruneLimiter, Segment, SegmentOutput, TransactionLookup};
use alloy_primitives::{BlockNumber, TxNumber, B256};
use assert_matches::assert_matches;
use itertools::{
@ -121,7 +119,7 @@ mod tests {
use reth_db::tables;
use reth_provider::{DatabaseProviderFactory, PruneCheckpointReader};
use reth_prune_types::{
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
PruneCheckpoint, PruneInterruptReason, PruneMode, PruneProgress, PruneSegment,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use reth_testing_utils::generators::{self, random_block_range, BlockRangeParams};

View File

@ -10,7 +10,6 @@
mod checkpoint;
mod event;
mod limiter;
mod mode;
mod pruner;
mod segment;
@ -18,7 +17,6 @@ mod target;
pub use checkpoint::PruneCheckpoint;
pub use event::PrunerEvent;
pub use limiter::PruneLimiter;
pub use mode::PruneMode;
pub use pruner::{
PruneInterruptReason, PruneProgress, PrunedSegmentInfo, PrunerOutput, SegmentOutput,

View File

@ -1,4 +1,4 @@
use crate::{PruneCheckpoint, PruneLimiter, PruneMode, PruneSegment};
use crate::{PruneCheckpoint, PruneMode, PruneSegment};
use alloy_primitives::{BlockNumber, TxNumber};
use derive_more::Display;
@ -101,17 +101,6 @@ pub enum PruneInterruptReason {
}
impl PruneInterruptReason {
/// Creates new [`PruneInterruptReason`] based on the [`PruneLimiter`].
pub fn new(limiter: &PruneLimiter) -> Self {
if limiter.is_time_limit_reached() {
Self::Timeout
} else if limiter.is_deleted_entries_limit_reached() {
Self::DeletedEntriesLimitReached
} else {
Self::Unknown
}
}
/// Returns `true` if the reason is timeout.
pub const fn is_timeout(&self) -> bool {
matches!(self, Self::Timeout)
@ -124,19 +113,6 @@ impl PruneInterruptReason {
}
impl PruneProgress {
/// Creates new [`PruneProgress`].
///
/// If `done == true`, returns [`PruneProgress::Finished`], otherwise
/// [`PruneProgress::HasMoreData`] is returned with [`PruneInterruptReason`] according to the
/// passed limiter.
pub fn new(done: bool, limiter: &PruneLimiter) -> Self {
if done {
Self::Finished
} else {
Self::HasMoreData(PruneInterruptReason::new(limiter))
}
}
/// Returns `true` if prune run is finished.
pub const fn is_finished(&self) -> bool {
matches!(self, Self::Finished)