fix: poll sync controller once if pipeline sync is pending (#3179)

This commit is contained in:
Matthias Seitz
2023-06-15 20:51:24 +02:00
committed by GitHub
parent ab2a38a549
commit 691d0f91a1
2 changed files with 45 additions and 16 deletions

View File

@ -1244,29 +1244,53 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Process all incoming messages first.
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
if this.on_forkchoice_updated(state, payload_attrs, tx) {
return Poll::Ready(Ok(()))
// Process all incoming messages from the CL, these can affect the state of the
// SyncController, hence they are polled first, and they're also time sensitive.
loop {
// If a new pipeline run is pending we poll the sync controller first so that it takes
// precedence over any FCU messages. This ensures that a queued pipeline run via
// [EngineSyncController::set_pipeline_sync_target] are processed before any forkchoice
// updates.
if this.sync.is_pipeline_sync_pending() {
// the next event is guaranteed to be a [EngineSyncEvent::PipelineStarted]
if let Poll::Ready(sync_event) = this.sync.poll(cx) {
if let Some(res) = this.on_sync_event(sync_event) {
return Poll::Ready(res)
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
this.metrics.new_payload_messages.increment(1);
let res = this.on_new_payload(payload);
let _ = tx.send(res);
}
// handle next engine message, else exit the loop
match this.engine_message_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
if this.on_forkchoice_updated(state, payload_attrs, tx) {
return Poll::Ready(Ok(()))
}
}
BeaconEngineMessage::NewPayload { payload, tx } => {
this.metrics.new_payload_messages.increment(1);
let res = this.on_new_payload(payload);
let _ = tx.send(res);
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
},
Poll::Ready(None) => {
unreachable!("Engine holds the a sender to the message channel")
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
Poll::Pending => {
// no more CL messages to process
break
}
}
}
// poll sync controller
// drain the sync controller
while let Poll::Ready(sync_event) = this.sync.poll(cx) {
if let Some(res) = this.on_sync_event(sync_event) {
return Poll::Ready(res)

View File

@ -106,6 +106,11 @@ where
self.run_pipeline_continuously
}
/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
self.pending_pipeline_target.is_some() && self.pipeline_state.is_idle()
}
/// Returns `true` if the pipeline is idle.
pub(crate) fn is_pipeline_idle(&self) -> bool {
self.pipeline_state.is_idle()