chore(consensus): emit warnings if no CL found (#2961)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Alexey Shekhirin
2023-06-05 20:48:40 +04:00
committed by GitHub
parent b72c3aed90
commit e1148c81a7
9 changed files with 206 additions and 39 deletions

View File

@ -0,0 +1,83 @@
//! Events related to Consensus Layer health.
use futures::Stream;
use reth_provider::CanonChainTracker;
use std::{
pin::Pin,
task::{ready, Context, Poll},
time::Duration,
};
use tokio::time::Interval;
/// Interval of checking Consensus Layer client health.
const CHECK_INTERVAL: Duration = Duration::from_secs(300);
/// Period of not exchanging transition configurations with Consensus Layer client,
/// after which the warning is issued.
const NO_TRANSITION_CONFIG_EXCHANGED_PERIOD: Duration = Duration::from_secs(120);
/// Period of not receiving fork choice updates from Consensus Layer client,
/// after which the warning is issued.
const NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD: Duration = Duration::from_secs(120);
/// A Stream of [ConsensusLayerHealthEvent].
pub struct ConsensusLayerHealthEvents {
interval: Interval,
canon_chain: Box<dyn CanonChainTracker>,
}
impl ConsensusLayerHealthEvents {
/// Creates a new [ConsensusLayerHealthEvents] with the given canonical chain tracker.
pub fn new(canon_chain: Box<dyn CanonChainTracker>) -> Self {
Self { interval: tokio::time::interval(CHECK_INTERVAL), canon_chain }
}
}
impl Stream for ConsensusLayerHealthEvents {
type Item = ConsensusLayerHealthEvent;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
ready!(this.interval.poll_tick(cx));
return match (
this.canon_chain.last_exchanged_transition_configuration_timestamp(),
this.canon_chain.last_received_update_timestamp(),
) {
(None, _) => Poll::Ready(Some(ConsensusLayerHealthEvent::NeverSeen)),
(Some(transition_config), _)
if transition_config.elapsed() > NO_TRANSITION_CONFIG_EXCHANGED_PERIOD =>
{
Poll::Ready(Some(ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(
transition_config.elapsed(),
)))
}
(Some(_), None) => {
Poll::Ready(Some(ConsensusLayerHealthEvent::NeverReceivedUpdates))
}
(Some(_), Some(update))
if update.elapsed() > NO_FORKCHOICE_UPDATE_RECEIVED_PERIOD =>
{
Poll::Ready(Some(ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(
update.elapsed(),
)))
}
_ => continue,
}
}
}
}
/// Event that is triggered when Consensus Layer health is degraded from the
/// Execution Layer point of view.
#[derive(Debug)]
pub enum ConsensusLayerHealthEvent {
/// Consensus Layer client was never seen.
NeverSeen,
/// Consensus Layer client has not been seen for a while.
HasNotBeenSeenForAWhile(Duration),
/// Updates from the Consensus Layer client were never received.
NeverReceivedUpdates,
/// Updates from the Consensus Layer client have not been received for a while.
HaveNotReceivedUpdatesForAWhile(Duration),
}

View File

@ -1,5 +1,6 @@
//! Support for handling events emitted by node components.
use crate::node::cl_events::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::BeaconConsensusEngineEvent;
use reth_network::{NetworkEvent, NetworkHandle};
@ -13,11 +14,14 @@ use std::{
time::Duration,
};
use tokio::time::Interval;
use tracing::{debug, info};
use tracing::{debug, info, warn};
/// Interval of reporting node state.
const INFO_MESSAGE_INTERVAL: Duration = Duration::from_secs(30);
/// The current high-level state of the node.
struct NodeState {
/// Connection to the network
/// Connection to the network.
network: Option<NetworkHandle>,
/// The stage currently being executed.
current_stage: Option<StageId>,
@ -109,6 +113,23 @@ impl NodeState {
}
}
}
fn handle_consensus_layer_health_event(&self, event: ConsensusLayerHealthEvent) {
match event {
ConsensusLayerHealthEvent::NeverSeen => {
warn!(target: "reth::cli", "Post-merge network, but never seen beacon client. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::HasNotBeenSeenForAWhile(period) => {
warn!(target: "reth::cli", ?period, "Post-merge network, but no beacon client seen for a while. Please launch one to follow the chain!")
}
ConsensusLayerHealthEvent::NeverReceivedUpdates => {
warn!(target: "reth::cli", "Beacon client online, but never received consensus updates. Please ensure your beacon client is operational to follow the chain!")
}
ConsensusLayerHealthEvent::HaveNotReceivedUpdatesForAWhile(period) => {
warn!(target: "reth::cli", ?period, "Beacon client online, but no consensus updates received for a while. Please fix your beacon client to follow the chain!")
}
}
}
}
/// A node event.
@ -120,6 +141,8 @@ pub enum NodeEvent {
Pipeline(PipelineEvent),
/// A consensus engine event.
ConsensusEngine(BeaconConsensusEngineEvent),
/// A Consensus Layer health event.
ConsensusLayerHealth(ConsensusLayerHealthEvent),
}
impl From<NetworkEvent> for NodeEvent {
@ -140,15 +163,21 @@ impl From<BeaconConsensusEngineEvent> for NodeEvent {
}
}
impl From<ConsensusLayerHealthEvent> for NodeEvent {
fn from(event: ConsensusLayerHealthEvent) -> Self {
NodeEvent::ConsensusLayerHealth(event)
}
}
/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events(
network: Option<NetworkHandle>,
events: impl Stream<Item = NodeEvent> + Unpin,
) {
pub async fn handle_events<E>(network: Option<NetworkHandle>, events: E)
where
E: Stream<Item = NodeEvent> + Unpin,
{
let state = NodeState::new(network);
let mut info_interval = tokio::time::interval(Duration::from_secs(30));
let mut info_interval = tokio::time::interval(INFO_MESSAGE_INTERVAL);
info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let handler = EventHandler { state, events, info_interval };
@ -157,17 +186,17 @@ pub async fn handle_events(
/// Handles events emitted by the node and logs them accordingly.
#[pin_project::pin_project]
struct EventHandler<St> {
struct EventHandler<E> {
state: NodeState,
#[pin]
events: St,
events: E,
#[pin]
info_interval: Interval,
}
impl<St> Future for EventHandler<St>
impl<E> Future for EventHandler<E>
where
St: Stream<Item = NodeEvent> + Unpin,
E: Stream<Item = NodeEvent> + Unpin,
{
type Output = ();
@ -194,6 +223,9 @@ where
NodeEvent::ConsensusEngine(event) => {
this.state.handle_consensus_engine_event(event);
}
NodeEvent::ConsensusLayerHealth(event) => {
this.state.handle_consensus_layer_health_event(event)
}
}
}

View File

@ -12,7 +12,7 @@ use crate::{
use clap::Parser;
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, StreamExt};
use futures::{future::Either, pin_mut, stream, stream_select, StreamExt};
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus};
use reth_basic_payload_builder::{BasicPayloadJobGenerator, BasicPayloadJobGeneratorConfig};
use reth_beacon_consensus::{BeaconConsensus, BeaconConsensusEngine};
@ -76,6 +76,7 @@ use crate::{
PayloadBuilderArgs,
},
dirs::MaybePlatformPath,
node::cl_events::ConsensusLayerHealthEvents,
};
use reth_interfaces::p2p::headers::client::HeadersClient;
use reth_payload_builder::PayloadBuilderService;
@ -83,6 +84,7 @@ use reth_primitives::bytes::BytesMut;
use reth_provider::providers::BlockchainProvider;
use reth_rlp::Encodable;
pub mod cl_events;
pub mod events;
/// Start the node
@ -346,12 +348,18 @@ impl Command {
)?;
info!(target: "reth::cli", "Consensus engine initialized");
let events = stream_select(
stream_select(
network.event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
),
let events = stream_select!(
network.event_listener().map(Into::into),
beacon_engine_handle.event_listener().map(Into::into),
pipeline_events.map(Into::into),
if self.debug.tip.is_none() {
Either::Left(
ConsensusLayerHealthEvents::new(Box::new(blockchain_db.clone()))
.map(Into::into),
)
} else {
Either::Right(stream::empty())
}
);
ctx.task_executor
.spawn_critical("events task", events::handle_events(Some(network.clone()), events));

View File

@ -162,6 +162,8 @@ pub enum BeaconEngineMessage {
/// The sender for returning forkchoice updated result.
tx: oneshot::Sender<Result<OnForkChoiceUpdated, reth_interfaces::Error>>,
},
/// Message with exchanged transition configuration.
TransitionConfigurationExchanged,
/// Add a new listener for [`BeaconEngineMessage`].
EventListener(UnboundedSender<BeaconConsensusEngineEvent>),
}

View File

@ -83,7 +83,7 @@ impl BeaconConsensusEngineHandle {
/// Sends a new payload message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload(
&self,
payload: ExecutionPayload,
@ -95,7 +95,7 @@ impl BeaconConsensusEngineHandle {
/// Sends a forkchoice update message to the beacon consensus engine and waits for a response.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_forkchoiceupdatedv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated(
&self,
state: ForkchoiceState,
@ -124,6 +124,13 @@ impl BeaconConsensusEngineHandle {
rx
}
/// Sends a transition configuration exchagne message to the beacon consensus engine.
///
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangetransitionconfigurationv1>
pub async fn transition_configuration_exchanged(&self) {
let _ = self.to_engine.send(BeaconEngineMessage::TransitionConfigurationExchanged);
}
/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
@ -1177,6 +1184,9 @@ where
let res = this.on_new_payload(payload);
let _ = tx.send(res);
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}

View File

@ -48,7 +48,7 @@ where
Self { client, chain_spec, beacon_consensus, payload_store }
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
/// Caution: This should not accept the `withdrawals` field
pub async fn new_payload_v1(
&self,
@ -62,7 +62,7 @@ where
Ok(self.beacon_consensus.new_payload(payload).await?)
}
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
pub async fn new_payload_v2(
&self,
payload: ExecutionPayload,
@ -78,7 +78,7 @@ where
/// Sends a message to the beacon consensus engine to update the fork choice _without_
/// withdrawals.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_forkchoiceUpdatedV1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceUpdatedV1>
///
/// Caution: This should not accept the `withdrawals` field
pub async fn fork_choice_updated_v1(
@ -99,7 +99,7 @@ where
/// Sends a message to the beacon consensus engine to update the fork choice _with_ withdrawals,
/// but only _after_ shanghai.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_forkchoiceupdatedv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
pub async fn fork_choice_updated_v2(
&self,
state: ForkchoiceState,
@ -118,7 +118,7 @@ where
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_getPayloadV1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
///
/// Caution: This should not return the `withdrawals` field
///
@ -136,7 +136,7 @@ where
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_getpayloadv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
@ -213,7 +213,7 @@ where
/// Called to verify network configuration parameters and ensure that Consensus and Execution
/// layers are using the latest configuration.
pub fn exchange_transition_configuration(
pub async fn exchange_transition_configuration(
&self,
config: TransitionConfiguration,
) -> EngineApiResult<TransitionConfiguration> {
@ -251,6 +251,8 @@ where
.block_hash(terminal_block_number.as_u64())
.map_err(|err| EngineApiError::Internal(Box::new(err)))?;
self.beacon_consensus.transition_configuration_exchanged().await;
// Transition configuration exchange is successful if block hashes match
match local_hash {
Some(hash) if hash == terminal_block_hash => Ok(TransitionConfiguration {
@ -305,7 +307,7 @@ where
Client: HeaderProvider + BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
{
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_newpayloadv1>
/// Caution: This should not accept the `withdrawals` field
async fn new_payload_v1(&self, payload: ExecutionPayload) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::eth", "Serving engine_newPayloadV1");
@ -313,14 +315,14 @@ where
}
/// Handler for `engine_newPayloadV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_newpayloadv1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_newpayloadv2>
async fn new_payload_v2(&self, payload: ExecutionPayload) -> RpcResult<PayloadStatus> {
trace!(target: "rpc::eth", "Serving engine_newPayloadV1");
Ok(EngineApi::new_payload_v2(self, payload).await?)
}
/// Handler for `engine_forkchoiceUpdatedV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_forkchoiceUpdatedV1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_forkchoiceupdatedv1>
///
/// Caution: This should not accept the `withdrawals` field
async fn fork_choice_updated_v1(
@ -333,7 +335,7 @@ where
}
/// Handler for `engine_forkchoiceUpdatedV2`
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_forkchoiceupdatedv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_forkchoiceupdatedv2>
async fn fork_choice_updated_v2(
&self,
fork_choice_state: ForkchoiceState,
@ -348,7 +350,7 @@ where
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_getPayloadV1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_getPayloadV1>
///
/// Caution: This should not return the `withdrawals` field
///
@ -364,7 +366,7 @@ where
/// Returns the most recent version of the payload that is available in the corresponding
/// payload build process at the time of receiving this call.
///
/// See also <https://github.com/ethereum/execution-apis/blob/main/src/engine/specification.md#engine_getpayloadv2>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/shanghai.md#engine_getpayloadv2>
///
/// Note:
/// > Client software MAY stop the corresponding build process after serving this call.
@ -406,13 +408,13 @@ where
}
/// Handler for `engine_exchangeTransitionConfigurationV1`
/// See also <https://github.com/ethereum/execution-apis/blob/8db51dcd2f4bdfbd9ad6e4a7560aac97010ad063/src/engine/specification.md#engine_exchangeTransitionConfigurationV1>
/// See also <https://github.com/ethereum/execution-apis/blob/3d627c95a4d3510a8187dd02e0250ecb4331d27e/src/engine/paris.md#engine_exchangeTransitionConfigurationV1>
async fn exchange_transition_configuration(
&self,
config: TransitionConfiguration,
) -> RpcResult<TransitionConfiguration> {
trace!(target: "rpc::eth", "Serving engine_getPayloadBodiesByHashV1");
Ok(EngineApi::exchange_transition_configuration(self, config)?)
Ok(EngineApi::exchange_transition_configuration(self, config).await?)
}
/// Handler for `engine_exchangeCapabilitiesV1`
@ -576,7 +578,7 @@ mod tests {
..Default::default()
};
let res = api.exchange_transition_configuration(transition_config.clone());
let res = api.exchange_transition_configuration(transition_config.clone()).await;
assert_matches!(
res,
@ -600,7 +602,7 @@ mod tests {
};
// Unknown block number
let res = api.exchange_transition_configuration(transition_config.clone());
let res = api.exchange_transition_configuration(transition_config.clone()).await;
assert_matches!(
res,
@ -614,7 +616,7 @@ mod tests {
execution_terminal_block.clone().unseal(),
);
let res = api.exchange_transition_configuration(transition_config.clone());
let res = api.exchange_transition_configuration(transition_config.clone()).await;
assert_matches!(
res,
@ -638,7 +640,8 @@ mod tests {
handle.client.add_block(terminal_block.hash(), terminal_block.unseal());
let config = api.exchange_transition_configuration(transition_config.clone()).unwrap();
let config =
api.exchange_transition_configuration(transition_config.clone()).await.unwrap();
assert_eq!(config, transition_config);
}
}

View File

@ -20,6 +20,7 @@ impl ChainInfoTracker {
Self {
inner: Arc::new(ChainInfoInner {
last_forkchoice_update: RwLock::new(None),
last_transition_configuration_exchange: RwLock::new(None),
canonical_head_number: AtomicU64::new(head.number),
canonical_head: RwLock::new(head),
safe_block: RwLock::new(None),
@ -40,11 +41,20 @@ impl ChainInfoTracker {
}
/// Returns the instant when we received the latest forkchoice update.
#[allow(unused)]
pub(crate) fn last_forkchoice_update_received_at(&self) -> Option<Instant> {
*self.inner.last_forkchoice_update.read()
}
/// Update the timestamp when we exchanged a transition configuration.
pub(crate) fn on_transition_configuration_exchanged(&self) {
self.inner.last_transition_configuration_exchange.write().replace(Instant::now());
}
/// Returns the instant when we exchanged the transition configuration last time.
pub(crate) fn last_transition_configuration_exchanged_at(&self) -> Option<Instant> {
*self.inner.last_transition_configuration_exchange.read()
}
/// Returns the canonical head of the chain.
#[allow(unused)]
pub(crate) fn get_canonical_head(&self) -> SealedHeader {
@ -115,6 +125,10 @@ struct ChainInfoInner {
///
/// This is mainly used to track if we're connected to a beacon node.
last_forkchoice_update: RwLock<Option<Instant>>,
/// Timestamp when we exchanged the transition configuration last time.
///
/// This is mainly used to track if we're connected to a beacon node.
last_transition_configuration_exchange: RwLock<Option<Instant>>,
/// Tracks the number of the `canonical_head`.
canonical_head_number: AtomicU64,
/// The canonical head of the chain.

View File

@ -523,6 +523,14 @@ where
self.chain_info.last_forkchoice_update_received_at()
}
fn on_transition_configuration_exchanged(&self) {
self.chain_info.on_transition_configuration_exchanged();
}
fn last_exchanged_transition_configuration_timestamp(&self) -> Option<Instant> {
self.chain_info.last_transition_configuration_exchanged_at()
}
fn set_canonical_head(&self, header: SealedHeader) {
self.chain_info.set_canonical_head(header);
}

View File

@ -11,6 +11,13 @@ pub trait CanonChainTracker: Send + Sync {
/// ([CanonChainTracker::on_forkchoice_update_received])
fn last_received_update_timestamp(&self) -> Option<Instant>;
/// Notify the tracker about a transition configuration exchange.
fn on_transition_configuration_exchanged(&self);
/// Returns the last time a transition configuration was exchanged with the CL
/// ([CanonChainTracker::on_transition_configuration_exchanged])
fn last_exchanged_transition_configuration_timestamp(&self) -> Option<Instant>;
/// Sets the canonical head of the chain.
fn set_canonical_head(&self, header: SealedHeader);