mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 19:09:54 +00:00
chore(tree, engine, prune, stages, storage): improve logs (#4790)
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -6000,6 +6000,7 @@ dependencies = [
|
||||
"reth-provider",
|
||||
"reth-stages",
|
||||
"thiserror",
|
||||
"tokio-stream",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
|
||||
@ -10,6 +10,7 @@ use reth_primitives::{
|
||||
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
|
||||
BlockNumber,
|
||||
};
|
||||
use reth_prune::PrunerEvent;
|
||||
use reth_stages::{ExecOutput, PipelineEvent};
|
||||
use std::{
|
||||
future::Future,
|
||||
@ -127,6 +128,9 @@ impl NodeState {
|
||||
|
||||
info!(number=block.number, hash=?block.hash, "Block added to canonical chain");
|
||||
}
|
||||
BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
|
||||
info!(number=head.number, hash=?head.hash, ?elapsed, "Canonical chain committed");
|
||||
}
|
||||
BeaconConsensusEngineEvent::ForkBlockAdded(block) => {
|
||||
info!(number=block.number, hash=?block.hash, "Block added to fork chain");
|
||||
}
|
||||
@ -149,6 +153,20 @@ impl NodeState {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_pruner_event(&self, event: PrunerEvent) {
|
||||
match event {
|
||||
PrunerEvent::Finished { tip_block_number, elapsed, done, parts_done } => {
|
||||
info!(
|
||||
tip_block_number = tip_block_number,
|
||||
elapsed = ?elapsed,
|
||||
done = done,
|
||||
parts_done = ?parts_done,
|
||||
"Pruner finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A node event.
|
||||
@ -162,6 +180,8 @@ pub enum NodeEvent {
|
||||
ConsensusEngine(BeaconConsensusEngineEvent),
|
||||
/// A Consensus Layer health event.
|
||||
ConsensusLayerHealth(ConsensusLayerHealthEvent),
|
||||
/// A pruner event
|
||||
Pruner(PrunerEvent),
|
||||
}
|
||||
|
||||
impl From<NetworkEvent> for NodeEvent {
|
||||
@ -188,6 +208,12 @@ impl From<ConsensusLayerHealthEvent> for NodeEvent {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<PrunerEvent> for NodeEvent {
|
||||
fn from(event: PrunerEvent) -> Self {
|
||||
NodeEvent::Pruner(event)
|
||||
}
|
||||
}
|
||||
|
||||
/// Displays relevant information to the user from components of the node, and periodically
|
||||
/// displays the high-level status of the node.
|
||||
pub async fn handle_events<E>(
|
||||
@ -260,6 +286,9 @@ where
|
||||
NodeEvent::ConsensusLayerHealth(event) => {
|
||||
this.state.handle_consensus_layer_health_event(event)
|
||||
}
|
||||
NodeEvent::Pruner(event) => {
|
||||
this.state.handle_pruner_event(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -452,17 +452,21 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
|
||||
let mut hooks = EngineHooks::new();
|
||||
|
||||
if let Some(prune_config) = prune_config {
|
||||
let pruner_events = if let Some(prune_config) = prune_config {
|
||||
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
|
||||
let pruner = reth_prune::Pruner::new(
|
||||
let mut pruner = reth_prune::Pruner::new(
|
||||
db.clone(),
|
||||
self.chain.clone(),
|
||||
prune_config.block_interval,
|
||||
prune_config.parts,
|
||||
self.chain.prune_batch_sizes,
|
||||
);
|
||||
let events = pruner.events();
|
||||
hooks.add(PruneHook::new(pruner, Box::new(ctx.task_executor.clone())));
|
||||
}
|
||||
Either::Left(events)
|
||||
} else {
|
||||
Either::Right(stream::empty())
|
||||
};
|
||||
|
||||
// Configure the consensus engine
|
||||
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
|
||||
@ -493,7 +497,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
|
||||
)
|
||||
} else {
|
||||
Either::Right(stream::empty())
|
||||
}
|
||||
},
|
||||
pruner_events.map(Into::into)
|
||||
);
|
||||
ctx.task_executor.spawn_critical(
|
||||
"events task",
|
||||
|
||||
@ -302,7 +302,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
///
|
||||
/// If blocks does not have parent [`BlockStatus::Disconnected`] would be returned, in which
|
||||
/// case it is buffered for future inclusion.
|
||||
#[instrument(skip_all, fields(block = ?block.num_hash()), target = "blockchain_tree", ret)]
|
||||
#[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()), target = "blockchain_tree", ret)]
|
||||
fn try_insert_validated_block(
|
||||
&mut self,
|
||||
block: SealedBlockWithSenders,
|
||||
@ -372,7 +372,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
/// WARNING: this expects that the block extends the canonical chain: The block's parent is
|
||||
/// part of the canonical chain (e.g. the block's parent is the latest canonical hash). See also
|
||||
/// [Self::is_block_hash_canonical].
|
||||
#[instrument(skip_all, target = "blockchain_tree")]
|
||||
#[instrument(level = "trace", skip_all, target = "blockchain_tree")]
|
||||
fn try_append_canonical_chain(
|
||||
&mut self,
|
||||
block: SealedBlockWithSenders,
|
||||
@ -453,7 +453,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
/// Try inserting a block into the given side chain.
|
||||
///
|
||||
/// WARNING: This expects a valid side chain id, see [BlockIndices::get_blocks_chain_id]
|
||||
#[instrument(skip_all, target = "blockchain_tree")]
|
||||
#[instrument(level = "trace", skip_all, target = "blockchain_tree")]
|
||||
fn try_insert_block_into_side_chain(
|
||||
&mut self,
|
||||
block: SealedBlockWithSenders,
|
||||
@ -923,7 +923,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
///
|
||||
/// Returns `Ok` if the blocks were canonicalized, or if the blocks were already canonical.
|
||||
#[track_caller]
|
||||
#[instrument(skip(self), target = "blockchain_tree")]
|
||||
#[instrument(level = "trace", skip(self), target = "blockchain_tree")]
|
||||
pub fn make_canonical(&mut self, block_hash: &BlockHash) -> RethResult<CanonicalOutcome> {
|
||||
let old_block_indices = self.block_indices.clone();
|
||||
let old_buffered_blocks = self.buffered_blocks.parent_to_child.clone();
|
||||
@ -992,7 +992,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
|
||||
|
||||
// event about new canonical chain.
|
||||
let chain_notification;
|
||||
info!(
|
||||
debug!(
|
||||
target: "blockchain_tree",
|
||||
"Committing new canonical chain: {}", DisplayBlocksChain(new_canon_chain.blocks())
|
||||
);
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use crate::engine::forkchoice::ForkchoiceStatus;
|
||||
use reth_interfaces::consensus::ForkchoiceState;
|
||||
use reth_primitives::SealedBlock;
|
||||
use std::sync::Arc;
|
||||
use reth_primitives::{SealedBlock, SealedHeader};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
/// Events emitted by [crate::BeaconConsensusEngine].
|
||||
#[derive(Clone, Debug)]
|
||||
@ -10,6 +10,8 @@ pub enum BeaconConsensusEngineEvent {
|
||||
ForkchoiceUpdated(ForkchoiceState, ForkchoiceStatus),
|
||||
/// A block was added to the canonical chain.
|
||||
CanonicalBlockAdded(Arc<SealedBlock>),
|
||||
/// A canonical chain was committed.
|
||||
CanonicalChainCommitted(SealedHeader, Duration),
|
||||
/// A block was added to the fork chain.
|
||||
ForkBlockAdded(Arc<SealedBlock>),
|
||||
}
|
||||
|
||||
@ -40,7 +40,7 @@ use std::{
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
time::Instant,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
use tokio::sync::{
|
||||
mpsc,
|
||||
@ -656,16 +656,33 @@ where
|
||||
|
||||
let start = Instant::now();
|
||||
let make_canonical_result = self.blockchain.make_canonical(&state.head_block_hash);
|
||||
self.record_make_canonical_latency(start, &make_canonical_result);
|
||||
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
|
||||
let status = match make_canonical_result {
|
||||
Ok(outcome) => {
|
||||
if !outcome.is_already_canonical() {
|
||||
debug!(target: "consensus::engine", hash=?state.head_block_hash, number=outcome.header().number, "canonicalized new head");
|
||||
match outcome {
|
||||
CanonicalOutcome::AlreadyCanonical { ref header } => {
|
||||
debug!(
|
||||
target: "consensus::engine",
|
||||
fcu_head_num=?header.number,
|
||||
current_head_num=?self.blockchain.canonical_tip().number,
|
||||
"Ignoring beacon update to old head"
|
||||
);
|
||||
}
|
||||
CanonicalOutcome::Committed { ref head } => {
|
||||
debug!(
|
||||
target: "consensus::engine",
|
||||
hash=?state.head_block_hash,
|
||||
number=head.number,
|
||||
"Canonicalized new head"
|
||||
);
|
||||
|
||||
// new VALID update that moved the canonical chain forward
|
||||
let _ = self.update_head(outcome.header().clone());
|
||||
} else {
|
||||
debug!(target: "consensus::engine", fcu_head_num=?outcome.header().number, current_head_num=?self.blockchain.canonical_tip().number, "Ignoring beacon update to old head");
|
||||
// new VALID update that moved the canonical chain forward
|
||||
let _ = self.update_head(head.clone());
|
||||
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
|
||||
head.clone(),
|
||||
elapsed,
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(attrs) = attrs {
|
||||
@ -721,7 +738,7 @@ where
|
||||
&self,
|
||||
start: Instant,
|
||||
outcome: &Result<CanonicalOutcome, RethError>,
|
||||
) {
|
||||
) -> Duration {
|
||||
let elapsed = start.elapsed();
|
||||
self.metrics.make_canonical_latency.record(elapsed);
|
||||
match outcome {
|
||||
@ -733,6 +750,8 @@ where
|
||||
}
|
||||
Err(_) => self.metrics.make_canonical_error_latency.record(elapsed),
|
||||
}
|
||||
|
||||
elapsed
|
||||
}
|
||||
|
||||
/// Ensures that the given forkchoice state is consistent, assuming the head block has been
|
||||
@ -1469,10 +1488,20 @@ where
|
||||
// optimistically try to make the head of the current FCU target canonical, the sync
|
||||
// target might have changed since the block download request was issued
|
||||
// (new FCU received)
|
||||
match self.blockchain.make_canonical(&target.head_block_hash) {
|
||||
let start = Instant::now();
|
||||
let make_canonical_result = self.blockchain.make_canonical(&target.head_block_hash);
|
||||
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
|
||||
match make_canonical_result {
|
||||
Ok(outcome) => {
|
||||
if let CanonicalOutcome::Committed { ref head } = outcome {
|
||||
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
|
||||
head.clone(),
|
||||
elapsed,
|
||||
));
|
||||
}
|
||||
|
||||
let new_head = outcome.into_header();
|
||||
debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "canonicalized new head");
|
||||
debug!(target: "consensus::engine", hash=?new_head.hash, number=new_head.number, "Canonicalized new head");
|
||||
|
||||
// we can update the FCU blocks
|
||||
let _ = self.update_canon_chain(new_head, &target);
|
||||
|
||||
@ -38,7 +38,7 @@ use tokio::{
|
||||
};
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::sync::PollSender;
|
||||
use tracing::{debug, info, trace};
|
||||
use tracing::{debug, trace};
|
||||
|
||||
/// Constants for timeout updating
|
||||
|
||||
@ -508,7 +508,12 @@ impl Future for ActiveSession {
|
||||
progress = true;
|
||||
match cmd {
|
||||
SessionCommand::Disconnect { reason } => {
|
||||
info!(target: "net::session", ?reason, remote_peer_id=?this.remote_peer_id, "Received disconnect command for session");
|
||||
debug!(
|
||||
target: "net::session",
|
||||
?reason,
|
||||
remote_peer_id=?this.remote_peer_id,
|
||||
"Received disconnect command for session"
|
||||
);
|
||||
let reason =
|
||||
reason.unwrap_or(DisconnectReason::DisconnectRequested);
|
||||
|
||||
|
||||
@ -26,6 +26,7 @@ tracing.workspace = true
|
||||
thiserror.workspace = true
|
||||
itertools.workspace = true
|
||||
rayon.workspace = true
|
||||
tokio-stream.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
# reth
|
||||
|
||||
14
crates/prune/src/event.rs
Normal file
14
crates/prune/src/event.rs
Normal file
@ -0,0 +1,14 @@
|
||||
use reth_primitives::{BlockNumber, PrunePart};
|
||||
use std::{collections::BTreeMap, time::Duration};
|
||||
|
||||
/// An event emitted by a [Pruner][crate::Pruner].
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum PrunerEvent {
|
||||
/// Emitted when pruner finished running.
|
||||
Finished {
|
||||
tip_block_number: BlockNumber,
|
||||
elapsed: Duration,
|
||||
done: bool,
|
||||
parts_done: BTreeMap<PrunePart, bool>,
|
||||
},
|
||||
}
|
||||
@ -10,9 +10,11 @@
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
mod error;
|
||||
mod event;
|
||||
mod metrics;
|
||||
mod pruner;
|
||||
|
||||
use crate::metrics::Metrics;
|
||||
pub use error::PrunerError;
|
||||
pub use event::PrunerEvent;
|
||||
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
//! Support for pruning.
|
||||
|
||||
use crate::{Metrics, PrunerError};
|
||||
use crate::{Metrics, PrunerError, PrunerEvent};
|
||||
use rayon::prelude::*;
|
||||
use reth_db::{
|
||||
abstraction::cursor::{DbCursorRO, DbCursorRW},
|
||||
@ -13,15 +13,16 @@ use reth_db::{
|
||||
};
|
||||
use reth_interfaces::RethResult;
|
||||
use reth_primitives::{
|
||||
BlockNumber, ChainSpec, PruneBatchSizes, PruneCheckpoint, PruneMode, PruneModes, PrunePart,
|
||||
TxNumber, MINIMUM_PRUNING_DISTANCE,
|
||||
listener::EventListeners, BlockNumber, ChainSpec, PruneBatchSizes, PruneCheckpoint, PruneMode,
|
||||
PruneModes, PrunePart, TxNumber, MINIMUM_PRUNING_DISTANCE,
|
||||
};
|
||||
use reth_provider::{
|
||||
BlockReader, DatabaseProviderRW, ProviderFactory, PruneCheckpointReader, PruneCheckpointWriter,
|
||||
TransactionsProvider,
|
||||
};
|
||||
use std::{collections::HashMap, ops::RangeInclusive, sync::Arc, time::Instant};
|
||||
use tracing::{debug, error, info, instrument, trace};
|
||||
use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc, time::Instant};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
use tracing::{debug, error, instrument, trace};
|
||||
|
||||
/// Result of [Pruner::run] execution.
|
||||
///
|
||||
@ -46,6 +47,7 @@ pub struct Pruner<DB> {
|
||||
modes: PruneModes,
|
||||
/// Maximum entries to prune per block, per prune part.
|
||||
batch_sizes: PruneBatchSizes,
|
||||
listeners: EventListeners<PrunerEvent>,
|
||||
}
|
||||
|
||||
impl<DB: Database> Pruner<DB> {
|
||||
@ -64,9 +66,15 @@ impl<DB: Database> Pruner<DB> {
|
||||
last_pruned_block_number: None,
|
||||
modes,
|
||||
batch_sizes,
|
||||
listeners: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Listen for events on the prune.
|
||||
pub fn events(&mut self) -> UnboundedReceiverStream<PrunerEvent> {
|
||||
self.listeners.new_listener()
|
||||
}
|
||||
|
||||
/// Run the pruner
|
||||
pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult {
|
||||
if tip_block_number == 0 {
|
||||
@ -83,7 +91,7 @@ impl<DB: Database> Pruner<DB> {
|
||||
|
||||
let mut done = true;
|
||||
|
||||
let mut parts_done = HashMap::new();
|
||||
let mut parts_done = BTreeMap::new();
|
||||
|
||||
if let Some((to_block, prune_mode)) =
|
||||
self.modes.prune_target_block_receipts(tip_block_number)?
|
||||
@ -235,7 +243,7 @@ impl<DB: Database> Pruner<DB> {
|
||||
let elapsed = start.elapsed();
|
||||
self.metrics.duration_seconds.record(elapsed);
|
||||
|
||||
info!(
|
||||
trace!(
|
||||
target: "pruner",
|
||||
%tip_block_number,
|
||||
?elapsed,
|
||||
@ -243,6 +251,14 @@ impl<DB: Database> Pruner<DB> {
|
||||
?parts_done,
|
||||
"Pruner finished"
|
||||
);
|
||||
|
||||
self.listeners.notify(PrunerEvent::Finished {
|
||||
tip_block_number,
|
||||
elapsed,
|
||||
done,
|
||||
parts_done,
|
||||
});
|
||||
|
||||
Ok(done)
|
||||
}
|
||||
|
||||
|
||||
@ -192,8 +192,14 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
|
||||
// write output
|
||||
state.write_to_db(provider.tx_ref(), OriginalValuesKnown::Yes)?;
|
||||
let db_write_duration = time.elapsed();
|
||||
info!(target: "sync::stages::execution", block_fetch=?fetch_block_duration, execution=?execution_duration,
|
||||
write_preperation=?write_preparation_duration, write=?db_write_duration, " Execution duration.");
|
||||
debug!(
|
||||
target: "sync::stages::execution",
|
||||
block_fetch = ?fetch_block_duration,
|
||||
execution = ?execution_duration,
|
||||
write_preperation = ?write_preparation_duration,
|
||||
write = ?db_write_duration,
|
||||
"Execution time"
|
||||
);
|
||||
|
||||
executor.stats().log_info();
|
||||
|
||||
|
||||
@ -4,7 +4,7 @@ use crate::{bundle_state::BundleStateWithReceipts, StateProvider};
|
||||
use reth_interfaces::executor::BlockExecutionError;
|
||||
use reth_primitives::{Address, Block, BlockNumber, ChainSpec, PruneModes, U256};
|
||||
use std::time::Duration;
|
||||
use tracing::info;
|
||||
use tracing::debug;
|
||||
|
||||
/// Executor factory that would create the EVM with particular state provider.
|
||||
///
|
||||
@ -85,13 +85,15 @@ pub struct BlockExecutorStats {
|
||||
impl BlockExecutorStats {
|
||||
/// Log duration to info level log.
|
||||
pub fn log_info(&self) {
|
||||
info!(target: "evm",
|
||||
debug!(
|
||||
target: "evm",
|
||||
evm_transact = ?self.execution_duration,
|
||||
apply_state = ?self.apply_state_duration,
|
||||
apply_post_state = ?self.apply_post_execution_state_changes_duration,
|
||||
merge_transitions = ?self.merge_transitions_duration,
|
||||
receipt_root = ?self.receipt_root_duration,
|
||||
sender_recovery = ?self.sender_recovery_duration,
|
||||
"Execution time");
|
||||
"Execution time"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user