fix(engine): do not ignore provider errors (#8519)

This commit is contained in:
Roman Krasiuk
2024-05-31 09:17:44 +02:00
committed by GitHub
parent daad63827a
commit 21613bb58d
3 changed files with 152 additions and 147 deletions

1
Cargo.lock generated
View File

@ -6302,6 +6302,7 @@ version = "0.2.0-beta.7"
dependencies = [
"assert_matches",
"futures",
"itertools 0.12.1",
"metrics",
"reth-blockchain-tree",
"reth-blockchain-tree-api",

View File

@ -43,6 +43,7 @@ metrics.workspace = true
tracing.workspace = true
thiserror.workspace = true
schnellru.workspace = true
itertools.workspace = true
[dev-dependencies]
# reth

View File

@ -1,4 +1,5 @@
use futures::{stream::BoxStream, Future, StreamExt};
use itertools::Either;
use reth_blockchain_tree_api::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
BlockStatus, BlockValidationKind, BlockchainTreeEngine, CanonicalOutcome, InsertPayloadOk,
@ -333,6 +334,12 @@ where
})
}
/// Set the next blockchain tree action.
fn set_blockchain_tree_action(&mut self, action: BlockchainTreeAction<EngineT>) {
let previous_action = self.blockchain_tree_action.replace(action);
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
/// Pre-validate forkchoice update and check whether it can be processed.
///
/// This method returns the update outcome if validation fails or
@ -340,23 +347,23 @@ where
fn pre_validate_forkchoice_update(
&mut self,
state: ForkchoiceState,
) -> Option<OnForkChoiceUpdated> {
) -> ProviderResult<Option<OnForkChoiceUpdated>> {
if state.head_block_hash.is_zero() {
return Some(OnForkChoiceUpdated::invalid_state())
return Ok(Some(OnForkChoiceUpdated::invalid_state()))
}
// check if the new head hash is connected to any ancestor that we previously marked as
// invalid
let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu) {
return Some(OnForkChoiceUpdated::with_invalid(status))
if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? {
return Ok(Some(OnForkChoiceUpdated::with_invalid(status)))
}
if self.sync.is_pipeline_active() {
// We can only process new forkchoice updates if the pipeline is idle, since it requires
// exclusive access to the database
trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update");
return Some(OnForkChoiceUpdated::syncing())
return Ok(Some(OnForkChoiceUpdated::syncing()))
}
if let Some(hook) = self.hooks.active_db_write_hook() {
@ -371,10 +378,10 @@ where
"Hook is in progress, skipping forkchoice update. \
This may affect the performance of your node as a validator."
);
return Some(OnForkChoiceUpdated::syncing())
return Ok(Some(OnForkChoiceUpdated::syncing()))
}
None
Ok(None)
}
/// Process the result of attempting to make forkchoice state head hash canonical.
@ -435,7 +442,7 @@ where
Err(err)
} else {
Ok(OnForkChoiceUpdated::valid(
self.on_failed_canonical_forkchoice_update(&state, err),
self.on_failed_canonical_forkchoice_update(&state, err)?,
))
}
}
@ -500,16 +507,21 @@ where
self.blockchain.on_forkchoice_update_received(&state);
trace!(target: "consensus::engine", ?state, "Received new forkchoice state update");
if let Some(on_updated) = self.pre_validate_forkchoice_update(state) {
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
} else {
let previous_action = self
.blockchain_tree_action
.replace(BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx });
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
match self.pre_validate_forkchoice_update(state) {
Ok(Some(on_updated)) => {
// Pre-validate forkchoice state update and return if it's invalid
// or cannot be processed at the moment.
self.on_forkchoice_updated_status(state, on_updated, tx);
}
Ok(None) => {
self.set_blockchain_tree_action(
BlockchainTreeAction::MakeForkchoiceHeadCanonical { state, attrs, tx },
);
}
Err(error) => {
let _ = tx.send(Err(error.into()));
}
};
}
/// Called after the forkchoice update status has been resolved.
@ -713,47 +725,35 @@ where
fn latest_valid_hash_for_invalid_payload(
&mut self,
parent_hash: B256,
insert_err: Option<&InsertBlockErrorKind>,
) -> Option<B256> {
// check pre merge block error
if insert_err.map(|err| err.is_block_pre_merge()).unwrap_or_default() {
return Some(B256::ZERO)
}
) -> ProviderResult<Option<B256>> {
// Check if parent exists in side chain or in canonical chain.
// TODO: handle find_block_by_hash errors.
if matches!(self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any), Ok(Some(_)))
{
Some(parent_hash)
} else {
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.invalid_headers.get(&current_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.invalid_headers.get(&current_hash);
// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() &&
matches!(
// TODO: handle find_block_by_hash errors.
self.blockchain.find_block_by_hash(current_hash, BlockSource::Any),
Ok(Some(_))
)
{
return Some(current_hash)
}
}
None
if self.blockchain.find_block_by_hash(parent_hash, BlockSource::Any)?.is_some() {
return Ok(Some(parent_hash))
}
// iterate over ancestors in the invalid cache
// until we encounter the first valid ancestor
let mut current_hash = parent_hash;
let mut current_header = self.invalid_headers.get(&current_hash);
while let Some(header) = current_header {
current_hash = header.parent_hash;
current_header = self.invalid_headers.get(&current_hash);
// If current_header is None, then the current_hash does not have an invalid
// ancestor in the cache, check its presence in blockchain tree
if current_header.is_none() &&
self.blockchain.find_block_by_hash(current_hash, BlockSource::Any)?.is_some()
{
return Ok(Some(current_hash))
}
}
Ok(None)
}
/// Prepares the invalid payload response for the given hash, checking the
/// database for the parent hash and populating the payload status with the latest valid hash
/// according to the engine api spec.
fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> PayloadStatus {
fn prepare_invalid_response(&mut self, mut parent_hash: B256) -> ProviderResult<PayloadStatus> {
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Ok(Some(parent)) = self.blockchain.header_by_hash_or_number(parent_hash.into()) {
@ -762,12 +762,11 @@ where
}
}
let valid_parent_hash =
self.latest_valid_hash_for_invalid_payload(parent_hash, None).unwrap_or_default();
PayloadStatus::from_status(PayloadStatusEnum::Invalid {
let valid_parent_hash = self.latest_valid_hash_for_invalid_payload(parent_hash)?;
Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
})
.with_latest_valid_hash(valid_parent_hash)
.with_latest_valid_hash(valid_parent_hash.unwrap_or_default()))
}
/// Checks if the given `check` hash points to an invalid header, inserting the given `head`
@ -779,27 +778,27 @@ where
&mut self,
check: B256,
head: B256,
) -> Option<PayloadStatus> {
) -> ProviderResult<Option<PayloadStatus>> {
// check if the check hash was previously marked as invalid
let header = self.invalid_headers.get(&check)?;
let Some(header) = self.invalid_headers.get(&check) else { return Ok(None) };
// populate the latest valid hash field
let status = self.prepare_invalid_response(header.parent_hash);
let status = self.prepare_invalid_response(header.parent_hash)?;
// insert the head block into the invalid header cache
self.invalid_headers.insert_with_invalid_ancestor(head, header);
Some(status)
Ok(Some(status))
}
/// Checks if the given `head` points to an invalid header, which requires a specific response
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: B256) -> Option<PayloadStatus> {
fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult<Option<PayloadStatus>> {
// check if the head was previously marked as invalid
let header = self.invalid_headers.get(&head)?;
let Some(header) = self.invalid_headers.get(&head) else { return Ok(None) };
// populate the latest valid hash field
Some(self.prepare_invalid_response(header.parent_hash))
Ok(Some(self.prepare_invalid_response(header.parent_hash)?))
}
/// Record latency metrics for one call to make a block canonical
@ -964,24 +963,24 @@ where
&mut self,
state: &ForkchoiceState,
error: CanonicalError,
) -> PayloadStatus {
) -> ProviderResult<PayloadStatus> {
debug_assert!(self.sync.is_pipeline_idle(), "pipeline must be idle");
// check if the new head was previously invalidated, if so then we deem this FCU
// as invalid
if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash) {
if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash)? {
warn!(target: "consensus::engine", %error, ?state, ?invalid_ancestor, head=?state.head_block_hash, "Failed to canonicalize the head hash, head is also considered invalid");
debug!(target: "consensus::engine", head=?state.head_block_hash, current_error=%error, "Head was previously marked as invalid");
return invalid_ancestor
return Ok(invalid_ancestor)
}
match &error {
CanonicalError::Validation(BlockValidationError::BlockPreMerge { .. }) => {
warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
return PayloadStatus::from_status(PayloadStatusEnum::Invalid {
return Ok(PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
})
.with_latest_valid_hash(B256::ZERO)
.with_latest_valid_hash(B256::ZERO))
}
CanonicalError::BlockchainTree(BlockchainTreeError::BlockHashNotFoundInChain {
..
@ -992,7 +991,7 @@ where
}
CanonicalError::OptimisticTargetRevert(block_number) => {
self.sync.set_pipeline_sync_target(PipelineTarget::Unwind(*block_number));
return PayloadStatus::from_status(PayloadStatusEnum::Syncing)
return Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
}
_ => {
warn!(target: "consensus::engine", %error, ?state, "Failed to canonicalize the head hash");
@ -1033,7 +1032,7 @@ where
}
debug!(target: "consensus::engine", %target, "Syncing to new target");
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
Ok(PayloadStatus::from_status(PayloadStatusEnum::Syncing))
}
/// Return the parent hash of the lowest buffered ancestor for the requested block, if there
@ -1066,71 +1065,40 @@ where
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
tx: oneshot::Sender<Result<PayloadStatus, BeaconOnNewPayloadError>>,
) {
) -> Result<Either<PayloadStatus, SealedBlock>, BeaconOnNewPayloadError> {
self.metrics.new_payload_messages.increment(1);
let block = match self.ensure_well_formed_payload(payload, cancun_fields) {
Ok(block) => block,
Err(status) => {
let _ = tx.send(Ok(status));
return
}
};
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash());
if lowest_buffered_ancestor == block.hash() {
lowest_buffered_ancestor = block.parent_hash;
}
// now check the block itself
if let Some(status) =
self.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash())
{
let _ = tx.send(Ok(status));
return
}
let previous_action = self
.blockchain_tree_action
.replace(BlockchainTreeAction::InsertNewPayload { block, tx });
debug_assert!(previous_action.is_none(), "Pre-existing action found");
}
/// Ensures that the given payload does not violate any consensus rules that concern the block's
/// layout, like:
/// - missing or invalid base fee
/// - invalid extra data
/// - invalid transactions
/// - incorrect hash
/// - the versioned hashes passed with the payload do not exactly match transaction versioned
/// hashes
/// - the block does not contain blob transactions if it is pre-cancun
///
/// This validates the following engine API rule:
///
/// 3. Given the expected array of blob versioned hashes client software **MUST** run its
/// validation by taking the following steps:
///
/// 1. Obtain the actual array by concatenating blob versioned hashes lists
/// (`tx.blob_versioned_hashes`) of each [blob
/// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
/// in the payload, respecting the order of inclusion. If the payload has no blob
/// transactions the expected array **MUST** be `[]`.
///
/// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage | null}`
/// if the expected and the actual arrays don't match.
///
/// This validation **MUST** be instantly run in all cases even during active sync process.
fn ensure_well_formed_payload(
&mut self,
payload: ExecutionPayload,
cancun_fields: Option<CancunPayloadFields>,
) -> Result<SealedBlock, PayloadStatus> {
// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
// - missing or invalid base fee
// - invalid extra data
// - invalid transactions
// - incorrect hash
// - the versioned hashes passed with the payload do not exactly match transaction
// versioned hashes
// - the block does not contain blob transactions if it is pre-cancun
//
// This validates the following engine API rule:
//
// 3. Given the expected array of blob versioned hashes client software **MUST** run its
// validation by taking the following steps:
//
// 1. Obtain the actual array by concatenating blob versioned hashes lists
// (`tx.blob_versioned_hashes`) of each [blob
// transaction](https://eips.ethereum.org/EIPS/eip-4844#new-transaction-type) included
// in the payload, respecting the order of inclusion. If the payload has no blob
// transactions the expected array **MUST** be `[]`.
//
// 2. Return `{status: INVALID, latestValidHash: null, validationError: errorMessage |
// null}` if the expected and the actual arrays don't match.
//
// This validation **MUST** be instantly run in all cases even during active sync process.
let parent_hash = payload.parent_hash();
match self.payload_validator.ensure_well_formed_payload(payload, cancun_fields.into()) {
Ok(block) => Ok(block),
let block = match self
.payload_validator
.ensure_well_formed_payload(payload, cancun_fields.into())
{
Ok(block) => block,
Err(error) => {
error!(target: "consensus::engine", %error, "Invalid payload");
// we need to convert the error to a payload status (response to the CL)
@ -1142,12 +1110,28 @@ where
// > `latestValidHash: null` if the expected and the actual arrays don't match (<https://github.com/ethereum/execution-apis/blob/fe8e13c288c592ec154ce25c534e26cb7ce0530d/src/engine/cancun.md?plain=1#L103>)
None
} else {
self.latest_valid_hash_for_invalid_payload(parent_hash, None)
self.latest_valid_hash_for_invalid_payload(parent_hash)
.map_err(BeaconOnNewPayloadError::internal)?
};
let status = PayloadStatusEnum::from(error);
Err(PayloadStatus::new(status, latest_valid_hash))
return Ok(Either::Left(PayloadStatus::new(status, latest_valid_hash)))
}
};
let mut lowest_buffered_ancestor = self.lowest_buffered_ancestor_or(block.hash());
if lowest_buffered_ancestor == block.hash() {
lowest_buffered_ancestor = block.parent_hash;
}
// now check the block itself
if let Some(status) = self
.check_invalid_ancestor_with_head(lowest_buffered_ancestor, block.hash())
.map_err(BeaconOnNewPayloadError::internal)?
{
Ok(Either::Left(status))
} else {
Ok(Either::Right(block))
}
}
@ -1239,10 +1223,10 @@ where
let elapsed = start.elapsed();
let mut latest_valid_hash = None;
let block = Arc::new(block);
let status = match status {
InsertPayloadOk::Inserted(BlockStatus::Valid(attachment)) => {
latest_valid_hash = Some(block_hash);
let block = Arc::new(block);
let event = if attachment.is_canonical() {
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed)
} else {
@ -1259,7 +1243,9 @@ where
InsertPayloadOk::AlreadySeen(BlockStatus::Disconnected { .. }) => {
// check if the block's parent is already marked as invalid
if let Some(status) =
self.check_invalid_ancestor_with_head(block.parent_hash, block.hash())
self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).map_err(
|error| InsertBlockError::new(block, InsertBlockErrorKind::Provider(error)),
)?
{
return Ok(status)
}
@ -1397,12 +1383,14 @@ where
EngineSyncEvent::FetchedFullBlock(block) => {
trace!(target: "consensus::engine", hash=?block.hash(), number=%block.number, "Downloaded full block");
// Insert block only if the block's parent is not marked as invalid
if self.check_invalid_ancestor_with_head(block.parent_hash, block.hash()).is_none()
if self
.check_invalid_ancestor_with_head(block.parent_hash, block.hash())
.map_err(|error| BeaconConsensusEngineError::Common(error.into()))?
.is_none()
{
let previous_action = self
.blockchain_tree_action
.replace(BlockchainTreeAction::InsertDownloadedPayload { block });
debug_assert!(previous_action.is_none(), "Pre-existing action found");
self.set_blockchain_tree_action(
BlockchainTreeAction::InsertDownloadedPayload { block },
);
}
EngineEventOutcome::Processed
}
@ -1489,7 +1477,7 @@ where
.check_invalid_ancestor_with_head(
lowest_buffered_ancestor,
sync_target_state.head_block_hash,
)
)?
.is_some()
{
warn!(
@ -1657,8 +1645,12 @@ where
// invalid headers cache and `Ok` with [PayloadStatusEnum::Invalid] is
// returned.
warn!(target: "consensus::engine", invalid_hash=?block.hash(), invalid_number=?block.number, %error, "Invalid block error on new payload");
let latest_valid_hash = self
.latest_valid_hash_for_invalid_payload(block.parent_hash, Some(&error));
let latest_valid_hash = if error.is_block_pre_merge() {
// zero hash must be returned if block is pre-merge
Some(B256::ZERO)
} else {
self.latest_valid_hash_for_invalid_payload(block.parent_hash)?
};
// keep track of the invalid header
self.invalid_headers.insert(block.header);
PayloadStatus::new(
@ -1673,14 +1665,13 @@ where
// if we're currently syncing and the inserted block is the targeted
// FCU head block, we can try to make it canonical.
if block_hash == target.head_block_hash {
let previous_action = self.blockchain_tree_action.replace(
self.set_blockchain_tree_action(
BlockchainTreeAction::MakeNewPayloadCanonical {
payload_num_hash: block_num_hash,
status,
tx,
},
);
debug_assert!(previous_action.is_none(), "Pre-existing action found");
return Ok(EngineEventOutcome::Processed)
}
}
@ -1847,7 +1838,19 @@ where
this.on_forkchoice_updated(state, payload_attrs, tx);
}
BeaconEngineMessage::NewPayload { payload, cancun_fields, tx } => {
this.on_new_payload(payload, cancun_fields, tx);
match this.on_new_payload(payload, cancun_fields) {
Ok(Either::Right(block)) => {
this.set_blockchain_tree_action(
BlockchainTreeAction::InsertNewPayload { block, tx },
);
}
Ok(Either::Left(status)) => {
let _ = tx.send(Ok(status));
}
Err(error) => {
let _ = tx.send(Err(error));
}
}
}
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();