From a96dbb476c8587b40b26c1198f48785e93a535aa Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 19 Sep 2023 17:16:04 +0200 Subject: [PATCH] perf: optimize engine poll loop (#4655) --- crates/consensus/beacon/src/engine/mod.rs | 57 ++++++++++------------- 1 file changed, 25 insertions(+), 32 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index fac3e3507..66df6a083 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -38,7 +38,7 @@ use reth_tasks::TaskSpawner; use std::{ pin::Pin, sync::Arc, - task::{Context, Poll}, + task::{ready, Context, Poll}, time::Instant, }; use tokio::sync::{ @@ -1723,9 +1723,8 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - // Process all incoming messages from the CL, these can affect the state of the - // SyncController, hence they are polled first, and they're also time sensitive. - loop { + // Control loop that advances the state + 'main: loop { // Poll a running hook with db write access first, as we will not be able to process // any engine messages until it's finished. if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write( @@ -1737,12 +1736,11 @@ where } } - let mut engine_messages_pending = false; - let mut sync_pending = false; - - // handle next engine message - match this.engine_message_rx.poll_next_unpin(cx) { - Poll::Ready(Some(msg)) => match msg { + // Process all incoming messages from the CL, these can affect the state of the + // SyncController, hence they are polled first, and they're also time sensitive, hence + // they're always drained first. + while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { + match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { match this.on_forkchoice_updated(state, payload_attrs, tx) { OnForkchoiceUpdateOutcome::Processed => {} @@ -1767,13 +1765,6 @@ where BeaconEngineMessage::EventListener(tx) => { this.listeners.push_listener(tx); } - }, - Poll::Ready(None) => { - unreachable!("Engine holds the a sender to the message channel") - } - Poll::Pending => { - // no more CL messages to process - engine_messages_pending = true; } } @@ -1783,36 +1774,38 @@ where if let Some(res) = this.on_sync_event(sync_event) { return Poll::Ready(res) } + // this could have taken a while, so we start the next cycle to handle any new + // engine messages + continue 'main } Poll::Pending => { // no more sync events to process - sync_pending = true; } } - // we're pending if both engine messages and sync events are pending (fully drained) - let is_pending = engine_messages_pending && sync_pending; + // at this point, all engine messages and sync events are fully drained // Poll next hook if all conditions are met: - // 1. No engine and sync messages are pending + // 1. Engine and sync messages are fully drained (both pending) // 2. Latest FCU status is not INVALID - if is_pending && !this.forkchoice_state_tracker.is_latest_invalid() { - if let Poll::Ready(result) = this.hooks.poll_next_hook( + if !this.forkchoice_state_tracker.is_latest_invalid() { + let action = ready!(this.hooks.poll_next_hook( cx, EngineContext { tip_block_number: this.blockchain.canonical_tip().number }, this.sync.is_pipeline_active(), - ) { - if let Err(err) = this.on_hook_action(result?) { - return Poll::Ready(Err(err)) - } + ))?; + if let Err(err) = this.on_hook_action(action) { + return Poll::Ready(Err(err)) } + + // ensure we're polling until pending while also checking for new engine messages + // before polling the next hook + continue 'main } - if is_pending { - // incoming engine messages and sync events are drained, so we can yield back - // control - return Poll::Pending - } + // incoming engine messages and sync events are drained, so we can yield back + // control + return Poll::Pending } } }