fix(stages): save full block range to index history checkpoint (#3107)

This commit is contained in:
Alexey Shekhirin
2023-06-12 19:14:39 +04:00
committed by GitHub
parent c51a222b26
commit e0cdb0bc0b
3 changed files with 132 additions and 5 deletions

View File

@ -47,7 +47,13 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let mut stage_checkpoint = stage_checkpoint(
tx,
input.checkpoint(),
// It is important to provide the full block range into the checkpoint,
// not the one accounting for commit threshold, to get the correct range end.
&input.next_block_range(),
)?;
let indices = tx.get_account_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
@ -431,6 +437,64 @@ mod tests {
);
}
#[tokio::test]
async fn stage_checkpoint_range() {
// init
let test_tx = TestTransaction::default();
// setup
partial_setup(&test_tx);
// run
{
let mut stage = IndexAccountHistoryStage { commit_threshold: 4 }; // Two runs required
let mut tx = test_tx.inner();
let mut input = ExecInput { target: Some(5), ..Default::default() };
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
}
);
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
tx.commit().unwrap();
}
// verify
let table = cast(test_tx.table::<tables::AccountHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])]));
// unwind
unwind(&test_tx, 5, 0).await;
// verify initial state
let table = test_tx.table::<tables::AccountHistory>().unwrap();
assert!(table.is_empty());
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();

View File

@ -50,7 +50,13 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let (range, is_final_range) = input.next_block_range_with_threshold(self.commit_threshold);
let mut stage_checkpoint = stage_checkpoint(tx, input.checkpoint(), &range)?;
let mut stage_checkpoint = stage_checkpoint(
tx,
input.checkpoint(),
// It is important to provide the full block range into the checkpoint,
// not the one accounting for commit threshold, to get the correct range end.
&input.next_block_range(),
)?;
let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?;
let changesets = indices.values().map(|blocks| blocks.len() as u64).sum::<u64>();
@ -446,6 +452,64 @@ mod tests {
);
}
#[tokio::test]
async fn stage_checkpoint_range() {
// init
let test_tx = TestTransaction::default();
// setup
partial_setup(&test_tx);
// run
{
let mut stage = IndexStorageHistoryStage { commit_threshold: 4 }; // Two runs required
let mut tx = test_tx.inner();
let mut input = ExecInput { target: Some(5), ..Default::default() };
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(4).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 1, to: 5 },
progress: EntitiesCheckpoint { processed: 1, total: 2 }
}
),
done: false
}
);
input.checkpoint = Some(out.checkpoint);
let out = stage.execute(&mut tx, input).await.unwrap();
assert_eq!(
out,
ExecOutput {
checkpoint: StageCheckpoint::new(5).with_index_history_stage_checkpoint(
IndexHistoryCheckpoint {
block_range: CheckpointBlockRange { from: 5, to: 5 },
progress: EntitiesCheckpoint { processed: 2, total: 2 }
}
),
done: true
}
);
tx.commit().unwrap();
}
// verify
let table = cast(test_tx.table::<tables::StorageHistory>().unwrap());
assert_eq!(table, BTreeMap::from([(shard(u64::MAX), vec![4, 5])]));
// unwind
unwind(&test_tx, 5, 0).await;
// verify initial state
let table = test_tx.table::<tables::StorageHistory>().unwrap();
assert!(table.is_empty());
}
#[test]
fn stage_checkpoint_recalculation() {
let tx = TestTransaction::default();