chore(stages): cleanup execute/unwind logs (#3055)

This commit is contained in:
Alexey Shekhirin
2023-06-08 21:13:53 +04:00
committed by GitHub
parent cb542ab9d8
commit 6c56e9b143
12 changed files with 18 additions and 39 deletions

View File

@ -265,6 +265,14 @@ where
match output {
Ok(unwind_output) => {
checkpoint = unwind_output.checkpoint;
info!(
target: "sync::pipeline",
stage = %stage_id,
unwind_to = to,
progress = checkpoint.block_number,
done = checkpoint.block_number == to,
"Stage unwound"
);
self.metrics.stage_checkpoint(
stage_id, checkpoint,
// We assume it was set in the previous execute iteration, so it

View File

@ -154,7 +154,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
let done = highest_block == to_block;
info!(target: "sync::stages::bodies", stage_progress = highest_block, target = to_block, is_final_range = done, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(highest_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
@ -211,7 +210,6 @@ impl<DB: Database, D: BodyDownloader> Stage<DB> for BodyStage<D> {
rev_walker.delete_current()?;
}
info!(target: "sync::stages::bodies", to_block = input.unwind_to, stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(input.unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),

View File

@ -189,12 +189,11 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
state.write_to_db(&**tx)?;
trace!(target: "sync::stages::execution", took = ?start.elapsed(), "Wrote state");
let is_final_range = stage_progress == max_block;
info!(target: "sync::stages::execution", stage_progress, is_final_range, "Stage iteration finished");
let done = stage_progress == max_block;
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(stage_progress)
.with_execution_stage_checkpoint(stage_checkpoint),
done: is_final_range,
done,
})
}
}
@ -335,7 +334,7 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
let mut account_changeset = tx.cursor_dup_write::<tables::AccountChangeSet>()?;
let mut storage_changeset = tx.cursor_dup_write::<tables::StorageChangeSet>()?;
let (range, unwind_to, is_final_range) =
let (range, unwind_to, _) =
input.unwind_block_range_with_threshold(self.thresholds.max_blocks.unwrap_or(u64::MAX));
if range.is_empty() {
@ -414,7 +413,6 @@ impl<EF: ExecutorFactory, DB: Database> Stage<DB> for ExecutionStage<EF> {
StageCheckpoint::new(unwind_to)
};
info!(target: "sync::stages::execution", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint })
}
}

View File

@ -235,7 +235,6 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
},
);
info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
@ -258,7 +257,6 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
..Default::default()
});
info!(target: "sync::stages::hashing_account", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
@ -268,7 +266,7 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, is_final_range) =
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
// Aggregate all transition changesets and make a list of accounts that have been changed.
@ -279,7 +277,6 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
stage_checkpoint.progress = stage_checkpoint_progress(tx)?;
info!(target: "sync::stages::hashing_account", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_progress)
.with_account_hashing_stage_checkpoint(stage_checkpoint),

View File

@ -165,7 +165,6 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
},
);
info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = false, "Stage iteration finished");
return Ok(ExecOutput { checkpoint, done: false })
}
} else {
@ -187,7 +186,6 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
..Default::default()
});
info!(target: "sync::stages::hashing_storage", checkpoint = %checkpoint, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint, done: true })
}
@ -197,7 +195,7 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, is_final_range) =
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_storage_hashing(BlockNumberAddress::range(range))?;
@ -207,7 +205,6 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
stage_checkpoint.progress = stage_checkpoint_progress(tx)?;
info!(target: "sync::stages::hashing_storage", to_block = input.unwind_to, %unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_progress)
.with_storage_hashing_stage_checkpoint(stage_checkpoint),

View File

@ -341,7 +341,6 @@ where
checkpoint = checkpoint.with_headers_stage_checkpoint(stage_checkpoint);
}
info!(target: "sync::stages::headers", to_block = input.unwind_to, checkpoint = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint })
}
}

View File

@ -11,7 +11,6 @@ use std::{
fmt::Debug,
ops::{Deref, RangeInclusive},
};
use tracing::*;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
@ -58,7 +57,6 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
stage_checkpoint.progress.processed += changesets;
info!(target: "sync::stages::index_account_history", stage_progress = *range.end(), is_final_range, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
@ -72,7 +70,7 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, is_final_range) =
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets = tx.unwind_account_history_indices(range)?;
@ -86,7 +84,6 @@ impl<DB: Database> Stage<DB> for IndexAccountHistoryStage {
StageCheckpoint::new(unwind_progress)
};
info!(target: "sync::stages::index_account_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
// from HistoryIndex higher than that number.
Ok(UnwindOutput { checkpoint })
}

View File

@ -14,7 +14,6 @@ use std::{
fmt::Debug,
ops::{Deref, RangeInclusive},
};
use tracing::*;
/// Stage is indexing history the account changesets generated in
/// [`ExecutionStage`][crate::stages::ExecutionStage]. For more information
@ -61,7 +60,6 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
stage_checkpoint.progress.processed += changesets;
info!(target: "sync::stages::index_storage_history", stage_progress = *range.end(), done = is_final_range, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(*range.end())
.with_index_history_stage_checkpoint(stage_checkpoint),
@ -75,7 +73,7 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_progress, is_final_range) =
let (range, unwind_progress, _) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let changesets = tx.unwind_storage_history_indices(BlockNumberAddress::range(range))?;
@ -89,7 +87,6 @@ impl<DB: Database> Stage<DB> for IndexStorageHistoryStage {
StageCheckpoint::new(unwind_progress)
};
info!(target: "sync::stages::index_storage_history", to_block = input.unwind_to, unwind_progress, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint })
}
}

View File

@ -264,7 +264,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
self.validate_state_root(trie_root, block.seal_slow(), to_block)?;
info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(to_block)
.with_entities_stage_checkpoint(entities_checkpoint),
@ -294,7 +293,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
if input.unwind_to == 0 {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
entities_checkpoint.processed = 0;
@ -322,7 +320,6 @@ impl<DB: Database> Stage<DB> for MerkleStage {
info!(target: "sync::stages::merkle::unwind", "Nothing to unwind");
}
info!(target: "sync::stages::merkle::unwind", stage_progress = input.unwind_to, is_final_range = true, "Unwind iteration finished");
Ok(UnwindOutput { checkpoint: StageCheckpoint::new(input.unwind_to) })
}
}

View File

@ -148,7 +148,6 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
}
}
info!(target: "sync::stages::sender_recovery", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
@ -162,14 +161,12 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
// Lookup latest tx id that we should unwind to
let latest_tx_id = tx.block_body_indices(unwind_to)?.last_tx_num();
tx.unwind_table_by_num::<tables::TxSenders>(latest_tx_id)?;
info!(target: "sync::stages::sender_recovery", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),

View File

@ -83,7 +83,6 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
cursor_td.append(block_number, td.into())?;
}
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
@ -97,12 +96,10 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (_, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let (_, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
tx.unwind_table_by_num::<tables::HeaderTD>(unwind_to)?;
info!(target: "sync::stages::total_difficulty", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),

View File

@ -136,7 +136,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
info!(target: "sync::stages::transaction_lookup", stage_progress = end_block, is_final_range, "Stage iteration finished");
Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),
@ -150,8 +149,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let (range, unwind_to, is_final_range) =
input.unwind_block_range_with_threshold(self.commit_threshold);
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
@ -174,7 +172,6 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}
}
info!(target: "sync::stages::transaction_lookup", to_block = input.unwind_to, unwind_progress = unwind_to, is_final_range, "Unwind iteration finished");
Ok(UnwindOutput {
checkpoint: StageCheckpoint::new(unwind_to)
.with_entities_stage_checkpoint(stage_checkpoint(tx)?),