diff --git a/bin/reth/src/node/cl_events.rs b/bin/reth/src/node/cl_events.rs new file mode 100644 index 000000000..22b78abd9 --- /dev/null +++ b/bin/reth/src/node/cl_events.rs @@ -0,0 +1,83 @@ +//! Events related to Consensus Layer health. + +use futures::Stream; +use reth_provider::CanonChainTracker; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, + time::Duration, +}; +use tokio::time::Interval; + +/// Interval of checking Consensus Layer client health. +const CHECK_INTERVAL: Duration = Duration::from_secs(300); +/// Period of not exchanging transition configurations with Consensus Layer client, +/// after which the warning is issued. +const NO_TRANSITION_CONFIG_EXCHANGED_PERIOD: Duration = Duration::from_secs(120); +/// Period of not receiving fork choice updates from Consensus Layer client, +/// after which the warning is issued. +const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(120); + +/// A Stream of [ConsensusLayerHealthEvent]. +pub struct ConsensusLayerHealthEvents { + interval: Interval, + canon_chain: Box, +} + +impl ConsensusLayerHealthEvents { + /// Creates a new [ConsensusLayerHealthEvents] with the given canonical chain tracker. + pub fn new(canon_chain: Box) -> Self { + Self { interval: tokio::time::interval(CHECK_INTERVAL), canon_chain } + } +} + +impl Stream for ConsensusLayerHealthEvents { + type Item = ConsensusLayerHealthEvent; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + ready!(this.interval.poll_tick(cx)); + + return match ( + this.canon_chain.last_exchanged_transition_configuration_timestamp(), + this.canon_chain.last_received_update_timestamp(), + ) { + (None, _) => Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen)), + (Some(transition_config), _) + if transition_config.elapsed() > NO_TRANSITION_CONFIG_EXCHANGED_PERIOD => + { + Poll::Ready(Some(ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile( + transition_config.elapsed(), + ))) + } + (Some(_), None) => { + Poll::Ready(Some(ConsensusLayerHealthEvent::NeverReceivedUpdates)) + } + (Some(_), Some(update)) + if update.elapsed() > NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD => + { + Poll::Ready(Some(ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile( + update.elapsed(), + ))) + } + _ => continue, + } + } + } +} + +/// Event that is triggered when Consensus Layer health is degraded from the +/// Execution Layer point of view. +#[derive(Debug)] +pub enum ConsensusLayerHealthEvent { + /// Consensus Layer client was never seen. + NeverSeen, + /// Consensus Layer client has not been seen for a while. + HasNotBeenSeenForAWhile(Duration), + /// Updates from the Consensus Layer client were never received. + NeverReceivedUpdates, + /// Updates from the Consensus Layer client have not been received for a while. + HaveNotReceivedUpdatesForAWhile(Duration), +} diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index 2dca909b4..ac0b3039d 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -1,5 +1,6 @@ //! Support for handling events emitted by node components. +use crate::node::cl_events::ConsensusLayerHealthEvent; use futures::Stream; use reth_beacon_consensus::BeaconConsensusEngineEvent; use reth_network::{NetworkEvent, NetworkHandle}; @@ -13,11 +14,14 @@ use std::{ time::Duration, }; use tokio::time::Interval; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; + +/// Interval of reporting node state. +const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(30); /// The current high-level state of the node. struct NodeState { - /// Connection to the network + /// Connection to the network. network: Option, /// The stage currently being executed. current_stage: Option, @@ -109,6 +113,23 @@ impl NodeState { } } } + + fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) { + match event { + ConsensusLayerHealthEvent::NeverSeen => { + warn!(target: "reth::cli", "Post-merge network, but never seen beacon client. Please launch one to follow the chain!") + } + ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => { + warn!(target: "reth::cli", ?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!") + } + ConsensusLayerHealthEvent::NeverReceivedUpdates => { + warn!(target: "reth::cli", "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!") + } + ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => { + warn!(target: "reth::cli", ?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!") + } + } + } } /// A node event. @@ -120,6 +141,8 @@ pub enum NodeEvent { Pipeline(PipelineEvent), /// A consensus engine event. ConsensusEngine(BeaconConsensusEngineEvent), + /// A Consensus Layer health event. + ConsensusLayerHealth(ConsensusLayerHealthEvent), } impl From for NodeEvent { @@ -140,15 +163,21 @@ impl From for NodeEvent { } } +impl From for NodeEvent { + fn from(event: ConsensusLayerHealthEvent) -> Self { + NodeEvent::ConsensusLayerHealth(event) + } +} + /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. -pub async fn handle_events( - network: Option, - events: impl Stream + Unpin, -) { +pub async fn handle_events(network: Option, events: E) +where + E: Stream + Unpin, +{ let state = NodeState::new(network); - let mut info_interval = tokio::time::interval(Duration::from_secs(30)); + let mut info_interval = tokio::time::interval(INFO_MESSAGE_INTERVAL); info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); let handler = EventHandler { state, events, info_interval }; @@ -157,17 +186,17 @@ pub async fn handle_events( /// Handles events emitted by the node and logs them accordingly. #[pin_project::pin_project] -struct EventHandler { +struct EventHandler { state: NodeState, #[pin] - events: St, + events: E, #[pin] info_interval: Interval, } -impl Future for EventHandler +impl Future for EventHandler where - St: Stream + Unpin, + E: Stream + Unpin, { type Output = (); @@ -194,6 +223,9 @@ where NodeEvent::ConsensusEngine(event) => { this.state.handle_consensus_engine_event(event); } + NodeEvent::ConsensusLayerHealth(event) => { + this.state.handle_consensus_layer_health_event(event) + } } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 63deba362..46d09f4bb 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -12,7 +12,7 @@ use crate::{ use clap::Parser; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{pin_mut, stream::select as stream_select, StreamExt}; +use futures::{future::Either, pin_mut, stream, stream_select, StreamExt}; use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus}; use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig}; use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine}; @@ -76,6 +76,7 @@ use crate::{ PayloadBuilderArgs, }, dirs::MaybePlatformPath, + node::cl_events::ConsensusLayerHealthEvents, }; use reth_interfaces::p2p::headers::client::HeadersClient; use reth_payload_builder::PayloadBuilderService; @@ -83,6 +84,7 @@ use reth_primitives::bytes::BytesMut; use reth_provider::providers::BlockchainProvider; use reth_rlp::Encodable; +pub mod cl_events; pub mod events; /// Start the node @@ -346,12 +348,18 @@ impl Command { )?; info!(target: "reth::cli", "Consensus engine initialized"); - let events = stream_select( - stream_select( - network.event_listener().map(Into::into), - beacon_engine_handle.event_listener().map(Into::into), - ), + let events = stream_select!( + network.event_listener().map(Into::into), + beacon_engine_handle.event_listener().map(Into::into), pipeline_events.map(Into::into), + if self.debug.tip.is_none() { + Either::Left( + ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone())) + .map(Into::into), + ) + } else { + Either::Right(stream::empty()) + } ); ctx.task_executor .spawn_critical("events task", events::handle_events(Some(network.clone()), events)); diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index 625de5c11..7223cb621 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -162,6 +162,8 @@ pub enum BeaconEngineMessage { /// The sender for returning forkchoice updated result. tx: oneshot::Sender>, }, + /// Message with exchanged transition configuration. + TransitionConfigurationExchanged, /// Add a new listener for [`BeaconEngineMessage`]. EventListener(UnboundedSender), } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 719df47a7..2003cb51c 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -83,7 +83,7 @@ impl BeaconConsensusEngineHandle { /// Sends a new payload message to the beacon consensus engine and waits for a response. /// - /// See also + /// See also pub async fn new_payload( &self, payload: ExecutionPayload, @@ -95,7 +95,7 @@ impl BeaconConsensusEngineHandle { /// Sends a forkchoice update message to the beacon consensus engine and waits for a response. /// - /// See also + /// See also pub async fn fork_choice_updated( &self, state: ForkchoiceState, @@ -124,6 +124,13 @@ impl BeaconConsensusEngineHandle { rx } + /// Sends a transition configuration exchagne message to the beacon consensus engine. + /// + /// See also + pub async fn transition_configuration_exchanged(&self) { + let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged); + } + /// Creates a new [`BeaconConsensusEngineEvent`] listener stream. pub fn event_listener(&self) -> UnboundedReceiverStream { let (tx, rx) = mpsc::unbounded_channel(); @@ -1177,6 +1184,9 @@ where let res = this.on_new_payload(payload); let _ = tx.send(res); } + BeaconEngineMessage::TransitionConfigurationExchanged => { + this.blockchain.on_transition_configuration_exchanged(); + } BeaconEngineMessage::EventListener(tx) => { this.listeners.push_listener(tx); } diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index f36552a5d..d809f6a31 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -48,7 +48,7 @@ where Self { client, chain_spec, beacon_consensus, payload_store } } - /// See also + /// See also /// Caution: This should not accept the `withdrawals` field pub async fn new_payload_v1( &self, @@ -62,7 +62,7 @@ where Ok(self.beacon_consensus.new_payload(payload).await?) } - /// See also + /// See also pub async fn new_payload_v2( &self, payload: ExecutionPayload, @@ -78,7 +78,7 @@ where /// Sends a message to the beacon consensus engine to update the fork choice _without_ /// withdrawals. /// - /// See also + /// See also /// /// Caution: This should not accept the `withdrawals` field pub async fn fork_choice_updated_v1( @@ -99,7 +99,7 @@ where /// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals, /// but only _after_ shanghai. /// - /// See also + /// See also pub async fn fork_choice_updated_v2( &self, state: ForkchoiceState, @@ -118,7 +118,7 @@ where /// Returns the most recent version of the payload that is available in the corresponding /// payload build process at the time of receiving this call. /// - /// See also + /// See also /// /// Caution: This should not return the `withdrawals` field /// @@ -136,7 +136,7 @@ where /// Returns the most recent version of the payload that is available in the corresponding /// payload build process at the time of receiving this call. /// - /// See also + /// See also /// /// Note: /// > Client software MAY stop the corresponding build process after serving this call. @@ -213,7 +213,7 @@ where /// Called to verify network configuration parameters and ensure that Consensus and Execution /// layers are using the latest configuration. - pub fn exchange_transition_configuration( + pub async fn exchange_transition_configuration( &self, config: TransitionConfiguration, ) -> EngineApiResult { @@ -251,6 +251,8 @@ where .block_hash(terminal_block_number.as_u64()) .map_err(|err| EngineApiError::Internal(Box::new(err)))?; + self.beacon_consensus.transition_configuration_exchanged().await; + // Transition configuration exchange is successful if block hashes match match local_hash { Some(hash) if hash == terminal_block_hash => Ok(TransitionConfiguration { @@ -305,7 +307,7 @@ where Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static, { /// Handler for `engine_newPayloadV1` - /// See also + /// See also /// Caution: This should not accept the `withdrawals` field async fn new_payload_v1(&self, payload: ExecutionPayload) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); @@ -313,14 +315,14 @@ where } /// Handler for `engine_newPayloadV1` - /// See also + /// See also async fn new_payload_v2(&self, payload: ExecutionPayload) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_newPayloadV1"); Ok(EngineApi::new_payload_v2(self, payload).await?) } /// Handler for `engine_forkchoiceUpdatedV1` - /// See also + /// See also /// /// Caution: This should not accept the `withdrawals` field async fn fork_choice_updated_v1( @@ -333,7 +335,7 @@ where } /// Handler for `engine_forkchoiceUpdatedV2` - /// See also + /// See also async fn fork_choice_updated_v2( &self, fork_choice_state: ForkchoiceState, @@ -348,7 +350,7 @@ where /// Returns the most recent version of the payload that is available in the corresponding /// payload build process at the time of receiving this call. /// - /// See also + /// See also /// /// Caution: This should not return the `withdrawals` field /// @@ -364,7 +366,7 @@ where /// Returns the most recent version of the payload that is available in the corresponding /// payload build process at the time of receiving this call. /// - /// See also + /// See also /// /// Note: /// > Client software MAY stop the corresponding build process after serving this call. @@ -406,13 +408,13 @@ where } /// Handler for `engine_exchangeTransitionConfigurationV1` - /// See also + /// See also async fn exchange_transition_configuration( &self, config: TransitionConfiguration, ) -> RpcResult { trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1"); - Ok(EngineApi::exchange_transition_configuration(self, config)?) + Ok(EngineApi::exchange_transition_configuration(self, config).await?) } /// Handler for `engine_exchangeCapabilitiesV1` @@ -576,7 +578,7 @@ mod tests { ..Default::default() }; - let res = api.exchange_transition_configuration(transition_config.clone()); + let res = api.exchange_transition_configuration(transition_config.clone()).await; assert_matches!( res, @@ -600,7 +602,7 @@ mod tests { }; // Unknown block number - let res = api.exchange_transition_configuration(transition_config.clone()); + let res = api.exchange_transition_configuration(transition_config.clone()).await; assert_matches!( res, @@ -614,7 +616,7 @@ mod tests { execution_terminal_block.clone().unseal(), ); - let res = api.exchange_transition_configuration(transition_config.clone()); + let res = api.exchange_transition_configuration(transition_config.clone()).await; assert_matches!( res, @@ -638,7 +640,8 @@ mod tests { handle.client.add_block(terminal_block.hash(), terminal_block.unseal()); - let config = api.exchange_transition_configuration(transition_config.clone()).unwrap(); + let config = + api.exchange_transition_configuration(transition_config.clone()).await.unwrap(); assert_eq!(config, transition_config); } } diff --git a/crates/storage/provider/src/providers/chain_info.rs b/crates/storage/provider/src/providers/chain_info.rs index 1dfe8ad6f..b15db2af4 100644 --- a/crates/storage/provider/src/providers/chain_info.rs +++ b/crates/storage/provider/src/providers/chain_info.rs @@ -20,6 +20,7 @@ impl ChainInfoTracker { Self { inner: Arc::new(ChainInfoInner { last_forkchoice_update: RwLock::new(None), + last_transition_configuration_exchange: RwLock::new(None), canonical_head_number: AtomicU64::new(head.number), canonical_head: RwLock::new(head), safe_block: RwLock::new(None), @@ -40,11 +41,20 @@ impl ChainInfoTracker { } /// Returns the instant when we received the latest forkchoice update. - #[allow(unused)] pub(crate) fn last_forkchoice_update_received_at(&self) -> Option { *self.inner.last_forkchoice_update.read() } + /// Update the timestamp when we exchanged a transition configuration. + pub(crate) fn on_transition_configuration_exchanged(&self) { + self.inner.last_transition_configuration_exchange.write().replace(Instant::now()); + } + + /// Returns the instant when we exchanged the transition configuration last time. + pub(crate) fn last_transition_configuration_exchanged_at(&self) -> Option { + *self.inner.last_transition_configuration_exchange.read() + } + /// Returns the canonical head of the chain. #[allow(unused)] pub(crate) fn get_canonical_head(&self) -> SealedHeader { @@ -115,6 +125,10 @@ struct ChainInfoInner { /// /// This is mainly used to track if we're connected to a beacon node. last_forkchoice_update: RwLock>, + /// Timestamp when we exchanged the transition configuration last time. + /// + /// This is mainly used to track if we're connected to a beacon node. + last_transition_configuration_exchange: RwLock>, /// Tracks the number of the `canonical_head`. canonical_head_number: AtomicU64, /// The canonical head of the chain. diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index a61d2554c..eb099a17b 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -523,6 +523,14 @@ where self.chain_info.last_forkchoice_update_received_at() } + fn on_transition_configuration_exchanged(&self) { + self.chain_info.on_transition_configuration_exchanged(); + } + + fn last_exchanged_transition_configuration_timestamp(&self) -> Option { + self.chain_info.last_transition_configuration_exchanged_at() + } + fn set_canonical_head(&self, header: SealedHeader) { self.chain_info.set_canonical_head(header); } diff --git a/crates/storage/provider/src/traits/chain_info.rs b/crates/storage/provider/src/traits/chain_info.rs index 587a08682..82d879df4 100644 --- a/crates/storage/provider/src/traits/chain_info.rs +++ b/crates/storage/provider/src/traits/chain_info.rs @@ -11,6 +11,13 @@ pub trait CanonChainTracker: Send + Sync { /// ([CanonChainTracker::on_forkchoice_update_received]) fn last_received_update_timestamp(&self) -> Option; + /// Notify the tracker about a transition configuration exchange. + fn on_transition_configuration_exchanged(&self); + + /// Returns the last time a transition configuration was exchanged with the CL + /// ([CanonChainTracker::on_transition_configuration_exchanged]) + fn last_exchanged_transition_configuration_timestamp(&self) -> Option; + /// Sets the canonical head of the chain. fn set_canonical_head(&self, header: SealedHeader);