fix(cli): event reporting (#2273)

Matthias Seitz
2023-04-16 18:56:12 +02:00
committed by GitHub
parent d1162dbbb7
commit e27ed92d1e
9 changed files with 193 additions and 54 deletions

Cargo.lock generated
View File

@@ -4464,6 +4464,7 @@ dependencies = [
"metrics-exporter-prometheus",
"metrics-util",
"num_cpus",
"pin-project",
"proptest",
"reth-auto-seal-consensus",
"reth-basic-payload-builder",
@@ -6545,13 +6546,14 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.27.0"
version = "1.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001"
checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64"
dependencies = [
"autocfg",
"bytes",
"libc",
"memchr",
"mio",
"num_cpus",
"parking_lot 0.12.1",
@@ -6564,13 +6566,13 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.0.0"
version = "1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce"
checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8"
dependencies = [
"proc-macro2 1.0.56",
"quote 1.0.26",
"syn 2.0.15",
"syn 1.0.109",
]
[[package]]

View File

@@ -69,8 +69,9 @@ tui = "0.19.0"
human_bytes = "0.4.1"
# async
tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] }
tokio = { version = "1.21", features = ["sync", "macros", "time", "rt-multi-thread"] }
futures = "0.3.25"
pin-project = "1.0"
# http/rpc
hyper = "0.14.25"

View File

@@ -1,11 +1,17 @@
//! Support for handling events emitted by node components.
use futures::{Stream, StreamExt};
use futures::Stream;
use reth_network::{NetworkEvent, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_primitives::BlockNumber;
use reth_stages::{PipelineEvent, StageId};
use std::time::Duration;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::Interval;
use tracing::{info, warn};
/// The current high-level state of the node.
@@ -28,7 +34,7 @@ impl NodeState {
}
/// Processes an event emitted by the pipeline
async fn handle_pipeline_event(&mut self, event: PipelineEvent) {
fn handle_pipeline_event(&mut self, event: PipelineEvent) {
match event {
PipelineEvent::Running { stage_id, stage_progress } => {
let notable = self.current_stage.is_none();
@@ -53,7 +59,7 @@
}
}
async fn handle_network_event(&mut self, event: NetworkEvent) {
fn handle_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionEstablished { peer_id, status, .. } => {
info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected");
@@ -68,6 +74,7 @@
}
/// A node event.
#[derive(Debug)]
pub enum NodeEvent {
/// A network event.
Network(NetworkEvent),
@@ -86,33 +93,60 @@ impl From<PipelineEvent> for NodeEvent {
NodeEvent::Pipeline(evt)
}
}
/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events(
network: Option<NetworkHandle>,
mut events: impl Stream<Item = NodeEvent> + Unpin,
events: impl Stream<Item = NodeEvent> + Unpin,
) {
let mut state = NodeState::new(network);
let state = NodeState::new(network);
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
Some(event) = events.next() => {
let mut info_interval = tokio::time::interval(Duration::from_secs(30));
info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let handler = EventHandler { state, events, info_interval };
handler.await
}
/// Handles events emitted by the node and logs them accordingly.
#[pin_project::pin_project]
struct EventHandler<St> {
state: NodeState,
#[pin]
events: St,
#[pin]
info_interval: Interval,
}
impl<St> Future for EventHandler<St>
where
St: Stream<Item = NodeEvent> + Unpin,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
while this.info_interval.poll_tick(cx).is_ready() {
let stage = this
.state
.current_stage
.map(|id| id.to_string())
.unwrap_or_else(|| "None".to_string());
info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = this.state.current_checkpoint, "Status");
}
while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
match event {
NodeEvent::Network(event) => {
state.handle_network_event(event).await;
},
this.state.handle_network_event(event);
}
NodeEvent::Pipeline(event) => {
state.handle_pipeline_event(event).await;
this.state.handle_pipeline_event(event);
}
}
},
_ = interval.tick() => {
let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string());
info!(target: "reth::cli", connected_peers = state.num_connected_peers(), %stage, checkpoint = state.current_checkpoint, "Status");
}
}
Poll::Pending
}
}
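
Not part of the commit above, but for reference: a minimal, self-contained sketch of the same pattern the new `EventHandler` uses. The names (`Ticker`, the `u64` event type) are illustrative only; it assumes `tokio` with the `time`, `macros`, and `rt-multi-thread` features, plus `futures` and `pin-project` as dependencies. On every poll the future first drains all due interval ticks, then drains all buffered stream events, and finally stays pending so both sources can wake it again.

```rust
use futures::{Stream, StreamExt};
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};
use tokio::time::Interval;

#[pin_project::pin_project]
struct Ticker<St> {
    #[pin]
    events: St,
    #[pin]
    interval: Interval,
}

impl<St> Future for Ticker<St>
where
    St: Stream<Item = u64>,
{
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        // Drain every tick that is due before looking at events.
        while this.interval.poll_tick(cx).is_ready() {
            println!("status tick");
        }
        // Drain every event that is currently buffered without blocking the task.
        while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) {
            println!("event: {event}");
        }
        // Both sources have registered wakers above, so staying pending is safe.
        Poll::Pending
    }
}

#[tokio::main]
async fn main() {
    let events = futures::stream::iter(0u64..3).chain(futures::stream::pending());
    let ticker = Ticker {
        events,
        interval: tokio::time::interval(Duration::from_secs(30)),
    };
    // The future never resolves on its own; bound it with a timeout for the demo.
    let _ = tokio::time::timeout(Duration::from_millis(50), ticker).await;
}
```

Relative to the previous `tokio::select!` loop, the hand-rolled future drains everything that is ready on each wake-up and keeps the periodic status log independent of event traffic, which is what the diff above switches to.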

View File

@@ -350,7 +350,7 @@ impl Command {
// Run consensus engine to completion
let (rx, tx) = oneshot::channel();
info!(target: "reth::cli", "Starting consensus engine");
ctx.task_executor.spawn_critical_blocking("consensus engine", async move {
ctx.task_executor.spawn_critical("consensus engine", async move {
let res = beacon_consensus_engine.await;
let _ = rx.send(res);
});

View File

@@ -281,7 +281,7 @@ where
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash))
}
Err(error) => {
warn!(target: "consensus::engine", ?state, ?error, "Error canonicalizing the head hash");
warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash");
// If this is the first forkchoice received, start downloading from safe block
// hash.
let target = if is_first_forkchoice &&
@@ -435,7 +435,7 @@ where
trace!(target: "consensus::engine", ?tip, "Starting the pipeline");
let (tx, rx) = oneshot::channel();
let db = self.db.clone();
self.task_spawner.spawn_critical(
self.task_spawner.spawn_critical_blocking(
"pipeline",
Box::pin(async move {
let result = pipeline.run_as_fut(db, tip).await;
@@ -693,12 +693,40 @@ mod tests {
self.engine_handle.new_payload(payload).await
}
/// Sends the `ExecutionPayload` message to the consensus engine and retries if the engine
/// is syncing.
async fn send_new_payload_retry_on_syncing(
&self,
payload: ExecutionPayload,
) -> BeaconEngineResult<PayloadStatus> {
loop {
let result = self.send_new_payload(payload.clone()).await?;
if !result.is_syncing() {
return Ok(result)
}
}
}
async fn send_forkchoice_updated(
&self,
state: ForkchoiceState,
) -> BeaconEngineResult<ForkchoiceUpdated> {
self.engine_handle.fork_choice_updated(state, None).await
}
/// Sends the `ForkchoiceUpdated` message to the consensus engine and retries if the engine
/// is syncing.
async fn send_forkchoice_retry_on_syncing(
&self,
state: ForkchoiceState,
) -> BeaconEngineResult<ForkchoiceUpdated> {
loop {
let result = self.engine_handle.fork_choice_updated(state, None).await?;
if !result.is_syncing() {
return Ok(result)
}
}
}
}
fn setup_consensus_engine(
@@ -954,13 +982,12 @@
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(rx_invalid.await, Ok(result) => assert_eq!(result, expected_result));
let res = env.send_forkchoice_updated(forkchoice);
let result = env.send_forkchoice_retry_on_syncing(forkchoice).await.unwrap();
let expected_result = ForkchoiceUpdated::new(PayloadStatus::new(
PayloadStatusEnum::Valid,
Some(block1.hash),
));
assert_matches!(res.await, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(result, expected_result);
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
}
@@ -1003,10 +1030,10 @@
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(invalid_rx.await, Ok(result) => assert_eq!(result, expected_result));
let valid_rx = env.send_forkchoice_updated(next_forkchoice_state);
let result = env.send_forkchoice_retry_on_syncing(next_forkchoice_state).await.unwrap();
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid)
.with_latest_valid_hash(next_head.hash);
assert_matches!(valid_rx.await, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(result, expected_result);
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
}
@@ -1079,18 +1106,20 @@
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing);
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
let res = env
.send_forkchoice_updated(ForkchoiceState {
let result = env
.send_forkchoice_retry_on_syncing(ForkchoiceState {
head_block_hash: block1.hash,
finalized_block_hash: block1.hash,
..Default::default()
})
.await;
.await
.unwrap();
let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid {
validation_error: ExecutorError::BlockPreMerge { hash: block1.hash }.to_string(),
})
.with_latest_valid_hash(H256::zero());
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(result, expected_result);
}
}
@@ -1167,11 +1196,11 @@
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
// Send new payload
let res = env.send_new_payload(block2.clone().into()).await;
let result =
env.send_new_payload_retry_on_syncing(block2.clone().into()).await.unwrap();
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid)
.with_latest_valid_hash(block2.hash);
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(result, expected_result);
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
}
@@ -1261,12 +1290,14 @@
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
// Send new payload
let res = env.send_new_payload(block2.clone().into()).await;
let result =
env.send_new_payload_retry_on_syncing(block2.clone().into()).await.unwrap();
let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: ExecutorError::BlockPreMerge { hash: block2.hash }.to_string(),
})
.with_latest_valid_hash(H256::zero());
assert_matches!(res, Ok(result) => assert_eq!(result, expected_result));
assert_eq!(result, expected_result);
assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty));
}

View File

@@ -37,4 +37,19 @@ impl ForkchoiceUpdated {
self.payload_id = Some(id);
self
}
/// Returns true if the payload status is syncing.
pub fn is_syncing(&self) -> bool {
self.payload_status.is_syncing()
}
/// Returns true if the payload status is valid.
pub fn is_valid(&self) -> bool {
self.payload_status.is_valid()
}
/// Returns true if the payload status is invalid.
pub fn is_invalid(&self) -> bool {
self.payload_status.is_invalid()
}
}

View File

@@ -268,6 +268,21 @@ impl PayloadStatus {
self.latest_valid_hash = Some(latest_valid_hash);
self
}
/// Returns true if the payload status is syncing.
pub fn is_syncing(&self) -> bool {
self.status.is_syncing()
}
/// Returns true if the payload status is valid.
pub fn is_valid(&self) -> bool {
self.status.is_valid()
}
/// Returns true if the payload status is invalid.
pub fn is_invalid(&self) -> bool {
self.status.is_invalid()
}
}
impl Serialize for PayloadStatus {
@@ -348,6 +363,21 @@ impl PayloadStatusEnum {
_ => None,
}
}
/// Returns true if the payload status is syncing.
pub fn is_syncing(&self) -> bool {
matches!(self, PayloadStatusEnum::Syncing)
}
/// Returns true if the payload status is valid.
pub fn is_valid(&self) -> bool {
matches!(self, PayloadStatusEnum::Valid)
}
/// Returns true if the payload status is invalid.
pub fn is_invalid(&self) -> bool {
matches!(self, PayloadStatusEnum::Invalid { .. })
}
}
#[cfg(test)]
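
Not part of the diff: a small, self-contained sketch of the retry pattern these predicates enable, mirroring the `send_forkchoice_retry_on_syncing` helper in the engine tests above. `Status` and `send_request` are local stand-ins, not reth types; only `tokio` with the `macros` and `rt-multi-thread` features is assumed.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Valid,
    Syncing,
}

impl Status {
    /// Mirrors the `matches!`-based predicates added above.
    fn is_syncing(&self) -> bool {
        matches!(self, Status::Syncing)
    }
}

/// Stand-in for `engine_handle.fork_choice_updated(..)` or `new_payload(..)`.
async fn send_request(attempt: &mut u32) -> Status {
    *attempt += 1;
    if *attempt < 3 {
        Status::Syncing
    } else {
        Status::Valid
    }
}

#[tokio::main]
async fn main() {
    let mut attempt = 0;
    // Keep re-sending while the engine reports SYNCING.
    let result = loop {
        let status = send_request(&mut attempt).await;
        if !status.is_syncing() {
            break status;
        }
    };
    assert_eq!(result, Status::Valid);
}
```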

View File

@@ -134,8 +134,11 @@ where
}
/// Set tip for reverse sync.
#[track_caller]
pub fn set_tip(&self, tip: H256) {
self.tip_tx.as_ref().expect("tip sender is set").send(tip).expect("tip channel closed");
let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| {
warn!(target: "sync::pipeline", "tip channel closed");
});
}
/// Listen for events on the pipeline.
@@ -157,11 +160,11 @@
}
/// Consume the pipeline and run it. Return the pipeline and its result as a future.
#[track_caller]
pub fn run_as_fut(mut self, db: Arc<DB>, tip: H256) -> PipelineFut<DB, U> {
// TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for
// updating metrics.
self.register_metrics(db.clone());
Box::pin(async move {
self.set_tip(tip);
let result = self.run_loop(db).await;
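
Not part of the diff: a sketch of the non-panicking send that `set_tip` now uses, with a `tokio::sync::watch` channel standing in for the pipeline's tip channel (the concrete channel type is not shown in this diff). It assumes `tokio` with the `sync`, `macros`, and `rt-multi-thread` features plus `tracing`.

```rust
use tracing::warn;

#[tokio::main]
async fn main() {
    // A watch channel stands in for the pipeline's tip channel.
    let (tx, rx) = tokio::sync::watch::channel(0u64);
    drop(rx); // simulate the receiving half going away

    // `send` fails once every receiver is gone; log a warning and carry on
    // instead of panicking as the previous `expect` did.
    let _ = tx.send(42).map_err(|_| {
        warn!(target: "sync::pipeline", "tip channel closed");
    });
}
```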

View File

@@ -76,6 +76,13 @@ pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone {
/// Spawns a blocking task onto the runtime.
fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;
/// This spawns a critical blocking task onto the runtime.
fn spawn_critical_blocking(
&self,
name: &'static str,
fut: BoxFuture<'static, ()>,
) -> JoinHandle<()>;
}
dyn_clone::clone_trait_object!(TaskSpawner);
@@ -97,6 +104,14 @@ impl TaskSpawner for TokioTaskExecutor {
fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut))
}
fn spawn_critical_blocking(
&self,
_name: &'static str,
fut: BoxFuture<'static, ()>,
) -> JoinHandle<()> {
tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut))
}
}
/// Many reth components require to spawn tasks for long-running jobs. For example `discovery`
@@ -353,6 +368,14 @@ impl TaskSpawner for TaskExecutor {
fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.spawn_blocking(fut)
}
fn spawn_critical_blocking(
&self,
name: &'static str,
fut: BoxFuture<'static, ()>,
) -> JoinHandle<()> {
TaskExecutor::spawn_critical_blocking(self, name, fut)
}
}
/// Determines how a task is spawned
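
Not part of the diff: a sketch of what the Tokio-backed `spawn_critical_blocking` implementations above boil down to, i.e. driving a future on the blocking thread pool so heavy synchronous work (like the pipeline's database stages) cannot starve the async workers. It leaves out the panic/shutdown signalling that `TaskExecutor` layers on top, and the names below are illustrative only.

```rust
use std::time::Duration;

/// Stand-in for stages that do heavy, synchronous database work.
fn heavy_sync_work() {
    std::thread::sleep(Duration::from_millis(50));
}

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // The blocking variant: run the future on a dedicated blocking thread.
    let blocking = tokio::task::spawn_blocking(move || {
        tokio::runtime::Handle::current().block_on(async {
            heavy_sync_work();
        })
    });

    // Ordinary async tasks keep making progress in the meantime.
    let responsive = tokio::spawn(async { 1 + 1 });

    assert_eq!(responsive.await.unwrap(), 2);
    blocking.await.unwrap();
}
```

This split is why the node command above now spawns the async consensus engine with `spawn_critical`, while the engine hands the pipeline, which does blocking work, to `spawn_critical_blocking`.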