chore(engine): flatten fcu processing (#8027)

This commit is contained in:
Roman Krasiuk
2024-05-01 21:23:21 +02:00
committed by GitHub
parent 9ae9af484d
commit 9d2ca45c30

View File

@ -45,7 +45,6 @@ use std::{
use tokio::sync::{ use tokio::sync::{
mpsc, mpsc,
mpsc::{UnboundedReceiver, UnboundedSender}, mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::*; use tracing::*;
@ -381,40 +380,6 @@ where
None None
} }
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
///
/// These responses should adhere to the [Engine API Spec for
/// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn forkchoice_updated(
&mut self,
state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>,
) -> Result<OnForkChoiceUpdated, CanonicalError> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
// Pre-validate forkchoice state update and return if it's invalid or
// cannot be processed at the moment.
if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
return Ok(on_updated)
}
let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
let status = self.on_forkchoice_updated_make_canonical_result(
state,
attrs,
make_canonical_result,
elapsed,
)?;
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(status)
}
/// Process the result of attempting to make forkchoice state head hash canonical. /// Process the result of attempting to make forkchoice state head hash canonical.
/// ///
/// # Returns /// # Returns
@ -519,56 +484,54 @@ where
false false
} }
/// Invoked when we receive a new forkchoice update message. /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree
/// to resolve chain forks and ensure that the Execution Layer is working with the latest valid
/// chain.
/// ///
/// Returns `true` if the engine now reached its maximum block number, See /// These responses should adhere to the [Engine API Spec for
/// [EngineSyncController::has_reached_max_block]. /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1).
///
/// Returns an error if an internal error occurred like a database error.
fn on_forkchoice_updated( fn on_forkchoice_updated(
&mut self, &mut self,
state: ForkchoiceState, state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>, attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, RethError>>, ) -> Result<OnForkChoiceUpdated, CanonicalError> {
) -> Result<OnForkchoiceUpdateOutcome, CanonicalError> {
self.metrics.forkchoice_updated_messages.increment(1); self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state); self.blockchain.on_forkchoice_update_received(&state);
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
let on_updated = match self.forkchoice_updated(state, attrs) { // Pre-validate forkchoice state update and return if it's invalid or
Ok(response) => response, // cannot be processed at the moment.
Err(error) => { if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
if error.is_fatal() { return Ok(on_updated)
// FCU resulted in a fatal error from which we can't recover }
let err = error.clone();
let _ = tx.send(Err(RethError::Canonical(error)));
return Err(err)
}
let _ = tx.send(Err(RethError::Canonical(error)));
return Ok(OnForkchoiceUpdateOutcome::Processed)
}
};
let fcu_status = on_updated.forkchoice_status(); let start = Instant::now();
let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash);
let elapsed = self.record_make_canonical_latency(start, &make_canonical_result);
// update the forkchoice state tracker let status = self.on_forkchoice_updated_make_canonical_result(
self.forkchoice_state_tracker.set_latest(state, fcu_status); state,
attrs,
make_canonical_result,
elapsed,
)?;
trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status");
Ok(status)
}
// send the response to the CL ASAP /// Called after the forkchoice update status has been resolved.
let _ = tx.send(Ok(on_updated)); /// Depending on the outcome, the method updates the sync state and notifies the listeners
/// about new processed FCU.
match fcu_status { fn on_forkchoice_updated_status(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) {
match status {
ForkchoiceStatus::Invalid => {} ForkchoiceStatus::Invalid => {}
ForkchoiceStatus::Valid => { ForkchoiceStatus::Valid => {
// FCU head is valid, we're no longer syncing // FCU head is valid, we're no longer syncing
self.sync_state_updater.update_sync_state(SyncState::Idle); self.sync_state_updater.update_sync_state(SyncState::Idle);
// node's fully synced, clear active download requests // node's fully synced, clear active download requests
self.sync.clear_block_download_requests(); self.sync.clear_block_download_requests();
// check if we reached the maximum configured block
let tip_number = self.blockchain.canonical_tip().number;
if self.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the maximum user
// configured block.
return Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock)
}
} }
ForkchoiceStatus::Syncing => { ForkchoiceStatus::Syncing => {
// we're syncing // we're syncing
@ -577,9 +540,7 @@ where
} }
// notify listeners about new processed FCU // notify listeners about new processed FCU
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status)); self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
Ok(OnForkchoiceUpdateOutcome::Processed)
} }
/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
@ -966,7 +927,7 @@ where
/// ///
/// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap /// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
/// ///
/// See [Self::forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. /// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
fn on_failed_canonical_forkchoice_update( fn on_failed_canonical_forkchoice_update(
&mut self, &mut self,
state: &ForkchoiceState, state: &ForkchoiceState,
@ -1758,17 +1719,34 @@ where
if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) {
match msg { match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) { match this.on_forkchoice_updated(state, payload_attrs) {
Ok(OnForkchoiceUpdateOutcome::Processed) => {} Ok(on_updated) => {
Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) => { let fcu_status = on_updated.forkchoice_status();
// reached the max block, we can terminate the future // update the forkchoice state tracker
return Poll::Ready(Ok(())) this.forkchoice_state_tracker.set_latest(state, fcu_status);
// send the response to the CL ASAP
let _ = tx.send(Ok(on_updated));
if fcu_status.is_valid() {
let tip_number = this.blockchain.canonical_tip().number;
if this.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the
// maximum user configured block.
return Poll::Ready(Ok(()))
}
}
this.on_forkchoice_updated_status(state, fcu_status);
} }
Err(err) => { Err(error) => {
// fatal error, we can terminate the future if error.is_fatal() {
return Poll::Ready(Err(RethError::Canonical(err).into())) // fatal error, we can terminate the future
let _ = tx.send(Err(RethError::Canonical(error.clone())));
return Poll::Ready(Err(RethError::Canonical(error).into()))
}
let _ = tx.send(Err(RethError::Canonical(error)));
} }
} };
} }
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.metrics.new_payload_messages.increment(1); this.metrics.new_payload_messages.increment(1);
@ -1828,15 +1806,6 @@ where
} }
} }
/// Represents all outcomes of an applied fork choice update.
#[derive(Debug)]
enum OnForkchoiceUpdateOutcome {
/// FCU was processed successfully.
Processed,
/// FCU was processed successfully and reached max block.
ReachedMaxBlock,
}
/// Represents outcomes of processing a sync event /// Represents outcomes of processing a sync event
#[derive(Debug)] #[derive(Debug)]
enum SyncEventOutcome { enum SyncEventOutcome {