From 691d0f91a134026afcea969f7473dfa86fa354d5 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 15 Jun 2023 20:51:24 +0200 Subject: [PATCH] fix: poll sync controller once if pipeline sync is pending (#3179) --- crates/consensus/beacon/src/engine/mod.rs | 56 +++++++++++++++------- crates/consensus/beacon/src/engine/sync.rs | 5 ++ 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 43175b86f..ee784a050 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1244,29 +1244,53 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); - // Process all incoming messages first. - while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { - match msg { - BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - if this.on_forkchoice_updated(state, payload_attrs, tx) { - return Poll::Ready(Ok(())) + // Process all incoming messages from the CL, these can affect the state of the + // SyncController, hence they are polled first, and they're also time sensitive. + loop { + // If a new pipeline run is pending we poll the sync controller first so that it takes + // precedence over any FCU messages. This ensures that a queued pipeline run via + // [EngineSyncController::set_pipeline_sync_target] are processed before any forkchoice + // updates. + if this.sync.is_pipeline_sync_pending() { + // the next event is guaranteed to be a [EngineSyncEvent::PipelineStarted] + if let Poll::Ready(sync_event) = this.sync.poll(cx) { + if let Some(res) = this.on_sync_event(sync_event) { + return Poll::Ready(res) } } - BeaconEngineMessage::NewPayload { payload, tx } => { - this.metrics.new_payload_messages.increment(1); - let res = this.on_new_payload(payload); - let _ = tx.send(res); + } + + // handle next engine message, else exit the loop + match this.engine_message_rx.poll_next_unpin(cx) { + Poll::Ready(Some(msg)) => match msg { + BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { + if this.on_forkchoice_updated(state, payload_attrs, tx) { + return Poll::Ready(Ok(())) + } + } + BeaconEngineMessage::NewPayload { payload, tx } => { + this.metrics.new_payload_messages.increment(1); + let res = this.on_new_payload(payload); + let _ = tx.send(res); + } + BeaconEngineMessage::TransitionConfigurationExchanged => { + this.blockchain.on_transition_configuration_exchanged(); + } + BeaconEngineMessage::EventListener(tx) => { + this.listeners.push_listener(tx); + } + }, + Poll::Ready(None) => { + unreachable!("Engine holds the a sender to the message channel") } - BeaconEngineMessage::TransitionConfigurationExchanged => { - this.blockchain.on_transition_configuration_exchanged(); - } - BeaconEngineMessage::EventListener(tx) => { - this.listeners.push_listener(tx); + Poll::Pending => { + // no more CL messages to process + break } } } - // poll sync controller + // drain the sync controller while let Poll::Ready(sync_event) = this.sync.poll(cx) { if let Some(res) = this.on_sync_event(sync_event) { return Poll::Ready(res) diff --git a/crates/consensus/beacon/src/engine/sync.rs b/crates/consensus/beacon/src/engine/sync.rs index a093b57ba..70e99657c 100644 --- a/crates/consensus/beacon/src/engine/sync.rs +++ b/crates/consensus/beacon/src/engine/sync.rs @@ -106,6 +106,11 @@ where self.run_pipeline_continuously } + /// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`. + pub(crate) fn is_pipeline_sync_pending(&self) -> bool { + self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle() + } + /// Returns `true` if the pipeline is idle. pub(crate) fn is_pipeline_idle(&self) -> bool { self.pipeline_state.is_idle()