From 9b79218c1885c8b1dd92ecaa8710cc27c0e58aa5 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 16 May 2023 23:24:40 +0300 Subject: [PATCH] fix(p2p): network sync state (#2699) --- bin/reth/src/chain/import.rs | 4 +- bin/reth/src/node/mod.rs | 14 +-- crates/consensus/beacon/src/engine/mod.rs | 53 ++++++++-- crates/interfaces/src/p2p/headers/client.rs | 17 +--- crates/interfaces/src/sync.rs | 17 ++-- crates/interfaces/src/test_utils/headers.rs | 25 +---- .../downloaders/src/test_utils/file_client.rs | 20 +--- crates/net/network/src/network.rs | 19 ++-- crates/net/network/src/transactions.rs | 2 +- crates/net/network/tests/it/connect.rs | 2 +- crates/stages/src/lib.rs | 5 +- crates/stages/src/pipeline/builder.rs | 20 +--- crates/stages/src/pipeline/mod.rs | 13 --- crates/stages/src/sets.rs | 26 ++--- crates/stages/src/stages/finish.rs | 96 ++----------------- 15 files changed, 94 insertions(+), 239 deletions(-) diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 8d6995559..64a4cfae1 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -12,7 +12,7 @@ use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, }; -use reth_interfaces::{consensus::Consensus, p2p::headers::client::NoopStatusUpdater}; +use reth_interfaces::consensus::Consensus; use reth_primitives::{ChainSpec, H256}; use reth_staged_sync::{ utils::{ @@ -157,14 +157,12 @@ impl ImportCommand { .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty .with_max_block(file_client.max_block().unwrap_or(0)) - .with_sync_state_updater(file_client) .add_stages( DefaultStages::new( HeaderSyncMode::Tip(tip_rx), consensus.clone(), header_downloader, body_downloader, - NoopStatusUpdater::default(), factory.clone(), ) .set( diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 76acb985d..f7c79504c 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -34,9 +34,8 @@ use reth_interfaces::{ p2p::{ bodies::{client::BodiesClient, downloader::BodyDownloader}, either::EitherDownloader, - headers::{client::StatusUpdater, downloader::HeaderDownloader}, + headers::downloader::HeaderDownloader, }, - sync::SyncStateUpdater, }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; @@ -311,7 +310,6 @@ impl Command { let mut pipeline = self .build_networked_pipeline( &mut config, - network.clone(), client.clone(), Arc::clone(&consensus), db.clone(), @@ -329,7 +327,6 @@ impl Command { let pipeline = self .build_networked_pipeline( &mut config, - network.clone(), network_client.clone(), Arc::clone(&consensus), db.clone(), @@ -348,6 +345,7 @@ impl Command { pipeline, blockchain_db.clone(), Box::new(ctx.task_executor.clone()), + Box::new(network.clone()), self.debug.max_block, self.debug.continuous, payload_builder.clone(), @@ -417,7 +415,6 @@ impl Command { async fn build_networked_pipeline( &self, config: &mut Config, - network: NetworkHandle, client: Client, consensus: Arc, db: DB, @@ -450,7 +447,6 @@ impl Command { config, header_downloader, body_downloader, - network.clone(), consensus, max_block, self.debug.continuous, @@ -625,13 +621,12 @@ impl Command { } #[allow(clippy::too_many_arguments)] - async fn build_pipeline( + async fn build_pipeline( &self, db: DB, config: &Config, header_downloader: H, body_downloader: B, - updater: U, consensus: Arc, max_block: Option, continuous: bool, @@ -640,7 +635,6 @@ impl Command { DB: Database + Clone + 'static, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, - U: SyncStateUpdater + StatusUpdater + Clone + 'static, { let stage_conf = &config.stages; @@ -673,7 +667,6 @@ impl Command { let header_mode = if continuous { HeaderSyncMode::Continuous } else { HeaderSyncMode::Tip(tip_rx) }; let pipeline = builder - .with_sync_state_updater(updater.clone()) .with_tip_sender(tip_tx) .add_stages( DefaultStages::new( @@ -681,7 +674,6 @@ impl Command { Arc::clone(&consensus), header_downloader, body_downloader, - updater, factory.clone(), ) .set( diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index a8b7d2c24..0faff51a0 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -9,11 +9,12 @@ use reth_interfaces::{ consensus::ForkchoiceState, executor::Error as ExecutorError, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, + sync::{NetworkSyncUpdater, SyncState}, Error, }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{ - listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256, + listener::EventListeners, BlockNumber, Head, Header, SealedBlock, SealedHeader, H256, U256, }; use reth_provider::{BlockProvider, BlockSource, CanonChainTracker, ProviderError}; use reth_rpc_types::engine::{ @@ -151,6 +152,8 @@ where sync: EngineSyncController, /// The type we can use to query both the database and the blockchain tree. blockchain: BT, + /// Used for emitting updates about whether the engine is syncing or not. + sync_state_updater: Box, /// The Engine API message receiver. engine_message_rx: UnboundedReceiverStream, /// A clone of the handle @@ -183,6 +186,7 @@ where pipeline: Pipeline, blockchain: BT, task_spawner: Box, + sync_state_updater: Box, max_block: Option, run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, @@ -194,6 +198,7 @@ where pipeline, blockchain, task_spawner, + sync_state_updater, max_block, run_pipeline_continuously, payload_builder, @@ -211,6 +216,7 @@ where pipeline: Pipeline, blockchain: BT, task_spawner: Box, + sync_state_updater: Box, max_block: Option, run_pipeline_continuously: bool, payload_builder: PayloadBuilderHandle, @@ -229,6 +235,7 @@ where db, sync, blockchain, + sync_state_updater, engine_message_rx: UnboundedReceiverStream::new(rx), handle: handle.clone(), forkchoice_state: None, @@ -398,6 +405,8 @@ where // properly updated self.update_canon_chain(&state)?; } + self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state)); + trace!(target: "consensus::engine", ?state, status = ?payload_response, "Returning forkchoice status"); return Ok(payload_response) } @@ -428,6 +437,7 @@ where } /// Sets the state of the canon chain tracker based on the given forkchoice update. + /// Additionally, updates the head used for p2p handshakes. /// /// This should be called before issuing a VALID forkchoice update. fn update_canon_chain(&self, update: &ForkchoiceState) -> Result<(), reth_interfaces::Error> { @@ -458,8 +468,19 @@ where .ok_or_else(|| { Error::Provider(ProviderError::UnknownBlockHash(update.head_block_hash)) })?; + let head = head.header.seal(update.head_block_hash); + let head_td = self.blockchain.header_td_by_number(head.number)?.ok_or_else(|| { + Error::Provider(ProviderError::TotalDifficulty { number: head.number }) + })?; - self.blockchain.set_canonical_head(head.header.seal(update.head_block_hash)); + self.sync_state_updater.update_status(Head { + number: head.number, + hash: head.hash, + difficulty: head.difficulty, + timestamp: head.timestamp, + total_difficulty: head_td, + }); + self.blockchain.set_canonical_head(head); self.blockchain.on_forkchoice_update_received(update); Ok(()) } @@ -678,6 +699,12 @@ where block_number, block_hash, )); + + // Update the network sync state to `Idle`. + // Handles the edge case where the pipeline is never triggered, because we + // are sufficiently synced. + self.sync_state_updater.update_sync_state(SyncState::Idle); + PayloadStatusEnum::Valid } BlockStatus::Accepted => { @@ -753,11 +780,14 @@ where if !self.try_insert_new_payload(block).is_valid() { // if the payload is invalid, we run the pipeline self.sync.set_pipeline_sync_target(hash); + } else { + self.sync_state_updater.update_sync_state(SyncState::Idle); } } EngineSyncEvent::PipelineStarted(target) => { trace!(target: "consensus::engine", ?target, continuous = target.is_none(), "Started the pipeline"); self.metrics.pipeline_runs.increment(1); + self.sync_state_updater.update_sync_state(SyncState::Syncing); } EngineSyncEvent::PipelineTaskDropped => { error!(target: "consensus::engine", "Failed to receive spawned pipeline"); @@ -774,11 +804,14 @@ where return Some(Ok(())) } - // Update the state and hashes of the blockchain tree if possible - if let Err(error) = self.restore_tree_if_possible(*current_state) { - error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); - return Some(Err(error.into())) - } + // Update the state and hashes of the blockchain tree if possible. + match self.restore_tree_if_possible(*current_state) { + Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle), + Err(error) => { + error!(target: "consensus::engine", ?error, "Error restoring blockchain tree"); + return Some(Err(error.into())) + } + }; } // Any pipeline error at this point is fatal. Err(error) => return Some(Err(error.into())), @@ -877,7 +910,10 @@ mod tests { BlockchainTree, ShareableBlockchainTree, }; use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap}; - use reth_interfaces::test_utils::{NoopFullBlockClient, TestConsensus}; + use reth_interfaces::{ + sync::NoopSyncStateUpdater, + test_utils::{NoopFullBlockClient, TestConsensus}, + }; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::{ @@ -999,6 +1035,7 @@ mod tests { pipeline, blockchain_provider, Box::::default(), + Box::::default(), None, false, payload_builder, diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs index b95f8c477..845240cab 100644 --- a/crates/interfaces/src/p2p/headers/client.rs +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -1,7 +1,7 @@ use crate::p2p::{download::DownloadClient, error::PeerRequestResult, priority::Priority}; use futures::{Future, FutureExt}; pub use reth_eth_wire::BlockHeaders; -use reth_primitives::{BlockHashOrNumber, Head, Header, HeadersDirection}; +use reth_primitives::{BlockHashOrNumber, Header, HeadersDirection}; use std::{ fmt::Debug, pin::Pin, @@ -84,18 +84,3 @@ where Poll::Ready(resp) } } - -/// The status updater for updating the status of the p2p node -pub trait StatusUpdater: Send + Sync { - /// Updates the status of the p2p node - fn update_status(&self, head: Head); -} - -/// A [StatusUpdater] implementation that does nothing. -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct NoopStatusUpdater; - -impl StatusUpdater for NoopStatusUpdater { - fn update_status(&self, _: Head) {} -} diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index bd4612cbd..8becffcda 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -1,4 +1,5 @@ //! Traits used when interacting with the sync status of the network. +use reth_primitives::Head; /// A type that provides information about whether the node is currently syncing and the network is /// currently serving syncing related requests. @@ -8,7 +9,7 @@ pub trait SyncStateProvider: Send + Sync { fn is_syncing(&self) -> bool; } -/// An updater for updating the [SyncState] of the network. +/// An updater for updating the [SyncState] and status of the network. /// /// The node is either syncing, or it is idle. /// While syncing, the node will download data from the network and process it. The processing @@ -16,9 +17,12 @@ pub trait SyncStateProvider: Send + Sync { /// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it /// which point the node is considered fully synced. #[auto_impl::auto_impl(&, Arc, Box)] -pub trait SyncStateUpdater: std::fmt::Debug + Send + Sync + 'static { +pub trait NetworkSyncUpdater: std::fmt::Debug + Send + Sync + 'static { /// Notifies about an [SyncState] update. fn update_sync_state(&self, state: SyncState); + + /// Updates the status of the p2p node + fn update_status(&self, head: Head); } /// The state the network is currently in when it comes to synchronization. @@ -41,17 +45,18 @@ impl SyncState { } } -/// A [SyncStateUpdater] implementation that does nothing. +/// A [NetworkSyncUpdater] implementation that does nothing. #[derive(Debug, Clone, Default)] #[non_exhaustive] -pub struct NoopSyncStateUpdate; +pub struct NoopSyncStateUpdater; -impl SyncStateProvider for NoopSyncStateUpdate { +impl SyncStateProvider for NoopSyncStateUpdater { fn is_syncing(&self) -> bool { false } } -impl SyncStateUpdater for NoopSyncStateUpdate { +impl NetworkSyncUpdater for NoopSyncStateUpdater { fn update_sync_state(&self, _state: SyncState) {} + fn update_status(&self, _: Head) {} } diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index 3ecc78e5f..b20498242 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -5,7 +5,7 @@ use crate::{ download::DownloadClient, error::{DownloadError, DownloadResult, PeerRequestResult, RequestError}, headers::{ - client::{HeadersClient, HeadersRequest, StatusUpdater}, + client::{HeadersClient, HeadersRequest}, downloader::{validate_header_download, HeaderDownloader, SyncTarget}, }, priority::Priority, @@ -281,29 +281,6 @@ impl TestConsensus { } } -/// Status updater for testing. -/// -/// [`TestStatusUpdater::new()`] creates a new [`TestStatusUpdater`] that is **not** shareable. This -/// struct wraps the sender side of a [`tokio::sync::watch`] channel. The receiving side of the -/// channel (which is shareable by cloning it) is also returned. -#[derive(Debug)] -pub struct TestStatusUpdater(tokio::sync::watch::Sender); - -impl TestStatusUpdater { - /// Create a new test status updater and a receiver to listen to status updates on. - pub fn new() -> (Self, tokio::sync::watch::Receiver) { - let (tx, rx) = tokio::sync::watch::channel(Head::default()); - - (Self(tx), rx) - } -} - -impl StatusUpdater for TestStatusUpdater { - fn update_status(&self, head: Head) { - self.0.send(head).expect("could not send status update"); - } -} - #[async_trait::async_trait] impl Consensus for TestConsensus { fn validate_header(&self, _header: &SealedHeader) -> Result<(), ConsensusError> { diff --git a/crates/net/downloaders/src/test_utils/file_client.rs b/crates/net/downloaders/src/test_utils/file_client.rs index 6e8a658e7..261d5ac12 100644 --- a/crates/net/downloaders/src/test_utils/file_client.rs +++ b/crates/net/downloaders/src/test_utils/file_client.rs @@ -8,7 +8,7 @@ use reth_interfaces::{ headers::client::{HeadersClient, HeadersFut, HeadersRequest}, priority::Priority, }, - sync::{SyncState, SyncStateProvider, SyncStateUpdater}, + sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}, }; use reth_primitives::{ Block, BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, HeadersDirection, PeerId, @@ -54,9 +54,6 @@ pub struct FileClient { /// The buffered bodies retrieved when fetching new headers. bodies: HashMap, - - /// Represents if we are currently syncing. - is_syncing: Arc, } /// An error that can occur when constructing and using a [`FileClient`](FileClient). @@ -114,7 +111,7 @@ impl FileClient { trace!(blocks = headers.len(), "Initialized file client"); - Ok(Self { headers, hash_to_number, bodies, is_syncing: Arc::new(Default::default()) }) + Ok(Self { headers, hash_to_number, bodies }) } /// Get the tip hash of the chain. @@ -247,19 +244,6 @@ impl DownloadClient for FileClient { } } -impl SyncStateProvider for FileClient { - fn is_syncing(&self) -> bool { - self.is_syncing.load(Ordering::Relaxed) - } -} - -impl SyncStateUpdater for FileClient { - fn update_sync_state(&self, state: SyncState) { - let is_syncing = state.is_syncing(); - self.is_syncing.store(is_syncing, Ordering::Relaxed) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index ad8868f7e..f4c286617 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -5,10 +5,7 @@ use crate::{ use async_trait::async_trait; use parking_lot::Mutex; use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; -use reth_interfaces::{ - p2p::headers::client::StatusUpdater, - sync::{SyncState, SyncStateProvider, SyncStateUpdater}, -}; +use reth_interfaces::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider}; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::{ NetworkError, NetworkInfo, PeerKind, Peers, PeersInfo, Reputation, ReputationChangeKind, @@ -243,24 +240,22 @@ impl NetworkInfo for NetworkHandle { } } -impl StatusUpdater for NetworkHandle { - /// Update the status of the node. - fn update_status(&self, head: Head) { - self.send_message(NetworkHandleMessage::StatusUpdate { head }); - } -} - impl SyncStateProvider for NetworkHandle { fn is_syncing(&self) -> bool { self.inner.is_syncing.load(Ordering::Relaxed) } } -impl SyncStateUpdater for NetworkHandle { +impl NetworkSyncUpdater for NetworkHandle { fn update_sync_state(&self, state: SyncState) { let is_syncing = state.is_syncing(); self.inner.is_syncing.store(is_syncing, Ordering::Relaxed) } + + /// Update the status of the node. + fn update_status(&self, head: Head) { + self.send_message(NetworkHandleMessage::StatusUpdate { head }); + } } #[derive(Debug)] diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 0aa996ce8..8cf34a67a 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -733,7 +733,7 @@ pub enum NetworkTransactionEvent { mod tests { use super::*; use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager}; - use reth_interfaces::sync::{SyncState, SyncStateUpdater}; + use reth_interfaces::sync::{NetworkSyncUpdater, SyncState}; use reth_network_api::NetworkInfo; use reth_provider::test_utils::NoopProvider; use reth_rlp::Decodable; diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 5549ba615..690eea9b1 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -7,7 +7,7 @@ use reth_discv4::Discv4Config; use reth_eth_wire::DisconnectReason; use reth_interfaces::{ p2p::headers::client::{HeadersClient, HeadersRequest}, - sync::{SyncState, SyncStateUpdater}, + sync::{NetworkSyncUpdater, SyncState}, }; use reth_net_common::ban_list::BanList; use reth_network::{ diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index d50fb70cf..57008db42 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -24,7 +24,7 @@ //! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_interfaces::consensus::Consensus; -//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater}; +//! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient}; //! # use reth_revm::Factory; //! # use reth_primitives::{PeerId, MAINNET, H256}; //! # use reth_stages::Pipeline; @@ -44,13 +44,12 @@ //! # ); //! # let (tip_tx, tip_rx) = watch::channel(H256::default()); //! # let factory = Factory::new(Arc::new(MAINNET.clone())); -//! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync //! # let pipeline = //! Pipeline::builder() //! .with_tip_sender(tip_tx) //! .add_stages( -//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory) +//! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, factory) //! ) //! .build(db); //! ``` diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 524337ce6..e8c26187b 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,6 +1,5 @@ use crate::{pipeline::BoxedStage, Pipeline, Stage, StageId, StageSet}; use reth_db::database::Database; -use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater}; use reth_primitives::{BlockNumber, H256}; use tokio::sync::watch; @@ -14,8 +13,6 @@ where stages: Vec>, /// The maximum block number to sync to. max_block: Option, - /// Used for emitting updates about whether the pipeline is running or not. - sync_state_updater: Box, /// A receiver for the current chain tip to sync to /// /// Note: this is only used for debugging purposes. @@ -63,22 +60,15 @@ where self } - /// Set a [SyncStateUpdater]. - pub fn with_sync_state_updater(mut self, updater: U) -> Self { - self.sync_state_updater = Box::new(updater); - self - } - /// Builds the final [`Pipeline`] using the given database. /// /// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type. pub fn build(self, db: DB) -> Pipeline { - let Self { stages, max_block, sync_state_updater, tip_tx } = self; + let Self { stages, max_block, tip_tx } = self; Pipeline { db, stages, max_block, - sync_state_updater, tip_tx, listeners: Default::default(), progress: Default::default(), @@ -89,12 +79,7 @@ where impl Default for PipelineBuilder { fn default() -> Self { - Self { - stages: Vec::new(), - max_block: None, - sync_state_updater: Box::::default(), - tip_tx: None, - } + Self { stages: Vec::new(), max_block: None, tip_tx: None } } } @@ -103,7 +88,6 @@ impl std::fmt::Debug for PipelineBuilder { f.debug_struct("PipelineBuilder") .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) .field("max_block", &self.max_block) - .field("sync_state_updater", &self.sync_state_updater) .finish() } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index cf7294c1f..a2cb1df03 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,7 +1,6 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; use futures_util::Future; use reth_db::database::Database; -use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_primitives::{listener::EventListeners, BlockNumber, H256}; use reth_provider::Transaction; use std::{ops::Deref, pin::Pin}; @@ -92,8 +91,6 @@ pub struct Pipeline { max_block: Option, /// All listeners for events the pipeline emits. listeners: EventListeners, - /// Used for emitting updates about whether the pipeline is running or not. - sync_state_updater: Box, /// Keeps track of the progress of the pipeline. progress: PipelineProgress, /// A receiver for the current chain tip to sync to @@ -202,13 +199,6 @@ where let stage = &self.stages[stage_index]; let stage_id = stage.id(); - // Update sync state - if stage_id.is_finish() { - self.sync_state_updater.update_sync_state(SyncState::Idle); - } else { - self.sync_state_updater.update_sync_state(SyncState::Syncing); - } - trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); let next = self .execute_stage_to_completion(previous_stage, stage_index) @@ -225,8 +215,6 @@ where } ControlFlow::Continue { progress } => self.progress.update(progress), ControlFlow::Unwind { target, bad_block } => { - // reset the sync state - self.sync_state_updater.update_sync_state(SyncState::Syncing); self.unwind(target, bad_block).await?; return Ok(ControlFlow::Unwind { target, bad_block }) } @@ -404,7 +392,6 @@ impl std::fmt::Debug for Pipeline { .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) .field("max_block", &self.max_block) .field("listeners", &self.listeners) - .field("sync_state_updater", &self.sync_state_updater) .finish() } } diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index a6d882d0b..6348daa8a 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -49,10 +49,7 @@ use crate::{ use reth_db::database::Database; use reth_interfaces::{ consensus::Consensus, - p2p::{ - bodies::downloader::BodyDownloader, - headers::{client::StatusUpdater, downloader::HeaderDownloader}, - }, + p2p::{bodies::downloader::BodyDownloader, headers::downloader::HeaderDownloader}, }; use reth_provider::ExecutorFactory; use std::sync::Arc; @@ -65,23 +62,20 @@ use std::sync::Arc; /// - [`OfflineStages`] /// - [`FinishStage`] #[derive(Debug)] -pub struct DefaultStages { +pub struct DefaultStages { /// Configuration for the online stages online: OnlineStages, /// Executor factory needs for execution stage executor_factory: EF, - /// Configuration for the [`FinishStage`] stage. - status_updater: S, } -impl DefaultStages { +impl DefaultStages { /// Create a new set of default stages with default values. pub fn new( header_mode: HeaderSyncMode, consensus: Arc, header_downloader: H, body_downloader: B, - status_updater: S, executor_factory: EF, ) -> Self where @@ -90,38 +84,32 @@ impl DefaultStages { Self { online: OnlineStages::new(header_mode, consensus, header_downloader, body_downloader), executor_factory, - status_updater, } } } -impl DefaultStages +impl DefaultStages where - S: StatusUpdater + 'static, EF: ExecutorFactory, { /// Appends the default offline stages and default finish stage to the given builder. pub fn add_offline_stages( default_offline: StageSetBuilder, - status_updater: S, executor_factory: EF, ) -> StageSetBuilder { - default_offline - .add_set(OfflineStages::new(executor_factory)) - .add_stage(FinishStage::new(status_updater)) + default_offline.add_set(OfflineStages::new(executor_factory)).add_stage(FinishStage) } } -impl StageSet for DefaultStages +impl StageSet for DefaultStages where DB: Database, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, - S: StatusUpdater + 'static, EF: ExecutorFactory, { fn builder(self) -> StageSetBuilder { - Self::add_offline_stages(self.online.builder(), self.status_updater, self.executor_factory) + Self::add_offline_stages(self.online.builder(), self.executor_factory) } } diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index 952bb7f8a..2027c9adb 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -1,7 +1,5 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_interfaces::p2p::headers::client::StatusUpdater; -use reth_primitives::{BlockNumber, Head}; use reth_provider::Transaction; /// The [`StageId`] of the finish stage. @@ -10,71 +8,29 @@ pub const FINISH: StageId = StageId("Finish"); /// The finish stage. /// /// This stage does not write anything; it's checkpoint is used to denote the highest fully synced -/// block, which is communicated to the P2P networking component, as well as the RPC component. -/// -/// The highest block is communicated via a [StatusUpdater] when the stage executes or unwinds. -/// -/// When the node starts up, the checkpoint for this stage should be used send the initial status -/// update to the relevant components. Assuming the genesis block is written before this, it is safe -/// to default the stage checkpoint to block number 0 on startup. +/// block. #[derive(Default, Debug, Clone)] -pub struct FinishStage { - updater: S, -} - -impl FinishStage -where - S: StatusUpdater, -{ - /// Create a new [FinishStage] with the given [StatusUpdater]. - pub fn new(updater: S) -> Self { - Self { updater } - } - - /// Construct a [Head] from `block_number`. - fn fetch_head( - &self, - tx: &mut Transaction<'_, DB>, - block_number: BlockNumber, - ) -> Result { - let header = tx.get_header(block_number)?; - let hash = tx.get_block_hash(block_number)?; - let total_difficulty = tx.get_td(block_number)?; - - Ok(Head { - number: block_number, - hash, - total_difficulty, - difficulty: header.difficulty, - timestamp: header.timestamp, - }) - } -} +pub struct FinishStage; #[async_trait::async_trait] -impl Stage for FinishStage -where - S: StatusUpdater, -{ +impl Stage for FinishStage { fn id(&self) -> StageId { FINISH } async fn execute( &mut self, - tx: &mut Transaction<'_, DB>, + _tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - self.updater.update_status(self.fetch_head(tx, input.previous_stage_progress())?); Ok(ExecOutput { done: true, stage_progress: input.previous_stage_progress() }) } async fn unwind( &mut self, - tx: &mut Transaction<'_, DB>, + _tx: &mut Transaction<'_, DB>, input: UnwindInput, ) -> Result { - self.updater.update_status(self.fetch_head(tx, input.unwind_to)?); Ok(UnwindOutput { stage_progress: input.unwind_to }) } } @@ -86,52 +42,30 @@ mod tests { stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestTransaction, UnwindStageTestRunner, }; - use reth_interfaces::test_utils::{ - generators::{random_header, random_header_range}, - TestStatusUpdater, - }; + use reth_interfaces::test_utils::generators::{random_header, random_header_range}; use reth_primitives::SealedHeader; - use std::sync::Mutex; stage_test_suite_ext!(FinishTestRunner, finish); struct FinishTestRunner { tx: TestTransaction, - status: Mutex>>, - } - - impl FinishTestRunner { - /// Gets the current status. - /// - /// # Panics - /// - /// Panics if multiple threads try to read the status at the same time, or if no status - /// receiver has been set. - fn current_status(&self) -> Head { - let status_lock = self.status.try_lock().expect("competing for status lock"); - let status = status_lock.as_ref().expect("no status receiver set").borrow(); - *status - } } impl Default for FinishTestRunner { fn default() -> Self { - FinishTestRunner { tx: TestTransaction::default(), status: Mutex::new(None) } + FinishTestRunner { tx: TestTransaction::default() } } } impl StageTestRunner for FinishTestRunner { - type S = FinishStage; + type S = FinishStage; fn tx(&self) -> &TestTransaction { &self.tx } fn stage(&self) -> Self::S { - let (status_updater, status) = TestStatusUpdater::new(); - let mut status_container = self.status.try_lock().expect("competing for status lock"); - *status_container = Some(status); - FinishStage::new(status_updater) + FinishStage } } @@ -168,23 +102,13 @@ mod tests { input.previous_stage_progress(), "stage progress should always match progress of previous stage" ); - assert_eq!( - self.current_status().number, - output.stage_progress, - "incorrect block number in status update" - ); } Ok(()) } } impl UnwindStageTestRunner for FinishTestRunner { - fn validate_unwind(&self, input: UnwindInput) -> Result<(), TestRunnerError> { - assert_eq!( - self.current_status().number, - input.unwind_to, - "incorrect block number in status update" - ); + fn validate_unwind(&self, _input: UnwindInput) -> Result<(), TestRunnerError> { Ok(()) } }