diff --git a/Cargo.lock b/Cargo.lock index 8cfa0b4dc..9403be2a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4464,6 +4464,7 @@ dependencies = [ "metrics-exporter-prometheus", "metrics-util", "num_cpus", + "pin-project", "proptest", "reth-auto-seal-consensus", "reth-basic-payload-builder", @@ -6545,13 +6546,14 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.27.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d0de47a4eecbe11f498978a9b29d792f0d2692d1dd003650c24c76510e3bc001" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", "bytes", "libc", + "memchr", "mio", "num_cpus", "parking_lot 0.12.1", @@ -6564,13 +6566,13 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.0.0" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61a573bdc87985e9d6ddeed1b3d864e8a302c847e40d647746df2f1de209d1ce" +checksum = "d266c00fde287f55d3f1c3e96c500c362a2b8c695076ec180f27918820bc6df8" dependencies = [ "proc-macro2 1.0.56", "quote 1.0.26", - "syn 2.0.15", + "syn 1.0.109", ] [[package]] diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index f007b8525..51d5c4978 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -69,8 +69,9 @@ tui = "0.19.0" human_bytes = "0.4.1" # async -tokio = { version = "1.21", features = ["sync", "macros", "rt-multi-thread"] } +tokio = { version = "1.21", features = ["sync", "macros", "time", "rt-multi-thread"] } futures = "0.3.25" +pin-project = "1.0" # http/rpc hyper = "0.14.25" @@ -83,4 +84,4 @@ num_cpus = "1.13.0" tempfile = { version = "3.3.0" } backon = "0.4" hex = "0.4" -thiserror = "1.0" \ No newline at end of file +thiserror = "1.0" diff --git a/bin/reth/src/node/events.rs b/bin/reth/src/node/events.rs index d74ecea5b..b4ef7a84e 100644 --- a/bin/reth/src/node/events.rs +++ b/bin/reth/src/node/events.rs @@ -1,11 +1,17 @@ //! Support for handling events emitted by node components. -use futures::{Stream, StreamExt}; +use futures::Stream; use reth_network::{NetworkEvent, NetworkHandle}; use reth_network_api::PeersInfo; use reth_primitives::BlockNumber; use reth_stages::{PipelineEvent, StageId}; -use std::time::Duration; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; +use tokio::time::Interval; use tracing::{info, warn}; /// The current high-level state of the node. @@ -28,7 +34,7 @@ impl NodeState { } /// Processes an event emitted by the pipeline - async fn handle_pipeline_event(&mut self, event: PipelineEvent) { + fn handle_pipeline_event(&mut self, event: PipelineEvent) { match event { PipelineEvent::Running { stage_id, stage_progress } => { let notable = self.current_stage.is_none(); @@ -53,7 +59,7 @@ impl NodeState { } } - async fn handle_network_event(&mut self, event: NetworkEvent) { + fn handle_network_event(&mut self, event: NetworkEvent) { match event { NetworkEvent::SessionEstablished { peer_id, status, .. } => { info!(target: "reth::cli", connected_peers = self.num_connected_peers(), peer_id = %peer_id, best_block = %status.blockhash, "Peer connected"); @@ -68,6 +74,7 @@ impl NodeState { } /// A node event. +#[derive(Debug)] pub enum NodeEvent { /// A network event. Network(NetworkEvent), @@ -86,33 +93,60 @@ impl From for NodeEvent { NodeEvent::Pipeline(evt) } } - /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. pub async fn handle_events( network: Option, - mut events: impl Stream + Unpin, + events: impl Stream + Unpin, ) { - let mut state = NodeState::new(network); + let state = NodeState::new(network); - let mut interval = tokio::time::interval(Duration::from_secs(30)); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - tokio::select! { - Some(event) = events.next() => { - match event { - NodeEvent::Network(event) => { - state.handle_network_event(event).await; - }, - NodeEvent::Pipeline(event) => { - state.handle_pipeline_event(event).await; - } + let mut info_interval = tokio::time::interval(Duration::from_secs(30)); + info_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + + let handler = EventHandler { state, events, info_interval }; + handler.await +} + +/// Handles events emitted by the node and logs them accordingly. +#[pin_project::pin_project] +struct EventHandler { + state: NodeState, + #[pin] + events: St, + #[pin] + info_interval: Interval, +} + +impl Future for EventHandler +where + St: Stream + Unpin, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + while this.info_interval.poll_tick(cx).is_ready() { + let stage = this + .state + .current_stage + .map(|id| id.to_string()) + .unwrap_or_else(|| "None".to_string()); + info!(target: "reth::cli", connected_peers = this.state.num_connected_peers(), %stage, checkpoint = this.state.current_checkpoint, "Status"); + } + + while let Poll::Ready(Some(event)) = this.events.as_mut().poll_next(cx) { + match event { + NodeEvent::Network(event) => { + this.state.handle_network_event(event); + } + NodeEvent::Pipeline(event) => { + this.state.handle_pipeline_event(event); } - }, - _ = interval.tick() => { - let stage = state.current_stage.map(|id| id.to_string()).unwrap_or_else(|| "None".to_string()); - info!(target: "reth::cli", connected_peers = state.num_connected_peers(), %stage, checkpoint = state.current_checkpoint, "Status"); } } + + Poll::Pending } } diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index c504a9be7..969675148 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -100,11 +100,11 @@ pub struct Command { /// - goerli /// - sepolia #[arg( - long, - value_name = "CHAIN_OR_PATH", - verbatim_doc_comment, - default_value = "mainnet", - value_parser = genesis_value_parser + long, + value_name = "CHAIN_OR_PATH", + verbatim_doc_comment, + default_value = "mainnet", + value_parser = genesis_value_parser )] chain: Arc, @@ -350,7 +350,7 @@ impl Command { // Run consensus engine to completion let (rx, tx) = oneshot::channel(); info!(target: "reth::cli", "Starting consensus engine"); - ctx.task_executor.spawn_critical_blocking("consensus engine", async move { + ctx.task_executor.spawn_critical("consensus engine", async move { let res = beacon_consensus_engine.await; let _ = rx.send(res); }); diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 41e740801..92f4ccb92 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -281,7 +281,7 @@ where PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash)) } Err(error) => { - warn!(target: "consensus::engine", ?state, ?error, "Error canonicalizing the head hash"); + warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash"); // If this is the first forkchoice received, start downloading from safe block // hash. let target = if is_first_forkchoice && @@ -435,7 +435,7 @@ where trace!(target: "consensus::engine", ?tip, "Starting the pipeline"); let (tx, rx) = oneshot::channel(); let db = self.db.clone(); - self.task_spawner.spawn_critical( + self.task_spawner.spawn_critical_blocking( "pipeline", Box::pin(async move { let result = pipeline.run_as_fut(db, tip).await; @@ -693,12 +693,40 @@ mod tests { self.engine_handle.new_payload(payload).await } + /// Sends the `ExecutionPayload` message to the consensus engine and retries if the engine + /// is syncing. + async fn send_new_payload_retry_on_syncing( + &self, + payload: ExecutionPayload, + ) -> BeaconEngineResult { + loop { + let result = self.send_new_payload(payload.clone()).await?; + if !result.is_syncing() { + return Ok(result) + } + } + } + async fn send_forkchoice_updated( &self, state: ForkchoiceState, ) -> BeaconEngineResult { self.engine_handle.fork_choice_updated(state, None).await } + + /// Sends the `ForkchoiceUpdated` message to the consensus engine and retries if the engine + /// is syncing. + async fn send_forkchoice_retry_on_syncing( + &self, + state: ForkchoiceState, + ) -> BeaconEngineResult { + loop { + let result = self.engine_handle.fork_choice_updated(state, None).await?; + if !result.is_syncing() { + return Ok(result) + } + } + } } fn setup_consensus_engine( @@ -954,13 +982,12 @@ mod tests { let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); assert_matches!(rx_invalid.await, Ok(result) => assert_eq!(result, expected_result)); - let res = env.send_forkchoice_updated(forkchoice); + let result = env.send_forkchoice_retry_on_syncing(forkchoice).await.unwrap(); let expected_result = ForkchoiceUpdated::new(PayloadStatus::new( PayloadStatusEnum::Valid, Some(block1.hash), )); - assert_matches!(res.await, Ok(result) => assert_eq!(result, expected_result)); - + assert_eq!(result, expected_result); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1003,10 +1030,10 @@ mod tests { let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); assert_matches!(invalid_rx.await, Ok(result) => assert_eq!(result, expected_result)); - let valid_rx = env.send_forkchoice_updated(next_forkchoice_state); + let result = env.send_forkchoice_retry_on_syncing(next_forkchoice_state).await.unwrap(); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Valid) .with_latest_valid_hash(next_head.hash); - assert_matches!(valid_rx.await, Ok(result) => assert_eq!(result, expected_result)); + assert_eq!(result, expected_result); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1079,18 +1106,20 @@ mod tests { let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); - let res = env - .send_forkchoice_updated(ForkchoiceState { + let result = env + .send_forkchoice_retry_on_syncing(ForkchoiceState { head_block_hash: block1.hash, finalized_block_hash: block1.hash, ..Default::default() }) - .await; + .await + .unwrap(); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { validation_error: ExecutorError::BlockPreMerge { hash: block1.hash }.to_string(), }) .with_latest_valid_hash(H256::zero()); - assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); + + assert_eq!(result, expected_result); } } @@ -1167,11 +1196,11 @@ mod tests { assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); // Send new payload - let res = env.send_new_payload(block2.clone().into()).await; + let result = + env.send_new_payload_retry_on_syncing(block2.clone().into()).await.unwrap(); let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Valid) .with_latest_valid_hash(block2.hash); - assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); - + assert_eq!(result, expected_result); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } @@ -1261,12 +1290,14 @@ mod tests { assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); // Send new payload - let res = env.send_new_payload(block2.clone().into()).await; + let result = + env.send_new_payload_retry_on_syncing(block2.clone().into()).await.unwrap(); + let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: ExecutorError::BlockPreMerge { hash: block2.hash }.to_string(), }) .with_latest_valid_hash(H256::zero()); - assert_matches!(res, Ok(result) => assert_eq!(result, expected_result)); + assert_eq!(result, expected_result); assert_matches!(engine_rx.try_recv(), Err(TryRecvError::Empty)); } diff --git a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs index 249899075..617de8145 100644 --- a/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs +++ b/crates/rpc/rpc-types/src/eth/engine/forkchoice.rs @@ -37,4 +37,19 @@ impl ForkchoiceUpdated { self.payload_id = Some(id); self } + + /// Returns true if the payload status is syncing. + pub fn is_syncing(&self) -> bool { + self.payload_status.is_syncing() + } + + /// Returns true if the payload status is valid. + pub fn is_valid(&self) -> bool { + self.payload_status.is_valid() + } + + /// Returns true if the payload status is invalid. + pub fn is_invalid(&self) -> bool { + self.payload_status.is_invalid() + } } diff --git a/crates/rpc/rpc-types/src/eth/engine/payload.rs b/crates/rpc/rpc-types/src/eth/engine/payload.rs index 445114931..17b68a6d7 100644 --- a/crates/rpc/rpc-types/src/eth/engine/payload.rs +++ b/crates/rpc/rpc-types/src/eth/engine/payload.rs @@ -268,6 +268,21 @@ impl PayloadStatus { self.latest_valid_hash = Some(latest_valid_hash); self } + + /// Returns true if the payload status is syncing. + pub fn is_syncing(&self) -> bool { + self.status.is_syncing() + } + + /// Returns true if the payload status is valid. + pub fn is_valid(&self) -> bool { + self.status.is_valid() + } + + /// Returns true if the payload status is invalid. + pub fn is_invalid(&self) -> bool { + self.status.is_invalid() + } } impl Serialize for PayloadStatus { @@ -348,6 +363,21 @@ impl PayloadStatusEnum { _ => None, } } + + /// Returns true if the payload status is syncing. + pub fn is_syncing(&self) -> bool { + matches!(self, PayloadStatusEnum::Syncing) + } + + /// Returns true if the payload status is valid. + pub fn is_valid(&self) -> bool { + matches!(self, PayloadStatusEnum::Valid) + } + + /// Returns true if the payload status is invalid. + pub fn is_invalid(&self) -> bool { + matches!(self, PayloadStatusEnum::Invalid { .. }) + } } #[cfg(test)] diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 92cab8fbb..d328b89d5 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -134,8 +134,11 @@ where } /// Set tip for reverse sync. + #[track_caller] pub fn set_tip(&self, tip: H256) { - self.tip_tx.as_ref().expect("tip sender is set").send(tip).expect("tip channel closed"); + let _ = self.tip_tx.as_ref().expect("tip sender is set").send(tip).map_err(|_| { + warn!(target: "sync::pipeline", "tip channel closed"); + }); } /// Listen for events on the pipeline. @@ -157,11 +160,11 @@ where } /// Consume the pipeline and run it. Return the pipeline and its result as a future. + #[track_caller] pub fn run_as_fut(mut self, db: Arc, tip: H256) -> PipelineFut { // TODO: fix this in a follow up PR. ideally, consensus engine would be responsible for // updating metrics. self.register_metrics(db.clone()); - Box::pin(async move { self.set_tip(tip); let result = self.run_loop(db).await; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 7a21e166f..02954b7fc 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -76,6 +76,13 @@ pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone { /// Spawns a blocking task onto the runtime. fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; + + /// This spawns a critical blocking task onto the runtime. + fn spawn_critical_blocking( + &self, + name: &'static str, + fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()>; } dyn_clone::clone_trait_object!(TaskSpawner); @@ -97,6 +104,14 @@ impl TaskSpawner for TokioTaskExecutor { fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) } + + fn spawn_critical_blocking( + &self, + _name: &'static str, + fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + tokio::task::spawn_blocking(move || tokio::runtime::Handle::current().block_on(fut)) + } } /// Many reth components require to spawn tasks for long-running jobs. For example `discovery` @@ -353,6 +368,14 @@ impl TaskSpawner for TaskExecutor { fn spawn_blocking(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { self.spawn_blocking(fut) } + + fn spawn_critical_blocking( + &self, + name: &'static str, + fut: BoxFuture<'static, ()>, + ) -> JoinHandle<()> { + TaskExecutor::spawn_critical_blocking(self, name, fut) + } } /// Determines how a task is spawned