perf: optimize engine poll loop (#4655)

This commit is contained in:
Matthias Seitz
2023-09-19 17:16:04 +02:00
committed by GitHub
parent d846199525
commit a96dbb476c

View File

@ -38,7 +38,7 @@ use reth_tasks::TaskSpawner;
use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
task::{ready, Context, Poll},
time::Instant,
};
use tokio::sync::{
@ -1723,9 +1723,8 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// Process all incoming messages from the CL, these can affect the state of the
// SyncController, hence they are polled first, and they're also time sensitive.
loop {
// Control loop that advances the state
'main: loop {
// Poll a running hook with db write access first, as we will not be able to process
// any engine messages until it's finished.
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
@ -1737,12 +1736,11 @@ where
}
}
let mut engine_messages_pending = false;
let mut sync_pending = false;
// handle next engine message
match this.engine_message_rx.poll_next_unpin(cx) {
Poll::Ready(Some(msg)) => match msg {
// Process all incoming messages from the CL, these can affect the state of the
// SyncController, hence they are polled first, and they're also time sensitive, hence
// they're always drained first.
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) {
OnForkchoiceUpdateOutcome::Processed => {}
@ -1767,13 +1765,6 @@ where
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
},
Poll::Ready(None) => {
unreachable!("Engine holds the a sender to the message channel")
}
Poll::Pending => {
// no more CL messages to process
engine_messages_pending = true;
}
}
@ -1783,36 +1774,38 @@ where
if let Some(res) = this.on_sync_event(sync_event) {
return Poll::Ready(res)
}
// this could have taken a while, so we start the next cycle to handle any new
// engine messages
continue 'main
}
Poll::Pending => {
// no more sync events to process
sync_pending = true;
}
}
// we're pending if both engine messages and sync events are pending (fully drained)
let is_pending = engine_messages_pending && sync_pending;
// at this point, all engine messages and sync events are fully drained
// Poll next hook if all conditions are met:
// 1. No engine and sync messages are pending
// 1. Engine and sync messages are fully drained (both pending)
// 2. Latest FCU status is not INVALID
if is_pending && !this.forkchoice_state_tracker.is_latest_invalid() {
if let Poll::Ready(result) = this.hooks.poll_next_hook(
if !this.forkchoice_state_tracker.is_latest_invalid() {
let action = ready!(this.hooks.poll_next_hook(
cx,
EngineContext { tip_block_number: this.blockchain.canonical_tip().number },
this.sync.is_pipeline_active(),
) {
if let Err(err) = this.on_hook_action(result?) {
return Poll::Ready(Err(err))
}
))?;
if let Err(err) = this.on_hook_action(action) {
return Poll::Ready(Err(err))
}
// ensure we're polling until pending while also checking for new engine messages
// before polling the next hook
continue 'main
}
if is_pending {
// incoming engine messages and sync events are drained, so we can yield back
// control
return Poll::Pending
}
// incoming engine messages and sync events are drained, so we can yield back
// control
return Poll::Pending
}
}
}