From 966b31c191a443a6fc296cf2c57c9ce9cf3cd302 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 26 May 2023 14:01:01 +0300 Subject: [PATCH] chore(sync): rename headers progress to checkpoint (#2855) --- crates/stages/src/error.rs | 6 +-- crates/stages/src/stages/headers.rs | 82 ++++++++++++++--------------- 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/crates/stages/src/error.rs b/crates/stages/src/error.rs index c12ecada7..d1410a1d4 100644 --- a/crates/stages/src/error.rs +++ b/crates/stages/src/error.rs @@ -34,8 +34,8 @@ pub enum StageError { error: executor::BlockExecutionError, }, /// Invalid checkpoint passed to the stage - #[error("Invalid stage progress: {0}")] - StageProgress(u64), + #[error("Invalid stage checkpoint: {0}")] + StageCheckpoint(u64), /// Download channel closed #[error("Download channel closed")] ChannelClosed, @@ -70,7 +70,7 @@ impl StageError { StageError::Database(_) | StageError::Download(_) | StageError::DatabaseIntegrity(_) | - StageError::StageProgress(_) | + StageError::StageCheckpoint(_) | StageError::ExecutionError { .. } | StageError::ChannelClosed | StageError::Fatal(_) | diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 4783a4e98..d5b3de5fe 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -43,7 +43,7 @@ pub enum HeaderSyncMode { /// - [`CanonicalHeaders`][reth_db::tables::CanonicalHeaders] /// /// NOTE: This stage downloads headers in reverse. Upon returning the control flow to the pipeline, -/// the stage progress is not updated unless this stage is done. +/// the stage checkpoint is not updated until this stage is done. #[derive(Debug)] pub struct HeaderStage { /// Strategy for downloading the headers @@ -66,12 +66,12 @@ where fn is_stage_done( &self, tx: &Transaction<'_, DB>, - stage_progress: u64, + checkpoint: u64, ) -> Result { let mut header_cursor = tx.cursor_read::()?; let (head_num, _) = header_cursor - .seek_exact(stage_progress)? - .ok_or_else(|| ProviderError::HeaderNotFound(stage_progress.into()))?; + .seek_exact(checkpoint)? + .ok_or_else(|| ProviderError::HeaderNotFound(checkpoint.into()))?; // Check if the next entry is congruent Ok(header_cursor.next()?.map(|(next_num, _)| head_num + 1 == next_num).unwrap_or_default()) } @@ -82,7 +82,7 @@ where async fn get_sync_gap( &mut self, tx: &Transaction<'_, DB>, - stage_progress: u64, + checkpoint: u64, ) -> Result { // Create a cursor over canonical header hashes let mut cursor = tx.cursor_read::()?; @@ -90,8 +90,8 @@ where // Get head hash and reposition the cursor let (head_num, head_hash) = cursor - .seek_exact(stage_progress)? - .ok_or_else(|| ProviderError::HeaderNotFound(stage_progress.into()))?; + .seek_exact(checkpoint)? + .ok_or_else(|| ProviderError::HeaderNotFound(checkpoint.into()))?; // Construct head let (_, head) = header_cursor @@ -112,12 +112,12 @@ where // Decide the tip or error out on invalid input. // If the next element found in the cursor is not the "expected" next block per our current - // progress, then there is a gap in the database and we should start downloading in + // checkpoint, then there is a gap in the database and we should start downloading in // reverse from there. Else, it should use whatever the forkchoice state reports. let target = match next_header { - Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header), + Some(header) if checkpoint + 1 != header.number => SyncTarget::Gap(header), None => self.next_sync_target(head_num).await, - _ => return Err(StageError::StageProgress(stage_progress)), + _ => return Err(StageError::StageCheckpoint(checkpoint)), }; Ok(SyncGap { local_head, target }) @@ -195,17 +195,17 @@ where tx: &mut Transaction<'_, DB>, input: ExecInput, ) -> Result { - let current_progress = input.checkpoint.unwrap_or_default(); + let current_checkpoint = input.checkpoint.unwrap_or_default(); // Lookup the head and tip of the sync range - let gap = self.get_sync_gap(tx, current_progress.block_number).await?; + let gap = self.get_sync_gap(tx, current_checkpoint.block_number).await?; let local_head = gap.local_head.number; let tip = gap.target.tip(); // Nothing to sync if gap.is_closed() { - info!(target: "sync::stages::headers", stage_progress = %current_progress, target = ?tip, "Target block already reached"); - return Ok(ExecOutput { checkpoint: current_progress, done: true }) + info!(target: "sync::stages::headers", checkpoint = %current_checkpoint, target = ?tip, "Target block already reached"); + return Ok(ExecOutput { checkpoint: current_checkpoint, done: true }) } debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync"); @@ -251,7 +251,7 @@ where None }; - let mut stage_checkpoint = current_progress + let mut stage_checkpoint = current_checkpoint .entities_stage_checkpoint() .unwrap_or(EntitiesCheckpoint { // If for some reason (e.g. due to DB migration) we don't have `processed` @@ -279,21 +279,21 @@ where // Write the headers to db self.write_headers::(tx, downloaded_headers)?.unwrap_or_default(); - if self.is_stage_done(tx, current_progress.block_number)? { - let stage_progress = current_progress.block_number.max( + if self.is_stage_done(tx, current_checkpoint.block_number)? { + let checkpoint = current_checkpoint.block_number.max( tx.cursor_read::()? .last()? .map(|(num, _)| num) .unwrap_or_default(), ); Ok(ExecOutput { - checkpoint: StageCheckpoint::new(stage_progress) + checkpoint: StageCheckpoint::new(checkpoint) .with_entities_stage_checkpoint(stage_checkpoint), done: true, }) } else { Ok(ExecOutput { - checkpoint: current_progress.with_entities_stage_checkpoint(stage_checkpoint), + checkpoint: current_checkpoint.with_entities_stage_checkpoint(stage_checkpoint), done: false, }) } @@ -323,7 +323,7 @@ where checkpoint = checkpoint.with_entities_stage_checkpoint(stage_checkpoint); } - info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished"); + info!(target: "sync::stages::headers", to_block = input.unwind_to, checkpoint = input.unwind_to, is_final_range = true, "Unwind iteration finished"); Ok(UnwindOutput { checkpoint }) } } @@ -427,7 +427,7 @@ mod tests { // patch td table for `update_head` call self.tx.commit(|tx| tx.put::(head.number, U256::ZERO.into()))?; - // use previous progress as seed size + // use previous checkpoint as seed size let end = input.previous_stage.map(|(_, num)| num).unwrap_or_default().block_number + 1; @@ -446,12 +446,12 @@ mod tests { input: ExecInput, output: Option, ) -> Result<(), TestRunnerError> { - let initial_stage_progress = input.checkpoint.unwrap_or_default().block_number; + let initial_checkpoint = input.checkpoint.unwrap_or_default().block_number; match output { - Some(output) if output.checkpoint.block_number > initial_stage_progress => { + Some(output) if output.checkpoint.block_number > initial_checkpoint => { self.tx.query(|tx| { for block_num in - (initial_stage_progress..output.checkpoint.block_number).rev() + (initial_checkpoint..output.checkpoint.block_number).rev() { // look up the header hash let hash = tx @@ -470,7 +470,7 @@ mod tests { Ok(()) })?; } - _ => self.check_no_header_entry_above(initial_stage_progress)?, + _ => self.check_no_header_entry_above(initial_checkpoint)?, }; Ok(()) } @@ -535,10 +535,10 @@ mod tests { #[tokio::test] async fn execute_with_linear_downloader() { let mut runner = HeadersTestRunner::with_linear_downloader(); - let (stage_progress, previous_stage) = (1000, 1200); + let (checkpoint, previous_stage) = (1000, 1200); let input = ExecInput { previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), - checkpoint: Some(StageCheckpoint::new(stage_progress)), + checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); let rx = runner.execute(input); @@ -558,7 +558,7 @@ mod tests { })) }, done: true }) if block_number == tip.number // -1 because we don't need to download the local head - && processed == stage_progress + headers.len() as u64 - 1 + && processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); } @@ -574,16 +574,16 @@ mod tests { runner.send_tip(consensus_tip); // Genesis - let stage_progress = 0; + let checkpoint = 0; let head = random_header(0, None); let gap_fill = random_header(1, Some(head.hash())); let gap_tip = random_header(2, Some(gap_fill.hash())); // Empty database assert_matches!( - stage.get_sync_gap(&tx, stage_progress).await, + stage.get_sync_gap(&tx, checkpoint).await, Err(StageError::DatabaseIntegrity(ProviderError::HeaderNotFound(block_number))) - if block_number.as_number().unwrap() == stage_progress + if block_number.as_number().unwrap() == checkpoint ); // Checkpoint and no gap @@ -592,7 +592,7 @@ mod tests { tx.put::(head.number, head.clone().unseal()) .expect("failed to write header"); - let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap(); + let gap = stage.get_sync_gap(&tx, checkpoint).await.unwrap(); assert_eq!(gap.local_head, head); assert_eq!(gap.target.tip(), consensus_tip.into()); @@ -602,7 +602,7 @@ mod tests { tx.put::(gap_tip.number, gap_tip.clone().unseal()) .expect("failed to write header"); - let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap(); + let gap = stage.get_sync_gap(&tx, checkpoint).await.unwrap(); assert_eq!(gap.local_head, head); assert_eq!(gap.target.tip(), gap_tip.parent_hash.into()); @@ -613,20 +613,20 @@ mod tests { .expect("failed to write header"); assert_matches!( - stage.get_sync_gap(&tx, stage_progress).await, - Err(StageError::StageProgress(progress)) if progress == stage_progress + stage.get_sync_gap(&tx, checkpoint).await, + Err(StageError::StageCheckpoint(_checkpoint)) if _checkpoint == checkpoint ); } /// Execute the stage in two steps #[tokio::test] - async fn execute_from_previous_progress() { + async fn execute_from_previous_checkpoint() { let mut runner = HeadersTestRunner::with_linear_downloader(); // pick range that's larger than the configured headers batch size - let (stage_progress, previous_stage) = (600, 1200); + let (checkpoint, previous_stage) = (600, 1200); let mut input = ExecInput { previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))), - checkpoint: Some(StageCheckpoint::new(stage_progress)), + checkpoint: Some(StageCheckpoint::new(checkpoint)), }; let headers = runner.seed_execution(input).expect("failed to seed execution"); let rx = runner.execute(input); @@ -644,8 +644,8 @@ mod tests { processed, total: Some(total), })) - }, done: false }) if block_number == stage_progress && - processed == stage_progress + 500 && + }, done: false }) if block_number == checkpoint && + processed == checkpoint + 500 && total == tip.number); runner.client.clear().await; @@ -663,7 +663,7 @@ mod tests { })) }, done: true }) if block_number == tip.number // -1 because we don't need to download the local head - && processed == stage_progress + headers.len() as u64 - 1 + && processed == checkpoint + headers.len() as u64 - 1 && total == tip.number); assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed"); }