feat(stages): checkpoint hashing stages into a new table (#2735)

This commit is contained in:
Alexey Shekhirin
2023-05-22 18:12:46 +04:00
committed by GitHub
parent 1b1ca9e2ca
commit 7273ce8028
4 changed files with 174 additions and 180 deletions

View File

@ -82,12 +82,10 @@ impl Command {
tool.db.update(|tx| {
// Clear hashed accounts
tx.clear::<tables::HashedAccount>()?;
tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), Vec::new())?;
tx.put::<tables::SyncStage>(ACCOUNT_HASHING.0.to_string(), Default::default())?;
// Clear hashed storages
tx.clear::<tables::HashedStorage>()?;
tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), Vec::new())?;
tx.put::<tables::SyncStage>(STORAGE_HASHING.0.to_string(), Default::default())?;
Ok::<_, eyre::Error>(())

View File

@ -138,6 +138,40 @@ impl StageCheckpoint {
pub fn new(block_number: BlockNumber) -> Self {
Self { block_number, ..Default::default() }
}
/// Returns the account hashing stage checkpoint, if any.
pub fn account_hashing_stage_checkpoint(&self) -> Option<AccountHashingCheckpoint> {
    if let Some(StageUnitCheckpoint::Account(checkpoint)) = self.stage_checkpoint {
        Some(checkpoint)
    } else {
        None
    }
}
/// Returns the storage hashing stage checkpoint, if any.
pub fn storage_hashing_stage_checkpoint(&self) -> Option<StorageHashingCheckpoint> {
    if let Some(StageUnitCheckpoint::Storage(checkpoint)) = self.stage_checkpoint {
        Some(checkpoint)
    } else {
        None
    }
}
/// Sets the stage checkpoint to account hashing.
pub fn with_account_hashing_stage_checkpoint(
mut self,
checkpoint: AccountHashingCheckpoint,
) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::Account(checkpoint));
self
}
/// Sets the stage checkpoint to storage hashing.
pub fn with_storage_hashing_stage_checkpoint(
mut self,
checkpoint: StorageHashingCheckpoint,
) -> Self {
self.stage_checkpoint = Some(StageUnitCheckpoint::Storage(checkpoint));
self
}
}
// TODO(alexey): ideally, we'd want to display block number + stage-specific metric (if available)

View File

@ -1,7 +1,6 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use itertools::Itertools;
use rayon::slice::ParallelSliceMut;
use reth_codecs::Compact;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@ -39,43 +38,6 @@ impl Default for AccountHashingStage {
}
}
impl AccountHashingStage {
/// Saves the hashing progress
///
/// Serializes `checkpoint` with the `Compact` codec and writes it to the
/// [`tables::SyncStageProgress`] table under the account hashing stage key.
pub fn save_checkpoint<DB: Database>(
    &mut self,
    tx: &Transaction<'_, DB>,
    checkpoint: AccountHashingCheckpoint,
) -> Result<(), StageError> {
    debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Saving inner account hashing checkpoint");
    let mut encoded = Vec::new();
    checkpoint.to_compact(&mut encoded);
    tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), encoded)?;
    Ok(())
}
/// Gets the hashing progress
///
/// Reads the account hashing stage entry from [`tables::SyncStageProgress`];
/// a missing or empty entry yields the default (empty) checkpoint.
pub fn get_checkpoint<DB: Database>(
    &self,
    tx: &Transaction<'_, DB>,
) -> Result<AccountHashingCheckpoint, StageError> {
    match tx.get::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into())? {
        Some(buf) if !buf.is_empty() => {
            let (checkpoint, _) = AccountHashingCheckpoint::from_compact(&buf, buf.len());
            if checkpoint.address.is_some() {
                debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Continuing inner account hashing checkpoint");
            }
            Ok(checkpoint)
        }
        _ => Ok(AccountHashingCheckpoint::default()),
    }
}
}
// TODO: Rewrite this
/// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
/// in unit tests or benchmarks to generate an initial database state for running the
@ -173,22 +135,31 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset.
if to_block - from_block > self.clean_threshold || from_block == 1 {
let mut checkpoint = self.get_checkpoint(tx)?;
let stage_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.account_hashing_stage_checkpoint());
if checkpoint.address.is_none() ||
// Checkpoint is no longer valid if the range of transitions changed.
// An already hashed account may have been changed with the new range, and therefore should be hashed again.
checkpoint.from != from_block ||
checkpoint.to != to_block
{
// clear table, load all accounts and hash it
tx.clear::<tables::HashedAccount>()?;
let start_address = match stage_checkpoint {
Some(AccountHashingCheckpoint { address: address @ Some(_), from, to })
// Checkpoint is only valid if the range of transitions didn't change.
// An already hashed account may have been changed with the new range,
// and therefore should be hashed again.
if from == from_block && to == to_block =>
{
debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner account hashing checkpoint");
checkpoint = AccountHashingCheckpoint::default();
self.save_checkpoint(tx, checkpoint)?;
address
}
_ => {
// clear table, load all accounts and hash it
tx.clear::<tables::HashedAccount>()?;
None
}
}
.take()
.map(RawKey::new);
let start_address = checkpoint.address.take().map(RawKey::new);
let next_address = {
let mut accounts_cursor =
tx.cursor_read::<RawTable<tables::PlainAccountState>>()?;
@ -245,18 +216,15 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
};
if let Some((next_address, _)) = &next_address {
checkpoint.address = Some(next_address.key().unwrap());
checkpoint.from = from_block;
checkpoint.to = to_block;
}
self.save_checkpoint(tx, checkpoint)?;
if next_address.is_some() {
// from block is correct here as we are iterating over the state for this
// particular block
info!(target: "sync::stages::hashing_account", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
let checkpoint = input.checkpoint().with_account_hashing_stage_checkpoint(
AccountHashingCheckpoint {
address: Some(next_address.key().unwrap()),
from: from_block,
to: to_block,
},
);
info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
// Aggregate all transition changesets and make a list of accounts that have been
@ -270,8 +238,12 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx.insert_account_for_hashing(accounts.into_iter())?;
}
info!(target: "sync::stages::hashing_account", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
// We finished the hashing stage, no future iterations are expected for the same block range,
// so no checkpoint is needed.
let checkpoint = input.previous_stage_checkpoint();
info!(target: "sync::stages::hashing_account", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
/// Unwind the stage.
@ -295,11 +267,11 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
UnwindStageTestRunner, PREV_STAGE_ID,
stage_test_suite_ext, ExecuteStageTestRunner, TestRunnerError, UnwindStageTestRunner,
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_primitives::{Account, U256};
use reth_primitives::{Account, StageUnitCheckpoint, U256};
use test_utils::*;
stage_test_suite_ext!(AccountHashingTestRunner, account_hashing);
@ -342,15 +314,10 @@ mod tests {
runner.seed_execution(input).expect("failed to seed execution");
// first run, hash first five account.
// first run, hash first five accounts.
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 10, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
let fifth_address = runner
.tx
.query(|tx| {
@ -364,20 +331,31 @@ mod tests {
})
.unwrap();
let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
assert_eq!(
stage_progress,
AccountHashingCheckpoint { address: Some(fifth_address), from: 11, to: 20 }
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number: 10,
stage_checkpoint: Some(StageUnitCheckpoint::Account(
AccountHashingCheckpoint { address: Some(address), from: 11, to: 20 }
))
},
done: false
}) if address == fifth_address
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 5);
// second run, hash next five account.
// second run, hash next five accounts.
input.checkpoint = Some(result.unwrap().checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 20, .. }, done: true })
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 20, stage_checkpoint: None },
done: true
})
);
assert_eq!(runner.tx.table::<tables::HashedAccount>().unwrap().len(), 10);

View File

@ -1,6 +1,5 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use num_traits::Zero;
use reth_codecs::Compact;
use reth_db::{
cursor::DbDupCursorRO,
database::Database,
@ -33,43 +32,6 @@ impl Default for StorageHashingStage {
}
}
impl StorageHashingStage {
/// Saves the hashing progress
///
/// Serializes `checkpoint` with the `Compact` codec and writes it to the
/// [`tables::SyncStageProgress`] table under the storage hashing stage key.
pub fn save_checkpoint<DB: Database>(
    &mut self,
    tx: &Transaction<'_, DB>,
    checkpoint: StorageHashingCheckpoint,
) -> Result<(), StageError> {
    debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Saving inner storage hashing checkpoint");
    let mut encoded = Vec::new();
    checkpoint.to_compact(&mut encoded);
    tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), encoded)?;
    Ok(())
}
/// Gets the hashing progress
///
/// Reads the storage hashing stage entry from [`tables::SyncStageProgress`];
/// a missing or empty entry yields the default (empty) checkpoint.
pub fn get_checkpoint<DB: Database>(
    &self,
    tx: &Transaction<'_, DB>,
) -> Result<StorageHashingCheckpoint, StageError> {
    match tx.get::<tables::SyncStageProgress>(STORAGE_HASHING.0.into())? {
        Some(buf) if !buf.is_empty() => {
            let (checkpoint, _) = StorageHashingCheckpoint::from_compact(&buf, buf.len());
            if checkpoint.address.is_some() {
                debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Continuing inner storage hashing checkpoint");
            }
            Ok(checkpoint)
        }
        _ => Ok(StorageHashingCheckpoint::default()),
    }
}
}
#[async_trait::async_trait]
impl<DB: Database> Stage<DB> for StorageHashingStage {
/// Return the id of the stage
@ -94,22 +56,34 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
// AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
// genesis accounts are not in changeset, along with their storages.
if to_block - from_block > self.clean_threshold || from_block == 1 {
let mut checkpoint = self.get_checkpoint(tx)?;
let stage_checkpoint = input
.checkpoint
.and_then(|checkpoint| checkpoint.storage_hashing_stage_checkpoint());
if checkpoint.address.is_none() ||
// Checkpoint is no longer valid if the range of blocks changed.
// An already hashed storage may have been changed with the new range, and therefore should be hashed again.
checkpoint.to != to_block ||
checkpoint.from != from_block
{
tx.clear::<tables::HashedStorage>()?;
let (mut current_key, mut current_subkey) = match stage_checkpoint {
Some(StorageHashingCheckpoint {
address: address @ Some(_),
storage,
from,
to ,
})
// Checkpoint is only valid if the range of transitions didn't change.
// An already hashed storage may have been changed with the new range,
// and therefore should be hashed again.
if from == from_block && to == to_block =>
{
debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?stage_checkpoint, "Continuing inner storage hashing checkpoint");
checkpoint = StorageHashingCheckpoint::default();
self.save_checkpoint(tx, checkpoint)?;
}
(address, storage)
}
_ => {
// clear table, load all accounts and hash it
tx.clear::<tables::HashedStorage>()?;
(None, None)
}
};
let mut current_key = checkpoint.address.take();
let mut current_subkey = checkpoint.storage.take();
let mut keccak_address = None;
let mut hashed_batch = BTreeMap::new();
@ -169,20 +143,17 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
})?;
if let Some(address) = &current_key {
checkpoint.address = Some(*address);
checkpoint.storage = current_subkey;
checkpoint.from = from_block;
checkpoint.to = to_block;
}
self.save_checkpoint(tx, checkpoint)?;
if current_key.is_some() {
// `from_block` is correct here as we are iterating over the state for this
// particular block.
info!(target: "sync::stages::hashing_storage", stage_progress = %input.checkpoint(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint: input.checkpoint(), done: false })
let checkpoint = input.checkpoint().with_storage_hashing_stage_checkpoint(
StorageHashingCheckpoint {
address: current_key,
storage: current_subkey,
from: from_block,
to: to_block,
},
);
info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
// Aggregate all changesets and make a list of storages that have been
@ -195,8 +166,12 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.insert_storage_for_hashing(storages.into_iter())?;
}
info!(target: "sync::stages::hashing_storage", stage_progress = %input.previous_stage_checkpoint(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: input.previous_stage_checkpoint(), done: true })
// We finished the hashing stage, no future iterations are expected for the same block range,
// so no checkpoint is needed.
let checkpoint = input.previous_stage_checkpoint();
info!(target: "sync::stages::hashing_storage", stage_progress = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
/// Unwind the stage.
@ -231,7 +206,7 @@ mod tests {
use reth_interfaces::test_utils::generators::{
random_block_range, random_contract_account_range,
};
use reth_primitives::{Address, SealedBlock, StorageEntry, H256, U256};
use reth_primitives::{Address, SealedBlock, StageUnitCheckpoint, StorageEntry, H256, U256};
stage_test_suite_ext!(StorageHashingTestRunner, storage_hashing);
@ -250,7 +225,7 @@ mod tests {
// hang on one key. Seed execution inserts more than one storage entry per address.
runner.set_commit_threshold(1);
let input = ExecInput {
let mut input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, StageCheckpoint::new(previous_stage))),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
@ -261,6 +236,7 @@ mod tests {
if let Ok(result) = runner.execute(input).await.unwrap() {
if !result.done {
// Continue from checkpoint
input.checkpoint = Some(result.checkpoint);
continue
} else {
assert!(result.checkpoint.block_number == previous_stage);
@ -296,11 +272,7 @@ mod tests {
// first run, hash first half of storages.
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
let (progress_address, progress_key) = runner
.tx
.query(|tx| {
@ -314,27 +286,29 @@ mod tests {
})
.unwrap();
let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
let progress_key = stage_progress.storage.map(|_| progress_key);
assert_eq!(
stage_progress,
StorageHashingCheckpoint {
address: Some(progress_address),
storage: progress_key,
from: 101,
to: 500
}
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number: 100,
stage_checkpoint: Some(StageUnitCheckpoint::Storage(StorageHashingCheckpoint {
address: Some(address),
storage: Some(storage),
from: 101,
to: 500
}))
},
done: false
}) if address == progress_address && storage == progress_key
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 500);
// second run with commit threshold of 2 to check if subkey is set.
runner.set_commit_threshold(2);
input.checkpoint = Some(result.unwrap().checkpoint);
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 100, .. }, done: false })
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
let (progress_address, progress_key) = runner
.tx
.query(|tx| {
@ -348,17 +322,24 @@ mod tests {
})
.unwrap();
let stage_progress = runner.stage().get_checkpoint(&runner.tx.inner()).unwrap();
let progress_key = stage_progress.storage.map(|_| progress_key);
assert_eq!(
stage_progress,
StorageHashingCheckpoint {
address: Some(progress_address),
storage: progress_key,
from: 101,
to: 500
}
assert_matches!(
result,
Ok(ExecOutput {
checkpoint: StageCheckpoint {
block_number: 100,
stage_checkpoint: Some(StageUnitCheckpoint::Storage(
StorageHashingCheckpoint {
address: Some(address),
storage: Some(storage),
from: 101,
to: 500,
}
))
},
done: false
}) if address == progress_address && storage == progress_key
);
assert_eq!(runner.tx.table::<tables::HashedStorage>().unwrap().len(), 502);
// third last run, hash rest of storages.
runner.set_commit_threshold(1000);
@ -368,7 +349,10 @@ mod tests {
assert_matches!(
result,
Ok(ExecOutput { checkpoint: StageCheckpoint { block_number: 500, .. }, done: true })
Ok(ExecOutput {
checkpoint: StageCheckpoint { block_number: 500, stage_checkpoint: None },
done: true
})
);
assert_eq!(
runner.tx.table::<tables::HashedStorage>().unwrap().len(),