feat: simplified unwind priority (#45)

This commit is contained in:
Bjerg
2022-10-11 18:52:05 +02:00
committed by GitHub
parent 3fed7cfe21
commit a8c1eaf140

View File

@ -196,13 +196,7 @@ where
// Sort stages by unwind priority
let mut unwind_pipeline = {
let mut stages: Vec<_> = self.stages.iter_mut().enumerate().collect();
stages.sort_by_key(|(id, stage)| {
if stage.unwind_priority > 0 {
(id - stage.unwind_priority, 0)
} else {
(*id, 1)
}
});
stages.sort_by(|a, b| a.1.unwind_priority.cmp(&b.1.unwind_priority));
stages.reverse();
stages
};
@ -563,6 +557,98 @@ mod tests {
);
}
/// Unwinds a pipeline with unwind priorities specified.
///
/// The stages are inserted in the order A, B, C.
///
/// By default, the pipeline is unwound in reverse insert order, i.e. C, B, A.
/// In this test we reorder it to be B, C, A by setting these unwind priorities:
///
/// - Stage A: 1
/// - Stage B: 10 (higher is more priority)
/// - Stage C: 5
#[tokio::test]
async fn unwind_priority() {
let (tx, rx) = channel(2);
let db = utils::test_db().expect("Could not open test database");
// Run pipeline
tokio::spawn(async move {
let mut pipeline = Pipeline::<mdbx::WriteMap>::new()
.push_with_unwind_priority(
TestStage::new(StageId("A"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
1,
)
.push_with_unwind_priority(
TestStage::new(StageId("B"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
10,
)
.push_with_unwind_priority(
TestStage::new(StageId("C"))
.add_exec(Ok(ExecOutput {
stage_progress: 10,
done: true,
reached_tip: true,
}))
.add_unwind(Ok(UnwindOutput { stage_progress: 1 })),
false,
5,
)
.set_max_block(Some(10));
// Sync first
pipeline.run(&db).await.expect("Could not run pipeline");
// Unwind
pipeline.set_channel(tx).unwind(&db, 1, None).await.expect("Could not unwind pipeline");
});
// Check that the stages were unwound in reverse order
assert_eq!(
ReceiverStream::new(rx).collect::<Vec<PipelineEvent>>().await,
vec![
PipelineEvent::Unwinding {
stage_id: StageId("B"),
input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("B"),
result: Some(UnwindOutput { stage_progress: 1 }),
},
PipelineEvent::Unwinding {
stage_id: StageId("C"),
input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("C"),
result: Some(UnwindOutput { stage_progress: 1 }),
},
PipelineEvent::Unwinding {
stage_id: StageId("A"),
input: UnwindInput { stage_progress: 10, unwind_to: 1, bad_block: None }
},
PipelineEvent::Unwound {
stage_id: StageId("A"),
result: Some(UnwindOutput { stage_progress: 1 }),
},
]
);
}
mod utils {
use super::*;
use async_trait::async_trait;