feat: introduce PrunerBuilder (#5834)

Dan Cline
2023-12-20 19:43:48 +02:00
committed by GitHub
parent f97b3fc175
commit ad3b893c23
7 changed files with 127 additions and 57 deletions

Cargo.lock (generated)

@@ -6308,6 +6308,7 @@ dependencies = [
"itertools 0.11.0",
"metrics",
"rayon",
"reth-config",
"reth-db",
"reth-interfaces",
"reth-metrics",


@@ -64,11 +64,10 @@ use reth_provider::{
providers::BlockchainProvider, BlockHashReader, BlockReader, CanonStateSubscriptions,
HeaderProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader,
};
use reth_prune::{segments::SegmentSet, Pruner};
use reth_prune::PrunerBuilder;
use reth_revm::EvmProcessorFactory;
use reth_revm_inspectors::stack::Hook;
use reth_rpc_engine_api::EngineApi;
use reth_snapshot::HighestSnapshotsTracker;
use reth_stages::{
prelude::*,
stages::{
@@ -475,12 +474,10 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let mut hooks = EngineHooks::new();
let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = self.build_pruner(
&prune_config,
provider_factory,
tree_config,
snapshotter.highest_snapshot_receiver(),
);
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(self.chain.prune_delete_limit)
.build(provider_factory, snapshotter.highest_snapshot_receiver());
let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
@@ -975,53 +972,6 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
Ok(pipeline)
}
/// Builds a [Pruner] with the given config.
fn build_pruner<DB: Database>(
&self,
config: &PruneConfig,
provider_factory: ProviderFactory<DB>,
tree_config: BlockchainTreeConfig,
highest_snapshots_rx: HighestSnapshotsTracker,
) -> Pruner<DB> {
let segments = SegmentSet::default()
// Receipts
.segment_opt(config.segments.receipts.map(reth_prune::segments::Receipts::new))
// Receipts by logs
.segment_opt((!config.segments.receipts_log_filter.is_empty()).then(|| {
reth_prune::segments::ReceiptsByLogs::new(
config.segments.receipts_log_filter.clone(),
)
}))
// Transaction lookup
.segment_opt(
config
.segments
.transaction_lookup
.map(reth_prune::segments::TransactionLookup::new),
)
// Sender recovery
.segment_opt(
config.segments.sender_recovery.map(reth_prune::segments::SenderRecovery::new),
)
// Account history
.segment_opt(
config.segments.account_history.map(reth_prune::segments::AccountHistory::new),
)
// Storage history
.segment_opt(
config.segments.storage_history.map(reth_prune::segments::StorageHistory::new),
);
Pruner::new(
provider_factory,
segments.into_vec(),
config.block_interval,
self.chain.prune_delete_limit,
tree_config.max_reorg_depth() as usize,
highest_snapshots_rx,
)
}
/// Change rpc port numbers based on the instance number.
fn adjust_instance_ports(&mut self) {
// auth port is scaled by a factor of instance * 100
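
The segment wiring deleted above does not disappear: it moves into the new SegmentSet::from_prune_modes and PrunerBuilder added later in this commit. As a rough sketch of the equivalence, the removed method boils down to something like the free function below; the function itself is illustrative, and max_reorg_depth and delete_limit stand in for the values previously read from the tree config and the chain spec.

use reth_config::PruneConfig;
use reth_db::database::Database;
use reth_provider::ProviderFactory;
use reth_prune::{Pruner, PrunerBuilder};
use reth_snapshot::HighestSnapshotsTracker;

// Sketch only: mirrors the shape of the removed NodeCommand::build_pruner,
// but delegates all segment construction to the new builder.
fn build_pruner<DB: Database>(
    config: &PruneConfig,
    provider_factory: ProviderFactory<DB>,
    highest_snapshots_rx: HighestSnapshotsTracker,
    max_reorg_depth: usize,
    delete_limit: usize,
) -> Pruner<DB> {
    PrunerBuilder::new(config.clone())
        .max_reorg_depth(max_reorg_depth)
        .prune_delete_limit(delete_limit)
        .build(provider_factory, highest_snapshots_rx)
}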


@@ -10,4 +10,4 @@
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
pub mod config;
pub use config::Config;
pub use config::{Config, PruneConfig};
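
PruneConfig is now re-exported at the reth-config crate root, so downstream code can reach it without going through the config module. A tiny sketch of the import path the new builder relies on; the helper function is purely illustrative.

use reth_config::PruneConfig;

// Hypothetical helper: only demonstrates that the prune config and its
// block_interval field are reachable through the crate-root re-export.
fn pruning_interval(config: &PruneConfig) -> usize {
    config.block_interval
}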


@@ -16,6 +16,7 @@ reth-provider.workspace = true
reth-interfaces.workspace = true
reth-snapshot.workspace = true
reth-tokio-util.workspace = true
reth-config.workspace = true
# async
tokio = { workspace = true, features = ["sync"] }


@@ -0,0 +1,83 @@
use crate::{segments::SegmentSet, Pruner};
use reth_config::PruneConfig;
use reth_db::database::Database;
use reth_primitives::{PruneModes, MAINNET};
use reth_provider::ProviderFactory;
use reth_snapshot::HighestSnapshotsTracker;
/// Contains the information required to build a pruner
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PrunerBuilder {
/// Minimum pruning interval measured in blocks.
pub block_interval: usize,
/// Pruning configuration for every part of the data that can be pruned.
pub segments: PruneModes,
/// The number of blocks that can be re-orged.
pub max_reorg_depth: usize,
/// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by
/// the amount of blocks between pruner runs to account for the difference in amount of new
/// data coming in.
pub prune_delete_limit: usize,
}
impl PrunerBuilder {
/// Creates a new [PrunerBuilder] from the given [PruneConfig].
pub fn new(pruner_config: PruneConfig) -> Self {
PrunerBuilder::default()
.block_interval(pruner_config.block_interval)
.segments(pruner_config.segments)
}
/// Sets the minimum pruning interval measured in blocks.
pub fn block_interval(mut self, block_interval: usize) -> Self {
self.block_interval = block_interval;
self
}
/// Sets the configuration for every part of the data that can be pruned.
pub fn segments(mut self, segments: PruneModes) -> Self {
self.segments = segments;
self
}
/// Sets the number of blocks that can be re-orged.
pub fn max_reorg_depth(mut self, max_reorg_depth: usize) -> Self {
self.max_reorg_depth = max_reorg_depth;
self
}
/// Sets the delete limit for pruner, per block.
pub fn prune_delete_limit(mut self, prune_delete_limit: usize) -> Self {
self.prune_delete_limit = prune_delete_limit;
self
}
/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(
self,
provider_factory: ProviderFactory<DB>,
highest_snapshots_rx: HighestSnapshotsTracker,
) -> Pruner<DB> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);
Pruner::new(
provider_factory,
segments.into_vec(),
self.block_interval,
self.prune_delete_limit,
self.max_reorg_depth,
highest_snapshots_rx,
)
}
}
impl Default for PrunerBuilder {
fn default() -> Self {
Self {
block_interval: 5,
segments: PruneModes::none(),
max_reorg_depth: 64,
prune_delete_limit: MAINNET.prune_delete_limit,
}
}
}
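
For a sense of how this builder is meant to be driven, here is a minimal usage sketch from a consumer's point of view. The function name and the concrete values are made up, and it assumes PruneConfig exposes exactly the two public fields the builder reads (block_interval and segments).

use reth_config::PruneConfig;
use reth_primitives::PruneModes;
use reth_prune::PrunerBuilder;

// Illustrative only: run the pruner every 10 blocks, prune nothing, and
// override the default re-org depth and per-block delete budget.
fn example_builder() -> PrunerBuilder {
    let config = PruneConfig { block_interval: 10, segments: PruneModes::none() };
    PrunerBuilder::new(config)
        .max_reorg_depth(128)
        .prune_delete_limit(3_000)
    // Calling .build(provider_factory, highest_snapshots_rx) afterwards yields
    // the actual Pruner, given a ProviderFactory<DB> and a snapshot receiver.
}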


@@ -9,6 +9,7 @@
#![deny(unused_must_use, rust_2018_idioms)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
mod builder;
mod error;
mod event;
mod metrics;
@@ -16,6 +17,7 @@ mod pruner;
pub mod segments;
use crate::metrics::Metrics;
pub use builder::PrunerBuilder;
pub use error::PrunerError;
pub use event::PrunerEvent;
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};


@@ -1,5 +1,9 @@
use crate::segments::Segment;
use crate::segments::{
AccountHistory, Receipts, ReceiptsByLogs, Segment, SenderRecovery, StorageHistory,
TransactionLookup,
};
use reth_db::database::Database;
use reth_primitives::PruneModes;
use std::sync::Arc;
/// Collection of [Segment]. Thread-safe, allocated on the heap.
@@ -32,6 +36,35 @@ impl<DB: Database> SegmentSet<DB> {
pub fn into_vec(self) -> Vec<Arc<dyn Segment<DB>>> {
self.inner
}
/// Creates a [SegmentSet] from an existing [PruneModes].
pub fn from_prune_modes(prune_modes: PruneModes) -> Self {
let PruneModes {
sender_recovery,
transaction_lookup,
receipts,
account_history,
storage_history,
receipts_log_filter,
} = prune_modes;
SegmentSet::default()
// Receipts
.segment_opt(receipts.map(Receipts::new))
// Receipts by logs
.segment_opt(
(!receipts_log_filter.is_empty())
.then(|| ReceiptsByLogs::new(receipts_log_filter.clone())),
)
// Transaction lookup
.segment_opt(transaction_lookup.map(TransactionLookup::new))
// Sender recovery
.segment_opt(sender_recovery.map(SenderRecovery::new))
// Account history
.segment_opt(account_history.map(AccountHistory::new))
// Storage history
.segment_opt(storage_history.map(StorageHistory::new))
}
}
impl<DB: Database> Default for SegmentSet<DB> {
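
To make the new constructor concrete, a short sketch of what from_prune_modes yields for a given PruneModes: only segments that are actually configured end up in the set. The function and the specific PruneMode value are illustrative.

use std::sync::Arc;

use reth_db::database::Database;
use reth_primitives::{PruneMode, PruneModes};
use reth_prune::segments::{Segment, SegmentSet};

// Enable a single segment: every field left at None (and an empty receipts
// log filter) produces no segment, so only sender recovery is returned.
fn sender_recovery_only<DB: Database>() -> Vec<Arc<dyn Segment<DB>>> {
    let modes = PruneModes { sender_recovery: Some(PruneMode::Full), ..PruneModes::none() };
    SegmentSet::<DB>::from_prune_modes(modes).into_vec()
}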