From a8c1eaf140b1d5be509eca6c4e65b79988af1ae0 Mon Sep 17 00:00:00 2001 From: Bjerg Date: Tue, 11 Oct 2022 18:52:05 +0200 Subject: [PATCH] feat: simplified unwind priority (#45) --- crates/stages/src/pipeline.rs | 100 +++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 7 deletions(-) diff --git a/crates/stages/src/pipeline.rs b/crates/stages/src/pipeline.rs index 87981de39..d3e84e695 100644 --- a/crates/stages/src/pipeline.rs +++ b/crates/stages/src/pipeline.rs @@ -196,13 +196,7 @@ where // Sort stages by unwind priority let mut unwind_pipeline = { let mut stages: Vec<_> = self.stages.iter_mut().enumerate().collect(); - stages.sort_by_key(|(id, stage)| { - if stage.unwind_priority > 0 { - (id - stage.unwind_priority, 0) - } else { - (*id, 1) - } - }); + stages.sort_by(|a, b| a.1.unwind_priority.cmp(&b.1.unwind_priority)); stages.reverse(); stages }; @@ -563,6 +557,98 @@ mod tests { ); } + /// Unwinds a pipeline with unwind priorities specified. + /// + /// The stages are inserted in the order A, B, C. + /// + /// By default, the pipeline is unwound in reverse insert order, i.e. C, B, A. + /// In this test we reorder it to be B, C, A by setting these unwind priorities: + /// + /// - Stage A: 1 + /// - Stage B: 10 (higher is more priority) + /// - Stage C: 5 + #[tokio::test] + async fn unwind_priority() { + let (tx, rx) = channel(2); + let db = utils::test_db().expect("Could not open test database"); + + // Run pipeline + tokio::spawn(async move { + let mut pipeline = Pipeline::::new() + .push_with_unwind_priority( + TestStage::new(StageId("A")) + .add_exec(Ok(ExecOutput { + stage_progress: 10, + done: true, + reached_tip: true, + })) + .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), + false, + 1, + ) + .push_with_unwind_priority( + TestStage::new(StageId("B")) + .add_exec(Ok(ExecOutput { + stage_progress: 10, + done: true, + reached_tip: true, + })) + .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), + false, + 10, + ) + .push_with_unwind_priority( + TestStage::new(StageId("C")) + .add_exec(Ok(ExecOutput { + stage_progress: 10, + done: true, + reached_tip: true, + })) + .add_unwind(Ok(UnwindOutput { stage_progress: 1 })), + false, + 5, + ) + .set_max_block(Some(10)); + + // Sync first + pipeline.run(&db).await.expect("Could not run pipeline"); + + // Unwind + pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline"); + }); + + // Check that the stages were unwound in reverse order + assert_eq!( + ReceiverStream::new(rx).collect::>().await, + vec![ + PipelineEvent::Unwinding { + stage_id: StageId("B"), + input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None } + }, + PipelineEvent::Unwound { + stage_id: StageId("B"), + result: Some(UnwindOutput { stage_progress: 1 }), + }, + PipelineEvent::Unwinding { + stage_id: StageId("C"), + input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None } + }, + PipelineEvent::Unwound { + stage_id: StageId("C"), + result: Some(UnwindOutput { stage_progress: 1 }), + }, + PipelineEvent::Unwinding { + stage_id: StageId("A"), + input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None } + }, + PipelineEvent::Unwound { + stage_id: StageId("A"), + result: Some(UnwindOutput { stage_progress: 1 }), + }, + ] + ); + } + mod utils { use super::*; use async_trait::async_trait;