feat(engine): poll running hook between CL messages (#4839)

This commit is contained in:
Alexey Shekhirin
2023-09-29 09:20:22 +01:00
committed by GitHub
parent e84adc5531
commit 8f3e4a1573

View File

@ -1790,48 +1790,60 @@ where
// Control loop that advances the state
'main: loop {
// Poll a running hook with db write access first, as we will not be able to process
// any engine messages until it's finished.
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
cx,
EngineContext {
tip_block_number: this.blockchain.canonical_tip().number,
finalized_block_number: this.blockchain.finalized_block_number()?,
},
)? {
this.on_hook_result(result)?;
}
// Poll a running hook with db write access (if any) and CL messages first, draining
// both and then proceeding to polling other parts such as SyncController and hooks.
loop {
// Poll a running hook with db write access first, as we will not be able to process
// any engine messages until it's finished.
if let Poll::Ready(result) = this.hooks.poll_running_hook_with_db_write(
cx,
EngineContext {
tip_block_number: this.blockchain.canonical_tip().number,
finalized_block_number: this.blockchain.finalized_block_number()?,
},
)? {
this.on_hook_result(result)?;
continue
}
// Process all incoming messages from the CL, these can affect the state of the
// SyncController, hence they are polled first, and they're also time sensitive, hence
// they're always drained first.
while let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) {
OnForkchoiceUpdateOutcome::Processed => {}
OnForkchoiceUpdateOutcome::ReachedMaxBlock => {
// reached the max block, we can terminate the future
return Poll::Ready(Ok(()))
}
OnForkchoiceUpdateOutcome::Fatal(err) => {
// fatal error, we can terminate the future
return Poll::Ready(Err(RethError::Execution(err).into()))
// Process one incoming message from the CL. We don't drain the messages right away,
// because we want to sneak a polling of running hook in between them.
//
// These messages can affect the state of the SyncController and they're also time
// sensitive, hence they are polled first.
if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) {
OnForkchoiceUpdateOutcome::Processed => {}
OnForkchoiceUpdateOutcome::ReachedMaxBlock => {
// reached the max block, we can terminate the future
return Poll::Ready(Ok(()))
}
OnForkchoiceUpdateOutcome::Fatal(err) => {
// fatal error, we can terminate the future
return Poll::Ready(Err(RethError::Execution(err).into()))
}
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1);
let res = this.on_new_payload(payload, cancun_fields);
let _ = tx.send(res);
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1);
let res = this.on_new_payload(payload, cancun_fields);
let _ = tx.send(res);
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
continue
}
// Both running hook with db write access and engine messages are pending,
// proceed to other polls
break
}
// process sync events if any