From 3726cd17e88bcf567cf76eb683d8805faffbd1b7 Mon Sep 17 00:00:00 2001 From: Panagiotis Ganelis <50522617+PanGan21@users.noreply.github.com> Date: Tue, 2 Apr 2024 20:50:05 +0300 Subject: [PATCH] feat: use pipeline for `reth stage unwind` (#7085) Co-authored-by: joshieDo Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com> Co-authored-by: Alexey Shekhirin --- bin/reth/src/commands/stage/unwind.rs | 197 ++++++++++++++++++++--- crates/primitives/src/static_file/mod.rs | 5 + crates/stages/src/pipeline/mod.rs | 2 +- crates/stages/src/stages/bodies.rs | 70 +++++--- crates/stages/src/stages/execution.rs | 25 ++- 5 files changed, 248 insertions(+), 51 deletions(-) diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index 17847b161..c7483870a 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -8,10 +8,37 @@ use crate::{ dirs::{DataDirPath, MaybePlatformPath}, }; use clap::{Parser, Subcommand}; -use reth_db::{cursor::DbCursorRO, database::Database, open_db, tables, transaction::DbTx}; -use reth_primitives::{BlockHashOrNumber, ChainSpec}; -use reth_provider::{BlockExecutionWriter, ProviderFactory}; +use reth_beacon_consensus::BeaconConsensus; +use reth_config::{Config, PruneConfig}; +use reth_db::{database::Database, open_db}; +use reth_downloaders::{ + bodies::bodies::BodiesDownloaderBuilder, + headers::reverse_headers::ReverseHeadersDownloaderBuilder, +}; +use reth_interfaces::consensus::Consensus; +use reth_node_core::{ + args::{get_secret_key, NetworkArgs}, + dirs::ChainPath, +}; +use reth_node_ethereum::EthEvmConfig; +use reth_primitives::{BlockHashOrNumber, ChainSpec, PruneModes, B256}; +use reth_provider::{ + BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory, +}; +use reth_prune::PrunerBuilder; +use reth_stages::{ + sets::DefaultStages, + stages::{ + AccountHashingStage, ExecutionStage, ExecutionStageThresholds, IndexAccountHistoryStage, + IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage, StorageHashingStage, + TransactionLookupStage, + }, + Pipeline, StageSet, +}; +use reth_static_file::StaticFileProducer; use std::{ops::RangeInclusive, sync::Arc}; +use tokio::sync::watch; +use tracing::info; /// `reth stage unwind` command #[derive(Debug, Parser)] @@ -42,6 +69,9 @@ pub struct Command { #[command(flatten)] db: DatabaseArgs, + #[command(flatten)] + network: NetworkArgs, + #[command(subcommand)] command: Subcommands, } @@ -55,28 +85,150 @@ impl Command { if !db_path.exists() { eyre::bail!("Database {db_path:?} does not exist.") } + let config_path = data_dir.config_path(); + let config: Config = confy::load_path(config_path).unwrap_or_default(); - let db = open_db(db_path.as_ref(), self.db.database_args())?; - - let range = self.command.unwind_range(&db)?; + let db = Arc::new(open_db(db_path.as_ref(), self.db.database_args())?); + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.static_files_path())?; + let range = self.command.unwind_range(provider_factory.clone())?; if *range.start() == 0 { eyre::bail!("Cannot unwind genesis block") } - let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.static_files_path())?; - let provider = factory.provider_rw()?; + // Only execute a pipeline unwind if the start of the range overlaps the existing static + // files. If that's the case, then copy all available data from MDBX to static files, and + // only then, proceed with the unwind. + if let Some(highest_static_block) = provider_factory + .static_file_provider() + .get_highest_static_files() + .max() + .filter(|highest_static_file_block| highest_static_file_block >= range.start()) + { + info!(target: "reth::cli", ?range, ?highest_static_block, "Executing a pipeline unwind."); + let mut pipeline = + self.build_pipeline(data_dir, config, provider_factory.clone()).await?; - let blocks_and_execution = provider - .take_block_and_execution_range(&self.chain, range) - .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?; + // Move all applicable data from database to static files. + pipeline.produce_static_files()?; - provider.commit()?; + // Run the pruner so we don't potentially end up with higher height in the database vs + // static files. + let mut pruner = PrunerBuilder::new(PruneConfig::default()) + .prune_delete_limit(usize::MAX) + .build(provider_factory); + pruner.run(*range.end())?; - println!("Unwound {} blocks", blocks_and_execution.len()); + pipeline.unwind((*range.start()).saturating_sub(1), None)?; + } else { + info!(target: "reth::cli", ?range, "Executing a database unwind."); + let provider = provider_factory.provider_rw()?; + + let _ = provider + .take_block_and_execution_range(&self.chain, range.clone()) + .map_err(|err| eyre::eyre!("Transaction error on unwind: {err}"))?; + + provider.commit()?; + } + + println!("Unwound {} blocks", range.count()); Ok(()) } + + async fn build_pipeline( + self, + data_dir: ChainPath, + config: Config, + provider_factory: ProviderFactory>, + ) -> Result>, eyre::Error> { + // Even though we are not planning to download anything, we need to initialize Body and + // Header stage with a network client + let network_secret_path = + self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret_path()); + let p2p_secret_key = get_secret_key(&network_secret_path)?; + let default_peers_path = data_dir.known_peers_path(); + let network = self + .network + .network_config( + &config, + provider_factory.chain_spec(), + p2p_secret_key, + default_peers_path, + ) + .build(provider_factory.clone()) + .start_network() + .await?; + + let consensus: Arc = + Arc::new(BeaconConsensus::new(provider_factory.chain_spec())); + + // building network downloaders using the fetch client + let fetch_client = network.fetch_client().await?; + let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) + .build(fetch_client.clone(), Arc::clone(&consensus)); + let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies).build( + fetch_client, + Arc::clone(&consensus), + provider_factory.clone(), + ); + let stage_conf = &config.stages; + + let (tip_tx, tip_rx) = watch::channel(B256::ZERO); + let factory = reth_revm::EvmProcessorFactory::new( + provider_factory.chain_spec(), + EthEvmConfig::default(), + ); + + let header_mode = HeaderSyncMode::Tip(tip_rx); + let pipeline = Pipeline::builder() + .with_tip_sender(tip_tx) + .add_stages( + DefaultStages::new( + provider_factory.clone(), + header_mode, + Arc::clone(&consensus), + header_downloader, + body_downloader, + factory.clone(), + stage_conf.etl.clone(), + ) + .set(SenderRecoveryStage { + commit_threshold: stage_conf.sender_recovery.commit_threshold, + }) + .set(ExecutionStage::new( + factory, + ExecutionStageThresholds { + max_blocks: None, + max_changes: None, + max_cumulative_gas: None, + max_duration: None, + }, + stage_conf + .merkle + .clean_threshold + .max(stage_conf.account_hashing.clean_threshold) + .max(stage_conf.storage_hashing.clean_threshold), + config.prune.clone().map(|prune| prune.segments).unwrap_or_default(), + )) + .set(AccountHashingStage::default()) + .set(StorageHashingStage::default()) + .set(MerkleStage::default_unwind()) + .set(TransactionLookupStage::default()) + .set(IndexAccountHistoryStage::default()) + .set(IndexStorageHistoryStage::default()), + ) + .build( + provider_factory.clone(), + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), + ); + Ok(pipeline) + } } /// `reth stage unwind` subcommand @@ -94,21 +246,22 @@ impl Subcommands { /// Returns the block range to unwind. /// /// This returns an inclusive range: [target..=latest] - fn unwind_range(&self, db: DB) -> eyre::Result> { - let tx = db.tx()?; - let mut cursor = tx.cursor_read::()?; - let last = cursor.last()?.ok_or_else(|| eyre::eyre!("No blocks in database"))?; - + fn unwind_range( + &self, + factory: ProviderFactory, + ) -> eyre::Result> { + let provider = factory.provider()?; + let last = provider.last_block_number()?; let target = match self { Subcommands::ToBlock { target } => match target { - BlockHashOrNumber::Hash(hash) => tx - .get::(*hash)? + BlockHashOrNumber::Hash(hash) => provider + .block_number(*hash)? .ok_or_else(|| eyre::eyre!("Block hash not found in database: {hash:?}"))?, BlockHashOrNumber::Number(num) => *num, }, - Subcommands::NumBlocks { amount } => last.0.saturating_sub(*amount), + Subcommands::NumBlocks { amount } => last.saturating_sub(*amount), } + 1; - Ok(target..=last.0) + Ok(target..=last) } } diff --git a/crates/primitives/src/static_file/mod.rs b/crates/primitives/src/static_file/mod.rs index fe15bd1c7..e7e9e47fd 100644 --- a/crates/primitives/src/static_file/mod.rs +++ b/crates/primitives/src/static_file/mod.rs @@ -44,6 +44,11 @@ impl HighestStaticFiles { StaticFileSegment::Receipts => &mut self.receipts, } } + + /// Returns the maximum block of all segments. + pub fn max(&self) -> Option { + [self.headers, self.transactions, self.receipts].iter().filter_map(|&option| option).max() + } } /// Each static file has a fixed number of blocks. This gives out the range where the requested diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index e4ad70fac..eb1f40cbd 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -232,7 +232,7 @@ where /// /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the /// lock is occupied. - fn produce_static_files(&mut self) -> RethResult<()> { + pub fn produce_static_files(&mut self) -> RethResult<()> { let mut static_file_producer = self.static_file_producer.lock(); let provider = self.provider_factory.provider()?; diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 8d4519af2..b52274b1e 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -13,9 +13,12 @@ use reth_interfaces::{ }; use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - StaticFileSegment, + StaticFileSegment, TxNumber, +}; +use reth_provider::{ + providers::{StaticFileProvider, StaticFileWriter}, + BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, StatsReader, }; -use reth_provider::{providers::StaticFileWriter, DatabaseProviderRW, HeaderProvider, StatsReader}; use std::{ cmp::Ordering, task::{ready, Context, Poll}, @@ -145,17 +148,11 @@ impl Stage for BodyStage { // error will trigger an unwind, that will bring the database to the same height as the // static files. Ordering::Less => { - let last_block = static_file_provider - .get_highest_static_file_block(StaticFileSegment::Transactions) - .unwrap_or_default(); - - let missing_block = - Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - - return Err(StageError::MissingStaticFileData { - block: missing_block, - segment: StaticFileSegment::Transactions, - }) + return Err(missing_static_data_error( + next_static_file_tx_num.saturating_sub(1), + static_file_provider, + provider, + )?) } Ordering::Equal => {} } @@ -311,17 +308,11 @@ impl Stage for BodyStage { // If there are more transactions on database, then we are missing static file data and we // need to unwind further. if db_tx_num > static_file_tx_num { - let last_block = static_file_provider - .get_highest_static_file_block(StaticFileSegment::Transactions) - .unwrap_or_default(); - - let missing_block = - Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); - - return Err(StageError::MissingStaticFileData { - block: missing_block, - segment: StaticFileSegment::Transactions, - }) + return Err(missing_static_data_error( + static_file_tx_num, + static_file_provider, + provider, + )?) } // Unwinds static file @@ -335,6 +326,37 @@ impl Stage for BodyStage { } } +fn missing_static_data_error( + last_tx_num: TxNumber, + static_file_provider: &StaticFileProvider, + provider: &DatabaseProviderRW, +) -> Result { + let mut last_block = static_file_provider + .get_highest_static_file_block(StaticFileSegment::Transactions) + .unwrap_or_default(); + + // To be extra safe, we make sure that the last tx num matches the last block from its indices. + // If not, get it. + loop { + if let Some(indices) = provider.block_body_indices(last_block)? { + if indices.last_tx_num() <= last_tx_num { + break + } + } + if last_block == 0 { + break + } + last_block -= 1; + } + + let missing_block = Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); + + Ok(StageError::MissingStaticFileData { + block: missing_block, + segment: StaticFileSegment::Transactions, + }) +} + // TODO(alexey): ideally, we want to measure Bodies stage progress in bytes, but it's hard to know // beforehand how many bytes we need to download. So the good solution would be to measure the // progress in gas as a proxy to size. Execution stage uses a similar approach. diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 95f5cc2d4..81aef2ad3 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -578,13 +578,30 @@ where start_block.saturating_sub(1), )?, Ordering::Less => { - let last_block = static_file_provider + let mut last_block = static_file_provider .get_highest_static_file_block(StaticFileSegment::Receipts) .unwrap_or(0); - let missing_block = Box::new( - tx.get::(last_block + 1)?.unwrap_or_default().seal_slow(), - ); + let last_receipt_num = static_file_provider + .get_highest_static_file_tx(StaticFileSegment::Receipts) + .unwrap_or(0); + + // To be extra safe, we make sure that the last receipt num matches the last block from + // its indices. If not, get it. + loop { + if let Some(indices) = provider.block_body_indices(last_block)? { + if indices.last_tx_num() <= last_receipt_num { + break + } + } + if last_block == 0 { + break + } + last_block -= 1; + } + + let missing_block = + Box::new(provider.sealed_header(last_block + 1)?.unwrap_or_default()); return Err(StageError::MissingStaticFileData { block: missing_block,