diff --git a/bin/reth/src/commands/common.rs b/bin/reth/src/commands/common.rs index 5fd81d163..ed6a92cdb 100644 --- a/bin/reth/src/commands/common.rs +++ b/bin/reth/src/commands/common.rs @@ -14,13 +14,12 @@ use reth_node_core::{ }, dirs::{ChainPath, DataDirPath}, }; -use reth_primitives::ChainSpec; -use reth_provider::{ - providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory, -}; +use reth_primitives::{ChainSpec, B256}; +use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory}; use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget}; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; +use tokio::sync::watch; use tracing::{debug, info, warn}; /// Struct to hold config and datadir paths @@ -127,11 +126,13 @@ impl EnvironmentArgs { info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); + // Builds and executes an unwind-only pipeline let mut pipeline = Pipeline::builder() .add_stages(DefaultStages::new( factory.clone(), - HeaderSyncMode::Continuous, + tip_rx, Arc::new(EthBeaconConsensus::new(self.chain.clone())), NoopHeaderDownloader::default(), NoopBodiesDownloader::default(), diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 200f212d0..c1fd4cfa5 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -24,7 +24,7 @@ use reth_network_api::NetworkInfo; use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; use reth_provider::{ - BlockExecutionWriter, ChainSpecProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader, + BlockExecutionWriter, ChainSpecProvider, ProviderFactory, StageCheckpointReader, }; use reth_prune_types::PruneModes; use reth_stages::{ @@ -86,13 +86,12 @@ impl Command { let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let executor = block_executor!(provider_factory.chain_spec()); - let header_mode = HeaderSyncMode::Tip(tip_rx); let pipeline = Pipeline::builder() .with_tip_sender(tip_tx) .add_stages( DefaultStages::new( provider_factory.clone(), - header_mode, + tip_rx, Arc::clone(&consensus), header_downloader, body_downloader, diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 7d8e179d2..26c8a3558 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -155,7 +155,6 @@ impl Command { Box::new(ctx.task_executor.clone()), Box::new(network), None, - false, payload_builder, None, u64::MAX, diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 156e8f4d2..baf194714 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -24,8 +24,8 @@ use reth_network_p2p::{ use reth_node_events::node::NodeEvent; use reth_primitives::B256; use reth_provider::{ - BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError, - ProviderFactory, StageCheckpointReader, + BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError, ProviderFactory, + StageCheckpointReader, }; use reth_prune_types::PruneModes; use reth_stages::{prelude::*, Pipeline, StageId, StageSet}; @@ -208,7 +208,7 @@ where .add_stages( DefaultStages::new( provider_factory.clone(), - HeaderSyncMode::Tip(tip_rx), + tip_rx, consensus.clone(), header_downloader, body_downloader, diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index 89131e5aa..157f33bff 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -11,7 +11,7 @@ use reth_node_core::args::NetworkArgs; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; use reth_provider::{ BlockExecutionWriter, BlockNumReader, ChainSpecProvider, FinalizedBlockReader, - FinalizedBlockWriter, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory, + FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory, }; use reth_prune_types::PruneModes; use reth_stages::{ @@ -105,13 +105,12 @@ impl Command { let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let executor = block_executor!(provider_factory.chain_spec()); - let header_mode = HeaderSyncMode::Tip(tip_rx); let pipeline = Pipeline::builder() .with_tip_sender(tip_tx) .add_stages( DefaultStages::new( provider_factory.clone(), - header_mode, + tip_rx, Arc::clone(&consensus), NoopHeaderDownloader::default(), NoopBodiesDownloader::default(), diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index cd07d0692..575fe18cc 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -447,11 +447,6 @@ Builder: [default: 3] Debug: - --debug.continuous - Prompt the downloader to download blocks one at a time. - - NOTE: This is for testing purposes only. - --debug.terminate Flag indicating whether the node should be terminated after the pipeline sync diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 2badb73f4..a0ac2dbea 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -240,7 +240,6 @@ where task_spawner: Box, sync_state_updater: Box, max_block: Option, - run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, target: Option, pipeline_run_threshold: u64, @@ -254,7 +253,6 @@ where task_spawner, sync_state_updater, max_block, - run_pipeline_continuously, payload_builder, target, pipeline_run_threshold, @@ -285,7 +283,6 @@ where task_spawner: Box, sync_state_updater: Box, max_block: Option, - run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, target: Option, pipeline_run_threshold: u64, @@ -299,7 +296,6 @@ where pipeline, client, task_spawner.clone(), - run_pipeline_continuously, max_block, blockchain.chain_spec(), event_sender.clone(), @@ -1448,11 +1444,6 @@ where return Ok(()) } - // update the canon chain if continuous is enabled - if self.sync.run_pipeline_continuously() { - self.set_canonical_head(ctrl.block_number().unwrap_or_default())?; - } - let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() { Some(current_state) => current_state, None => { diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index 80e863340..5299a972f 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -54,8 +54,6 @@ where /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for /// ordering. This means the blocks will be popped from the heap with ascending block numbers. range_buffered_blocks: BinaryHeap>, - /// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle - run_pipeline_continuously: bool, /// Max block after which the consensus engine would terminate the sync. Used for debugging /// purposes. max_block: Option, @@ -73,7 +71,6 @@ where pipeline: Pipeline, client: Client, pipeline_task_spawner: Box, - run_pipeline_continuously: bool, max_block: Option, chain_spec: Arc, event_sender: EventSender, @@ -89,7 +86,6 @@ where inflight_full_block_requests: Vec::new(), inflight_block_range_requests: Vec::new(), range_buffered_blocks: BinaryHeap::new(), - run_pipeline_continuously, event_sender, max_block, metrics: EngineSyncMetrics::default(), @@ -122,11 +118,6 @@ where self.update_block_download_metrics(); } - /// Returns whether or not the sync controller is set to run the pipeline continuously. - pub(crate) const fn run_pipeline_continuously(&self) -> bool { - self.run_pipeline_continuously - } - /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. #[allow(dead_code)] pub(crate) const fn is_pipeline_sync_pending(&self) -> bool { @@ -271,20 +262,14 @@ where fn try_spawn_pipeline(&mut self) -> Option { match &mut self.pipeline_state { PipelineState::Idle(pipeline) => { - let target = self.pending_pipeline_target.take(); - - if target.is_none() && !self.run_pipeline_continuously { - // nothing to sync - return None - } - + let target = self.pending_pipeline_target.take()?; let (tx, rx) = oneshot::channel(); let pipeline = pipeline.take().expect("exists"); self.pipeline_task_spawner.spawn_critical_blocking( "pipeline task", Box::pin(async move { - let result = pipeline.run_as_fut(target).await; + let result = pipeline.run_as_fut(Some(target)).await; let _ = tx.send(result); }), ); @@ -294,7 +279,7 @@ where // outdated (included in the range the pipeline is syncing anyway) self.clear_block_download_requests(); - Some(EngineSyncEvent::PipelineStarted(target)) + Some(EngineSyncEvent::PipelineStarted(Some(target))) } PipelineState::Running(_) => None, } @@ -550,8 +535,6 @@ mod tests { pipeline, client, Box::::default(), - // run_pipeline_continuously: false here until we want to test this - false, self.max_block, chain_spec, Default::default(), diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 05164d010..b040f87d4 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -25,7 +25,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{BlockNumber, ChainSpec, B256}; use reth_provider::{ providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, - ExecutionOutcome, HeaderSyncMode, + ExecutionOutcome, }; use reth_prune::Pruner; use reth_prune_types::PruneModes; @@ -371,7 +371,7 @@ where Pipeline::builder().add_stages(DefaultStages::new( provider_factory.clone(), - HeaderSyncMode::Tip(tip_rx.clone()), + tip_rx.clone(), Arc::clone(&consensus), header_downloader, body_downloader, @@ -418,7 +418,6 @@ where Box::::default(), Box::::default(), None, - false, payload_builder, None, self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN), diff --git a/crates/node-core/src/args/debug.rs b/crates/node-core/src/args/debug.rs index b132eb3a6..432bacf7d 100644 --- a/crates/node-core/src/args/debug.rs +++ b/crates/node-core/src/args/debug.rs @@ -8,12 +8,6 @@ use std::path::PathBuf; #[derive(Debug, Clone, Args, PartialEq, Eq, Default)] #[command(next_help_heading = "Debug")] pub struct DebugArgs { - /// Prompt the downloader to download blocks one at a time. - /// - /// NOTE: This is for testing purposes only. - #[arg(long = "debug.continuous", help_heading = "Debug", conflicts_with = "tip")] - pub continuous: bool, - /// Flag indicating whether the node should be terminated after the pipeline sync. #[arg(long = "debug.terminate", help_heading = "Debug")] pub terminate: bool, @@ -21,7 +15,7 @@ pub struct DebugArgs { /// Set the chain tip manually for testing purposes. /// /// NOTE: This is a temporary flag - #[arg(long = "debug.tip", help_heading = "Debug", conflicts_with = "continuous")] + #[arg(long = "debug.tip", help_heading = "Debug")] pub tip: Option, /// Runs the sync only up to the specified block. diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index b8b1ebc8b..5e6fb5e94 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -239,27 +239,6 @@ impl NodeConfig { self } - /// Returns the initial pipeline target, based on whether or not the node is running in - /// `debug.tip` mode, `debug.continuous` mode, or neither. - /// - /// If running in `debug.tip` mode, the configured tip is returned. - /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. - /// Otherwise, `None` is returned. This is what the node will do by default. - pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option { - if let Some(tip) = self.debug.tip { - // Set the provided tip as the initial pipeline target. - debug!(target: "reth::cli", %tip, "Tip manually set"); - Some(tip) - } else if self.debug.continuous { - // Set genesis as the initial pipeline target. - // This will allow the downloader to start - debug!(target: "reth::cli", "Continuous sync mode enabled"); - Some(genesis_hash) - } else { - None - } - } - /// Returns pruning configuration. pub fn prune_config(&self) -> Option { self.pruning.prune_config(&self.chain) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 395e5e19e..4689d8346 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -16,9 +16,7 @@ use reth_node_core::{ node_config::NodeConfig, }; use reth_primitives::{BlockNumber, Chain, ChainSpec, Head, B256}; -use reth_provider::{ - providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory, -}; +use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory}; use reth_prune::{PruneModes, PrunerBuilder}; use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_layer::JwtSecret; @@ -27,7 +25,7 @@ use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; use reth_tracing::tracing::{debug, error, info, warn}; use std::{sync::Arc, thread::available_parallelism}; -use tokio::sync::{mpsc::Receiver, oneshot}; +use tokio::sync::{mpsc::Receiver, oneshot, watch}; /// Reusable setup for launching a node. /// @@ -316,16 +314,6 @@ impl LaunchContextWith> { .timeout(PrunerBuilder::DEFAULT_TIMEOUT) } - /// Returns the initial pipeline target, based on whether or not the node is running in - /// `debug.tip` mode, `debug.continuous` mode, or neither. - /// - /// If running in `debug.tip` mode, the configured tip is returned. - /// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned. - /// Otherwise, `None` is returned. This is what the node will do by default. - pub fn initial_pipeline_target(&self) -> Option { - self.node_config().initial_pipeline_target(self.genesis_hash()) - } - /// Loads the JWT secret for the engine API pub fn auth_jwt_secret(&self) -> eyre::Result { let default_jwt_path = self.data_dir().jwt(); @@ -377,11 +365,13 @@ where info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); + let (_tip_tx, tip_rx) = watch::channel(B256::ZERO); + // Builds an unwind-only pipeline let pipeline = Pipeline::builder() .add_stages(DefaultStages::new( factory.clone(), - HeaderSyncMode::Continuous, + tip_rx, Arc::new(EthBeaconConsensus::new(self.chain_spec())), NoopHeaderDownloader::default(), NoopBodiesDownloader::default(), diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index fde21ad97..7cb1b2703 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -250,7 +250,6 @@ where .build(); let pipeline = crate::setup::build_networked_pipeline( - ctx.node_config(), &ctx.toml_config().stages, client.clone(), consensus.clone(), @@ -273,7 +272,6 @@ where (pipeline, Either::Left(client)) } else { let pipeline = crate::setup::build_networked_pipeline( - ctx.node_config(), &ctx.toml_config().stages, network_client.clone(), consensus.clone(), @@ -293,7 +291,7 @@ where let pipeline_events = pipeline.events(); - let initial_target = ctx.initial_pipeline_target(); + let initial_target = ctx.node_config().debug.tip; let mut pruner_builder = ctx.pruner_builder().max_reorg_depth(tree_config.max_reorg_depth() as usize); @@ -316,7 +314,6 @@ where Box::new(ctx.task_executor().clone()), Box::new(node_adapter.components.network().clone()), max_block, - ctx.node_config().debug.continuous, node_adapter.components.payload_builder().clone(), initial_target, reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index b4a6fef16..cf4d090dc 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -13,11 +13,8 @@ use reth_network_p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, headers::{client::HeadersClient, downloader::HeaderDownloader}, }; -use reth_node_core::{ - node_config::NodeConfig, - primitives::{BlockNumber, B256}, -}; -use reth_provider::{HeaderSyncMode, ProviderFactory}; +use reth_node_core::primitives::{BlockNumber, B256}; +use reth_provider::ProviderFactory; use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet}; use reth_static_file::StaticFileProducer; use reth_tasks::TaskExecutor; @@ -28,7 +25,6 @@ use tokio::sync::watch; /// Constructs a [Pipeline] that's wired to the network #[allow(clippy::too_many_arguments)] pub async fn build_networked_pipeline( - node_config: &NodeConfig, config: &StageConfig, client: Client, consensus: Arc, @@ -56,7 +52,6 @@ where .into_task_with(task_executor); let pipeline = build_pipeline( - node_config, provider_factory, config, header_downloader, @@ -77,7 +72,6 @@ where /// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders. #[allow(clippy::too_many_arguments)] pub async fn build_pipeline( - node_config: &NodeConfig, provider_factory: ProviderFactory, stage_config: &StageConfig, header_downloader: H, @@ -107,18 +101,13 @@ where let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); - let header_mode = if node_config.debug.continuous { - HeaderSyncMode::Continuous - } else { - HeaderSyncMode::Tip(tip_rx) - }; let pipeline = builder .with_tip_sender(tip_tx) .with_metrics_tx(metrics_tx.clone()) .add_stages( DefaultStages::new( provider_factory.clone(), - header_mode, + tip_rx, Arc::clone(&consensus), header_downloader, body_downloader, diff --git a/crates/stages/stages/src/lib.rs b/crates/stages/stages/src/lib.rs index 4e60e168a..ca44e586f 100644 --- a/crates/stages/stages/src/lib.rs +++ b/crates/stages/stages/src/lib.rs @@ -26,7 +26,6 @@ //! # use reth_evm_ethereum::EthEvmConfig; //! # use reth_provider::ProviderFactory; //! # use reth_provider::StaticFileProviderFactory; -//! # use reth_provider::HeaderSyncMode; //! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_static_file::StaticFileProducer; //! # use reth_config::config::StageConfig; @@ -57,7 +56,7 @@ //! .with_tip_sender(tip_tx) //! .add_stages(DefaultStages::new( //! provider_factory.clone(), -//! HeaderSyncMode::Tip(tip_rx), +//! tip_rx, //! consensus, //! headers_downloader, //! bodies_downloader, diff --git a/crates/stages/stages/src/sets.rs b/crates/stages/stages/src/sets.rs index 8c82902bf..772f562a2 100644 --- a/crates/stages/stages/src/sets.rs +++ b/crates/stages/stages/src/sets.rs @@ -46,9 +46,11 @@ use reth_consensus::Consensus; use reth_db_api::database::Database; use reth_evm::execute::BlockExecutorProvider; use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}; -use reth_provider::{HeaderSyncGapProvider, HeaderSyncMode}; +use reth_primitives::B256; +use reth_provider::HeaderSyncGapProvider; use reth_prune_types::PruneModes; use std::sync::Arc; +use tokio::sync::watch; /// A set containing all stages to run a fully syncing instance of reth. /// @@ -88,7 +90,7 @@ impl DefaultStages { #[allow(clippy::too_many_arguments)] pub fn new( provider: Provider, - header_mode: HeaderSyncMode, + tip: watch::Receiver, consensus: Arc, header_downloader: H, body_downloader: B, @@ -102,7 +104,7 @@ impl DefaultStages { Self { online: OnlineStages::new( provider, - header_mode, + tip, consensus, header_downloader, body_downloader, @@ -159,8 +161,8 @@ where pub struct OnlineStages { /// Sync gap provider for the headers stage. provider: Provider, - /// The sync mode for the headers stage. - header_mode: HeaderSyncMode, + /// The tip for the headers stage. + tip: watch::Receiver, /// The consensus engine used to validate incoming data. consensus: Arc, /// The block header downloader @@ -175,13 +177,13 @@ impl OnlineStages { /// Create a new set of online stages with default values. pub fn new( provider: Provider, - header_mode: HeaderSyncMode, + tip: watch::Receiver, consensus: Arc, header_downloader: H, body_downloader: B, stages_config: StageConfig, ) -> Self { - Self { provider, header_mode, consensus, header_downloader, body_downloader, stages_config } + Self { provider, tip, consensus, header_downloader, body_downloader, stages_config } } } @@ -203,7 +205,7 @@ where pub fn builder_with_bodies( bodies: BodyStage, provider: Provider, - mode: HeaderSyncMode, + tip: watch::Receiver, header_downloader: H, consensus: Arc, stages_config: StageConfig, @@ -212,7 +214,7 @@ where .add_stage(HeaderStage::new( provider, header_downloader, - mode, + tip, consensus.clone(), stages_config.etl, )) @@ -232,7 +234,7 @@ where .add_stage(HeaderStage::new( self.provider, self.header_downloader, - self.header_mode, + self.tip, self.consensus.clone(), self.stages_config.etl.clone(), )) diff --git a/crates/stages/stages/src/stages/headers.rs b/crates/stages/stages/src/stages/headers.rs index 21d3ad4ab..58c578e34 100644 --- a/crates/stages/stages/src/stages/headers.rs +++ b/crates/stages/stages/src/stages/headers.rs @@ -10,11 +10,10 @@ use reth_db_api::{ }; use reth_etl::Collector; use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError}; -use reth_primitives::{BlockHash, BlockNumber, SealedHeader, StaticFileSegment}; +use reth_primitives::{BlockHash, BlockNumber, SealedHeader, StaticFileSegment, B256}; use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockHashReader, DatabaseProviderRW, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, - HeaderSyncMode, }; use reth_stages_api::{ BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput, @@ -25,6 +24,7 @@ use std::{ sync::Arc, task::{ready, Context, Poll}, }; +use tokio::sync::watch; use tracing::*; /// The headers stage. @@ -44,8 +44,8 @@ pub struct HeaderStage { provider: Provider, /// Strategy for downloading the headers downloader: Downloader, - /// The sync mode for the stage. - mode: HeaderSyncMode, + /// The tip for the stage. + tip: watch::Receiver, /// Consensus client implementation consensus: Arc, /// Current sync gap. @@ -68,14 +68,14 @@ where pub fn new( database: Provider, downloader: Downloader, - mode: HeaderSyncMode, + tip: watch::Receiver, consensus: Arc, etl_config: EtlConfig, ) -> Self { Self { provider: database, downloader, - mode, + tip, consensus, sync_gap: None, hash_collector: Collector::new(etl_config.file_size / 2, etl_config.dir.clone()), @@ -206,7 +206,7 @@ where } // Lookup the head and tip of the sync range - let gap = self.provider.sync_gap(self.mode.clone(), current_checkpoint.block_number)?; + let gap = self.provider.sync_gap(self.tip.clone(), current_checkpoint.block_number)?; let tip = gap.target.tip(); self.sync_gap = Some(gap.clone()); @@ -436,7 +436,7 @@ mod tests { HeaderStage::new( self.db.factory.clone(), (*self.downloader_factory)(), - HeaderSyncMode::Tip(self.channel.1.clone()), + self.channel.1.clone(), self.consensus.clone(), EtlConfig::default(), ) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index edf095124..683b50dce 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -3,10 +3,9 @@ use crate::{ to_range, traits::{BlockSource, ReceiptProvider}, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, - EvmEnvProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, - ProviderError, PruneCheckpointReader, RequestsProvider, StageCheckpointReader, - StateProviderBox, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, - WithdrawalsProvider, + EvmEnvProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, ProviderError, + PruneCheckpointReader, RequestsProvider, StageCheckpointReader, StateProviderBox, + StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider, }; use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv}; use reth_db_api::{database::Database, models::StoredBlockBodyIndices}; @@ -27,6 +26,7 @@ use std::{ path::Path, sync::Arc, }; +use tokio::sync::watch; use tracing::trace; mod metrics; @@ -165,10 +165,10 @@ impl StaticFileProviderFactory for ProviderFactory { impl HeaderSyncGapProvider for ProviderFactory { fn sync_gap( &self, - mode: HeaderSyncMode, + tip: watch::Receiver, highest_uninterrupted_block: BlockNumber, ) -> ProviderResult { - self.provider()?.sync_gap(mode, highest_uninterrupted_block) + self.provider()?.sync_gap(tip, highest_uninterrupted_block) } } @@ -592,8 +592,7 @@ mod tests { use crate::{ providers::{StaticFileProvider, StaticFileWriter}, test_utils::create_test_provider_factory, - BlockHashReader, BlockNumReader, BlockWriter, HeaderSyncGapProvider, HeaderSyncMode, - TransactionsProvider, + BlockHashReader, BlockNumReader, BlockWriter, HeaderSyncGapProvider, TransactionsProvider, }; use alloy_rlp::Decodable; use assert_matches::assert_matches; @@ -748,7 +747,6 @@ mod tests { let mut rng = generators::rng(); let consensus_tip = rng.gen(); let (_tip_tx, tip_rx) = watch::channel(consensus_tip); - let mode = HeaderSyncMode::Tip(tip_rx); // Genesis let checkpoint = 0; @@ -756,7 +754,7 @@ mod tests { // Empty database assert_matches!( - provider.sync_gap(mode.clone(), checkpoint), + provider.sync_gap(tip_rx.clone(), checkpoint), Err(ProviderError::HeaderNotFound(block_number)) if block_number.as_number().unwrap() == checkpoint ); @@ -768,7 +766,7 @@ mod tests { static_file_writer.commit().unwrap(); drop(static_file_writer); - let gap = provider.sync_gap(mode, checkpoint).unwrap(); + let gap = provider.sync_gap(tip_rx, checkpoint).unwrap(); assert_eq!(gap.local_head, head); assert_eq!(gap.target.tip(), consensus_tip.into()); } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index ee7eb72ec..01c481659 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -7,8 +7,8 @@ use crate::{ }, AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, Chain, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter, - HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, HistoricalStateProvider, - HistoryWriter, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, + HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, + LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, StageCheckpointReader, StateProviderBox, StateWriter, StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, WithdrawalsProvider, @@ -55,6 +55,7 @@ use std::{ sync::{mpsc, Arc}, time::{Duration, Instant}, }; +use tokio::sync::watch; use tracing::{debug, error, warn}; /// A [`DatabaseProvider`] that holds a read-only database transaction. @@ -1272,7 +1273,7 @@ impl ChangeSetReader for DatabaseProvider { impl HeaderSyncGapProvider for DatabaseProvider { fn sync_gap( &self, - mode: HeaderSyncMode, + tip: watch::Receiver, highest_uninterrupted_block: BlockNumber, ) -> ProviderResult { let static_file_provider = self.static_file_provider(); @@ -1307,10 +1308,7 @@ impl HeaderSyncGapProvider for DatabaseProvider { .sealed_header(highest_uninterrupted_block)? .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?; - let target = match mode { - HeaderSyncMode::Tip(rx) => SyncTarget::Tip(*rx.borrow()), - HeaderSyncMode::Continuous => SyncTarget::TipNum(highest_uninterrupted_block + 1), - }; + let target = SyncTarget::Tip(*tip.borrow()); Ok(HeaderSyncGap { local_head, target }) } diff --git a/crates/storage/provider/src/traits/header_sync_gap.rs b/crates/storage/provider/src/traits/header_sync_gap.rs index 54556101a..faa02b39e 100644 --- a/crates/storage/provider/src/traits/header_sync_gap.rs +++ b/crates/storage/provider/src/traits/header_sync_gap.rs @@ -3,17 +3,6 @@ use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, B256}; use reth_storage_errors::provider::ProviderResult; use tokio::sync::watch; -/// The header sync mode. -#[derive(Clone, Debug)] -pub enum HeaderSyncMode { - /// A sync mode in which the stage continuously requests the downloader for - /// next blocks. - Continuous, - /// A sync mode in which the stage polls the receiver for the next tip - /// to download from. - Tip(watch::Receiver), -} - /// Represents a gap to sync: from `local_head` to `target` #[derive(Clone, Debug)] pub struct HeaderSyncGap { @@ -38,13 +27,13 @@ impl HeaderSyncGap { /// Client trait for determining the current headers sync gap. #[auto_impl::auto_impl(&, Arc)] pub trait HeaderSyncGapProvider: Send + Sync { - /// Find a current sync gap for the headers depending on the [HeaderSyncMode] and the last + /// Find a current sync gap for the headers depending on the last /// uninterrupted block number. Last uninterrupted block represents the block number before /// which there are no gaps. It's up to the caller to ensure that last uninterrupted block is /// determined correctly. fn sync_gap( &self, - mode: HeaderSyncMode, + tip: watch::Receiver, highest_uninterrupted_block: BlockNumber, ) -> ProviderResult; } diff --git a/crates/storage/provider/src/traits/mod.rs b/crates/storage/provider/src/traits/mod.rs index 283ba5a48..bf69eda03 100644 --- a/crates/storage/provider/src/traits/mod.rs +++ b/crates/storage/provider/src/traits/mod.rs @@ -13,7 +13,7 @@ mod chain_info; pub use chain_info::CanonChainTracker; mod header_sync_gap; -pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode}; +pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider}; mod state; pub use state::StateWriter;