diff --git a/Cargo.lock b/Cargo.lock index 621a84ed2..6d9252332 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6434,7 +6434,6 @@ dependencies = [ "reth-payload-validator", "reth-primitives", "reth-provider", - "reth-prune", "reth-revm", "reth-rpc", "reth-rpc-api", @@ -7890,6 +7889,7 @@ dependencies = [ "reth-metrics", "reth-primitives", "reth-provider", + "reth-prune", "reth-static-file", "reth-tokio-util", "thiserror", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 37b26686f..c1ed8981a 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -43,7 +43,6 @@ reth-payload-builder.workspace = true reth-payload-validator.workspace = true reth-basic-payload-builder.workspace = true reth-discv4.workspace = true -reth-prune.workspace = true reth-static-file = { workspace = true } reth-trie = { workspace = true, features = ["metrics"] } reth-nippy-jar.workspace = true diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index 3a6597499..1f0c7fc45 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -2,7 +2,7 @@ use clap::{Parser, Subcommand}; use reth_beacon_consensus::EthBeaconConsensus; -use reth_config::{Config, PruneConfig}; +use reth_config::Config; use reth_consensus::Consensus; use reth_db::{database::Database, open_db}; use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; @@ -13,7 +13,6 @@ use reth_provider::{ BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory, }; -use reth_prune::PrunerBuilder; use reth_stages::{ sets::DefaultStages, stages::{ @@ -107,14 +106,7 @@ impl Command { let mut pipeline = self.build_pipeline(config, provider_factory.clone()).await?; // Move all applicable data from database to static files. - pipeline.produce_static_files()?; - - // Run the pruner so we don't potentially end up with higher height in the database vs - // static files. 
- let mut pruner = PrunerBuilder::new(PruneConfig::default()) - .prune_delete_limit(usize::MAX) - .build(provider_factory); - pruner.run(*range.end())?; + pipeline.move_to_static_files()?; pipeline.unwind((*range.start()).saturating_sub(1), None)?; } else { diff --git a/crates/primitives/src/prune/mode.rs b/crates/primitives/src/prune/mode.rs index c32f66d35..3454573b9 100644 --- a/crates/primitives/src/prune/mode.rs +++ b/crates/primitives/src/prune/mode.rs @@ -36,6 +36,7 @@ impl PruneMode { PruneMode::Distance(distance) if *distance >= segment.min_blocks(purpose) => { Some((tip - distance, *self)) } + PruneMode::Before(n) if *n == tip + 1 && purpose.is_static_file() => Some((tip, *self)), PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet PruneMode::Before(n) if tip - n >= segment.min_blocks(purpose) => Some((n - 1, *self)), _ => return Err(PruneSegmentError::Configuration(segment)), diff --git a/crates/prune/src/builder.rs b/crates/prune/src/builder.rs index 879bd9fb9..4e0ffd21a 100644 --- a/crates/prune/src/builder.rs +++ b/crates/prune/src/builder.rs @@ -1,10 +1,9 @@ -use std::time::Duration; - use crate::{segments::SegmentSet, Pruner}; use reth_config::PruneConfig; use reth_db::database::Database; use reth_primitives::{FinishedExExHeight, PruneModes, MAINNET}; use reth_provider::ProviderFactory; +use std::time::Duration; use tokio::sync::watch; /// Contains the information required to build a pruner diff --git a/crates/prune/src/error.rs b/crates/prune/src/error.rs index e12320bc8..bdf5bacc1 100644 --- a/crates/prune/src/error.rs +++ b/crates/prune/src/error.rs @@ -21,3 +21,16 @@ pub enum PrunerError { #[error(transparent)] Provider(#[from] ProviderError), } + +impl From<PrunerError> for RethError { + fn from(err: PrunerError) -> Self { + match err { + PrunerError::PruneSegment(_) | PrunerError::InconsistentData(_) => { + RethError::Custom(err.to_string()) + } + PrunerError::Interface(err) => err, + PrunerError::Database(err) => RethError::Database(err), + 
PrunerError::Provider(err) => RethError::Provider(err), + } + } +} diff --git a/crates/stages-api/Cargo.toml b/crates/stages-api/Cargo.toml index d1e31ba78..2101961fd 100644 --- a/crates/stages-api/Cargo.toml +++ b/crates/stages-api/Cargo.toml @@ -19,6 +19,7 @@ reth-interfaces.workspace = true reth-static-file.workspace = true reth-tokio-util.workspace = true reth-consensus.workspace = true +reth-prune.workspace = true # metrics reth-metrics.workspace = true diff --git a/crates/stages-api/src/pipeline/mod.rs b/crates/stages-api/src/pipeline/mod.rs index 199cc41e6..5aceb515b 100644 --- a/crates/stages-api/src/pipeline/mod.rs +++ b/crates/stages-api/src/pipeline/mod.rs @@ -15,6 +15,7 @@ use reth_provider::{ providers::StaticFileWriter, ProviderFactory, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory, }; +use reth_prune::PrunerBuilder; use reth_static_file::StaticFileProducer; use reth_tokio_util::EventListeners; use std::pin::Pin; @@ -140,7 +141,7 @@ where match target { PipelineTarget::Sync(tip) => self.set_tip(tip), PipelineTarget::Unwind(target) => { - if let Err(err) = self.produce_static_files() { + if let Err(err) = self.move_to_static_files() { return (self, Err(err.into())) } if let Err(err) = self.unwind(target, None) { @@ -199,7 +200,7 @@ where /// pipeline (for example the `Finish` stage). Or [ControlFlow::Unwind] of the stage that caused /// the unwind. 
pub async fn run_loop(&mut self) -> Result<ControlFlow, PipelineError> { - self.produce_static_files()?; + self.move_to_static_files()?; let mut previous_stage = None; for stage_index in 0..self.stages.len() { @@ -236,9 +237,10 @@ where Ok(self.progress.next_ctrl()) } - /// Run [static file producer](StaticFileProducer) and move all data from the database to static - /// files for corresponding [segments](reth_primitives::static_file::StaticFileSegment), - /// according to their [stage checkpoints](StageCheckpoint): + /// Run [static file producer](StaticFileProducer) and [pruner](reth_prune::Pruner) to **move** + /// all data from the database to static files for corresponding + /// [segments](reth_primitives::static_file::StaticFileSegment), according to their [stage + /// checkpoints](StageCheckpoint): /// - [StaticFileSegment::Headers](reth_primitives::static_file::StaticFileSegment::Headers) -> /// [StageId::Headers] /// - [StaticFileSegment::Receipts](reth_primitives::static_file::StaticFileSegment::Receipts) @@ -248,22 +250,38 @@ where /// /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the /// lock is occupied. - pub fn produce_static_files(&self) -> RethResult<()> { + pub fn move_to_static_files(&self) -> RethResult<()> { let mut static_file_producer = self.static_file_producer.lock(); - let provider = self.provider_factory.provider()?; - let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { - headers: provider - .get_stage_checkpoint(StageId::Headers)? - .map(|checkpoint| checkpoint.block_number), - receipts: provider - .get_stage_checkpoint(StageId::Execution)? - .map(|checkpoint| checkpoint.block_number), - transactions: provider - .get_stage_checkpoint(StageId::Bodies)? 
- .map(|checkpoint| checkpoint.block_number), - })?; - static_file_producer.run(targets)?; + // Copies data from database to static files + let lowest_static_file_height = { + let provider = self.provider_factory.provider()?; + let stages_checkpoints = [StageId::Headers, StageId::Execution, StageId::Bodies] + .into_iter() + .map(|stage| { + provider.get_stage_checkpoint(stage).map(|c| c.map(|c| c.block_number)) + }) + .collect::<Result<Vec<_>, _>>()?; + + let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { + headers: stages_checkpoints[0], + receipts: stages_checkpoints[1], + transactions: stages_checkpoints[2], + })?; + static_file_producer.run(targets)?; + stages_checkpoints.into_iter().min().expect("exists") + }; + + // Deletes data which has been copied to static files. + if let Some(prune_tip) = lowest_static_file_height { + // Run the pruner so we don't potentially end up with higher height in the database vs + // static files during a pipeline unwind + let mut pruner = PrunerBuilder::new(Default::default()) + .prune_delete_limit(usize::MAX) + .build(self.provider_factory.clone()); + + pruner.run(prune_tip)?; + } Ok(()) } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 1e2f73cbc..c84e9d8ce 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -34,10 +34,10 @@ use reth_db::mdbx::DatabaseArguments; /// A common provider that fetches data from a database or static file. /// /// This provider implements most provider or provider factory traits. 
-#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ProviderFactory<DB> { /// Database - db: DB, + db: Arc<DB>, /// Chain spec chain_spec: Arc<ChainSpec>, /// Static File Provider @@ -52,7 +52,7 @@ impl<DB> ProviderFactory<DB> { static_files_path: PathBuf, ) -> RethResult<ProviderFactory<DB>> { Ok(Self { - db, + db: Arc::new(db), chain_spec, static_file_provider: StaticFileProvider::new(static_files_path)?, }) @@ -71,7 +71,7 @@ impl<DB> ProviderFactory<DB> { #[cfg(any(test, feature = "test-utils"))] /// Consumes Self and returns DB - pub fn into_db(self) -> DB { + pub fn into_db(self) -> Arc<DB> { self.db } } @@ -86,7 +86,7 @@ impl ProviderFactory<DatabaseEnv> { static_files_path: PathBuf, ) -> RethResult<Self> { Ok(ProviderFactory::<DatabaseEnv> { - db: init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?, + db: Arc::new(init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?), chain_spec, static_file_provider: StaticFileProvider::new(static_files_path)?, }) @@ -558,6 +558,15 @@ impl<DB: Database> PruneCheckpointReader for ProviderFactory<DB> { } } +impl<DB> Clone for ProviderFactory<DB> { + fn clone(&self) -> Self { + ProviderFactory { + db: Arc::clone(&self.db), + chain_spec: self.chain_spec.clone(), + static_file_provider: self.static_file_provider.clone(), + } + } +} #[cfg(test)] mod tests { use super::ProviderFactory;