diff --git a/bin/reth/src/debug_cmd/merkle.rs b/bin/reth/src/debug_cmd/merkle.rs index 1b90fb1a2..668b9ea08 100644 --- a/bin/reth/src/debug_cmd/merkle.rs +++ b/bin/reth/src/debug_cmd/merkle.rs @@ -115,10 +115,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some(( - StageId::SenderRecovery, - StageCheckpoint::new(block), - )), + previous_stage: Some((StageId::SenderRecovery, block)), checkpoint: block.checked_sub(1).map(StageCheckpoint::new), }, ) @@ -130,7 +127,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some((StageId::Execution, StageCheckpoint::new(block))), + previous_stage: Some((StageId::Execution, block)), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -144,10 +141,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some(( - StageId::AccountHashing, - StageCheckpoint::new(block), - )), + previous_stage: Some((StageId::AccountHashing, block)), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -159,10 +153,7 @@ impl Command { .execute( &mut tx, ExecInput { - previous_stage: Some(( - StageId::StorageHashing, - StageCheckpoint::new(block), - )), + previous_stage: Some((StageId::StorageHashing, block)), checkpoint: progress.map(StageCheckpoint::new), }, ) @@ -180,7 +171,7 @@ impl Command { .collect::, _>>()?; let clean_input = ExecInput { - previous_stage: Some((StageId::StorageHashing, StageCheckpoint::new(block))), + previous_stage: Some((StageId::StorageHashing, block)), checkpoint: None, }; loop { diff --git a/bin/reth/src/stage/dump/execution.rs b/bin/reth/src/stage/dump/execution.rs index c525688c5..50593dd6c 100644 --- a/bin/reth/src/stage/dump/execution.rs +++ b/bin/reth/src/stage/dump/execution.rs @@ -139,7 +139,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_account.rs b/bin/reth/src/stage/dump/hashing_account.rs index 8dab27c90..f45affb28 100644 --- a/bin/reth/src/stage/dump/hashing_account.rs +++ b/bin/reth/src/stage/dump/hashing_account.rs @@ -83,7 +83,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/hashing_storage.rs b/bin/reth/src/stage/dump/hashing_storage.rs index 8fe8b0a7d..7abc8d6b5 100644 --- a/bin/reth/src/stage/dump/hashing_storage.rs +++ b/bin/reth/src/stage/dump/hashing_storage.rs @@ -77,7 +77,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/dump/merkle.rs b/bin/reth/src/stage/dump/merkle.rs index 3d0f36c54..2350e5cfa 100644 --- a/bin/reth/src/stage/dump/merkle.rs +++ b/bin/reth/src/stage/dump/merkle.rs @@ -58,7 +58,7 @@ async fn unwind_and_copy( bad_block: None, }; let execute_input = reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }; @@ -132,7 +132,7 @@ async fn dry_run( .execute( &mut tx, reth_stages::ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(to))), + previous_stage: Some((StageId::Other("Another"), to)), checkpoint: Some(StageCheckpoint::new(from)), }, ) diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index a0dfee5d4..21a8732c9 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -11,10 +11,7 @@ use clap::Parser; use reth_beacon_consensus::BeaconConsensus; use reth_config::Config; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - ChainSpec, -}; +use reth_primitives::{stage::StageId, ChainSpec}; use reth_provider::{providers::get_stage_checkpoint, ShareableDatabase, Transaction}; use reth_staged_sync::utils::init::init_db; use reth_stages::{ @@ -236,10 +233,7 @@ impl Command { } let mut input = ExecInput { - previous_stage: Some(( - StageId::Other("No Previous Stage"), - StageCheckpoint::new(self.to), - )), + previous_stage: Some((StageId::Other("No Previous Stage"), self.to)), checkpoint: Some(checkpoint.with_block_number(self.from)), }; diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 038fea736..6969a71f0 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -162,10 +162,7 @@ fn measure_stage( stage, ( ExecInput { - previous_stage: Some(( - StageId::Other("Another"), - StageCheckpoint::new(block_interval.end), - )), + previous_stage: Some((StageId::Other("Another"), block_interval.end)), checkpoint: Some(StageCheckpoint::new(block_interval.start)), }, UnwindInput { diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index 29a3c5add..662474a7d 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -40,7 +40,7 @@ fn find_stage_range(db: &Path) -> StageRange { stage_range = Some(( ExecInput { - previous_stage: Some((StageId::Other("Another"), to)), + previous_stage: Some((StageId::Other("Another"), to.block_number)), checkpoint: Some(StageCheckpoint::new(from)), }, UnwindInput { unwind_to: from, checkpoint: to, bad_block: None }, @@ -70,7 +70,7 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { path, ( ExecInput { - previous_stage: Some((StageId::Other("Another"), StageCheckpoint::new(num_blocks))), + previous_stage: Some((StageId::Other("Another"), num_blocks)), ..Default::default() }, UnwindInput::default(), diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 971eddf86..27502f4c6 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -1,11 +1,7 @@ use crate::{error::*, ExecInput, ExecOutput, Stage, StageError, UnwindInput}; use futures_util::Future; use reth_db::database::Database; -use reth_primitives::{ - listener::EventListeners, - stage::{StageCheckpoint, StageId}, - BlockNumber, H256, -}; +use reth_primitives::{listener::EventListeners, stage::StageId, BlockNumber, H256}; use reth_provider::{providers::get_stage_checkpoint, Transaction}; use std::pin::Pin; use tokio::sync::watch; @@ -228,7 +224,7 @@ where previous_stage = Some(( stage_id, - get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default(), + get_stage_checkpoint(&self.db.tx()?, stage_id)?.unwrap_or_default().block_number, )); } @@ -295,7 +291,7 @@ where async fn execute_stage_to_completion( &mut self, - previous_stage: Option<(StageId, StageCheckpoint)>, + previous_stage: Option<(StageId, BlockNumber)>, stage_index: usize, ) -> Result { let total_stages = self.stages.len(); @@ -353,7 +349,7 @@ where self.metrics.stage_checkpoint( stage_id, checkpoint, - previous_stage.map(|(_, checkpoint)| checkpoint.block_number), + self.max_block.or(previous_stage.map(|(_, block_number)| block_number)), ); tx.save_stage_checkpoint(stage_id, checkpoint)?; @@ -437,6 +433,7 @@ mod tests { use reth_interfaces::{ consensus, provider::ProviderError, test_utils::generators::random_header, }; + use reth_primitives::stage::StageCheckpoint; use tokio_stream::StreamExt; #[test] diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index bd3b22966..64b462d57 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -14,8 +14,8 @@ use std::{ /// Stage execution input, see [Stage::execute]. #[derive(Debug, Default, PartialEq, Eq, Clone, Copy)] pub struct ExecInput { - /// The stage that was run before the current stage and the checkpoint it reached. - pub previous_stage: Option<(StageId, StageCheckpoint)>, + /// The stage that was run before the current stage and the block number it reached. + pub previous_stage: Option<(StageId, BlockNumber)>, /// The checkpoint of this stage the last time it was executed. pub checkpoint: Option, } @@ -27,8 +27,8 @@ impl ExecInput { } /// Return the progress of the previous stage or default. - pub fn previous_stage_checkpoint(&self) -> StageCheckpoint { - self.previous_stage.map(|(_, checkpoint)| checkpoint).unwrap_or_default() + pub fn previous_stage_checkpoint_block_number(&self) -> BlockNumber { + self.previous_stage.map(|(_, block_number)| block_number).unwrap_or_default() } /// Return next block range that needs to be executed. @@ -51,7 +51,7 @@ impl ExecInput { let current_block = self.checkpoint(); // +1 is to skip present block and always start from block number 1, not 0. let start = current_block.block_number + 1; - let target = self.previous_stage_checkpoint().block_number; + let target = self.previous_stage_checkpoint_block_number(); let end = min(target, current_block.block_number.saturating_add(threshold)); @@ -105,9 +105,9 @@ pub struct ExecOutput { } impl ExecOutput { - /// Mark the stage as done, checkpointing at the given place. - pub fn done(checkpoint: StageCheckpoint) -> Self { - Self { checkpoint, done: true } + /// Mark the stage as done, checkpointing at the given block number. + pub fn done(block_number: BlockNumber) -> Self { + Self { checkpoint: StageCheckpoint::new(block_number), done: true } } } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index ab547df6e..ed22ca6a2 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -73,7 +73,7 @@ impl Stage for BodyStage { if range.is_empty() { let (from, to) = range.into_inner(); info!(target: "sync::stages::bodies", from, "Target block already downloaded, skipping."); - return Ok(ExecOutput::done(StageCheckpoint::new(to))) + return Ok(ExecOutput::done(to)) } // Update the header range on the downloader @@ -231,7 +231,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -261,7 +261,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -293,7 +293,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -313,7 +313,7 @@ mod tests { // Execute again on top of the previous run let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(first_run_checkpoint), }; let rx = runner.execute(input); @@ -339,7 +339,7 @@ mod tests { // Set up test runner let mut runner = BodyTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; runner.seed_execution(input).expect("failed to seed execution"); @@ -497,7 +497,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let start = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint().block_number; + let end = input.previous_stage_checkpoint_block_number(); let blocks = random_block_range(start..=end, GENESIS_HASH, 0..2); self.tx.insert_headers_with_td(blocks.iter().map(|block| &block.header))?; if let Some(progress) = blocks.first() { diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 74a98ef3a..6021c1a40 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -139,7 +139,7 @@ impl ExecutionStage { input: ExecInput, ) -> Result { let start_block = input.checkpoint().block_number + 1; - let max_block = input.previous_stage_checkpoint().block_number; + let max_block = input.previous_stage_checkpoint_block_number(); // Build executor let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx)); @@ -632,7 +632,7 @@ mod tests { let state_db = create_test_db::(EnvKind::RW); let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))), + previous_stage: Some((PREV_STAGE_ID, 1)), /// The progress of this stage the last time it was executed. checkpoint: None, }; @@ -736,7 +736,7 @@ mod tests { let state_db = create_test_db::(EnvKind::RW); let mut tx = Transaction::new(state_db.as_ref()).unwrap(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))), + previous_stage: Some((PREV_STAGE_ID, 1)), /// The progress of this stage the last time it was executed. checkpoint: None, }; @@ -822,7 +822,7 @@ mod tests { let test_tx = TestTransaction::default(); let mut tx = test_tx.inner(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(1))), + previous_stage: Some((PREV_STAGE_ID, 1)), /// The progress of this stage the last time it was executed. checkpoint: None, }; diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index ec4d4e53f..2ec60d607 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,7 +21,10 @@ impl Stage for FinishStage { _tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true }) + Ok(ExecOutput { + checkpoint: StageCheckpoint::new(input.previous_stage_checkpoint_block_number()), + done: true, + }) } async fn unwind( @@ -71,7 +74,7 @@ mod tests { self.tx.insert_headers_with_td(std::iter::once(&head))?; // use previous progress as seed size - let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1; + let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) @@ -91,8 +94,8 @@ mod tests { if let Some(output) = output { assert!(output.done, "stage should always be done"); assert_eq!( - output.checkpoint, - input.previous_stage_checkpoint(), + output.checkpoint.block_number, + input.previous_stage_checkpoint_block_number(), "stage progress should always match progress of previous stage" ); } diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs index dbbc1b423..82d1f6823 100644 --- a/crates/stages/src/stages/hashing_account.rs +++ b/crates/stages/src/stages/hashing_account.rs @@ -137,7 +137,7 @@ impl Stage for AccountHashingStage { ) -> Result { let range = input.next_block_range(); if range.is_empty() { - return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) + return Ok(ExecOutput::done(*range.end())) } let (from_block, to_block) = range.into_inner(); @@ -252,12 +252,11 @@ impl Stage for AccountHashingStage { // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = input.previous_stage_checkpoint().with_account_hashing_stage_checkpoint( - AccountHashingCheckpoint { + let checkpoint = StageCheckpoint::new(input.previous_stage_checkpoint_block_number()) + .with_account_hashing_stage_checkpoint(AccountHashingCheckpoint { progress: stage_checkpoint_progress(tx)?, ..Default::default() - }, - ); + }); info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { checkpoint, done: true }) @@ -318,7 +317,7 @@ mod tests { runner.set_clean_threshold(1); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -359,7 +358,7 @@ mod tests { runner.set_commit_threshold(5); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -539,7 +538,7 @@ mod tests { Ok(AccountHashingStage::seed( &mut self.tx.inner(), SeedOpts { - blocks: 1..=input.previous_stage_checkpoint().block_number, + blocks: 1..=input.previous_stage_checkpoint_block_number(), accounts: 0..10, txs: 0..3, }, diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index ea10d27e2..c25cd9438 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -59,7 +59,7 @@ impl Stage for StorageHashingStage { ) -> Result { let range = input.next_block_range(); if range.is_empty() { - return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) + return Ok(ExecOutput::done(*range.end())) } let (from_block, to_block) = range.into_inner(); @@ -181,12 +181,11 @@ impl Stage for StorageHashingStage { // We finished the hashing stage, no future iterations is expected for the same block range, // so no checkpoint is needed. - let checkpoint = input.previous_stage_checkpoint().with_storage_hashing_stage_checkpoint( - StorageHashingCheckpoint { + let checkpoint = StageCheckpoint::new(input.previous_stage_checkpoint_block_number()) + .with_storage_hashing_stage_checkpoint(StorageHashingCheckpoint { progress: stage_checkpoint_progress(tx)?, ..Default::default() - }, - ); + }); info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished"); Ok(ExecOutput { checkpoint, done: true }) @@ -263,7 +262,7 @@ mod tests { runner.set_commit_threshold(1); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -323,7 +322,7 @@ mod tests { runner.set_commit_threshold(500); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -488,7 +487,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number + 1; - let end = input.previous_stage_checkpoint().block_number; + let end = input.previous_stage_checkpoint_block_number(); let n_accounts = 31; let mut accounts = random_contract_account_range(&mut (0..n_accounts)); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 93df9e8db..40cd245f0 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -446,8 +446,7 @@ mod tests { self.tx.commit(|tx| tx.put::(head.number, U256::ZERO.into()))?; // use previous checkpoint as seed size - let end = - input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1; + let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) @@ -555,7 +554,7 @@ mod tests { let mut runner = HeadersTestRunner::with_linear_downloader(); let (checkpoint, previous_stage) = (1000, 1200); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); @@ -649,7 +648,7 @@ mod tests { // pick range that's larger than the configured headers batch size let (checkpoint, previous_stage) = (600, 1200); let mut input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); diff --git a/crates/stages/src/stages/index_account_history.rs b/crates/stages/src/stages/index_account_history.rs index 87520917a..b85695774 100644 --- a/crates/stages/src/stages/index_account_history.rs +++ b/crates/stages/src/stages/index_account_history.rs @@ -45,7 +45,7 @@ impl Stage for IndexAccountHistoryStage { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); if range.is_empty() { - return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) + return Ok(ExecOutput::done(*range.end())) } let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?; @@ -206,10 +206,8 @@ mod tests { } async fn run(tx: &TestTransaction, run_to: u64) { - let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(run_to))), - ..Default::default() - }; + let input = + ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() }; let mut stage = IndexAccountHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); diff --git a/crates/stages/src/stages/index_storage_history.rs b/crates/stages/src/stages/index_storage_history.rs index f1ed854ef..19d5427ae 100644 --- a/crates/stages/src/stages/index_storage_history.rs +++ b/crates/stages/src/stages/index_storage_history.rs @@ -45,7 +45,7 @@ impl Stage for IndexStorageHistoryStage { tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let target = input.previous_stage_checkpoint(); + let target = input.previous_stage_checkpoint_block_number(); let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); if range.is_empty() { @@ -219,10 +219,8 @@ mod tests { } async fn run(tx: &TestTransaction, run_to: u64) { - let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(run_to))), - ..Default::default() - }; + let input = + ExecInput { previous_stage: Some((PREV_STAGE_ID, run_to)), ..Default::default() }; let mut stage = IndexStorageHistoryStage::default(); let mut tx = tx.inner(); let out = stage.execute(&mut tx, input).await.unwrap(); @@ -312,10 +310,7 @@ mod tests { async fn insert_index_to_full_shard() { // init let tx = TestTransaction::default(); - let _input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(5))), - ..Default::default() - }; + let _input = ExecInput { previous_stage: Some((PREV_STAGE_ID, 5)), ..Default::default() }; // change does not matter only that account is present in changeset. let full_list = vec![3; NUM_OF_INDICES_IN_SHARD]; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index ce58407e4..5228a64c6 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -149,7 +149,7 @@ impl Stage for MerkleStage { let threshold = match self { MerkleStage::Unwind => { info!(target: "sync::stages::merkle::unwind", "Stage is always skipped"); - return Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true }) + return Ok(ExecOutput::done(input.previous_stage_checkpoint_block_number())) } MerkleStage::Execution { clean_threshold } => *clean_threshold, #[cfg(any(test, feature = "test-utils"))] @@ -158,7 +158,7 @@ impl Stage for MerkleStage { let range = input.next_block_range(); let (from_block, to_block) = range.clone().into_inner(); - let current_block = input.previous_stage_checkpoint().block_number; + let current_block = input.previous_stage_checkpoint_block_number(); let block = tx.get_header(current_block)?; let block_root = block.state_root; @@ -360,7 +360,7 @@ mod tests { let mut runner = MerkleTestRunner::default(); // set low threshold so we hash the whole storage let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -400,7 +400,7 @@ mod tests { // Set up the runner let mut runner = MerkleTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -462,7 +462,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; let start = stage_progress + 1; - let end = input.previous_stage_checkpoint().block_number; + let end = input.previous_stage_checkpoint_block_number(); let num_of_accounts = 31; let accounts = random_contract_account_range(&mut (0..num_of_accounts)) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 601a3c9de..d08627c07 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -60,7 +60,7 @@ impl Stage for SenderRecoveryStage { ) -> Result { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); if range.is_empty() { - return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) + return Ok(ExecOutput::done(*range.end())) } let (start_block, end_block) = range.clone().into_inner(); @@ -224,13 +224,13 @@ mod tests { // Set up the runner let runner = SenderRecoveryTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; // Insert blocks with a single transaction at block `stage_progress + 10` let non_empty_block_number = stage_progress + 10; - let blocks = (stage_progress..=input.previous_stage_checkpoint().block_number) + let blocks = (stage_progress..=input.previous_stage_checkpoint_block_number()) .map(|number| { random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) @@ -264,7 +264,7 @@ mod tests { runner.set_threshold(threshold); let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -291,7 +291,7 @@ mod tests { // Execute second time let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -366,7 +366,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint().block_number; + let end = input.previous_stage_checkpoint_block_number(); let blocks = random_block_range(stage_progress..=end, H256::zero(), 0..2); self.tx.insert_blocks(blocks.iter(), None)?; diff --git a/crates/stages/src/stages/total_difficulty.rs b/crates/stages/src/stages/total_difficulty.rs index cb5a1b5da..ea1daf50b 100644 --- a/crates/stages/src/stages/total_difficulty.rs +++ b/crates/stages/src/stages/total_difficulty.rs @@ -127,7 +127,7 @@ mod tests { runner.set_threshold(threshold); let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -145,7 +145,7 @@ mod tests { // Execute second time let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -208,7 +208,7 @@ mod tests { })?; // use previous progress as seed size - let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1; + let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default() + 1; if start + 1 >= end { return Ok(Vec::default()) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 671b8b84d..b575de9ac 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -58,7 +58,7 @@ impl Stage for TransactionLookupStage { ) -> Result { let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold); if range.is_empty() { - return Ok(ExecOutput::done(StageCheckpoint::new(*range.end()))) + return Ok(ExecOutput::done(*range.end())) } let (start_block, end_block) = range.into_inner(); @@ -234,13 +234,13 @@ mod tests { // Set up the runner let runner = TransactionLookupTestRunner::default(); let input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; // Insert blocks with a single transaction at block `stage_progress + 10` let non_empty_block_number = stage_progress + 10; - let blocks = (stage_progress..=input.previous_stage_checkpoint().block_number) + let blocks = (stage_progress..=input.previous_stage_checkpoint_block_number()) .map(|number| { random_block(number, None, Some((number == non_empty_block_number) as u8), None) }) @@ -275,7 +275,7 @@ mod tests { runner.set_threshold(threshold); let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold let first_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(stage_progress)), }; @@ -300,7 +300,7 @@ mod tests { // Execute second time let second_input = ExecInput { - previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), + previous_stage: Some((PREV_STAGE_ID, previous_stage)), checkpoint: Some(StageCheckpoint::new(expected_progress)), }; let result = runner.execute(second_input).await.unwrap(); @@ -375,7 +375,7 @@ mod tests { fn seed_execution(&mut self, input: ExecInput) -> Result { let stage_progress = input.checkpoint().block_number; - let end = input.previous_stage_checkpoint().block_number; + let end = input.previous_stage_checkpoint_block_number(); let blocks = random_block_range(stage_progress + 1..=end, H256::zero(), 0..2); self.tx.insert_blocks(blocks.iter(), None)?; diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 07a0cb71f..af7fdab41 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -29,7 +29,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); let input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::stage::StageCheckpoint::new(previous_stage))), + previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(input).expect("failed to seed"); @@ -81,7 +81,7 @@ macro_rules! stage_test_suite { // Set up the runner let mut runner = $runner::default(); let execute_input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::stage::StageCheckpoint::new(previous_stage))), + previous_stage: Some((crate::test_utils::PREV_STAGE_ID, previous_stage)), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(execute_input).expect("failed to seed"); @@ -138,7 +138,7 @@ macro_rules! stage_test_suite_ext { // Set up the runner let mut runner = $runner::default(); let input = crate::stage::ExecInput { - previous_stage: Some((crate::test_utils::PREV_STAGE_ID, reth_primitives::stage::StageCheckpoint::new(stage_progress))), + previous_stage: Some((crate::test_utils::PREV_STAGE_ID, stage_progress)), checkpoint: Some(reth_primitives::stage::StageCheckpoint::new(stage_progress)), }; let seed = runner.seed_execution(input).expect("failed to seed"); diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 05ae41829..72ee38ae6 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -5,10 +5,7 @@ use crate::{ Case, Error, Suite, }; use reth_db::mdbx::test_utils::create_test_rw_db; -use reth_primitives::{ - stage::{StageCheckpoint, StageId}, - BlockBody, SealedBlock, -}; +use reth_primitives::{stage::StageId, BlockBody, SealedBlock}; use reth_provider::Transaction; use reth_stages::{stages::ExecutionStage, ExecInput, Stage}; use std::{collections::BTreeMap, ffi::OsStr, fs, ops::Deref, path::Path, sync::Arc}; @@ -108,9 +105,8 @@ impl Case for BlockchainTestCase { .execute( &mut transaction, ExecInput { - previous_stage: last_block.map(|b| { - (StageId::Other("Dummy"), StageCheckpoint::new(b)) - }), + previous_stage: last_block + .map(|b| (StageId::Other("Dummy"), b)), checkpoint: None, }, )