diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 5357b0642..e64a7be57 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -1,7 +1,7 @@ use crate::{ error::*, util::{db::TxContainer, opt::MaybeSender}, - ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, + ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, }; use reth_db::{kv::Env, mdbx}; use reth_primitives::BlockNumber; @@ -154,15 +154,14 @@ where /// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// a `max_block` in the pipeline. pub async fn run(&mut self, db: &'db Env) -> Result<(), PipelineError> { - let mut state = PipelineState { - events_sender: self.events_sender.clone(), - max_block: self.max_block, - maximum_progress: None, - minimum_progress: None, - reached_tip: true, - }; - loop { + let mut state = PipelineState { + events_sender: self.events_sender.clone(), + max_block: self.max_block, + maximum_progress: None, + minimum_progress: None, + reached_tip: true, + }; let mut tx = TxContainer::new(db)?; let next_action = self.run_loop(&mut state, &mut tx).await?; @@ -246,12 +245,7 @@ where let mut stage_progress = stage_id.get_progress(&tx)?.unwrap_or_default(); if stage_progress < to { debug!(from = %stage_progress, %to, "Unwind point too far for stage"); - self.events_sender - .send(PipelineEvent::Unwound { - stage_id, - result: Some(UnwindOutput { stage_progress }), - }) - .await?; + self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; return Ok(()) } @@ -267,13 +261,11 @@ where stage_id.save_progress(&tx, stage_progress)?; self.events_sender - .send(PipelineEvent::Unwound { stage_id, result: Some(unwind_output) }) + .send(PipelineEvent::Unwound { stage_id, result: unwind_output }) .await?; } Err(err) => { - self.events_sender - .send(PipelineEvent::Unwound { stage_id, result: None }) - .await?; + self.events_sender.send(PipelineEvent::Error { stage_id }).await?; return Err(PipelineError::Stage(StageError::Internal(err))) } } @@ -316,6 +308,7 @@ where let stage_id = self.stage.id(); if self.require_tip && !state.reached_tip() { info!("Tip not reached, skipping."); + state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // Stage requires us to reach the tip of the chain first, but we have // not. @@ -324,22 +317,24 @@ where loop { let prev_progress = stage_id.get_progress(tx.get())?; - state - .events_sender - .send(PipelineEvent::Running { stage_id, stage_progress: prev_progress }) - .await?; let stage_reached_max_block = prev_progress .zip(state.max_block) .map_or(false, |(prev_progress, target)| prev_progress >= target); if stage_reached_max_block { info!("Stage reached maximum block, skipping."); + state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // We reached the maximum block, so we skip the stage state.set_reached_tip(true); return Ok(ControlFlow::Continue) } + state + .events_sender + .send(PipelineEvent::Running { stage_id, stage_progress: prev_progress }) + .await?; + match self .stage .execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress }) @@ -351,7 +346,7 @@ where state .events_sender - .send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) }) + .send(PipelineEvent::Ran { stage_id, result: out.clone() }) .await?; // TODO: Make the commit interval configurable @@ -365,7 +360,7 @@ where } } Err(err) => { - state.events_sender.send(PipelineEvent::Ran { stage_id, result: None }).await?; + state.events_sender.send(PipelineEvent::Error { stage_id }).await?; return if let StageError::Validation { block } = err { debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error."); @@ -435,12 +430,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { stage_progress: 20, done: true, reached_tip: true }), + result: ExecOutput { stage_progress: 20, done: true, reached_tip: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), + result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, }, ] ); @@ -494,7 +489,7 @@ mod tests { }, PipelineEvent::Unwound { stage_id: StageId("B"), - result: Some(UnwindOutput { stage_progress: 1 }), + result: UnwindOutput { stage_progress: 1 }, }, PipelineEvent::Unwinding { stage_id: StageId("A"), @@ -502,7 +497,7 @@ mod tests { }, PipelineEvent::Unwound { stage_id: StageId("A"), - result: Some(UnwindOutput { stage_progress: 1 }), + result: UnwindOutput { stage_progress: 1 }, }, ] ); @@ -568,27 +563,27 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), + result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, - PipelineEvent::Ran { stage_id: StageId("B"), result: None }, + PipelineEvent::Error { stage_id: StageId("B") }, PipelineEvent::Unwinding { stage_id: StageId("A"), input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) } }, PipelineEvent::Unwound { stage_id: StageId("A"), - result: Some(UnwindOutput { stage_progress: 0 }), + result: UnwindOutput { stage_progress: 0 }, }, PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, PipelineEvent::Ran { stage_id: StageId("A"), - result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), + result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), + result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, }, ] ); @@ -664,7 +659,7 @@ mod tests { }, PipelineEvent::Unwound { stage_id: StageId("B"), - result: Some(UnwindOutput { stage_progress: 1 }), + result: UnwindOutput { stage_progress: 1 }, }, PipelineEvent::Unwinding { stage_id: StageId("C"), @@ -672,7 +667,7 @@ mod tests { }, PipelineEvent::Unwound { stage_id: StageId("C"), - result: Some(UnwindOutput { stage_progress: 1 }), + result: UnwindOutput { stage_progress: 1 }, }, PipelineEvent::Unwinding { stage_id: StageId("A"), @@ -680,7 +675,67 @@ mod tests { }, PipelineEvent::Unwound { stage_id: StageId("A"), - result: Some(UnwindOutput { stage_progress: 1 }), + result: UnwindOutput { stage_progress: 1 }, + }, + ] + ); + } + + /// Runs a simple pipeline. + #[tokio::test] + async fn skips_stages_that_require_tip() { + let (tx, rx) = channel(2); + let db = test_utils::create_test_db(EnvKind::RW); + + // Run pipeline + tokio::spawn(async move { + Pipeline::::new_with_channel(tx) + .push( + TestStage::new(StageId("A")) + .add_exec(Ok(ExecOutput { + stage_progress: 5, + done: true, + reached_tip: false, + })) + .add_exec(Ok(ExecOutput { + stage_progress: 10, + done: true, + reached_tip: true, + })), + false, + ) + .push( + TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { + stage_progress: 10, + done: true, + reached_tip: true, + })), + true, + ) + .set_max_block(Some(10)) + .run(&db) + .await + }); + + // Check that the stages were run in order + assert_eq!( + ReceiverStream::new(rx).collect::>().await, + vec![ + PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, + PipelineEvent::Ran { + stage_id: StageId("A"), + result: ExecOutput { stage_progress: 5, reached_tip: false, done: true } + }, + PipelineEvent::Skipped { stage_id: StageId("B") }, + PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(5) }, + PipelineEvent::Ran { + stage_id: StageId("A"), + result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } + }, + PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, + PipelineEvent::Ran { + stage_id: StageId("B"), + result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } }, ] ); diff --git a/crates/stages/src/pipeline/ctrl.rs b/crates/stages/src/pipeline/ctrl.rs index 819d5d0a6..461ca6146 100644 --- a/crates/stages/src/pipeline/ctrl.rs +++ b/crates/stages/src/pipeline/ctrl.rs @@ -1,6 +1,7 @@ use reth_primitives::BlockNumber; /// Determines the control flow during pipeline execution. +#[derive(Debug)] pub(crate) enum ControlFlow { /// An unwind was requested and must be performed before continuing. Unwind { diff --git a/crates/stages/src/pipeline/event.rs b/crates/stages/src/pipeline/event.rs index cd07bb2ce..89b24490e 100644 --- a/crates/stages/src/pipeline/event.rs +++ b/crates/stages/src/pipeline/event.rs @@ -5,6 +5,12 @@ use crate::{ use reth_primitives::BlockNumber; /// An event emitted by a [Pipeline][crate::Pipeline]. +/// +/// It is possible for multiple of these events to be emitted over the duration of a pipeline's +/// execution since: +/// +/// - Other stages may ask the pipeline to unwind +/// - The pipeline will loop indefinitely unless a target block is set #[derive(Debug, PartialEq, Eq, Clone)] pub enum PipelineEvent { /// Emitted when a stage is about to be run. @@ -15,16 +21,11 @@ pub enum PipelineEvent { stage_progress: Option, }, /// Emitted when a stage has run a single time. - /// - /// It is possible for multiple of these events to be emitted over the duration of a pipeline's - /// execution: - /// - If the pipeline loops, the stage will be run again at some point - /// - If the stage exits early but has acknowledged that it is not entirely done Ran { /// The stage that was run. stage_id: StageId, - /// The result of executing the stage. If it is None then an error was encountered. - result: Option, + /// The result of executing the stage. + result: ExecOutput, }, /// Emitted when a stage is about to be unwound. Unwinding { @@ -34,13 +35,24 @@ pub enum PipelineEvent { input: UnwindInput, }, /// Emitted when a stage has been unwound. - /// - /// It is possible for multiple of these events to be emitted over the duration of a pipeline's - /// execution, since other stages may ask the pipeline to unwind. Unwound { /// The stage that was unwound. stage_id: StageId, - /// The result of unwinding the stage. If it is None then an error was encountered. - result: Option, + /// The result of unwinding the stage. + result: UnwindOutput, + }, + /// Emitted when a stage encounters an error either during execution or unwinding. + Error { + /// The stage that encountered an error. + stage_id: StageId, + }, + /// Emitted when a stage was skipped due to it's run conditions not being met: + /// + /// - The stage might have progressed beyond the point of our target block + /// - The stage might not need to be unwound since it has not progressed past the unwind target + /// - The stage requires that the pipeline has reached the tip, but it has not done so yet + Skipped { + /// The stage that was skipped. + stage_id: StageId, }, }