chore(sync): rename headers progress to checkpoint (#2855)

Roman Krasiuk
2023-05-26 14:01:01 +03:00
committed by GitHub
parent 038cfdee8e
commit 966b31c191
2 changed files with 44 additions and 44 deletions

View File

@@ -34,8 +34,8 @@ pub enum StageError {
error: executor::BlockExecutionError,
},
/// Invalid checkpoint passed to the stage
#[error("Invalid stage progress: {0}")]
StageProgress(u64),
#[error("Invalid stage checkpoint: {0}")]
StageCheckpoint(u64),
/// Download channel closed
#[error("Download channel closed")]
ChannelClosed,
@@ -70,7 +70,7 @@ impl StageError {
StageError::Database(_) |
StageError::Download(_) |
StageError::DatabaseIntegrity(_) |
-StageError::StageProgress(_) |
+StageError::StageCheckpoint(_) |
StageError::ExecutionError { .. } |
StageError::ChannelClosed |
StageError::Fatal(_) |

View File

@@ -43,7 +43,7 @@ pub enum HeaderSyncMode {
/// - [`CanonicalHeaders`][reth_db::tables::CanonicalHeaders]
///
/// NOTE: This stage downloads headers in reverse. Upon returning the control flow to the pipeline,
-/// the stage progress is not updated unless this stage is done.
+/// the stage checkpoint is not updated until this stage is done.
#[derive(Debug)]
pub struct HeaderStage<D: HeaderDownloader> {
/// Strategy for downloading the headers
@@ -66,12 +66,12 @@ where
fn is_stage_done<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
-stage_progress: u64,
+checkpoint: u64,
) -> Result<bool, StageError> {
let mut header_cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
let (head_num, _) = header_cursor
-.seek_exact(stage_progress)?
-.ok_or_else(|| ProviderError::HeaderNotFound(stage_progress.into()))?;
+.seek_exact(checkpoint)?
+.ok_or_else(|| ProviderError::HeaderNotFound(checkpoint.into()))?;
// Check if the next entry is congruent
Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default())
}
@@ -82,7 +82,7 @@ where
async fn get_sync_gap<DB: Database>(
&mut self,
tx: &Transaction<'_, DB>,
-stage_progress: u64,
+checkpoint: u64,
) -> Result<SyncGap, StageError> {
// Create a cursor over canonical header hashes
let mut cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
@@ -90,8 +90,8 @@ where
// Get head hash and reposition the cursor
let (head_num, head_hash) = cursor
-.seek_exact(stage_progress)?
-.ok_or_else(|| ProviderError::HeaderNotFound(stage_progress.into()))?;
+.seek_exact(checkpoint)?
+.ok_or_else(|| ProviderError::HeaderNotFound(checkpoint.into()))?;
// Construct head
let (_, head) = header_cursor
@@ -112,12 +112,12 @@ where
// Decide the tip or error out on invalid input.
// If the next element found in the cursor is not the "expected" next block per our current
-// progress, then there is a gap in the database and we should start downloading in
+// checkpoint, then there is a gap in the database and we should start downloading in
// reverse from there. Else, it should use whatever the forkchoice state reports.
let target = match next_header {
-Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header),
+Some(header) if checkpoint + 1 != header.number => SyncTarget::Gap(header),
None => self.next_sync_target(head_num).await,
-_ => return Err(StageError::StageProgress(stage_progress)),
+_ => return Err(StageError::StageCheckpoint(checkpoint)),
};
Ok(SyncGap { local_head, target })
@@ -195,17 +195,17 @@ where
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
-let current_progress = input.checkpoint.unwrap_or_default();
+let current_checkpoint = input.checkpoint.unwrap_or_default();
// Lookup the head and tip of the sync range
-let gap = self.get_sync_gap(tx, current_progress.block_number).await?;
+let gap = self.get_sync_gap(tx, current_checkpoint.block_number).await?;
let local_head = gap.local_head.number;
let tip = gap.target.tip();
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", stage_progress = %current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { checkpoint: current_progress, done: true })
info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached");
return Ok(ExecOutput { checkpoint: current_checkpoint, done: true })
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
@@ -251,7 +251,7 @@ where
None
};
-let mut stage_checkpoint = current_progress
+let mut stage_checkpoint = current_checkpoint
.entities_stage_checkpoint()
.unwrap_or(EntitiesCheckpoint {
// If for some reason (e.g. due to DB migration) we don't have `processed`
@@ -279,21 +279,21 @@ where
// Write the headers to db
self.write_headers::<DB>(tx, downloaded_headers)?.unwrap_or_default();
-if self.is_stage_done(tx, current_progress.block_number)? {
-let stage_progress = current_progress.block_number.max(
+if self.is_stage_done(tx, current_checkpoint.block_number)? {
+let checkpoint = current_checkpoint.block_number.max(
tx.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput {
-checkpoint: StageCheckpoint::new(stage_progress)
+checkpoint: StageCheckpoint::new(checkpoint)
.with_entities_stage_checkpoint(stage_checkpoint),
done: true,
})
} else {
Ok(ExecOutput {
-checkpoint: current_progress.with_entities_stage_checkpoint(stage_checkpoint),
+checkpoint: current_checkpoint.with_entities_stage_checkpoint(stage_checkpoint),
done: false,
})
}
@@ -323,7 +323,7 @@ where
checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint);
}
info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
info!(target: "sync::stages::headers", to_block = input.unwind_to, checkpoint = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint })
}
}
@@ -427,7 +427,7 @@ mod tests {
// patch td table for `update_head` call
self.tx.commit(|tx| tx.put::<tables::HeaderTD>(head.number, U256::ZERO.into()))?;
-// use previous progress as seed size
+// use previous checkpoint as seed size
let end =
input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1;
@@ -446,12 +446,12 @@ mod tests {
input: ExecInput,
output: Option<ExecOutput>,
) -> Result<(), TestRunnerError> {
-let initial_stage_progress = input.checkpoint.unwrap_or_default().block_number;
+let initial_checkpoint = input.checkpoint.unwrap_or_default().block_number;
match output {
-Some(output) if output.checkpoint.block_number > initial_stage_progress => {
+Some(output) if output.checkpoint.block_number > initial_checkpoint => {
self.tx.query(|tx| {
for block_num in
-(initial_stage_progress..output.checkpoint.block_number).rev()
+(initial_checkpoint..output.checkpoint.block_number).rev()
{
// look up the header hash
let hash = tx
@@ -470,7 +470,7 @@ mod tests {
Ok(())
})?;
}
-_ => self.check_no_header_entry_above(initial_stage_progress)?,
+_ => self.check_no_header_entry_above(initial_checkpoint)?,
};
Ok(())
}
@@ -535,10 +535,10 @@ mod tests {
#[tokio::test]
async fn execute_with_linear_downloader() {
let mut runner = HeadersTestRunner::with_linear_downloader();
-let (stage_progress, previous_stage) = (1000, 1200);
+let (checkpoint, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
-checkpoint: Some(StageCheckpoint::new(stage_progress)),
+checkpoint: Some(StageCheckpoint::new(checkpoint)),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
@@ -558,7 +558,7 @@ mod tests {
}))
}, done: true }) if block_number == tip.number
// -1 because we don't need to download the local head
-&& processed == stage_progress + headers.len() as u64 - 1
+&& processed == checkpoint + headers.len() as u64 - 1
&& total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@@ -574,16 +574,16 @@ mod tests {
runner.send_tip(consensus_tip);
// Genesis
-let stage_progress = 0;
+let checkpoint = 0;
let head = random_header(0, None);
let gap_fill = random_header(1, Some(head.hash()));
let gap_tip = random_header(2, Some(gap_fill.hash()));
// Empty database
assert_matches!(
-stage.get_sync_gap(&tx, stage_progress).await,
+stage.get_sync_gap(&tx, checkpoint).await,
Err(StageError::DatabaseIntegrity(ProviderError::HeaderNotFound(block_number)))
-if block_number.as_number().unwrap() == stage_progress
+if block_number.as_number().unwrap() == checkpoint
);
// Checkpoint and no gap
@@ -592,7 +592,7 @@ mod tests {
tx.put::<tables::Headers>(head.number, head.clone().unseal())
.expect("failed to write header");
-let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
+let gap = stage.get_sync_gap(&tx, checkpoint).await.unwrap();
assert_eq!(gap.local_head, head);
assert_eq!(gap.target.tip(), consensus_tip.into());
@@ -602,7 +602,7 @@ mod tests {
tx.put::<tables::Headers>(gap_tip.number, gap_tip.clone().unseal())
.expect("failed to write header");
-let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
+let gap = stage.get_sync_gap(&tx, checkpoint).await.unwrap();
assert_eq!(gap.local_head, head);
assert_eq!(gap.target.tip(), gap_tip.parent_hash.into());
@@ -613,20 +613,20 @@ mod tests {
.expect("failed to write header");
assert_matches!(
-stage.get_sync_gap(&tx, stage_progress).await,
-Err(StageError::StageProgress(progress)) if progress == stage_progress
+stage.get_sync_gap(&tx, checkpoint).await,
+Err(StageError::StageCheckpoint(_checkpoint)) if _checkpoint == checkpoint
);
}
/// Execute the stage in two steps
#[tokio::test]
-async fn execute_from_previous_progress() {
+async fn execute_from_previous_checkpoint() {
let mut runner = HeadersTestRunner::with_linear_downloader();
// pick range that's larger than the configured headers batch size
-let (stage_progress, previous_stage) = (600, 1200);
+let (checkpoint, previous_stage) = (600, 1200);
let mut input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
-checkpoint: Some(StageCheckpoint::new(stage_progress)),
+checkpoint: Some(StageCheckpoint::new(checkpoint)),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
@@ -644,8 +644,8 @@ mod tests {
processed,
total: Some(total),
}))
-}, done: false }) if block_number == stage_progress &&
-processed == stage_progress + 500 &&
+}, done: false }) if block_number == checkpoint &&
+processed == checkpoint + 500 &&
total == tip.number);
runner.client.clear().await;
@@ -663,7 +663,7 @@ mod tests {
}))
}, done: true }) if block_number == tip.number
// -1 because we don't need to download the local head
-&& processed == stage_progress + headers.len() as u64 - 1
+&& processed == checkpoint + headers.len() as u64 - 1
&& total == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
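
For readers skimming the diff, the caller-visible part of this rename is the error variant `StageError::StageProgress(u64)` becoming `StageError::StageCheckpoint(u64)`. The following is a minimal, self-contained sketch of how code matching on that variant changes; the stripped-down enum below is an illustrative stand-in, not reth's real `StageError`, which has many more variants.

```rust
use thiserror::Error;

// Illustrative stand-in for reth's `StageError` after this commit.
#[derive(Debug, Error)]
enum StageError {
    /// Invalid checkpoint passed to the stage (was `StageProgress(u64)` before the rename).
    #[error("Invalid stage checkpoint: {0}")]
    StageCheckpoint(u64),
    /// Download channel closed
    #[error("Download channel closed")]
    ChannelClosed,
}

fn main() {
    // A caller that previously matched `StageError::StageProgress(block)`
    // now matches `StageError::StageCheckpoint(block)`.
    let err = StageError::StageCheckpoint(1000);
    match err {
        StageError::StageCheckpoint(block) => println!("invalid checkpoint at block {block}"),
        other => println!("other stage error: {other}"),
    }
}
```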