diff --git a/Cargo.lock b/Cargo.lock index 5f76031ce..b08629d28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6147,7 +6147,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", - "reth-stages", + "reth-stages-api", "reth-transaction-pool", "tokio", "tokio-stream", @@ -6238,7 +6238,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", - "reth-stages", + "reth-stages-api", "reth-trie", "reth-trie-parallel", "tokio", @@ -7427,6 +7427,7 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", + "reth-stages-api", "reth-static-file", "reth-tokio-util", "reth-trie", @@ -7438,6 +7439,28 @@ dependencies = [ "tracing", ] +[[package]] +name = "reth-stages-api" +version = "0.2.0-beta.5" +dependencies = [ + "aquamarine", + "assert_matches", + "auto_impl", + "futures-util", + "metrics", + "reth-db", + "reth-interfaces", + "reth-metrics", + "reth-primitives", + "reth-provider", + "reth-static-file", + "reth-tokio-util", + "thiserror", + "tokio", + "tokio-stream", + "tracing", +] + [[package]] name = "reth-static-file" version = "0.2.0-beta.5" diff --git a/Cargo.toml b/Cargo.toml index 2ce5d813d..e025f29d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "crates/node-api/", "crates/node-e2e-tests/", "crates/stages/", + "crates/stages-api", "crates/static-file/", "crates/storage/codecs/", "crates/storage/codecs/derive/", @@ -95,9 +96,9 @@ resolver = "2" rust.missing_debug_implementations = "warn" rust.missing_docs = "warn" rust.unreachable_pub = "warn" -rustdoc.all = "warn" rust.unused_must_use = "deny" rust.rust_2018_idioms = "deny" +rustdoc.all = "warn" [workspace.lints.clippy] # These are some of clippy's nursery (i.e., experimental) lints that we like. @@ -254,6 +255,7 @@ reth-rpc-engine-api = { path = "crates/rpc/rpc-engine-api" } reth-rpc-types = { path = "crates/rpc/rpc-types" } reth-rpc-types-compat = { path = "crates/rpc/rpc-types-compat" } reth-stages = { path = "crates/stages" } +reth-stages-api = { path = "crates/stages-api" } reth-static-file = { path = "crates/static-file" } reth-tasks = { path = "crates/tasks" } reth-tokio-util = { path = "crates/tokio-util" } diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index ce9cd3efe..d15106ce3 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -34,6 +34,7 @@ use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader}; use reth_stages::{ prelude::*, stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage}, + Pipeline, StageSet, }; use reth_static_file::StaticFileProducer; use std::{path::PathBuf, sync::Arc}; diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 81bf9ea05..3a6ab1439 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -16,7 +16,7 @@ reth-primitives.workspace = true reth-interfaces.workspace = true reth-db.workspace = true reth-provider.workspace = true -reth-stages.workspace = true +reth-stages-api.workspace = true reth-trie = { workspace = true, features = ["metrics"] } reth-trie-parallel = { workspace = true, features = ["parallel"] } diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index 7c51590eb..eb699ff1a 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -26,7 +26,7 @@ use reth_provider::{ CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain, ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider, ProviderError, }; -use reth_stages::{MetricEvent, MetricEventsSender}; +use reth_stages_api::{MetricEvent, MetricEventsSender}; use std::{ collections::{btree_map::Entry, BTreeMap, HashSet}, sync::Arc, diff --git a/crates/consensus/auto-seal/Cargo.toml b/crates/consensus/auto-seal/Cargo.toml index 72a593b5a..5fbf4f07a 100644 --- a/crates/consensus/auto-seal/Cargo.toml +++ b/crates/consensus/auto-seal/Cargo.toml @@ -17,7 +17,7 @@ reth-beacon-consensus.workspace = true reth-primitives.workspace = true reth-interfaces.workspace = true reth-provider.workspace = true -reth-stages.workspace = true +reth-stages-api.workspace = true reth-revm.workspace = true reth-transaction-pool.workspace = true reth-evm.workspace = true diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 6d7a29a33..e76b4333e 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -6,7 +6,7 @@ use reth_evm::ConfigureEvm; use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{Block, ChainSpec, IntoRecoveredTransaction, SealedBlockWithSenders}; use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory}; -use reth_stages::PipelineEvent; +use reth_stages_api::PipelineEvent; use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; use std::{ collections::VecDeque, diff --git a/crates/node-e2e-tests/tests/it/eth.rs b/crates/node-e2e-tests/tests/it/eth.rs index 5686c6e18..a2c761221 100644 --- a/crates/node-e2e-tests/tests/it/eth.rs +++ b/crates/node-e2e-tests/tests/it/eth.rs @@ -1,6 +1,5 @@ use node_e2e_tests::{node::NodeHelper, wallet::Wallet}; use reth::{ - self, args::RpcServerArgs, builder::{NodeBuilder, NodeConfig, NodeHandle}, tasks::TaskManager, diff --git a/crates/stages-api/Cargo.toml b/crates/stages-api/Cargo.toml new file mode 100644 index 000000000..8d1eccd6e --- /dev/null +++ b/crates/stages-api/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "reth-stages-api" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +exclude.workspace = true + +[dependencies] +# reth +reth-primitives.workspace = true +reth-provider.workspace = true +reth-db.workspace = true +reth-interfaces.workspace = true +reth-static-file.workspace = true +assert_matches.workspace = true +reth-tokio-util.workspace = true + +# metrics +reth-metrics.workspace = true +metrics.workspace = true + +# async +tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true +futures-util.workspace = true + +# misc +thiserror.workspace = true +tracing.workspace = true +auto_impl = "1" +aquamarine.workspace = true + +[features] +test-utils = [] + +[lints] +workspace = true diff --git a/crates/stages/docs/mermaid/pipeline.mmd b/crates/stages-api/docs/mermaid/pipeline.mmd similarity index 100% rename from crates/stages/docs/mermaid/pipeline.mmd rename to crates/stages-api/docs/mermaid/pipeline.mmd diff --git a/crates/stages/src/error.rs b/crates/stages-api/src/error.rs similarity index 98% rename from crates/stages/src/error.rs rename to crates/stages-api/src/error.rs index e8a5e3a71..3b744e7cb 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages-api/src/error.rs @@ -1,10 +1,11 @@ -use crate::pipeline::PipelineEvent; use reth_interfaces::{ - consensus, db::DatabaseError as DbError, executor, p2p::error::DownloadError, - provider::ProviderError, RethError, + consensus, db::DatabaseError as DbError, executor, p2p::error::DownloadError, RethError, }; use reth_primitives::{BlockNumber, SealedHeader, StaticFileSegment, TxNumber}; +use reth_provider::ProviderError; use thiserror::Error; + +use crate::PipelineEvent; use tokio::sync::mpsc::error::SendError; /// Represents the specific error type within a block error. diff --git a/crates/stages-api/src/lib.rs b/crates/stages-api/src/lib.rs new file mode 100644 index 000000000..fa6cd74e6 --- /dev/null +++ b/crates/stages-api/src/lib.rs @@ -0,0 +1,14 @@ +//! Staged syncing primitives for reth. +mod error; +mod metrics; +mod pipeline; +mod stage; +#[allow(missing_docs)] +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; +mod util; + +pub use crate::metrics::*; +pub use error::*; +pub use pipeline::*; +pub use stage::*; diff --git a/crates/stages/src/metrics/listener.rs b/crates/stages-api/src/metrics/listener.rs similarity index 98% rename from crates/stages/src/metrics/listener.rs rename to crates/stages-api/src/metrics/listener.rs index 39ccc29a3..2aa566744 100644 --- a/crates/stages/src/metrics/listener.rs +++ b/crates/stages-api/src/metrics/listener.rs @@ -45,7 +45,8 @@ pub enum MetricEvent { #[derive(Debug)] pub struct MetricsListener { events_rx: UnboundedReceiver, - pub(crate) sync_metrics: SyncMetrics, + /// underline metrics of stages + pub sync_metrics: SyncMetrics, } impl MetricsListener { diff --git a/crates/stages/src/metrics/mod.rs b/crates/stages-api/src/metrics/mod.rs similarity index 100% rename from crates/stages/src/metrics/mod.rs rename to crates/stages-api/src/metrics/mod.rs diff --git a/crates/stages/src/metrics/sync_metrics.rs b/crates/stages-api/src/metrics/sync_metrics.rs similarity index 64% rename from crates/stages/src/metrics/sync_metrics.rs rename to crates/stages-api/src/metrics/sync_metrics.rs index 148368f02..64c38c21e 100644 --- a/crates/stages/src/metrics/sync_metrics.rs +++ b/crates/stages-api/src/metrics/sync_metrics.rs @@ -6,14 +6,14 @@ use reth_primitives::stage::StageId; use std::collections::HashMap; #[derive(Debug, Default)] -pub(crate) struct SyncMetrics { - pub(crate) stages: HashMap, - pub(crate) execution_stage: ExecutionStageMetrics, +pub struct SyncMetrics { + pub stages: HashMap, + pub execution_stage: ExecutionStageMetrics, } impl SyncMetrics { /// Returns existing or initializes a new instance of [StageMetrics] for the provided [StageId]. - pub(crate) fn get_stage_metrics(&mut self, stage_id: StageId) -> &mut StageMetrics { + pub fn get_stage_metrics(&mut self, stage_id: StageId) -> &mut StageMetrics { self.stages .entry(stage_id) .or_insert_with(|| StageMetrics::new_with_labels(&[("stage", stage_id.to_string())])) @@ -22,19 +22,19 @@ impl SyncMetrics { #[derive(Metrics)] #[metrics(scope = "sync")] -pub(crate) struct StageMetrics { +pub struct StageMetrics { /// The block number of the last commit for a stage. - pub(crate) checkpoint: Gauge, + pub checkpoint: Gauge, /// The number of processed entities of the last commit for a stage, if applicable. - pub(crate) entities_processed: Gauge, + pub entities_processed: Gauge, /// The number of total entities of the last commit for a stage, if applicable. - pub(crate) entities_total: Gauge, + pub entities_total: Gauge, } /// Execution stage metrics. #[derive(Metrics)] #[metrics(scope = "sync.execution")] -pub(crate) struct ExecutionStageMetrics { +pub struct ExecutionStageMetrics { /// The total amount of gas processed (in millions) - pub(crate) mgas_processed_total: Counter, + pub mgas_processed_total: Counter, } diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages-api/src/pipeline/builder.rs similarity index 100% rename from crates/stages/src/pipeline/builder.rs rename to crates/stages-api/src/pipeline/builder.rs diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages-api/src/pipeline/ctrl.rs similarity index 100% rename from crates/stages/src/pipeline/ctrl.rs rename to crates/stages-api/src/pipeline/ctrl.rs diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages-api/src/pipeline/event.rs similarity index 100% rename from crates/stages/src/pipeline/event.rs rename to crates/stages-api/src/pipeline/event.rs diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages-api/src/pipeline/mod.rs similarity index 98% rename from crates/stages/src/pipeline/mod.rs rename to crates/stages-api/src/pipeline/mod.rs index eb1f40cbd..d19325a3c 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages-api/src/pipeline/mod.rs @@ -1,6 +1,8 @@ -use crate::{ - error::*, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageExt, UnwindInput, -}; +mod ctrl; +mod event; +pub use crate::pipeline::ctrl::ControlFlow; +pub use event::*; + use futures_util::Future; use reth_db::database::Database; use reth_interfaces::RethResult; @@ -20,15 +22,22 @@ use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; -mod builder; -mod ctrl; -mod event; -mod progress; -mod set; +// (todo) remove it +#[allow(missing_docs)] +pub mod builder; -pub use crate::pipeline::ctrl::ControlFlow; +// (todo) remove it +#[allow(missing_docs)] +pub mod progress; +// (todo) remove it +#[allow(missing_docs)] +pub mod set; + +use crate::{ + BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, PipelineError, Stage, + StageError, StageExt, UnwindInput, +}; pub use builder::*; -pub use event::*; use progress::*; pub use set::*; @@ -372,7 +381,7 @@ where let exec_input = ExecInput { target, checkpoint: prev_checkpoint }; self.listeners.notify(PipelineEvent::Prepare { - pipeline_stages_progress: event::PipelineStagesProgress { + pipeline_stages_progress: PipelineStagesProgress { current: stage_index + 1, total: total_stages, }, @@ -390,7 +399,7 @@ where } self.listeners.notify(PipelineEvent::Run { - pipeline_stages_progress: event::PipelineStagesProgress { + pipeline_stages_progress: PipelineStagesProgress { current: stage_index + 1, total: total_stages, }, @@ -415,7 +424,7 @@ where provider_rw.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners.notify(PipelineEvent::Ran { - pipeline_stages_progress: event::PipelineStagesProgress { + pipeline_stages_progress: PipelineStagesProgress { current: stage_index + 1, total: total_stages, }, @@ -896,9 +905,9 @@ mod tests { /// /// - Stage A syncs to block 10 /// - Stage B triggers an unwind, marking block 5 as bad - /// - Stage B unwinds to it's previous progress, block 0 but since it is still at block 0, it is + /// - Stage B unwinds to its previous progress, block 0 but since it is still at block 0, it is /// skipped entirely (there is nothing to unwind) - /// - Stage A unwinds to it's previous progress, block 0 + /// - Stage A unwinds to its previous progress, block 0 /// - Stage A syncs back up to block 10 /// - Stage B syncs to block 10 /// - The pipeline finishes diff --git a/crates/stages/src/pipeline/progress.rs b/crates/stages-api/src/pipeline/progress.rs similarity index 77% rename from crates/stages/src/pipeline/progress.rs rename to crates/stages-api/src/pipeline/progress.rs index 1c4bbcf6c..cb124a8bc 100644 --- a/crates/stages/src/pipeline/progress.rs +++ b/crates/stages-api/src/pipeline/progress.rs @@ -1,15 +1,14 @@ -use super::ctrl::ControlFlow; -use crate::util::opt; +use crate::{util::opt, ControlFlow}; use reth_primitives::BlockNumber; #[derive(Debug, Default)] -pub(crate) struct PipelineProgress { +pub struct PipelineProgress { /// Block number reached by the stage. - pub(crate) block_number: Option, + pub block_number: Option, /// The maximum block number achieved by any stage during the execution of the pipeline. - pub(crate) maximum_block_number: Option, + pub maximum_block_number: Option, /// The minimum block number achieved by any stage during the execution of the pipeline. - pub(crate) minimum_block_number: Option, + pub minimum_block_number: Option, } impl PipelineProgress { diff --git a/crates/stages/src/pipeline/set.rs b/crates/stages-api/src/pipeline/set.rs similarity index 100% rename from crates/stages/src/pipeline/set.rs rename to crates/stages-api/src/pipeline/set.rs diff --git a/crates/stages/src/stage.rs b/crates/stages-api/src/stage.rs similarity index 100% rename from crates/stages/src/stage.rs rename to crates/stages-api/src/stage.rs diff --git a/crates/stages-api/src/test_utils/mod.rs b/crates/stages-api/src/test_utils/mod.rs new file mode 100644 index 000000000..1a44c2fa8 --- /dev/null +++ b/crates/stages-api/src/test_utils/mod.rs @@ -0,0 +1,2 @@ +mod stage; +pub use stage::TestStage; diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages-api/src/test_utils/stage.rs similarity index 100% rename from crates/stages/src/test_utils/stage.rs rename to crates/stages-api/src/test_utils/stage.rs diff --git a/crates/stages/src/util.rs b/crates/stages-api/src/util.rs similarity index 100% rename from crates/stages/src/util.rs rename to crates/stages-api/src/util.rs diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 84ef28e0f..4c9fc8dde 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -23,6 +23,7 @@ reth-tokio-util.workspace = true reth-etl.workspace = true reth-static-file.workspace = true reth-config.workspace = true +reth-stages-api = {workspace = true , features = ["test-utils"]} # async tokio = { workspace = true, features = ["sync"] } @@ -78,7 +79,7 @@ serde_json.workspace = true pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] } [features] -test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"] +test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils", "reth-stages-api/test-utils"] [[bench]] name = "criterion" diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 03cb52383..13f7d5386 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -12,8 +12,8 @@ use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TransactionLookupStage}, test_utils::TestStageDB, - ExecInput, Stage, StageExt, UnwindInput, }; +use reth_stages_api::{ExecInput, Stage, StageExt, UnwindInput}; use std::{ops::RangeInclusive, sync::Arc}; mod setup; diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 76c1faaf0..e94fb81b3 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -18,7 +18,6 @@ use reth_primitives::{fs, Account, Address, SealedBlock, B256, U256}; use reth_stages::{ stages::{AccountHashingStage, StorageHashingStage}, test_utils::{StorageKind, TestStageDB}, - ExecInput, Stage, UnwindInput, }; use reth_trie::StateRoot; use std::{collections::BTreeMap, path::Path, sync::Arc}; @@ -27,6 +26,7 @@ mod constants; mod account_hashing; pub use account_hashing::*; +use reth_stages_api::{ExecInput, Stage, UnwindInput}; pub(crate) type StageRange = (ExecInput, UnwindInput); diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 6e123f750..e11302543 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -77,12 +77,6 @@ )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -mod error; -mod metrics; -mod pipeline; -mod stage; -mod util; - #[allow(missing_docs)] #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; @@ -95,7 +89,5 @@ pub mod stages; pub mod sets; -pub use crate::metrics::*; -pub use error::*; -pub use pipeline::*; -pub use stage::*; +// re-export the stages API +pub use reth_stages_api::*; diff --git a/crates/stages/src/prelude.rs b/crates/stages/src/prelude.rs index 3826c8d2c..a6c56e56e 100644 --- a/crates/stages/src/prelude.rs +++ b/crates/stages/src/prelude.rs @@ -1,8 +1,4 @@ -pub use crate::{ - error::{PipelineError, StageError}, - pipeline::{Pipeline, PipelineBuilder, PipelineEvent, StageSet, StageSetBuilder}, - sets::{ - DefaultStages, ExecutionStages, HashingStages, HistoryIndexingStages, OfflineStages, - OnlineStages, - }, +pub use crate::sets::{ + DefaultStages, ExecutionStages, HashingStages, HistoryIndexingStages, OfflineStages, + OnlineStages, }; diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index b52274b1e..6dfe7a6a8 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -1,5 +1,11 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; +use std::{ + cmp::Ordering, + task::{ready, Context, Poll}, +}; + use futures_util::TryStreamExt; +use tracing::*; + use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -19,11 +25,9 @@ use reth_provider::{ providers::{StaticFileProvider, StaticFileWriter}, BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, StatsReader, }; -use std::{ - cmp::Ordering, - task::{ready, Context, Poll}, -}; -use tracing::*; +use reth_stages_api::{ExecInput, ExecOutput, StageError, UnwindInput, UnwindOutput}; + +use reth_stages_api::Stage; // TODO(onbjerg): Metrics and events (gradual status for e.g. CLI) /// The body stage downloads block bodies. @@ -374,13 +378,16 @@ fn stage_checkpoint( #[cfg(test)] mod tests { - use super::*; + use assert_matches::assert_matches; + + use reth_primitives::stage::StageUnitCheckpoint; + use test_utils::*; + use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, }; - use assert_matches::assert_matches; - use reth_primitives::stage::StageUnitCheckpoint; - use test_utils::*; + + use super::*; stage_test_suite_ext!(BodyTestRunner, body); @@ -588,15 +595,16 @@ mod tests { } mod test_utils { - use crate::{ - stages::bodies::BodyStage, - test_utils::{ - ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, - UnwindStageTestRunner, - }, - ExecInput, ExecOutput, UnwindInput, + use std::{ + collections::{HashMap, VecDeque}, + ops::RangeInclusive, + pin::Pin, + sync::Arc, + task::{Context, Poll}, }; + use futures_util::Stream; + use reth_db::{ cursor::DbCursorRO, models::{StoredBlockBodyIndices, StoredBlockOmmers}, @@ -626,12 +634,14 @@ mod tests { use reth_provider::{ providers::StaticFileWriter, HeaderProvider, ProviderFactory, TransactionsProvider, }; - use std::{ - collections::{HashMap, VecDeque}, - ops::RangeInclusive, - pin::Pin, - sync::Arc, - task::{Context, Poll}, + use reth_stages_api::{ExecInput, ExecOutput, UnwindInput}; + + use crate::{ + stages::bodies::BodyStage, + test_utils::{ + ExecuteStageTestRunner, StageTestRunner, TestRunnerError, TestStageDB, + UnwindStageTestRunner, + }, }; /// The block hash of the genesis block. diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 7c6cefbb8..ecaae33c3 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,7 +1,4 @@ -use crate::{ - stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, BlockErrorKind, ExecInput, ExecOutput, - MetricEvent, MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput, -}; +use crate::stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD; use num_traits::Zero; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, @@ -22,6 +19,10 @@ use reth_provider::{ BlockReader, DatabaseProviderRW, ExecutorFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderError, StatsReader, TransactionVariant, }; +use reth_stages_api::{ + BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError, + UnwindInput, UnwindOutput, +}; use std::{ cmp::Ordering, ops::RangeInclusive, diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index e0e0057c3..c7b2f5a8e 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -1,7 +1,7 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; use reth_primitives::stage::{StageCheckpoint, StageId}; use reth_provider::DatabaseProviderRW; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; /// The finish stage. /// diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 4afccc77f..051b6a85f 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,4 +1,3 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_config::config::EtlConfig; use reth_db::{ @@ -16,6 +15,7 @@ use reth_primitives::{ Account, B256, }; use reth_provider::{AccountExtReader, DatabaseProviderRW, HashingWriter, StatsReader}; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::{ fmt::Debug, ops::{Range, RangeInclusive}, diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 54f4b9520..97da1278d 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -1,4 +1,3 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_config::config::EtlConfig; use reth_db::{ @@ -18,6 +17,7 @@ use reth_primitives::{ BufMut, StorageEntry, B256, }; use reth_provider::{DatabaseProviderRW, HashingWriter, StatsReader, StorageReader}; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::{ fmt::Debug, sync::mpsc::{self, Receiver}, diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 3d71d8776..a862d4afc 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -1,4 +1,3 @@ -use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use futures_util::StreamExt; use reth_codecs::Compact; use reth_config::config::EtlConfig; @@ -26,6 +25,9 @@ use reth_provider::{ BlockHashReader, DatabaseProviderRW, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HeaderSyncMode, }; +use reth_stages_api::{ + BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, +}; use std::{ sync::Arc, task::{ready, Context, Poll}, diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 89c77d6e1..d45240651 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,5 +1,4 @@ use super::{collect_history_indices, load_history_indices}; -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_config::config::EtlConfig; use reth_db::{ database::Database, models::ShardedKey, table::Decode, tables, transaction::DbTxMut, @@ -11,6 +10,7 @@ use reth_primitives::{ use reth_provider::{ DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, }; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::fmt::Debug; use tracing::info; diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index b321f1c56..6d5b6e2ad 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,5 +1,4 @@ use super::{collect_history_indices, load_history_indices}; -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_config::config::EtlConfig; use reth_db::{ database::Database, @@ -15,6 +14,7 @@ use reth_primitives::{ use reth_provider::{ DatabaseProviderRW, HistoryWriter, PruneCheckpointReader, PruneCheckpointWriter, }; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use std::fmt::Debug; use tracing::info; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index e1d651169..9b4eec87f 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -1,4 +1,3 @@ -use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_codecs::Compact; use reth_db::{ database::Database, @@ -15,6 +14,9 @@ use reth_provider::{ DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader, StageCheckpointWriter, StatsReader, }; +use reth_stages_api::{ + BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, +}; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; use std::fmt::Debug; use tracing::*; diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs index 8d97491f1..3539451f1 100644 --- a/crates/stages/src/stages/mod.rs +++ b/crates/stages/src/stages/mod.rs @@ -40,7 +40,7 @@ use utils::*; #[cfg(test)] mod tests { use super::*; - use crate::{stage::Stage, test_utils::TestStageDB, ExecInput}; + use crate::test_utils::TestStageDB; use alloy_rlp::Decodable; use reth_db::{ cursor::DbCursorRO, @@ -61,6 +61,7 @@ mod tests { StorageReader, }; use reth_revm::EvmProcessorFactory; + use reth_stages_api::{ExecInput, Stage}; use std::sync::Arc; #[tokio::test] diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index afb65c560..04a30cb2e 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,4 +1,3 @@ -use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{ cursor::DbCursorRW, database::Database, @@ -16,6 +15,9 @@ use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, StatsReader, }; +use reth_stages_api::{ + BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput, +}; use std::{fmt::Debug, ops::Range, sync::mpsc}; use thiserror::Error; use tracing::*; diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 7bdeb4e1a..918be21c5 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,4 +1,3 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use num_traits::Zero; use reth_config::config::EtlConfig; use reth_db::{ @@ -18,6 +17,7 @@ use reth_provider::{ BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, TransactionsProvider, TransactionsProviderExt, }; +use reth_stages_api::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use tracing::*; /// The transaction lookup stage. diff --git a/crates/stages/src/stages/utils.rs b/crates/stages/src/stages/utils.rs index e04021090..be0ecc7f6 100644 --- a/crates/stages/src/stages/utils.rs +++ b/crates/stages/src/stages/utils.rs @@ -1,5 +1,4 @@ //! Utils for `stages`. -use crate::StageError; use reth_config::config::EtlConfig; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -10,6 +9,7 @@ use reth_db::{ }; use reth_etl::Collector; use reth_primitives::BlockNumber; +use reth_stages_api::StageError; use std::{collections::HashMap, hash::Hash, ops::RangeBounds}; use tracing::info; diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 0ffb16f08..0ce346d70 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -9,7 +9,7 @@ macro_rules! stage_test_suite { let runner = $runner::default(); // Execute the stage with empty database - let input = crate::stage::ExecInput::default(); + let input = reth_stages_api::ExecInput::default(); // Run stage execution let result = runner.execute(input).await; @@ -34,7 +34,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); - let input = crate::stage::ExecInput { + let input = reth_stages_api::ExecInput { target: Some(target), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(current_checkpoint)), }; @@ -67,10 +67,10 @@ macro_rules! stage_test_suite { async fn [< unwind_no_new_entries_ $name>] () { // Set up the runner let mut runner = $runner::default(); - let input = crate::stage::UnwindInput::default(); + let input = reth_stages_api::UnwindInput::default(); // Seed the database - runner.seed_execution(crate::stage::ExecInput::default()).expect("failed to seed"); + runner.seed_execution(reth_stages_api::ExecInput::default()).expect("failed to seed"); runner.before_unwind(input).expect("failed to execute before_unwind hook"); @@ -98,7 +98,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); - let execute_input = crate::stage::ExecInput { + let execute_input = reth_stages_api::ExecInput { target: Some(target), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(current_checkpoint)), }; @@ -125,7 +125,7 @@ macro_rules! stage_test_suite { // Run stage unwind - let unwind_input = crate::stage::UnwindInput { + let unwind_input = reth_stages_api::UnwindInput { unwind_to: current_checkpoint, checkpoint: reth_primitives::stage::StageCheckpoint::new(target), bad_block: None, @@ -165,7 +165,7 @@ macro_rules! stage_test_suite_ext { // Set up the runner let mut runner = $runner::default(); - let input = crate::stage::ExecInput { + let input = reth_stages_api::ExecInput { target: Some(current_checkpoint), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(current_checkpoint)), }; diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index dd788bca7..9e6041755 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -15,9 +15,6 @@ pub(crate) use runner::{ mod test_db; pub use test_db::{StorageKind, TestStageDB}; -mod stage; -pub use stage::TestStage; - mod set; pub use set::TestStages; diff --git a/crates/stages/src/test_utils/runner.rs b/crates/stages/src/test_utils/runner.rs index b8ef44084..fd2064ac4 100644 --- a/crates/stages/src/test_utils/runner.rs +++ b/crates/stages/src/test_utils/runner.rs @@ -1,8 +1,10 @@ use super::TestStageDB; -use crate::{ExecInput, ExecOutput, Stage, StageError, StageExt, UnwindInput, UnwindOutput}; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_interfaces::db::DatabaseError; use reth_provider::ProviderError; +use reth_stages_api::{ + ExecInput, ExecOutput, Stage, StageError, StageExt, UnwindInput, UnwindOutput, +}; use std::sync::Arc; use tokio::sync::oneshot; diff --git a/crates/stages/src/test_utils/set.rs b/crates/stages/src/test_utils/set.rs index c5b149284..f740d4131 100644 --- a/crates/stages/src/test_utils/set.rs +++ b/crates/stages/src/test_utils/set.rs @@ -1,6 +1,7 @@ -use super::{TestStage, TEST_STAGE_ID}; -use crate::{ExecOutput, StageError, StageSet, StageSetBuilder, UnwindOutput}; +use super::TEST_STAGE_ID; +use crate::{StageSet, StageSetBuilder}; use reth_db::database::Database; +use reth_stages_api::{test_utils::TestStage, ExecOutput, StageError, UnwindOutput}; use std::collections::VecDeque; #[derive(Default, Debug)]