From f65562e2e4437d92d600c6f3baa51f80dc3eaac7 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Mon, 19 Dec 2022 22:04:42 +0100 Subject: [PATCH] refactor(sync): remove `require_tip` (#528) `require_tip` could only be determined by the headers stage, and it signalled that we have all of the headers to sync all the way to the chain tip. Some stages may wait to execute until the tip is reached, e.g. the stage that checks the stage root, but there are a few problems: - On initial sync, `reached_tip` would be `true`, but by the time we reach the hashing stage, this would actually no longer be the case: the other stages have spent enough time for us to be "out of sync". This means that the optimization here is lost, and the additional logic is added for nothing. - When we are not doing our initial sync, `reached_tip` would always be `true` for each subsequent block we sync. The same logic applies as above, i.e. the extra logic is there for nothing. In other words, `reached_tip` would *always* be `true` once we leave the header stage, making the extra logic entirely redundant. --- bin/reth/src/node/mod.rs | 63 ++++----- crates/stages/src/pipeline.rs | 187 ++++--------------------- crates/stages/src/pipeline/state.rs | 43 ------ crates/stages/src/stage.rs | 2 - crates/stages/src/stages/bodies.rs | 22 ++- crates/stages/src/stages/execution.rs | 6 +- crates/stages/src/stages/headers.rs | 4 +- crates/stages/src/stages/senders.rs | 14 +- crates/stages/src/test_utils/macros.rs | 12 +- 9 files changed, 80 insertions(+), 273 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 280a6e39b..50a94252b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -114,42 +114,33 @@ impl Command { // TODO: Remove magic numbers let fetch_client = Arc::new(network.fetch_client().await?); let mut pipeline = reth_stages::Pipeline::new() - .push( - HeaderStage { - downloader: headers::linear::LinearDownloadBuilder::default() - .batch_size(config.stages.headers.downloader_batch_size) - .retries(config.stages.headers.downloader_retries) - .build(consensus.clone(), fetch_client.clone()), - consensus: consensus.clone(), - client: fetch_client.clone(), - network_handle: network.clone(), - commit_threshold: config.stages.headers.commit_threshold, - }, - false, - ) - .push( - BodyStage { - downloader: Arc::new( - bodies::concurrent::ConcurrentDownloader::new( - fetch_client.clone(), - consensus.clone(), - ) - .with_batch_size(config.stages.bodies.downloader_batch_size) - .with_retries(config.stages.bodies.downloader_retries) - .with_concurrency(config.stages.bodies.downloader_concurrency), - ), - consensus: consensus.clone(), - commit_threshold: config.stages.bodies.commit_threshold, - }, - false, - ) - .push( - SendersStage { - batch_size: config.stages.senders.batch_size, - commit_threshold: config.stages.senders.commit_threshold, - }, - false, - ); + .push(HeaderStage { + downloader: headers::linear::LinearDownloadBuilder::default() + .batch_size(config.stages.headers.downloader_batch_size) + .retries(config.stages.headers.downloader_retries) + .build(consensus.clone(), fetch_client.clone()), + consensus: consensus.clone(), + client: fetch_client.clone(), + network_handle: network.clone(), + commit_threshold: config.stages.headers.commit_threshold, + }) + .push(BodyStage { + downloader: Arc::new( + bodies::concurrent::ConcurrentDownloader::new( + fetch_client.clone(), + consensus.clone(), + ) + .with_batch_size(config.stages.bodies.downloader_batch_size) + .with_retries(config.stages.bodies.downloader_retries) + .with_concurrency(config.stages.bodies.downloader_concurrency), + ), + consensus: consensus.clone(), + commit_threshold: config.stages.bodies.commit_threshold, + }) + .push(SendersStage { + batch_size: config.stages.senders.batch_size, + commit_threshold: config.stages.senders.commit_threshold, + }); if let Some(tip) = self.tip { debug!("Tip manually set: {}", tip); diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 0d84293e4..0af006607 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -108,24 +108,19 @@ impl Pipeline { /// # Unwinding /// /// The unwind priority is set to 0. - pub fn push(self, stage: S, require_tip: bool) -> Self + pub fn push(self, stage: S) -> Self where S: Stage + 'static, { - self.push_with_unwind_priority(stage, require_tip, 0) + self.push_with_unwind_priority(stage, 0) } /// Add a stage to the pipeline, specifying the unwind priority. - pub fn push_with_unwind_priority( - mut self, - stage: S, - require_tip: bool, - unwind_priority: usize, - ) -> Self + pub fn push_with_unwind_priority(mut self, stage: S, unwind_priority: usize) -> Self where S: Stage + 'static, { - self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority }); + self.stages.push(QueuedStage { stage: Box::new(stage), unwind_priority }); self } @@ -152,7 +147,6 @@ impl Pipeline { max_block: self.max_block, maximum_progress: None, minimum_progress: None, - reached_tip: true, }; let next_action = self.run_loop(&mut state, db.as_ref()).await?; @@ -277,9 +271,6 @@ struct QueuedStage { stage: Box>, /// The unwind priority of the stage. unwind_priority: usize, - /// Whether or not this stage can only execute when we reach what we believe to be the tip of - /// the chain. - require_tip: bool, } impl QueuedStage { @@ -291,19 +282,6 @@ impl QueuedStage { db: &DB, ) -> Result { let stage_id = self.stage.id(); - if self.require_tip && !state.reached_tip() { - warn!( - target: "sync::pipeline", - stage = %stage_id, - "Tip not reached as required by stage, skipping." - ); - state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; - - // Stage requires us to reach the tip of the chain first, but we have - // not. - return Ok(ControlFlow::Continue) - } - loop { let mut tx = Transaction::new(db)?; @@ -321,7 +299,6 @@ impl QueuedStage { state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?; // We reached the maximum block, so we skip the stage - state.set_reached_tip(true); return Ok(ControlFlow::Continue) } @@ -335,7 +312,7 @@ impl QueuedStage { .execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress }) .await { - Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => { + Ok(out @ ExecOutput { stage_progress, done }) => { info!( target: "sync::pipeline", stage = %stage_id, @@ -354,7 +331,6 @@ impl QueuedStage { tx.commit()?; state.record_progress_outliers(stage_progress); - state.set_reached_tip(reached_tip); if done { return Ok(ControlFlow::Continue) @@ -422,20 +398,12 @@ mod tests { tokio::spawn(async move { Pipeline::>::new_with_channel(tx) .push( - TestStage::new(StageId("A")).add_exec(Ok(ExecOutput { - stage_progress: 20, - done: true, - reached_tip: true, - })), - false, + TestStage::new(StageId("A")) + .add_exec(Ok(ExecOutput { stage_progress: 20, done: true })), ) .push( - TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + TestStage::new(StageId("B")) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .run(db) @@ -449,12 +417,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 20, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 20, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, ] ); @@ -471,23 +439,13 @@ mod tests { let mut pipeline = Pipeline::>::new() .push( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 100, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 100, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, ) .push( TestStage::new(StageId("B")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, ) .set_max_block(Some(10)); @@ -544,18 +502,9 @@ mod tests { Pipeline::>::new() .push( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .push( TestStage::new(StageId("B")) @@ -564,12 +513,7 @@ mod tests { error: consensus::Error::BaseFeeMissing, })) .add_unwind(Ok(UnwindOutput { stage_progress: 0 })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .set_channel(tx) @@ -585,7 +529,7 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Error { stage_id: StageId("B") }, @@ -600,12 +544,12 @@ mod tests { PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, PipelineEvent::Ran { stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Ran { stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, done: true, reached_tip: true }, + result: ExecOutput { stage_progress: 10, done: true }, }, ] ); @@ -631,35 +575,20 @@ mod tests { let mut pipeline = Pipeline::>::new() .push_with_unwind_priority( TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 1, ) .push_with_unwind_priority( TestStage::new(StageId("B")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 10, ) .push_with_unwind_priority( TestStage::new(StageId("C")) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })) + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })) .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), - false, 5, ) .set_max_block(Some(10)); @@ -703,66 +632,6 @@ mod tests { ); } - /// Runs a simple pipeline. - #[tokio::test] - async fn skips_stages_that_require_tip() { - let (tx, rx) = channel(2); - let db = test_utils::create_test_db(EnvKind::RW); - - // Run pipeline - tokio::spawn(async move { - Pipeline::>::new_with_channel(tx) - .push( - TestStage::new(StageId("A")) - .add_exec(Ok(ExecOutput { - stage_progress: 5, - done: true, - reached_tip: false, - })) - .add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - false, - ) - .push( - TestStage::new(StageId("B")).add_exec(Ok(ExecOutput { - stage_progress: 10, - done: true, - reached_tip: true, - })), - true, - ) - .set_max_block(Some(10)) - .run(db) - .await - }); - - // Check that the stages were run in order - assert_eq!( - ReceiverStream::new(rx).collect::>().await, - vec![ - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, - PipelineEvent::Ran { - stage_id: StageId("A"), - result: ExecOutput { stage_progress: 5, reached_tip: false, done: true } - }, - PipelineEvent::Skipped { stage_id: StageId("B") }, - PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(5) }, - PipelineEvent::Ran { - stage_id: StageId("A"), - result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } - }, - PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, - PipelineEvent::Ran { - stage_id: StageId("B"), - result: ExecOutput { stage_progress: 10, reached_tip: true, done: true } - }, - ] - ); - } - /// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones. #[tokio::test] async fn pipeline_error_handling() { @@ -772,8 +641,7 @@ mod tests { .push( TestStage::new(StageId("NonFatal")) .add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error)))) - .add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })), - false, + .add_exec(Ok(ExecOutput { stage_progress: 10, done: true })), ) .set_max_block(Some(10)) .run(db) @@ -783,12 +651,9 @@ mod tests { // Fatal let db = test_utils::create_test_db(EnvKind::RW); let result = Pipeline::>::new() - .push( - TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( - DatabaseIntegrityError::BlockBody { number: 5 }, - ))), - false, - ) + .push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity( + DatabaseIntegrityError::BlockBody { number: 5 }, + )))) .run(db) .await; assert_matches!( diff --git a/crates/stages/src/pipeline/state.rs b/crates/stages/src/pipeline/state.rs index 89154c0d8..aa393a925 100644 --- a/crates/stages/src/pipeline/state.rs +++ b/crates/stages/src/pipeline/state.rs @@ -12,11 +12,6 @@ pub(crate) struct PipelineState { pub(crate) maximum_progress: Option, /// The minimum progress achieved by any stage during the execution of the pipeline. pub(crate) minimum_progress: Option, - /// Whether or not the previous stage reached the tip of the chain. - /// - /// **Do not use this** under normal circumstances. Instead, opt for - /// [PipelineState::reached_tip] and [PipelineState::set_reached_tip]. - pub(crate) reached_tip: bool, } impl PipelineState { @@ -27,49 +22,12 @@ impl PipelineState { self.minimum_progress = opt::min(self.minimum_progress, stage_progress); self.maximum_progress = opt::max(self.maximum_progress, stage_progress); } - - /// Whether or not the pipeline reached the tip of the chain. - pub(crate) fn reached_tip(&self) -> bool { - self.reached_tip || - self.max_block - .zip(self.minimum_progress) - .map_or(false, |(target, progress)| progress >= target) - } - - /// Set whether or not the pipeline has reached the tip of the chain. - pub(crate) fn set_reached_tip(&mut self, flag: bool) { - self.reached_tip = flag; - } } #[cfg(test)] mod tests { use super::*; - #[test] - fn reached_tip() { - let mut state = PipelineState { - events_sender: MaybeSender::new(None), - max_block: None, - maximum_progress: None, - minimum_progress: None, - reached_tip: false, - }; - - // default - assert!(!state.reached_tip()); - - // reached tip - state.set_reached_tip(true); - assert!(state.reached_tip()); - - // reached max block - state.set_reached_tip(false); - state.max_block = Some(1); - state.minimum_progress = Some(1); - assert!(state.reached_tip()); - } - #[test] fn record_progress_outliers() { let mut state = PipelineState { @@ -77,7 +35,6 @@ mod tests { max_block: None, maximum_progress: None, minimum_progress: None, - reached_tip: false, }; state.record_progress_outliers(10); diff --git a/crates/stages/src/stage.rs b/crates/stages/src/stage.rs index 7ba1e03ee..7534dd250 100644 --- a/crates/stages/src/stage.rs +++ b/crates/stages/src/stage.rs @@ -37,8 +37,6 @@ pub struct ExecOutput { pub stage_progress: BlockNumber, /// Whether or not the stage is done. pub done: bool, - /// Whether or not the stage reached the tip of the chain. - pub reached_tip: bool, } /// The output of a stage unwinding. diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index d4d598ae5..cd53a50b9 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -93,7 +93,7 @@ impl Stage for BodyStage(tx, starting_block, target)?; @@ -124,7 +124,6 @@ impl Stage for BodyStage Stage for BodyStage= 10 + Ok(ExecOutput { stage_progress, done: false }) if stage_progress >= 10 ); let first_run_progress = first_run.unwrap().stage_progress; @@ -392,7 +388,7 @@ mod tests { let output = rx.await.unwrap(); assert_matches!( output, - Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress > first_run_progress + Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress ); assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation"); } @@ -432,7 +428,7 @@ mod tests { // Check that the error bubbles up assert_matches!( rx.await.unwrap(), - Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + Ok(ExecOutput { stage_progress: out_stage_progress, done: false }) if out_stage_progress == stage_progress ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); @@ -462,7 +458,7 @@ mod tests { let output = rx.await.unwrap(); assert_matches!( output, - Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage + Ok(ExecOutput { stage_progress, done: true }) if stage_progress == previous_stage ); let stage_progress = output.unwrap().stage_progress; runner @@ -521,7 +517,7 @@ mod tests { // Check that the error bubbles up assert_matches!( rx.await.unwrap(), - Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false }) + Ok(ExecOutput { stage_progress: out_stage_progress, done: false }) if out_stage_progress == stage_progress ); assert!(runner.validate_execution(input, None).is_ok(), "execution validation"); diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 092153b36..ced9cebca 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -108,7 +108,7 @@ impl Stage for ExecutionStage { // no more canonical blocks, we are done with execution. if canonical_batch.is_empty() { - return Ok(ExecOutput { stage_progress: last_block, done: true, reached_tip: true }) + return Ok(ExecOutput { stage_progress: last_block, done: true }) } // Get block headers and bodies from canonical hashes @@ -262,7 +262,7 @@ impl Stage for ExecutionStage { let last_block = last_block + canonical_batch.len() as u64; let is_done = canonical_batch.len() < BATCH_SIZE as usize; - Ok(ExecOutput { done: is_done, reached_tip: true, stage_progress: last_block }) + Ok(ExecOutput { done: is_done, stage_progress: last_block }) } /// Unwind the stage. @@ -414,7 +414,7 @@ mod tests { execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated(); let output = execution_stage.execute(&mut tx, input).await.unwrap(); tx.commit().unwrap(); - assert_eq!(output, ExecOutput { stage_progress: 1, done: true, reached_tip: true }); + assert_eq!(output, ExecOutput { stage_progress: 1, done: true }); let tx = tx.deref_mut(); // check post state let account1 = H160(hex!("1000000000000000000000000000000000000000")); diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 43086d3ea..796d95966 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -142,7 +142,7 @@ impl Stage for SendersStage { let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold); if max_block_num <= stage_progress { - return Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) + return Ok(ExecOutput { stage_progress, done: true }) } // Look up the start index for the transaction range @@ -74,7 +74,7 @@ impl Stage for SendersStage { // No transactions to walk over if start_tx_index > end_tx_index { - return Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true }) + return Ok(ExecOutput { stage_progress: max_block_num, done: true }) } // Acquire the cursor for inserting elements @@ -106,7 +106,7 @@ impl Stage for SendersStage { } let done = max_block_num >= previous_stage_progress; - Ok(ExecOutput { stage_progress: max_block_num, done, reached_tip: done }) + Ok(ExecOutput { stage_progress: max_block_num, done }) } /// Unwind the stage. @@ -168,8 +168,8 @@ mod tests { let result = rx.await.unwrap(); assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); // Validate the stage execution @@ -196,7 +196,7 @@ mod tests { let expected_progress = stage_progress + threshold; assert_matches!( result, - Ok(ExecOutput { done: false, reached_tip: false, stage_progress }) + Ok(ExecOutput { done: false, stage_progress }) if stage_progress == expected_progress ); @@ -208,7 +208,7 @@ mod tests { let result = runner.execute(second_input).await.unwrap(); assert_matches!( result, - Ok(ExecOutput { done: true, reached_tip: true, stage_progress }) + Ok(ExecOutput { done: true, stage_progress }) if stage_progress == previous_stage ); diff --git a/crates/stages/src/test_utils/macros.rs b/crates/stages/src/test_utils/macros.rs index 4a25ee186..b282e9b9e 100644 --- a/crates/stages/src/test_utils/macros.rs +++ b/crates/stages/src/test_utils/macros.rs @@ -40,8 +40,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); // Validate the stage execution @@ -90,8 +90,8 @@ macro_rules! stage_test_suite { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == previous_stage + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == previous_stage ); assert!(runner.validate_execution(execute_input, result.ok()).is_ok(), "execution validation"); @@ -142,8 +142,8 @@ macro_rules! stage_test_suite_ext { let result = rx.await.unwrap(); assert_matches::assert_matches!( result, - Ok(ExecOutput { done, reached_tip, stage_progress }) - if done && reached_tip && stage_progress == stage_progress + Ok(ExecOutput { done, stage_progress }) + if done && stage_progress == stage_progress ); // Validate the stage execution