chore: remove HeaderSyncMode::Continuous & debug.continuous (#8714)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Oliver Nordbjerg <hi@notbjerg.me>
This commit is contained in:
Pelle
2024-06-12 09:09:45 -06:00
committed by GitHub
parent fcd28f69a8
commit 0de932d174
21 changed files with 66 additions and 165 deletions

View File

@ -14,13 +14,12 @@ use reth_node_core::{
}, },
dirs::{ChainPath, DataDirPath}, dirs::{ChainPath, DataDirPath},
}; };
use reth_primitives::ChainSpec; use reth_primitives::{ChainSpec, B256};
use reth_provider::{ use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory,
};
use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget}; use reth_stages::{sets::DefaultStages, Pipeline, PipelineTarget};
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
use std::{path::PathBuf, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use tokio::sync::watch;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
/// Struct to hold config and datadir paths /// Struct to hold config and datadir paths
@ -127,11 +126,13 @@ impl EnvironmentArgs {
info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
// Builds and executes an unwind-only pipeline // Builds and executes an unwind-only pipeline
let mut pipeline = Pipeline::builder() let mut pipeline = Pipeline::builder()
.add_stages(DefaultStages::new( .add_stages(DefaultStages::new(
factory.clone(), factory.clone(),
HeaderSyncMode::Continuous, tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain.clone())), Arc::new(EthBeaconConsensus::new(self.chain.clone())),
NoopHeaderDownloader::default(), NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(), NoopBodiesDownloader::default(),

View File

@ -24,7 +24,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}; use reth_network_p2p::{bodies::client::BodiesClient, headers::client::HeadersClient};
use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256};
use reth_provider::{ use reth_provider::{
BlockExecutionWriter, ChainSpecProvider, HeaderSyncMode, ProviderFactory, StageCheckpointReader, BlockExecutionWriter, ChainSpecProvider, ProviderFactory, StageCheckpointReader,
}; };
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
use reth_stages::{ use reth_stages::{
@ -86,13 +86,12 @@ impl Command {
let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec()); let executor = block_executor!(provider_factory.chain_spec());
let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder() let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx) .with_tip_sender(tip_tx)
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),
header_mode, tip_rx,
Arc::clone(&consensus), Arc::clone(&consensus),
header_downloader, header_downloader,
body_downloader, body_downloader,

View File

@ -155,7 +155,6 @@ impl Command {
Box::new(ctx.task_executor.clone()), Box::new(ctx.task_executor.clone()),
Box::new(network), Box::new(network),
None, None,
false,
payload_builder, payload_builder,
None, None,
u64::MAX, u64::MAX,

View File

@ -24,8 +24,8 @@ use reth_network_p2p::{
use reth_node_events::node::NodeEvent; use reth_node_events::node::NodeEvent;
use reth_primitives::B256; use reth_primitives::B256;
use reth_provider::{ use reth_provider::{
BlockNumReader, ChainSpecProvider, HeaderProvider, HeaderSyncMode, ProviderError, BlockNumReader, ChainSpecProvider, HeaderProvider, ProviderError, ProviderFactory,
ProviderFactory, StageCheckpointReader, StageCheckpointReader,
}; };
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
use reth_stages::{prelude::*, Pipeline, StageId, StageSet}; use reth_stages::{prelude::*, Pipeline, StageId, StageSet};
@ -208,7 +208,7 @@ where
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),
HeaderSyncMode::Tip(tip_rx), tip_rx,
consensus.clone(), consensus.clone(),
header_downloader, header_downloader,
body_downloader, body_downloader,

View File

@ -11,7 +11,7 @@ use reth_node_core::args::NetworkArgs;
use reth_primitives::{BlockHashOrNumber, BlockNumber, B256}; use reth_primitives::{BlockHashOrNumber, BlockNumber, B256};
use reth_provider::{ use reth_provider::{
BlockExecutionWriter, BlockNumReader, ChainSpecProvider, FinalizedBlockReader, BlockExecutionWriter, BlockNumReader, ChainSpecProvider, FinalizedBlockReader,
FinalizedBlockWriter, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
}; };
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
use reth_stages::{ use reth_stages::{
@ -105,13 +105,12 @@ impl Command {
let (tip_tx, tip_rx) = watch::channel(B256::ZERO); let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
let executor = block_executor!(provider_factory.chain_spec()); let executor = block_executor!(provider_factory.chain_spec());
let header_mode = HeaderSyncMode::Tip(tip_rx);
let pipeline = Pipeline::builder() let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx) .with_tip_sender(tip_tx)
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),
header_mode, tip_rx,
Arc::clone(&consensus), Arc::clone(&consensus),
NoopHeaderDownloader::default(), NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(), NoopBodiesDownloader::default(),

View File

@ -447,11 +447,6 @@ Builder:
[default: 3] [default: 3]
Debug: Debug:
--debug.continuous
Prompt the downloader to download blocks one at a time.
NOTE: This is for testing purposes only.
--debug.terminate --debug.terminate
Flag indicating whether the node should be terminated after the pipeline sync Flag indicating whether the node should be terminated after the pipeline sync

View File

@ -240,7 +240,6 @@ where
task_spawner: Box<dyn TaskSpawner>, task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>, sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>, max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle<EngineT>, payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>, target: Option<B256>,
pipeline_run_threshold: u64, pipeline_run_threshold: u64,
@ -254,7 +253,6 @@ where
task_spawner, task_spawner,
sync_state_updater, sync_state_updater,
max_block, max_block,
run_pipeline_continuously,
payload_builder, payload_builder,
target, target,
pipeline_run_threshold, pipeline_run_threshold,
@ -285,7 +283,6 @@ where
task_spawner: Box<dyn TaskSpawner>, task_spawner: Box<dyn TaskSpawner>,
sync_state_updater: Box<dyn NetworkSyncUpdater>, sync_state_updater: Box<dyn NetworkSyncUpdater>,
max_block: Option<BlockNumber>, max_block: Option<BlockNumber>,
run_pipeline_continuously: bool,
payload_builder: PayloadBuilderHandle<EngineT>, payload_builder: PayloadBuilderHandle<EngineT>,
target: Option<B256>, target: Option<B256>,
pipeline_run_threshold: u64, pipeline_run_threshold: u64,
@ -299,7 +296,6 @@ where
pipeline, pipeline,
client, client,
task_spawner.clone(), task_spawner.clone(),
run_pipeline_continuously,
max_block, max_block,
blockchain.chain_spec(), blockchain.chain_spec(),
event_sender.clone(), event_sender.clone(),
@ -1448,11 +1444,6 @@ where
return Ok(()) return Ok(())
} }
// update the canon chain if continuous is enabled
if self.sync.run_pipeline_continuously() {
self.set_canonical_head(ctrl.block_number().unwrap_or_default())?;
}
let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() { let sync_target_state = match self.forkchoice_state_tracker.sync_target_state() {
Some(current_state) => current_state, Some(current_state) => current_state,
None => { None => {

View File

@ -54,8 +54,6 @@ where
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for /// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers. /// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>, range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
/// If enabled, the pipeline will be triggered continuously, as soon as it becomes idle
run_pipeline_continuously: bool,
/// Max block after which the consensus engine would terminate the sync. Used for debugging /// Max block after which the consensus engine would terminate the sync. Used for debugging
/// purposes. /// purposes.
max_block: Option<BlockNumber>, max_block: Option<BlockNumber>,
@ -73,7 +71,6 @@ where
pipeline: Pipeline<DB>, pipeline: Pipeline<DB>,
client: Client, client: Client,
pipeline_task_spawner: Box<dyn TaskSpawner>, pipeline_task_spawner: Box<dyn TaskSpawner>,
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>, max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>, chain_spec: Arc<ChainSpec>,
event_sender: EventSender<BeaconConsensusEngineEvent>, event_sender: EventSender<BeaconConsensusEngineEvent>,
@ -89,7 +86,6 @@ where
inflight_full_block_requests: Vec::new(), inflight_full_block_requests: Vec::new(),
inflight_block_range_requests: Vec::new(), inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(), range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
event_sender, event_sender,
max_block, max_block,
metrics: EngineSyncMetrics::default(), metrics: EngineSyncMetrics::default(),
@ -122,11 +118,6 @@ where
self.update_block_download_metrics(); self.update_block_download_metrics();
} }
/// Returns whether or not the sync controller is set to run the pipeline continuously.
pub(crate) const fn run_pipeline_continuously(&self) -> bool {
self.run_pipeline_continuously
}
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) const fn is_pipeline_sync_pending(&self) -> bool { pub(crate) const fn is_pipeline_sync_pending(&self) -> bool {
@ -271,20 +262,14 @@ where
fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> { fn try_spawn_pipeline(&mut self) -> Option<EngineSyncEvent> {
match &mut self.pipeline_state { match &mut self.pipeline_state {
PipelineState::Idle(pipeline) => { PipelineState::Idle(pipeline) => {
let target = self.pending_pipeline_target.take(); let target = self.pending_pipeline_target.take()?;
if target.is_none() && !self.run_pipeline_continuously {
// nothing to sync
return None
}
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let pipeline = pipeline.take().expect("exists"); let pipeline = pipeline.take().expect("exists");
self.pipeline_task_spawner.spawn_critical_blocking( self.pipeline_task_spawner.spawn_critical_blocking(
"pipeline task", "pipeline task",
Box::pin(async move { Box::pin(async move {
let result = pipeline.run_as_fut(target).await; let result = pipeline.run_as_fut(Some(target)).await;
let _ = tx.send(result); let _ = tx.send(result);
}), }),
); );
@ -294,7 +279,7 @@ where
// outdated (included in the range the pipeline is syncing anyway) // outdated (included in the range the pipeline is syncing anyway)
self.clear_block_download_requests(); self.clear_block_download_requests();
Some(EngineSyncEvent::PipelineStarted(target)) Some(EngineSyncEvent::PipelineStarted(Some(target)))
} }
PipelineState::Running(_) => None, PipelineState::Running(_) => None,
} }
@ -550,8 +535,6 @@ mod tests {
pipeline, pipeline,
client, client,
Box::<TokioTaskExecutor>::default(), Box::<TokioTaskExecutor>::default(),
// run_pipeline_continuously: false here until we want to test this
false,
self.max_block, self.max_block,
chain_spec, chain_spec,
Default::default(), Default::default(),

View File

@ -25,7 +25,7 @@ use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, ChainSpec, B256}; use reth_primitives::{BlockNumber, ChainSpec, B256};
use reth_provider::{ use reth_provider::{
providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec, providers::BlockchainProvider, test_utils::create_test_provider_factory_with_chain_spec,
ExecutionOutcome, HeaderSyncMode, ExecutionOutcome,
}; };
use reth_prune::Pruner; use reth_prune::Pruner;
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
@ -371,7 +371,7 @@ where
Pipeline::builder().add_stages(DefaultStages::new( Pipeline::builder().add_stages(DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),
HeaderSyncMode::Tip(tip_rx.clone()), tip_rx.clone(),
Arc::clone(&consensus), Arc::clone(&consensus),
header_downloader, header_downloader,
body_downloader, body_downloader,
@ -418,7 +418,6 @@ where
Box::<TokioTaskExecutor>::default(), Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(), Box::<NoopSyncStateUpdater>::default(),
None, None,
false,
payload_builder, payload_builder,
None, None,
self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN), self.base_config.pipeline_run_threshold.unwrap_or(MIN_BLOCKS_FOR_PIPELINE_RUN),

View File

@ -8,12 +8,6 @@ use std::path::PathBuf;
#[derive(Debug, Clone, Args, PartialEq, Eq, Default)] #[derive(Debug, Clone, Args, PartialEq, Eq, Default)]
#[command(next_help_heading = "Debug")] #[command(next_help_heading = "Debug")]
pub struct DebugArgs { pub struct DebugArgs {
/// Prompt the downloader to download blocks one at a time.
///
/// NOTE: This is for testing purposes only.
#[arg(long = "debug.continuous", help_heading = "Debug", conflicts_with = "tip")]
pub continuous: bool,
/// Flag indicating whether the node should be terminated after the pipeline sync. /// Flag indicating whether the node should be terminated after the pipeline sync.
#[arg(long = "debug.terminate", help_heading = "Debug")] #[arg(long = "debug.terminate", help_heading = "Debug")]
pub terminate: bool, pub terminate: bool,
@ -21,7 +15,7 @@ pub struct DebugArgs {
/// Set the chain tip manually for testing purposes. /// Set the chain tip manually for testing purposes.
/// ///
/// NOTE: This is a temporary flag /// NOTE: This is a temporary flag
#[arg(long = "debug.tip", help_heading = "Debug", conflicts_with = "continuous")] #[arg(long = "debug.tip", help_heading = "Debug")]
pub tip: Option<B256>, pub tip: Option<B256>,
/// Runs the sync only up to the specified block. /// Runs the sync only up to the specified block.

View File

@ -239,27 +239,6 @@ impl NodeConfig {
self self
} }
/// Returns the initial pipeline target, based on whether or not the node is running in
/// `debug.tip` mode, `debug.continuous` mode, or neither.
///
/// If running in `debug.tip` mode, the configured tip is returned.
/// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned.
/// Otherwise, `None` is returned. This is what the node will do by default.
pub fn initial_pipeline_target(&self, genesis_hash: B256) -> Option<B256> {
if let Some(tip) = self.debug.tip {
// Set the provided tip as the initial pipeline target.
debug!(target: "reth::cli", %tip, "Tip manually set");
Some(tip)
} else if self.debug.continuous {
// Set genesis as the initial pipeline target.
// This will allow the downloader to start
debug!(target: "reth::cli", "Continuous sync mode enabled");
Some(genesis_hash)
} else {
None
}
}
/// Returns pruning configuration. /// Returns pruning configuration.
pub fn prune_config(&self) -> Option<PruneConfig> { pub fn prune_config(&self) -> Option<PruneConfig> {
self.pruning.prune_config(&self.chain) self.pruning.prune_config(&self.chain)

View File

@ -16,9 +16,7 @@ use reth_node_core::{
node_config::NodeConfig, node_config::NodeConfig,
}; };
use reth_primitives::{BlockNumber, Chain, ChainSpec, Head, B256}; use reth_primitives::{BlockNumber, Chain, ChainSpec, Head, B256};
use reth_provider::{ use reth_provider::{providers::StaticFileProvider, ProviderFactory, StaticFileProviderFactory};
providers::StaticFileProvider, HeaderSyncMode, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PruneModes, PrunerBuilder}; use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_builder::config::RethRpcServerConfig; use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret; use reth_rpc_layer::JwtSecret;
@ -27,7 +25,7 @@ use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn}; use reth_tracing::tracing::{debug, error, info, warn};
use std::{sync::Arc, thread::available_parallelism}; use std::{sync::Arc, thread::available_parallelism};
use tokio::sync::{mpsc::Receiver, oneshot}; use tokio::sync::{mpsc::Receiver, oneshot, watch};
/// Reusable setup for launching a node. /// Reusable setup for launching a node.
/// ///
@ -316,16 +314,6 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
.timeout(PrunerBuilder::DEFAULT_TIMEOUT) .timeout(PrunerBuilder::DEFAULT_TIMEOUT)
} }
/// Returns the initial pipeline target, based on whether or not the node is running in
/// `debug.tip` mode, `debug.continuous` mode, or neither.
///
/// If running in `debug.tip` mode, the configured tip is returned.
/// Otherwise, if running in `debug.continuous` mode, the genesis hash is returned.
/// Otherwise, `None` is returned. This is what the node will do by default.
pub fn initial_pipeline_target(&self) -> Option<B256> {
self.node_config().initial_pipeline_target(self.genesis_hash())
}
/// Loads the JWT secret for the engine API /// Loads the JWT secret for the engine API
pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> { pub fn auth_jwt_secret(&self) -> eyre::Result<JwtSecret> {
let default_jwt_path = self.data_dir().jwt(); let default_jwt_path = self.data_dir().jwt();
@ -377,11 +365,13 @@ where
info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check."); info!(target: "reth::cli", unwind_target = %unwind_target, "Executing an unwind after a failed storage consistency check.");
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);
// Builds an unwind-only pipeline // Builds an unwind-only pipeline
let pipeline = Pipeline::builder() let pipeline = Pipeline::builder()
.add_stages(DefaultStages::new( .add_stages(DefaultStages::new(
factory.clone(), factory.clone(),
HeaderSyncMode::Continuous, tip_rx,
Arc::new(EthBeaconConsensus::new(self.chain_spec())), Arc::new(EthBeaconConsensus::new(self.chain_spec())),
NoopHeaderDownloader::default(), NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(), NoopBodiesDownloader::default(),

View File

@ -250,7 +250,6 @@ where
.build(); .build();
let pipeline = crate::setup::build_networked_pipeline( let pipeline = crate::setup::build_networked_pipeline(
ctx.node_config(),
&ctx.toml_config().stages, &ctx.toml_config().stages,
client.clone(), client.clone(),
consensus.clone(), consensus.clone(),
@ -273,7 +272,6 @@ where
(pipeline, Either::Left(client)) (pipeline, Either::Left(client))
} else { } else {
let pipeline = crate::setup::build_networked_pipeline( let pipeline = crate::setup::build_networked_pipeline(
ctx.node_config(),
&ctx.toml_config().stages, &ctx.toml_config().stages,
network_client.clone(), network_client.clone(),
consensus.clone(), consensus.clone(),
@ -293,7 +291,7 @@ where
let pipeline_events = pipeline.events(); let pipeline_events = pipeline.events();
let initial_target = ctx.initial_pipeline_target(); let initial_target = ctx.node_config().debug.tip;
let mut pruner_builder = let mut pruner_builder =
ctx.pruner_builder().max_reorg_depth(tree_config.max_reorg_depth() as usize); ctx.pruner_builder().max_reorg_depth(tree_config.max_reorg_depth() as usize);
@ -316,7 +314,6 @@ where
Box::new(ctx.task_executor().clone()), Box::new(ctx.task_executor().clone()),
Box::new(node_adapter.components.network().clone()), Box::new(node_adapter.components.network().clone()),
max_block, max_block,
ctx.node_config().debug.continuous,
node_adapter.components.payload_builder().clone(), node_adapter.components.payload_builder().clone(),
initial_target, initial_target,
reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN, reth_beacon_consensus::MIN_BLOCKS_FOR_PIPELINE_RUN,

View File

@ -13,11 +13,8 @@ use reth_network_p2p::{
bodies::{client::BodiesClient, downloader::BodyDownloader}, bodies::{client::BodiesClient, downloader::BodyDownloader},
headers::{client::HeadersClient, downloader::HeaderDownloader}, headers::{client::HeadersClient, downloader::HeaderDownloader},
}; };
use reth_node_core::{ use reth_node_core::primitives::{BlockNumber, B256};
node_config::NodeConfig, use reth_provider::ProviderFactory;
primitives::{BlockNumber, B256},
};
use reth_provider::{HeaderSyncMode, ProviderFactory};
use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet}; use reth_stages::{prelude::DefaultStages, stages::ExecutionStage, Pipeline, StageSet};
use reth_static_file::StaticFileProducer; use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor; use reth_tasks::TaskExecutor;
@ -28,7 +25,6 @@ use tokio::sync::watch;
/// Constructs a [Pipeline] that's wired to the network /// Constructs a [Pipeline] that's wired to the network
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn build_networked_pipeline<DB, Client, Executor>( pub async fn build_networked_pipeline<DB, Client, Executor>(
node_config: &NodeConfig,
config: &StageConfig, config: &StageConfig,
client: Client, client: Client,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
@ -56,7 +52,6 @@ where
.into_task_with(task_executor); .into_task_with(task_executor);
let pipeline = build_pipeline( let pipeline = build_pipeline(
node_config,
provider_factory, provider_factory,
config, config,
header_downloader, header_downloader,
@ -77,7 +72,6 @@ where
/// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders. /// Builds the [Pipeline] with the given [`ProviderFactory`] and downloaders.
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn build_pipeline<DB, H, B, Executor>( pub async fn build_pipeline<DB, H, B, Executor>(
node_config: &NodeConfig,
provider_factory: ProviderFactory<DB>, provider_factory: ProviderFactory<DB>,
stage_config: &StageConfig, stage_config: &StageConfig,
header_downloader: H, header_downloader: H,
@ -107,18 +101,13 @@ where
let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default(); let prune_modes = prune_config.map(|prune| prune.segments).unwrap_or_default();
let header_mode = if node_config.debug.continuous {
HeaderSyncMode::Continuous
} else {
HeaderSyncMode::Tip(tip_rx)
};
let pipeline = builder let pipeline = builder
.with_tip_sender(tip_tx) .with_tip_sender(tip_tx)
.with_metrics_tx(metrics_tx.clone()) .with_metrics_tx(metrics_tx.clone())
.add_stages( .add_stages(
DefaultStages::new( DefaultStages::new(
provider_factory.clone(), provider_factory.clone(),
header_mode, tip_rx,
Arc::clone(&consensus), Arc::clone(&consensus),
header_downloader, header_downloader,
body_downloader, body_downloader,

View File

@ -26,7 +26,6 @@
//! # use reth_evm_ethereum::EthEvmConfig; //! # use reth_evm_ethereum::EthEvmConfig;
//! # use reth_provider::ProviderFactory; //! # use reth_provider::ProviderFactory;
//! # use reth_provider::StaticFileProviderFactory; //! # use reth_provider::StaticFileProviderFactory;
//! # use reth_provider::HeaderSyncMode;
//! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_provider::test_utils::create_test_provider_factory;
//! # use reth_static_file::StaticFileProducer; //! # use reth_static_file::StaticFileProducer;
//! # use reth_config::config::StageConfig; //! # use reth_config::config::StageConfig;
@ -57,7 +56,7 @@
//! .with_tip_sender(tip_tx) //! .with_tip_sender(tip_tx)
//! .add_stages(DefaultStages::new( //! .add_stages(DefaultStages::new(
//! provider_factory.clone(), //! provider_factory.clone(),
//! HeaderSyncMode::Tip(tip_rx), //! tip_rx,
//! consensus, //! consensus,
//! headers_downloader, //! headers_downloader,
//! bodies_downloader, //! bodies_downloader,

View File

@ -46,9 +46,11 @@ use reth_consensus::Consensus;
use reth_db_api::database::Database; use reth_db_api::database::Database;
use reth_evm::execute::BlockExecutorProvider; use reth_evm::execute::BlockExecutorProvider;
use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}; use reth_network_p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader};
use reth_provider::{HeaderSyncGapProvider, HeaderSyncMode}; use reth_primitives::B256;
use reth_provider::HeaderSyncGapProvider;
use reth_prune_types::PruneModes; use reth_prune_types::PruneModes;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::watch;
/// A set containing all stages to run a fully syncing instance of reth. /// A set containing all stages to run a fully syncing instance of reth.
/// ///
@ -88,7 +90,7 @@ impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub fn new( pub fn new(
provider: Provider, provider: Provider,
header_mode: HeaderSyncMode, tip: watch::Receiver<B256>,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
header_downloader: H, header_downloader: H,
body_downloader: B, body_downloader: B,
@ -102,7 +104,7 @@ impl<Provider, H, B, E> DefaultStages<Provider, H, B, E> {
Self { Self {
online: OnlineStages::new( online: OnlineStages::new(
provider, provider,
header_mode, tip,
consensus, consensus,
header_downloader, header_downloader,
body_downloader, body_downloader,
@ -159,8 +161,8 @@ where
pub struct OnlineStages<Provider, H, B> { pub struct OnlineStages<Provider, H, B> {
/// Sync gap provider for the headers stage. /// Sync gap provider for the headers stage.
provider: Provider, provider: Provider,
/// The sync mode for the headers stage. /// The tip for the headers stage.
header_mode: HeaderSyncMode, tip: watch::Receiver<B256>,
/// The consensus engine used to validate incoming data. /// The consensus engine used to validate incoming data.
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
/// The block header downloader /// The block header downloader
@ -175,13 +177,13 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
/// Create a new set of online stages with default values. /// Create a new set of online stages with default values.
pub fn new( pub fn new(
provider: Provider, provider: Provider,
header_mode: HeaderSyncMode, tip: watch::Receiver<B256>,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
header_downloader: H, header_downloader: H,
body_downloader: B, body_downloader: B,
stages_config: StageConfig, stages_config: StageConfig,
) -> Self { ) -> Self {
Self { provider, header_mode, consensus, header_downloader, body_downloader, stages_config } Self { provider, tip, consensus, header_downloader, body_downloader, stages_config }
} }
} }
@ -203,7 +205,7 @@ where
pub fn builder_with_bodies<DB: Database>( pub fn builder_with_bodies<DB: Database>(
bodies: BodyStage<B>, bodies: BodyStage<B>,
provider: Provider, provider: Provider,
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
header_downloader: H, header_downloader: H,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
stages_config: StageConfig, stages_config: StageConfig,
@ -212,7 +214,7 @@ where
.add_stage(HeaderStage::new( .add_stage(HeaderStage::new(
provider, provider,
header_downloader, header_downloader,
mode, tip,
consensus.clone(), consensus.clone(),
stages_config.etl, stages_config.etl,
)) ))
@ -232,7 +234,7 @@ where
.add_stage(HeaderStage::new( .add_stage(HeaderStage::new(
self.provider, self.provider,
self.header_downloader, self.header_downloader,
self.header_mode, self.tip,
self.consensus.clone(), self.consensus.clone(),
self.stages_config.etl.clone(), self.stages_config.etl.clone(),
)) ))

View File

@ -10,11 +10,10 @@ use reth_db_api::{
}; };
use reth_etl::Collector; use reth_etl::Collector;
use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError}; use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDownloaderError};
use reth_primitives::{BlockHash, BlockNumber, SealedHeader, StaticFileSegment}; use reth_primitives::{BlockHash, BlockNumber, SealedHeader, StaticFileSegment, B256};
use reth_provider::{ use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter}, providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, DatabaseProviderRW, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, BlockHashReader, DatabaseProviderRW, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HeaderSyncMode,
}; };
use reth_stages_api::{ use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput, BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
@ -25,6 +24,7 @@ use std::{
sync::Arc, sync::Arc,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
}; };
use tokio::sync::watch;
use tracing::*; use tracing::*;
/// The headers stage. /// The headers stage.
@ -44,8 +44,8 @@ pub struct HeaderStage<Provider, Downloader: HeaderDownloader> {
provider: Provider, provider: Provider,
/// Strategy for downloading the headers /// Strategy for downloading the headers
downloader: Downloader, downloader: Downloader,
/// The sync mode for the stage. /// The tip for the stage.
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
/// Consensus client implementation /// Consensus client implementation
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
/// Current sync gap. /// Current sync gap.
@ -68,14 +68,14 @@ where
pub fn new( pub fn new(
database: Provider, database: Provider,
downloader: Downloader, downloader: Downloader,
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
consensus: Arc<dyn Consensus>, consensus: Arc<dyn Consensus>,
etl_config: EtlConfig, etl_config: EtlConfig,
) -> Self { ) -> Self {
Self { Self {
provider: database, provider: database,
downloader, downloader,
mode, tip,
consensus, consensus,
sync_gap: None, sync_gap: None,
hash_collector: Collector::new(etl_config.file_size / 2, etl_config.dir.clone()), hash_collector: Collector::new(etl_config.file_size / 2, etl_config.dir.clone()),
@ -206,7 +206,7 @@ where
} }
// Lookup the head and tip of the sync range // Lookup the head and tip of the sync range
let gap = self.provider.sync_gap(self.mode.clone(), current_checkpoint.block_number)?; let gap = self.provider.sync_gap(self.tip.clone(), current_checkpoint.block_number)?;
let tip = gap.target.tip(); let tip = gap.target.tip();
self.sync_gap = Some(gap.clone()); self.sync_gap = Some(gap.clone());
@ -436,7 +436,7 @@ mod tests {
HeaderStage::new( HeaderStage::new(
self.db.factory.clone(), self.db.factory.clone(),
(*self.downloader_factory)(), (*self.downloader_factory)(),
HeaderSyncMode::Tip(self.channel.1.clone()), self.channel.1.clone(),
self.consensus.clone(), self.consensus.clone(),
EtlConfig::default(), EtlConfig::default(),
) )

View File

@ -3,10 +3,9 @@ use crate::{
to_range, to_range,
traits::{BlockSource, ReceiptProvider}, traits::{BlockSource, ReceiptProvider},
BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory, BlockHashReader, BlockNumReader, BlockReader, ChainSpecProvider, DatabaseProviderFactory,
EvmEnvProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, EvmEnvProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, ProviderError,
ProviderError, PruneCheckpointReader, RequestsProvider, StageCheckpointReader, PruneCheckpointReader, RequestsProvider, StageCheckpointReader, StateProviderBox,
StateProviderBox, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
WithdrawalsProvider,
}; };
use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv}; use reth_db::{init_db, mdbx::DatabaseArguments, DatabaseEnv};
use reth_db_api::{database::Database, models::StoredBlockBodyIndices}; use reth_db_api::{database::Database, models::StoredBlockBodyIndices};
@ -27,6 +26,7 @@ use std::{
path::Path, path::Path,
sync::Arc, sync::Arc,
}; };
use tokio::sync::watch;
use tracing::trace; use tracing::trace;
mod metrics; mod metrics;
@ -165,10 +165,10 @@ impl<DB> StaticFileProviderFactory for ProviderFactory<DB> {
impl<DB: Database> HeaderSyncGapProvider for ProviderFactory<DB> { impl<DB: Database> HeaderSyncGapProvider for ProviderFactory<DB> {
fn sync_gap( fn sync_gap(
&self, &self,
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
highest_uninterrupted_block: BlockNumber, highest_uninterrupted_block: BlockNumber,
) -> ProviderResult<HeaderSyncGap> { ) -> ProviderResult<HeaderSyncGap> {
self.provider()?.sync_gap(mode, highest_uninterrupted_block) self.provider()?.sync_gap(tip, highest_uninterrupted_block)
} }
} }
@ -592,8 +592,7 @@ mod tests {
use crate::{ use crate::{
providers::{StaticFileProvider, StaticFileWriter}, providers::{StaticFileProvider, StaticFileWriter},
test_utils::create_test_provider_factory, test_utils::create_test_provider_factory,
BlockHashReader, BlockNumReader, BlockWriter, HeaderSyncGapProvider, HeaderSyncMode, BlockHashReader, BlockNumReader, BlockWriter, HeaderSyncGapProvider, TransactionsProvider,
TransactionsProvider,
}; };
use alloy_rlp::Decodable; use alloy_rlp::Decodable;
use assert_matches::assert_matches; use assert_matches::assert_matches;
@ -748,7 +747,6 @@ mod tests {
let mut rng = generators::rng(); let mut rng = generators::rng();
let consensus_tip = rng.gen(); let consensus_tip = rng.gen();
let (_tip_tx, tip_rx) = watch::channel(consensus_tip); let (_tip_tx, tip_rx) = watch::channel(consensus_tip);
let mode = HeaderSyncMode::Tip(tip_rx);
// Genesis // Genesis
let checkpoint = 0; let checkpoint = 0;
@ -756,7 +754,7 @@ mod tests {
// Empty database // Empty database
assert_matches!( assert_matches!(
provider.sync_gap(mode.clone(), checkpoint), provider.sync_gap(tip_rx.clone(), checkpoint),
Err(ProviderError::HeaderNotFound(block_number)) Err(ProviderError::HeaderNotFound(block_number))
if block_number.as_number().unwrap() == checkpoint if block_number.as_number().unwrap() == checkpoint
); );
@ -768,7 +766,7 @@ mod tests {
static_file_writer.commit().unwrap(); static_file_writer.commit().unwrap();
drop(static_file_writer); drop(static_file_writer);
let gap = provider.sync_gap(mode, checkpoint).unwrap(); let gap = provider.sync_gap(tip_rx, checkpoint).unwrap();
assert_eq!(gap.local_head, head); assert_eq!(gap.local_head, head);
assert_eq!(gap.target.tip(), consensus_tip.into()); assert_eq!(gap.target.tip(), consensus_tip.into());
} }

View File

@ -7,8 +7,8 @@ use crate::{
}, },
AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter, AccountReader, BlockExecutionWriter, BlockHashReader, BlockNumReader, BlockReader, BlockWriter,
Chain, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter, Chain, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter, HashingWriter,
HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, HistoricalStateProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter,
HistoryWriter, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, RequestsProvider, StageCheckpointReader, StateProviderBox, StateWriter, PruneCheckpointWriter, RequestsProvider, StageCheckpointReader, StateProviderBox, StateWriter,
StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt, StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
WithdrawalsProvider, WithdrawalsProvider,
@ -55,6 +55,7 @@ use std::{
sync::{mpsc, Arc}, sync::{mpsc, Arc},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use tokio::sync::watch;
use tracing::{debug, error, warn}; use tracing::{debug, error, warn};
/// A [`DatabaseProvider`] that holds a read-only database transaction. /// A [`DatabaseProvider`] that holds a read-only database transaction.
@ -1272,7 +1273,7 @@ impl<TX: DbTx> ChangeSetReader for DatabaseProvider<TX> {
impl<TX: DbTx> HeaderSyncGapProvider for DatabaseProvider<TX> { impl<TX: DbTx> HeaderSyncGapProvider for DatabaseProvider<TX> {
fn sync_gap( fn sync_gap(
&self, &self,
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
highest_uninterrupted_block: BlockNumber, highest_uninterrupted_block: BlockNumber,
) -> ProviderResult<HeaderSyncGap> { ) -> ProviderResult<HeaderSyncGap> {
let static_file_provider = self.static_file_provider(); let static_file_provider = self.static_file_provider();
@ -1307,10 +1308,7 @@ impl<TX: DbTx> HeaderSyncGapProvider for DatabaseProvider<TX> {
.sealed_header(highest_uninterrupted_block)? .sealed_header(highest_uninterrupted_block)?
.ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?; .ok_or_else(|| ProviderError::HeaderNotFound(highest_uninterrupted_block.into()))?;
let target = match mode { let target = SyncTarget::Tip(*tip.borrow());
HeaderSyncMode::Tip(rx) => SyncTarget::Tip(*rx.borrow()),
HeaderSyncMode::Continuous => SyncTarget::TipNum(highest_uninterrupted_block + 1),
};
Ok(HeaderSyncGap { local_head, target }) Ok(HeaderSyncGap { local_head, target })
} }

View File

@ -3,17 +3,6 @@ use reth_primitives::{BlockHashOrNumber, BlockNumber, SealedHeader, B256};
use reth_storage_errors::provider::ProviderResult; use reth_storage_errors::provider::ProviderResult;
use tokio::sync::watch; use tokio::sync::watch;
/// The header sync mode.
#[derive(Clone, Debug)]
pub enum HeaderSyncMode {
/// A sync mode in which the stage continuously requests the downloader for
/// next blocks.
Continuous,
/// A sync mode in which the stage polls the receiver for the next tip
/// to download from.
Tip(watch::Receiver<B256>),
}
/// Represents a gap to sync: from `local_head` to `target` /// Represents a gap to sync: from `local_head` to `target`
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct HeaderSyncGap { pub struct HeaderSyncGap {
@ -38,13 +27,13 @@ impl HeaderSyncGap {
/// Client trait for determining the current headers sync gap. /// Client trait for determining the current headers sync gap.
#[auto_impl::auto_impl(&, Arc)] #[auto_impl::auto_impl(&, Arc)]
pub trait HeaderSyncGapProvider: Send + Sync { pub trait HeaderSyncGapProvider: Send + Sync {
/// Find a current sync gap for the headers depending on the [HeaderSyncMode] and the last /// Find a current sync gap for the headers depending on the last
/// uninterrupted block number. Last uninterrupted block represents the block number before /// uninterrupted block number. Last uninterrupted block represents the block number before
/// which there are no gaps. It's up to the caller to ensure that last uninterrupted block is /// which there are no gaps. It's up to the caller to ensure that last uninterrupted block is
/// determined correctly. /// determined correctly.
fn sync_gap( fn sync_gap(
&self, &self,
mode: HeaderSyncMode, tip: watch::Receiver<B256>,
highest_uninterrupted_block: BlockNumber, highest_uninterrupted_block: BlockNumber,
) -> ProviderResult<HeaderSyncGap>; ) -> ProviderResult<HeaderSyncGap>;
} }

View File

@ -13,7 +13,7 @@ mod chain_info;
pub use chain_info::CanonChainTracker; pub use chain_info::CanonChainTracker;
mod header_sync_gap; mod header_sync_gap;
pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode}; pub use header_sync_gap::{HeaderSyncGap, HeaderSyncGapProvider};
mod state; mod state;
pub use state::StateWriter; pub use state::StateWriter;