diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 8802592dd..3ab0f1a6c 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -6,7 +6,8 @@ use clap::{crate_version, Parser}; use eyre::Context; use futures::{Stream, StreamExt}; use reth_beacon_consensus::BeaconConsensus; -use reth_db::mdbx::{Env, WriteMap}; + +use reth_db::database::Database; use reth_downloaders::{ bodies::bodies::BodiesDownloaderBuilder, headers::reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::FileClient, @@ -104,7 +105,7 @@ impl ImportCommand { info!(target: "reth::cli", "Chain file imported"); let (mut pipeline, events) = - self.build_import_pipeline(config, db.clone(), &consensus, file_client).await?; + self.build_import_pipeline(config, db, &consensus, file_client).await?; // override the tip pipeline.set_tip(tip); @@ -115,7 +116,7 @@ impl ImportCommand { // Run pipeline info!(target: "reth::cli", "Starting sync pipeline"); tokio::select! { - res = pipeline.run(db.clone()) => res?, + res = pipeline.run() => res?, _ = tokio::signal::ctrl_c() => {}, }; @@ -123,14 +124,15 @@ impl ImportCommand { Ok(()) } - async fn build_import_pipeline( + async fn build_import_pipeline( &self, config: Config, - db: Arc>, + db: DB, consensus: &Arc, file_client: Arc, - ) -> eyre::Result<(Pipeline>, impl Stream)> + ) -> eyre::Result<(Pipeline, impl Stream)> where + DB: Database + Clone + Unpin + 'static, C: Consensus + 'static, { if !file_client.has_canonical_blocks() { @@ -142,7 +144,7 @@ impl ImportCommand { .into_task(); let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) - .build(file_client.clone(), consensus.clone(), db) + .build(file_client.clone(), consensus.clone(), db.clone()) .into_task(); let (tip_tx, tip_rx) = watch::channel(H256::zero()); @@ -171,7 +173,7 @@ impl ImportCommand { }) .set(ExecutionStage::new(factory, config.stages.execution.commit_threshold)), ) - .build(); + .build(db); let events = pipeline.events().map(Into::into); diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index e910de6b1..e1309edc6 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -412,22 +412,23 @@ impl Command { } /// Constructs a [Pipeline] that's wired to the network - async fn build_networked_pipeline( + async fn build_networked_pipeline( &self, config: &mut Config, network: NetworkHandle, client: Client, consensus: Arc, - db: Arc>, + db: DB, task_executor: &TaskExecutor, - ) -> eyre::Result>> + ) -> eyre::Result> where + DB: Database + Unpin + Clone + 'static, Client: HeadersClient + BodiesClient + Clone + 'static, { let max_block = if let Some(block) = self.debug.max_block { Some(block) } else if let Some(tip) = self.debug.tip { - Some(self.lookup_or_fetch_tip(db.clone(), &client, tip).await?) + Some(self.lookup_or_fetch_tip(&db, &client, tip).await?) } else { None }; @@ -443,6 +444,7 @@ impl Command { let pipeline = self .build_pipeline( + db, config, header_downloader, body_downloader, @@ -541,13 +543,14 @@ impl Command { /// If it doesn't exist, download the header and return the block number. /// /// NOTE: The download is attempted with infinite retries. - async fn lookup_or_fetch_tip( + async fn lookup_or_fetch_tip( &self, - db: Arc>, + db: &DB, client: Client, tip: H256, ) -> Result where + DB: Database, Client: HeadersClient, { Ok(self.fetch_tip(db, client, BlockHashOrNumber::Hash(tip)).await?.number) @@ -556,13 +559,14 @@ impl Command { /// Attempt to look up the block with the given number and return the header. /// /// NOTE: The download is attempted with infinite retries. - async fn fetch_tip( + async fn fetch_tip( &self, - db: Arc>, + db: &DB, client: Client, tip: BlockHashOrNumber, ) -> Result where + DB: Database, Client: HeadersClient, { let header = db.view(|tx| -> Result, reth_db::Error> { @@ -619,8 +623,9 @@ impl Command { } #[allow(clippy::too_many_arguments)] - async fn build_pipeline( + async fn build_pipeline( &self, + db: DB, config: &Config, header_downloader: H, body_downloader: B, @@ -628,8 +633,9 @@ impl Command { consensus: Arc, max_block: Option, continuous: bool, - ) -> eyre::Result>> + ) -> eyre::Result> where + DB: Database + Clone + 'static, H: HeaderDownloader + 'static, B: BodyDownloader + 'static, U: SyncStateUpdater + StatusUpdater + Clone + 'static, @@ -687,7 +693,7 @@ impl Command { .disable_if(MERKLE_UNWIND, || self.auto_mine) .disable_if(MERKLE_EXECUTION, || self.auto_mine), ) - .build(); + .build(db); Ok(pipeline) } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 69def9017..ae74d56ac 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -20,7 +20,6 @@ use reth_tasks::TaskSpawner; use schnellru::{ByLength, LruMap}; use std::{ pin::Pin, - sync::Arc, task::{Context, Poll}, }; use tokio::sync::{ @@ -141,7 +140,7 @@ where BT: BlockchainTreeEngine, { /// The database handle. - db: Arc, + db: DB, /// Task spawner for spawning the pipeline. task_spawner: TS, /// The current state of the pipeline. @@ -184,7 +183,7 @@ where { /// Create a new instance of the [BeaconConsensusEngine]. pub fn new( - db: Arc, + db: DB, task_spawner: TS, pipeline: Pipeline, blockchain_tree: BT, @@ -210,7 +209,7 @@ where /// the [BeaconEngineMessage] communication channel. #[allow(clippy::too_many_arguments)] pub fn with_channel( - db: Arc, + db: DB, task_spawner: TS, pipeline: Pipeline, blockchain_tree: BT, @@ -570,11 +569,10 @@ where self.metrics.pipeline_runs.increment(1); trace!(target: "consensus::engine", ?tip, continuous = tip.is_none(), "Starting the pipeline"); let (tx, rx) = oneshot::channel(); - let db = self.db.clone(); self.task_spawner.spawn_critical_blocking( "pipeline", Box::pin(async move { - let result = pipeline.run_as_fut(db, tip).await; + let result = pipeline.run_as_fut(tip).await; let _ = tx.send(result); }), ); @@ -768,7 +766,7 @@ enum PipelineTarget { Safe, } -/// Keeps track of invalid headerst. +/// Keeps track of invalid headers. struct InvalidHeaderCache { headers: LruMap, } @@ -807,20 +805,20 @@ mod tests { use reth_provider::{test_utils::TestExecutorFactory, Transaction}; use reth_stages::{test_utils::TestStages, ExecOutput, PipelineError, StageError}; use reth_tasks::TokioTaskExecutor; - use std::{collections::VecDeque, time::Duration}; + use std::{collections::VecDeque, sync::Arc, time::Duration}; use tokio::sync::{ oneshot::{self, error::TryRecvError}, watch, }; type TestBeaconConsensusEngine = BeaconConsensusEngine< - Env, + Arc>, TokioTaskExecutor, ShareableBlockchainTree>, TestConsensus, TestExecutorFactory>, >; struct TestEnv { - db: Arc, + db: DB, // Keep the tip receiver around, so it's not dropped. #[allow(dead_code)] tip_rx: watch::Receiver, @@ -829,7 +827,7 @@ mod tests { impl TestEnv { fn new( - db: Arc, + db: DB, tip_rx: watch::Receiver, engine_handle: BeaconConsensusEngineHandle, ) -> Self { @@ -883,7 +881,7 @@ mod tests { chain_spec: Arc, pipeline_exec_outputs: VecDeque>, executor_results: Vec, - ) -> (TestBeaconConsensusEngine, TestEnv>) { + ) -> (TestBeaconConsensusEngine, TestEnv>>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); let consensus = TestConsensus::default(); @@ -897,7 +895,7 @@ mod tests { let pipeline = Pipeline::builder() .add_stages(TestStages::new(pipeline_exec_outputs, Default::default())) .with_tip_sender(tip_tx) - .build(); + .build(db.clone()); // Setup blockchain tree let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); diff --git a/crates/interfaces/src/sync.rs b/crates/interfaces/src/sync.rs index 68b8191c9..bd4612cbd 100644 --- a/crates/interfaces/src/sync.rs +++ b/crates/interfaces/src/sync.rs @@ -16,7 +16,7 @@ pub trait SyncStateProvider: Send + Sync { /// Eventually the node reaches the `Finish` stage and will transition to [`SyncState::Idle`], it /// which point the node is considered fully synced. #[auto_impl::auto_impl(&, Arc, Box)] -pub trait SyncStateUpdater: Send + Sync + 'static { +pub trait SyncStateUpdater: std::fmt::Debug + Send + Sync + 'static { /// Notifies about an [SyncState] update. fn update_sync_state(&self, state: SyncState); } diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index ae2b36857..e1e0b1063 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -50,7 +50,7 @@ pub struct BodiesDownloader { consensus: Arc, // TODO: make this a [HeaderProvider] /// The database handle - db: Arc, + db: DB, /// The maximum number of non-empty blocks per one request request_limit: u64, /// The maximum number of block bodies returned at once from the stream @@ -76,7 +76,7 @@ pub struct BodiesDownloader { impl BodiesDownloader where B: BodiesClient + 'static, - DB: Database, + DB: Database + Unpin + 'static, { /// Returns the next contiguous request. fn next_headers_request(&mut self) -> DownloadResult>> { @@ -249,7 +249,7 @@ where impl BodiesDownloader where B: BodiesClient + 'static, - DB: Database, + DB: Database + Unpin + 'static, Self: BodyDownloader + 'static, { /// Spawns the downloader task via [tokio::task::spawn] @@ -270,7 +270,7 @@ where impl BodyDownloader for BodiesDownloader where B: BodiesClient + 'static, - DB: Database, + DB: Database + Unpin + 'static, { /// Set a new download range (exclusive). /// @@ -315,7 +315,7 @@ where impl Stream for BodiesDownloader where B: BodiesClient + 'static, - DB: Database, + DB: Database + Unpin + 'static, { type Item = BodyDownloaderResult; @@ -497,7 +497,7 @@ impl BodiesDownloaderBuilder { self, client: B, consensus: Arc, - db: Arc, + db: DB, ) -> BodiesDownloader where B: BodiesClient + 'static, diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 8f7331fc7..d50fb70cf 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -21,7 +21,6 @@ //! ``` //! # use std::sync::Arc; //! # use reth_db::mdbx::test_utils::create_test_rw_db; -//! # use reth_db::mdbx::{Env, WriteMap}; //! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; //! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_interfaces::consensus::Consensus; @@ -37,22 +36,23 @@ //! # Arc::new(TestHeadersClient::default()), //! # consensus.clone() //! # ); +//! # let db = create_test_rw_db(); //! # let bodies_downloader = BodiesDownloaderBuilder::default().build( //! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }), //! # consensus.clone(), -//! # create_test_rw_db() +//! # db.clone() //! # ); //! # let (tip_tx, tip_rx) = watch::channel(H256::default()); //! # let factory = Factory::new(Arc::new(MAINNET.clone())); //! # let (status_updater, _) = TestStatusUpdater::new(); //! // Create a pipeline that can fully sync -//! # let pipeline: Pipeline> = +//! # let pipeline = //! Pipeline::builder() //! .with_tip_sender(tip_tx) //! .add_stages( //! DefaultStages::new(HeaderSyncMode::Tip(tip_rx), consensus, headers_downloader, bodies_downloader, status_updater, factory) //! ) -//! .build(); +//! .build(db); //! ``` mod error; mod id; diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index f4902c4de..524337ce6 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,23 +1,25 @@ -use crate::{Pipeline, Stage, StageSet}; +use crate::{pipeline::BoxedStage, Pipeline, Stage, StageId, StageSet}; use reth_db::database::Database; -use reth_interfaces::sync::SyncStateUpdater; +use reth_interfaces::sync::{NoopSyncStateUpdate, SyncStateUpdater}; use reth_primitives::{BlockNumber, H256}; use tokio::sync::watch; /// Builds a [`Pipeline`]. -#[derive(Debug)] #[must_use = "call `build` to construct the pipeline"] pub struct PipelineBuilder where DB: Database, { - pipeline: Pipeline, -} - -impl Default for PipelineBuilder { - fn default() -> Self { - Self { pipeline: Pipeline::default() } - } + /// All configured stages in the order they will be executed. + stages: Vec>, + /// The maximum block number to sync to. + max_block: Option, + /// Used for emitting updates about whether the pipeline is running or not. + sync_state_updater: Box, + /// A receiver for the current chain tip to sync to + /// + /// Note: this is only used for debugging purposes. + tip_tx: Option>, } impl PipelineBuilder @@ -29,7 +31,7 @@ where where S: Stage + 'static, { - self.pipeline.stages.push(Box::new(stage)); + self.stages.push(Box::new(stage)); self } @@ -42,7 +44,7 @@ where /// [`StageSetBuilder`][crate::StageSetBuilder]. pub fn add_stages>(mut self, set: Set) -> Self { for stage in set.builder().build() { - self.pipeline.stages.push(stage); + self.stages.push(stage); } self } @@ -51,24 +53,57 @@ where /// /// Once this block is reached, the pipeline will stop. pub fn with_max_block(mut self, block: BlockNumber) -> Self { - self.pipeline.max_block = Some(block); + self.max_block = Some(block); self } /// Set the tip sender. pub fn with_tip_sender(mut self, tip_tx: watch::Sender) -> Self { - self.pipeline.tip_tx = Some(tip_tx); + self.tip_tx = Some(tip_tx); self } /// Set a [SyncStateUpdater]. pub fn with_sync_state_updater(mut self, updater: U) -> Self { - self.pipeline.sync_state_updater = Box::new(updater); + self.sync_state_updater = Box::new(updater); self } - /// Builds the final [`Pipeline`]. - pub fn build(self) -> Pipeline { - self.pipeline + /// Builds the final [`Pipeline`] using the given database. + /// + /// Note: it's expected that this is either an [Arc](std::sync::Arc) or an Arc wrapper type. + pub fn build(self, db: DB) -> Pipeline { + let Self { stages, max_block, sync_state_updater, tip_tx } = self; + Pipeline { + db, + stages, + max_block, + sync_state_updater, + tip_tx, + listeners: Default::default(), + progress: Default::default(), + metrics: Default::default(), + } + } +} + +impl Default for PipelineBuilder { + fn default() -> Self { + Self { + stages: Vec::new(), + max_block: None, + sync_state_updater: Box::::default(), + tip_tx: None, + } + } +} + +impl std::fmt::Debug for PipelineBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PipelineBuilder") + .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) + .field("max_block", &self.max_block) + .field("sync_state_updater", &self.sync_state_updater) + .finish() } } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 71d0f37e1..310426be5 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,15 +1,10 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; use futures_util::Future; use reth_db::database::Database; -use reth_interfaces::sync::{NoopSyncStateUpdate, SyncState, SyncStateUpdater}; +use reth_interfaces::sync::{SyncState, SyncStateUpdater}; use reth_primitives::{listener::EventListeners, BlockNumber, H256}; use reth_provider::Transaction; -use std::{ - fmt::{Debug, Formatter}, - ops::Deref, - pin::Pin, - sync::Arc, -}; +use std::{ops::Deref, pin::Pin}; use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; @@ -28,6 +23,16 @@ use progress::*; pub use set::*; use sync_metrics::*; +/// A container for a queued stage. +pub(crate) type BoxedStage = Box>; + +/// The future that returns the owned pipeline and the result of the pipeline run. See +/// [Pipeline::run_as_fut]. +pub type PipelineFut = Pin> + Send>>; + +/// The pipeline type itself with the result of [Pipeline::run_as_fut] +pub type PipelineWithResult = (Pipeline, Result); + #[cfg_attr(doc, aquamarine::aquamarine)] /// A staged sync pipeline. /// @@ -79,6 +84,8 @@ use sync_metrics::*; /// pipeline will unwind the stages in reverse order of execution. It is also possible to /// request an unwind manually (see [Pipeline::unwind]). pub struct Pipeline { + /// The Database + db: DB, /// All configured stages in the order they will be executed. stages: Vec>, /// The maximum block number to sync to. @@ -96,36 +103,6 @@ pub struct Pipeline { metrics: Metrics, } -/// The future that returns the owned pipeline and the result of the pipeline run. See -/// [Pipeline::run_as_fut]. -pub type PipelineFut = Pin> + Send>>; - -/// The pipeline type itself with the result of [Pipeline::run_as_fut] -pub type PipelineWithResult = (Pipeline, Result); - -impl Default for Pipeline { - fn default() -> Self { - Self { - stages: Vec::new(), - max_block: None, - listeners: EventListeners::default(), - sync_state_updater: Box::::default(), - progress: PipelineProgress::default(), - tip_tx: None, - metrics: Metrics::default(), - } - } -} - -impl Debug for Pipeline { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Pipeline") - .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) - .field("max_block", &self.max_block) - .finish() - } -} - impl Pipeline where DB: Database + 'static, @@ -154,12 +131,13 @@ where } /// Registers progress metrics for each registered stage - pub fn register_metrics(&mut self, db: Arc) { + pub fn register_metrics(&mut self) { for stage in &self.stages { let stage_id = stage.id(); self.metrics.stage_checkpoint( stage_id, - db.view(|tx| stage_id.get_progress(tx).ok().flatten().unwrap_or_default()) + self.db + .view(|tx| stage_id.get_progress(tx).ok().flatten().unwrap_or_default()) .ok() .unwrap_or_default(), ); @@ -169,16 +147,16 @@ where /// Consume the pipeline and run it until it reaches the provided tip, if set. Return the /// pipeline and its result as a future. #[track_caller] - pub fn run_as_fut(mut self, db: Arc, tip: Option) -> PipelineFut { + pub fn run_as_fut(mut self, tip: Option) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. - self.register_metrics(db.clone()); + self.register_metrics(); Box::pin(async move { // NOTE: the tip should only be None if we are in continuous sync mode. if let Some(tip) = tip { self.set_tip(tip); } - let result = self.run_loop(db).await; + let result = self.run_loop().await; trace!(target: "sync::pipeline", ?tip, ?result, "Pipeline finished"); (self, result) }) @@ -186,11 +164,11 @@ where /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. - pub async fn run(&mut self, db: Arc) -> Result<(), PipelineError> { - self.register_metrics(db.clone()); + pub async fn run(&mut self) -> Result<(), PipelineError> { + self.register_metrics(); loop { - let next_action = self.run_loop(db.clone()).await?; + let next_action = self.run_loop().await?; // Terminate the loop early if it's reached the maximum user // configured block. @@ -218,7 +196,7 @@ where /// If any stage is unsuccessful at execution, we proceed to /// unwind. This will undo the progress across the entire pipeline /// up to the block that caused the error. - async fn run_loop(&mut self, db: Arc) -> Result { + async fn run_loop(&mut self) -> Result { let mut previous_stage = None; for stage_index in 0..self.stages.len() { let stage = &self.stages[stage_index]; @@ -233,7 +211,7 @@ where trace!(target: "sync::pipeline", stage = %stage_id, "Executing stage"); let next = self - .execute_stage_to_completion(db.as_ref(), previous_stage, stage_index) + .execute_stage_to_completion(previous_stage, stage_index) .instrument(info_span!("execute", stage = %stage_id)) .await?; @@ -249,13 +227,15 @@ where ControlFlow::Unwind { target, bad_block } => { // reset the sync state self.sync_state_updater.update_sync_state(SyncState::Syncing); - self.unwind(db.as_ref(), target, bad_block).await?; + self.unwind(target, bad_block).await?; return Ok(ControlFlow::Unwind { target, bad_block }) } } - previous_stage = - Some((stage_id, db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default())); + previous_stage = Some(( + stage_id, + self.db.view(|tx| stage_id.get_progress(tx))??.unwrap_or_default(), + )); } Ok(self.progress.next_ctrl()) @@ -266,14 +246,13 @@ where /// If the unwind is due to a bad block the number of that block should be specified. pub async fn unwind( &mut self, - db: &DB, to: BlockNumber, bad_block: Option, ) -> Result<(), PipelineError> { // Unwind stages in reverse order of execution let unwind_pipeline = self.stages.iter_mut().rev(); - let mut tx = Transaction::new(db)?; + let mut tx = Transaction::new(&self.db)?; for stage in unwind_pipeline { let stage_id = stage.id(); @@ -316,7 +295,6 @@ where async fn execute_stage_to_completion( &mut self, - db: &DB, previous_stage: Option<(StageId, BlockNumber)>, stage_index: usize, ) -> Result { @@ -324,7 +302,7 @@ where let stage_id = stage.id(); let mut made_progress = false; loop { - let mut tx = Transaction::new(db)?; + let mut tx = Transaction::new(&self.db)?; let prev_progress = stage_id.get_progress(tx.deref())?; @@ -419,8 +397,16 @@ where } } -/// A container for a queued stage. -pub(crate) type BoxedStage = Box>; +impl std::fmt::Debug for Pipeline { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Pipeline") + .field("stages", &self.stages.iter().map(|stage| stage.id()).collect::>()) + .field("max_block", &self.max_block) + .field("listeners", &self.listeners) + .field("sync_state_updater", &self.sync_state_updater) + .finish() + } +} #[cfg(test)] mod tests { @@ -463,7 +449,7 @@ mod tests { async fn run_pipeline() { let db = test_utils::create_test_db::(EnvKind::RW); - let mut pipeline: Pipeline<_> = Pipeline::builder() + let mut pipeline = Pipeline::builder() .add_stage( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })), @@ -473,12 +459,12 @@ mod tests { .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .with_max_block(10) - .build(); + .build(db); let events = pipeline.events(); // Run pipeline tokio::spawn(async move { - pipeline.run(db).await.unwrap(); + pipeline.run().await.unwrap(); }); // Check that the stages were run in order @@ -521,16 +507,16 @@ mod tests { .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), ) .with_max_block(10) - .build(); + .build(db); let events = pipeline.events(); // Run pipeline tokio::spawn(async move { // Sync first - pipeline.run(db.clone()).await.expect("Could not run pipeline"); + pipeline.run().await.expect("Could not run pipeline"); // Unwind - pipeline.unwind(&db, 1, None).await.expect("Could not unwind pipeline"); + pipeline.unwind(1, None).await.expect("Could not unwind pipeline"); }); // Check that the stages were unwound in reverse order @@ -598,16 +584,16 @@ mod tests { .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .with_max_block(10) - .build(); + .build(db); let events = pipeline.events(); // Run pipeline tokio::spawn(async move { // Sync first - pipeline.run(db.clone()).await.expect("Could not run pipeline"); + pipeline.run().await.expect("Could not run pipeline"); // Unwind - pipeline.unwind(&db, 50, None).await.expect("Could not unwind pipeline"); + pipeline.unwind(50, None).await.expect("Could not unwind pipeline"); }); // Check that the stages were unwound in reverse order @@ -673,12 +659,12 @@ mod tests { .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .with_max_block(10) - .build(); + .build(db); let events = pipeline.events(); // Run pipeline tokio::spawn(async move { - pipeline.run(db).await.expect("Could not run pipeline"); + pipeline.run().await.expect("Could not run pipeline"); }); // Check that the stages were unwound in reverse order @@ -726,8 +712,8 @@ mod tests { .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .with_max_block(10) - .build(); - let result = pipeline.run(db).await; + .build(db); + let result = pipeline.run().await; assert_matches!(result, Ok(())); // Fatal @@ -736,8 +722,8 @@ mod tests { .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndices { number: 5 }), ))) - .build(); - let result = pipeline.run(db).await; + .build(db); + let result = pipeline.run().await; assert_matches!( result, Err(PipelineError::Stage(StageError::DatabaseIntegrity( diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index 93069ccad..6062cf80e 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -10,17 +10,18 @@ //! # Examples //! //! ```no_run -//! # use reth_db::mdbx::{Env, WriteMap}; //! # use reth_stages::Pipeline; //! # use reth_stages::sets::{OfflineStages}; //! # use reth_revm::Factory; //! # use reth_primitives::MAINNET; //! # use std::sync::Arc; +//! use reth_db::mdbx::test_utils::create_test_rw_db; //! //! # let factory = Factory::new(Arc::new(MAINNET.clone())); +//! # let db = create_test_rw_db(); //! // Build a pipeline with all offline stages. -//! # let pipeline: Pipeline> = -//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build(); +//! # let pipeline = +//! Pipeline::builder().add_stages(OfflineStages::new(factory)).build(db); //! ``` //! //! ```ignore