feat: skip/error pipeline events (#70)

This commit is contained in:
Bjerg
2022-10-14 14:08:41 +02:00
committed by GitHub
parent 0d97014f4c
commit 4790256507
3 changed files with 117 additions and 49 deletions

View File

@ -1,7 +1,7 @@
use crate::{ use crate::{
error::*, error::*,
util::{db::TxContainer, opt::MaybeSender}, util::{db::TxContainer, opt::MaybeSender},
ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput, ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput,
}; };
use reth_db::{kv::Env, mdbx}; use reth_db::{kv::Env, mdbx};
use reth_primitives::BlockNumber; use reth_primitives::BlockNumber;
@ -154,15 +154,14 @@ where
/// Run the pipeline in an infinite loop. Will terminate early if the user has specified /// Run the pipeline in an infinite loop. Will terminate early if the user has specified
/// a `max_block` in the pipeline. /// a `max_block` in the pipeline.
pub async fn run(&mut self, db: &'db Env<E>) -> Result<(), PipelineError> { pub async fn run(&mut self, db: &'db Env<E>) -> Result<(), PipelineError> {
let mut state = PipelineState {
events_sender: self.events_sender.clone(),
max_block: self.max_block,
maximum_progress: None,
minimum_progress: None,
reached_tip: true,
};
loop { loop {
let mut state = PipelineState {
events_sender: self.events_sender.clone(),
max_block: self.max_block,
maximum_progress: None,
minimum_progress: None,
reached_tip: true,
};
let mut tx = TxContainer::new(db)?; let mut tx = TxContainer::new(db)?;
let next_action = self.run_loop(&mut state, &mut tx).await?; let next_action = self.run_loop(&mut state, &mut tx).await?;
@ -246,12 +245,7 @@ where
let mut stage_progress = stage_id.get_progress(&tx)?.unwrap_or_default(); let mut stage_progress = stage_id.get_progress(&tx)?.unwrap_or_default();
if stage_progress < to { if stage_progress < to {
debug!(from = %stage_progress, %to, "Unwind point too far for stage"); debug!(from = %stage_progress, %to, "Unwind point too far for stage");
self.events_sender self.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
.send(PipelineEvent::Unwound {
stage_id,
result: Some(UnwindOutput { stage_progress }),
})
.await?;
return Ok(()) return Ok(())
} }
@ -267,13 +261,11 @@ where
stage_id.save_progress(&tx, stage_progress)?; stage_id.save_progress(&tx, stage_progress)?;
self.events_sender self.events_sender
.send(PipelineEvent::Unwound { stage_id, result: Some(unwind_output) }) .send(PipelineEvent::Unwound { stage_id, result: unwind_output })
.await?; .await?;
} }
Err(err) => { Err(err) => {
self.events_sender self.events_sender.send(PipelineEvent::Error { stage_id }).await?;
.send(PipelineEvent::Unwound { stage_id, result: None })
.await?;
return Err(PipelineError::Stage(StageError::Internal(err))) return Err(PipelineError::Stage(StageError::Internal(err)))
} }
} }
@ -316,6 +308,7 @@ where
let stage_id = self.stage.id(); let stage_id = self.stage.id();
if self.require_tip && !state.reached_tip() { if self.require_tip && !state.reached_tip() {
info!("Tip not reached, skipping."); info!("Tip not reached, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// Stage requires us to reach the tip of the chain first, but we have // Stage requires us to reach the tip of the chain first, but we have
// not. // not.
@ -324,22 +317,24 @@ where
loop { loop {
let prev_progress = stage_id.get_progress(tx.get())?; let prev_progress = stage_id.get_progress(tx.get())?;
state
.events_sender
.send(PipelineEvent::Running { stage_id, stage_progress: prev_progress })
.await?;
let stage_reached_max_block = prev_progress let stage_reached_max_block = prev_progress
.zip(state.max_block) .zip(state.max_block)
.map_or(false, |(prev_progress, target)| prev_progress >= target); .map_or(false, |(prev_progress, target)| prev_progress >= target);
if stage_reached_max_block { if stage_reached_max_block {
info!("Stage reached maximum block, skipping."); info!("Stage reached maximum block, skipping.");
state.events_sender.send(PipelineEvent::Skipped { stage_id }).await?;
// We reached the maximum block, so we skip the stage // We reached the maximum block, so we skip the stage
state.set_reached_tip(true); state.set_reached_tip(true);
return Ok(ControlFlow::Continue) return Ok(ControlFlow::Continue)
} }
state
.events_sender
.send(PipelineEvent::Running { stage_id, stage_progress: prev_progress })
.await?;
match self match self
.stage .stage
.execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress }) .execute(tx.get_mut(), ExecInput { previous_stage, stage_progress: prev_progress })
@ -351,7 +346,7 @@ where
state state
.events_sender .events_sender
.send(PipelineEvent::Ran { stage_id, result: Some(out.clone()) }) .send(PipelineEvent::Ran { stage_id, result: out.clone() })
.await?; .await?;
// TODO: Make the commit interval configurable // TODO: Make the commit interval configurable
@ -365,7 +360,7 @@ where
} }
} }
Err(err) => { Err(err) => {
state.events_sender.send(PipelineEvent::Ran { stage_id, result: None }).await?; state.events_sender.send(PipelineEvent::Error { stage_id }).await?;
return if let StageError::Validation { block } = err { return if let StageError::Validation { block } = err {
debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error."); debug!(stage = %stage_id, bad_block = %block, "Stage encountered a validation error.");
@ -435,12 +430,12 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran { PipelineEvent::Ran {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(ExecOutput { stage_progress: 20, done: true, reached_tip: true }), result: ExecOutput { stage_progress: 20, done: true, reached_tip: true },
}, },
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran { PipelineEvent::Ran {
stage_id: StageId("B"), stage_id: StageId("B"),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
}, },
] ]
); );
@ -494,7 +489,7 @@ mod tests {
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("B"), stage_id: StageId("B"),
result: Some(UnwindOutput { stage_progress: 1 }), result: UnwindOutput { stage_progress: 1 },
}, },
PipelineEvent::Unwinding { PipelineEvent::Unwinding {
stage_id: StageId("A"), stage_id: StageId("A"),
@ -502,7 +497,7 @@ mod tests {
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 1 }), result: UnwindOutput { stage_progress: 1 },
}, },
] ]
); );
@ -568,27 +563,27 @@ mod tests {
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None }, PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran { PipelineEvent::Ran {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
}, },
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran { stage_id: StageId("B"), result: None }, PipelineEvent::Error { stage_id: StageId("B") },
PipelineEvent::Unwinding { PipelineEvent::Unwinding {
stage_id: StageId("A"), stage_id: StageId("A"),
input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) } input: UnwindInput { stage_progress: 10, unwind_to: 0, bad_block: Some(5) }
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 0 }), result: UnwindOutput { stage_progress: 0 },
}, },
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) }, PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(0) },
PipelineEvent::Ran { PipelineEvent::Ran {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
}, },
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None }, PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran { PipelineEvent::Ran {
stage_id: StageId("B"), stage_id: StageId("B"),
result: Some(ExecOutput { stage_progress: 10, done: true, reached_tip: true }), result: ExecOutput { stage_progress: 10, done: true, reached_tip: true },
}, },
] ]
); );
@ -664,7 +659,7 @@ mod tests {
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("B"), stage_id: StageId("B"),
result: Some(UnwindOutput { stage_progress: 1 }), result: UnwindOutput { stage_progress: 1 },
}, },
PipelineEvent::Unwinding { PipelineEvent::Unwinding {
stage_id: StageId("C"), stage_id: StageId("C"),
@ -672,7 +667,7 @@ mod tests {
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("C"), stage_id: StageId("C"),
result: Some(UnwindOutput { stage_progress: 1 }), result: UnwindOutput { stage_progress: 1 },
}, },
PipelineEvent::Unwinding { PipelineEvent::Unwinding {
stage_id: StageId("A"), stage_id: StageId("A"),
@ -680,7 +675,67 @@ mod tests {
}, },
PipelineEvent::Unwound { PipelineEvent::Unwound {
stage_id: StageId("A"), stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 1 }), result: UnwindOutput { stage_progress: 1 },
},
]
);
}
/// Runs a simple pipeline.
#[tokio::test]
async fn skips_stages_that_require_tip() {
let (tx, rx) = channel(2);
let db = test_utils::create_test_db(EnvKind::RW);
// Run pipeline
tokio::spawn(async move {
Pipeline::<mdbx::WriteMap>::new_with_channel(tx)
.push(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 5,
done: true,
reached_tip: false,
}))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
false,
)
.push(
TestStage::new(StageId("B")).add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
})),
true,
)
.set_max_block(Some(10))
.run(&db)
.await
});
// Check that the stages were run in order
assert_eq!(
ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 5, reached_tip: false, done: true }
},
PipelineEvent::Skipped { stage_id: StageId("B") },
PipelineEvent::Running { stage_id: StageId("A"), stage_progress: Some(5) },
PipelineEvent::Ran {
stage_id: StageId("A"),
result: ExecOutput { stage_progress: 10, reached_tip: true, done: true }
},
PipelineEvent::Running { stage_id: StageId("B"), stage_progress: None },
PipelineEvent::Ran {
stage_id: StageId("B"),
result: ExecOutput { stage_progress: 10, reached_tip: true, done: true }
}, },
] ]
); );

View File

@ -1,6 +1,7 @@
use reth_primitives::BlockNumber; use reth_primitives::BlockNumber;
/// Determines the control flow during pipeline execution. /// Determines the control flow during pipeline execution.
#[derive(Debug)]
pub(crate) enum ControlFlow { pub(crate) enum ControlFlow {
/// An unwind was requested and must be performed before continuing. /// An unwind was requested and must be performed before continuing.
Unwind { Unwind {

View File

@ -5,6 +5,12 @@ use crate::{
use reth_primitives::BlockNumber; use reth_primitives::BlockNumber;
/// An event emitted by a [Pipeline][crate::Pipeline]. /// An event emitted by a [Pipeline][crate::Pipeline].
///
/// It is possible for multiple of these events to be emitted over the duration of a pipeline's
/// execution since:
///
/// - Other stages may ask the pipeline to unwind
/// - The pipeline will loop indefinitely unless a target block is set
#[derive(Debug, PartialEq, Eq, Clone)] #[derive(Debug, PartialEq, Eq, Clone)]
pub enum PipelineEvent { pub enum PipelineEvent {
/// Emitted when a stage is about to be run. /// Emitted when a stage is about to be run.
@ -15,16 +21,11 @@ pub enum PipelineEvent {
stage_progress: Option<BlockNumber>, stage_progress: Option<BlockNumber>,
}, },
/// Emitted when a stage has run a single time. /// Emitted when a stage has run a single time.
///
/// It is possible for multiple of these events to be emitted over the duration of a pipeline's
/// execution:
/// - If the pipeline loops, the stage will be run again at some point
/// - If the stage exits early but has acknowledged that it is not entirely done
Ran { Ran {
/// The stage that was run. /// The stage that was run.
stage_id: StageId, stage_id: StageId,
/// The result of executing the stage. If it is None then an error was encountered. /// The result of executing the stage.
result: Option<ExecOutput>, result: ExecOutput,
}, },
/// Emitted when a stage is about to be unwound. /// Emitted when a stage is about to be unwound.
Unwinding { Unwinding {
@ -34,13 +35,24 @@ pub enum PipelineEvent {
input: UnwindInput, input: UnwindInput,
}, },
/// Emitted when a stage has been unwound. /// Emitted when a stage has been unwound.
///
/// It is possible for multiple of these events to be emitted over the duration of a pipeline's
/// execution, since other stages may ask the pipeline to unwind.
Unwound { Unwound {
/// The stage that was unwound. /// The stage that was unwound.
stage_id: StageId, stage_id: StageId,
/// The result of unwinding the stage. If it is None then an error was encountered. /// The result of unwinding the stage.
result: Option<UnwindOutput>, result: UnwindOutput,
},
/// Emitted when a stage encounters an error either during execution or unwinding.
Error {
/// The stage that encountered an error.
stage_id: StageId,
},
/// Emitted when a stage was skipped due to it's run conditions not being met:
///
/// - The stage might have progressed beyond the point of our target block
/// - The stage might not need to be unwound since it has not progressed past the unwind target
/// - The stage requires that the pipeline has reached the tip, but it has not done so yet
Skipped {
/// The stage that was skipped.
stage_id: StageId,
}, },
} }