From 5e01a21ec45e9166a2b7dba56feb3ed1bc0750fe Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 26 Sep 2023 18:01:37 +0100 Subject: [PATCH] chore(tree, engine, prune, stages, storage): improve logs (#4790) --- Cargo.lock | 1 + bin/reth/src/node/events.rs | 29 +++++++++++ bin/reth/src/node/mod.rs | 13 +++-- crates/blockchain-tree/src/blockchain_tree.rs | 10 ++-- crates/consensus/beacon/src/engine/event.rs | 6 ++- crates/consensus/beacon/src/engine/mod.rs | 51 +++++++++++++++---- crates/net/network/src/session/active.rs | 9 +++- crates/prune/Cargo.toml | 1 + crates/prune/src/event.rs | 14 +++++ crates/prune/src/lib.rs | 2 + crates/prune/src/pruner.rs | 30 ++++++++--- crates/stages/src/stages/execution.rs | 10 +++- .../storage/provider/src/traits/executor.rs | 8 +-- 13 files changed, 148 insertions(+), 36 deletions(-) create mode 100644 crates/prune/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index 553febacf..bddd2001b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6000,6 +6000,7 @@ dependencies = [ "reth-provider", "reth-stages", "thiserror", + "tokio-stream", "tracing", ] diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 9a52ac7ae..a89cf101f 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -10,6 +10,7 @@ use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, BlockNumber, }; +use reth_prune::PrunerEvent; use reth_stages::{ExecOutput, PipelineEvent}; use std::{ future::Future, @@ -127,6 +128,9 @@ impl NodeState { info!(number=block.number, hash=?block.hash, "Block added to canonical chain"); } + BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => { + info!(number=head.number, hash=?head.hash, ?elapsed, "Canonical chain committed"); + } BeaconConsensusEngineEvent::ForkBlockAdded(block) => { info!(number=block.number, hash=?block.hash, "Block added to fork chain"); } @@ -149,6 +153,20 @@ impl NodeState { } } } + + fn handle_pruner_event(&self, event: PrunerEvent) { + match event { + PrunerEvent::Finished { tip_block_number, elapsed, done, parts_done } => { + info!( + tip_block_number = tip_block_number, + elapsed = ?elapsed, + done = done, + parts_done = ?parts_done, + "Pruner finished" + ); + } + } + } } /// A node event. @@ -162,6 +180,8 @@ pub enum NodeEvent { ConsensusEngine(BeaconConsensusEngineEvent), /// A Consensus Layer health event. ConsensusLayerHealth(ConsensusLayerHealthEvent), + /// A pruner event + Pruner(PrunerEvent), } impl From for NodeEvent { @@ -188,6 +208,12 @@ impl From for NodeEvent { } } +impl From for NodeEvent { + fn from(event: PrunerEvent) -> Self { + NodeEvent::Pruner(event) + } +} + /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. pub async fn handle_events( @@ -260,6 +286,9 @@ where NodeEvent::ConsensusLayerHealth(event) => { this.state.handle_consensus_layer_health_event(event) } + NodeEvent::Pruner(event) => { + this.state.handle_pruner_event(event); + } } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 87ecee62c..68a106d9b 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -452,17 +452,21 @@ impl NodeCommand { let mut hooks = EngineHooks::new(); - if let Some(prune_config) = prune_config { + let pruner_events = if let Some(prune_config) = prune_config { info!(target: "reth::cli", ?prune_config, "Pruner initialized"); - let pruner = reth_prune::Pruner::new( + let mut pruner = reth_prune::Pruner::new( db.clone(), self.chain.clone(), prune_config.block_interval, prune_config.parts, self.chain.prune_batch_sizes, ); + let events = pruner.events(); hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone()))); - } + Either::Left(events) + } else { + Either::Right(stream::empty()) + }; // Configure the consensus engine let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel( @@ -493,7 +497,8 @@ impl NodeCommand { ) } else { Either::Right(stream::empty()) - } + }, + pruner_events.map(Into::into) ); ctx.task_executor.spawn_critical( "events task", diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index bbd2abc19..b28004f3f 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -302,7 +302,7 @@ impl BlockchainTree /// /// If blocks does not have parent [`BlockStatus::Disconnected`] would be returned, in which /// case it is buffered for future inclusion. - #[instrument(skip_all, fields(block = ?block.num_hash()), target = "blockchain_tree", ret)] + #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()), target = "blockchain_tree", ret)] fn try_insert_validated_block( &mut self, block: SealedBlockWithSenders, @@ -372,7 +372,7 @@ impl BlockchainTree /// WARNING: this expects that the block extends the canonical chain: The block's parent is /// part of the canonical chain (e.g. the block's parent is the latest canonical hash). See also /// [Self::is_block_hash_canonical]. - #[instrument(skip_all, target = "blockchain_tree")] + #[instrument(level = "trace", skip_all, target = "blockchain_tree")] fn try_append_canonical_chain( &mut self, block: SealedBlockWithSenders, @@ -453,7 +453,7 @@ impl BlockchainTree /// Try inserting a block into the given side chain. /// /// WARNING: This expects a valid side chain id, see [BlockIndices::get_blocks_chain_id] - #[instrument(skip_all, target = "blockchain_tree")] + #[instrument(level = "trace", skip_all, target = "blockchain_tree")] fn try_insert_block_into_side_chain( &mut self, block: SealedBlockWithSenders, @@ -923,7 +923,7 @@ impl BlockchainTree /// /// Returns `Ok` if the blocks were canonicalized, or if the blocks were already canonical. #[track_caller] - #[instrument(skip(self), target = "blockchain_tree")] + #[instrument(level = "trace", skip(self), target = "blockchain_tree")] pub fn make_canonical(&mut self, block_hash: &BlockHash) -> RethResult { let old_block_indices = self.block_indices.clone(); let old_buffered_blocks = self.buffered_blocks.parent_to_child.clone(); @@ -992,7 +992,7 @@ impl BlockchainTree // event about new canonical chain. let chain_notification; - info!( + debug!( target: "blockchain_tree", "Committing new canonical chain: {}", DisplayBlocksChain(new_canon_chain.blocks()) ); diff --git a/crates/consensus/beacon/src/engine/event.rs b/crates/consensus/beacon/src/engine/event.rs index 7719325ba..27f737c1b 100644 --- a/crates/consensus/beacon/src/engine/event.rs +++ b/crates/consensus/beacon/src/engine/event.rs @@ -1,7 +1,7 @@ use crate::engine::forkchoice::ForkchoiceStatus; use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::SealedBlock; -use std::sync::Arc; +use reth_primitives::{SealedBlock, SealedHeader}; +use std::{sync::Arc, time::Duration}; /// Events emitted by [crate::BeaconConsensusEngine]. #[derive(Clone, Debug)] @@ -10,6 +10,8 @@ pub enum BeaconConsensusEngineEvent { ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus), /// A block was added to the canonical chain. CanonicalBlockAdded(Arc), + /// A canonical chain was committed. + CanonicalChainCommitted(SealedHeader, Duration), /// A block was added to the fork chain. ForkBlockAdded(Arc), } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index a6eaed2da..b2df319b3 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -40,7 +40,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, - time::Instant, + time::{Duration, Instant}, }; use tokio::sync::{ mpsc, @@ -656,16 +656,33 @@ where let start = Instant::now(); let make_canonical_result = self.blockchain.make_canonical(&state.head_block_hash); - self.record_make_canonical_latency(start, &make_canonical_result); + let elapsed = self.record_make_canonical_latency(start, &make_canonical_result); let status = match make_canonical_result { Ok(outcome) => { - if !outcome.is_already_canonical() { - debug!(target: "consensus::engine", hash=?state.head_block_hash, number=outcome.header().number, "canonicalized new head"); + match outcome { + CanonicalOutcome::AlreadyCanonical { ref header } => { + debug!( + target: "consensus::engine", + fcu_head_num=?header.number, + current_head_num=?self.blockchain.canonical_tip().number, + "Ignoring beacon update to old head" + ); + } + CanonicalOutcome::Committed { ref head } => { + debug!( + target: "consensus::engine", + hash=?state.head_block_hash, + number=head.number, + "Canonicalized new head" + ); - // new VALID update that moved the canonical chain forward - let _ = self.update_head(outcome.header().clone()); - } else { - debug!(target: "consensus::engine", fcu_head_num=?outcome.header().number, current_head_num=?self.blockchain.canonical_tip().number, "Ignoring beacon update to old head"); + // new VALID update that moved the canonical chain forward + let _ = self.update_head(head.clone()); + self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted( + head.clone(), + elapsed, + )); + } } if let Some(attrs) = attrs { @@ -721,7 +738,7 @@ where &self, start: Instant, outcome: &Result, - ) { + ) -> Duration { let elapsed = start.elapsed(); self.metrics.make_canonical_latency.record(elapsed); match outcome { @@ -733,6 +750,8 @@ where } Err(_) => self.metrics.make_canonical_error_latency.record(elapsed), } + + elapsed } /// Ensures that the given forkchoice state is consistent, assuming the head block has been @@ -1469,10 +1488,20 @@ where // optimistically try to make the head of the current FCU target canonical, the sync // target might have changed since the block download request was issued // (new FCU received) - match self.blockchain.make_canonical(&target.head_block_hash) { + let start = Instant::now(); + let make_canonical_result = self.blockchain.make_canonical(&target.head_block_hash); + let elapsed = self.record_make_canonical_latency(start, &make_canonical_result); + match make_canonical_result { Ok(outcome) => { + if let CanonicalOutcome::Committed { ref head } = outcome { + self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted( + head.clone(), + elapsed, + )); + } + let new_head = outcome.into_header(); - debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "canonicalized new head"); + debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "Canonicalized new head"); // we can update the FCU blocks let _ = self.update_canon_chain(new_head, &target); diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index b80eef231..91eaf8bce 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -38,7 +38,7 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::PollSender; -use tracing::{debug, info, trace}; +use tracing::{debug, trace}; /// Constants for timeout updating @@ -508,7 +508,12 @@ impl Future for ActiveSession { progress = true; match cmd { SessionCommand::Disconnect { reason } => { - info!(target: "net::session", ?reason, remote_peer_id=?this.remote_peer_id, "Received disconnect command for session"); + debug!( + target: "net::session", + ?reason, + remote_peer_id=?this.remote_peer_id, + "Received disconnect command for session" + ); let reason = reason.unwrap_or(DisconnectReason::DisconnectRequested); diff --git a/crates/prune/Cargo.toml b/crates/prune/Cargo.toml index 4582d27f8..3e0f94e15 100644 --- a/crates/prune/Cargo.toml +++ b/crates/prune/Cargo.toml @@ -26,6 +26,7 @@ tracing.workspace = true thiserror.workspace = true itertools.workspace = true rayon.workspace = true +tokio-stream.workspace = true [dev-dependencies] # reth diff --git a/crates/prune/src/event.rs b/crates/prune/src/event.rs new file mode 100644 index 000000000..9e0c48976 --- /dev/null +++ b/crates/prune/src/event.rs @@ -0,0 +1,14 @@ +use reth_primitives::{BlockNumber, PrunePart}; +use std::{collections::BTreeMap, time::Duration}; + +/// An event emitted by a [Pruner][crate::Pruner]. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum PrunerEvent { + /// Emitted when pruner finished running. + Finished { + tip_block_number: BlockNumber, + elapsed: Duration, + done: bool, + parts_done: BTreeMap, + }, +} diff --git a/crates/prune/src/lib.rs b/crates/prune/src/lib.rs index 30ff207bc..4087ad94f 100644 --- a/crates/prune/src/lib.rs +++ b/crates/prune/src/lib.rs @@ -10,9 +10,11 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] mod error; +mod event; mod metrics; mod pruner; use crate::metrics::Metrics; pub use error::PrunerError; +pub use event::PrunerEvent; pub use pruner::{Pruner, PrunerResult, PrunerWithResult}; diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index a38462b89..e23b400e8 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -1,6 +1,6 @@ //! Support for pruning. -use crate::{Metrics, PrunerError}; +use crate::{Metrics, PrunerError, PrunerEvent}; use rayon::prelude::*; use reth_db::{ abstraction::cursor::{DbCursorRO, DbCursorRW}, @@ -13,15 +13,16 @@ use reth_db::{ }; use reth_interfaces::RethResult; use reth_primitives::{ - BlockNumber, ChainSpec, PruneBatchSizes, PruneCheckpoint, PruneMode, PruneModes, PrunePart, - TxNumber, MINIMUM_PRUNING_DISTANCE, + listener::EventListeners, BlockNumber, ChainSpec, PruneBatchSizes, PruneCheckpoint, PruneMode, + PruneModes, PrunePart, TxNumber, MINIMUM_PRUNING_DISTANCE, }; use reth_provider::{ BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter, TransactionsProvider, }; -use std::{collections::HashMap, ops::RangeInclusive, sync::Arc, time::Instant}; -use tracing::{debug, error, info, instrument, trace}; +use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant}; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::{debug, error, instrument, trace}; /// Result of [Pruner::run] execution. /// @@ -46,6 +47,7 @@ pub struct Pruner { modes: PruneModes, /// Maximum entries to prune per block, per prune part. batch_sizes: PruneBatchSizes, + listeners: EventListeners, } impl Pruner { @@ -64,9 +66,15 @@ impl Pruner { last_pruned_block_number: None, modes, batch_sizes, + listeners: Default::default(), } } + /// Listen for events on the prune. + pub fn events(&mut self) -> UnboundedReceiverStream { + self.listeners.new_listener() + } + /// Run the pruner pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { if tip_block_number == 0 { @@ -83,7 +91,7 @@ impl Pruner { let mut done = true; - let mut parts_done = HashMap::new(); + let mut parts_done = BTreeMap::new(); if let Some((to_block, prune_mode)) = self.modes.prune_target_block_receipts(tip_block_number)? @@ -235,7 +243,7 @@ impl Pruner { let elapsed = start.elapsed(); self.metrics.duration_seconds.record(elapsed); - info!( + trace!( target: "pruner", %tip_block_number, ?elapsed, @@ -243,6 +251,14 @@ impl Pruner { ?parts_done, "Pruner finished" ); + + self.listeners.notify(PrunerEvent::Finished { + tip_block_number, + elapsed, + done, + parts_done, + }); + Ok(done) } diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index e2e57810d..cd66ed954 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -192,8 +192,14 @@ impl ExecutionStage { // write output state.write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes)?; let db_write_duration = time.elapsed(); - info!(target: "sync::stages::execution", block_fetch=?fetch_block_duration, execution=?execution_duration, - write_preperation=?write_preparation_duration, write=?db_write_duration, " Execution duration."); + debug!( + target: "sync::stages::execution", + block_fetch = ?fetch_block_duration, + execution = ?execution_duration, + write_preperation = ?write_preparation_duration, + write = ?db_write_duration, + "Execution time" + ); executor.stats().log_info(); diff --git a/crates/storage/provider/src/traits/executor.rs b/crates/storage/provider/src/traits/executor.rs index e0c1805fa..22832f241 100644 --- a/crates/storage/provider/src/traits/executor.rs +++ b/crates/storage/provider/src/traits/executor.rs @@ -4,7 +4,7 @@ use crate::{bundle_state::BundleStateWithReceipts, StateProvider}; use reth_interfaces::executor::BlockExecutionError; use reth_primitives::{Address, Block, BlockNumber, ChainSpec, PruneModes, U256}; use std::time::Duration; -use tracing::info; +use tracing::debug; /// Executor factory that would create the EVM with particular state provider. /// @@ -85,13 +85,15 @@ pub struct BlockExecutorStats { impl BlockExecutorStats { /// Log duration to info level log. pub fn log_info(&self) { - info!(target: "evm", + debug!( + target: "evm", evm_transact = ?self.execution_duration, apply_state = ?self.apply_state_duration, apply_post_state = ?self.apply_post_execution_state_changes_duration, merge_transitions = ?self.merge_transitions_duration, receipt_root = ?self.receipt_root_duration, sender_recovery = ?self.sender_recovery_duration, - "Execution time"); + "Execution time" + ); } }