diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index bb3696050..8a1a4266a 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -15,8 +15,9 @@ use reth_interfaces::{ BlockStatus, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk, }, consensus::ForkchoiceState, - executor::{BlockExecutionError, BlockValidationError}, + executor::BlockValidationError, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, + provider::ProviderResult, sync::{NetworkSyncUpdater, SyncState}, RethError, RethResult, }; @@ -391,7 +392,7 @@ where &mut self, state: ForkchoiceState, attrs: Option, - ) -> RethResult { + ) -> Result { trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); // Pre-validate forkchoice state update and return if it's invalid or @@ -425,7 +426,7 @@ where mut attrs: Option, make_canonical_result: Result, elapsed: Duration, - ) -> RethResult { + ) -> Result { match make_canonical_result { Ok(outcome) => { let should_update_head = match &outcome { @@ -469,7 +470,7 @@ where Err(err) => { if err.is_fatal() { error!(target: "consensus::engine", %err, "Encountered fatal error"); - Err(err.into()) + Err(err) } else { Ok(OnForkChoiceUpdated::valid( self.on_failed_canonical_forkchoice_update(&state, err), @@ -527,23 +528,21 @@ where state: ForkchoiceState, attrs: Option, tx: oneshot::Sender>, - ) -> OnForkchoiceUpdateOutcome { + ) -> Result { self.metrics.forkchoice_updated_messages.increment(1); self.blockchain.on_forkchoice_update_received(&state); let on_updated = match self.forkchoice_updated(state, attrs) { Ok(response) => response, Err(error) => { - if let RethError::Execution(ref err) = error { - if err.is_fatal() { - // FCU resulted in a fatal error from which we can't recover - let err = err.clone(); - let _ = tx.send(Err(error)); - return OnForkchoiceUpdateOutcome::Fatal(err) - } + if error.is_fatal() { + // FCU resulted in a fatal error from which we can't recover + let err = error.clone(); + let _ = tx.send(Err(RethError::Canonical(error))); + return Err(err) } - let _ = tx.send(Err(error)); - return OnForkchoiceUpdateOutcome::Processed + let _ = tx.send(Err(RethError::Canonical(error))); + return Ok(OnForkchoiceUpdateOutcome::Processed) } }; @@ -568,7 +567,7 @@ where if self.sync.has_reached_max_block(tip_number) { // Terminate the sync early if it's reached the maximum user // configured block. - return OnForkchoiceUpdateOutcome::ReachedMaxBlock + return Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) } } ForkchoiceStatus::Syncing => { @@ -580,7 +579,7 @@ where // notify listeners about new processed FCU self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status)); - OnForkchoiceUpdateOutcome::Processed + Ok(OnForkchoiceUpdateOutcome::Processed) } /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less @@ -844,7 +843,7 @@ where fn ensure_consistent_forkchoice_state( &mut self, state: ForkchoiceState, - ) -> RethResult> { + ) -> ProviderResult> { // Ensure that the finalized block, if not zero, is known and in the canonical chain // after the head block is canonicalized. // @@ -924,17 +923,17 @@ where /// /// Returns an error if the block is not found. #[inline] - fn update_safe_block(&self, safe_block_hash: B256) -> RethResult<()> { + fn update_safe_block(&self, safe_block_hash: B256) -> ProviderResult<()> { if !safe_block_hash.is_zero() { if self.blockchain.safe_block_hash()? == Some(safe_block_hash) { // nothing to update return Ok(()) } - let safe = - self.blockchain.find_block_by_hash(safe_block_hash, BlockSource::Any)?.ok_or_else( - || RethError::Provider(ProviderError::UnknownBlockHash(safe_block_hash)), - )?; + let safe = self + .blockchain + .find_block_by_hash(safe_block_hash, BlockSource::Any)? + .ok_or_else(|| ProviderError::UnknownBlockHash(safe_block_hash))?; self.blockchain.set_safe(safe.header.seal(safe_block_hash)); } Ok(()) @@ -944,7 +943,7 @@ where /// /// Returns an error if the block is not found. #[inline] - fn update_finalized_block(&self, finalized_block_hash: B256) -> RethResult<()> { + fn update_finalized_block(&self, finalized_block_hash: B256) -> ProviderResult<()> { if !finalized_block_hash.is_zero() { if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) { // nothing to update @@ -954,9 +953,7 @@ where let finalized = self .blockchain .find_block_by_hash(finalized_block_hash, BlockSource::Any)? - .ok_or_else(|| { - RethError::Provider(ProviderError::UnknownBlockHash(finalized_block_hash)) - })?; + .ok_or_else(|| ProviderError::UnknownBlockHash(finalized_block_hash))?; self.blockchain.finalize_block(finalized.number); self.blockchain.set_finalized(finalized.header.seal(finalized_block_hash)); } @@ -1762,14 +1759,14 @@ where match msg { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { match this.on_forkchoice_updated(state, payload_attrs, tx) { - OnForkchoiceUpdateOutcome::Processed => {} - OnForkchoiceUpdateOutcome::ReachedMaxBlock => { + Ok(OnForkchoiceUpdateOutcome::Processed) => {} + Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) => { // reached the max block, we can terminate the future return Poll::Ready(Ok(())) } - OnForkchoiceUpdateOutcome::Fatal(err) => { + Err(err) => { // fatal error, we can terminate the future - return Poll::Ready(Err(RethError::Execution(err).into())) + return Poll::Ready(Err(RethError::Canonical(err).into())) } } } @@ -1838,8 +1835,6 @@ enum OnForkchoiceUpdateOutcome { Processed, /// FCU was processed successfully and reached max block. ReachedMaxBlock, - /// FCU resulted in a __fatal__ block execution error from which we can't recover. - Fatal(BlockExecutionError), } /// Represents outcomes of processing a sync event