diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 99252837a..ca3c413ee 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use reth_db::mdbx; -use reth_primitives::U64; +use reth_primitives::BlockNumber; use std::fmt::Display; use thiserror::Error; @@ -21,27 +21,27 @@ pub use pipeline::*; #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct ExecInput { /// The stage that was run before the current stage and the block number it reached. - pub previous_stage: Option<(StageId, U64)>, + pub previous_stage: Option<(StageId, BlockNumber)>, /// The progress of this stage the last time it was executed. - pub stage_progress: Option, + pub stage_progress: Option, } /// Stage unwind input, see [Stage::unwind]. #[derive(Debug, PartialEq, Eq, Clone, Copy)] pub struct UnwindInput { /// The current highest block of the stage. - pub stage_progress: U64, + pub stage_progress: BlockNumber, /// The block to unwind to. - pub unwind_to: U64, + pub unwind_to: BlockNumber, /// The bad block that caused the unwind, if any. - pub bad_block: Option, + pub bad_block: Option, } /// The output of a stage execution. #[derive(Debug, PartialEq, Eq, Clone)] pub struct ExecOutput { /// How far the stage got. - pub stage_progress: U64, + pub stage_progress: BlockNumber, /// Whether or not the stage is done. pub done: bool, /// Whether or not the stage reached the tip of the chain. @@ -52,7 +52,7 @@ pub struct ExecOutput { #[derive(Debug, PartialEq, Eq, Clone)] pub struct UnwindOutput { /// The block at which the stage has unwound to. - pub stage_progress: U64, + pub stage_progress: BlockNumber, } /// A stage execution error. @@ -64,7 +64,7 @@ pub enum StageError { #[error("Stage encountered a validation error in block {block}.")] Validation { /// The block that failed validation. - block: U64, + block: BlockNumber, }, /// The stage encountered an internal error. #[error(transparent)] @@ -126,7 +126,7 @@ impl StageId { pub fn get_progress<'db, K, E>( &self, tx: &mdbx::Transaction<'db, K, E>, - ) -> Result, mdbx::Error> + ) -> Result, mdbx::Error> where K: mdbx::TransactionKind, E: mdbx::EnvironmentKind, @@ -134,14 +134,14 @@ impl StageId { // TODO: Clean up when we get better database abstractions let bytes: Option> = tx.get(&tx.open_db(Some("SyncStage"))?, self.0.as_ref())?; - Ok(bytes.map(|b| U64::from_big_endian(b.as_ref()))) + Ok(bytes.map(|b| BlockNumber::from_be_bytes(b.try_into().expect("Database corrupt")))) } /// Save the progress of this stage. pub fn save_progress<'db, E>( &self, tx: &mdbx::Transaction<'db, mdbx::RW, E>, - block: U64, + block: BlockNumber, ) -> Result<(), mdbx::Error> where E: mdbx::EnvironmentKind, @@ -150,7 +150,7 @@ impl StageId { tx.put( &tx.open_db(Some("SyncStage"))?, self.0, - block.0[0].to_be_bytes(), + block.to_be_bytes(), mdbx::WriteFlags::UPSERT, ) } diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 39fef096c..ed11f773f 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -1,6 +1,6 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; use reth_db::mdbx; -use reth_primitives::U64; +use reth_primitives::BlockNumber; use std::fmt::{Debug, Formatter}; use tokio::sync::mpsc::Sender; use tracing::*; @@ -41,7 +41,7 @@ where E: mdbx::EnvironmentKind, { stages: Vec>, - max_block: Option, + max_block: Option, events_sender: Option>, } @@ -106,7 +106,7 @@ where /// Set the target block. /// /// Once this block is reached, syncing will stop. - pub fn set_max_block(mut self, block: Option) -> Self { + pub fn set_max_block(mut self, block: Option) -> Self { self.max_block = block; self } @@ -123,8 +123,8 @@ where db: &'db mdbx::Environment, ) -> Result<(), Box> { let mut previous_stage = None; - let mut minimum_progress: Option = None; - let mut maximum_progress: Option = None; + let mut minimum_progress: Option = None; + let mut maximum_progress: Option = None; let mut reached_tip_flag = true; 'run: loop { @@ -265,8 +265,8 @@ where pub async fn unwind( &mut self, db: &'db mdbx::Environment, - to: U64, - bad_block: Option, + to: BlockNumber, + bad_block: Option, ) -> Result<(), Box> { // Sort stages by unwind priority let mut unwind_pipeline = { @@ -353,7 +353,7 @@ pub enum PipelineEvent { /// The stage that is about to be run. stage_id: StageId, /// The previous checkpoint of the stage. - stage_progress: Option, + stage_progress: Option, }, /// Emitted when a stage has run a single time. /// @@ -407,7 +407,7 @@ mod tests { Pipeline::::new_with_channel(tx) .push( TestStage::new(StageId("A")).add_exec(Ok(ExecOutput { - stage_progress: 20.into(), + stage_progress: 20, done: true, reached_tip: true, })), @@ -415,13 +415,13 @@ mod tests { ) .push( TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { - stage_progress: 10.into(), + stage_progress: 10, done: true, reached_tip: true, })), false, ) - .set_max_block(Some(10.into())) + .set_max_block(Some(10)) .run(&db) .await }); @@ -433,20 +433,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { - stage_progress: 20.into(), - done: true, - reached_tip: true, - }), + result: Some(ExecOutput { stage_progress: 20, done: true, reached_tip: true }), }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: Some(ExecOutput { - stage_progress: 10.into(), - done: true, - reached_tip: true, - }), + result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), }, ] ); @@ -464,34 +456,30 @@ mod tests { .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { - stage_progress: 100.into(), + stage_progress: 100, done: true, reached_tip: true, })) - .add_unwind(Ok(UnwindOutput { stage_progress: 1.into() })), + .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), false, ) .push( TestStage::new(StageId("B")) .add_exec(Ok(ExecOutput { - stage_progress: 10.into(), + stage_progress: 10, done: true, reached_tip: true, })) - .add_unwind(Ok(UnwindOutput { stage_progress: 1.into() })), + .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), false, ) - .set_max_block(Some(10.into())); + .set_max_block(Some(10)); // Sync first pipeline.run(&db).await.expect("Could not run pipeline"); // Unwind - pipeline - .set_channel(tx) - .unwind(&db, 1.into(), None) - .await - .expect("Could not unwind pipeline"); + pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline"); }); // Check that the stages were unwound in reverse order @@ -500,27 +488,19 @@ mod tests { vec![ PipelineEvent::Unwinding { stage_id: StageId("B"), - input: UnwindInput { - stage_progress: 10.into(), - unwind_to: 1.into(), - bad_block: None - } + input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None } }, PipelineEvent::Unwound { stage_id: StageId("B"), - result: Some(UnwindOutput { stage_progress: 1.into() }), + result: Some(UnwindOutput { stage_progress: 1 }), }, PipelineEvent::Unwinding { stage_id: StageId("A"), - input: UnwindInput { - stage_progress: 100.into(), - unwind_to: 1.into(), - bad_block: None - } + input: UnwindInput { stage_progress: 100, unwind_to: 1, bad_block: None } }, PipelineEvent::Unwound { stage_id: StageId("A"), - result: Some(UnwindOutput { stage_progress: 1.into() }), + result: Some(UnwindOutput { stage_progress: 1 }), }, ] ); @@ -549,13 +529,13 @@ mod tests { .push( TestStage::new(StageId("A")) .add_exec(Ok(ExecOutput { - stage_progress: 10.into(), + stage_progress: 10, done: true, reached_tip: true, })) - .add_unwind(Ok(UnwindOutput { stage_progress: 0.into() })) + .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) .add_exec(Ok(ExecOutput { - stage_progress: 10.into(), + stage_progress: 10, done: true, reached_tip: true, })), @@ -563,16 +543,16 @@ mod tests { ) .push( TestStage::new(StageId("B")) - .add_exec(Err(StageError::Validation { block: 5.into() })) - .add_unwind(Ok(UnwindOutput { stage_progress: 0.into() })) + .add_exec(Err(StageError::Validation { block: 5 })) + .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) .add_exec(Ok(ExecOutput { - stage_progress: 10.into(), + stage_progress: 10, done: true, reached_tip: true, })), false, ) - .set_max_block(Some(10.into())) + .set_max_block(Some(10)) .set_channel(tx) .run(&db) .await @@ -586,43 +566,27 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { - stage_progress: 10.into(), - done: true, - reached_tip: true, - }), + result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), result: None }, PipelineEvent::Unwinding { stage_id: StageId("A"), - input: UnwindInput { - stage_progress: 10.into(), - unwind_to: 0.into(), - bad_block: Some(5.into()) - } + input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) } }, PipelineEvent::Unwound { stage_id: StageId("A"), - result: Some(UnwindOutput { stage_progress: 0.into() }), + result: Some(UnwindOutput { stage_progress: 0 }), }, - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0.into()) }, + PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { - stage_progress: 10.into(), - done: true, - reached_tip: true, - }), + result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: Some(ExecOutput { - stage_progress: 10.into(), - done: true, - reached_tip: true, - }), + result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), }, ] );