feat(prune, stages): prune pipeline stage (#9419)

Alexey Shekhirin
2024-07-11 20:18:56 +01:00
committed by GitHub
parent c31d69683d
commit b040b86a54
19 changed files with 416 additions and 79 deletions

Cargo.lock

@ -8579,6 +8579,7 @@ dependencies = [
"reth-primitives",
"reth-primitives-traits",
"reth-provider",
"reth-prune",
"reth-prune-types",
"reth-revm",
"reth-stages-api",


@ -30,8 +30,8 @@ impl PruneCommand {
info!(target: "reth::cli", ?prune_tip, ?prune_config, "Pruning data from database...");
// Run the pruner according to the configuration, and don't enforce any limits on it
let mut pruner = PrunerBuilder::new(prune_config)
.prune_delete_limit(usize::MAX)
.build(provider_factory);
.delete_limit_per_block(usize::MAX)
.build_with_provider_factory(provider_factory);
pruner.run(prune_tip)?;
info!(target: "reth::cli", "Pruned data from database");


@ -71,6 +71,8 @@ pub struct StageConfig {
pub sender_recovery: SenderRecoveryConfig,
/// Execution stage configuration.
pub execution: ExecutionConfig,
/// Prune stage configuration.
pub prune: PruneStageConfig,
/// Account Hashing stage configuration.
pub account_hashing: HashingConfig,
/// Storage Hashing stage configuration.
@ -228,6 +230,20 @@ impl From<ExecutionConfig> for ExecutionStageThresholds {
}
}
/// Prune stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
pub struct PruneStageConfig {
/// The maximum number of entries to prune before committing progress to the database.
pub commit_threshold: usize,
}
impl Default for PruneStageConfig {
fn default() -> Self {
Self { commit_threshold: 1_000_000 }
}
}
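Since `PruneStageConfig` derives `Deserialize` with `#[serde(default)]`, an omitted `[stages.prune]` table in the config file falls back to the default above. A minimal sketch of that fallback (the import path is an assumption, not taken from this diff):

use reth_config::config::PruneStageConfig; // assumed module path

fn main() {
    // serde(default) means a missing config section yields exactly this value.
    let config = PruneStageConfig::default();
    assert_eq!(config.commit_threshold, 1_000_000);
}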
/// Hashing stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]


@ -9,6 +9,7 @@ use metrics::Counter;
use reth_db_api::database::Database;
use reth_errors::{RethError, RethResult};
use reth_primitives::BlockNumber;
use reth_provider::ProviderFactory;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_tasks::TaskSpawner;
use std::{
@ -39,7 +40,10 @@ impl<DB: fmt::Debug> fmt::Debug for PruneHook<DB> {
impl<DB: Database + 'static> PruneHook<DB> {
/// Create a new instance
pub fn new(pruner: Pruner<DB>, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
pub fn new(
pruner: Pruner<DB, ProviderFactory<DB>>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
pruner_state: PrunerState::Idle(Some(pruner)),
pruner_task_spawner,
@ -151,9 +155,9 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
#[derive(Debug)]
enum PrunerState<DB> {
/// Pruner is idle.
Idle(Option<Pruner<DB>>),
Idle(Option<Pruner<DB, ProviderFactory<DB>>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult<DB>>),
Running(oneshot::Receiver<PrunerWithResult<DB, ProviderFactory<DB>>>),
}
#[derive(reth_metrics::Metrics)]


@ -26,7 +26,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, B256};
use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
ExecutionOutcome,
ExecutionOutcome, ProviderFactory,
};
use reth_prune::Pruner;
use reth_prune_types::PruneModes;
@ -400,7 +400,7 @@ where
let blockchain_provider =
BlockchainProvider::with_latest(provider_factory.clone(), tree, latest);
let pruner = Pruner::new(
let pruner = Pruner::<_, ProviderFactory<_>>::new(
provider_factory.clone(),
vec![],
5,


@ -34,7 +34,7 @@ pub struct Persistence<DB> {
/// Handle for the static file service.
static_file_handle: StaticFileServiceHandle,
/// The pruner
pruner: Pruner<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>,
}
impl<DB: Database> Persistence<DB> {
@ -43,7 +43,7 @@ impl<DB: Database> Persistence<DB> {
provider: ProviderFactory<DB>,
incoming: Receiver<PersistenceAction>,
static_file_handle: StaticFileServiceHandle,
pruner: Pruner<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>,
) -> Self {
Self { provider, incoming, static_file_handle, pruner }
}
@ -145,7 +145,7 @@ where
fn spawn_new(
provider: ProviderFactory<DB>,
static_file_handle: StaticFileServiceHandle,
pruner: Pruner<DB>,
pruner: Pruner<DB, ProviderFactory<DB>>,
) -> PersistenceHandle {
let (tx, rx) = std::sync::mpsc::channel();
let service = Self::new(provider, rx, static_file_handle, pruner);
@ -313,7 +313,15 @@ mod tests {
let (finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx);
let pruner = Pruner::<_, ProviderFactory<_>>::new(
provider.clone(),
vec![],
5,
0,
5,
None,
finished_exex_height_rx,
);
let (static_file_sender, _static_file_receiver) = channel();
let static_file_handle = StaticFileServiceHandle::new(static_file_sender);


@ -329,7 +329,7 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
/// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
pub fn pruner_builder(&self) -> PrunerBuilder {
PrunerBuilder::new(self.prune_config().unwrap_or_default())
.prune_delete_limit(self.chain_spec().prune_delete_limit)
.delete_limit_per_block(self.chain_spec().prune_delete_limit)
.timeout(PrunerBuilder::DEFAULT_TIMEOUT)
}


@ -231,14 +231,15 @@ where
let initial_target = ctx.node_config().debug.tip;
let mut pruner_builder =
ctx.pruner_builder().max_reorg_depth(ctx.tree_config().max_reorg_depth() as usize);
let mut pruner_builder = ctx
.pruner_builder()
.prune_max_blocks_per_run(ctx.tree_config().max_reorg_depth() as usize);
if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
}
let pruner = pruner_builder.build(ctx.provider_factory().clone());
let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");


@ -15,12 +15,12 @@ pub struct PrunerBuilder {
block_interval: usize,
/// Pruning configuration for every part of the data that can be pruned.
segments: PruneModes,
/// The number of blocks that can be re-orged.
max_reorg_depth: usize,
/// The maximum number of blocks that can be pruned per run.
prune_max_blocks_per_run: usize,
/// The delete limit for the pruner, per block. In the actual pruner run it will be multiplied by
/// the number of blocks between pruner runs to account for the difference in the amount of new
/// data coming in.
prune_delete_limit: usize,
delete_limit_per_block: usize,
/// Time a pruner job can run before timing out.
timeout: Option<Duration>,
/// The finished height of all `ExEx`'s.
@ -51,14 +51,14 @@ impl PrunerBuilder {
}
/// Sets the number of blocks that can be re-orged.
pub const fn max_reorg_depth(mut self, max_reorg_depth: usize) -> Self {
self.max_reorg_depth = max_reorg_depth;
pub const fn prune_max_blocks_per_run(mut self, prune_max_blocks_per_run: usize) -> Self {
self.prune_max_blocks_per_run = prune_max_blocks_per_run;
self
}
/// Sets the delete limit for the pruner, per block.
pub const fn prune_delete_limit(mut self, prune_delete_limit: usize) -> Self {
self.prune_delete_limit = prune_delete_limit;
pub const fn delete_limit_per_block(mut self, delete_limit_per_block: usize) -> Self {
self.delete_limit_per_block = delete_limit_per_block;
self
}
@ -80,16 +80,33 @@ impl PrunerBuilder {
self
}
/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> {
/// Builds a [Pruner] from the current configuration with the given provider factory.
pub fn build_with_provider_factory<DB: Database>(
self,
provider_factory: ProviderFactory<DB>,
) -> Pruner<DB, ProviderFactory<DB>> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);
Pruner::new(
Pruner::<_, ProviderFactory<DB>>::new(
provider_factory,
segments.into_vec(),
self.block_interval,
self.prune_delete_limit,
self.max_reorg_depth,
self.delete_limit_per_block,
self.prune_max_blocks_per_run,
self.timeout,
self.finished_exex_height,
)
}
/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self) -> Pruner<DB, ()> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);
Pruner::<_, ()>::new(
segments.into_vec(),
self.block_interval,
self.delete_limit_per_block,
self.prune_max_blocks_per_run,
self.timeout,
self.finished_exex_height,
)
@ -101,8 +118,8 @@ impl Default for PrunerBuilder {
Self {
block_interval: 5,
segments: PruneModes::none(),
max_reorg_depth: 64,
prune_delete_limit: MAINNET.prune_delete_limit,
prune_max_blocks_per_run: 64,
delete_limit_per_block: MAINNET.prune_delete_limit,
timeout: None,
finished_exex_height: watch::channel(FinishedExExHeight::NoExExs).1,
}
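With the renames above, the builder now has two terminal methods. A minimal sketch of both construction paths (the wrapper function is illustrative, not from this diff):

use reth_db_api::database::Database;
use reth_provider::ProviderFactory;
use reth_prune::{Pruner, PrunerBuilder};

fn build_both<DB: Database>(provider_factory: ProviderFactory<DB>) {
    // Standalone: no factory attached; the caller passes a provider into `run`.
    let _standalone: Pruner<DB, ()> = PrunerBuilder::default().build();

    // Factory-backed: opens its own read-write provider and commits each run.
    let _with_factory: Pruner<DB, ProviderFactory<DB>> =
        PrunerBuilder::default().build_with_provider_factory(provider_factory);
}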


@ -8,9 +8,7 @@ use crate::{
use alloy_primitives::BlockNumber;
use reth_db_api::database::Database;
use reth_exex_types::FinishedExExHeight;
use reth_provider::{
DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, StaticFileProviderFactory,
};
use reth_provider::{DatabaseProviderRW, ProviderFactory, PruneCheckpointReader};
use reth_prune_types::{PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment};
use reth_static_file_types::StaticFileSegment;
use reth_tokio_util::{EventSender, EventStream};
@ -22,14 +20,15 @@ use tracing::debug;
pub type PrunerResult = Result<PruneProgress, PrunerError>;
/// The pruner type itself with the result of [`Pruner::run`]
pub type PrunerWithResult<DB> = (Pruner<DB>, PrunerResult);
pub type PrunerWithResult<DB, S> = (Pruner<DB, S>, PrunerResult);
type PrunerStats = Vec<(PruneSegment, usize, PruneProgress)>;
/// Pruning routine. Main pruning logic happens in [`Pruner::run`].
#[derive(Debug)]
pub struct Pruner<DB> {
provider_factory: ProviderFactory<DB>,
pub struct Pruner<DB, PF> {
/// Provider factory. If the pruner is initialized without one, it is set to `()`.
provider_factory: PF,
segments: Vec<Box<dyn Segment<DB>>>,
/// Minimum pruning interval measured in blocks. All prune segments are checked and, if needed,
/// pruned, when the chain advances by the specified number of blocks.
@ -52,13 +51,38 @@ pub struct Pruner<DB> {
event_sender: EventSender<PrunerEvent>,
}
impl<DB: Database> Pruner<DB> {
/// Creates a new [Pruner].
impl<DB> Pruner<DB, ()> {
/// Creates a new [Pruner] without a provider factory.
pub fn new(
segments: Vec<Box<dyn Segment<DB>>>,
min_block_interval: usize,
delete_limit_per_block: usize,
prune_max_blocks_per_run: usize,
timeout: Option<Duration>,
finished_exex_height: watch::Receiver<FinishedExExHeight>,
) -> Self {
Self {
provider_factory: (),
segments,
min_block_interval,
previous_tip_block_number: None,
delete_limit_per_block,
prune_max_blocks_per_run,
timeout,
finished_exex_height,
metrics: Metrics::default(),
event_sender: Default::default(),
}
}
}
impl<DB: Database> Pruner<DB, ProviderFactory<DB>> {
/// Creates a new pruner with the given provider factory.
pub fn new(
provider_factory: ProviderFactory<DB>,
segments: Vec<Box<dyn Segment<DB>>>,
min_block_interval: usize,
delete_limit: usize,
delete_limit_per_block: usize,
prune_max_blocks_per_run: usize,
timeout: Option<Duration>,
finished_exex_height: watch::Receiver<FinishedExExHeight>,
@ -68,7 +92,7 @@ impl<DB: Database> Pruner<DB> {
segments,
min_block_interval,
previous_tip_block_number: None,
delete_limit_per_block: delete_limit,
delete_limit_per_block,
prune_max_blocks_per_run,
timeout,
finished_exex_height,
@ -76,18 +100,19 @@ impl<DB: Database> Pruner<DB> {
event_sender: Default::default(),
}
}
}
impl<DB: Database, S> Pruner<DB, S> {
/// Listen for events on the pruner.
pub fn events(&self) -> EventStream<PrunerEvent> {
self.event_sender.new_listener()
}
/// Run the pruner. This will only prune data up to the highest finished `ExEx` height; if there
/// are no `ExEx`s, it prunes up to the given tip block number.
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
fn run_with_provider(
&mut self,
provider: &DatabaseProviderRW<DB>,
tip_block_number: BlockNumber,
) -> PrunerResult {
let Some(tip_block_number) =
self.adjust_tip_block_number_to_finished_exex_height(tip_block_number)
else {
@ -128,10 +153,8 @@ impl<DB: Database> Pruner<DB> {
limiter = limiter.set_time_limit(timeout);
};
let provider = self.provider_factory.provider_rw()?;
let (stats, deleted_entries, progress) =
self.prune_segments(&provider, tip_block_number, &mut limiter)?;
provider.commit()?;
self.prune_segments(provider, tip_block_number, &mut limiter)?;
self.previous_tip_block_number = Some(tip_block_number);
@ -170,7 +193,7 @@ impl<DB: Database> Pruner<DB> {
tip_block_number: BlockNumber,
limiter: &mut PruneLimiter,
) -> Result<(PrunerStats, usize, PruneProgress), PrunerError> {
let static_file_segments = self.static_file_segments();
let static_file_segments = self.static_file_segments(provider);
let segments = static_file_segments
.iter()
.map(|segment| (segment, PrunePurpose::StaticFile))
@ -251,10 +274,10 @@ impl<DB: Database> Pruner<DB> {
/// Returns pre-configured segments that need to be pruned according to the highest
/// `static_files` for [`PruneSegment::Transactions`], [`PruneSegment::Headers`] and
/// [`PruneSegment::Receipts`].
fn static_file_segments(&self) -> Vec<Box<dyn Segment<DB>>> {
fn static_file_segments(&self, provider: &DatabaseProviderRW<DB>) -> Vec<Box<dyn Segment<DB>>> {
let mut segments = Vec::<Box<dyn Segment<DB>>>::new();
let static_file_provider = self.provider_factory.static_file_provider();
let static_file_provider = provider.static_file_provider();
if let Some(to_block) =
static_file_provider.get_highest_static_file_block(StaticFileSegment::Transactions)
@ -330,6 +353,37 @@ impl<DB: Database> Pruner<DB> {
}
}
impl<DB: Database> Pruner<DB, ()> {
/// Run the pruner with the given provider. This will only prune data up to the highest finished
/// ExEx height; if there are no ExExes, it prunes up to the given tip block number.
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(
&mut self,
provider: &DatabaseProviderRW<DB>,
tip_block_number: BlockNumber,
) -> PrunerResult {
self.run_with_provider(provider, tip_block_number)
}
}
impl<DB: Database> Pruner<DB, ProviderFactory<DB>> {
/// Run the pruner. This will only prune data up to the highest finished ExEx height; if there
/// are no ExExes, it prunes up to the given tip block number.
///
/// Returns a [`PruneProgress`], indicating whether pruning is finished, or there is more data
/// to prune.
#[allow(clippy::doc_markdown)]
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
let provider = self.provider_factory.provider_rw()?;
let result = self.run_with_provider(&provider, tip_block_number);
provider.commit()?;
result
}
}
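The split yields two commit disciplines; a caller-side sketch of the contrast (both helper functions are illustrative, not from this diff):

use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, ProviderFactory};
use reth_prune::{Pruner, PrunerError};

// Standalone pruner: prunes inside a transaction the caller owns and commits,
// which is how the new PruneStage drives it.
fn run_in_callers_tx<DB: Database>(
    pruner: &mut Pruner<DB, ()>,
    provider: &DatabaseProviderRW<DB>,
    tip: u64,
) -> Result<bool, PrunerError> {
    Ok(pruner.run(provider, tip)?.is_finished())
}

// Factory-backed pruner: opens a provider itself and commits after the run,
// as the engine's PruneHook expects.
fn run_self_contained<DB: Database>(
    pruner: &mut Pruner<DB, ProviderFactory<DB>>,
    tip: u64,
) -> Result<bool, PrunerError> {
    Ok(pruner.run(tip)?.is_finished())
}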
#[cfg(test)]
mod tests {
@ -352,8 +406,15 @@ mod tests {
let (finished_exex_height_tx, finished_exex_height_rx) =
tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
let mut pruner =
Pruner::new(provider_factory, vec![], 5, 0, 5, None, finished_exex_height_rx);
let mut pruner = Pruner::<_, ProviderFactory<_>>::new(
provider_factory,
vec![],
5,
0,
5,
None,
finished_exex_height_rx,
);
// No last pruned block number was set before
let first_block_number = 1;


@ -62,6 +62,11 @@ impl PruneModes {
receipts_log_filter: Default::default(),
}
}
/// Returns true if all prune modes are set to [`None`].
pub fn is_empty(&self) -> bool {
self == &Self::none()
}
}
/// Deserializes [`Option<PruneMode>`] and validates that the value is not less than the const


@ -5,6 +5,7 @@ use reth_errors::{BlockExecutionError, DatabaseError, RethError};
use reth_network_p2p::error::DownloadError;
use reth_primitives_traits::SealedHeader;
use reth_provider::ProviderError;
use reth_prune::{PruneSegmentError, PrunerError};
use reth_static_file_types::StaticFileSegment;
use thiserror::Error;
use tokio::sync::broadcast::error::SendError;
@ -70,7 +71,10 @@ pub enum StageError {
Database(#[from] DatabaseError),
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_prune::PruneSegmentError),
PruningConfiguration(#[from] PruneSegmentError),
/// Pruner error
#[error(transparent)]
Pruner(#[from] PrunerError),
/// Invalid checkpoint passed to the stage
#[error("invalid stage checkpoint: {0}")]
StageCheckpoint(u64),


@ -256,8 +256,8 @@ where
// Run the pruner so we don't potentially end up with a higher height in the database than in
// static files during a pipeline unwind
let mut pruner = PrunerBuilder::new(Default::default())
.prune_delete_limit(usize::MAX)
.build(self.provider_factory.clone());
.delete_limit_per_block(usize::MAX)
.build_with_provider_factory(self.provider_factory.clone());
pruner.run(prune_tip)?;
}


@ -114,6 +114,17 @@ where
self
}
/// Adds the given [`Stage`] at the end of this set if it's [`Some`].
///
/// If the stage was already in the group, it is removed from its previous place.
pub fn add_stage_opt<S: Stage<DB> + 'static>(self, stage: Option<S>) -> Self {
if let Some(stage) = stage {
self.add_stage(stage)
} else {
self
}
}
/// Adds the given [`StageSet`] to the end of this set.
///
/// If a stage is in both sets, it is removed from its previous place in this set. Because of


@ -27,6 +27,7 @@ reth-primitives.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-execution-types.workspace = true
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-storage-errors.workspace = true
reth-revm.workspace = true


@ -36,8 +36,8 @@
use crate::{
stages::{
AccountHashingStage, BodyStage, ExecutionStage, FinishStage, HeaderStage,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, PruneSenderRecoveryStage,
PruneStage, SenderRecoveryStage, StorageHashingStage, TransactionLookupStage,
},
StageSet, StageSetBuilder,
};
@ -49,7 +49,7 @@ use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::
use reth_primitives::B256;
use reth_provider::HeaderSyncGapProvider;
use reth_prune_types::PruneModes;
use std::sync::Arc;
use std::{ops::Not, sync::Arc};
use tokio::sync::watch;
/// A set containing all stages to run a fully syncing instance of reth.
@ -65,6 +65,7 @@ use tokio::sync::watch;
/// - [`BodyStage`]
/// - [`SenderRecoveryStage`]
/// - [`ExecutionStage`]
/// - [`PruneSenderRecoveryStage`] (execute)
/// - [`MerkleStage`] (unwind)
/// - [`AccountHashingStage`]
/// - [`StorageHashingStage`]
@ -72,6 +73,7 @@ use tokio::sync::watch;
/// - [`TransactionLookupStage`]
/// - [`IndexStorageHistoryStage`]
/// - [`IndexAccountHistoryStage`]
/// - [`PruneStage`] (execute)
/// - [`FinishStage`]
#[derive(Debug)]
pub struct DefaultStages<Provider, H, B, EF> {
@ -122,7 +124,7 @@ where
E: BlockExecutorProvider,
{
/// Appends the default offline stages and default finish stage to the given builder.
pub fn add_offline_stages<DB: Database>(
pub fn add_offline_stages<DB: Database + 'static>(
default_offline: StageSetBuilder<DB>,
executor_factory: E,
stages_config: StageConfig,
@ -247,13 +249,15 @@ where
/// A combination of (in order)
///
/// - [`ExecutionStages`]
/// - [`PruneSenderRecoveryStage`]
/// - [`HashingStages`]
/// - [`HistoryIndexingStages`]
/// - [`PruneStage`]
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct OfflineStages<EF> {
/// Executor factory needs for execution stage
pub executor_factory: EF,
executor_factory: EF,
/// Configuration for each stage in the pipeline
stages_config: StageConfig,
/// Prune configuration for every segment that can be pruned
@ -274,7 +278,7 @@ impl<EF> OfflineStages<EF> {
impl<E, DB> StageSet<DB> for OfflineStages<E>
where
E: BlockExecutorProvider,
DB: Database,
DB: Database + 'static,
{
fn builder(self) -> StageSetBuilder<DB> {
ExecutionStages::new(
@ -283,11 +287,21 @@ where
self.prune_modes.clone(),
)
.builder()
// If sender recovery prune mode is set, add the prune sender recovery stage.
.add_stage_opt(self.prune_modes.sender_recovery.map(|prune_mode| {
PruneSenderRecoveryStage::new(prune_mode, self.stages_config.prune.commit_threshold)
}))
.add_set(HashingStages { stages_config: self.stages_config.clone() })
.add_set(HistoryIndexingStages {
stages_config: self.stages_config.clone(),
prune_modes: self.prune_modes,
prune_modes: self.prune_modes.clone(),
})
// If any prune modes are set, add the prune stage.
.add_stage_opt(self.prune_modes.is_empty().not().then(|| {
// Prune stage should be added after all hashing stages, because otherwise it will
// delete data that those stages still need.
PruneStage::new(self.prune_modes.clone(), self.stages_config.prune.commit_threshold)
}))
}
}


@ -16,6 +16,7 @@ mod index_account_history;
mod index_storage_history;
/// Stage for computing state root.
mod merkle;
mod prune;
/// The sender recovery stage.
mod sender_recovery;
/// The transaction lookup stage
@ -30,7 +31,7 @@ pub use headers::*;
pub use index_account_history::*;
pub use index_storage_history::*;
pub use merkle::*;
pub use prune::*;
pub use sender_recovery::*;
pub use tx_lookup::*;


@ -0,0 +1,197 @@
use reth_db_api::database::Database;
use reth_provider::DatabaseProviderRW;
use reth_prune::{PruneMode, PruneModes, PrunerBuilder};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
use tracing::info;
/// The prune stage that runs the pruner with the provided prune modes.
///
/// There are two main reasons to have this stage when running a full node:
/// - The Sender Recovery stage inserts a lot of data into the database that's only needed for the
/// Execution stage. The pruner will clean up the unneeded recovered senders.
/// - Pruning during the live sync can take a significant amount of time, especially history
/// segments. If we can prune as much data as possible in one go before starting the live sync, we
/// should do it.
///
/// `commit_threshold` is the maximum number of entries to prune before committing
/// progress to the database.
#[derive(Debug)]
pub struct PruneStage {
prune_modes: PruneModes,
commit_threshold: usize,
}
impl PruneStage {
/// Create a new prune stage with the given prune modes and commit threshold.
pub const fn new(prune_modes: PruneModes, commit_threshold: usize) -> Self {
Self { prune_modes, commit_threshold }
}
}
impl<DB: Database> Stage<DB> for PruneStage {
fn id(&self) -> StageId {
StageId::Prune
}
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let mut pruner = PrunerBuilder::default()
.segments(self.prune_modes.clone())
.prune_max_blocks_per_run(1)
.delete_limit_per_block(self.commit_threshold)
.build();
let result = pruner.run(provider, input.target())?;
if result.is_finished() {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
} else {
Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
}
}
fn unwind(
&mut self,
_provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::prune::unwind", "Stage is always skipped");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}
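Because `execute` caps one pass at `commit_threshold` deleted entries (a single pruner run with that delete limit), the pipeline keeps re-invoking the stage until it returns `done: true`. A simplified sketch of that driver loop (illustrative; the real pipeline also commits the provider and records checkpoints between passes):

use reth_db_api::database::Database;
use reth_provider::DatabaseProviderRW;
use reth_stages_api::{ExecInput, Stage, StageError};

fn drive_to_done<DB: Database, S: Stage<DB>>(
    stage: &mut S,
    provider: &DatabaseProviderRW<DB>,
    mut input: ExecInput,
) -> Result<(), StageError> {
    loop {
        let output = stage.execute(provider, input)?;
        if output.done {
            return Ok(());
        }
        // Resume the next pass from the checkpoint reported by this one.
        input.checkpoint = Some(output.checkpoint);
    }
}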
/// The prune sender recovery stage that runs the pruner with the provided `PruneMode` for the
/// `SenderRecovery` segment.
///
/// Under the hood, this stage has the same functionality as [`PruneStage`].
#[derive(Debug)]
pub struct PruneSenderRecoveryStage(PruneStage);
impl PruneSenderRecoveryStage {
/// Create a new prune sender recovery stage with the given prune mode and commit threshold.
pub fn new(prune_mode: PruneMode, commit_threshold: usize) -> Self {
Self(PruneStage::new(
PruneModes { sender_recovery: Some(prune_mode), ..PruneModes::none() },
commit_threshold,
))
}
}
impl<DB: Database> Stage<DB> for PruneSenderRecoveryStage {
fn id(&self) -> StageId {
StageId::PruneSenderRecovery
}
fn execute(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
self.0.execute(provider, input)
}
fn unwind(
&mut self,
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
self.0.unwind(provider, input)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
TestRunnerError, TestStageDB, UnwindStageTestRunner,
};
use reth_primitives::{SealedBlock, B256};
use reth_provider::{
providers::StaticFileWriter, TransactionsProvider, TransactionsProviderExt,
};
use reth_prune::PruneMode;
use reth_testing_utils::generators::{self, random_block_range};
stage_test_suite_ext!(PruneTestRunner, prune);
#[derive(Default)]
struct PruneTestRunner {
db: TestStageDB,
}
impl StageTestRunner for PruneTestRunner {
type S = PruneStage;
fn db(&self) -> &TestStageDB {
&self.db
}
fn stage(&self) -> Self::S {
PruneStage {
prune_modes: PruneModes {
sender_recovery: Some(PruneMode::Full),
..Default::default()
},
commit_threshold: usize::MAX,
}
}
}
impl ExecuteStageTestRunner for PruneTestRunner {
type Seed = Vec<SealedBlock>;
fn seed_execution(&mut self, input: ExecInput) -> Result<Self::Seed, TestRunnerError> {
let mut rng = generators::rng();
let blocks = random_block_range(
&mut rng,
input.checkpoint().block_number..=input.target(),
B256::ZERO,
1..3,
);
self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;
self.db.insert_transaction_senders(
blocks.iter().flat_map(|block| block.body.iter()).enumerate().map(|(i, tx)| {
(i as u64, tx.recover_signer().expect("failed to recover signer"))
}),
)?;
Ok(blocks)
}
fn validate_execution(
&self,
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
if let Some(output) = output {
let start_block = input.next_block();
let end_block = output.checkpoint.block_number;
if start_block > end_block {
return Ok(())
}
assert!(output.done);
assert_eq!(output.checkpoint.block_number, input.target());
// Verify that the senders are pruned
let provider = self.db.factory.provider()?;
let tx_range =
provider.transaction_range_by_block_range(start_block..=end_block)?;
let senders = self.db.factory.provider()?.senders_by_tx_range(tx_range)?;
assert!(senders.is_empty());
}
Ok(())
}
}
impl UnwindStageTestRunner for PruneTestRunner {
fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> {
Ok(())
}
}
}


@ -1,36 +1,26 @@
/// Stage IDs for all known stages.
///
/// For custom stages, use [`StageId::Other`]
#[allow(missing_docs)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StageId {
/// Static File stage in the process.
#[deprecated(
note = "Static Files are generated outside of the pipeline and do not require a separate stage"
)]
StaticFile,
/// Header stage in the process.
Headers,
/// Bodies stage in the process.
Bodies,
/// Sender recovery stage in the process.
SenderRecovery,
/// Execution stage in the process.
Execution,
/// Merkle unwind stage in the process.
PruneSenderRecovery,
MerkleUnwind,
/// Account hashing stage in the process.
AccountHashing,
/// Storage hashing stage in the process.
StorageHashing,
/// Merkle execute stage in the process.
MerkleExecute,
/// Transaction lookup stage in the process.
TransactionLookup,
/// Index storage history stage in the process.
IndexStorageHistory,
/// Index account history stage in the process.
IndexAccountHistory,
/// Finish stage in the process.
Prune,
Finish,
/// Other custom stage with a provided string identifier.
Other(&'static str),
@ -38,11 +28,12 @@ pub enum StageId {
impl StageId {
/// All supported Stages
pub const ALL: [Self; 12] = [
pub const ALL: [Self; 14] = [
Self::Headers,
Self::Bodies,
Self::SenderRecovery,
Self::Execution,
Self::PruneSenderRecovery,
Self::MerkleUnwind,
Self::AccountHashing,
Self::StorageHashing,
@ -50,18 +41,21 @@ impl StageId {
Self::TransactionLookup,
Self::IndexStorageHistory,
Self::IndexAccountHistory,
Self::Prune,
Self::Finish,
];
/// Stages that require state.
pub const STATE_REQUIRED: [Self; 7] = [
pub const STATE_REQUIRED: [Self; 9] = [
Self::Execution,
Self::PruneSenderRecovery,
Self::MerkleUnwind,
Self::AccountHashing,
Self::StorageHashing,
Self::MerkleExecute,
Self::IndexStorageHistory,
Self::IndexAccountHistory,
Self::Prune,
];
/// Return stage id formatted as string.
@ -73,6 +67,7 @@ impl StageId {
Self::Bodies => "Bodies",
Self::SenderRecovery => "SenderRecovery",
Self::Execution => "Execution",
Self::PruneSenderRecovery => "PruneSenderRecovery",
Self::MerkleUnwind => "MerkleUnwind",
Self::AccountHashing => "AccountHashing",
Self::StorageHashing => "StorageHashing",
@ -80,6 +75,7 @@ impl StageId {
Self::TransactionLookup => "TransactionLookup",
Self::IndexAccountHistory => "IndexAccountHistory",
Self::IndexStorageHistory => "IndexStorageHistory",
Self::Prune => "Prune",
Self::Finish => "Finish",
Self::Other(s) => s,
}
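As an illustrative check (not part of the diff), the new variants and the updated ALL array line up with the strings above:

use reth_stages_api::StageId;

#[test]
fn prune_stage_ids() {
    assert_eq!(StageId::Prune.as_str(), "Prune");
    assert_eq!(StageId::PruneSenderRecovery.as_str(), "PruneSenderRecovery");
    assert_eq!(StageId::ALL.len(), 14);
}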