fix: unwind stages with respect to commit threshold (#2500)

This commit is contained in:
Alexey Shekhirin
2023-05-13 02:50:47 +09:00
committed by GitHub
parent e2c7b38d55
commit 748f90ed3c
15 changed files with 185 additions and 155 deletions

View File

@ -280,6 +280,8 @@ where
self.listeners
.notify(PipelineEvent::Unwound { stage_id, result: unwind_output });
tx.commit()?;
}
Err(err) => {
self.listeners.notify(PipelineEvent::Error { stage_id });
@ -289,7 +291,6 @@ where
}
}
tx.commit()?;
Ok(())
}

View File

@ -3,7 +3,10 @@ use async_trait::async_trait;
use reth_db::database::Database;
use reth_primitives::BlockNumber;
use reth_provider::Transaction;
use std::{cmp::min, ops::RangeInclusive};
use std::{
cmp::{max, min},
ops::RangeInclusive,
};
/// Stage execution input, see [Stage::execute].
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
@ -42,8 +45,8 @@ impl ExecInput {
&self,
threshold: u64,
) -> (RangeInclusive<BlockNumber>, bool) {
// plus +1 is to skip present block and allways start from block number 1, not 0.
let current_block = self.stage_progress.unwrap_or_default();
// +1 is to skip present block and always start from block number 1, not 0.
let start = current_block + 1;
let target = self.previous_stage_progress();
@ -66,20 +69,26 @@ pub struct UnwindInput {
}
impl UnwindInput {
/// Return next block range that needs to be executed.
/// Return next block range that needs to be unwound.
pub fn unwind_block_range(&self) -> RangeInclusive<BlockNumber> {
self.unwind_block_range_with_threshold(u64::MAX)
self.unwind_block_range_with_threshold(u64::MAX).0
}
/// Return the next block range to execute.
pub fn unwind_block_range_with_threshold(&self, threshold: u64) -> RangeInclusive<BlockNumber> {
// plus +1 is to skip present block.
let start = self.unwind_to + 1;
let mut end = self.stage_progress;
/// Return the next block range to unwind and the block we're unwinding to.
pub fn unwind_block_range_with_threshold(
&self,
threshold: u64,
) -> (RangeInclusive<BlockNumber>, BlockNumber, bool) {
// +1 is to skip the block we're unwinding to
let mut start = self.unwind_to + 1;
let end = self.stage_progress;
end = min(end, start.saturating_add(threshold));
start = max(start, end.saturating_sub(threshold));
start..=end
let unwind_to = start - 1;
let is_final_range = unwind_to == self.unwind_to;
(start..=end, unwind_to, is_final_range)
}
}

View File

@ -155,7 +155,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let done = highest_block == to_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, done, "Sync iteration finished");
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished");
Ok(ExecOutput { stage_progress: highest_block, done })
}
@ -165,7 +165,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::bodies", to_block = input.unwind_to, "Unwinding");
// Cursors to unwind bodies, ommers
let mut body_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut transaction_cursor = tx.cursor_write::<tables::Transactions>()?;
@ -209,6 +208,7 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
tx.delete::<tables::BlockBodyIndices>(number, None)?;
}
info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}

View File

@ -141,7 +141,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let mut executor = self.executor_factory.with_sp(LatestStateProviderRef::new(&**tx));
// Progress tracking
let mut progress = start_block;
let mut stage_progress = start_block;
// Execute block range
let mut state = PostState::default();
@ -164,7 +164,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Merge state changes
state.extend(block_state);
progress = block_number;
stage_progress = block_number;
// Write history periodically to free up memory
if self.thresholds.should_write_history(state.changeset_size() as u64) {
@ -176,7 +176,6 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Check if we should commit now
if self.thresholds.is_end_of_batch(block_number - start_block, state.size() as u64) {
info!(target: "sync::stages::execution", ?block_number, "Threshold hit, committing.");
break
}
}
@ -186,7 +185,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
trace!(target: "sync::stages::execution", accounts = state.accounts().len(), "Writing updated state to database");
state.write_to_db(&**tx)?;
trace!(target: "sync::stages::execution", took = ?Instant::now().duration_since(start), "Wrote state");
Ok(ExecOutput { stage_progress: progress, done: progress == max_block })
let is_final_range = stage_progress == max_block;
info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress, done: is_final_range })
}
}
@ -237,22 +239,21 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::execution", to_block = input.unwind_to, "Unwinding");
// Acquire changeset cursors
let mut account_changeset = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
let mut storage_changeset = tx.cursor_dup_write::<tables::StorageChangeSet>()?;
let block_range = input.unwind_to + 1..=input.stage_progress;
let (range, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if block_range.is_empty() {
if range.is_empty() {
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
// get all batches for account change
// Check if walk and walk_dup would do the same thing
let account_changeset_batch =
account_changeset.walk_range(block_range.clone())?.collect::<Result<Vec<_>, _>>()?;
account_changeset.walk_range(range.clone())?.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainState
for (_, changeset) in account_changeset_batch.into_iter().rev() {
@ -265,7 +266,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// get all batches for storage change
let storage_changeset_batch = storage_changeset
.walk_range(BlockNumberAddress::range(block_range.clone()))?
.walk_range(BlockNumberAddress::range(range.clone()))?
.collect::<Result<Vec<_>, _>>()?;
// revert all changes to PlainStorage
@ -286,7 +287,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
// Discard unwinded changesets
let mut rev_acc_changeset_walker = account_changeset.walk_back(None)?;
while let Some((block_num, _)) = rev_acc_changeset_walker.next().transpose()? {
if block_num < *block_range.start() {
if block_num <= unwind_to {
break
}
// delete all changesets
@ -295,14 +296,15 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
let mut rev_storage_changeset_walker = storage_changeset.walk_back(None)?;
while let Some((key, _)) = rev_storage_changeset_walker.next().transpose()? {
if key.block_number() < *block_range.start() {
if key.block_number() < *range.start() {
break
}
// delete all changesets
tx.delete::<tables::StorageChangeSet>(key, None)?;
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
}
}

View File

@ -255,6 +255,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
if next_address.is_some() {
// from block is correct here as were are iteration over state for this
// particular block
info!(target: "sync::stages::hashing_account", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false })
}
} else {
@ -269,7 +270,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx.insert_account_for_hashing(accounts.into_iter())?;
}
info!(target: "sync::stages::hashing_account", "Stage finished");
info!(target: "sync::stages::hashing_account", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
@ -279,15 +280,14 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// There is no threshold on account unwind, we will always take changesets and
// apply past values to HashedAccount table.
let range = input.unwind_block_range();
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Aggregate all transition changesets and and make list of account that have been changed.
tx.unwind_account_hashing(range)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
}
}

View File

@ -181,6 +181,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
if current_key.is_some() {
// `from_block` is correct here as were are iteration over state for this
// particular block.
info!(target: "sync::stages::hashing_storage", stage_progress = input.stage_progress(), is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { stage_progress: input.stage_progress(), done: false })
}
} else {
@ -194,7 +195,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx.insert_storage_for_hashing(storages.into_iter())?;
}
info!(target: "sync::stages::hashing_storage", "Stage finished");
info!(target: "sync::stages::hashing_storage", stage_progress = input.previous_stage_progress(), is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: input.previous_stage_progress(), done: true })
}
@ -204,11 +205,13 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let range = input.unwind_block_range();
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
}
}

View File

@ -241,12 +241,13 @@ where
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// TODO: handle bad block
info!(target: "sync::stages::headers", to_block = input.unwind_to, "Unwinding");
tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
input.unwind_to + 1,
)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num::<tables::Headers>(input.unwind_to)?;
info!(target: "sync::stages::headers", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}

View File

@ -46,7 +46,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
// Insert changeset to history index
tx.insert_account_history_index(indices)?;
info!(target: "sync::stages::index_account_history", "Stage finished");
info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range })
}
@ -56,13 +56,14 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding");
let range = input.unwind_block_range();
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_account_history_indices(range)?;
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
// from HistoryIndex higher than that number.
Ok(UnwindOutput { stage_progress: input.unwind_to })
Ok(UnwindOutput { stage_progress: unwind_progress })
}
}

View File

@ -46,7 +46,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
let indices = tx.get_storage_transition_ids_from_changeset(range.clone())?;
tx.insert_storage_history_index(indices)?;
info!(target: "sync::stages::index_storage_history", "Stage finished");
info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: *range.end(), done: is_final_range })
}
@ -56,12 +56,13 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, "Unwinding");
let range = input.unwind_block_range();
let (range, unwind_progress, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_progress })
}
}

View File

@ -226,7 +226,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
self.validate_state_root(trie_root, block_root, to_block)?;
info!(target: "sync::stages::merkle::exec", "Stage finished");
info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { stage_progress: to_block, done: true })
}
@ -238,13 +238,14 @@ impl<DB: Database> Stage<DB> for MerkleStage {
) -> Result<UnwindOutput, StageError> {
let range = input.unwind_block_range();
if matches!(self, MerkleStage::Execution { .. }) {
info!(target: "sync::stages::merkle::exec", "Stage is always skipped");
info!(target: "sync::stages::merkle::unwind", "Stage is always skipped");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
if input.unwind_to == 0 {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
@ -264,7 +265,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
}
info!(target: "sync::stages::merkle::unwind", "Stage finished");
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}

View File

@ -136,7 +136,7 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Sync iteration finished");
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: end_block, done: is_final_range })
}
@ -146,11 +146,15 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, "Unwinding");
let (_, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Lookup latest tx id that we should unwind to
let latest_tx_id = tx.block_body_indices(input.unwind_to)?.last_tx_num();
let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
}
}

View File

@ -81,7 +81,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
.map_err(|error| StageError::Validation { block: header.number, error })?;
cursor_td.append(block_number, td.into())?;
}
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Sync iteration finished");
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { stage_progress: end_block, done: is_final_range })
}
@ -91,9 +91,13 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, "Unwinding");
tx.unwind_table_by_num::<tables::HeaderTD>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
let (_, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
}
}

View File

@ -143,7 +143,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Sync iteration finished");
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput { done: is_final_range, stage_progress: end_block })
}
@ -153,14 +153,16 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, "Unwinding");
let (range, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_hash_number_cursor = tx.cursor_write::<tables::TxHashNumber>()?;
let mut transaction_cursor = tx.cursor_read::<tables::Transactions>()?;
let mut rev_walker = body_cursor.walk_back(None)?;
let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?;
while let Some((number, body)) = rev_walker.next().transpose()? {
if number <= input.unwind_to {
if number <= unwind_to {
break
}
@ -175,7 +177,8 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
Ok(UnwindOutput { stage_progress: input.unwind_to })
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { stage_progress: unwind_to })
}
}