diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index df428156e..193f4f331 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -111,7 +111,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((StageId::SenderRecovery, block)), + target: Some(block), checkpoint: block.checked_sub(1).map(StageCheckpoint::new), }, ) @@ -123,7 +123,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((StageId::Execution, block)), + target: Some(block), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -137,7 +137,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((StageId::AccountHashing, block)), + target: Some(block), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -149,7 +149,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((StageId::StorageHashing, block)), + target: Some(block), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -166,10 +166,7 @@ impl Command { .walk_range(..)? .collect::, _>>()?; - let clean_input = ExecInput { - previous_stage: Some((StageId::StorageHashing, block)), - checkpoint: None, - }; + let clean_input = ExecInput { target: Some(block), checkpoint: None }; loop { let clean_result = merkle_stage.execute(&mut tx, clean_input).await; assert!(clean_result.is_ok(), "Clean state root calculation failed"); diff --git a/bin/reth/src/stage/dump/execution.rs b/bin/reth/src/stage/dump/execution.rs index 50593dd6c..832677331 100644 --- a/bin/reth/src/stage/dump/execution.rs +++ b/bin/reth/src/stage/dump/execution.rs @@ -4,10 +4,7 @@ use eyre::Result; use reth_db::{ cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, }; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - MAINNET, -}; +use reth_primitives::{stage::StageCheckpoint, MAINNET}; use reth_provider::Transaction; use reth_stages::{stages::ExecutionStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf, sync::Arc}; @@ -139,7 +136,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), + target: Some(to), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index f45affb28..642fa525a 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -2,10 +2,7 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - BlockNumber, -}; +use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_provider::Transaction; use reth_stages::{stages::AccountHashingStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf}; @@ -83,7 +80,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), + target: Some(to), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index 7abc8d6b5..6529541be 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -2,7 +2,7 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::stage::StageCheckpoint; use reth_provider::Transaction; use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput}; use std::{ops::DerefMut, path::PathBuf}; @@ -77,7 +77,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), + target: Some(to), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index b0447c248..a1f0d599a 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -2,10 +2,7 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables}; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - BlockNumber, MAINNET, -}; +use reth_primitives::{stage::StageCheckpoint, BlockNumber, MAINNET}; use reth_provider::Transaction; use reth_stages::{ stages::{ @@ -57,10 +54,8 @@ async fn unwind_and_copy( checkpoint: StageCheckpoint::new(tip_block_number), bad_block: None, }; - let execute_input = reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), - checkpoint: Some(StageCheckpoint::new(from)), - }; + let execute_input = + reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) }; // Unwind hashes all the way to FROM StorageHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap(); @@ -128,7 +123,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), + target: Some(to), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 4a2a638ca..9f4291830 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -11,7 +11,7 @@ use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_primitives::{stage::StageId, ChainSpec}; +use reth_primitives::ChainSpec; use reth_provider::{providers::get_stage_checkpoint, ShareableDatabase, Transaction}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ @@ -232,7 +232,7 @@ impl Command { } let mut input = ExecInput { - previous_stage: Some((StageId::Other("No Previous Stage"), self.to)), + target: Some(self.to), checkpoint: Some(checkpoint.with_block_number(self.from)), }; diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 6969a71f0..aa5a000fc 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -5,7 +5,7 @@ use criterion::{ use pprof::criterion::{Output, PProfProfiler}; use reth_db::mdbx::{Env, WriteMap}; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::stage::StageCheckpoint; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestTransaction, @@ -162,7 +162,7 @@ fn measure_stage( stage, ( ExecInput { - previous_stage: Some((StageId::Other("Another"), block_interval.end)), + target: Some(block_interval.end), checkpoint: Some(StageCheckpoint::new(block_interval.start)), }, UnwindInput { diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 662474a7d..893e2c931 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -2,7 +2,7 @@ use super::{constants, StageRange}; use reth_db::{ cursor::DbCursorRO, database::Database, tables, transaction::DbTx, DatabaseError as DbError, }; -use reth_primitives::stage::{StageCheckpoint, StageId}; +use reth_primitives::stage::StageCheckpoint; use reth_stages::{ stages::{AccountHashingStage, SeedOpts}, test_utils::TestTransaction, @@ -40,7 +40,7 @@ fn find_stage_range(db: &Path) -> StageRange { stage_range = Some(( ExecInput { - previous_stage: Some((StageId::Other("Another"), to.block_number)), + target: Some(to.block_number), checkpoint: Some(StageCheckpoint::new(from)), }, UnwindInput { unwind_to: from, checkpoint: to, bad_block: None }, @@ -66,14 +66,5 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { let mut tx = tx.inner(); let _accounts = AccountHashingStage::seed(&mut tx, opts); } - ( - path, - ( - ExecInput { - previous_stage: Some((StageId::Other("Another"), num_blocks)), - ..Default::default() - }, - UnwindInput::default(), - ), - ) + (path, (ExecInput { target: Some(num_blocks), ..Default::default() }, UnwindInput::default())) } diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index c1a48dda1..edb8e02e6 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -223,10 +223,9 @@ where } } - previous_stage = Some(( - stage_id, + previous_stage = Some( get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default().block_number, - )); + ); } Ok(self.progress.next_ctrl()) @@ -292,7 +291,7 @@ where async fn execute_stage_to_completion( &mut self, - previous_stage: Option<(StageId, BlockNumber)>, + previous_stage: Option, stage_index: usize, ) -> Result { let total_stages = self.stages.len(); @@ -300,6 +299,7 @@ where let stage = &mut self.stages[stage_index]; let stage_id = stage.id(); let mut made_progress = false; + let target = self.max_block.or(previous_stage); loop { let mut tx = Transaction::new(&self.db)?; @@ -332,10 +332,7 @@ where checkpoint: prev_checkpoint, }); - match stage - .execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint }) - .await - { + match stage.execute(&mut tx, ExecInput { target, checkpoint: prev_checkpoint }).await { Ok(out @ ExecOutput { checkpoint, done }) => { made_progress |= checkpoint.block_number != prev_checkpoint.unwrap_or_default().block_number; @@ -347,11 +344,7 @@ where %done, "Stage committed progress" ); - self.metrics.stage_checkpoint( - stage_id, - checkpoint, - self.max_block.or(previous_stage.map(|(_, block_number)| block_number)), - ); + self.metrics.stage_checkpoint(stage_id, checkpoint, target); tx.save_stage_checkpoint(stage_id, checkpoint)?; self.listeners.notify(PipelineEvent::Ran { diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 0abad4d9f..ba4a286f7 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -14,8 +14,8 @@ use std::{ /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct ExecInput { - /// The stage that was run before the current stage and the block number it reached. - pub previous_stage: Option<(StageId, BlockNumber)>, + /// The target block number the stage needs to execute towards. + pub target: Option, /// The checkpoint of this stage the last time it was executed. pub checkpoint: Option, } @@ -35,12 +35,12 @@ impl ExecInput { /// Returns `true` if the target block number has already been reached. pub fn target_reached(&self) -> bool { - self.checkpoint().block_number >= self.previous_stage_checkpoint_block_number() + self.checkpoint().block_number >= self.target() } - /// Return the progress of the previous stage or default. - pub fn previous_stage_checkpoint_block_number(&self) -> BlockNumber { - self.previous_stage.map(|(_, block_number)| block_number).unwrap_or_default() + /// Return the target block number or default. + pub fn target(&self) -> BlockNumber { + self.target.unwrap_or_default() } /// Return next block range that needs to be executed. @@ -62,7 +62,7 @@ impl ExecInput { ) -> (RangeInclusive, bool) { let current_block = self.checkpoint(); let start = current_block.block_number + 1; - let target = self.previous_stage_checkpoint_block_number(); + let target = self.target(); let end = min(target, current_block.block_number.saturating_add(threshold)); @@ -83,7 +83,7 @@ impl ExecInput { .get::(start_block)? .ok_or(ProviderError::BlockBodyIndicesNotFound(start_block))?; - let target_block = self.previous_stage_checkpoint_block_number(); + let target_block = self.target(); let first_tx_number = start_block_body.first_tx_num(); let mut last_tx_number = start_block_body.last_tx_num(); diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index ffcde2274..d5bebfb08 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -236,7 +236,6 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, - PREV_STAGE_ID, }; use assert_matches::assert_matches; use reth_primitives::stage::StageUnitCheckpoint; @@ -252,7 +251,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -290,7 +289,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -328,7 +327,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -355,10 +354,8 @@ mod tests { let first_run_checkpoint = first_run.unwrap().checkpoint; // Execute again on top of the previous run - let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), - checkpoint: Some(first_run_checkpoint), - }; + let input = + ExecInput { target: Some(previous_stage), checkpoint: Some(first_run_checkpoint) }; let rx = runner.execute(input); // Check that we synced more blocks @@ -389,7 +386,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -560,7 +557,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let start = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint_block_number(); + let end = input.target(); let blocks = random_block_range(start..=end, GENESIS_HASH, 0..2); self.tx.insert_headers_with_td(blocks.iter().map(|block| &block.header))?; if let Some(progress) = blocks.first() { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 1a8cc2ea1..ea3308482 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -139,7 +139,7 @@ impl ExecutionStage { input: ExecInput, ) -> Result { let start_block = input.next_block(); - let max_block = input.previous_stage_checkpoint_block_number(); + let max_block = input.target(); // Build executor let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); @@ -449,7 +449,7 @@ impl ExecutionStageThresholds { #[cfg(test)] mod tests { use super::*; - use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; + use crate::test_utils::TestTransaction; use assert_matches::assert_matches; use reth_db::{ mdbx::{test_utils::create_test_db, EnvKind, WriteMap}, @@ -601,7 +601,7 @@ mod tests { let state_db = create_test_db::(EnvKind::RW); let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, 1)), + target: Some(1), /// The progress of this stage the last time it was executed. checkpoint: None, }; @@ -705,7 +705,7 @@ mod tests { let state_db = create_test_db::(EnvKind::RW); let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, 1)), + target: Some(1), /// The progress of this stage the last time it was executed. checkpoint: None, }; @@ -791,7 +791,7 @@ mod tests { let test_tx = TestTransaction::default(); let mut tx = test_tx.inner(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, 1)), + target: Some(1), /// The progress of this stage the last time it was executed. checkpoint: None, }; diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index 2ec60d607..e5f4f9218 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,10 +21,7 @@ impl Stage for FinishStage { _tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - Ok(ExecOutput { - checkpoint: StageCheckpoint::new(input.previous_stage_checkpoint_block_number()), - done: true, - }) + Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } async fn unwind( @@ -74,7 +71,7 @@ mod tests { self.tx.insert_headers_with_td(std::iter::once(&head))?; // use previous progress as seed size - let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; + let end = input.target.unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) @@ -95,7 +92,7 @@ mod tests { assert!(output.done, "stage should always be done"); assert_eq!( output.checkpoint.block_number, - input.previous_stage_checkpoint_block_number(), + input.target(), "stage progress should always match progress of previous stage" ); } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index 39def8647..808f0bed6 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -252,7 +252,7 @@ impl Stage for AccountHashingStage { // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = StageCheckpoint::new(input.previous_stage_checkpoint_block_number()) + let checkpoint = StageCheckpoint::new(input.target()) .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint { progress: stage_checkpoint_progress(tx)?, ..Default::default() @@ -301,7 +301,6 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner, - PREV_STAGE_ID, }; use assert_matches::assert_matches; use reth_primitives::{stage::StageUnitCheckpoint, Account, U256}; @@ -317,7 +316,7 @@ mod tests { runner.set_clean_threshold(1); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -358,7 +357,7 @@ mod tests { runner.set_commit_threshold(5); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -537,11 +536,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { Ok(AccountHashingStage::seed( &mut self.tx.inner(), - SeedOpts { - blocks: 1..=input.previous_stage_checkpoint_block_number(), - accounts: 0..10, - txs: 0..3, - }, + SeedOpts { blocks: 1..=input.target(), accounts: 0..10, txs: 0..3 }, ) .unwrap()) } diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index fc19a2050..b2222d9a8 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -181,7 +181,7 @@ impl Stage for StorageHashingStage { // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = StageCheckpoint::new(input.previous_stage_checkpoint_block_number()) + let checkpoint = StageCheckpoint::new(input.target()) .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint { progress: stage_checkpoint_progress(tx)?, ..Default::default() @@ -229,7 +229,7 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -262,7 +262,7 @@ mod tests { runner.set_commit_threshold(1); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -322,7 +322,7 @@ mod tests { runner.set_commit_threshold(500); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -487,7 +487,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.next_block(); - let end = input.previous_stage_checkpoint_block_number(); + let end = input.target(); let n_accounts = 31; let mut accounts = random_contract_account_range(&mut (0..n_accounts)); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 40cd245f0..3bb20ca0f 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -374,7 +374,6 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner, - PREV_STAGE_ID, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::random_header; @@ -446,7 +445,7 @@ mod tests { self.tx.commit(|tx| tx.put::(head.number, U256::ZERO.into()))?; // use previous checkpoint as seed size - let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; + let end = input.target.unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) @@ -554,7 +553,7 @@ mod tests { let mut runner = HeadersTestRunner::with_linear_downloader(); let (checkpoint, previous_stage) = (1000, 1200); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); @@ -648,7 +647,7 @@ mod tests { // pick range that's larger than the configured headers batch size let (checkpoint, previous_stage) = (600, 1200); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index a435f99b2..8c33cbe2a 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -142,7 +142,7 @@ mod tests { use std::collections::BTreeMap; use super::*; - use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; + use crate::test_utils::TestTransaction; use reth_db::{ models::{ sharded_key::NUM_OF_INDICES_IN_SHARD, AccountBeforeTx, ShardedKey, @@ -206,8 +206,7 @@ mod tests { } async fn run(tx: &TestTransaction, run_to: u64) { - let input = - ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() }; + let input = ExecInput { target: Some(run_to), ..Default::default() }; let mut stage = IndexAccountHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index 6bb328d0b..b5fc84113 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -45,7 +45,7 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let target = input.previous_stage_checkpoint_block_number(); + let target = input.target(); let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); if range.is_empty() { @@ -145,7 +145,7 @@ mod tests { use std::collections::BTreeMap; use super::*; - use crate::test_utils::{TestTransaction, PREV_STAGE_ID}; + use crate::test_utils::TestTransaction; use reth_db::{ models::{ storage_sharded_key::{StorageShardedKey, NUM_OF_INDICES_IN_SHARD}, @@ -219,8 +219,7 @@ mod tests { } async fn run(tx: &TestTransaction, run_to: u64) { - let input = - ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() }; + let input = ExecInput { target: Some(run_to), ..Default::default() }; let mut stage = IndexStorageHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); @@ -307,7 +306,7 @@ mod tests { async fn insert_index_to_full_shard() { // init let tx = TestTransaction::default(); - let _input = ExecInput { previous_stage: Some((PREV_STAGE_ID, 5)), ..Default::default() }; + let _input = ExecInput { target: Some(5), ..Default::default() }; // change does not matter only that account is present in changeset. let full_list = vec![3; NUM_OF_INDICES_IN_SHARD]; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 5228a64c6..aae483dd6 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -149,7 +149,7 @@ impl Stage for MerkleStage { let threshold = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); - return Ok(ExecOutput::done(input.previous_stage_checkpoint_block_number())) + return Ok(ExecOutput::done(input.target())) } MerkleStage::Execution { clean_threshold } => *clean_threshold, #[cfg(any(test, feature = "test-utils"))] @@ -158,7 +158,7 @@ impl Stage for MerkleStage { let range = input.next_block_range(); let (from_block, to_block) = range.clone().into_inner(); - let current_block = input.previous_stage_checkpoint_block_number(); + let current_block = input.target(); let block = tx.get_header(current_block)?; let block_root = block.state_root; @@ -332,7 +332,7 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::{ @@ -360,7 +360,7 @@ mod tests { let mut runner = MerkleTestRunner::default(); // set low threshold so we hash the whole storage let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -400,7 +400,7 @@ mod tests { // Set up the runner let mut runner = MerkleTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -462,7 +462,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; let start = stage_progress + 1; - let end = input.previous_stage_checkpoint_block_number(); + let end = input.target(); let num_of_accounts = 31; let accounts = random_contract_account_range(&mut (0..num_of_accounts)) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index b417f8cd8..ae1e6d9d0 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -233,7 +233,7 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + TestTransaction, UnwindStageTestRunner, }; stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); @@ -246,13 +246,13 @@ mod tests { // Set up the runner let runner = SenderRecoveryTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; // Insert blocks with a single transaction at block `stage_progress + 10` let non_empty_block_number = stage_progress + 10; - let blocks = (stage_progress..=input.previous_stage_checkpoint_block_number()) + let blocks = (stage_progress..=input.target()) .map(|number| { random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) @@ -293,7 +293,7 @@ mod tests { let total_transactions = runner.tx.table::().unwrap().len() as u64; let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -325,7 +325,7 @@ mod tests { // Execute second time to completion runner.set_threshold(u64::MAX); let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -398,7 +398,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint_block_number(); + let end = input.target(); let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2); self.tx.insert_blocks(blocks.iter(), None)?; diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index 582155182..c23a4d9c9 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -132,7 +132,7 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + TestTransaction, UnwindStageTestRunner, }; stage_test_suite_ext!(TotalDifficultyTestRunner, total_difficulty); @@ -146,7 +146,7 @@ mod tests { runner.set_threshold(threshold); let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -170,7 +170,7 @@ mod tests { // Execute second time let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -239,7 +239,7 @@ mod tests { })?; // use previous progress as seed size - let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; + let end = input.target.unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index ac5aa780f..76b48f359 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -196,7 +196,7 @@ mod tests { use super::*; use crate::test_utils::{ stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID, + TestTransaction, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{random_block, random_block_range}; @@ -212,13 +212,13 @@ mod tests { // Set up the runner let runner = TransactionLookupTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; // Insert blocks with a single transaction at block `stage_progress + 10` let non_empty_block_number = stage_progress + 10; - let blocks = (stage_progress..=input.previous_stage_checkpoint_block_number()) + let blocks = (stage_progress..=input.target()) .map(|number| { random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) @@ -253,7 +253,7 @@ mod tests { runner.set_threshold(threshold); let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -291,7 +291,7 @@ mod tests { // Execute second time to completion runner.set_threshold(u64::MAX); let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -365,7 +365,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint_block_number(); + let end = input.target(); let blocks = random_block_range(stage_progress + 1..=end, H256::zero(), 0..2); self.tx.insert_blocks(blocks.iter(), None)?; diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index af7fdab41..f691d1371 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -29,7 +29,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); let input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(input).expect("failed to seed"); @@ -81,7 +81,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); let execute_input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)), + target: Some(previous_stage), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(execute_input).expect("failed to seed"); @@ -138,7 +138,7 @@ macro_rules! stage_test_suite_ext { // Set up the runner let mut runner = $runner::default(); let input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, stage_progress)), + target: Some(stage_progress), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(input).expect("failed to seed"); diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index bae39017e..b9fe397a8 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -20,6 +20,3 @@ pub use set::TestStages; /// The test stage id pub const TEST_STAGE_ID: StageId = StageId::Other("TestStage"); - -/// The previous test stage id mock used for testing -pub(crate) const PREV_STAGE_ID: StageId = StageId::Other("PrevStage"); diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 72ee38ae6..ec295bd03 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,7 +5,7 @@ use crate::{ Case, Error, Suite, }; use reth_db::mdbx::test_utils::create_test_rw_db; -use reth_primitives::{stage::StageId, BlockBody, SealedBlock}; +use reth_primitives::{BlockBody, SealedBlock}; use reth_provider::Transaction; use reth_stages::{stages::ExecutionStage, ExecInput, Stage}; use std::{collections::BTreeMap, ffi::OsStr, fs, ops::Deref, path::Path, sync::Arc}; @@ -104,11 +104,7 @@ impl Case for BlockchainTestCase { let _ = stage .execute( &mut transaction, - ExecInput { - previous_stage: last_block - .map(|b| (StageId::Other("Dummy"), b)), - checkpoint: None, - }, + ExecInput { target: last_block, checkpoint: None }, ) .await; });