feat(tree): measure different parts of canonicalization (#5266)

This commit is contained in:
Alexey Shekhirin
2023-11-03 16:53:59 +00:00
committed by GitHub
parent 3ab1afc9aa
commit 4fcd20c890
9 changed files with 285 additions and 22 deletions

2
Cargo.lock generated
View File

@ -6299,12 +6299,14 @@ dependencies = [
"auto_impl",
"dashmap",
"itertools 0.11.0",
"metrics",
"parking_lot 0.12.1",
"pin-project",
"rand 0.8.5",
"rayon",
"reth-db",
"reth-interfaces",
"reth-metrics",
"reth-nippy-jar",
"reth-primitives",
"reth-trie",

View File

@ -2,7 +2,7 @@
use crate::{
canonical_chain::CanonicalChain,
chain::BlockKind,
metrics::TreeMetrics,
metrics::{MakeCanonicalAction, MakeCanonicalDurationsRecorder, TreeMetrics},
state::{BlockChainId, TreeState},
AppendableChain, BlockIndices, BlockchainTreeConfig, BundleStateData, TreeExternals,
};
@ -911,11 +911,16 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
#[track_caller]
#[instrument(level = "trace", skip(self), target = "blockchain_tree")]
pub fn make_canonical(&mut self, block_hash: &BlockHash) -> RethResult<CanonicalOutcome> {
let mut durations_recorder = MakeCanonicalDurationsRecorder::default();
let old_block_indices = self.block_indices().clone();
let old_buffered_blocks = self.state.buffered_blocks.parent_to_child.clone();
durations_recorder.record_relative(MakeCanonicalAction::CloneOldBlocks);
// If block is already canonical don't return error.
if let Some(header) = self.find_canonical_header(block_hash)? {
let canonical_header = self.find_canonical_header(block_hash)?;
durations_recorder.record_relative(MakeCanonicalAction::FindCanonicalHeader);
if let Some(header) = canonical_header {
info!(target: "blockchain_tree", ?block_hash, "Block is already canonical, ignoring.");
// TODO: this could be fetched from the chainspec first
let td = self.externals.database().provider()?.header_td(block_hash)?.ok_or(
@ -945,6 +950,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
// we are splitting chain at the block hash that we want to make canonical
let canonical = self.split_chain(chain_id, chain, SplitAt::Hash(*block_hash));
durations_recorder.record_relative(MakeCanonicalAction::SplitChain);
let mut block_fork = canonical.fork_block();
let mut block_fork_number = canonical.fork_block_number();
@ -959,9 +965,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
block_fork_number = canonical.fork_block_number();
chains_to_promote.push(canonical);
}
durations_recorder.record_relative(MakeCanonicalAction::SplitChainForks);
let old_tip = self.block_indices().canonical_tip();
// Merge all chain into one chain.
// Merge all chains into one chain.
let mut new_canon_chain = chains_to_promote.pop().expect("There is at least one block");
trace!(target: "blockchain_tree", ?new_canon_chain, "Merging chains");
let mut chain_appended = false;
@ -970,12 +977,14 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
trace!(target: "blockchain_tree", ?chain, "Appending chain");
new_canon_chain.append_chain(chain).expect("We have just build the chain.");
}
durations_recorder.record_relative(MakeCanonicalAction::MergeAllChains);
if chain_appended {
trace!(target: "blockchain_tree", ?new_canon_chain, "Canonical appended chain");
trace!(target: "blockchain_tree", ?new_canon_chain, "Canonical chain appended");
}
// update canonical index
self.block_indices_mut().canonicalize_blocks(new_canon_chain.blocks());
durations_recorder.record_relative(MakeCanonicalAction::UpdateCanonicalIndex);
// event about new canonical chain.
let chain_notification;
@ -989,7 +998,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
chain_notification =
CanonStateNotification::Commit { new: Arc::new(new_canon_chain.clone()) };
// append to database
self.commit_canonical(new_canon_chain)?;
self.commit_canonical_to_database(new_canon_chain)?;
durations_recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);
} else {
// it forks to canonical block that is not the tip.
@ -1005,7 +1015,9 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
unreachable!("all chains should point to canonical chain.");
}
let old_canon_chain = self.revert_canonical(canon_fork.number);
let old_canon_chain = self.revert_canonical_from_database(canon_fork.number);
durations_recorder
.record_relative(MakeCanonicalAction::RevertCanonicalChainFromDatabase);
let old_canon_chain = match old_canon_chain {
val @ Err(_) => {
@ -1022,7 +1034,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
Ok(val) => val,
};
// commit new canonical chain.
self.commit_canonical(new_canon_chain.clone())?;
self.commit_canonical_to_database(new_canon_chain.clone())?;
durations_recorder.record_relative(MakeCanonicalAction::CommitCanonicalChainToDatabase);
if let Some(old_canon_chain) = old_canon_chain {
// state action
@ -1034,6 +1047,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
// insert old canon chain
self.insert_chain(AppendableChain::new(old_canon_chain));
durations_recorder.record_relative(MakeCanonicalAction::InsertOldCanonicalChain);
self.update_reorg_metrics(reorg_depth as f64);
} else {
@ -1045,12 +1059,17 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
}
}
//
let head = chain_notification.tip().header.clone();
// send notification about new canonical chain.
let _ = self.canon_state_notification_sender.send(chain_notification);
debug!(
target: "blockchain_tree",
actions = ?durations_recorder.actions,
"Canonicalization finished"
);
Ok(CanonicalOutcome::Committed { head })
}
@ -1066,8 +1085,8 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
self.canon_state_notification_sender.clone()
}
/// Canonicalize the given chain and commit it to the database.
fn commit_canonical(&self, chain: Chain) -> RethResult<()> {
/// Write the given chain to the database as canonical.
fn commit_canonical_to_database(&self, chain: Chain) -> RethResult<()> {
let provider = DatabaseProvider::new_rw(
self.externals.db.tx_mut()?,
self.externals.chain_spec.clone(),
@ -1095,7 +1114,7 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
return Ok(())
}
// revert `N` blocks from current canonical chain and put them inside BlockchanTree
let old_canon_chain = self.revert_canonical(unwind_to)?;
let old_canon_chain = self.revert_canonical_from_database(unwind_to)?;
// check if there is block in chain
if let Some(old_canon_chain) = old_canon_chain {
@ -1110,7 +1129,10 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
/// Revert canonical blocks from the database and return them.
///
/// The block, `revert_until`, is non-inclusive, i.e. `revert_until` stays in the database.
fn revert_canonical(&mut self, revert_until: BlockNumber) -> RethResult<Option<Chain>> {
fn revert_canonical_from_database(
&mut self,
revert_until: BlockNumber,
) -> RethResult<Option<Chain>> {
// read data that is needed for new sidechain
let provider = DatabaseProvider::new_rw(

View File

@ -1,7 +1,9 @@
use metrics::Histogram;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use std::time::{Duration, Instant};
/// Metrics for the entire blockchain tree
#[derive(Metrics)]
@ -26,3 +28,73 @@ pub struct BlockBufferMetrics {
/// Total blocks in the block buffer
pub blocks: Gauge,
}
#[derive(Debug)]
pub(crate) struct MakeCanonicalDurationsRecorder {
start: Instant,
pub(crate) actions: Vec<(MakeCanonicalAction, Duration)>,
latest: Option<Duration>,
}
impl Default for MakeCanonicalDurationsRecorder {
fn default() -> Self {
Self { start: Instant::now(), actions: Vec::new(), latest: None }
}
}
impl MakeCanonicalDurationsRecorder {
/// Records the duration since last record, saves it for future logging and instantly reports as
/// a metric with `action` label.
pub(crate) fn record_relative(&mut self, action: MakeCanonicalAction) {
let elapsed = self.start.elapsed();
let duration = elapsed - self.latest.unwrap_or_default();
self.actions.push((action, duration));
MakeCanonicalMetrics::new_with_labels(&[("action", action.as_str())])
.duration
.record(duration);
self.latest = Some(elapsed);
}
}
#[derive(Debug, Copy, Clone)]
pub(crate) enum MakeCanonicalAction {
CloneOldBlocks,
FindCanonicalHeader,
SplitChain,
SplitChainForks,
MergeAllChains,
UpdateCanonicalIndex,
CommitCanonicalChainToDatabase,
RevertCanonicalChainFromDatabase,
InsertOldCanonicalChain,
}
impl MakeCanonicalAction {
fn as_str(&self) -> &'static str {
match self {
MakeCanonicalAction::CloneOldBlocks => "clone old blocks",
MakeCanonicalAction::FindCanonicalHeader => "find canonical header",
MakeCanonicalAction::SplitChain => "split chain",
MakeCanonicalAction::SplitChainForks => "split chain forks",
MakeCanonicalAction::MergeAllChains => "merge all chains",
MakeCanonicalAction::UpdateCanonicalIndex => "update canonical index",
MakeCanonicalAction::CommitCanonicalChainToDatabase => {
"commit canonical chain to database"
}
MakeCanonicalAction::RevertCanonicalChainFromDatabase => {
"revert canonical chain from database"
}
MakeCanonicalAction::InsertOldCanonicalChain => "insert old canonical chain",
}
}
}
#[derive(Metrics)]
#[metrics(scope = "blockchain_tree.make_canonical")]
/// Canonicalization metrics
struct MakeCanonicalMetrics {
/// The time it took to execute an action
duration: Histogram,
}

View File

@ -392,6 +392,7 @@ where
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
?target,
%progress,
%done,
"Stage committed progress"
@ -401,6 +402,7 @@ where
target: "sync::pipeline",
stage = %stage_id,
checkpoint = checkpoint.block_number,
?target,
%done,
"Stage committed progress"
);

View File

@ -25,6 +25,10 @@ tokio-stream = { workspace = true, features = ["sync"] }
# tracing
tracing.workspace = true
# metrics
reth-metrics.workspace = true
metrics.workspace = true
# misc
auto_impl = "1.0"
itertools.workspace = true

View File

@ -0,0 +1,102 @@
use metrics::Histogram;
use reth_metrics::Metrics;
use std::time::{Duration, Instant};
#[derive(Debug)]
pub(crate) struct DurationsRecorder {
start: Instant,
pub(crate) actions: Vec<(Action, Duration)>,
latest: Option<Duration>,
}
impl Default for DurationsRecorder {
fn default() -> Self {
Self { start: Instant::now(), actions: Vec::new(), latest: None }
}
}
impl DurationsRecorder {
/// Saves the provided duration for future logging and instantly reports as a metric with
/// `action` label.
pub(crate) fn record_duration(&mut self, action: Action, duration: Duration) {
self.actions.push((action, duration));
Metrics::new_with_labels(&[("action", format!("{action:?}"))]).duration.record(duration);
self.latest = Some(self.start.elapsed());
}
/// Records the duration since last record, saves it for future logging and instantly reports as
/// a metric with `action` label.
pub(crate) fn record_relative(&mut self, action: Action) {
let elapsed = self.start.elapsed();
let duration = elapsed - self.latest.unwrap_or_default();
self.actions.push((action, duration));
Metrics::new_with_labels(&[("action", action.as_str())]).duration.record(duration);
self.latest = Some(elapsed);
}
}
#[derive(Debug, Copy, Clone)]
pub(crate) enum Action {
InsertStorageHashing,
InsertAccountHashing,
InsertMerkleTree,
InsertBlock,
InsertState,
InsertHashes,
InsertHistoryIndices,
UpdatePipelineStages,
InsertCanonicalHeaders,
InsertHeaders,
InsertHeaderNumbers,
InsertHeaderTD,
InsertBlockOmmers,
InsertTxSenders,
InsertTransactions,
InsertTxHashNumbers,
InsertBlockWithdrawals,
InsertBlockBodyIndices,
InsertTransactionBlock,
RecoverSigners,
GetNextTxNum,
GetParentTD,
}
impl Action {
fn as_str(&self) -> &'static str {
match self {
Action::InsertStorageHashing => "insert storage hashing",
Action::InsertAccountHashing => "insert account hashing",
Action::InsertMerkleTree => "insert merkle tree",
Action::InsertBlock => "insert block",
Action::InsertState => "insert state",
Action::InsertHashes => "insert hashes",
Action::InsertHistoryIndices => "insert history indices",
Action::UpdatePipelineStages => "update pipeline stages",
Action::InsertCanonicalHeaders => "insert canonical headers",
Action::InsertHeaders => "insert headers",
Action::InsertHeaderNumbers => "insert header numbers",
Action::InsertHeaderTD => "insert header TD",
Action::InsertBlockOmmers => "insert block ommers",
Action::InsertTxSenders => "insert tx senders",
Action::InsertTransactions => "insert transactions",
Action::InsertTxHashNumbers => "insert tx hash numbers",
Action::InsertBlockWithdrawals => "insert block withdrawals",
Action::InsertBlockBodyIndices => "insert block body indices",
Action::InsertTransactionBlock => "insert transaction block",
Action::RecoverSigners => "recover signers",
Action::GetNextTxNum => "get next tx num",
Action::GetParentTD => "get parent TD",
}
}
}
#[derive(Metrics)]
#[metrics(scope = "storage.providers.database")]
/// Database provider metrics
struct Metrics {
/// The time it took to execute an action
duration: Histogram,
}

View File

@ -21,7 +21,9 @@ use std::{
};
use tracing::trace;
mod metrics;
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
/// A common provider that fetches data from a database.

View File

@ -1,5 +1,6 @@
use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit},
providers::database::metrics,
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
@ -48,7 +49,9 @@ use std::{
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::{mpsc, Arc},
time::{Duration, Instant},
};
use tracing::debug;
/// A [`DatabaseProvider`] that holds a read-only database transaction.
pub type DatabaseProviderRO<'this, DB> = DatabaseProvider<<DB as DatabaseGAT<'this>>::TX>;
@ -1600,6 +1603,8 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
let mut storage_prefix_set: HashMap<B256, PrefixSetMut> = HashMap::default();
let mut destroyed_accounts = HashSet::default();
let mut durations_recorder = metrics::DurationsRecorder::default();
// storage hashing stage
{
let lists = self.changed_storages_with_range(range.clone())?;
@ -1615,6 +1620,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
}
}
}
durations_recorder.record_relative(metrics::Action::InsertStorageHashing);
// account hashing stage
{
@ -1628,6 +1634,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
}
}
}
durations_recorder.record_relative(metrics::Action::InsertAccountHashing);
// merkle tree
{
@ -1652,6 +1659,10 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
}
trie_updates.flush(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
debug!(target: "providers::db", ?range, actions = ?durations_recorder.actions, "Inserted hashes");
Ok(())
}
@ -1824,7 +1835,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
}
impl<TX: DbTxMut + DbTx> HistoryWriter for DatabaseProvider<TX> {
fn calculate_history_indices(&self, range: RangeInclusive<BlockNumber>) -> RethResult<()> {
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> RethResult<()> {
// account history stage
{
let indices = self.changed_accounts_and_blocks_with_range(range.clone())?;
@ -2062,28 +2073,39 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
prune_modes: Option<&PruneModes>,
) -> RethResult<StoredBlockBodyIndices> {
let block_number = block.number;
self.tx.put::<tables::CanonicalHeaders>(block.number, block.hash())?;
let mut durations_recorder = metrics::DurationsRecorder::default();
self.tx.put::<tables::CanonicalHeaders>(block_number, block.hash())?;
durations_recorder.record_relative(metrics::Action::InsertCanonicalHeaders);
// Put header with canonical hashes.
self.tx.put::<tables::Headers>(block.number, block.header.as_ref().clone())?;
self.tx.put::<tables::HeaderNumbers>(block.hash(), block.number)?;
self.tx.put::<tables::Headers>(block_number, block.header.as_ref().clone())?;
durations_recorder.record_relative(metrics::Action::InsertHeaders);
self.tx.put::<tables::HeaderNumbers>(block.hash(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHeaderNumbers);
// total difficulty
let ttd = if block.number == 0 {
let ttd = if block_number == 0 {
block.difficulty
} else {
let parent_block_number = block.number - 1;
let parent_block_number = block_number - 1;
let parent_ttd = self.header_td_by_number(parent_block_number)?.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetParentTD);
parent_ttd + block.difficulty
};
self.tx.put::<tables::HeaderTD>(block.number, ttd.into())?;
self.tx.put::<tables::HeaderTD>(block_number, ttd.into())?;
durations_recorder.record_relative(metrics::Action::InsertHeaderTD);
// insert body ommers data
if !block.ommers.is_empty() {
self.tx.put::<tables::BlockOmmers>(
block.number,
block_number,
StoredBlockOmmers { ommers: block.ommers },
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockOmmers);
}
let mut next_tx_num = self
@ -2092,6 +2114,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
.last()?
.map(|(n, _)| n + 1)
.unwrap_or_default();
durations_recorder.record_relative(metrics::Action::GetNextTxNum);
let first_tx_num = next_tx_num;
let tx_count = block.body.len() as u64;
@ -2103,10 +2126,14 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
let senders = TransactionSigned::recover_signers(&block.body, block.body.len()).ok_or(
BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError),
)?;
durations_recorder.record_relative(metrics::Action::RecoverSigners);
debug_assert_eq!(senders.len(), block.body.len(), "missing one or more senders");
block.body.into_iter().zip(senders).collect()
};
let mut tx_senders_elapsed = Duration::default();
let mut transactions_elapsed = Duration::default();
let mut tx_hash_numbers_elapsed = Duration::default();
for (transaction, sender) in tx_iter {
let hash = transaction.hash();
@ -2115,20 +2142,31 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
self.tx.put::<tables::TxSenders>(next_tx_num, sender)?;
tx_senders_elapsed += start.elapsed();
}
let start = Instant::now();
self.tx.put::<tables::Transactions>(next_tx_num, transaction.into())?;
transactions_elapsed += start.elapsed();
if prune_modes
.and_then(|modes| modes.transaction_lookup)
.filter(|prune_mode| prune_mode.is_full())
.is_none()
{
let start = Instant::now();
self.tx.put::<tables::TxHashNumber>(hash, next_tx_num)?;
tx_hash_numbers_elapsed += start.elapsed();
}
next_tx_num += 1;
}
durations_recorder.record_duration(metrics::Action::InsertTxSenders, tx_senders_elapsed);
durations_recorder
.record_duration(metrics::Action::InsertTransactions, transactions_elapsed);
durations_recorder
.record_duration(metrics::Action::InsertTxHashNumbers, tx_hash_numbers_elapsed);
if let Some(withdrawals) = block.withdrawals {
if !withdrawals.is_empty() {
@ -2136,16 +2174,26 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
block_number,
StoredBlockWithdrawals { withdrawals },
)?;
durations_recorder.record_relative(metrics::Action::InsertBlockWithdrawals);
}
}
let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices.clone())?;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);
if !block_indices.is_empty() {
self.tx.put::<tables::TransactionBlock>(block_indices.last_tx_num(), block_number)?;
durations_recorder.record_relative(metrics::Action::InsertTransactionBlock);
}
debug!(
target: "providers::db",
?block_number,
actions = ?durations_recorder.actions,
"Inserted block"
);
Ok(block_indices)
}
@ -2168,22 +2216,31 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
let last_block_hash = last.hash();
let expected_state_root = last.state_root;
let mut durations_recorder = metrics::DurationsRecorder::default();
// Insert the blocks
for block in blocks {
let (block, senders) = block.into_components();
self.insert_block(block, Some(senders), prune_modes)?;
durations_recorder.record_relative(metrics::Action::InsertBlock);
}
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
state.write_to_db(self.tx_ref(), OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);
self.insert_hashes(first_number..=last_block_number, last_block_hash, expected_state_root)?;
durations_recorder.record_relative(metrics::Action::InsertHashes);
self.calculate_history_indices(first_number..=last_block_number)?;
self.update_history_indices(first_number..=last_block_number)?;
durations_recorder.record_relative(metrics::Action::InsertHistoryIndices);
// Update pipeline progress
self.update_pipeline_stages(new_tip_number, false)?;
durations_recorder.record_relative(metrics::Action::UpdatePipelineStages);
debug!(target: "providers::db", actions = ?durations_recorder.actions, "Appended blocks");
Ok(())
}

View File

@ -37,5 +37,5 @@ pub trait HistoryWriter: Send + Sync {
) -> RethResult<()>;
/// Read account/storage changesets and update account/storage history indices.
fn calculate_history_indices(&self, range: RangeInclusive<BlockNumber>) -> RethResult<()>;
fn update_history_indices(&self, range: RangeInclusive<BlockNumber>) -> RethResult<()>;
}