refactor(sync): remove require_tip (#528)

`require_tip` could only be determined by the headers stage,
and it signalled that we have all of the headers to sync all
the way to the chain tip. Some stages may wait to execute
until the tip is reached, e.g. the stage that checks the
stage root, but there are a few problems:

- On initial sync, `reached_tip` would be `true`, but by
  the time we reach the hashing stage, this would actually
  no longer be the case: the other stages have spent
  enough time for us to be "out of sync". This means
  that the optimization here is lost, and the additional
  logic is added for nothing.
- When we are not doing our initial sync, `reached_tip` would
  always be `true` for each subsequent block we sync.
  The same logic applies as above, i.e. the extra logic
  is there for nothing.

In other words, `reached_tip` would *always* be `true` once
we leave the header stage, making the extra logic entirely
redundant.
This commit is contained in:
Bjerg
2022-12-19 22:04:42 +01:00
committed by GitHub
parent 13bb41517d
commit f65562e2e4
9 changed files with 80 additions and 273 deletions

View File

@ -114,8 +114,7 @@ impl Command {
// TODO: Remove magic numbers
let fetch_client = Arc::new(network.fetch_client().await?);
let mut pipeline = reth_stages::Pipeline::new()
.push(
HeaderStage {
.push(HeaderStage {
downloader: headers::linear::LinearDownloadBuilder::default()
.batch_size(config.stages.headers.downloader_batch_size)
.retries(config.stages.headers.downloader_retries)
@ -124,11 +123,8 @@ impl Command {
client: fetch_client.clone(),
network_handle: network.clone(),
commit_threshold: config.stages.headers.commit_threshold,
},
false,
)
.push(
BodyStage {
})
.push(BodyStage {
downloader: Arc::new(
bodies::concurrent::ConcurrentDownloader::new(
fetch_client.clone(),
@ -140,16 +136,11 @@ impl Command {
),
consensus: consensus.clone(),
commit_threshold: config.stages.bodies.commit_threshold,
},
false,
)
.push(
SendersStage {
})
.push(SendersStage {
batch_size: config.stages.senders.batch_size,
commit_threshold: config.stages.senders.commit_threshold,
},
false,
);
});
if let Some(tip) = self.tip {
debug!("Tip manually set: {}", tip);

View File

@ -108,24 +108,19 @@ impl<DB: Database> Pipeline<DB> {
/// # Unwinding
///
/// The unwind priority is set to 0.
pub fn push<S>(self, stage: S, require_tip: bool) -> Self
pub fn push<S>(self, stage: S) -> Self
where
S: Stage<DB> + 'static,
{
self.push_with_unwind_priority(stage, require_tip, 0)
self.push_with_unwind_priority(stage, 0)
}
/// Add a stage to the pipeline, specifying the unwind priority.
pub fn push_with_unwind_priority<S>(
mut self,
stage: S,
require_tip: bool,
unwind_priority: usize,
) -> Self
pub fn push_with_unwind_priority<S>(mut self, stage: S, unwind_priority: usize) -> Self
where
S: Stage<DB> + 'static,
{
self.stages.push(QueuedStage { stage: Box::new(stage), require_tip, unwind_priority });
self.stages.push(QueuedStage { stage: Box::new(stage), unwind_priority });
self
}
@ -152,7 +147,6 @@ impl<DB: Database> Pipeline<DB> {
max_block: self.max_block,
maximum_progress: None,
minimum_progress: None,
reached_tip: true,
};
let next_action = self.run_loop(&mut state, db.as_ref()).await?;
@ -277,9 +271,6 @@ struct QueuedStage<DB: Database> {
stage: Box<dyn Stage<DB>>,
/// The unwind priority of the stage.
unwind_priority: usize,
/// Whether or not this stage can only execute when we reach what we believe to be the tip of
/// the chain.
require_tip: bool,
}
impl<DB: Database> QueuedStage<DB> {
@ -291,19 +282,6 @@ impl<DB: Database> QueuedStage<DB> {
db: &DB,
) -> Result<ControlFlow, PipelineError> {
let stage_id = self.stage.id();
if self.require_tip && !state.reached_tip() {
warn!(
target: "sync::pipeline",
stage = %stage_id,
"Tip not reached as required by stage, skipping."
);
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// Stage requires us to reach the tip of the chain first, but we have
// not.
return Ok(ControlFlow::Continue)
}
loop {
let mut tx = Transaction::new(db)?;
@ -321,7 +299,6 @@ impl<DB: Database> QueuedStage<DB> {
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// We reached the maximum block, so we skip the stage
state.set_reached_tip(true);
return Ok(ControlFlow::Continue)
}
@ -335,7 +312,7 @@ impl<DB: Database> QueuedStage<DB> {
.execute(&mut tx, ExecInput { previous_stage, stage_progress: prev_progress })
.await
{
Ok(out @ ExecOutput { stage_progress, done, reached_tip }) => {
Ok(out @ ExecOutput { stage_progress, done }) => {
info!(
target: "sync::pipeline",
stage = %stage_id,
@ -354,7 +331,6 @@ impl<DB: Database> QueuedStage<DB> {
tx.commit()?;
state.record_progress_outliers(stage_progress);
state.set_reached_tip(reached_tip);
if done {
return Ok(ControlFlow::Continue)
@ -422,20 +398,12 @@ mod tests {
tokio::spawn(async move {
Pipeline::<Env<WriteMap>>::new_with_channel(tx)
.push(
TestStage::new(StageId("A")).add_exec(Ok(ExecOutput {
stage_progress: 20,
done: true,
reached_tip: true,
})),
false,
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput { stage_progress: 20, done: true })),
)
.push(
TestStage::new(StageId("B")).add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.run(db)
@ -449,12 +417,12 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 20, done: true, reached_tip: true },
result: ExecOutput { stage_progress: 20, done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
result: ExecOutput { stage_progress: 10, done: true },
},
]
);
@ -471,23 +439,13 @@ mod tests {
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>>::new()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 100,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 100, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
)
.push(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
)
.set_max_block(Some(10));
@ -544,18 +502,9 @@ mod tests {
Pipeline::<Env<mdbx::WriteMap>>::new()
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.push(
TestStage::new(StageId("B"))
@ -564,12 +513,7 @@ mod tests {
error: consensus::Error::BaseFeeMissing,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 0 }))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.set_channel(tx)
@ -585,7 +529,7 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
result: ExecOutput { stage_progress: 10, done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Error { stage_id: StageId("B") },
@ -600,12 +544,12 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
result: ExecOutput { stage_progress: 10, done: true },
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
result: ExecOutput { stage_progress: 10, done: true },
},
]
);
@ -631,35 +575,20 @@ mod tests {
let mut pipeline = Pipeline::<Env<mdbx::WriteMap>>::new()
.push_with_unwind_priority(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
1,
)
.push_with_unwind_priority(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
10,
)
.push_with_unwind_priority(
TestStage::new(StageId("C"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true }))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
5,
)
.set_max_block(Some(10));
@ -703,66 +632,6 @@ mod tests {
);
}
/// Runs a simple pipeline.
#[tokio::test]
async fn skips_stages_that_require_tip() {
let (tx, rx) = channel(2);
let db = test_utils::create_test_db(EnvKind::RW);
// Run pipeline
tokio::spawn(async move {
Pipeline::<Env<mdbx::WriteMap>>::new_with_channel(tx)
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 5,
done: true,
reached_tip: false,
}))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
)
.push(
TestStage::new(StageId("B")).add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
true,
)
.set_max_block(Some(10))
.run(db)
.await
});
// Check that the stages were run in order
assert_eq!(
ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 5, reached_tip: false, done: true }
},
PipelineEvent::Skipped { stage_id: StageId("B") },
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(5) },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, reached_tip: true, done: true }
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, reached_tip: true, done: true }
},
]
);
}
/// Checks that the pipeline re-runs stages on non-fatal errors and stops on fatal ones.
#[tokio::test]
async fn pipeline_error_handling() {
@ -772,8 +641,7 @@ mod tests {
.push(
TestStage::new(StageId("NonFatal"))
.add_exec(Err(StageError::Recoverable(Box::new(std::fmt::Error))))
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true, reached_tip: true })),
false,
.add_exec(Ok(ExecOutput { stage_progress: 10, done: true })),
)
.set_max_block(Some(10))
.run(db)
@ -783,12 +651,9 @@ mod tests {
// Fatal
let db = test_utils::create_test_db(EnvKind::RW);
let result = Pipeline::<Env<WriteMap>>::new()
.push(
TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity(
.push(TestStage::new(StageId("Fatal")).add_exec(Err(StageError::DatabaseIntegrity(
DatabaseIntegrityError::BlockBody { number: 5 },
))),
false,
)
))))
.run(db)
.await;
assert_matches!(

View File

@ -12,11 +12,6 @@ pub(crate) struct PipelineState {
pub(crate) maximum_progress: Option<BlockNumber>,
/// The minimum progress achieved by any stage during the execution of the pipeline.
pub(crate) minimum_progress: Option<BlockNumber>,
/// Whether or not the previous stage reached the tip of the chain.
///
/// **Do not use this** under normal circumstances. Instead, opt for
/// [PipelineState::reached_tip] and [PipelineState::set_reached_tip].
pub(crate) reached_tip: bool,
}
impl PipelineState {
@ -27,49 +22,12 @@ impl PipelineState {
self.minimum_progress = opt::min(self.minimum_progress, stage_progress);
self.maximum_progress = opt::max(self.maximum_progress, stage_progress);
}
/// Whether or not the pipeline reached the tip of the chain.
pub(crate) fn reached_tip(&self) -> bool {
self.reached_tip ||
self.max_block
.zip(self.minimum_progress)
.map_or(false, |(target, progress)| progress >= target)
}
/// Set whether or not the pipeline has reached the tip of the chain.
pub(crate) fn set_reached_tip(&mut self, flag: bool) {
self.reached_tip = flag;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reached_tip() {
let mut state = PipelineState {
events_sender: MaybeSender::new(None),
max_block: None,
maximum_progress: None,
minimum_progress: None,
reached_tip: false,
};
// default
assert!(!state.reached_tip());
// reached tip
state.set_reached_tip(true);
assert!(state.reached_tip());
// reached max block
state.set_reached_tip(false);
state.max_block = Some(1);
state.minimum_progress = Some(1);
assert!(state.reached_tip());
}
#[test]
fn record_progress_outliers() {
let mut state = PipelineState {
@ -77,7 +35,6 @@ mod tests {
max_block: None,
maximum_progress: None,
minimum_progress: None,
reached_tip: false,
};
state.record_progress_outliers(10);

View File

@ -37,8 +37,6 @@ pub struct ExecOutput {
pub stage_progress: BlockNumber,
/// Whether or not the stage is done.
pub done: bool,
/// Whether or not the stage reached the tip of the chain.
pub reached_tip: bool,
}
/// The output of a stage unwinding.

View File

@ -93,7 +93,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
// Short circuit in case we already reached the target block
let target = previous_stage_progress.min(starting_block + self.commit_threshold);
if target <= stage_progress {
return Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
return Ok(ExecOutput { stage_progress, done: true })
}
let bodies_to_download = self.bodies_to_download::<DB>(tx, starting_block, target)?;
@ -124,7 +124,6 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
return Ok(ExecOutput {
stage_progress: highest_block,
done: false,
reached_tip: false,
})
};
@ -190,7 +189,7 @@ impl<DB: Database, D: BodyDownloader, C: Consensus> Stage<DB> for BodyStage<D, C
let capped = target < previous_stage_progress;
let done = highest_block < target || !capped;
Ok(ExecOutput { stage_progress: highest_block, reached_tip: true, done })
Ok(ExecOutput { stage_progress: highest_block, done })
}
/// Unwind the stage.
@ -321,7 +320,7 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress < 200
Ok(ExecOutput { stage_progress, done: false }) if stage_progress < 200
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -348,10 +347,7 @@ mod tests {
// Check that we synced all blocks successfully, even though our `batch_size` allows us to
// sync more (if there were more headers)
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress: 20, reached_tip: true, done: true })
);
assert_matches!(output, Ok(ExecOutput { stage_progress: 20, done: true }));
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -377,7 +373,7 @@ mod tests {
let first_run = rx.await.unwrap();
assert_matches!(
first_run,
Ok(ExecOutput { stage_progress, reached_tip: true, done: false }) if stage_progress >= 10
Ok(ExecOutput { stage_progress, done: false }) if stage_progress >= 10
);
let first_run_progress = first_run.unwrap().stage_progress;
@ -392,7 +388,7 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress > first_run_progress
Ok(ExecOutput { stage_progress, done: true }) if stage_progress > first_run_progress
);
assert!(runner.validate_execution(input, output.ok()).is_ok(), "execution validation");
}
@ -432,7 +428,7 @@ mod tests {
// Check that the error bubbles up
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false })
Ok(ExecOutput { stage_progress: out_stage_progress, done: false })
if out_stage_progress == stage_progress
);
assert!(runner.validate_execution(input, None).is_ok(), "execution validation");
@ -462,7 +458,7 @@ mod tests {
let output = rx.await.unwrap();
assert_matches!(
output,
Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) if stage_progress == previous_stage
Ok(ExecOutput { stage_progress, done: true }) if stage_progress == previous_stage
);
let stage_progress = output.unwrap().stage_progress;
runner
@ -521,7 +517,7 @@ mod tests {
// Check that the error bubbles up
assert_matches!(
rx.await.unwrap(),
Ok(ExecOutput { stage_progress: out_stage_progress, done: false, reached_tip: false })
Ok(ExecOutput { stage_progress: out_stage_progress, done: false })
if out_stage_progress == stage_progress
);
assert!(runner.validate_execution(input, None).is_ok(), "execution validation");

View File

@ -108,7 +108,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
// no more canonical blocks, we are done with execution.
if canonical_batch.is_empty() {
return Ok(ExecOutput { stage_progress: last_block, done: true, reached_tip: true })
return Ok(ExecOutput { stage_progress: last_block, done: true })
}
// Get block headers and bodies from canonical hashes
@ -262,7 +262,7 @@ impl<DB: Database> Stage<DB> for ExecutionStage {
let last_block = last_block + canonical_batch.len() as u64;
let is_done = canonical_batch.len() < BATCH_SIZE as usize;
Ok(ExecOutput { done: is_done, reached_tip: true, stage_progress: last_block })
Ok(ExecOutput { done: is_done, stage_progress: last_block })
}
/// Unwind the stage.
@ -414,7 +414,7 @@ mod tests {
execution_stage.config.spec_upgrades = SpecUpgrades::new_berlin_activated();
let output = execution_stage.execute(&mut tx, input).await.unwrap();
tx.commit().unwrap();
assert_eq!(output, ExecOutput { stage_progress: 1, done: true, reached_tip: true });
assert_eq!(output, ExecOutput { stage_progress: 1, done: true });
let tx = tx.deref_mut();
// check post state
let account1 = H160(hex!("1000000000000000000000000000000000000000"));

View File

@ -142,7 +142,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
.unwrap_or_default(),
);
Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
Ok(ExecOutput { stage_progress, done: true })
}
/// Unwind the stage.
@ -403,7 +403,7 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, reached_tip: true, stage_progress })
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == tip.number
);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");

View File

@ -63,7 +63,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
let max_block_num = previous_stage_progress.min(stage_progress + self.commit_threshold);
if max_block_num <= stage_progress {
return Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
return Ok(ExecOutput { stage_progress, done: true })
}
// Look up the start index for the transaction range
@ -74,7 +74,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
// No transactions to walk over
if start_tx_index > end_tx_index {
return Ok(ExecOutput { stage_progress: max_block_num, done: true, reached_tip: true })
return Ok(ExecOutput { stage_progress: max_block_num, done: true })
}
// Acquire the cursor for inserting elements
@ -106,7 +106,7 @@ impl<DB: Database> Stage<DB> for SendersStage {
}
let done = max_block_num >= previous_stage_progress;
Ok(ExecOutput { stage_progress: max_block_num, done, reached_tip: done })
Ok(ExecOutput { stage_progress: max_block_num, done })
}
/// Unwind the stage.
@ -168,8 +168,8 @@ mod tests {
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == previous_stage
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
// Validate the stage execution
@ -196,7 +196,7 @@ mod tests {
let expected_progress = stage_progress + threshold;
assert_matches!(
result,
Ok(ExecOutput { done: false, reached_tip: false, stage_progress })
Ok(ExecOutput { done: false, stage_progress })
if stage_progress == expected_progress
);
@ -208,7 +208,7 @@ mod tests {
let result = runner.execute(second_input).await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, reached_tip: true, stage_progress })
Ok(ExecOutput { done: true, stage_progress })
if stage_progress == previous_stage
);

View File

@ -40,8 +40,8 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == previous_stage
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
// Validate the stage execution
@ -90,8 +90,8 @@ macro_rules! stage_test_suite {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == previous_stage
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == previous_stage
);
assert!(runner.validate_execution(execute_input, result.ok()).is_ok(), "execution validation");
@ -142,8 +142,8 @@ macro_rules! stage_test_suite_ext {
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == stage_progress
Ok(ExecOutput { done, stage_progress })
if done && stage_progress == stage_progress
);
// Validate the stage execution