feat(stage): consolidate Validation and ExecutionError variants in StageError enum (#5106)

This commit is contained in:
Thomas Coratger
2023-10-23 19:22:18 +02:00
committed by GitHub
parent 36dde36479
commit 6b79978747
6 changed files with 95 additions and 79 deletions

View File

@ -7,17 +7,28 @@ use reth_primitives::SealedHeader;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
/// Represents the specific error type within a block error.
#[derive(Error, Debug)]
pub enum BlockErrorKind {
/// The block encountered a validation error.
#[error("Validation error: {0}")]
Validation(#[source] consensus::ConsensusError),
/// The block encountered an execution error.
#[error("Execution error: {0}")]
Execution(#[source] executor::BlockExecutionError),
}
/// A stage execution error.
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
#[error("Stage encountered a validation error in block {number}: {error}.", number = block.number)]
Validation {
/// The block that failed validation.
/// The stage encountered an error related to a block.
#[error("Stage encountered a block error in block {number}: {error}.", number = block.number)]
Block {
/// The block that caused the error.
block: SealedHeader,
/// The underlying consensus error.
/// The specific error type, either consensus or execution error.
#[source]
error: consensus::ConsensusError,
error: BlockErrorKind,
},
/// The stage encountered a downloader error where the responses cannot be attached to the
/// current head.
@ -39,16 +50,6 @@ pub enum StageError {
/// The stage encountered a database error.
#[error("An internal database error occurred: {0}")]
Database(#[from] DbError),
#[error("Stage encountered a execution error in block {number}: {error}.", number = block.number)]
/// The stage encountered a execution error
// TODO: Probably redundant, should be rolled into `Validation`
ExecutionError {
/// The block that failed execution.
block: SealedHeader,
/// The underlying execution error.
#[source]
error: executor::BlockExecutionError,
},
/// Invalid pruning configuration
#[error(transparent)]
PruningConfiguration(#[from] reth_primitives::PruneSegmentError),

View File

@ -1,10 +1,9 @@
use crate::{
error::*, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage, StageError,
UnwindInput,
error::*, BlockErrorKind, ExecInput, ExecOutput, MetricEvent, MetricEventsSender, Stage,
StageError, UnwindInput,
};
use futures_util::Future;
use reth_db::database::Database;
use reth_interfaces::executor::BlockExecutionError;
use reth_primitives::{
constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH, stage::StageId, BlockNumber, ChainSpec, B256,
};
@ -423,53 +422,60 @@ where
.saturating_sub(BEACON_CONSENSUS_REORG_UNWIND_DEPTH)
.max(1);
Ok(ControlFlow::Unwind { target: unwind_to, bad_block: local_head })
} else if let StageError::Validation { block, error } = err {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered a validation error: {error}"
);
} else if let StageError::Block { block, error } = err {
match error {
BlockErrorKind::Validation(validation_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered a validation error: {validation_error}"
);
// FIXME: When handling errors, we do not commit the database transaction.
// This leads to the Merkle stage not clearing its
// checkpoint, and restarting from an invalid place.
drop(provider_rw);
provider_rw = factory.provider_rw().map_err(PipelineError::Interface)?;
provider_rw
.save_stage_checkpoint_progress(StageId::MerkleExecute, vec![])?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
provider_rw.commit()?;
// FIXME: When handling errors, we do not commit the database
// transaction. This leads to the Merkle
// stage not clearing its checkpoint, and
// restarting from an invalid place.
drop(provider_rw);
provider_rw =
factory.provider_rw().map_err(PipelineError::Interface)?;
provider_rw.save_stage_checkpoint_progress(
StageId::MerkleExecute,
vec![],
)?;
provider_rw.save_stage_checkpoint(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
provider_rw.commit()?;
// We unwind because of a validation error. If the unwind itself fails,
// we bail entirely, otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
} else if let StageError::ExecutionError {
block,
error: BlockExecutionError::Validation(error),
} = err
{
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered an execution error: {error}"
);
// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
BlockErrorKind::Execution(execution_error) => {
error!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block.number,
"Stage encountered an execution error: {execution_error}"
);
// We unwind because of an execution error. If the unwind itself fails, we
// bail entirely, otherwise we restart the execution loop from the
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
// We unwind because of an execution error. If the unwind itself
// fails, we bail entirely,
// otherwise we restart
// the execution loop from the beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: block,
})
}
}
} else if err.is_fatal() {
error!(
target: "sync::pipeline",
@ -817,9 +823,11 @@ mod tests {
)
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Err(StageError::Validation {
.add_exec(Err(StageError::Block {
block: random_header(&mut generators::rng(), 5, Default::default()),
error: consensus::ConsensusError::BaseFeeMissing,
error: BlockErrorKind::Validation(
consensus::ConsensusError::BaseFeeMissing,
),
}))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))
.add_exec(Ok(ExecOutput { checkpoint: StageCheckpoint::new(10), done: true })),

View File

@ -1,6 +1,6 @@
use crate::{
stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, ExecInput, ExecOutput, MetricEvent,
MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput,
stages::MERKLE_STAGE_DEFAULT_CLEAN_THRESHOLD, BlockErrorKind, ExecInput, ExecOutput,
MetricEvent, MetricEventsSender, Stage, StageError, UnwindInput, UnwindOutput,
};
use num_traits::Zero;
use reth_db::{
@ -161,7 +161,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
// Execute the block
let (block, senders) = block.into_components();
executor.execute_and_verify_receipt(&block, td, Some(senders)).map_err(|error| {
StageError::ExecutionError { block: block.header.clone().seal_slow(), error }
StageError::Block {
block: block.header.clone().seal_slow(),
error: BlockErrorKind::Execution(error),
}
})?;
execution_duration += time.elapsed();

View File

@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_codecs::Compact;
use reth_db::{
database::Database,
@ -88,12 +88,12 @@ impl MerkleStage {
Ok(())
} else {
warn!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Failed to verify block state root");
Err(StageError::Validation {
Err(StageError::Block {
block: expected.clone(),
error: consensus::ConsensusError::BodyStateRootDiff {
error: BlockErrorKind::Validation(consensus::ConsensusError::BodyStateRootDiff {
got,
expected: expected.state_root,
},
}),
})
}
}

View File

@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use itertools::Itertools;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
@ -145,10 +145,11 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
let sealed_header = provider
.sealed_header(block_number)?
.ok_or(ProviderError::HeaderNotFound(block_number.into()))?;
return Err(StageError::Validation {
return Err(StageError::Block {
block: sealed_header,
error:
error: BlockErrorKind::Validation(
consensus::ConsensusError::TransactionSignerRecoveryError,
),
})
}
SenderRecoveryStageError::StageError(err) => return Err(err),

View File

@ -1,4 +1,4 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@ -82,9 +82,12 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
let (block_number, header) = entry?;
td += header.difficulty;
self.consensus
.validate_header_with_total_difficulty(&header, td)
.map_err(|error| StageError::Validation { block: header.seal_slow(), error })?;
self.consensus.validate_header_with_total_difficulty(&header, td).map_err(|error| {
StageError::Block {
block: header.seal_slow(),
error: BlockErrorKind::Validation(error),
}
})?;
cursor_td.append(block_number, td.into())?;
}