fix(engine): remove block execution error matching (#7735)

This commit is contained in:
Roman Krasiuk
2024-04-19 12:08:41 +02:00
committed by GitHub
parent 04aef71d72
commit e7945f9219

View File

@ -15,8 +15,9 @@ use reth_interfaces::{
BlockStatus, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk, BlockStatus, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
}, },
consensus::ForkchoiceState, consensus::ForkchoiceState,
executor::{BlockExecutionError, BlockValidationError}, executor::BlockValidationError,
p2p::{bodies::client::BodiesClient, headers::client::HeadersClient}, p2p::{bodies::client::BodiesClient, headers::client::HeadersClient},
provider::ProviderResult,
sync::{NetworkSyncUpdater, SyncState}, sync::{NetworkSyncUpdater, SyncState},
RethError, RethResult, RethError, RethResult,
}; };
@ -391,7 +392,7 @@ where
&mut self, &mut self,
state: ForkchoiceState, state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>, attrs: Option<EngineT::PayloadAttributes>,
) -> RethResult<OnForkChoiceUpdated> { ) -> Result<OnForkChoiceUpdated, CanonicalError> {
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update"); trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
// Pre-validate forkchoice state update and return if it's invalid or // Pre-validate forkchoice state update and return if it's invalid or
@ -425,7 +426,7 @@ where
mut attrs: Option<EngineT::PayloadAttributes>, mut attrs: Option<EngineT::PayloadAttributes>,
make_canonical_result: Result<CanonicalOutcome, CanonicalError>, make_canonical_result: Result<CanonicalOutcome, CanonicalError>,
elapsed: Duration, elapsed: Duration,
) -> RethResult<OnForkChoiceUpdated> { ) -> Result<OnForkChoiceUpdated, CanonicalError> {
match make_canonical_result { match make_canonical_result {
Ok(outcome) => { Ok(outcome) => {
let should_update_head = match &outcome { let should_update_head = match &outcome {
@ -469,7 +470,7 @@ where
Err(err) => { Err(err) => {
if err.is_fatal() { if err.is_fatal() {
error!(target: "consensus::engine", %err, "Encountered fatal error"); error!(target: "consensus::engine", %err, "Encountered fatal error");
Err(err.into()) Err(err)
} else { } else {
Ok(OnForkChoiceUpdated::valid( Ok(OnForkChoiceUpdated::valid(
self.on_failed_canonical_forkchoice_update(&state, err), self.on_failed_canonical_forkchoice_update(&state, err),
@ -527,23 +528,21 @@ where
state: ForkchoiceState, state: ForkchoiceState,
attrs: Option<EngineT::PayloadAttributes>, attrs: Option<EngineT::PayloadAttributes>,
tx: oneshot::Sender<Result<OnForkChoiceUpdated, RethError>>, tx: oneshot::Sender<Result<OnForkChoiceUpdated, RethError>>,
) -> OnForkchoiceUpdateOutcome { ) -> Result<OnForkchoiceUpdateOutcome, CanonicalError> {
self.metrics.forkchoice_updated_messages.increment(1); self.metrics.forkchoice_updated_messages.increment(1);
self.blockchain.on_forkchoice_update_received(&state); self.blockchain.on_forkchoice_update_received(&state);
let on_updated = match self.forkchoice_updated(state, attrs) { let on_updated = match self.forkchoice_updated(state, attrs) {
Ok(response) => response, Ok(response) => response,
Err(error) => { Err(error) => {
if let RethError::Execution(ref err) = error { if error.is_fatal() {
if err.is_fatal() { // FCU resulted in a fatal error from which we can't recover
// FCU resulted in a fatal error from which we can't recover let err = error.clone();
let err = err.clone(); let _ = tx.send(Err(RethError::Canonical(error)));
let _ = tx.send(Err(error)); return Err(err)
return OnForkchoiceUpdateOutcome::Fatal(err)
}
} }
let _ = tx.send(Err(error)); let _ = tx.send(Err(RethError::Canonical(error)));
return OnForkchoiceUpdateOutcome::Processed return Ok(OnForkchoiceUpdateOutcome::Processed)
} }
}; };
@ -568,7 +567,7 @@ where
if self.sync.has_reached_max_block(tip_number) { if self.sync.has_reached_max_block(tip_number) {
// Terminate the sync early if it's reached the maximum user // Terminate the sync early if it's reached the maximum user
// configured block. // configured block.
return OnForkchoiceUpdateOutcome::ReachedMaxBlock return Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock)
} }
} }
ForkchoiceStatus::Syncing => { ForkchoiceStatus::Syncing => {
@ -580,7 +579,7 @@ where
// notify listeners about new processed FCU // notify listeners about new processed FCU
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status)); self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, fcu_status));
OnForkchoiceUpdateOutcome::Processed Ok(OnForkchoiceUpdateOutcome::Processed)
} }
/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less /// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
@ -844,7 +843,7 @@ where
fn ensure_consistent_forkchoice_state( fn ensure_consistent_forkchoice_state(
&mut self, &mut self,
state: ForkchoiceState, state: ForkchoiceState,
) -> RethResult<Option<OnForkChoiceUpdated>> { ) -> ProviderResult<Option<OnForkChoiceUpdated>> {
// Ensure that the finalized block, if not zero, is known and in the canonical chain // Ensure that the finalized block, if not zero, is known and in the canonical chain
// after the head block is canonicalized. // after the head block is canonicalized.
// //
@ -924,17 +923,17 @@ where
/// ///
/// Returns an error if the block is not found. /// Returns an error if the block is not found.
#[inline] #[inline]
fn update_safe_block(&self, safe_block_hash: B256) -> RethResult<()> { fn update_safe_block(&self, safe_block_hash: B256) -> ProviderResult<()> {
if !safe_block_hash.is_zero() { if !safe_block_hash.is_zero() {
if self.blockchain.safe_block_hash()? == Some(safe_block_hash) { if self.blockchain.safe_block_hash()? == Some(safe_block_hash) {
// nothing to update // nothing to update
return Ok(()) return Ok(())
} }
let safe = let safe = self
self.blockchain.find_block_by_hash(safe_block_hash, BlockSource::Any)?.ok_or_else( .blockchain
|| RethError::Provider(ProviderError::UnknownBlockHash(safe_block_hash)), .find_block_by_hash(safe_block_hash, BlockSource::Any)?
)?; .ok_or_else(|| ProviderError::UnknownBlockHash(safe_block_hash))?;
self.blockchain.set_safe(safe.header.seal(safe_block_hash)); self.blockchain.set_safe(safe.header.seal(safe_block_hash));
} }
Ok(()) Ok(())
@ -944,7 +943,7 @@ where
/// ///
/// Returns an error if the block is not found. /// Returns an error if the block is not found.
#[inline] #[inline]
fn update_finalized_block(&self, finalized_block_hash: B256) -> RethResult<()> { fn update_finalized_block(&self, finalized_block_hash: B256) -> ProviderResult<()> {
if !finalized_block_hash.is_zero() { if !finalized_block_hash.is_zero() {
if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) { if self.blockchain.finalized_block_hash()? == Some(finalized_block_hash) {
// nothing to update // nothing to update
@ -954,9 +953,7 @@ where
let finalized = self let finalized = self
.blockchain .blockchain
.find_block_by_hash(finalized_block_hash, BlockSource::Any)? .find_block_by_hash(finalized_block_hash, BlockSource::Any)?
.ok_or_else(|| { .ok_or_else(|| ProviderError::UnknownBlockHash(finalized_block_hash))?;
RethError::Provider(ProviderError::UnknownBlockHash(finalized_block_hash))
})?;
self.blockchain.finalize_block(finalized.number); self.blockchain.finalize_block(finalized.number);
self.blockchain.set_finalized(finalized.header.seal(finalized_block_hash)); self.blockchain.set_finalized(finalized.header.seal(finalized_block_hash));
} }
@ -1762,14 +1759,14 @@ where
match msg { match msg {
BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => {
match this.on_forkchoice_updated(state, payload_attrs, tx) { match this.on_forkchoice_updated(state, payload_attrs, tx) {
OnForkchoiceUpdateOutcome::Processed => {} Ok(OnForkchoiceUpdateOutcome::Processed) => {}
OnForkchoiceUpdateOutcome::ReachedMaxBlock => { Ok(OnForkchoiceUpdateOutcome::ReachedMaxBlock) => {
// reached the max block, we can terminate the future // reached the max block, we can terminate the future
return Poll::Ready(Ok(())) return Poll::Ready(Ok(()))
} }
OnForkchoiceUpdateOutcome::Fatal(err) => { Err(err) => {
// fatal error, we can terminate the future // fatal error, we can terminate the future
return Poll::Ready(Err(RethError::Execution(err).into())) return Poll::Ready(Err(RethError::Canonical(err).into()))
} }
} }
} }
@ -1838,8 +1835,6 @@ enum OnForkchoiceUpdateOutcome {
Processed, Processed,
/// FCU was processed successfully and reached max block. /// FCU was processed successfully and reached max block.
ReachedMaxBlock, ReachedMaxBlock,
/// FCU resulted in a __fatal__ block execution error from which we can't recover.
Fatal(BlockExecutionError),
} }
/// Represents outcomes of processing a sync event /// Represents outcomes of processing a sync event