chore: move update_pipeline_stages to StageCheckpointWriter (#3229)

This commit is contained in:
joshieDo
2023-06-19 12:53:18 +01:00
committed by GitHub
parent 96abde0965
commit 1049202f0f
2 changed files with 31 additions and 22 deletions

View File

@ -837,27 +837,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> {
Ok(blocks)
}
/// Update all pipeline sync stage progress.
pub fn update_pipeline_stages(
&self,
block_number: BlockNumber,
drop_stage_checkpoint: bool,
) -> std::result::Result<(), TransactionError> {
// iterate over all existing stages in the table and update its progress.
let mut cursor = self.tx.cursor_write::<tables::SyncStage>()?;
while let Some((stage_name, checkpoint)) = cursor.next()? {
cursor.upsert(
stage_name,
StageCheckpoint {
block_number,
..if drop_stage_checkpoint { Default::default() } else { checkpoint }
},
)?
}
Ok(())
}
/// Insert storage change index to database. Used inside StorageHistoryIndex stage
pub fn insert_storage_history_index(
&self,
@ -1816,4 +1795,24 @@ impl<'this, TX: DbTxMut<'this>> StageCheckpointWriter for DatabaseProvider<'this
fn save_stage_checkpoint(&self, id: StageId, checkpoint: StageCheckpoint) -> Result<()> {
Ok(self.tx.put::<tables::SyncStage>(id.to_string(), checkpoint)?)
}
fn update_pipeline_stages(
&self,
block_number: BlockNumber,
drop_stage_checkpoint: bool,
) -> Result<()> {
// iterate over all existing stages in the table and update its progress.
let mut cursor = self.tx.cursor_write::<tables::SyncStage>()?;
while let Some((stage_name, checkpoint)) = cursor.next()? {
cursor.upsert(
stage_name,
StageCheckpoint {
block_number,
..if drop_stage_checkpoint { Default::default() } else { checkpoint }
},
)?
}
Ok(())
}
}

View File

@ -1,5 +1,8 @@
use reth_interfaces::Result;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_primitives::{
stage::{StageCheckpoint, StageId},
BlockNumber,
};
/// The trait for fetching stage checkpoint related data.
#[auto_impl::auto_impl(&, Arc)]
@ -19,4 +22,11 @@ pub trait StageCheckpointWriter: Send + Sync {
/// Save stage checkpoint progress.
fn save_stage_checkpoint_progress(&self, id: StageId, checkpoint: Vec<u8>) -> Result<()>;
/// Update all pipeline sync stage progress.
fn update_pipeline_stages(
&self,
block_number: BlockNumber,
drop_stage_checkpoint: bool,
) -> Result<()>;
}