From 2478c9f11c02c64ccc7ce932cb6b52dc6a3a43a3 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 29 May 2023 22:22:49 +0300 Subject: [PATCH] chore(stage): cleanup stage id (#2898) --- bin/reth/src/debug_cmd/execution.rs | 8 +- bin/reth/src/debug_cmd/merkle.rs | 49 ++++--- bin/reth/src/node/events.rs | 4 +- bin/reth/src/node/mod.rs | 18 ++- bin/reth/src/stage/drop.rs | 37 +++-- bin/reth/src/stage/dump/execution.rs | 6 +- bin/reth/src/stage/dump/hashing_account.rs | 6 +- bin/reth/src/stage/dump/hashing_storage.rs | 6 +- bin/reth/src/stage/dump/merkle.rs | 8 +- bin/reth/src/stage/run.rs | 9 +- crates/consensus/auto-seal/src/task.rs | 8 +- crates/consensus/beacon/src/engine/mod.rs | 21 ++- crates/primitives/src/lib.rs | 1 + crates/primitives/src/stage/id.rs | 108 ++++++++++++++ crates/primitives/src/stage/mod.rs | 4 + crates/staged-sync/src/utils/init.rs | 5 +- crates/stages/benches/criterion.rs | 6 +- .../stages/benches/setup/account_hashing.rs | 8 +- crates/stages/src/id.rs | 133 ------------------ crates/stages/src/lib.rs | 2 - crates/stages/src/pipeline/builder.rs | 4 +- crates/stages/src/pipeline/event.rs | 7 +- crates/stages/src/pipeline/mod.rs | 123 ++++++++-------- crates/stages/src/pipeline/set.rs | 3 +- crates/stages/src/pipeline/sync_metrics.rs | 5 +- crates/stages/src/prelude.rs | 1 - crates/stages/src/stage.rs | 4 +- crates/stages/src/stages/bodies.rs | 9 +- crates/stages/src/stages/execution.rs | 9 +- crates/stages/src/stages/finish.rs | 9 +- crates/stages/src/stages/hashing_account.rs | 9 +- crates/stages/src/stages/hashing_storage.rs | 11 +- crates/stages/src/stages/headers.rs | 10 +- .../src/stages/index_account_history.rs | 9 +- .../src/stages/index_storage_history.rs | 9 +- crates/stages/src/stages/merkle.rs | 26 ++-- crates/stages/src/stages/sender_recovery.rs | 11 +- crates/stages/src/stages/total_difficulty.rs | 9 +- crates/stages/src/stages/tx_lookup.rs | 10 +- crates/stages/src/test_utils/mod.rs | 6 +- crates/stages/src/test_utils/stage.rs 
| 3 +- .../provider/src/providers/database.rs | 20 ++- crates/storage/provider/src/transaction.rs | 20 ++- testing/ef-tests/src/cases/blockchain_test.rs | 9 +- 44 files changed, 397 insertions(+), 386 deletions(-) create mode 100644 crates/primitives/src/stage/id.rs create mode 100644 crates/primitives/src/stage/mod.rs delete mode 100644 crates/stages/src/id.rs diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index ecaff0885..1fcd0daeb 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -25,8 +25,8 @@ use reth_interfaces::{ }; use reth_network::NetworkHandle; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockHashOrNumber, BlockNumber, ChainSpec, H256}; -use reth_provider::{ShareableDatabase, Transaction}; +use reth_primitives::{stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, H256}; +use reth_provider::{providers::get_stage_checkpoint, ShareableDatabase, Transaction}; use reth_staged_sync::utils::{ chainspec::genesis_value_parser, init::{init_db, init_genesis}, @@ -35,7 +35,7 @@ use reth_stages::{ sets::DefaultStages, stages::{ ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, - TotalDifficultyStage, FINISH, + TotalDifficultyStage, }, Pipeline, StageSet, }; @@ -250,7 +250,7 @@ impl Command { .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); let latest_block_number = - FINISH.get_checkpoint(&db.tx()?)?.unwrap_or_default().block_number; + get_stage_checkpoint(&db.tx()?, StageId::Finish)?.unwrap_or_default().block_number; if latest_block_number >= self.to { info!(target: "reth::cli", latest = latest_block_number, "Nothing to run"); return Ok(()) diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index bdf497234..4f50090e0 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -2,18 +2,17 @@ use crate::dirs::{DataDirPath, MaybePlatformPath}; 
use clap::Parser; use reth_db::{cursor::DbCursorRO, tables, transaction::DbTx}; -use reth_primitives::{ChainSpec, StageCheckpoint}; +use reth_primitives::{stage::StageId, ChainSpec, StageCheckpoint}; use reth_provider::Transaction; use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db}; use reth_stages::{ stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, - StorageHashingStage, ACCOUNT_HASHING, EXECUTION, MERKLE_EXECUTION, SENDER_RECOVERY, - STORAGE_HASHING, + StorageHashingStage, }, ExecInput, Stage, }; -use std::{ops::Deref, sync::Arc}; +use std::sync::Arc; /// `reth merkle-debug` command #[derive(Debug, Parser)] @@ -66,21 +65,22 @@ impl Command { let mut tx = Transaction::new(db.as_ref())?; let execution_checkpoint_block = - EXECUTION.get_checkpoint(tx.deref())?.unwrap_or_default().block_number; + tx.get_stage_checkpoint(StageId::Execution)?.unwrap_or_default().block_number; assert!(execution_checkpoint_block < self.to, "Nothing to run"); // Check if any of hashing or merkle stages aren't on the same block number as // Execution stage or have any intermediate progress. - let should_reset_stages = [ACCOUNT_HASHING, STORAGE_HASHING, MERKLE_EXECUTION] - .into_iter() - .map(|stage| stage.get_checkpoint(tx.deref())) - .collect::, _>>()? - .into_iter() - .map(Option::unwrap_or_default) - .any(|checkpoint| { - checkpoint.block_number != execution_checkpoint_block || - checkpoint.stage_checkpoint.is_some() - }); + let should_reset_stages = + [StageId::AccountHashing, StageId::StorageHashing, StageId::MerkleExecute] + .into_iter() + .map(|stage_id| tx.get_stage_checkpoint(stage_id)) + .collect::, _>>()? 
+ .into_iter() + .map(Option::unwrap_or_default) + .any(|checkpoint| { + checkpoint.block_number != execution_checkpoint_block || + checkpoint.stage_checkpoint.is_some() + }); let factory = reth_revm::Factory::new(self.chain.clone()); let mut execution_stage = ExecutionStage::new( @@ -109,7 +109,10 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((SENDER_RECOVERY, StageCheckpoint::new(block))), + previous_stage: Some(( + StageId::SenderRecovery, + StageCheckpoint::new(block), + )), checkpoint: block.checked_sub(1).map(StageCheckpoint::new), }, ) @@ -121,7 +124,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((EXECUTION, StageCheckpoint::new(block))), + previous_stage: Some((StageId::Execution, StageCheckpoint::new(block))), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -135,7 +138,10 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((ACCOUNT_HASHING, StageCheckpoint::new(block))), + previous_stage: Some(( + StageId::AccountHashing, + StageCheckpoint::new(block), + )), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -147,7 +153,10 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((STORAGE_HASHING, StageCheckpoint::new(block))), + previous_stage: Some(( + StageId::StorageHashing, + StageCheckpoint::new(block), + )), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -165,7 +174,7 @@ impl Command { .collect::, _>>()?; let clean_input = ExecInput { - previous_stage: Some((STORAGE_HASHING, StageCheckpoint::new(block))), + previous_stage: Some((StageId::StorageHashing, StageCheckpoint::new(block))), checkpoint: None, }; loop { diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index bf5782382..d6ce1b5d1 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -4,8 +4,8 @@ use futures::Stream; use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_network::{NetworkEvent, NetworkHandle}; use 
reth_network_api::PeersInfo; -use reth_primitives::StageCheckpoint; -use reth_stages::{ExecOutput, PipelineEvent, StageId}; +use reth_primitives::{stage::StageId, StageCheckpoint}; +use reth_stages::{ExecOutput, PipelineEvent}; use std::{ future::Future, pin::Pin, diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 41102a4e3..54ae7113d 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -41,8 +41,13 @@ use reth_interfaces::{ }; use reth_network::{error::NetworkError, NetworkConfig, NetworkHandle, NetworkManager}; use reth_network_api::NetworkInfo; -use reth_primitives::{BlockHashOrNumber, ChainSpec, Head, Header, SealedHeader, H256}; -use reth_provider::{BlockProvider, CanonStateSubscriptions, HeaderProvider, ShareableDatabase}; +use reth_primitives::{ + stage::StageId, BlockHashOrNumber, ChainSpec, Head, Header, SealedHeader, H256, +}; +use reth_provider::{ + providers::get_stage_checkpoint, BlockProvider, CanonStateSubscriptions, HeaderProvider, + ShareableDatabase, +}; use reth_revm::Factory; use reth_revm_inspectors::stack::Hook; use reth_rpc_engine_api::EngineApi; @@ -55,7 +60,7 @@ use reth_stages::{ prelude::*, stages::{ ExecutionStage, ExecutionStageThresholds, HeaderSyncMode, SenderRecoveryStage, - TotalDifficultyStage, FINISH, + TotalDifficultyStage, }, }; use reth_tasks::TaskExecutor; @@ -75,7 +80,6 @@ use reth_payload_builder::PayloadBuilderService; use reth_primitives::bytes::BytesMut; use reth_provider::providers::BlockchainProvider; use reth_rlp::Encodable; -use reth_stages::stages::{MERKLE_EXECUTION, MERKLE_UNWIND}; pub mod events; @@ -506,7 +510,7 @@ impl Command { db: Arc>, ) -> Result { db.view(|tx| { - let head = FINISH.get_checkpoint(tx)?.unwrap_or_default().block_number; + let head = get_stage_checkpoint(tx, StageId::Finish)?.unwrap_or_default().block_number; let header = tx .get::(head)? 
.expect("the header for the latest block is missing, database is corrupt"); @@ -681,8 +685,8 @@ impl Command { max_changesets: stage_conf.execution.max_changesets, }, )) - .disable_if(MERKLE_UNWIND, || self.auto_mine) - .disable_if(MERKLE_EXECUTION, || self.auto_mine), + .disable_if(StageId::MerkleUnwind, || self.auto_mine) + .disable_if(StageId::MerkleExecute, || self.auto_mine), ) .build(db); diff --git a/bin/reth/src/stage/drop.rs b/bin/reth/src/stage/drop.rs index 29cc9628d..db5c304f4 100644 --- a/bin/reth/src/stage/drop.rs +++ b/bin/reth/src/stage/drop.rs @@ -11,12 +11,8 @@ use reth_db::{ tables, transaction::DbTxMut, }; -use reth_primitives::ChainSpec; +use reth_primitives::{stage::StageId, ChainSpec}; use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::insert_genesis_state}; -use reth_stages::stages::{ - ACCOUNT_HASHING, EXECUTION, INDEX_ACCOUNT_HISTORY, INDEX_STORAGE_HISTORY, MERKLE_EXECUTION, - MERKLE_UNWIND, STORAGE_HASHING, -}; use std::sync::Arc; use tracing::info; @@ -73,7 +69,10 @@ impl Command { tx.clear::()?; tx.clear::()?; tx.clear::()?; - tx.put::(EXECUTION.0.to_string(), Default::default())?; + tx.put::( + StageId::Execution.to_string(), + Default::default(), + )?; insert_genesis_state::>(tx, self.chain.genesis())?; Ok::<_, eyre::Error>(()) })??; @@ -82,11 +81,17 @@ impl Command { tool.db.update(|tx| { // Clear hashed accounts tx.clear::()?; - tx.put::(ACCOUNT_HASHING.0.to_string(), Default::default())?; + tx.put::( + StageId::AccountHashing.to_string(), + Default::default(), + )?; // Clear hashed storages tx.clear::()?; - tx.put::(STORAGE_HASHING.0.to_string(), Default::default())?; + tx.put::( + StageId::StorageHashing.to_string(), + Default::default(), + )?; Ok::<_, eyre::Error>(()) })??; @@ -96,11 +101,17 @@ impl Command { tx.clear::()?; tx.clear::()?; tx.put::( - MERKLE_EXECUTION.0.to_string(), + StageId::MerkleExecute.to_string(), Default::default(), )?; - tx.put::(MERKLE_UNWIND.0.to_string(), Default::default())?; - 
tx.delete::(MERKLE_EXECUTION.0.into(), None)?; + tx.put::( + StageId::MerkleUnwind.to_string(), + Default::default(), + )?; + tx.delete::( + StageId::MerkleExecute.to_string(), + None, + )?; Ok::<_, eyre::Error>(()) })??; } @@ -109,11 +120,11 @@ impl Command { tx.clear::()?; tx.clear::()?; tx.put::( - INDEX_ACCOUNT_HISTORY.0.to_string(), + StageId::IndexAccountHistory.to_string(), Default::default(), )?; tx.put::( - INDEX_STORAGE_HISTORY.0.to_string(), + StageId::IndexStorageHistory.to_string(), Default::default(), )?; Ok::<_, eyre::Error>(()) diff --git a/bin/reth/src/stage/dump/execution.rs b/bin/reth/src/stage/dump/execution.rs index 8a84d8d7d..2f4e581f1 100644 --- a/bin/reth/src/stage/dump/execution.rs +++ b/bin/reth/src/stage/dump/execution.rs @@ -4,9 +4,9 @@ use eyre::Result; use reth_db::{ cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, }; -use reth_primitives::{StageCheckpoint, MAINNET}; +use reth_primitives::{stage::StageId, StageCheckpoint, MAINNET}; use reth_provider::Transaction; -use reth_stages::{stages::ExecutionStage, Stage, StageId, UnwindInput}; +use reth_stages::{stages::ExecutionStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf, sync::Arc}; use tracing::info; @@ -136,7 +136,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index 46ab8cd85..52e25b0f5 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -2,9 +2,9 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::{BlockNumber, StageCheckpoint}; +use 
reth_primitives::{stage::StageId, BlockNumber, StageCheckpoint}; use reth_provider::Transaction; -use reth_stages::{stages::AccountHashingStage, Stage, StageId, UnwindInput}; +use reth_stages::{stages::AccountHashingStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf}; use tracing::info; @@ -80,7 +80,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index 88d09b6a3..ec8311754 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -2,9 +2,9 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_provider::Transaction; -use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput}; +use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf}; use tracing::info; @@ -77,7 +77,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 9913d2e18..0895db9a3 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -2,14 +2,14 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::{BlockNumber, StageCheckpoint, MAINNET}; +use 
reth_primitives::{stage::StageId, BlockNumber, StageCheckpoint, MAINNET}; use reth_provider::Transaction; use reth_stages::{ stages::{ AccountHashingStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, StorageHashingStage, }, - Stage, StageId, UnwindInput, + Stage, UnwindInput, }; use std::{ops::DerefMut, path::PathBuf, sync::Arc}; use tracing::info; @@ -55,7 +55,7 @@ async fn unwind_and_copy( bad_block: None, }; let execute_input = reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), checkpoint: Some(StageCheckpoint::new(from)), }; @@ -129,7 +129,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index fe05679ae..067a3a29e 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -11,7 +11,7 @@ use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_primitives::{ChainSpec, StageCheckpoint}; +use reth_primitives::{stage::StageId, ChainSpec, StageCheckpoint}; use reth_provider::{ShareableDatabase, Transaction}; use reth_staged_sync::utils::{chainspec::chain_spec_value_parser, init::init_db}; use reth_stages::{ @@ -19,7 +19,7 @@ use reth_stages::{ BodyStage, ExecutionStage, ExecutionStageThresholds, MerkleStage, SenderRecoveryStage, TransactionLookupStage, }, - ExecInput, ExecOutput, Stage, StageId, UnwindInput, + ExecInput, ExecOutput, Stage, UnwindInput, }; use std::{net::SocketAddr, path::PathBuf, sync::Arc}; use tracing::*; @@ -199,7 +199,10 @@ impl Command { let unwind_stage = unwind_stage.as_mut().unwrap_or(&mut exec_stage); let mut 
input = ExecInput { - previous_stage: Some((StageId("No Previous Stage"), StageCheckpoint::new(self.to))), + previous_stage: Some(( + StageId::Other("No Previous Stage"), + StageCheckpoint::new(self.to), + )), checkpoint: Some(StageCheckpoint::new(self.from)), }; diff --git a/crates/consensus/auto-seal/src/task.rs b/crates/consensus/auto-seal/src/task.rs index 515a29a29..c9e7825cd 100644 --- a/crates/consensus/auto-seal/src/task.rs +++ b/crates/consensus/auto-seal/src/task.rs @@ -4,7 +4,9 @@ use reth_beacon_consensus::BeaconEngineMessage; use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{ constants::{EMPTY_RECEIPTS, EMPTY_TRANSACTIONS}, - proofs, Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, + proofs, + stage::StageId, + Block, BlockBody, ChainSpec, Header, IntoRecoveredTransaction, ReceiptWithBloom, SealedBlockWithSenders, EMPTY_OMMER_ROOT, U256, }; use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory}; @@ -12,7 +14,7 @@ use reth_revm::{ database::{State, SubState}, executor::Executor, }; -use reth_stages::{stages::FINISH, PipelineEvent}; +use reth_stages::PipelineEvent; use reth_transaction_pool::{TransactionPool, ValidPoolTransaction}; use std::{ collections::VecDeque, @@ -233,7 +235,7 @@ where if let Some(PipelineEvent::Running { stage_id, .. 
}) = events.next().await { - if stage_id == FINISH { + if stage_id == StageId::Finish { debug!(target: "consensus::auto", "received finish stage event"); break } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index bd2d45125..a452b163b 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -17,14 +17,17 @@ use reth_interfaces::{ }; use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle}; use reth_primitives::{ - listener::EventListeners, BlockNumber, Head, Header, SealedBlock, SealedHeader, H256, U256, + listener::EventListeners, stage::StageId, BlockNumber, Head, Header, SealedBlock, SealedHeader, + H256, U256, +}; +use reth_provider::{ + providers::get_stage_checkpoint, BlockProvider, BlockSource, CanonChainTracker, ProviderError, }; -use reth_provider::{BlockProvider, BlockSource, CanonChainTracker, ProviderError}; use reth_rpc_types::engine::{ ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, PayloadValidationError, }; -use reth_stages::{stages::FINISH, Pipeline}; +use reth_stages::Pipeline; use reth_tasks::TaskSpawner; use schnellru::{ByLength, LruMap}; use std::{ @@ -405,7 +408,9 @@ where debug!(target: "consensus::engine", hash=?state.head_block_hash, number=header.number, "canonicalized new head"); let pipeline_min_progress = - FINISH.get_checkpoint(&self.db.tx()?)?.unwrap_or_default().block_number; + get_stage_checkpoint(&self.db.tx()?, StageId::Finish)? 
+ .unwrap_or_default() + .block_number; if pipeline_min_progress < header.number { debug!(target: "consensus::engine", last_finished=pipeline_min_progress, head_number=header.number, "pipeline run to head required"); @@ -1282,6 +1287,7 @@ mod tests { mod fork_choice_updated { use super::*; + use reth_db::transaction::DbTxMut; use reth_interfaces::test_utils::generators::random_block; use reth_rpc_types::engine::ForkchoiceUpdateError; @@ -1338,7 +1344,12 @@ mod tests { let block1 = random_block(1, Some(genesis.hash), None, Some(0)); insert_blocks(env.db.as_ref(), [&genesis, &block1].into_iter()); env.db - .update(|tx| FINISH.save_checkpoint(tx, StageCheckpoint::new(block1.number))) + .update(|tx| { + tx.put::( + StageId::Finish.to_string(), + StageCheckpoint::new(block1.number), + ) + }) .unwrap() .unwrap(); diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index 5e59fdc55..42cb8619c 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -31,6 +31,7 @@ mod log; mod net; mod peer; mod receipt; +pub mod stage; mod storage; mod transaction; pub mod trie; diff --git a/crates/primitives/src/stage/id.rs b/crates/primitives/src/stage/id.rs new file mode 100644 index 000000000..fb0ce846b --- /dev/null +++ b/crates/primitives/src/stage/id.rs @@ -0,0 +1,108 @@ +/// Stage IDs for all known stages. 
+/// +/// For custom stages, use [`StageId::Other`] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[allow(missing_docs)] +pub enum StageId { + Headers, + Bodies, + SenderRecovery, + TotalDifficulty, + AccountHashing, + StorageHashing, + IndexAccountHistory, + IndexStorageHistory, + MerkleExecute, + MerkleUnwind, + Execution, + TransactionLookup, + Finish, + Other(&'static str), +} + +impl StageId { + /// All supported Stages + pub const ALL: [StageId; 13] = [ + StageId::Headers, + StageId::Bodies, + StageId::SenderRecovery, + StageId::TotalDifficulty, + StageId::AccountHashing, + StageId::StorageHashing, + StageId::IndexAccountHistory, + StageId::IndexStorageHistory, + StageId::MerkleExecute, + StageId::MerkleUnwind, + StageId::Execution, + StageId::TransactionLookup, + StageId::Finish, + ]; + + /// Return stage id formatted as string. + pub fn as_str(&self) -> &str { + match self { + StageId::Headers => "Headers", + StageId::Bodies => "Bodies", + StageId::SenderRecovery => "SenderRecovery", + StageId::TotalDifficulty => "TotalDifficulty", + StageId::AccountHashing => "AccountHashing", + StageId::StorageHashing => "StorageHashing", + StageId::IndexAccountHistory => "IndexAccountHistory", + StageId::IndexStorageHistory => "IndexStorageHistory", + StageId::MerkleExecute => "MerkleExecute", + StageId::MerkleUnwind => "MerkleUnwind", + StageId::Execution => "Execution", + StageId::TransactionLookup => "TransactionLookup", + StageId::Finish => "Finish", + StageId::Other(s) => s, + } + } + + /// Returns true if it's a downloading stage [StageId::Headers] or [StageId::Bodies] + pub fn is_downloading_stage(&self) -> bool { + matches!(self, StageId::Headers | StageId::Bodies) + } + + /// Returns true indicating if it's the finish stage [StageId::Finish] + pub fn is_finish(&self) -> bool { + matches!(self, StageId::Finish) + } +} + +impl std::fmt::Display for StageId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", 
self.as_str()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn stage_id_as_string() { + assert_eq!(StageId::Headers.to_string(), "Headers"); + assert_eq!(StageId::TotalDifficulty.to_string(), "TotalDifficulty"); + assert_eq!(StageId::Bodies.to_string(), "Bodies"); + assert_eq!(StageId::SenderRecovery.to_string(), "SenderRecovery"); + assert_eq!(StageId::Execution.to_string(), "Execution"); + assert_eq!(StageId::MerkleUnwind.to_string(), "MerkleUnwind"); + assert_eq!(StageId::AccountHashing.to_string(), "AccountHashing"); + assert_eq!(StageId::StorageHashing.to_string(), "StorageHashing"); + assert_eq!(StageId::MerkleExecute.to_string(), "MerkleExecute"); + assert_eq!(StageId::IndexAccountHistory.to_string(), "IndexAccountHistory"); + assert_eq!(StageId::IndexStorageHistory.to_string(), "IndexStorageHistory"); + assert_eq!(StageId::TransactionLookup.to_string(), "TransactionLookup"); + assert_eq!(StageId::Finish.to_string(), "Finish"); + + assert_eq!(StageId::Other("Foo").to_string(), "Foo"); + } + + #[test] + fn is_downloading_stage() { + assert!(StageId::Headers.is_downloading_stage()); + assert!(StageId::Bodies.is_downloading_stage()); + + assert!(!StageId::Execution.is_downloading_stage()); + } +} diff --git a/crates/primitives/src/stage/mod.rs b/crates/primitives/src/stage/mod.rs new file mode 100644 index 000000000..f76f901fb --- /dev/null +++ b/crates/primitives/src/stage/mod.rs @@ -0,0 +1,4 @@ +//! Staged sync primitives. 
+ +mod id; +pub use id::StageId; diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index 958329478..782c554a1 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -5,9 +5,8 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_primitives::{Account, Bytecode, ChainSpec, H256, U256}; +use reth_primitives::{stage::StageId, Account, Bytecode, ChainSpec, H256, U256}; use reth_provider::{PostState, Transaction, TransactionError}; -use reth_stages::StageKind; use std::{path::Path, sync::Arc}; use tracing::debug; @@ -81,7 +80,7 @@ pub fn init_genesis( insert_genesis_state::(&tx, genesis)?; // insert sync stage - for stage in StageKind::ALL.iter() { + for stage in StageId::ALL.iter() { tx.put::(stage.to_string(), Default::default())?; } diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index fb169a451..147eb4a2c 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -5,11 +5,11 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::mdbx::{Env, WriteMap}; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestTransaction, - ExecInput, Stage, StageId, UnwindInput, + ExecInput, Stage, UnwindInput, }; use std::{path::PathBuf, sync::Arc}; @@ -163,7 +163,7 @@ fn measure_stage( ( ExecInput { previous_stage: Some(( - StageId("Another"), + StageId::Other("Another"), StageCheckpoint::new(block_interval.end), )), checkpoint: Some(StageCheckpoint::new(block_interval.start)), diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 71c9a9275..053dd960f 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ 
b/crates/stages/benches/setup/account_hashing.rs @@ -2,11 +2,11 @@ use super::{constants, StageRange}; use reth_db::{ cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError, }; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_stages::{ stages::{AccountHashingStage, SeedOpts}, test_utils::TestTransaction, - ExecInput, StageId, UnwindInput, + ExecInput, UnwindInput, }; use std::path::{Path, PathBuf}; @@ -40,7 +40,7 @@ fn find_stage_range(db: &Path) -> StageRange { stage_range = Some(( ExecInput { - previous_stage: Some((StageId("Another"), to)), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }, UnwindInput { unwind_to: from, checkpoint: to, bad_block: None }, @@ -70,7 +70,7 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { path, ( ExecInput { - previous_stage: Some((StageId("Another"), StageCheckpoint::new(num_blocks))), + previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(num_blocks))), ..Default::default() }, UnwindInput::default(), diff --git a/crates/stages/src/id.rs b/crates/stages/src/id.rs deleted file mode 100644 index bd66e1e61..000000000 --- a/crates/stages/src/id.rs +++ /dev/null @@ -1,133 +0,0 @@ -use crate::stages::{ - ACCOUNT_HASHING, BODIES, EXECUTION, FINISH, HEADERS, INDEX_ACCOUNT_HISTORY, - INDEX_STORAGE_HISTORY, MERKLE_EXECUTION, MERKLE_UNWIND, SENDER_RECOVERY, TOTAL_DIFFICULTY, - TRANSACTION_LOOKUP, -}; -use reth_db::{ - tables::SyncStage, - transaction::{DbTx, DbTxMut}, - DatabaseError as DbError, -}; -use reth_primitives::StageCheckpoint; -use std::fmt::Display; - -/// All known stages -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -#[allow(missing_docs)] -pub enum StageKind { - Headers, - Bodies, - SenderRecovery, - TotalDifficulty, - AccountHashing, - StorageHashing, - IndexAccountHistory, - IndexStorageHistory, - MerkleExecution, - MerkleUnwind, - 
Execution, - TransactionLookup, - Finish, -} - -impl StageKind { - /// All supported Stages - pub const ALL: [StageKind; 13] = [ - StageKind::Headers, - StageKind::Bodies, - StageKind::SenderRecovery, - StageKind::TotalDifficulty, - StageKind::AccountHashing, - StageKind::StorageHashing, - StageKind::IndexAccountHistory, - StageKind::IndexStorageHistory, - StageKind::MerkleExecution, - StageKind::MerkleUnwind, - StageKind::Execution, - StageKind::TransactionLookup, - StageKind::Finish, - ]; - - /// Returns the ID of this stage. - pub fn id(&self) -> StageId { - match self { - StageKind::Headers => HEADERS, - StageKind::Bodies => BODIES, - StageKind::SenderRecovery => SENDER_RECOVERY, - StageKind::TotalDifficulty => TOTAL_DIFFICULTY, - StageKind::AccountHashing => ACCOUNT_HASHING, - StageKind::StorageHashing => ACCOUNT_HASHING, - StageKind::IndexAccountHistory => INDEX_ACCOUNT_HISTORY, - StageKind::IndexStorageHistory => INDEX_STORAGE_HISTORY, - StageKind::MerkleExecution => MERKLE_EXECUTION, - StageKind::MerkleUnwind => MERKLE_UNWIND, - StageKind::Execution => EXECUTION, - StageKind::TransactionLookup => TRANSACTION_LOOKUP, - StageKind::Finish => FINISH, - } - } -} - -impl Display for StageKind { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.id()) - } -} - -/// The ID of a stage. -/// -/// Each stage ID must be unique. -#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] -pub struct StageId(pub &'static str); - -impl Display for StageId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl StageId { - /// Returns true if it's a downloading stage [HEADERS] or [BODIES - pub fn is_downloading_stage(&self) -> bool { - *self == HEADERS || *self == BODIES - } - - /// Returns true indicating if it's the finish stage [FINISH] - pub fn is_finish(&self) -> bool { - *self == FINISH - } - - /// Get the last committed progress of this stage. 
- pub fn get_checkpoint<'db>( - &self, - tx: &impl DbTx<'db>, - ) -> Result, DbError> { - tx.get::(self.0.to_string()) - } - - /// Save the progress of this stage. - pub fn save_checkpoint<'db>( - &self, - tx: &impl DbTxMut<'db>, - checkpoint: StageCheckpoint, - ) -> Result<(), DbError> { - tx.put::(self.0.to_string(), checkpoint) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn stage_id_display() { - assert_eq!(StageId("foo").to_string(), "foo"); - assert_eq!(StageId("bar").to_string(), "bar"); - } - - #[test] - fn is_downloading_stage() { - assert!(HEADERS.is_downloading_stage()); - assert!(BODIES.is_downloading_stage()); - } -} diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 1bdd46e7f..5fd974367 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -54,7 +54,6 @@ //! .build(db); //! ``` mod error; -mod id; mod pipeline; mod stage; mod util; @@ -72,6 +71,5 @@ pub mod stages; pub mod sets; pub use error::*; -pub use id::*; pub use pipeline::*; pub use stage::*; diff --git a/crates/stages/src/pipeline/builder.rs b/crates/stages/src/pipeline/builder.rs index ace8e987f..6994ebe57 100644 --- a/crates/stages/src/pipeline/builder.rs +++ b/crates/stages/src/pipeline/builder.rs @@ -1,6 +1,6 @@ -use crate::{pipeline::BoxedStage, Pipeline, Stage, StageId, StageSet}; +use crate::{pipeline::BoxedStage, Pipeline, Stage, StageSet}; use reth_db::database::Database; -use reth_primitives::{BlockNumber, H256}; +use reth_primitives::{stage::StageId, BlockNumber, H256}; use tokio::sync::watch; /// Builds a [`Pipeline`]. 
diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index 695bc46ac..a724b972a 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -1,8 +1,5 @@ -use crate::{ - id::StageId, - stage::{ExecOutput, UnwindInput, UnwindOutput}, -}; -use reth_primitives::StageCheckpoint; +use crate::stage::{ExecOutput, UnwindInput, UnwindOutput}; +use reth_primitives::{stage::StageId, StageCheckpoint}; /// An event emitted by a [Pipeline][crate::Pipeline]. /// diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 7a3b3b7da..bcfbbb626 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,9 +1,11 @@ -use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput}; +use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput}; use futures_util::Future; use reth_db::database::Database; -use reth_primitives::{listener::EventListeners, BlockNumber, StageCheckpoint, H256}; -use reth_provider::Transaction; -use std::{ops::Deref, pin::Pin}; +use reth_primitives::{ + listener::EventListeners, stage::StageId, BlockNumber, StageCheckpoint, H256, +}; +use reth_provider::{providers::get_stage_checkpoint, Transaction}; +use std::pin::Pin; use tokio::sync::watch; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; @@ -130,18 +132,17 @@ where } /// Registers progress metrics for each registered stage - pub fn register_metrics(&mut self) { + pub fn register_metrics(&mut self) -> Result<(), PipelineError> { + let tx = self.db.tx()?; for stage in &self.stages { let stage_id = stage.id(); self.metrics.stage_checkpoint( stage_id, - self.db - .view(|tx| stage_id.get_checkpoint(tx).ok().flatten().unwrap_or_default()) - .ok() - .unwrap_or_default(), + get_stage_checkpoint(&tx, stage_id)?.unwrap_or_default(), None, ); } + Ok(()) } /// Consume the pipeline and run it until it reaches the provided tip, if set. 
Return the @@ -150,7 +151,7 @@ where pub fn run_as_fut(mut self, tip: Option<H256>) -> PipelineFut<DB> { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. - self.register_metrics(); + let _ = self.register_metrics(); // ignore error Box::pin(async move { // NOTE: the tip should only be None if we are in continuous sync mode. if let Some(tip) = tip { @@ -165,7 +166,7 @@ where /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. pub async fn run(&mut self) -> Result<(), PipelineError> { - self.register_metrics(); + let _ = self.register_metrics(); // ignore error loop { let next_action = self.run_loop().await?; @@ -225,7 +226,7 @@ where previous_stage = Some(( stage_id, - self.db.view(|tx| stage_id.get_checkpoint(tx))??.unwrap_or_default(), + get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default(), )); } @@ -250,7 +251,7 @@ where let span = info_span!("Unwinding", stage = %stage_id); let _enter = span.enter(); - let mut stage_progress = stage_id.get_checkpoint(tx.deref())?.unwrap_or_default(); + let mut stage_progress = tx.get_stage_checkpoint(stage_id)?.unwrap_or_default(); if stage_progress.block_number < to { debug!(target: "sync::pipeline", from = %stage_progress, %to, "Unwind point too far for stage"); self.listeners.notify(PipelineEvent::Skipped { stage_id }); @@ -273,7 +274,7 @@ where // doesn't change when we unwind. 
None, ); - stage_id.save_checkpoint(tx.deref(), stage_progress)?; + tx.save_stage_checkpoint(stage_id, stage_progress)?; self.listeners .notify(PipelineEvent::Unwound { stage_id, result: unwind_output }); @@ -302,7 +303,7 @@ where loop { let mut tx = Transaction::new(&self.db)?; - let prev_checkpoint = stage_id.get_checkpoint(tx.deref())?; + let prev_checkpoint = tx.get_stage_checkpoint(stage_id)?; let stage_reached_max_block = prev_checkpoint .zip(self.max_block) @@ -345,7 +346,7 @@ where checkpoint, previous_stage.map(|(_, checkpoint)| checkpoint.block_number), ); - stage_id.save_checkpoint(tx.deref(), checkpoint)?; + tx.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() }); @@ -416,7 +417,7 @@ impl std::fmt::Debug for Pipeline { #[cfg(test)] mod tests { use super::*; - use crate::{test_utils::TestStage, StageId, UnwindOutput}; + use crate::{test_utils::TestStage, UnwindOutput}; use assert_matches::assert_matches; use reth_db::mdbx::{self, test_utils, EnvKind}; use reth_interfaces::{consensus, provider::ProviderError}; @@ -456,11 +457,11 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( - TestStage::new(StageId("A")) + TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })), ) .add_stage( - TestStage::new(StageId("B")) + TestStage::new(StageId::Other("B")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) @@ -476,14 +477,14 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, + 
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] @@ -497,17 +498,17 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( - TestStage::new(StageId("A")) + TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( - TestStage::new(StageId("B")) + TestStage::new(StageId::Other("B")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) .add_stage( - TestStage::new(StageId("C")) + TestStage::new(StageId::Other("C")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(20), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(1) })), ) @@ -529,24 +530,24 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("C"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("C"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("C"), + stage_id: StageId::Other("C"), result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true }, 
}, // Unwinding PipelineEvent::Unwinding { - stage_id: StageId("C"), + stage_id: StageId::Other("C"), input: UnwindInput { checkpoint: StageCheckpoint::new(20), unwind_to: 1, @@ -554,11 +555,11 @@ mod tests { } }, PipelineEvent::Unwound { - stage_id: StageId("C"), + stage_id: StageId::Other("C"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, }, PipelineEvent::Unwinding { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 1, @@ -566,11 +567,11 @@ mod tests { } }, PipelineEvent::Unwound { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, }, PipelineEvent::Unwinding { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), unwind_to: 1, @@ -578,7 +579,7 @@ mod tests { } }, PipelineEvent::Unwound { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(1) }, }, ] @@ -592,12 +593,12 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( - TestStage::new(StageId("A")) + TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(100), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(50) })), ) .add_stage( - TestStage::new(StageId("B")) + TestStage::new(StageId::Other("B")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .with_max_block(10) @@ -618,21 +619,21 @@ mod tests { events.collect::>().await, vec![ // Executing - PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), checkpoint: 
None }, + PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, // Unwinding // Nothing to unwind in stage "B" - PipelineEvent::Skipped { stage_id: StageId("B") }, + PipelineEvent::Skipped { stage_id: StageId::Other("B") }, PipelineEvent::Unwinding { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(100), unwind_to: 50, @@ -640,7 +641,7 @@ mod tests { } }, PipelineEvent::Unwound { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(50) }, }, ] @@ -665,13 +666,13 @@ mod tests { let mut pipeline = Pipeline::builder() .add_stage( - TestStage::new(StageId("A")) + TestStage::new(StageId::Other("A")) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })) .add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) })) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) .add_stage( - TestStage::new(StageId("B")) + TestStage::new(StageId::Other("B")) .add_exec(Err(StageError::Validation { block: 5, error: consensus::ConsensusError::BaseFeeMissing, @@ -692,15 +693,15 @@ mod tests { assert_eq!( events.collect::>().await, vec![ - PipelineEvent::Running { stage_id: StageId("A"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, - PipelineEvent::Error { stage_id: StageId("B") }, + PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, + PipelineEvent::Error { stage_id: StageId::Other("B") }, PipelineEvent::Unwinding { - stage_id: 
StageId("A"), + stage_id: StageId::Other("A"), input: UnwindInput { checkpoint: StageCheckpoint::new(10), unwind_to: 0, @@ -708,20 +709,20 @@ mod tests { } }, PipelineEvent::Unwound { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: UnwindOutput { checkpoint: StageCheckpoint::new(0) }, }, PipelineEvent::Running { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), checkpoint: Some(StageCheckpoint::new(0)) }, PipelineEvent::Ran { - stage_id: StageId("A"), + stage_id: StageId::Other("A"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, - PipelineEvent::Running { stage_id: StageId("B"), checkpoint: None }, + PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None }, PipelineEvent::Ran { - stage_id: StageId("B"), + stage_id: StageId::Other("B"), result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true }, }, ] @@ -735,7 +736,7 @@ mod tests { let db = test_utils::create_test_db::(EnvKind::RW); let mut pipeline = Pipeline::builder() .add_stage( - TestStage::new(StageId("NonFatal")) + TestStage::new(StageId::Other("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) .add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })), ) @@ -747,7 +748,7 @@ mod tests { // Fatal let db = test_utils::create_test_db::(EnvKind::RW); let mut pipeline = Pipeline::builder() - .add_stage(TestStage::new(StageId("Fatal")).add_exec(Err( + .add_stage(TestStage::new(StageId::Other("Fatal")).add_exec(Err( StageError::DatabaseIntegrity(ProviderError::BlockBodyIndicesNotFound(5)), ))) .build(db); diff --git a/crates/stages/src/pipeline/set.rs b/crates/stages/src/pipeline/set.rs index 78cb8d2b6..dde9e0238 100644 --- a/crates/stages/src/pipeline/set.rs +++ b/crates/stages/src/pipeline/set.rs @@ -1,5 +1,6 @@ -use crate::{Stage, StageId}; +use crate::Stage; use reth_db::database::Database; +use reth_primitives::stage::StageId; use std::{ collections::HashMap, fmt::{Debug, 
Formatter}, diff --git a/crates/stages/src/pipeline/sync_metrics.rs b/crates/stages/src/pipeline/sync_metrics.rs index 09f508d26..5f092bc9e 100644 --- a/crates/stages/src/pipeline/sync_metrics.rs +++ b/crates/stages/src/pipeline/sync_metrics.rs @@ -1,9 +1,10 @@ -use crate::StageId; use reth_metrics::{ metrics::{self, Gauge}, Metrics, }; -use reth_primitives::{BlockNumber, EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint}; +use reth_primitives::{ + stage::StageId, BlockNumber, EntitiesCheckpoint, StageCheckpoint, StageUnitCheckpoint, +}; use std::collections::HashMap; #[derive(Metrics)] diff --git a/crates/stages/src/prelude.rs b/crates/stages/src/prelude.rs index 7308c52ab..3826c8d2c 100644 --- a/crates/stages/src/prelude.rs +++ b/crates/stages/src/prelude.rs @@ -1,6 +1,5 @@ pub use crate::{ error::{PipelineError, StageError}, - id::StageId, pipeline::{Pipeline, PipelineBuilder, PipelineEvent, StageSet, StageSetBuilder}, sets::{ DefaultStages, ExecutionStages, HashingStages, HistoryIndexingStages, OfflineStages, diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 3ff3f48b7..28795b889 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -1,7 +1,7 @@ -use crate::{error::StageError, id::StageId}; +use crate::error::StageError; use async_trait::async_trait; use reth_db::database::Database; -use reth_primitives::{BlockNumber, StageCheckpoint}; +use reth_primitives::{stage::StageId, BlockNumber, StageCheckpoint}; use reth_provider::Transaction; use std::{ cmp::{max, min}, diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index e8c87594a..1ad3cb1e8 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use futures_util::TryStreamExt; use reth_db::{ 
cursor::{DbCursorRO, DbCursorRW}, @@ -11,14 +11,11 @@ use reth_interfaces::{ consensus::Consensus, p2p::bodies::{downloader::BodyDownloader, response::BlockResponse}, }; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_provider::Transaction; use std::sync::Arc; use tracing::*; -/// The [`StageId`] of the bodies downloader stage. -pub const BODIES: StageId = StageId("Bodies"); - // TODO(onbjerg): Metrics and events (gradual status for e.g. CLI) /// The body stage downloads block bodies. /// @@ -62,7 +59,7 @@ pub struct BodyStage { impl Stage for BodyStage { /// Return the id of the stage fn id(&self) -> StageId { - BODIES + StageId::Bodies } /// Download block bodies from the last checkpoint for this stage up until the latest synced diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index e91933029..50074bf81 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, database::Database, @@ -11,7 +11,7 @@ use reth_metrics::{ Metrics, }; use reth_primitives::{ - constants::MGAS_TO_GAS, Block, BlockNumber, BlockWithSenders, StageCheckpoint, + constants::MGAS_TO_GAS, stage::StageId, Block, BlockNumber, BlockWithSenders, StageCheckpoint, TransactionSigned, U256, }; use reth_provider::{ @@ -20,9 +20,6 @@ use reth_provider::{ use std::time::Instant; use tracing::*; -/// The [`StageId`] of the execution stage. -pub const EXECUTION: StageId = StageId("Execution"); - /// Execution stage metrics. 
#[derive(Metrics)] #[metrics(scope = "sync.execution")] @@ -207,7 +204,7 @@ const BIG_STACK_SIZE: usize = 64 * 1024 * 1024; impl Stage for ExecutionStage { /// Return the id of the stage fn id(&self) -> StageId { - EXECUTION + StageId::Execution } /// Execute the stage diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index d7e436f9a..bd941f2c2 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -1,11 +1,8 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_provider::Transaction; -/// The [`StageId`] of the finish stage. -pub const FINISH: StageId = StageId("Finish"); - /// The finish stage. /// /// This stage does not write anything; it's checkpoint is used to denote the highest fully synced @@ -16,7 +13,7 @@ pub struct FinishStage; #[async_trait::async_trait] impl Stage for FinishStage { fn id(&self) -> StageId { - FINISH + StageId::Finish } async fn execute( diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index a043a6a1e..cca766569 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use rayon::slice::ParallelSliceMut; use reth_db::{ @@ -8,7 +8,7 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, }; -use reth_primitives::{keccak256, AccountHashingCheckpoint, StageCheckpoint}; +use reth_primitives::{keccak256, stage::StageId, AccountHashingCheckpoint, StageCheckpoint}; use 
reth_provider::Transaction; use std::{ cmp::max, @@ -18,9 +18,6 @@ use std::{ use tokio::sync::mpsc; use tracing::*; -/// The [`StageId`] of the account hashing stage. -pub const ACCOUNT_HASHING: StageId = StageId("AccountHashing"); - /// Account hashing stage hashes plain account. /// This is preparation before generating intermediate hashes and calculating Merkle tree root. #[derive(Clone, Debug)] @@ -115,7 +112,7 @@ impl AccountHashingStage { impl Stage for AccountHashingStage { /// Return the id of the stage fn id(&self) -> StageId { - ACCOUNT_HASHING + StageId::AccountHashing } /// Execute the stage. diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 5b74f5d0f..24e5355b8 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use num_traits::Zero; use reth_db::{ cursor::DbDupCursorRO, @@ -7,14 +7,13 @@ use reth_db::{ tables, transaction::{DbTx, DbTxMut}, }; -use reth_primitives::{keccak256, StageCheckpoint, StorageEntry, StorageHashingCheckpoint}; +use reth_primitives::{ + keccak256, stage::StageId, StageCheckpoint, StorageEntry, StorageHashingCheckpoint, +}; use reth_provider::Transaction; use std::{collections::BTreeMap, fmt::Debug}; use tracing::*; -/// The [`StageId`] of the storage hashing stage. -pub const STORAGE_HASHING: StageId = StageId("StorageHashing"); - /// Storage hashing stage hashes plain storage. /// This is preparation before generating intermediate hashes and calculating Merkle tree root. #[derive(Debug)] @@ -36,7 +35,7 @@ impl Default for StorageHashingStage { impl Stage for StorageHashingStage { /// Return the id of the stage fn id(&self) -> StageId { - STORAGE_HASHING + StageId::StorageHashing } /// Execute the stage. 
diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 1b144d8bd..ec5c04937 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use futures_util::StreamExt; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -11,15 +11,13 @@ use reth_interfaces::{ provider::ProviderError, }; use reth_primitives::{ - BlockHashOrNumber, BlockNumber, EntitiesCheckpoint, SealedHeader, StageCheckpoint, H256, + stage::StageId, BlockHashOrNumber, BlockNumber, EntitiesCheckpoint, SealedHeader, + StageCheckpoint, H256, }; use reth_provider::Transaction; use tokio::sync::watch; use tracing::*; -/// The [`StageId`] of the headers downloader stage. -pub const HEADERS: StageId = StageId("Headers"); - /// The header sync mode. #[derive(Debug)] pub enum HeaderSyncMode { @@ -185,7 +183,7 @@ where { /// Return the id of the stage fn id(&self) -> StageId { - HEADERS + StageId::Headers } /// Download the headers in reverse order (falling block numbers) diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 22282df40..aab1b96e7 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -1,13 +1,10 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_provider::Transaction; use std::fmt::Debug; use tracing::*; -/// The [`StageId`] of the account history indexing stage. 
-pub const INDEX_ACCOUNT_HISTORY: StageId = StageId("IndexAccountHistory"); - /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// on index sharding take a look at [`reth_db::tables::AccountHistory`] @@ -28,7 +25,7 @@ impl Default for IndexAccountHistoryStage { impl Stage for IndexAccountHistoryStage { /// Return the id of the stage fn id(&self) -> StageId { - INDEX_ACCOUNT_HISTORY + StageId::IndexAccountHistory } /// Execute the stage. diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index fc208cbaf..bc23ff14b 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -1,13 +1,10 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{database::Database, models::BlockNumberAddress}; -use reth_primitives::StageCheckpoint; +use reth_primitives::{stage::StageId, StageCheckpoint}; use reth_provider::Transaction; use std::fmt::Debug; use tracing::*; -/// The [`StageId`] of the storage history indexing stage. -pub const INDEX_STORAGE_HISTORY: StageId = StageId("IndexStorageHistory"); - /// Stage is indexing history the account changesets generated in /// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information /// on index sharding take a look at [`reth_db::tables::StorageHistory`]. @@ -28,7 +25,7 @@ impl Default for IndexStorageHistoryStage { impl Stage for IndexStorageHistoryStage { /// Return the id of the stage fn id(&self) -> StageId { - INDEX_STORAGE_HISTORY + StageId::IndexStorageHistory } /// Execute the stage. 
diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index f661aec2d..608b8bb34 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_codecs::Compact; use reth_db::{ database::Database, @@ -7,22 +7,13 @@ use reth_db::{ }; use reth_interfaces::consensus; use reth_primitives::{ - hex, trie::StoredSubNode, BlockNumber, MerkleCheckpoint, StageCheckpoint, H256, + hex, stage::StageId, trie::StoredSubNode, BlockNumber, MerkleCheckpoint, StageCheckpoint, H256, }; use reth_provider::Transaction; use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress}; use std::{fmt::Debug, ops::DerefMut}; use tracing::*; -/// The [`StageId`] of the merkle hashing execution stage. -pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecute"); - -/// The [`StageId`] of the merkle hashing unwind stage. -pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); - -/// The [`StageId`] of the merkle hashing unwind and execution stage. -pub const MERKLE_BOTH: StageId = StageId("MerkleBoth"); - /// The merkle hashing stage uses input from /// [`AccountHashingStage`][crate::stages::AccountHashingStage] and /// [`StorageHashingStage`][crate::stages::AccountHashingStage] to calculate intermediate hashes /// @@ -95,8 +86,9 @@ impl MerkleStage { &self, tx: &Transaction<'_, DB>, ) -> Result<Option<MerkleCheckpoint>, StageError> { - let buf = - tx.get::<tables::SyncStageProgress>(MERKLE_EXECUTION.0.into())?.unwrap_or_default(); + let buf = tx + .get::<tables::SyncStageProgress>(StageId::MerkleExecute.to_string())? 
+ .unwrap_or_default(); if buf.is_empty() { return Ok(None) @@ -122,7 +114,7 @@ impl MerkleStage { ); checkpoint.to_compact(&mut buf); } - tx.put::<tables::SyncStageProgress>(MERKLE_EXECUTION.0.into(), buf)?; + tx.put::<tables::SyncStageProgress>(StageId::MerkleExecute.to_string(), buf)?; Ok(()) } } @@ -132,10 +124,10 @@ impl Stage for MerkleStage { /// Return the id of the stage fn id(&self) -> StageId { match self { - MerkleStage::Execution { .. } => MERKLE_EXECUTION, - MerkleStage::Unwind => MERKLE_UNWIND, + MerkleStage::Execution { .. } => StageId::MerkleExecute, + MerkleStage::Unwind => StageId::MerkleUnwind, #[cfg(any(test, feature = "test-utils"))] - MerkleStage::Both { .. } => MERKLE_BOTH, + MerkleStage::Both { .. } => StageId::Other("MerkleBoth"), } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index ad37986b1..4a16c001a 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, @@ -7,16 +7,15 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, RawKey, RawTable, RawValue, }; -use reth_primitives::{keccak256, StageCheckpoint, TransactionSignedNoHash, TxNumber, H160}; +use reth_primitives::{ + keccak256, stage::StageId, StageCheckpoint, TransactionSignedNoHash, TxNumber, H160, +}; use reth_provider::Transaction; use std::fmt::Debug; use thiserror::Error; use tokio::sync::mpsc; use tracing::*; -/// The [`StageId`] of the sender recovery stage. -pub const SENDER_RECOVERY: StageId = StageId("SenderRecovery"); - /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them /// in [`TxSenders`][reth_db::tables::TxSenders] table. 
@@ -37,7 +36,7 @@ impl Default for SenderRecoveryStage { impl Stage for SenderRecoveryStage { /// Return the id of the stage fn id(&self) -> StageId { - SENDER_RECOVERY + StageId::SenderRecovery } /// Retrieve the range of transactions to iterate over by querying diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index b06f839bf..81832ffdd 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -6,14 +6,11 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_interfaces::{consensus::Consensus, provider::ProviderError}; -use reth_primitives::{StageCheckpoint, U256}; +use reth_primitives::{stage::StageId, StageCheckpoint, U256}; use reth_provider::Transaction; use std::sync::Arc; use tracing::*; -/// The [`StageId`] of the total difficulty stage. -pub const TOTAL_DIFFICULTY: StageId = StageId("TotalDifficulty"); - /// The total difficulty stage. 
/// /// This stage walks over inserted headers and computes total difficulty @@ -44,7 +41,7 @@ impl TotalDifficultyStage { impl Stage for TotalDifficultyStage { /// Return the id of the stage fn id(&self) -> StageId { - TOTAL_DIFFICULTY + StageId::TotalDifficulty } /// Write total difficulty entries diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 0ee8689c2..6685a0131 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,4 +1,4 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use itertools::Itertools; use rayon::prelude::*; use reth_db::{ @@ -8,16 +8,14 @@ use reth_db::{ transaction::{DbTx, DbTxMut}, }; use reth_primitives::{ - rpc_utils::keccak256, BlockNumber, StageCheckpoint, TransactionSignedNoHash, TxNumber, H256, + rpc_utils::keccak256, stage::StageId, BlockNumber, StageCheckpoint, TransactionSignedNoHash, + TxNumber, H256, }; use reth_provider::Transaction; use thiserror::Error; use tokio::sync::mpsc; use tracing::*; -/// The [`StageId`] of the transaction lookup stage. -pub const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); - /// The transaction lookup stage. 
/// /// This stage walks over the bodies table, and sets the transaction hash of each transaction in a @@ -46,7 +44,7 @@ impl TransactionLookupStage { impl Stage for TransactionLookupStage { /// Return the id of the stage fn id(&self) -> StageId { - TRANSACTION_LOOKUP + StageId::TransactionLookup } /// Write transaction hash -> id entries diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index 3b499fc69..bae39017e 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -1,5 +1,5 @@ #![allow(unused)] -use crate::StageId; +use reth_primitives::stage::StageId; mod macros; pub(crate) use macros::*; @@ -19,7 +19,7 @@ mod set; pub use set::TestStages; /// The test stage id -pub const TEST_STAGE_ID: StageId = StageId("TestStage"); +pub const TEST_STAGE_ID: StageId = StageId::Other("TestStage"); /// The previous test stage id mock used for testing -pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage"); +pub(crate) const PREV_STAGE_ID: StageId = StageId::Other("PrevStage"); diff --git a/crates/stages/src/test_utils/stage.rs b/crates/stages/src/test_utils/stage.rs index bd9913f9b..81056a5e1 100644 --- a/crates/stages/src/test_utils/stage.rs +++ b/crates/stages/src/test_utils/stage.rs @@ -1,5 +1,6 @@ -use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use reth_db::database::Database; +use reth_primitives::stage::StageId; use reth_provider::Transaction; use std::collections::VecDeque; diff --git a/crates/storage/provider/src/providers/database.rs b/crates/storage/provider/src/providers/database.rs index bf28aa38f..65d2c9196 100644 --- a/crates/storage/provider/src/providers/database.rs +++ b/crates/storage/provider/src/providers/database.rs @@ -7,9 +7,9 @@ use crate::{ use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use 
reth_interfaces::Result; use reth_primitives::{ - Block, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, ChainSpec, Head, Header, Receipt, - SealedBlock, SealedHeader, TransactionMeta, TransactionSigned, TxHash, TxNumber, Withdrawal, - H256, U256, + stage::StageId, Block, BlockHash, BlockHashOrNumber, BlockNumber, ChainInfo, ChainSpec, Head, + Header, Receipt, SealedBlock, SealedHeader, StageCheckpoint, TransactionMeta, + TransactionSigned, TxHash, TxNumber, Withdrawal, H256, U256, }; use reth_revm_primitives::{ config::revm_spec, @@ -659,7 +659,7 @@ fn best_block_number<'a, TX>( where TX: DbTx<'a> + Send + Sync, { - tx.get::("Finish".to_string()) + tx.get::("Finish".to_string()) // TODO: .map(|result| result.map(|checkpoint| checkpoint.block_number)) } @@ -674,6 +674,18 @@ where tx.cursor_read::()?.last() } +/// Get checkpoint for the given stage. +#[inline] +pub fn get_stage_checkpoint<'a, TX>( + tx: &TX, + id: StageId, +) -> std::result::Result, reth_interfaces::db::DatabaseError> +where + TX: DbTx<'a> + Send + Sync, +{ + tx.get::(id.to_string()) +} + #[cfg(test)] mod tests { use super::ShareableDatabase; diff --git a/crates/storage/provider/src/transaction.rs b/crates/storage/provider/src/transaction.rs index d3be66a22..405979c3f 100644 --- a/crates/storage/provider/src/transaction.rs +++ b/crates/storage/provider/src/transaction.rs @@ -1,6 +1,7 @@ use crate::{ insert_canonical_block, post_state::{PostState, StorageChangeset}, + providers::get_stage_checkpoint, }; use itertools::{izip, Itertools}; use reth_db::{ @@ -19,8 +20,8 @@ use reth_db::{ }; use reth_interfaces::{db::DatabaseError as DbError, provider::ProviderError}; use reth_primitives::{ - keccak256, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, - SealedBlockWithSenders, StageCheckpoint, StorageEntry, TransactionSigned, + keccak256, stage::StageId, Account, Address, BlockHash, BlockNumber, ChainSpec, Hardfork, + Header, SealedBlock, SealedBlockWithSenders, 
StageCheckpoint, StorageEntry, TransactionSigned, TransactionSignedEcRecovered, H256, U256, }; use reth_trie::{StateRoot, StateRootError}; @@ -1281,6 +1282,21 @@ where Ok(()) } + /// Get the stage checkpoint. + pub fn get_stage_checkpoint(&self, id: StageId) -> Result, DbError> { + get_stage_checkpoint(self.deref(), id) + } + + /// Save stage checkpoint. + pub fn save_stage_checkpoint( + &self, + id: StageId, + checkpoint: StageCheckpoint, + ) -> Result<(), DbError> { + self.put::(id.to_string(), checkpoint)?; + Ok(()) + } + /// Return full table as Vec pub fn table(&self) -> Result>, DbError> where diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 3e52403a6..6b7568583 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,9 +5,9 @@ use crate::{ Case, Error, Suite, }; use reth_db::mdbx::test_utils::create_test_rw_db; -use reth_primitives::{BlockBody, SealedBlock, StageCheckpoint}; +use reth_primitives::{stage::StageId, BlockBody, SealedBlock, StageCheckpoint}; use reth_provider::Transaction; -use reth_stages::{stages::ExecutionStage, ExecInput, Stage, StageId}; +use reth_stages::{stages::ExecutionStage, ExecInput, Stage}; use std::{collections::BTreeMap, ffi::OsStr, fs, ops::Deref, path::Path, sync::Arc}; /// A handler for the blockchain test suite. @@ -105,8 +105,9 @@ impl Case for BlockchainTestCase { .execute( &mut transaction, ExecInput { - previous_stage: last_block - .map(|b| (StageId("Dummy"), StageCheckpoint::new(b))), + previous_stage: last_block.map(|b| { + (StageId::Other("Dummy"), StageCheckpoint::new(b)) + }), checkpoint: None, }, )