diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 428b95c0b..338a37f02 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -45,7 +45,6 @@ use std::{ use tokio::sync::{ mpsc, mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, }; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; @@ -381,40 +380,6 @@ where None } - /// Called to resolve chain forks and ensure that the Execution layer is working with the latest - /// valid chain. - /// - /// These responses should adhere to the [Engine API Spec for - /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1). - /// - /// Returns an error if an internal error occurred like a database error. - fn forkchoice_updated( - &mut self, - state: ForkchoiceState, - attrs: Option, - ) -> Result { - trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); - - // Pre-validate forkchoice state update and return if it's invalid or - // cannot be processed at the moment. - if let Some(on_updated) = self.pre_validate_forkchoice_update(state) { - return Ok(on_updated) - } - - let start = Instant::now(); - let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash); - let elapsed = self.record_make_canonical_latency(start, &make_canonical_result); - - let status = self.on_forkchoice_updated_make_canonical_result( - state, - attrs, - make_canonical_result, - elapsed, - )?; - trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status"); - Ok(status) - } - /// Process the result of attempting to make forkchoice state head hash canonical. /// /// # Returns @@ -519,56 +484,54 @@ where false } - /// Invoked when we receive a new forkchoice update message. + /// Invoked when we receive a new forkchoice update message. Calls into the blockchain tree + /// to resolve chain forks and ensure that the Execution Layer is working with the latest valid + /// chain. /// - /// Returns `true` if the engine now reached its maximum block number, See - /// [EngineSyncController::has_reached_max_block]. + /// These responses should adhere to the [Engine API Spec for + /// `engine_forkchoiceUpdated`](https://github.com/ethereum/execution-apis/blob/main/src/engine/paris.md#specification-1). + /// + /// Returns an error if an internal error occurred like a database error. fn on_forkchoice_updated( &mut self, state: ForkchoiceState, attrs: Option, - tx: oneshot::Sender>, - ) -> Result { + ) -> Result { self.metrics.forkchoice_updated_messages.increment(1); self.blockchain.on_forkchoice_update_received(&state); + trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); - let on_updated = match self.forkchoice_updated(state, attrs) { - Ok(response) => response, - Err(error) => { - if error.is_fatal() { - // FCU resulted in a fatal error from which we can't recover - let err = error.clone(); - let _ = tx.send(Err(RethError::Canonical(error))); - return Err(err) - } - let _ = tx.send(Err(RethError::Canonical(error))); - return Ok(OnForkchoiceUpdateOutcome::Processed) - } - }; + // Pre-validate forkchoice state update and return if it's invalid or + // cannot be processed at the moment. + if let Some(on_updated) = self.pre_validate_forkchoice_update(state) { + return Ok(on_updated) + } - let fcu_status = on_updated.forkchoice_status(); + let start = Instant::now(); + let make_canonical_result = self.blockchain.make_canonical(state.head_block_hash); + let elapsed = self.record_make_canonical_latency(start, &make_canonical_result); - // update the forkchoice state tracker - self.forkchoice_state_tracker.set_latest(state, fcu_status); + let status = self.on_forkchoice_updated_make_canonical_result( + state, + attrs, + make_canonical_result, + elapsed, + )?; + trace!(target: "consensus::engine", ?status, ?state, "Returning forkchoice status"); + Ok(status) + } - // send the response to the CL ASAP - let _ = tx.send(Ok(on_updated)); - - match fcu_status { + /// Called after the forkchoice update status has been resolved. + /// Depending on the outcome, the method updates the sync state and notifies the listeners + /// about new processed FCU. + fn on_forkchoice_updated_status(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) { + match status { ForkchoiceStatus::Invalid => {} ForkchoiceStatus::Valid => { // FCU head is valid, we're no longer syncing self.sync_state_updater.update_sync_state(SyncState::Idle); // node's fully synced, clear active download requests self.sync.clear_block_download_requests(); - - // check if we reached the maximum configured block - let tip_number = self.blockchain.canonical_tip().number; - if self.sync.has_reached_max_block(tip_number) { - // Terminate the sync early if it's reached the maximum user - // configured block. - return Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) - } } ForkchoiceStatus::Syncing => { // we're syncing @@ -577,9 +540,7 @@ where } // notify listeners about new processed FCU - self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status)); - - Ok(OnForkchoiceUpdateOutcome::Processed) + self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status)); } /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less @@ -966,7 +927,7 @@ where /// /// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap /// - /// See [Self::forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. + /// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical]. fn on_failed_canonical_forkchoice_update( &mut self, state: &ForkchoiceState, @@ -1758,17 +1719,34 @@ where if let Poll::Ready(Some(msg)) = this.engine_message_rx.poll_next_unpin(cx) { match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { - match this.on_forkchoice_updated(state, payload_attrs, tx) { - Ok(OnForkchoiceUpdateOutcome::Processed) => {} - Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) => { - // reached the max block, we can terminate the future - return Poll::Ready(Ok(())) + match this.on_forkchoice_updated(state, payload_attrs) { + Ok(on_updated) => { + let fcu_status = on_updated.forkchoice_status(); + // update the forkchoice state tracker + this.forkchoice_state_tracker.set_latest(state, fcu_status); + // send the response to the CL ASAP + let _ = tx.send(Ok(on_updated)); + + if fcu_status.is_valid() { + let tip_number = this.blockchain.canonical_tip().number; + if this.sync.has_reached_max_block(tip_number) { + // Terminate the sync early if it's reached the + // maximum user configured block. + return Poll::Ready(Ok(())) + } + } + + this.on_forkchoice_updated_status(state, fcu_status); } - Err(err) => { - // fatal error, we can terminate the future - return Poll::Ready(Err(RethError::Canonical(err).into())) + Err(error) => { + if error.is_fatal() { + // fatal error, we can terminate the future + let _ = tx.send(Err(RethError::Canonical(error.clone()))); + return Poll::Ready(Err(RethError::Canonical(error).into())) + } + let _ = tx.send(Err(RethError::Canonical(error))); } - } + }; } BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => { this.metrics.new_payload_messages.increment(1); @@ -1828,15 +1806,6 @@ where } } -/// Represents all outcomes of an applied fork choice update. -#[derive(Debug)] -enum OnForkchoiceUpdateOutcome { - /// FCU was processed successfully. - Processed, - /// FCU was processed successfully and reached max block. - ReachedMaxBlock, -} - /// Represents outcomes of processing a sync event #[derive(Debug)] enum SyncEventOutcome {