From 1184e8c45b5676e1516844bba18996cdf1562654 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Thu, 9 May 2024 13:04:14 +0100 Subject: [PATCH] chore: use `NoopBodiesDownloader` & `NoopHeaderDownloader` on `stage unwind` command (#8165) --- bin/reth/src/commands/stage/unwind.rs | 46 +++------------------- crates/net/downloaders/src/bodies/mod.rs | 3 ++ crates/net/downloaders/src/bodies/noop.rs | 29 ++++++++++++++ crates/net/downloaders/src/headers/mod.rs | 3 ++ crates/net/downloaders/src/headers/noop.rs | 30 ++++++++++++++ 5 files changed, 70 insertions(+), 41 deletions(-) create mode 100644 crates/net/downloaders/src/bodies/noop.rs create mode 100644 crates/net/downloaders/src/headers/noop.rs diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index b7998d087..3a6597499 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -5,15 +5,9 @@ use reth_beacon_consensus::EthBeaconConsensus; use reth_config::{Config, PruneConfig}; use reth_consensus::Consensus; use reth_db::{database::Database, open_db}; -use reth_downloaders::{ - bodies::bodies::BodiesDownloaderBuilder, - headers::reverse_headers::ReverseHeadersDownloaderBuilder, -}; +use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader}; use reth_exex::ExExManagerHandle; -use reth_node_core::{ - args::{get_secret_key, NetworkArgs}, - dirs::ChainPath, -}; +use reth_node_core::args::NetworkArgs; use reth_primitives::{BlockHashOrNumber, ChainSpec, PruneModes, B256}; use reth_provider::{ BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory, @@ -110,8 +104,7 @@ impl Command { .filter(|highest_static_file_block| highest_static_file_block >= range.start()) { info!(target: "reth::cli", ?range, ?highest_static_block, "Executing a pipeline unwind."); - let mut pipeline = - self.build_pipeline(data_dir, config, provider_factory.clone()).await?; + let mut pipeline = self.build_pipeline(config, provider_factory.clone()).await?; // Move all applicable data from database to static files. pipeline.produce_static_files()?; @@ -142,40 +135,11 @@ impl Command { async fn build_pipeline( self, - data_dir: ChainPath, config: Config, provider_factory: ProviderFactory>, ) -> Result>, eyre::Error> { - // Even though we are not planning to download anything, we need to initialize Body and - // Header stage with a network client - let network_secret_path = - self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret()); - let p2p_secret_key = get_secret_key(&network_secret_path)?; - let default_peers_path = data_dir.known_peers(); - let network = self - .network - .network_config( - &config, - provider_factory.chain_spec(), - p2p_secret_key, - default_peers_path, - ) - .build(provider_factory.clone()) - .start_network() - .await?; - let consensus: Arc = Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec())); - - // building network downloaders using the fetch client - let fetch_client = network.fetch_client().await?; - let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers) - .build(fetch_client.clone(), Arc::clone(&consensus)); - let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies).build( - fetch_client, - Arc::clone(&consensus), - provider_factory.clone(), - ); let stage_conf = &config.stages; let (tip_tx, tip_rx) = watch::channel(B256::ZERO); @@ -189,8 +153,8 @@ impl Command { provider_factory.clone(), header_mode, Arc::clone(&consensus), - header_downloader, - body_downloader, + NoopHeaderDownloader::default(), + NoopBodiesDownloader::default(), executor.clone(), stage_conf.etl.clone(), ) diff --git a/crates/net/downloaders/src/bodies/mod.rs b/crates/net/downloaders/src/bodies/mod.rs index f8931ea81..d4f613413 100644 --- a/crates/net/downloaders/src/bodies/mod.rs +++ b/crates/net/downloaders/src/bodies/mod.rs @@ -2,6 +2,9 @@ #[allow(clippy::module_inception)] pub mod bodies; +/// A body downloader that does nothing. Useful to build unwind-only pipelines. +pub mod noop; + /// A downloader implementation that spawns a downloader to a task pub mod task; diff --git a/crates/net/downloaders/src/bodies/noop.rs b/crates/net/downloaders/src/bodies/noop.rs new file mode 100644 index 000000000..5885a17c1 --- /dev/null +++ b/crates/net/downloaders/src/bodies/noop.rs @@ -0,0 +1,29 @@ +use futures::Stream; +use reth_interfaces::p2p::{ + bodies::{downloader::BodyDownloader, response::BlockResponse}, + error::{DownloadError, DownloadResult}, +}; +use reth_primitives::BlockNumber; +use std::ops::RangeInclusive; + +/// A [BodyDownloader] implementation that does nothing. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct NoopBodiesDownloader; + +impl BodyDownloader for NoopBodiesDownloader { + fn set_download_range(&mut self, _: RangeInclusive) -> DownloadResult<()> { + Ok(()) + } +} + +impl Stream for NoopBodiesDownloader { + type Item = Result, DownloadError>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + panic!("NoopBodiesDownloader shouldn't be polled.") + } +} diff --git a/crates/net/downloaders/src/headers/mod.rs b/crates/net/downloaders/src/headers/mod.rs index 4321ef52b..a261f5579 100644 --- a/crates/net/downloaders/src/headers/mod.rs +++ b/crates/net/downloaders/src/headers/mod.rs @@ -1,6 +1,9 @@ /// A Linear downloader implementation. pub mod reverse_headers; +/// A header downloader that does nothing. Useful to build unwind-only pipelines. +pub mod noop; + /// A downloader implementation that spawns a downloader to a task pub mod task; diff --git a/crates/net/downloaders/src/headers/noop.rs b/crates/net/downloaders/src/headers/noop.rs new file mode 100644 index 000000000..8127cc232 --- /dev/null +++ b/crates/net/downloaders/src/headers/noop.rs @@ -0,0 +1,30 @@ +use futures::Stream; +use reth_interfaces::p2p::headers::{ + downloader::{HeaderDownloader, SyncTarget}, + error::HeadersDownloaderError, +}; +use reth_primitives::SealedHeader; + +/// A [HeaderDownloader] implementation that does nothing. +#[derive(Debug, Default)] +#[non_exhaustive] +pub struct NoopHeaderDownloader; + +impl HeaderDownloader for NoopHeaderDownloader { + fn update_local_head(&mut self, _: SealedHeader) {} + + fn update_sync_target(&mut self, _: SyncTarget) {} + + fn set_batch_size(&mut self, _: usize) {} +} + +impl Stream for NoopHeaderDownloader { + type Item = Result, HeadersDownloaderError>; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + panic!("NoopHeaderDownloader shouldn't be polled.") + } +}