From 90fa586ced2d5e732fadc1cea89e1bfc68e1cab8 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 3 May 2023 13:57:28 +0200 Subject: [PATCH] refactor: remove SyncstateUpdater generics and use boxed instead (#2534) --- bin/reth/src/chain/import.rs | 6 +-- bin/reth/src/node/mod.rs | 4 +- crates/consensus/beacon/src/engine/mod.rs | 23 ++++----- .../beacon/src/engine/pipeline_state.rs | 9 ++-- crates/interfaces/src/sync.rs | 2 +- crates/stages/src/lib.rs | 3 +- crates/stages/src/pipeline/builder.rs | 18 +++---- crates/stages/src/pipeline/mod.rs | 51 +++++++++---------- crates/stages/src/sets.rs | 3 +- 9 files changed, 51 insertions(+), 68 deletions(-) diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index e6cac01c5..8802592dd 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -11,9 +11,7 @@ use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, }; -use reth_interfaces::{ - consensus::Consensus, p2p::headers::client::NoopStatusUpdater, sync::SyncStateUpdater, -}; +use reth_interfaces::{consensus::Consensus, p2p::headers::client::NoopStatusUpdater}; use reth_primitives::{ChainSpec, H256}; use reth_staged_sync::{ utils::{ @@ -131,7 +129,7 @@ impl ImportCommand { db: Arc>, consensus: &Arc, file_client: Arc, - ) -> eyre::Result<(Pipeline, impl SyncStateUpdater>, impl Stream)> + ) -> eyre::Result<(Pipeline>, impl Stream)> where C: Consensus + 'static, { diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index bc416e3a9..160cbd6e2 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -425,7 +425,7 @@ impl Command { consensus: Arc, db: Arc>, task_executor: &TaskExecutor, - ) -> eyre::Result, NetworkHandle>> + ) -> eyre::Result>> where Client: HeadersClient + BodiesClient + Clone + 'static, { @@ -633,7 +633,7 @@ impl Command { consensus: Arc, max_block: Option, continuous: bool, - ) -> eyre::Result, U>> + ) -> eyre::Result>> where H: HeaderDownloader + 'static, B: BodyDownloader + 'static, diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 1b31cf55a..69def9017 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -5,7 +5,6 @@ use reth_interfaces::{ blockchain_tree::{BlockStatus, BlockchainTreeEngine}, consensus::ForkchoiceState, executor::Error as ExecutorError, - sync::SyncStateUpdater, Error, }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; @@ -135,11 +134,10 @@ impl BeaconConsensusEngineHandle { /// /// If the future is polled more than once. Leads to undefined state. #[must_use = "Future does nothing unless polled"] -pub struct BeaconConsensusEngine +pub struct BeaconConsensusEngine where DB: Database, TS: TaskSpawner, - U: SyncStateUpdater, BT: BlockchainTreeEngine, { /// The database handle. @@ -149,7 +147,7 @@ where /// The current state of the pipeline. /// Must always be [Some] unless the state is being reevaluated. /// The pipeline is used for historical sync by setting the current forkchoice head. - pipeline_state: Option>, + pipeline_state: Option>, /// The blockchain tree used for live sync and reorg tracking. blockchain_tree: BT, /// The Engine API message receiver. 
@@ -178,18 +176,17 @@ where metrics: Metrics, } -impl BeaconConsensusEngine +impl BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner, - U: SyncStateUpdater + 'static, BT: BlockchainTreeEngine + 'static, { /// Create a new instance of the [BeaconConsensusEngine]. pub fn new( db: Arc, task_spawner: TS, - pipeline: Pipeline, + pipeline: Pipeline, blockchain_tree: BT, max_block: Option, continuous: bool, @@ -215,7 +212,7 @@ where pub fn with_channel( db: Arc, task_spawner: TS, - pipeline: Pipeline, + pipeline: Pipeline, blockchain_tree: BT, max_block: Option, continuous: bool, @@ -553,9 +550,9 @@ where /// Resets the next action to the default value. fn next_pipeline_state( &mut self, - pipeline: Pipeline, + pipeline: Pipeline, forkchoice_state: ForkchoiceState, - ) -> PipelineState { + ) -> PipelineState { let next_action = std::mem::take(&mut self.next_action); let (tip, should_run_pipeline) = match next_action { @@ -639,11 +636,10 @@ where /// local forkchoice state, it will launch the pipeline to sync to the head hash. /// While the pipeline is syncing, the consensus engine will keep processing messages from the /// receiver and forwarding them to the blockchain tree. -impl Future for BeaconConsensusEngine +impl Future for BeaconConsensusEngine where DB: Database + Unpin + 'static, TS: TaskSpawner + Unpin, - U: SyncStateUpdater + Unpin + 'static, BT: BlockchainTreeEngine + Unpin + 'static, { type Output = Result<(), BeaconEngineError>; @@ -805,7 +801,7 @@ mod tests { BlockchainTree, ShareableBlockchainTree, }; use reth_db::mdbx::{test_utils::create_test_rw_db, Env, WriteMap}; - use reth_interfaces::{sync::NoopSyncStateUpdate, test_utils::TestConsensus}; + use reth_interfaces::test_utils::TestConsensus; use reth_payload_builder::test_utils::spawn_test_payload_service; use reth_primitives::{ChainSpec, ChainSpecBuilder, SealedBlockWithSenders, H256, MAINNET}; use reth_provider::{test_utils::TestExecutorFactory, Transaction}; @@ -820,7 +816,6 @@ mod tests { type TestBeaconConsensusEngine = BeaconConsensusEngine< Env, TokioTaskExecutor, - NoopSyncStateUpdate, ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, >; diff --git a/crates/consensus/beacon/src/engine/pipeline_state.rs b/crates/consensus/beacon/src/engine/pipeline_state.rs index 4cab3d353..2595e140a 100644 --- a/crates/consensus/beacon/src/engine/pipeline_state.rs +++ b/crates/consensus/beacon/src/engine/pipeline_state.rs @@ -1,5 +1,4 @@ use reth_db::database::Database; -use reth_interfaces::sync::SyncStateUpdater; use reth_stages::{Pipeline, PipelineWithResult}; use tokio::sync::oneshot; @@ -12,14 +11,14 @@ use tokio::sync::oneshot; /// running, it acquires the write lock over the database. This means that we cannot forward to the /// blockchain tree any messages that would result in database writes, since it would result in a /// deadlock. -pub enum PipelineState { +pub enum PipelineState { /// Pipeline is idle. - Idle(Pipeline), + Idle(Pipeline), /// Pipeline is running. - Running(oneshot::Receiver>), + Running(oneshot::Receiver>), } -impl PipelineState { +impl PipelineState { /// Returns `true` if the state matches idle. 
pub fn is_idle(&self) -> bool { matches!(self, PipelineState::Idle(_)) diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index cbe2e044f..68b8191c9 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -16,7 +16,7 @@ pub trait SyncStateProvider: Send + Sync { /// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it /// which point the node is considered fully synced. #[auto_impl::auto_impl(&, Arc, Box)] -pub trait SyncStateUpdater: SyncStateProvider { +pub trait SyncStateUpdater: Send + Sync + 'static { /// Notifies about an [SyncState] update. fn update_sync_state(&self, state: SyncState); } diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 86ec72f84..8f7331fc7 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -25,7 +25,6 @@ //! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_interfaces::consensus::Consensus; -//! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient, TestStatusUpdater}; //! # use reth_revm::Factory; //! # use reth_primitives::{PeerId, MAINNET, H256}; @@ -47,7 +46,7 @@ //! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync -//! # let pipeline: Pipeline, NoopSyncStateUpdate> = +//! # let pipeline: Pipeline> = //! Pipeline::builder() //! .with_tip_sender(tip_tx) //! .add_stages( diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index 9506fa21c..0f3d94d64 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,30 +1,28 @@ use crate::{Pipeline, Stage, StageSet}; use reth_db::database::Database; -use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater}; +use reth_interfaces::sync::SyncStateUpdater; use reth_primitives::{BlockNumber, H256}; use tokio::sync::watch; /// Builds a [`Pipeline`]. #[derive(Debug)] #[must_use = "call `build` to construct the pipeline"] -pub struct PipelineBuilder +pub struct PipelineBuilder where DB: Database, - U: SyncStateUpdater, { - pipeline: Pipeline, + pipeline: Pipeline, } -impl Default for PipelineBuilder { +impl Default for PipelineBuilder { fn default() -> Self { Self { pipeline: Pipeline::default() } } } -impl PipelineBuilder +impl PipelineBuilder where DB: Database, - U: SyncStateUpdater, { /// Add a stage to the pipeline. pub fn add_stage(mut self, stage: S) -> Self @@ -71,13 +69,13 @@ where } /// Set a [SyncStateUpdater]. - pub fn with_sync_state_updater(mut self, updater: U) -> Self { - self.pipeline.sync_state_updater = Some(updater); + pub fn with_sync_state_updater(mut self, updater: U) -> Self { + self.pipeline.sync_state_updater = Box::new(updater); self } /// Builds the final [`Pipeline`]. 
- pub fn build(self) -> Pipeline { + pub fn build(self) -> Pipeline { self.pipeline } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 2fea9e08d..00173574f 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,7 +1,7 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; use futures_util::Future; use reth_db::database::Database; -use reth_interfaces::sync::{SyncState, SyncStateUpdater}; +use reth_interfaces::sync::{NoopSyncStateUpdate, SyncState, SyncStateUpdater}; use reth_primitives::{listener::EventListeners, BlockNumber, H256}; use reth_provider::Transaction; use std::{ @@ -78,12 +78,12 @@ use sync_metrics::*; /// In case of a validation error (as determined by the consensus engine) in one of the stages, the /// pipeline will unwind the stages in reverse order of execution. It is also possible to /// request an unwind manually (see [Pipeline::unwind]). -pub struct Pipeline { +pub struct Pipeline { stages: Vec>, max_block: Option, continuous: bool, listeners: EventListeners, - sync_state_updater: Option, + sync_state_updater: Box, progress: PipelineProgress, tip_tx: Option>, metrics: Metrics, @@ -91,19 +91,19 @@ pub struct Pipeline { /// The future that returns the owned pipeline and the result of the pipeline run. See /// [Pipeline::run_as_fut]. -pub type PipelineFut = Pin> + Send>>; +pub type PipelineFut = Pin> + Send>>; /// The pipeline type itself with the result of [Pipeline::run_as_fut] -pub type PipelineWithResult = (Pipeline, Result); +pub type PipelineWithResult = (Pipeline, Result); -impl Default for Pipeline { +impl Default for Pipeline { fn default() -> Self { Self { stages: Vec::new(), max_block: None, continuous: false, listeners: EventListeners::default(), - sync_state_updater: None, + sync_state_updater: Box::::default(), progress: PipelineProgress::default(), tip_tx: None, metrics: Metrics::default(), @@ -111,7 +111,7 @@ impl Default for Pipeline { } } -impl Debug for Pipeline { +impl Debug for Pipeline { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("Pipeline") .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) @@ -120,13 +120,12 @@ impl Debug for Pipeline { } } -impl Pipeline +impl Pipeline where DB: Database + 'static, - U: SyncStateUpdater + 'static, { /// Construct a pipeline using a [`PipelineBuilder`]. - pub fn builder() -> PipelineBuilder { + pub fn builder() -> PipelineBuilder { PipelineBuilder::default() } @@ -164,7 +163,7 @@ where /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the /// pipeline and its result as a future. #[track_caller] - pub fn run_as_fut(mut self, db: Arc, tip: Option) -> PipelineFut { + pub fn run_as_fut(mut self, db: Arc, tip: Option) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. 
self.register_metrics(db.clone()); @@ -220,12 +219,10 @@ where let stage_id = stage.id(); // Update sync state - if let Some(ref updater) = self.sync_state_updater { - if stage_id.is_finish() { - updater.update_sync_state(SyncState::Idle); - } else { - updater.update_sync_state(SyncState::Syncing); - } + if stage_id.is_finish() { + self.sync_state_updater.update_sync_state(SyncState::Idle); + } else { + self.sync_state_updater.update_sync_state(SyncState::Syncing); } trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); @@ -245,9 +242,7 @@ where ControlFlow::Continue { progress } => self.progress.update(progress), ControlFlow::Unwind { target, bad_block } => { // reset the sync state - if let Some(ref updater) = self.sync_state_updater { - updater.update_sync_state(SyncState::Syncing); - } + self.sync_state_updater.update_sync_state(SyncState::Syncing); self.unwind(db.as_ref(), target, bad_block).await?; return Ok(ControlFlow::Unwind { target, bad_block }) } @@ -427,7 +422,7 @@ mod tests { use crate::{test_utils::TestStage, StageId, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, EnvKind}; - use reth_interfaces::{consensus, provider::ProviderError, sync::NoopSyncStateUpdate}; + use reth_interfaces::{consensus, provider::ProviderError}; use tokio_stream::StreamExt; #[test] @@ -462,7 +457,7 @@ mod tests { async fn run_pipeline() { let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline: Pipeline<_> = Pipeline::builder() .add_stage( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })), @@ -503,7 +498,7 @@ mod tests { async fn unwind_pipeline() { let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 100, done: true })) @@ -586,7 +581,7 @@ mod tests { async fn unwind_pipeline_with_intermediate_progress() { let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 100, done: true })) @@ -655,7 +650,7 @@ mod tests { async fn run_pipeline_with_unwind() { let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) @@ -718,7 +713,7 @@ mod tests { async fn pipeline_error_handling() { // Non-fatal let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) @@ -731,7 +726,7 @@ mod tests { // Fatal let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_, NoopSyncStateUpdate> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }), ))) diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 83c7341ac..93069ccad 100644 --- 
a/crates/stages/src/sets.rs
+++ b/crates/stages/src/sets.rs
@@ -11,7 +11,6 @@
 //!
 //! ```no_run
 //! # use reth_db::mdbx::{Env, WriteMap};
-//! # use reth_interfaces::sync::NoopSyncStateUpdate;
 //! # use reth_stages::Pipeline;
 //! # use reth_stages::sets::{OfflineStages};
 //! # use reth_revm::Factory;
@@ -20,7 +19,7 @@
 //!
 //! # let factory = Factory::new(Arc::new(MAINNET.clone()));
 //! // Build a pipeline with all offline stages.
-//! # let pipeline: Pipeline<Env<WriteMap>, NoopSyncStateUpdate> =
+//! # let pipeline: Pipeline<Env<WriteMap>> =
 //! Pipeline::builder().add_stages(OfflineStages::new(factory)).build();
 //! ```
 //!
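
Note: the essence of the change is that the `U: SyncStateUpdater` type parameter previously threaded through `Pipeline`, `PipelineBuilder`, `PipelineState`, and `BeaconConsensusEngine` is gone. The pipeline now owns a `Box<dyn SyncStateUpdater>` that defaults to `NoopSyncStateUpdate`, the trait is relaxed to `Send + Sync + 'static` so it can be boxed, and `PipelineBuilder::with_sync_state_updater` boxes whatever updater it is handed. The following is a minimal, self-contained sketch of that shape using simplified stand-in types; it is not reth's actual code, and the field layout and signatures are illustrative only.

// Sketch of the post-refactor shape: a boxed trait object instead of a generic
// parameter. Names mirror the patch; the bodies are simplified stand-ins.

use std::marker::PhantomData;

/// Simplified stand-in for `reth_interfaces::sync::SyncState`.
#[derive(Clone, Copy, Debug)]
pub enum SyncState {
    Syncing,
    Idle,
}

/// After the patch the trait only requires `Send + Sync + 'static`,
/// so it can be type-erased behind `Box<dyn SyncStateUpdater>`.
pub trait SyncStateUpdater: Send + Sync + 'static {
    fn update_sync_state(&self, state: SyncState);
}

/// No-op default, analogous to `NoopSyncStateUpdate`.
#[derive(Default)]
pub struct NoopSyncStateUpdate;

impl SyncStateUpdater for NoopSyncStateUpdate {
    fn update_sync_state(&self, _state: SyncState) {}
}

/// Simplified pipeline; `DB` stands in for the `DB: Database` parameter,
/// which the real pipeline keeps.
pub struct Pipeline<DB> {
    // Before the patch this was an `Option<U>` with `U: SyncStateUpdater`
    // appearing in every type that embedded the pipeline.
    sync_state_updater: Box<dyn SyncStateUpdater>,
    _db: PhantomData<DB>,
}

impl<DB> Default for Pipeline<DB> {
    fn default() -> Self {
        Self {
            // Matches the patch: default to the no-op updater instead of `None`.
            sync_state_updater: Box::<NoopSyncStateUpdate>::default(),
            _db: PhantomData,
        }
    }
}

impl<DB> Pipeline<DB> {
    /// The builder-style setter now takes any updater and boxes it immediately,
    /// so the generic no longer escapes into the pipeline's type.
    pub fn with_sync_state_updater<U: SyncStateUpdater>(mut self, updater: U) -> Self {
        self.sync_state_updater = Box::new(updater);
        self
    }

    /// Callers go through the boxed trait object.
    pub fn update_sync_state(&self, state: SyncState) {
        self.sync_state_updater.update_sync_state(state);
    }
}

fn main() {
    // `()` stands in for a concrete database type in this sketch.
    let pipeline: Pipeline<()> =
        Pipeline::default().with_sync_state_updater(NoopSyncStateUpdate);
    pipeline.update_sync_state(SyncState::Idle);
}

The trade-off is dynamic dispatch on an infrequently called updater in exchange for dropping a type parameter from every downstream type: `PipelineState`, `PipelineFut`, `PipelineWithResult`, the consensus engine, and the test fixtures all shrink by one generic, as the diffs above show.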