feat(stages): unwind prune checkpoints (#9528)

This commit is contained in:
Alexey Shekhirin
2024-07-16 11:47:19 +01:00
committed by GitHub
parent 01075f6980
commit fcc6307ada
6 changed files with 64 additions and 6 deletions

View File

@ -1,6 +1,8 @@
use reth_db_api::database::Database;
use reth_provider::{DatabaseProviderRW, PruneCheckpointReader};
use reth_prune::{PruneMode, PruneModes, PruneSegment, PrunerBuilder};
use reth_provider::{DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter};
use reth_prune::{
PruneMode, PruneModes, PruneSegment, PrunerBuilder, SegmentOutput, SegmentOutputCheckpoint,
};
use reth_stages_api::{
ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
};
@ -49,7 +51,34 @@ impl<DB: Database> Stage<DB> for PruneStage {
if result.progress.is_finished() {
Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true })
} else {
info!(target: "sync::stages::prune::exec", segments = ?result.segments, "Pruner has more data to prune");
if let Some((last_segment, last_segment_output)) = result.segments.last() {
match last_segment_output {
SegmentOutput {
progress,
pruned,
checkpoint:
checkpoint @ Some(SegmentOutputCheckpoint { block_number: Some(_), .. }),
} => {
info!(
target: "sync::stages::prune::exec",
?last_segment,
?progress,
?pruned,
?checkpoint,
"Last segment has more data to prune"
)
}
SegmentOutput { progress, pruned, checkpoint: _ } => {
info!(
target: "sync::stages::prune::exec",
?last_segment,
?progress,
?pruned,
"Last segment has more data to prune"
)
}
}
}
// We cannot set the checkpoint yet, because prune segments may have different highest
// pruned block numbers
Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
@ -58,10 +87,16 @@ impl<DB: Database> Stage<DB> for PruneStage {
fn unwind(
&mut self,
_provider: &DatabaseProviderRW<DB>,
provider: &DatabaseProviderRW<DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::prune::unwind", "Stage is always skipped");
// We cannot recover the data that was pruned in `execute`, so we just update the
// checkpoints.
let prune_checkpoints = provider.get_prune_checkpoints()?;
for (segment, mut checkpoint) in prune_checkpoints {
checkpoint.block_number = Some(input.unwind_to);
provider.save_prune_checkpoint(segment, checkpoint)?;
}
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}

View File

@ -582,6 +582,10 @@ impl<DB: Database> PruneCheckpointReader for ProviderFactory<DB> {
) -> ProviderResult<Option<PruneCheckpoint>> {
self.provider()?.get_prune_checkpoint(segment)
}
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
self.provider()?.get_prune_checkpoints()
}
}
impl<DB> Clone for ProviderFactory<DB> {

View File

@ -3327,6 +3327,14 @@ impl<TX: DbTx> PruneCheckpointReader for DatabaseProvider<TX> {
) -> ProviderResult<Option<PruneCheckpoint>> {
Ok(self.tx.get::<tables::PruneCheckpoints>(segment)?)
}
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
Ok(self
.tx
.cursor_read::<tables::PruneCheckpoints>()?
.walk(None)?
.collect::<Result<_, _>>()?)
}
}
impl<TX: DbTxMut> PruneCheckpointWriter for DatabaseProvider<TX> {

View File

@ -581,6 +581,10 @@ where
) -> ProviderResult<Option<PruneCheckpoint>> {
self.database.provider()?.get_prune_checkpoint(segment)
}
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
self.database.provider()?.get_prune_checkpoints()
}
}
impl<DB> ChainSpecProvider for BlockchainProvider<DB>

View File

@ -474,6 +474,10 @@ impl PruneCheckpointReader for NoopProvider {
) -> ProviderResult<Option<PruneCheckpoint>> {
Ok(None)
}
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>> {
Ok(Vec::new())
}
}
impl StaticFileProviderFactory for NoopProvider {

View File

@ -4,11 +4,14 @@ use reth_storage_errors::provider::ProviderResult;
/// The trait for fetching prune checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
pub trait PruneCheckpointReader: Send + Sync {
/// Fetch the checkpoint for the given prune segment.
/// Fetch the prune checkpoint for the given segment.
fn get_prune_checkpoint(
&self,
segment: PruneSegment,
) -> ProviderResult<Option<PruneCheckpoint>>;
/// Fetch all the prune checkpoints.
fn get_prune_checkpoints(&self) -> ProviderResult<Vec<(PruneSegment, PruneCheckpoint)>>;
}
/// The trait for updating prune checkpoint related data.