feat(prune): timeout (#6958)

Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Emilia Hane
2024-04-03 12:28:49 +02:00
committed by GitHub
parent 68727699db
commit f71d9c0003
23 changed files with 899 additions and 343 deletions

2
Cargo.lock generated
View File

@ -6759,6 +6759,7 @@ name = "reth-prune"
version = "0.2.0-beta.4"
dependencies = [
"assert_matches",
"derive_more",
"itertools 0.12.1",
"metrics",
"rayon",
@ -6770,6 +6771,7 @@ dependencies = [
"reth-provider",
"reth-stages",
"reth-tokio-util",
"reth-tracing",
"thiserror",
"tokio-stream",
"tracing",

View File

@ -434,6 +434,7 @@ where
5,
self.base_config.chain_spec.prune_delete_limit,
config.max_reorg_depth() as usize,
None,
);
let mut hooks = EngineHooks::new();

View File

@ -1515,7 +1515,7 @@ impl PeerMetadata {
fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc<str>) -> Self {
Self {
seen_transactions: LruCache::new(
NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).expect("infallible"),
NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).unwrap(),
),
request_tx,
version,

View File

@ -677,6 +677,7 @@ where
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(config.chain.prune_delete_limit)
.timeout(PrunerBuilder::DEFAULT_TIMEOUT)
.build(provider_factory.clone());
let pruner_events = pruner.events();

View File

@ -77,8 +77,9 @@ pub use net::{
};
pub use peer::{id2pk, pk2id, AnyNode, PeerId, WithPeerId};
pub use prune::{
PruneCheckpoint, PruneMode, PruneModes, PruneProgress, PrunePurpose, PruneSegment,
PruneSegmentError, ReceiptsLogPruneConfig, MINIMUM_PRUNING_DISTANCE,
PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneModes, PruneProgress,
PrunePurpose, PruneSegment, PruneSegmentError, ReceiptsLogPruneConfig,
MINIMUM_PRUNING_DISTANCE,
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef, Receipts};
pub use static_file::StaticFileSegment;

View File

@ -93,13 +93,13 @@ impl FromStr for AnyNode {
fn from_str(s: &str) -> Result<Self, Self::Err> {
if let Some(rem) = s.strip_prefix("enode://") {
if let Ok(record) = NodeRecord::from_str(s) {
return Ok(AnyNode::NodeRecord(record));
return Ok(AnyNode::NodeRecord(record))
}
// incomplete enode
if let Ok(peer_id) = PeerId::from_str(rem) {
return Ok(AnyNode::PeerId(peer_id));
return Ok(AnyNode::PeerId(peer_id))
}
return Err(format!("invalid public key: {rem}"));
return Err(format!("invalid public key: {rem}"))
}
if s.starts_with("enr:") {
return Enr::from_str(s).map(AnyNode::Enr)

View File

@ -0,0 +1,122 @@
use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};
/// Limits a pruner run by either the number of entries (rows in the database) that can be deleted
/// or the time it can run.
/// Limits a pruner run by either the number of entries (rows in the database) that can be deleted
/// or the time it can run.
///
/// Both limits are optional: a `Default`-constructed limiter imposes no restriction until a limit
/// is configured via `set_deleted_entries_limit` / `set_time_limit`.
#[derive(Debug, Clone, Default)]
pub struct PruneLimiter {
    /// Maximum entries (rows in the database) to delete from the database per block.
    /// `None` means no entry-count restriction.
    deleted_entries_limit: Option<PruneDeletedEntriesLimit>,
    /// Maximum duration of one prune run. `None` means no time restriction.
    time_limit: Option<PruneTimeLimit>,
}
/// Tracks how many database rows a prune run may delete and how many it has deleted so far.
#[derive(Debug, Clone)]
struct PruneDeletedEntriesLimit {
    /// Maximum entries (rows in the database) to delete from the database.
    limit: usize,
    /// Current number of entries (rows in the database) that have been deleted.
    deleted: usize,
}

impl PruneDeletedEntriesLimit {
    /// Creates a fresh limit with a zeroed deletion counter.
    fn new(limit: usize) -> Self {
        Self { deleted: 0, limit }
    }

    /// Returns `true` once the deletion counter has reached (or passed) the limit.
    fn is_limit_reached(&self) -> bool {
        self.limit <= self.deleted
    }
}
/// Tracks the wall-clock budget of a prune run.
#[derive(Debug, Clone)]
struct PruneTimeLimit {
    /// Maximum duration of one prune run.
    limit: Duration,
    /// Time when the prune run has started.
    start: Instant,
}

impl PruneTimeLimit {
    /// Starts the clock now with the given duration budget.
    fn new(limit: Duration) -> Self {
        Self { start: Instant::now(), limit }
    }

    /// Returns `true` once strictly more time than the budget has elapsed since creation.
    fn is_limit_reached(&self) -> bool {
        self.limit < self.start.elapsed()
    }
}
impl PruneLimiter {
    /// Sets the limit on the number of deleted entries (rows in the database).
    /// If the limit was already set, it will be overwritten (the deletion counter is kept).
    pub fn set_deleted_entries_limit(mut self, limit: usize) -> Self {
        if let Some(deleted_entries_limit) = self.deleted_entries_limit.as_mut() {
            deleted_entries_limit.limit = limit;
        } else {
            self.deleted_entries_limit = Some(PruneDeletedEntriesLimit::new(limit));
        }
        self
    }

    /// Sets the limit on the number of deleted entries (rows in the database) to a biggest
    /// multiple of the given denominator that is smaller than the existing limit.
    ///
    /// If the limit wasn't set, does nothing.
    pub fn floor_deleted_entries_limit_to_multiple_of(mut self, denominator: NonZeroUsize) -> Self {
        if let Some(deleted_entries_limit) = self.deleted_entries_limit.as_mut() {
            // Integer division floors, so this rounds the limit down to the nearest multiple
            // (possibly 0 if the limit is smaller than the denominator).
            deleted_entries_limit.limit =
                (deleted_entries_limit.limit / denominator) * denominator.get();
        }
        self
    }

    /// Returns `true` if the limit on the number of deleted entries (rows in the database) is
    /// reached. Returns `false` when no entry-count limit is configured.
    pub fn is_deleted_entries_limit_reached(&self) -> bool {
        self.deleted_entries_limit.as_ref().map_or(false, |limit| limit.is_limit_reached())
    }

    /// Increments the number of deleted entries by the given number.
    /// Does nothing when no entry-count limit is configured.
    pub fn increment_deleted_entries_count_by(&mut self, entries: usize) {
        if let Some(limit) = self.deleted_entries_limit.as_mut() {
            limit.deleted += entries;
        }
    }

    /// Increments the number of deleted entries by one.
    pub fn increment_deleted_entries_count(&mut self) {
        self.increment_deleted_entries_count_by(1)
    }

    /// Returns the number of deleted entries left before the limit is reached.
    ///
    /// Uses a saturating subtraction: returns `Some(0)` (instead of underflowing) if more entries
    /// than the limit allows have already been deleted, which can happen because the deletion
    /// counter is incremented without a cap and limits are treated as soft by some callers.
    /// Returns `None` when no entry-count limit is configured.
    pub fn deleted_entries_limit_left(&self) -> Option<usize> {
        self.deleted_entries_limit.as_ref().map(|limit| limit.limit.saturating_sub(limit.deleted))
    }

    /// Returns the limit on the number of deleted entries (rows in the database).
    pub fn deleted_entries_limit(&self) -> Option<usize> {
        self.deleted_entries_limit.as_ref().map(|limit| limit.limit)
    }

    /// Sets the time limit, starting the clock now.
    /// If a time limit was already set, it is replaced and the clock restarts.
    pub fn set_time_limit(mut self, limit: Duration) -> Self {
        self.time_limit = Some(PruneTimeLimit::new(limit));
        self
    }

    /// Returns `true` if time limit is reached. Returns `false` when no time limit is configured.
    pub fn is_time_limit_reached(&self) -> bool {
        self.time_limit.as_ref().map_or(false, |limit| limit.is_limit_reached())
    }

    /// Returns `true` if any limit is reached.
    pub fn is_limit_reached(&self) -> bool {
        self.is_deleted_entries_limit_reached() || self.is_time_limit_reached()
    }
}

View File

@ -1,10 +1,12 @@
mod checkpoint;
mod limiter;
mod mode;
mod segment;
mod target;
use crate::{Address, BlockNumber};
pub use checkpoint::PruneCheckpoint;
pub use limiter::PruneLimiter;
pub use mode::PruneMode;
pub use segment::{PrunePurpose, PruneSegment, PruneSegmentError};
use serde::{Deserialize, Serialize};
@ -91,21 +93,61 @@ impl ReceiptsLogPruneConfig {
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneProgress {
/// There is more data to prune.
HasMoreData,
HasMoreData(PruneInterruptReason),
/// Pruning has been finished.
Finished,
}
/// Reason for interrupting a prune run.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum PruneInterruptReason {
/// Prune run timed out.
Timeout,
/// Limit on the number of deleted entries (rows in the database) per prune run was reached.
DeletedEntriesLimitReached,
/// Unknown reason for stopping prune run.
Unknown,
}
impl PruneInterruptReason {
    /// Derives the interrupt reason from the state of the given [PruneLimiter].
    ///
    /// The time limit takes precedence over the deleted-entries limit; when neither limit is
    /// reached, the reason is [`Self::Unknown`].
    pub fn new(limiter: &PruneLimiter) -> Self {
        match (limiter.is_time_limit_reached(), limiter.is_deleted_entries_limit_reached()) {
            (true, _) => Self::Timeout,
            (false, true) => Self::DeletedEntriesLimitReached,
            (false, false) => Self::Unknown,
        }
    }

    /// Returns `true` if the reason is timeout.
    pub const fn is_timeout(&self) -> bool {
        matches!(self, Self::Timeout)
    }

    /// Returns `true` if the reason is reaching the limit on deleted entries.
    pub const fn is_entries_limit_reached(&self) -> bool {
        matches!(self, Self::DeletedEntriesLimitReached)
    }
}
impl PruneProgress {
/// Creates new [PruneProgress] from `done` boolean value.
/// Creates new [PruneProgress].
///
/// If `done == true`, returns [PruneProgress::Finished], otherwise [PruneProgress::HasMoreData]
/// is returned.
pub fn from_done(done: bool) -> Self {
/// If `done == true`, returns [PruneProgress::Finished], otherwise
/// [PruneProgress::HasMoreData] is returned with [PruneInterruptReason] according to the passed
/// limiter.
pub fn new(done: bool, limiter: &PruneLimiter) -> Self {
if done {
Self::Finished
} else {
Self::HasMoreData
Self::HasMoreData(PruneInterruptReason::new(limiter))
}
}
/// Returns `true` if prune run is finished.
pub const fn is_finished(&self) -> bool {
matches!(self, Self::Finished)
}
}

View File

@ -35,7 +35,9 @@ tokio-stream.workspace = true
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
# misc
derive_more.workspace = true
assert_matches.workspace = true

View File

@ -1,3 +1,5 @@
use std::time::Duration;
use crate::{segments::SegmentSet, Pruner};
use reth_config::PruneConfig;
use reth_db::database::Database;
@ -17,9 +19,14 @@ pub struct PrunerBuilder {
/// the amount of blocks between pruner runs to account for the difference in amount of new
/// data coming in.
pub prune_delete_limit: usize,
/// Time a pruner job can run before timing out.
pub timeout: Option<Duration>,
}
impl PrunerBuilder {
/// Default timeout for a prune run.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_millis(100);
/// Creates a new [PrunerBuilder] from the given [PruneConfig].
pub fn new(pruner_config: PruneConfig) -> Self {
PrunerBuilder::default()
@ -51,6 +58,15 @@ impl PrunerBuilder {
self
}
/// Sets the timeout for pruner, per run.
///
/// CAUTION: Account and Storage History prune segments treat this timeout as a soft limit,
/// meaning they can go beyond it.
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);
@ -61,6 +77,7 @@ impl PrunerBuilder {
self.block_interval,
self.prune_delete_limit,
self.max_reorg_depth,
self.timeout,
)
}
}
@ -72,6 +89,7 @@ impl Default for PrunerBuilder {
segments: PruneModes::none(),
max_reorg_depth: 64,
prune_delete_limit: MAINNET.prune_delete_limit,
timeout: Some(Self::DEFAULT_TIMEOUT),
}
}
}

View File

@ -7,11 +7,15 @@ use crate::{
};
use reth_db::database::Database;
use reth_primitives::{
BlockNumber, PruneMode, PruneProgress, PrunePurpose, PruneSegment, StaticFileSegment,
BlockNumber, PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment,
StaticFileSegment,
};
use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader};
use reth_tokio_util::EventListeners;
use std::{collections::BTreeMap, time::Instant};
use std::{
collections::BTreeMap,
time::{Duration, Instant},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::debug;
@ -36,10 +40,13 @@ pub struct Pruner<DB> {
/// conjunction with `min_block_interval` to determine when the pruning needs to be initiated.
previous_tip_block_number: Option<BlockNumber>,
/// Maximum total entries to prune (delete from database) per block.
delete_limit: usize,
delete_limit_per_block: usize,
/// Maximum number of blocks to be pruned per run, as an additional restriction to
/// `previous_tip_block_number`.
prune_max_blocks_per_run: usize,
/// Maximum time for a one pruner run.
timeout: Option<Duration>,
#[doc(hidden)]
metrics: Metrics,
listeners: EventListeners<PrunerEvent>,
}
@ -52,14 +59,16 @@ impl<DB: Database> Pruner<DB> {
min_block_interval: usize,
delete_limit: usize,
prune_max_blocks_per_run: usize,
timeout: Option<Duration>,
) -> Self {
Self {
provider_factory,
segments,
min_block_interval,
previous_tip_block_number: None,
delete_limit,
delete_limit_per_block: delete_limit,
prune_max_blocks_per_run,
timeout,
metrics: Metrics::default(),
listeners: Default::default(),
}
@ -100,11 +109,16 @@ impl<DB: Database> Pruner<DB> {
tip_block_number.saturating_sub(previous_tip_block_number) as usize
})
.min(self.prune_max_blocks_per_run);
let delete_limit = self.delete_limit * blocks_since_last_run;
let mut limiter = PruneLimiter::default()
.set_deleted_entries_limit(self.delete_limit_per_block * blocks_since_last_run);
if let Some(timeout) = self.timeout {
limiter = limiter.set_time_limit(timeout);
};
let provider = self.provider_factory.provider_rw()?;
let (stats, delete_limit, progress) =
self.prune_segments(&provider, tip_block_number, delete_limit)?;
let (stats, deleted_entries, progress) =
self.prune_segments(&provider, tip_block_number, &mut limiter)?;
provider.commit()?;
self.previous_tip_block_number = Some(tip_block_number);
@ -112,14 +126,20 @@ impl<DB: Database> Pruner<DB> {
let elapsed = start.elapsed();
self.metrics.duration_seconds.record(elapsed);
let message = match progress {
PruneProgress::HasMoreData(_) => "Pruner interrupted and has more data to prune",
PruneProgress::Finished => "Pruner finished",
};
debug!(
target: "pruner",
%tip_block_number,
?elapsed,
%delete_limit,
?deleted_entries,
?limiter,
?progress,
?stats,
"Pruner finished"
"{message}",
);
self.listeners.notify(PrunerEvent::Finished { tip_block_number, elapsed, stats });
@ -128,15 +148,15 @@ impl<DB: Database> Pruner<DB> {
}
/// Prunes the segments that the [Pruner] was initialized with, and the segments that needs to
/// be pruned according to the highest static_files.
/// be pruned according to the highest static_files. Segments are parts of the database that
/// represent one or more tables.
///
/// Returns [PrunerStats], `delete_limit` that remained after pruning all segments, and
/// [PruneProgress].
/// Returns [PrunerStats], total number of entries pruned, and [PruneProgress].
fn prune_segments(
&mut self,
provider: &DatabaseProviderRW<DB>,
tip_block_number: BlockNumber,
mut delete_limit: usize,
limiter: &mut PruneLimiter,
) -> Result<(PrunerStats, usize, PruneProgress), PrunerError> {
let static_file_segments = self.static_file_segments();
let segments = static_file_segments
@ -144,11 +164,12 @@ impl<DB: Database> Pruner<DB> {
.map(|segment| (segment, PrunePurpose::StaticFile))
.chain(self.segments.iter().map(|segment| (segment, PrunePurpose::User)));
let mut done = true;
let mut stats = PrunerStats::new();
let mut pruned = 0;
let mut progress = PruneProgress::Finished;
for (segment, purpose) in segments {
if delete_limit == 0 {
if limiter.is_limit_reached() {
break
}
@ -169,8 +190,10 @@ impl<DB: Database> Pruner<DB> {
let segment_start = Instant::now();
let previous_checkpoint = provider.get_prune_checkpoint(segment.segment())?;
let output = segment
.prune(provider, PruneInput { previous_checkpoint, to_block, delete_limit })?;
let output = segment.prune(
provider,
PruneInput { previous_checkpoint, to_block, limiter: limiter.clone() },
)?;
if let Some(checkpoint) = output.checkpoint {
segment
.save_checkpoint(provider, checkpoint.as_prune_checkpoint(prune_mode))?;
@ -185,8 +208,7 @@ impl<DB: Database> Pruner<DB> {
.highest_pruned_block
.set(to_block as f64);
done = done && output.done;
delete_limit = delete_limit.saturating_sub(output.pruned);
progress = output.progress;
debug!(
target: "pruner",
@ -199,17 +221,16 @@ impl<DB: Database> Pruner<DB> {
);
if output.pruned > 0 {
stats.insert(
segment.segment(),
(PruneProgress::from_done(output.done), output.pruned),
);
limiter.increment_deleted_entries_count_by(output.pruned);
pruned += output.pruned;
stats.insert(segment.segment(), (output.progress, output.pruned));
}
} else {
debug!(target: "pruner", segment = ?segment.segment(), ?purpose, "Nothing to prune for the segment");
}
}
Ok((stats, delete_limit, PruneProgress::from_done(done)))
Ok((stats, pruned, progress))
}
/// Returns pre-configured segments that needs to be pruned according to the highest
@ -266,6 +287,7 @@ impl<DB: Database> Pruner<DB> {
#[cfg(test)]
mod tests {
use crate::Pruner;
use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
use reth_primitives::MAINNET;
@ -277,7 +299,7 @@ mod tests {
let (_static_dir, static_dir_path) = create_test_static_files_dir();
let provider_factory = ProviderFactory::new(db, MAINNET.clone(), static_dir_path)
.expect("create provide factory with static_files");
let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5);
let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5, None);
// No last pruned block number was set before
let first_block_number = 1;

View File

@ -5,10 +5,16 @@ use crate::{
PrunerError,
};
use reth_db::{database::Database, models::ShardedKey, tables};
use reth_primitives::{PruneMode, PruneSegment};
use reth_primitives::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace};
/// Number of account history tables to prune in one step.
///
/// Account History consists of two tables: [tables::AccountChangeSets] and
/// [tables::AccountsHistory]. We want to prune them to the same block number.
const ACCOUNT_HISTORY_TABLES_TO_PRUNE: usize = 2;
#[derive(Debug)]
pub struct AccountHistory {
mode: PruneMode,
@ -44,11 +50,23 @@ impl<DB: Database> Segment<DB> for AccountHistory {
};
let range_end = *range.end();
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(PruneOutput::not_done(
PruneInterruptReason::new(&limiter),
input.previous_checkpoint.map(|checkpoint| checkpoint.into()),
))
}
let mut last_changeset_pruned_block = None;
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::AccountChangeSets>(
range,
input.delete_limit / 2,
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0),
)?;
@ -66,10 +84,12 @@ impl<DB: Database> Segment<DB> for AccountHistory {
|a, b| a.key == b.key,
|key| ShardedKey::last(key.key),
)?;
trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)" );
trace!(target: "pruner", %processed, pruned = %pruned_indices, %done, "Pruned account history (history)");
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned: pruned_changesets + pruned_indices,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
@ -81,14 +101,20 @@ impl<DB: Database> Segment<DB> for AccountHistory {
#[cfg(test)]
mod tests {
use crate::segments::{AccountHistory, PruneInput, PruneOutput, Segment};
use crate::segments::{
account_history::ACCOUNT_HISTORY_TABLES_TO_PRUNE, AccountHistory, PruneInput, PruneOutput,
Segment,
};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_changeset_range, random_eoa_accounts},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};
@ -129,114 +155,126 @@ mod tests {
let original_shards = db.table::<tables::AccountsHistory>().unwrap();
let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| {
let prune_mode = PruneMode::Before(to_block);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
delete_limit: 2000,
};
let segment = AccountHistory::new(prune_mode);
let test_prune =
|to_block: BlockNumber, run: usize, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 2000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
to_block,
limiter: limiter.clone(),
};
let segment = AccountHistory::new(prune_mode);
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
assert_matches!(
result,
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
#[allow(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < input.delete_limit / 2 * run && *block_number <= to_block as usize
})
segment
.save_checkpoint(
&provider,
result.checkpoint.unwrap().as_prune_checkpoint(prune_mode),
)
.unwrap();
provider.commit().expect("commit");
let changesets = changesets
.iter()
.enumerate()
.flat_map(|(block_number, changeset)| {
changeset.iter().map(move |change| (block_number, change))
})
.collect::<Vec<_>>();
#[allow(clippy::skip_while_next)]
let pruned = changesets
.iter()
.enumerate()
.skip_while(|(i, (block_number, _))| {
*i < deleted_entries_limit / ACCOUNT_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block
// number further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(i, _)| i)
.unwrap_or_default();
let mut pruned_changesets = changesets
.iter()
// Skip what we've pruned so far, subtracting one to get last pruned block number
// further down
.skip(pruned.saturating_sub(1));
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _)| if result.done {
.map(|(block_number, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
} as BlockNumber)
.unwrap_or(to_block);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
|mut acc, (block_number, change)| {
acc.entry(block_number).or_default().push(change);
acc
},
);
let pruned_changesets = pruned_changesets.fold(
BTreeMap::<_, Vec<_>>::new(),
|mut acc, (block_number, change)| {
acc.entry(block_number).or_default().push(change);
acc
},
);
assert_eq!(
db.table::<tables::AccountChangeSets>().unwrap().len(),
pruned_changesets.values().flatten().count()
);
assert_eq!(
db.table::<tables::AccountChangeSets>().unwrap().len(),
pruned_changesets.values().flatten().count()
);
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let actual_shards = db.table::<tables::AccountsHistory>().unwrap();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks = blocks
.iter()
.skip_while(|block| *block <= last_pruned_block_number)
.collect::<Vec<_>>();
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
let expected_shards = original_shards
.iter()
.filter(|(key, _)| key.highest_block_number > last_pruned_block_number)
.map(|(key, blocks)| {
let new_blocks = blocks
.iter()
.skip_while(|block| *block <= last_pruned_block_number)
.collect::<Vec<_>>();
(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))
})
.collect::<Vec<_>>();
assert_eq!(actual_shards, expected_shards);
assert_eq!(actual_shards, expected_shards);
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
assert_eq!(
db.factory
.provider()
.unwrap()
.get_prune_checkpoint(PruneSegment::AccountHistory)
.unwrap(),
Some(PruneCheckpoint {
block_number: Some(last_pruned_block_number),
tx_number: None,
prune_mode
})
);
};
test_prune(998, 1, (false, 1000));
test_prune(998, 2, (true, 998));
test_prune(1400, 3, (true, 804));
test_prune(
998,
1,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 1000),
);
test_prune(998, 2, (PruneProgress::Finished, 998));
test_prune(1400, 3, (PruneProgress::Finished, 804));
}
}

View File

@ -1,15 +1,24 @@
use std::num::NonZeroUsize;
use crate::{
segments::{PruneInput, PruneOutput, PruneOutputCheckpoint, Segment},
PrunerError,
};
use itertools::Itertools;
use reth_db::{database::Database, table::Table, tables};
use reth_interfaces::RethResult;
use reth_primitives::{BlockNumber, PruneMode, PruneSegment};
use reth_db::{
cursor::{DbCursorRO, RangeWalker},
database::Database,
tables,
transaction::DbTxMut,
};
use reth_primitives::{BlockNumber, PruneLimiter, PruneMode, PruneProgress, PruneSegment};
use reth_provider::DatabaseProviderRW;
use std::ops::RangeInclusive;
use tracing::{instrument, trace};
/// Number of header tables to prune in one step
const HEADER_TABLES_TO_PRUNE: usize = 3;
#[derive(Debug)]
pub struct Headers {
mode: PruneMode,
@ -36,90 +45,168 @@ impl<DB: Database> Segment<DB> for Headers {
provider: &DatabaseProviderRW<DB>,
input: PruneInput,
) -> Result<PruneOutput, PrunerError> {
let block_range = match input.get_next_block_range() {
Some(range) => range,
let (block_range_start, block_range_end) = match input.get_next_block_range() {
Some(range) => (*range.start(), *range.end()),
None => {
trace!(target: "pruner", "No headers to prune");
return Ok(PruneOutput::done())
}
};
let delete_limit = input.delete_limit / 3;
if delete_limit == 0 {
// Nothing to do, `input.delete_limit` is less than 3, so we can't prune all
// headers-related tables up to the same height
return Ok(PruneOutput::not_done())
}
let last_pruned_block =
if block_range_start == 0 { None } else { Some(block_range_start - 1) };
let results = [
self.prune_table::<DB, tables::Headers>(provider, block_range.clone(), delete_limit)?,
self.prune_table::<DB, tables::HeaderTerminalDifficulties>(
provider,
block_range.clone(),
delete_limit,
)?,
self.prune_table::<DB, tables::CanonicalHeaders>(provider, block_range, delete_limit)?,
];
let range = last_pruned_block.map_or(0, |block| block + 1)..=block_range_end;
if !results.iter().map(|(_, _, last_pruned_block)| last_pruned_block).all_equal() {
return Err(PrunerError::InconsistentData(
"All headers-related tables should be pruned up to the same height",
))
}
let mut headers_cursor = provider.tx_ref().cursor_write::<tables::Headers>()?;
let mut header_tds_cursor =
provider.tx_ref().cursor_write::<tables::HeaderTerminalDifficulties>()?;
let mut canonical_headers_cursor =
provider.tx_ref().cursor_write::<tables::CanonicalHeaders>()?;
let (done, pruned, last_pruned_block) = results.into_iter().fold(
(true, 0, 0),
|(total_done, total_pruned, _), (done, pruned, last_pruned_block)| {
(total_done && done, total_pruned + pruned, last_pruned_block)
},
let mut limiter = input.limiter.floor_deleted_entries_limit_to_multiple_of(
NonZeroUsize::new(HEADER_TABLES_TO_PRUNE).unwrap(),
);
let tables_iter = HeaderTablesIter::new(
provider,
&mut limiter,
headers_cursor.walk_range(range.clone())?,
header_tds_cursor.walk_range(range.clone())?,
canonical_headers_cursor.walk_range(range)?,
);
let mut last_pruned_block: Option<u64> = None;
let mut pruned = 0;
for res in tables_iter {
let HeaderTablesIterItem { pruned_block, entries_pruned } = res?;
last_pruned_block = Some(pruned_block);
pruned += entries_pruned;
}
let done = last_pruned_block.map_or(false, |block| block == block_range_end);
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_pruned_block),
block_number: last_pruned_block,
tx_number: None,
}),
})
}
}
impl Headers {
/// Prune one headers-related table.
///
/// Returns `done`, number of pruned rows and last pruned block number.
fn prune_table<DB: Database, T: Table<Key = BlockNumber>>(
&self,
provider: &DatabaseProviderRW<DB>,
range: RangeInclusive<BlockNumber>,
delete_limit: usize,
) -> RethResult<(bool, usize, BlockNumber)> {
let mut last_pruned_block = *range.end();
let (pruned, done) = provider.prune_table_with_range::<T>(
range,
delete_limit,
|_| false,
|row| last_pruned_block = row.0,
)?;
trace!(target: "pruner", %pruned, %done, table = %T::TABLE, "Pruned headers");
type Walker<'a, DB, T> = RangeWalker<'a, T, <<DB as Database>::TXMut as DbTxMut>::CursorMut<T>>;
Ok((done, pruned, last_pruned_block))
#[allow(missing_debug_implementations)]
struct HeaderTablesIter<'a, DB>
where
DB: Database,
{
provider: &'a DatabaseProviderRW<DB>,
limiter: &'a mut PruneLimiter,
headers_walker: Walker<'a, DB, tables::Headers>,
header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>,
}
struct HeaderTablesIterItem {
pruned_block: BlockNumber,
entries_pruned: usize,
}
impl<'a, DB> HeaderTablesIter<'a, DB>
where
DB: Database,
{
fn new(
provider: &'a DatabaseProviderRW<DB>,
limiter: &'a mut PruneLimiter,
headers_walker: Walker<'a, DB, tables::Headers>,
header_tds_walker: Walker<'a, DB, tables::HeaderTerminalDifficulties>,
canonical_headers_walker: Walker<'a, DB, tables::CanonicalHeaders>,
) -> Self {
Self { provider, limiter, headers_walker, header_tds_walker, canonical_headers_walker }
}
}
impl<'a, DB> Iterator for HeaderTablesIter<'a, DB>
where
DB: Database,
{
type Item = Result<HeaderTablesIterItem, PrunerError>;
fn next(&mut self) -> Option<Self::Item> {
if self.limiter.is_limit_reached() {
return None
}
let mut pruned_block_headers = None;
let mut pruned_block_td = None;
let mut pruned_block_canonical = None;
if let Err(err) = self.provider.prune_table_with_range_step(
&mut self.headers_walker,
self.limiter,
&mut |_| false,
&mut |row| pruned_block_headers = Some(row.0),
) {
return Some(Err(err.into()))
}
if let Err(err) = self.provider.prune_table_with_range_step(
&mut self.header_tds_walker,
self.limiter,
&mut |_| false,
&mut |row| pruned_block_td = Some(row.0),
) {
return Some(Err(err.into()))
}
if let Err(err) = self.provider.prune_table_with_range_step(
&mut self.canonical_headers_walker,
self.limiter,
&mut |_| false,
&mut |row| pruned_block_canonical = Some(row.0),
) {
return Some(Err(err.into()))
}
if ![pruned_block_headers, pruned_block_td, pruned_block_canonical].iter().all_equal() {
return Some(Err(PrunerError::InconsistentData(
"All headers-related tables should be pruned up to the same height",
)))
}
pruned_block_headers.map(move |block| {
Ok(HeaderTablesIterItem { pruned_block: block, entries_pruned: HEADER_TABLES_TO_PRUNE })
})
}
}
#[cfg(test)]
mod tests {
use crate::segments::{Headers, PruneInput, PruneOutput, Segment};
use assert_matches::assert_matches;
use reth_db::{tables, transaction::DbTx};
use reth_interfaces::test_utils::{generators, generators::random_header_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256, U256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, B256, U256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use tracing::trace;
use crate::segments::{
headers::HEADER_TABLES_TO_PRUNE, Headers, PruneInput, PruneOutput, PruneOutputCheckpoint,
Segment,
};
#[test]
fn prune() {
reth_tracing::init_test_tracing();
let db = TestStageDB::default();
let mut rng = generators::rng();
@ -134,8 +221,10 @@ mod tests {
assert_eq!(db.table::<tables::Headers>().unwrap().len(), headers.len());
assert_eq!(db.table::<tables::HeaderTerminalDifficulties>().unwrap().len(), headers.len());
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let segment = Headers::new(prune_mode);
let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -144,9 +233,8 @@ mod tests {
.get_prune_checkpoint(PruneSegment::Headers)
.unwrap(),
to_block,
delete_limit: 10,
limiter: limiter.clone(),
};
let segment = Headers::new(prune_mode);
let next_block_number_to_prune = db
.factory
@ -159,11 +247,19 @@ mod tests {
.unwrap_or_default();
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
let result = segment.prune(&provider, input.clone()).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
trace!(target: "pruner::test",
expected_prune_progress=?expected_result.0,
expected_pruned=?expected_result.1,
result=?result,
"PruneOutput"
);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
@ -173,8 +269,11 @@ mod tests {
.unwrap();
provider.commit().expect("commit");
let last_pruned_block_number = to_block
.min(next_block_number_to_prune + input.delete_limit as BlockNumber / 3 - 1);
let last_pruned_block_number = to_block.min(
next_block_number_to_prune +
(input.limiter.deleted_entries_limit().unwrap() / HEADER_TABLES_TO_PRUNE - 1)
as u64,
);
assert_eq!(
db.table::<tables::CanonicalHeaders>().unwrap().len(),
@ -198,24 +297,35 @@ mod tests {
);
};
test_prune(3, (false, 9));
test_prune(3, (true, 3));
test_prune(
3,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 9),
);
test_prune(3, (PruneProgress::Finished, 3));
}
#[test]
fn prune_cannot_be_done() {
let db = TestStageDB::default();
let segment = Headers::new(PruneMode::Full);
let limiter = PruneLimiter::default().set_deleted_entries_limit(0);
let input = PruneInput {
previous_checkpoint: None,
to_block: 1,
// Less than total number of tables for `Headers` segment
delete_limit: 2,
limiter,
};
let segment = Headers::new(PruneMode::Full);
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
assert_eq!(result, PruneOutput::not_done());
assert_eq!(
result,
PruneOutput::not_done(
PruneInterruptReason::DeletedEntriesLimitReached,
Some(PruneOutputCheckpoint::default())
)
);
}
}

View File

@ -1,6 +1,6 @@
mod account_history;
mod headers;
mod history;
pub(super) mod history;
mod receipts;
mod receipts_by_logs;
mod sender_recovery;
@ -23,7 +23,10 @@ pub use transactions::Transactions;
use crate::PrunerError;
use reth_db::database::Database;
use reth_interfaces::{provider::ProviderResult, RethResult};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, TxNumber,
};
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter};
use std::ops::RangeInclusive;
use tracing::error;
@ -60,13 +63,14 @@ pub trait Segment<DB: Database>: Debug + Send + Sync {
}
/// Segment pruning input, see [Segment::prune].
#[derive(Debug, Clone, Copy)]
#[derive(Debug)]
#[cfg_attr(test, derive(Clone))]
pub struct PruneInput {
pub(crate) previous_checkpoint: Option<PruneCheckpoint>,
/// Target block up to which the pruning needs to be done, inclusive.
pub(crate) to_block: BlockNumber,
/// Maximum entries to delete from the database.
pub(crate) delete_limit: usize,
/// Limits pruning of a segment.
pub(crate) limiter: PruneLimiter,
}
impl PruneInput {
@ -125,14 +129,7 @@ impl PruneInput {
///
/// To get the range end: use block `to_block`.
pub(crate) fn get_next_block_range(&self) -> Option<RangeInclusive<BlockNumber>> {
let from_block = self
.previous_checkpoint
.and_then(|checkpoint| checkpoint.block_number)
// Checkpoint exists, prune from the next block after the highest pruned one
.map(|block_number| block_number + 1)
// No checkpoint exists, prune from genesis
.unwrap_or(0);
let from_block = self.get_start_next_block_range();
let range = from_block..=self.to_block;
if range.is_empty() {
return None
@ -140,14 +137,25 @@ impl PruneInput {
Some(range)
}
/// Returns the start of the next block range.
///
/// 1. If checkpoint exists, use next block.
/// 2. If checkpoint doesn't exist, use block 0.
pub(crate) fn get_start_next_block_range(&self) -> u64 {
self.previous_checkpoint
.and_then(|checkpoint| checkpoint.block_number)
// Checkpoint exists, prune from the next block after the highest pruned one
.map(|block_number| block_number + 1)
// No checkpoint exists, prune from genesis
.unwrap_or(0)
}
}
/// Segment pruning output, see [Segment::prune].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct PruneOutput {
/// `true` if pruning has been completed up to the target block, and `false` if there's more
/// data to prune in further runs.
pub(crate) done: bool,
pub(crate) progress: PruneProgress,
/// Number of entries pruned, i.e. deleted from the database.
pub(crate) pruned: usize,
/// Pruning checkpoint to save to database, if any.
@ -158,17 +166,20 @@ impl PruneOutput {
/// Returns a [PruneOutput] with `done = true`, `pruned = 0` and `checkpoint = None`.
/// Use when no pruning is needed.
pub(crate) const fn done() -> Self {
Self { done: true, pruned: 0, checkpoint: None }
Self { progress: PruneProgress::Finished, pruned: 0, checkpoint: None }
}
/// Returns a [PruneOutput] with `done = false`, `pruned = 0` and `checkpoint = None`.
/// Use when pruning is needed but cannot be done.
pub(crate) const fn not_done() -> Self {
Self { done: false, pruned: 0, checkpoint: None }
pub(crate) const fn not_done(
reason: PruneInterruptReason,
checkpoint: Option<PruneOutputCheckpoint>,
) -> Self {
Self { progress: PruneProgress::HasMoreData(reason), pruned: 0, checkpoint }
}
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub(crate) struct PruneOutputCheckpoint {
/// Highest pruned block number. If it's [None], the pruning for block `0` is not finished yet.
pub(crate) block_number: Option<BlockNumber>,
@ -182,3 +193,9 @@ impl PruneOutputCheckpoint {
PruneCheckpoint { block_number: self.block_number, tx_number: self.tx_number, prune_mode }
}
}
impl From<PruneCheckpoint> for PruneOutputCheckpoint {
fn from(checkpoint: PruneCheckpoint) -> Self {
Self { block_number: checkpoint.block_number, tx_number: checkpoint.tx_number }
}
}

View File

@ -4,7 +4,7 @@ use crate::{
};
use reth_db::{database::Database, tables};
use reth_interfaces::provider::ProviderResult;
use reth_primitives::{PruneCheckpoint, PruneMode, PruneSegment};
use reth_primitives::{PruneCheckpoint, PruneMode, PruneProgress, PruneSegment};
use reth_provider::{DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
use tracing::{instrument, trace};
@ -43,10 +43,12 @@ impl<DB: Database> Segment<DB> for Receipts {
};
let tx_range_end = *tx_range.end();
let mut limiter = input.limiter;
let mut last_pruned_transaction = tx_range_end;
let (pruned, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
input.delete_limit,
&mut limiter,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
@ -59,8 +61,10 @@ impl<DB: Database> Segment<DB> for Receipts {
// so we could finish pruning its receipts on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: last_pruned_block,
@ -97,7 +101,10 @@ mod tests {
generators,
generators::{random_block_range, random_receipt},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, TxNumber, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;
@ -128,8 +135,10 @@ mod tests {
db.table::<tables::Receipts>().unwrap().len()
);
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let segment = Receipts::new(prune_mode);
let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -138,9 +147,8 @@ mod tests {
.get_prune_checkpoint(PruneSegment::Receipts)
.unwrap(),
to_block,
delete_limit: 10,
limiter: limiter.clone(),
};
let segment = Receipts::new(prune_mode);
let next_tx_number_to_prune = db
.factory
@ -157,16 +165,22 @@ mod tests {
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.min(
next_tx_number_to_prune as usize +
input.limiter.deleted_entries_limit().unwrap(),
)
.sub(1);
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
@ -188,7 +202,7 @@ mod tests {
})
.into_inner()
.0
.checked_sub(if result.done { 0 } else { 1 });
.checked_sub(if result.progress.is_finished() { 0 } else { 1 });
assert_eq!(
db.table::<tables::Receipts>().unwrap().len(),
@ -208,8 +222,11 @@ mod tests {
);
};
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
test_prune(
6,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
);
test_prune(6, (PruneProgress::Finished, 2));
test_prune(10, (PruneProgress::Finished, 8));
}
}

View File

@ -4,7 +4,7 @@ use crate::{
};
use reth_db::{database::Database, tables};
use reth_primitives::{
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
PruneCheckpoint, PruneMode, PruneProgress, PrunePurpose, PruneSegment, ReceiptsLogPruneConfig,
MINIMUM_PRUNING_DISTANCE,
};
use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointWriter, TransactionsProvider};
@ -113,8 +113,10 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
"Calculated block ranges and filtered addresses",
);
let mut limit = input.delete_limit;
let mut limiter = input.limiter;
let mut done = true;
let mut pruned = 0;
let mut last_pruned_transaction = None;
for (start_block, end_block, num_addresses) in block_ranges {
let block_range = start_block..=end_block;
@ -138,7 +140,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
let deleted;
(deleted, done) = provider.prune_table_with_range::<tables::Receipts>(
tx_range,
limit,
&mut limiter,
|(tx_num, receipt)| {
let skip = num_addresses > 0 &&
receipt.logs.iter().any(|log| {
@ -152,9 +154,10 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
},
|row| last_pruned_transaction = Some(row.0),
)?;
trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts");
limit = limit.saturating_sub(deleted);
pruned += deleted;
// For accurate checkpoints we need to know that we have checked every transaction.
// Example: we reached the end of the range, and the last receipt is supposed to skip
@ -172,7 +175,7 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
.saturating_sub(if done { 0 } else { 1 }),
);
if limit == 0 {
if limiter.is_limit_reached() {
done &= end_block == to_block;
break
}
@ -203,7 +206,9 @@ impl<DB: Database> Segment<DB> for ReceiptsByLogs {
},
)?;
Ok(PruneOutput { done, pruned: input.delete_limit - limit, checkpoint: None })
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput { progress, pruned, checkpoint: None })
}
}
@ -216,13 +221,15 @@ mod tests {
generators,
generators::{random_block_range, random_eoa_account, random_log, random_receipt},
};
use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256};
use reth_primitives::{PruneLimiter, PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::collections::BTreeMap;
#[test]
fn prune_receipts_by_logs() {
reth_tracing::init_test_tracing();
let db = TestStageDB::default();
let mut rng = generators::rng();
@ -268,6 +275,8 @@ mod tests {
let receipts_log_filter =
ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)]));
let limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let result = ReceiptsByLogs::new(receipts_log_filter).prune(
&provider,
PruneInput {
@ -278,7 +287,7 @@ mod tests {
.get_prune_checkpoint(PruneSegment::ContractLogs)
.unwrap(),
to_block: tip,
delete_limit: 10,
limiter,
},
);
provider.commit().expect("commit");
@ -304,7 +313,7 @@ mod tests {
((pruned_tx + 1) - unprunable) as usize
);
output.done
output.progress.is_finished()
};
while !run_prune() {}

View File

@ -3,7 +3,7 @@ use crate::{
PrunerError,
};
use reth_db::{database::Database, tables};
use reth_primitives::{PruneMode, PruneSegment};
use reth_primitives::{PruneMode, PruneProgress, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace};
@ -42,10 +42,12 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
};
let tx_range_end = *tx_range.end();
let mut limiter = input.limiter;
let mut last_pruned_transaction = tx_range_end;
let (pruned, done) = provider.prune_table_with_range::<tables::TransactionSenders>(
tx_range,
input.delete_limit,
&mut limiter,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
@ -58,8 +60,10 @@ impl<DB: Database> Segment<DB> for SenderRecovery {
// previous, so we could finish pruning its transaction senders on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: last_pruned_block,
@ -79,7 +83,10 @@ mod tests {
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment,
TxNumber, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;
@ -113,8 +120,10 @@ mod tests {
db.table::<tables::TransactionSenders>().unwrap().len()
);
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let segment = SenderRecovery::new(prune_mode);
let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -123,9 +132,8 @@ mod tests {
.get_prune_checkpoint(PruneSegment::SenderRecovery)
.unwrap(),
to_block,
delete_limit: 10,
limiter: limiter.clone(),
};
let segment = SenderRecovery::new(prune_mode);
let next_tx_number_to_prune = db
.factory
@ -142,7 +150,10 @@ mod tests {
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.min(
next_tx_number_to_prune as usize +
input.limiter.deleted_entries_limit().unwrap(),
)
.sub(1);
let last_pruned_block_number = blocks
@ -161,11 +172,14 @@ mod tests {
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
@ -174,8 +188,8 @@ mod tests {
.unwrap();
provider.commit().expect("commit");
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
let last_pruned_block_number = last_pruned_block_number
.checked_sub(if result.progress.is_finished() { 0 } else { 1 });
assert_eq!(
db.table::<tables::TransactionSenders>().unwrap().len(),
@ -195,8 +209,16 @@ mod tests {
);
};
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
test_prune(
6,
(
PruneProgress::HasMoreData(
reth_primitives::PruneInterruptReason::DeletedEntriesLimitReached,
),
10,
),
);
test_prune(6, (PruneProgress::Finished, 2));
test_prune(10, (PruneProgress::Finished, 8));
}
}

View File

@ -9,10 +9,16 @@ use reth_db::{
models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress},
tables,
};
use reth_primitives::{PruneMode, PruneSegment};
use reth_primitives::{PruneInterruptReason, PruneMode, PruneProgress, PruneSegment};
use reth_provider::DatabaseProviderRW;
use tracing::{instrument, trace};
/// Number of storage history tables to prune in one step
///
/// Storage History consists of two tables: [tables::StorageChangeSets] and
/// [tables::StoragesHistory]. We want to prune them to the same block number.
const STORAGE_HISTORY_TABLES_TO_PRUNE: usize = 2;
#[derive(Debug)]
pub struct StorageHistory {
mode: PruneMode,
@ -48,11 +54,23 @@ impl<DB: Database> Segment<DB> for StorageHistory {
};
let range_end = *range.end();
let mut limiter = if let Some(limit) = input.limiter.deleted_entries_limit() {
input.limiter.set_deleted_entries_limit(limit / STORAGE_HISTORY_TABLES_TO_PRUNE)
} else {
input.limiter
};
if limiter.is_limit_reached() {
return Ok(PruneOutput::not_done(
PruneInterruptReason::new(&limiter),
input.previous_checkpoint.map(|checkpoint| checkpoint.into()),
))
}
let mut last_changeset_pruned_block = None;
let (pruned_changesets, done) = provider
.prune_table_with_range::<tables::StorageChangeSets>(
BlockNumberAddress::range(range),
input.delete_limit / 2,
&mut limiter,
|_| false,
|row| last_changeset_pruned_block = Some(row.0.block_number()),
)?;
@ -70,10 +88,12 @@ impl<DB: Database> Segment<DB> for StorageHistory {
|a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key,
|key| StorageShardedKey::last(key.address, key.sharded_key.key),
)?;
trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)" );
trace!(target: "pruner", %processed, deleted = %pruned_indices, %done, "Pruned storage history (history)");
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned: pruned_changesets + pruned_indices,
checkpoint: Some(PruneOutputCheckpoint {
block_number: Some(last_changeset_pruned_block),
@ -85,14 +105,19 @@ impl<DB: Database> Segment<DB> for StorageHistory {
#[cfg(test)]
mod tests {
use crate::segments::{PruneInput, PruneOutput, Segment, StorageHistory};
use crate::segments::{
storage_history::STORAGE_HISTORY_TABLES_TO_PRUNE, PruneInput, PruneOutput, Segment,
StorageHistory,
};
use assert_matches::assert_matches;
use reth_db::{tables, BlockNumberList};
use reth_interfaces::test_utils::{
generators,
generators::{random_block_range, random_changeset_range, random_eoa_accounts},
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneLimiter, PruneMode, PruneProgress, PruneSegment, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};
@ -133,8 +158,13 @@ mod tests {
let original_shards = db.table::<tables::StoragesHistory>().unwrap();
let test_prune = |to_block: BlockNumber, run: usize, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber,
run: usize,
expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let deleted_entries_limit = 1000;
let mut limiter =
PruneLimiter::default().set_deleted_entries_limit(deleted_entries_limit);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -143,17 +173,20 @@ mod tests {
.get_prune_checkpoint(PruneSegment::StorageHistory)
.unwrap(),
to_block,
delete_limit: 1000,
limiter: limiter.clone(),
};
let segment = StorageHistory::new(prune_mode);
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
@ -177,7 +210,8 @@ mod tests {
.iter()
.enumerate()
.skip_while(|(i, (block_number, _, _))| {
*i < input.delete_limit / 2 * run && *block_number <= to_block as usize
*i < deleted_entries_limit / STORAGE_HISTORY_TABLES_TO_PRUNE * run &&
*block_number <= to_block as usize
})
.next()
.map(|(i, _)| i)
@ -191,7 +225,7 @@ mod tests {
let last_pruned_block_number = pruned_changesets
.next()
.map(|(block_number, _, _)| if result.done {
.map(|(block_number, _, _)| if result.progress.is_finished() {
*block_number
} else {
block_number.saturating_sub(1)
@ -241,8 +275,17 @@ mod tests {
);
};
test_prune(998, 1, (false, 500));
test_prune(998, 2, (true, 499));
test_prune(1200, 3, (true, 202));
test_prune(
998,
1,
(
PruneProgress::HasMoreData(
reth_primitives::PruneInterruptReason::DeletedEntriesLimitReached,
),
500,
),
);
test_prune(998, 2, (PruneProgress::Finished, 499));
test_prune(1200, 3, (PruneProgress::Finished, 202));
}
}

View File

@ -4,7 +4,7 @@ use crate::{
};
use rayon::prelude::*;
use reth_db::{database::Database, tables};
use reth_primitives::{PruneMode, PruneSegment};
use reth_primitives::{PruneMode, PruneProgress, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace};
@ -42,7 +42,10 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
}
}
.into_inner();
let tx_range = start..=end.min(start + input.delete_limit as u64 - 1);
let tx_range = start..=
Some(end)
.min(input.limiter.deleted_entries_limit_left().map(|left| start + left as u64 - 1))
.unwrap();
let tx_range_end = *tx_range.end();
// Retrieve transactions in the range and calculate their hashes in parallel
@ -60,15 +63,18 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
))
}
let mut limiter = input.limiter;
let mut last_pruned_transaction = None;
let (pruned, _) = provider.prune_table_with_iterator::<tables::TransactionHashNumbers>(
let (pruned, done) = provider.prune_table_with_iterator::<tables::TransactionHashNumbers>(
hashes,
input.delete_limit,
&mut limiter,
|row| {
last_pruned_transaction = Some(last_pruned_transaction.unwrap_or(row.1).max(row.1))
},
)?;
let done = tx_range_end == end;
let done = done && tx_range_end == end;
trace!(target: "pruner", %pruned, %done, "Pruned transaction lookup");
let last_pruned_transaction = last_pruned_transaction.unwrap_or(tx_range_end);
@ -81,8 +87,10 @@ impl<DB: Database> Segment<DB> for TransactionLookup {
// run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: last_pruned_block,
@ -102,7 +110,10 @@ mod tests {
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, TxNumber, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;
@ -132,8 +143,10 @@ mod tests {
db.table::<tables::TransactionHashNumbers>().unwrap().len()
);
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let segment = TransactionLookup::new(prune_mode);
let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -142,9 +155,8 @@ mod tests {
.get_prune_checkpoint(PruneSegment::TransactionLookup)
.unwrap(),
to_block,
delete_limit: 10,
limiter: limiter.clone(),
};
let segment = TransactionLookup::new(prune_mode);
let next_tx_number_to_prune = db
.factory
@ -161,7 +173,10 @@ mod tests {
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.min(
next_tx_number_to_prune as usize +
input.limiter.deleted_entries_limit().unwrap(),
)
.sub(1);
let last_pruned_block_number = blocks
@ -180,11 +195,14 @@ mod tests {
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
@ -193,8 +211,8 @@ mod tests {
.unwrap();
provider.commit().expect("commit");
let last_pruned_block_number =
last_pruned_block_number.checked_sub(if result.done { 0 } else { 1 });
let last_pruned_block_number = last_pruned_block_number
.checked_sub(if result.progress.is_finished() { 0 } else { 1 });
assert_eq!(
db.table::<tables::TransactionHashNumbers>().unwrap().len(),
@ -214,8 +232,11 @@ mod tests {
);
};
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(10, (true, 8));
test_prune(
6,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
);
test_prune(6, (PruneProgress::Finished, 2));
test_prune(10, (PruneProgress::Finished, 8));
}
}

View File

@ -3,7 +3,7 @@ use crate::{
PrunerError,
};
use reth_db::{database::Database, tables};
use reth_primitives::{PruneMode, PruneSegment};
use reth_primitives::{PruneMode, PruneProgress, PruneSegment};
use reth_provider::{DatabaseProviderRW, TransactionsProvider};
use tracing::{instrument, trace};
@ -41,10 +41,12 @@ impl<DB: Database> Segment<DB> for Transactions {
}
};
let mut limiter = input.limiter;
let mut last_pruned_transaction = *tx_range.end();
let (pruned, done) = provider.prune_table_with_range::<tables::Transactions>(
tx_range,
input.delete_limit,
&mut limiter,
|_| false,
|row| last_pruned_transaction = row.0,
)?;
@ -57,8 +59,10 @@ impl<DB: Database> Segment<DB> for Transactions {
// so we could finish pruning its transactions on the next run.
.checked_sub(if done { 0 } else { 1 });
let progress = PruneProgress::new(done, &limiter);
Ok(PruneOutput {
done,
progress,
pruned,
checkpoint: Some(PruneOutputCheckpoint {
block_number: last_pruned_block,
@ -78,7 +82,10 @@ mod tests {
};
use reth_db::tables;
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_primitives::{
BlockNumber, PruneCheckpoint, PruneInterruptReason, PruneLimiter, PruneMode, PruneProgress,
PruneSegment, TxNumber, B256,
};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;
@ -95,8 +102,10 @@ mod tests {
assert_eq!(db.table::<tables::Transactions>().unwrap().len(), transactions.len());
let test_prune = |to_block: BlockNumber, expected_result: (bool, usize)| {
let test_prune = |to_block: BlockNumber, expected_result: (PruneProgress, usize)| {
let prune_mode = PruneMode::Before(to_block);
let segment = Transactions::new(prune_mode);
let mut limiter = PruneLimiter::default().set_deleted_entries_limit(10);
let input = PruneInput {
previous_checkpoint: db
.factory
@ -105,9 +114,8 @@ mod tests {
.get_prune_checkpoint(PruneSegment::Transactions)
.unwrap(),
to_block,
delete_limit: 10,
limiter: limiter.clone(),
};
let segment = Transactions::new(prune_mode);
let next_tx_number_to_prune = db
.factory
@ -120,12 +128,15 @@ mod tests {
.unwrap_or_default();
let provider = db.factory.provider_rw().unwrap();
let result = segment.prune(&provider, input).unwrap();
let result = segment.prune(&provider, input.clone()).unwrap();
limiter.increment_deleted_entries_count_by(result.pruned);
assert_matches!(
result,
PruneOutput {done, pruned, checkpoint: Some(_)}
if (done, pruned) == expected_result
PruneOutput {progress, pruned, checkpoint: Some(_)}
if (progress, pruned) == expected_result
);
segment
.save_checkpoint(
&provider,
@ -139,7 +150,10 @@ mod tests {
.take(to_block as usize)
.map(|block| block.body.len())
.sum::<usize>()
.min(next_tx_number_to_prune as usize + input.delete_limit)
.min(
next_tx_number_to_prune as usize +
input.limiter.deleted_entries_limit().unwrap(),
)
.sub(1);
let last_pruned_block_number = blocks
@ -155,7 +169,7 @@ mod tests {
})
.into_inner()
.0
.checked_sub(if result.done { 0 } else { 1 });
.checked_sub(if result.progress.is_finished() { 0 } else { 1 });
assert_eq!(
db.table::<tables::Transactions>().unwrap().len(),
@ -175,7 +189,10 @@ mod tests {
);
};
test_prune(6, (false, 10));
test_prune(6, (true, 2));
test_prune(
6,
(PruneProgress::HasMoreData(PruneInterruptReason::DeletedEntriesLimitReached), 10),
);
test_prune(6, (PruneProgress::Finished, 2));
}
}

View File

@ -81,7 +81,7 @@ where
) -> EthResult<Vec<TraceResult>> {
if transactions.is_empty() {
// nothing to trace
return Ok(Vec::new());
return Ok(Vec::new())
}
// replay all transactions of the block

View File

@ -1097,7 +1097,7 @@ where
if block.body.is_empty() {
// nothing to trace
return Ok(Some(Vec::new()));
return Ok(Some(Vec::new()))
}
// replay all transactions of the block

View File

@ -15,7 +15,7 @@ use crate::{
use itertools::{izip, Itertools};
use reth_db::{
common::KeyValue,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, RangeWalker},
database::Database,
models::{
sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress,
@ -38,10 +38,10 @@ use reth_primitives::{
stage::{StageCheckpoint, StageId},
trie::Nibbles,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, GotExpected, Head, Header, PruneCheckpoint, PruneModes, PruneSegment,
Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, StorageEntry,
TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, Withdrawals, B256, U256,
ChainInfo, ChainSpec, GotExpected, Head, Header, PruneCheckpoint, PruneLimiter, PruneModes,
PruneSegment, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment,
StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedEcRecovered,
TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256, U256,
};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
@ -850,30 +850,38 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
pub fn prune_table_with_iterator<T: Table>(
&self,
keys: impl IntoIterator<Item = T::Key>,
limit: usize,
limiter: &mut PruneLimiter,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut deleted = 0;
let mut keys = keys.into_iter();
if limit != 0 {
for key in &mut keys {
let row = cursor.seek_exact(key.clone())?;
if let Some(row) = row {
cursor.delete_current()?;
deleted += 1;
delete_callback(row);
}
let mut deleted_entries = 0;
if deleted == limit {
break
}
for key in &mut keys {
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break
}
let row = cursor.seek_exact(key)?;
if let Some(row) = row {
cursor.delete_current()?;
limiter.increment_deleted_entries_count();
deleted_entries += 1;
delete_callback(row);
}
}
Ok((deleted, keys.next().is_none()))
let done = keys.next().is_none();
Ok((deleted_entries, done))
}
/// Prune the table for the specified key range.
@ -882,29 +890,72 @@ impl<TX: DbTxMut + DbTx> DatabaseProvider<TX> {
pub fn prune_table_with_range<T: Table>(
&self,
keys: impl RangeBounds<T::Key> + Clone + Debug,
limit: usize,
limiter: &mut PruneLimiter,
mut skip_filter: impl FnMut(&TableRow<T>) -> bool,
mut delete_callback: impl FnMut(TableRow<T>),
) -> Result<(usize, bool), DatabaseError> {
let mut cursor = self.tx.cursor_write::<T>()?;
let mut walker = cursor.walk_range(keys)?;
let mut deleted = 0;
if limit != 0 {
while let Some(row) = walker.next().transpose()? {
if !skip_filter(&row) {
walker.delete_current()?;
deleted += 1;
delete_callback(row);
}
let mut deleted_entries = 0;
if deleted == limit {
break
}
let done = loop {
// check for time out must be done in this scope since it's not done in
// `prune_table_with_range_step`
if limiter.is_limit_reached() {
debug!(
target: "providers::db",
?limiter,
deleted_entries_limit = %limiter.is_deleted_entries_limit_reached(),
time_limit = %limiter.is_time_limit_reached(),
table = %T::NAME,
"Pruning limit reached"
);
break false
}
let done = self.prune_table_with_range_step(
&mut walker,
limiter,
&mut skip_filter,
&mut delete_callback,
)?;
if done {
break true
} else {
deleted_entries += 1;
}
};
Ok((deleted_entries, done))
}
/// Steps once with the given walker and prunes the entry in the table.
///
/// Returns `true` if the walker is finished, `false` if it may have more data to prune.
///
/// CAUTION: Pruner limits are not checked. This allows for a clean exit of a prune run that's
/// pruning different tables concurrently, by letting them step to the same height before
/// timing out.
pub fn prune_table_with_range_step<T: Table>(
    &self,
    walker: &mut RangeWalker<'_, T, <TX as DbTxMut>::CursorMut<T>>,
    limiter: &mut PruneLimiter,
    skip_filter: &mut impl FnMut(&TableRow<T>) -> bool,
    delete_callback: &mut impl FnMut(TableRow<T>),
) -> Result<bool, DatabaseError> {
    // Nothing left to walk: the range has been fully pruned.
    let Some(res) = walker.next() else { return Ok(true) };
    let row = res?;

    // Delete the row unless the caller's filter asks to keep it.
    if !skip_filter(&row) {
        walker.delete_current()?;
        limiter.increment_deleted_entries_count();
        delete_callback(row);
    }

    Ok(false)
}
/// Load shard and remove it. If list is empty, last shard was full or