feat(bin, pipeline): report pipeline progress (#2932)

This commit is contained in:
Alexey Shekhirin
2023-06-01 02:17:33 +04:00
committed by GitHub
parent aea35263da
commit 9a0aeea543
3 changed files with 113 additions and 14 deletions

View File

@ -12,6 +12,10 @@ use reth_primitives::stage::{StageCheckpoint, StageId};
pub enum PipelineEvent {
/// Emitted when a stage is about to be run.
Running {
/// 1-indexed ID of the stage that is about to be run out of total stages in the pipeline.
pipeline_position: usize,
/// Total number of stages in the pipeline.
pipeline_total: usize,
/// The stage that is about to be run.
stage_id: StageId,
/// The previous checkpoint of the stage.
@ -19,6 +23,10 @@ pub enum PipelineEvent {
},
/// Emitted when a stage has run a single time.
Ran {
/// 1-indexed ID of the stage that was run out of total stages in the pipeline.
pipeline_position: usize,
/// Total number of stages in the pipeline.
pipeline_total: usize,
/// The stage that was run.
stage_id: StageId,
/// The result of executing the stage.

View File

@ -299,6 +299,8 @@ where
previous_stage: Option<(StageId, StageCheckpoint)>,
stage_index: usize,
) -> Result<ControlFlow, PipelineError> {
let total_stages = self.stages.len();
let stage = &mut self.stages[stage_index];
let stage_id = stage.id();
let mut made_progress = false;
@ -326,7 +328,12 @@ where
})
}
self.listeners.notify(PipelineEvent::Running { stage_id, checkpoint: prev_checkpoint });
self.listeners.notify(PipelineEvent::Running {
pipeline_position: stage_index + 1,
pipeline_total: total_stages,
stage_id,
checkpoint: prev_checkpoint,
});
match stage
.execute(&mut tx, ExecInput { previous_stage, checkpoint: prev_checkpoint })
@ -350,7 +357,12 @@ where
);
tx.save_stage_checkpoint(stage_id, checkpoint)?;
self.listeners.notify(PipelineEvent::Ran { stage_id, result: out.clone() });
self.listeners.notify(PipelineEvent::Ran {
pipeline_position: stage_index + 1,
pipeline_total: total_stages,
stage_id,
result: out.clone(),
});
// TODO: Make the commit interval configurable
tx.commit()?;
@ -481,13 +493,27 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
@ -534,18 +560,39 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 3,
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 3,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 3,
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 3,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("C"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 3,
pipeline_total: 3,
stage_id: StageId::Other("C"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 3,
pipeline_total: 3,
stage_id: StageId::Other("C"),
result: ExecOutput { checkpoint: StageCheckpoint::new(20), done: true },
},
@ -623,13 +670,27 @@ mod tests {
events.collect::<Vec<PipelineEvent>>().await,
vec![
// Executing
PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(100), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
@ -697,12 +758,24 @@ mod tests {
assert_eq!(
events.collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId::Other("A"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Error { stage_id: StageId::Other("B") },
PipelineEvent::Unwinding {
stage_id: StageId::Other("A"),
@ -717,15 +790,26 @@ mod tests {
result: UnwindOutput { checkpoint: StageCheckpoint::new(0) },
},
PipelineEvent::Running {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
checkpoint: Some(StageCheckpoint::new(0))
},
PipelineEvent::Ran {
pipeline_position: 1,
pipeline_total: 2,
stage_id: StageId::Other("A"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},
PipelineEvent::Running { stage_id: StageId::Other("B"), checkpoint: None },
PipelineEvent::Running {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
checkpoint: None
},
PipelineEvent::Ran {
pipeline_position: 2,
pipeline_total: 2,
stage_id: StageId::Other("B"),
result: ExecOutput { checkpoint: StageCheckpoint::new(10), done: true },
},