chore: move events mod to node core (#6585)

Matthias Seitz
2024-02-13 16:58:00 +01:00
committed by GitHub
parent 7c16846cc8
commit ada3547fd1
8 changed files with 34 additions and 30 deletions


@@ -0,0 +1,98 @@
//! Events related to Consensus Layer health.
use futures::Stream;
use reth_provider::CanonChainTracker;
use std::{
fmt,
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::time::{Instant, Interval};
/// Interval at which the health of the Consensus Layer client is checked.
const CHECK_INTERVAL: Duration = Duration::from_secs(300);
/// Period without a transition configuration exchange with the Consensus Layer client,
/// after which a warning is issued.
const NO_TRANSITION_CONFIG_EXCHANGED_PERIOD: Duration = Duration::from_secs(120);
/// Period without a fork choice update from the Consensus Layer client,
/// after which a warning is issued.
const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(120);
/// A Stream of [ConsensusLayerHealthEvent].
pub struct ConsensusLayerHealthEvents {
interval: Interval,
canon_chain: Box<dyn CanonChainTracker>,
}
impl fmt::Debug for ConsensusLayerHealthEvents {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ConsensusLayerHealthEvents").field("interval", &self.interval).finish()
}
}
impl ConsensusLayerHealthEvents {
/// Creates a new [ConsensusLayerHealthEvents] with the given canonical chain tracker.
pub fn new(canon_chain: Box<dyn CanonChainTracker>) -> Self {
        // Skip the first tick to avoid emitting a spurious `ConsensusLayerHealthEvent::NeverSeen` event.
let interval = tokio::time::interval_at(Instant::now() + CHECK_INTERVAL, CHECK_INTERVAL);
Self { interval, canon_chain }
}
}
impl Stream for ConsensusLayerHealthEvents {
type Item = ConsensusLayerHealthEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
ready!(this.interval.poll_tick(cx));
if let Some(fork_choice) = this.canon_chain.last_received_update_timestamp() {
if fork_choice.elapsed() <= NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD {
// We had an FCU, and it's recent. CL is healthy.
continue
} else {
// We had an FCU, but it's too old.
return Poll::Ready(Some(
ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(
fork_choice.elapsed(),
),
))
}
}
if let Some(transition_config) =
this.canon_chain.last_exchanged_transition_configuration_timestamp()
{
if transition_config.elapsed() <= NO_TRANSITION_CONFIG_EXCHANGED_PERIOD {
// We never had an FCU, but had a transition config exchange, and it's recent.
return Poll::Ready(Some(ConsensusLayerHealthEvent::NeverReceivedUpdates))
} else {
                    // We never had an FCU; we had a transition config exchange, but it's too old.
return Poll::Ready(Some(ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(
transition_config.elapsed(),
)))
}
}
            // We never had either an FCU or a transition config exchange.
return Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen))
}
}
}
/// Event that is triggered when Consensus Layer health is degraded from the
/// Execution Layer point of view.
#[derive(Clone, Copy, Debug)]
pub enum ConsensusLayerHealthEvent {
/// Consensus Layer client was never seen.
NeverSeen,
/// Consensus Layer client has not been seen for a while.
HasNotBeenSeenForAWhile(Duration),
/// Updates from the Consensus Layer client were never received.
NeverReceivedUpdates,
/// Updates from the Consensus Layer client have not been received for a while.
HaveNotReceivedUpdatesForAWhile(Duration),
}
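// Illustrative sketch (not part of this commit): one way to consume a
// [ConsensusLayerHealthEvents] stream. The `tracker` is assumed to be obtained
// elsewhere, e.g. from the blockchain tree.
async fn watch_cl_health(tracker: Box<dyn CanonChainTracker>) {
    use futures::StreamExt;

    let mut events = ConsensusLayerHealthEvents::new(tracker);
    // The stream yields an item for every failed health check and never terminates.
    while let Some(event) = events.next().await {
        tracing::warn!(?event, "Consensus Layer health degraded");
    }
}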


@@ -0,0 +1,4 @@
//! Various event handlers for the node.
pub mod cl;
pub mod node;


@@ -0,0 +1,535 @@
//! Support for handling events emitted by node components.
use crate::events::cl::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ForkchoiceStatus};
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::{
constants,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
BlockNumber, B256,
};
use reth_prune::PrunerEvent;
use reth_stages::{ExecOutput, PipelineEvent};
use std::{
fmt::{Display, Formatter},
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant, SystemTime, UNIX_EPOCH},
};
use tokio::time::Interval;
use tracing::{info, warn};
/// Interval of reporting node state.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(25);
/// The current high-level state of the node.
struct NodeState<DB> {
/// Database environment.
/// Used for freelist calculation reported in the "Status" log message.
/// See [EventHandler::poll].
db: DB,
/// Connection to the network.
network: Option<NetworkHandle>,
/// The stage currently being executed.
current_stage: Option<CurrentStage>,
/// The latest block reached by either pipeline or consensus engine.
latest_block: Option<BlockNumber>,
    /// The time of the latest block seen by the pipeline.
    latest_block_time: Option<u64>,
    /// Hash of the head block last set by fork choice update.
    head_block_hash: Option<B256>,
    /// Hash of the safe block last set by fork choice update.
    safe_block_hash: Option<B256>,
    /// Hash of the finalized block last set by fork choice update.
    finalized_block_hash: Option<B256>,
}
impl<DB> NodeState<DB> {
fn new(db: DB, network: Option<NetworkHandle>, latest_block: Option<BlockNumber>) -> Self {
Self {
db,
network,
current_stage: None,
latest_block,
latest_block_time: None,
head_block_hash: None,
safe_block_hash: None,
finalized_block_hash: None,
}
}
fn num_connected_peers(&self) -> usize {
self.network.as_ref().map(|net| net.num_connected_peers()).unwrap_or_default()
}
    /// Processes an event emitted by the pipeline.
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Run { pipeline_stages_progress, stage_id, checkpoint, target } => {
let checkpoint = checkpoint.unwrap_or_default();
let current_stage = CurrentStage {
stage_id,
eta: match &self.current_stage {
Some(current_stage) if current_stage.stage_id == stage_id => {
current_stage.eta
}
_ => Eta::default(),
},
checkpoint,
target,
};
if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
%stage_eta,
"Executing stage",
);
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
target = %OptionalField(target),
"Executing stage",
);
}
self.current_stage = Some(current_stage);
}
PipelineEvent::Ran {
pipeline_stages_progress,
stage_id,
result: ExecOutput { checkpoint, done },
} => {
if stage_id.is_finish() {
self.latest_block = Some(checkpoint.block_number);
}
if let Some(current_stage) = self.current_stage.as_mut() {
current_stage.checkpoint = checkpoint;
current_stage.eta.update(checkpoint);
let target = OptionalField(current_stage.target);
let stage_progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
let message =
if done { "Stage finished executing" } else { "Stage committed progress" };
if let Some(stage_eta) = current_stage.eta.fmt_for_stage(stage_id) {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%stage_progress,
%stage_eta,
"{message}",
)
} else {
info!(
pipeline_stages = %pipeline_stages_progress,
stage = %stage_id,
checkpoint = %checkpoint.block_number,
%target,
%stage_progress,
"{message}",
)
}
}
if done {
self.current_stage = None;
}
}
_ => (),
}
}
fn handle_network_event(&mut self, _: NetworkEvent) {
// NOTE(onbjerg): This used to log established/disconnecting sessions, but this is already
// logged in the networking component. I kept this stub in case we want to catch other
// networking events later on.
}
fn handle_consensus_engine_event(&mut self, event: BeaconConsensusEngineEvent) {
match event {
BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status) => {
let ForkchoiceState { head_block_hash, safe_block_hash, finalized_block_hash } =
state;
if status != ForkchoiceStatus::Valid ||
(self.safe_block_hash != Some(safe_block_hash) &&
self.finalized_block_hash != Some(finalized_block_hash))
{
info!(
?head_block_hash,
?safe_block_hash,
?finalized_block_hash,
?status,
"Forkchoice updated"
);
}
self.head_block_hash = Some(head_block_hash);
self.safe_block_hash = Some(safe_block_hash);
self.finalized_block_hash = Some(finalized_block_hash);
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!(
number=block.number,
hash=?block.hash(),
peers=self.num_connected_peers(),
txs=block.body.len(),
mgas=%format!("{:.3}", block.header.gas_used as f64 / constants::MGAS_TO_GAS as f64),
full=%format!("{:.1}%", block.header.gas_used as f64 * 100.0 / block.header.gas_limit as f64),
base_fee=%format!("{:.2}gwei", block.header.base_fee_per_gas.unwrap_or(0) as f64 / constants::GWEI_TO_WEI as f64),
blobs=block.header.blob_gas_used.unwrap_or(0) / constants::eip4844::DATA_GAS_PER_BLOB,
excess_blobs=block.header.excess_blob_gas.unwrap_or(0) / constants::eip4844::DATA_GAS_PER_BLOB,
?elapsed,
"Block added to canonical chain"
);
}
BeaconConsensusEngineEvent::CanonicalChainCommitted(head, elapsed) => {
self.latest_block = Some(head.number);
self.latest_block_time = Some(head.timestamp);
info!(number=head.number, hash=?head.hash(), ?elapsed, "Canonical chain committed");
}
BeaconConsensusEngineEvent::ForkBlockAdded(block) => {
info!(number=block.number, hash=?block.hash(), "Block added to fork chain");
}
}
}
fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
// If pipeline is running, it's fine to not receive any messages from the CL.
// So we need to report about CL health only when pipeline is idle.
if self.current_stage.is_none() {
match event {
ConsensusLayerHealthEvent::NeverSeen => {
warn!("Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
warn!(?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::NeverReceivedUpdates => {
warn!("Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
}
ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
warn!(?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!")
}
}
}
}
fn handle_pruner_event(&self, event: PrunerEvent) {
match event {
PrunerEvent::Finished { tip_block_number, elapsed, stats } => {
info!(tip_block_number, ?elapsed, ?stats, "Pruner finished");
}
}
}
}
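// Illustrative sketch (not part of this commit): the arithmetic behind the "Block added"
// log fields above, with the constant values written out
// (`constants::MGAS_TO_GAS` = 1_000_000, `constants::GWEI_TO_WEI` = 1_000_000_000).
fn block_log_fields(gas_used: u64, gas_limit: u64, base_fee_per_gas: u64) -> (f64, f64, f64) {
    let mgas = gas_used as f64 / 1_000_000.0; // gas used, in megagas
    let full = gas_used as f64 * 100.0 / gas_limit as f64; // percentage of the gas limit used
    let base_fee_gwei = base_fee_per_gas as f64 / 1_000_000_000.0; // wei -> gwei
    (mgas, full, base_fee_gwei)
}
// e.g. block_log_fields(15_000_000, 30_000_000, 20_000_000_000)
//   == (15.0, 50.0, 20.0): 15 mgas used, 50% full, 20 gwei base fee.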
impl<DB: DatabaseMetadata> NodeState<DB> {
fn freelist(&self) -> Option<usize> {
self.db.metadata().freelist_size()
}
}
/// Helper type for formatting optional fields:
/// - If [Some(x)], then `x` is written
/// - If [None], then `None` is written
struct OptionalField<T: Display>(Option<T>);
impl<T: Display> Display for OptionalField<T> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some(field) = &self.0 {
write!(f, "{field}")
} else {
write!(f, "None")
}
}
}
/// The stage currently being executed.
struct CurrentStage {
stage_id: StageId,
eta: Eta,
checkpoint: StageCheckpoint,
target: Option<BlockNumber>,
}
/// A node event.
#[derive(Debug)]
pub enum NodeEvent {
/// A network event.
Network(NetworkEvent),
/// A sync pipeline event.
Pipeline(PipelineEvent),
/// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent),
/// A Consensus Layer health event.
ConsensusLayerHealth(ConsensusLayerHealthEvent),
    /// A pruner event.
Pruner(PrunerEvent),
}
impl From<NetworkEvent> for NodeEvent {
fn from(event: NetworkEvent) -> NodeEvent {
NodeEvent::Network(event)
}
}
impl From<PipelineEvent> for NodeEvent {
fn from(event: PipelineEvent) -> NodeEvent {
NodeEvent::Pipeline(event)
}
}
impl From<BeaconConsensusEngineEvent> for NodeEvent {
fn from(event: BeaconConsensusEngineEvent) -> Self {
NodeEvent::ConsensusEngine(event)
}
}
impl From<ConsensusLayerHealthEvent> for NodeEvent {
fn from(event: ConsensusLayerHealthEvent) -> Self {
NodeEvent::ConsensusLayerHealth(event)
}
}
impl From<PrunerEvent> for NodeEvent {
fn from(event: PrunerEvent) -> Self {
NodeEvent::Pruner(event)
}
}
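// Illustrative sketch (not part of this commit): the `From` impls above exist so that
// heterogeneous component streams can be mapped and merged into a single `NodeEvent`
// stream, e.g. with `futures::stream::select`. Both input streams are hypothetical
// placeholders for handles exposed by the pipeline and pruner.
fn merge_component_events(
    pipeline_events: impl Stream<Item = PipelineEvent> + Unpin,
    pruner_events: impl Stream<Item = PrunerEvent> + Unpin,
) -> impl Stream<Item = NodeEvent> + Unpin {
    use futures::StreamExt;
    futures::stream::select(
        pipeline_events.map(NodeEvent::from),
        pruner_events.map(NodeEvent::from),
    )
}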
/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E, DB>(
network: Option<NetworkHandle>,
latest_block_number: Option<BlockNumber>,
events: E,
db: DB,
) where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata + Database + 'static,
{
let state = NodeState::new(db, network, latest_block_number);
let start = tokio::time::Instant::now() + Duration::from_secs(3);
let mut info_interval = tokio::time::interval_at(start, INFO_MESSAGE_INTERVAL);
info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let handler = EventHandler { state, events, info_interval };
handler.await
}
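// Illustrative sketch (not part of this commit): `handle_events` runs indefinitely, so
// it is typically driven as a background task. This hypothetical helper merely adds the
// `Send` bounds `tokio::spawn` requires; obtaining `db` (an MDBX handle in practice)
// and the merged event stream is not shown here.
fn spawn_event_reporter<E, DB>(
    network: Option<NetworkHandle>,
    latest_block_number: Option<BlockNumber>,
    events: E,
    db: DB,
) -> tokio::task::JoinHandle<()>
where
    E: Stream<Item = NodeEvent> + Unpin + Send + 'static,
    DB: DatabaseMetadata + Database + Send + 'static,
{
    tokio::spawn(handle_events(network, latest_block_number, events, db))
}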
/// Handles events emitted by the node and logs them accordingly.
#[pin_project::pin_project]
struct EventHandler<E, DB> {
state: NodeState<DB>,
#[pin]
events: E,
#[pin]
info_interval: Interval,
}
impl<E, DB> Future for EventHandler<E, DB>
where
E: Stream<Item = NodeEvent> + Unpin,
DB: DatabaseMetadata + Database + 'static,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
while this.info_interval.poll_tick(cx).is_ready() {
let freelist = OptionalField(this.state.freelist());
if let Some(CurrentStage { stage_id, eta, checkpoint, target }) =
&this.state.current_stage
{
let stage_progress = OptionalField(
checkpoint.entities().and_then(|entities| entities.fmt_percentage()),
);
if let Some(stage_eta) = eta.fmt_for_stage(*stage_id) {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
%stage_eta,
"Status"
);
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
stage = %stage_id,
checkpoint = checkpoint.block_number,
target = %OptionalField(*target),
%stage_progress,
"Status"
);
}
} else if let Some(latest_block) = this.state.latest_block {
let now =
SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
if now - this.state.latest_block_time.unwrap_or(0) > 60 {
                    // Once blocks are arriving from the consensus client, only emit the
                    // status log if no new block has been received for over a minute.
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
%latest_block,
"Status"
);
}
} else {
info!(
target: "reth::cli",
connected_peers = this.state.num_connected_peers(),
%freelist,
"Status"
);
}
}
while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
match event {
NodeEvent::Network(event) => {
this.state.handle_network_event(event);
}
NodeEvent::Pipeline(event) => {
this.state.handle_pipeline_event(event);
}
NodeEvent::ConsensusEngine(event) => {
this.state.handle_consensus_engine_event(event);
}
NodeEvent::ConsensusLayerHealth(event) => {
this.state.handle_consensus_layer_health_event(event)
}
NodeEvent::Pruner(event) => {
this.state.handle_pruner_event(event);
}
}
}
Poll::Pending
}
}
/// A container calculating the estimated time that a stage will complete in, based on stage
/// checkpoints reported by the pipeline.
///
/// One `Eta` is only valid for a single stage.
#[derive(Default, Copy, Clone)]
struct Eta {
/// The last stage checkpoint
last_checkpoint: EntitiesCheckpoint,
/// The last time the stage reported its checkpoint
last_checkpoint_time: Option<Instant>,
/// The current ETA
eta: Option<Duration>,
}
impl Eta {
/// Update the ETA given the checkpoint, if possible.
fn update(&mut self, checkpoint: StageCheckpoint) {
let Some(current) = checkpoint.entities() else {
return;
};
if let Some(last_checkpoint_time) = &self.last_checkpoint_time {
let processed_since_last = current.processed - self.last_checkpoint.processed;
let elapsed = last_checkpoint_time.elapsed();
let per_second = processed_since_last as f64 / elapsed.as_secs_f64();
self.eta = Duration::try_from_secs_f64(
((current.total - current.processed) as f64) / per_second,
)
.ok();
}
self.last_checkpoint = current;
self.last_checkpoint_time = Some(Instant::now());
}
    /// Returns `true` if the ETA is available, i.e. it has been computed from at least two
    /// reported checkpoints.
fn is_available(&self) -> bool {
self.eta.zip(self.last_checkpoint_time).is_some()
}
/// Format ETA for a given stage.
///
    /// NOTE: Currently an ETA is computed only for stages with predictable progress.
    /// This excludes the network-dependent [StageId::Headers] and [StageId::Bodies] stages,
    /// as well as the [StageId::Execution] stage.
fn fmt_for_stage(&self, stage: StageId) -> Option<String> {
if !self.is_available() ||
matches!(stage, StageId::Headers | StageId::Bodies | StageId::Execution)
{
None
} else {
Some(self.to_string())
}
}
}
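// Illustrative worked example (not part of this commit) of the arithmetic in
// `Eta::update` above, restated as a standalone function:
//   per_second = processed_since_last / elapsed_secs
//   eta_secs   = (total - processed) / per_second
fn eta_secs(processed_since_last: u64, elapsed_secs: f64, processed: u64, total: u64) -> f64 {
    let per_second = processed_since_last as f64 / elapsed_secs;
    (total - processed) as f64 / per_second
}
// e.g. eta_secs(1_000, 10.0, 5_000, 6_000) == 10.0: at 100 entities/s,
// the remaining 1_000 entities take 10 seconds.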
impl Display for Eta {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Some((eta, last_checkpoint_time)) = self.eta.zip(self.last_checkpoint_time) {
let remaining = eta.checked_sub(last_checkpoint_time.elapsed());
if let Some(remaining) = remaining {
return write!(
f,
"{}",
humantime::format_duration(Duration::from_secs(remaining.as_secs()))
)
}
}
write!(f, "unknown")
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, Instant};
#[test]
fn eta_display_no_milliseconds() {
let eta = Eta {
last_checkpoint_time: Some(Instant::now()),
eta: Some(Duration::from_millis(
13 * 60 * 1000 + // Minutes
37 * 1000 + // Seconds
999, // Milliseconds
)),
..Default::default()
}
.to_string();
assert_eq!(eta, "13m 37s");
}
}


@@ -10,6 +10,7 @@
pub mod args;
pub mod cli;
pub mod dirs;
pub mod events;
pub mod init;
pub mod metrics;
pub mod node_config;