fix: track invalid payload hashes (#2521)

This commit is contained in:
Matthias Seitz
2023-05-02 17:16:32 +02:00
committed by GitHub
parent ddf38f41f2
commit 949b3639c3
4 changed files with 129 additions and 24 deletions

1
Cargo.lock generated
View File

@ -4683,6 +4683,7 @@ dependencies = [
"reth-stages",
"reth-tasks",
"reth-tracing",
"schnellru",
"thiserror",
"tokio",
"tokio-stream",

View File

@ -27,6 +27,7 @@ futures = "0.3"
tracing = "0.1"
thiserror = "1.0"
metrics = "0.20.1"
schnellru = "0.2"
[dev-dependencies]
# reth

View File

@ -9,12 +9,16 @@ use reth_interfaces::{
Error,
};
use reth_payload_builder::{PayloadBuilderAttributes, PayloadBuilderHandle};
use reth_primitives::{listener::EventListeners, BlockNumber, Header, SealedBlock, H256};
use reth_primitives::{
listener::EventListeners, BlockNumber, Header, SealedBlock, SealedHeader, H256, U256,
};
use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use reth_stages::{stages::FINISH, Pipeline};
use reth_tasks::TaskSpawner;
use schnellru::{ByLength, LruMap};
use std::{
pin::Pin,
sync::Arc,
@ -43,6 +47,9 @@ pub use pipeline_state::PipelineState;
mod event;
pub use event::BeaconConsensusEngineEvent;
/// The maximum number of invalid headers that can be tracked by the engine.
const MAX_INVALID_HEADERS: u32 = 512u32;
/// A _shareable_ beacon consensus frontend. Used to interact with the spawned beacon consensus
/// engine.
///
@ -164,6 +171,9 @@ where
payload_builder: PayloadBuilderHandle,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Tracks the header of invalid payloads that were rejected by the engine because they're
/// invalid.
invalid_headers: InvalidHeaderCache,
/// Consensus engine metrics.
metrics: Metrics,
}
@ -227,6 +237,7 @@ where
continuous,
payload_builder,
listeners: EventListeners::default(),
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: Metrics::default(),
};
@ -275,6 +286,37 @@ where
self.blockchain_tree.find_canonical_ancestor(parent_hash)
}
/// Loads the header for the given `block_number` from the database.
fn load_header(&self, block_number: u64) -> Result<Option<Header>, Error> {
Ok(self.db.view(|tx| tx.get::<tables::Headers>(block_number))??)
}
/// Checks if the given `head` points to an invalid header, which requires a specific response
/// to a forkchoice update.
fn check_invalid_ancestor(&mut self, head: H256) -> Option<PayloadStatus> {
// check if the head was previously marked as invalid
let (parent_hash, parent_number) = {
let header = self.invalid_headers.get(&head)?;
(header.parent_hash, header.number.saturating_sub(1))
};
let mut latest_valid_hash = parent_hash;
// Edge case: the `latestValid` field is the zero hash if the parent block is the terminal
// PoW block, which we need to identify by looking at the parent's block difficulty
if let Ok(Some(parent)) = self.load_header(parent_number) {
if parent.difficulty != U256::ZERO {
latest_valid_hash = H256::zero();
}
}
let status = PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: PayloadValidationError::LinksToRejectedPayload.to_string(),
})
.with_latest_valid_hash(latest_valid_hash);
Some(status)
}
/// Called to resolve chain forks and ensure that the Execution layer is working with the latest
/// valid chain.
///
@ -314,8 +356,7 @@ where
if let Some(attrs) = attrs {
// get header for further validation
let header = self
.db
.view(|tx| tx.get::<tables::Headers>(head_block_number))??
.load_header(head_block_number)?
.expect("was canonicalized, so it exists");
return Ok(self.process_payload_attributes(attrs, header, state))
}
@ -323,27 +364,7 @@ where
PayloadStatus::new(PayloadStatusEnum::Valid, Some(state.head_block_hash))
}
Err(error) => {
warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash");
// If this is the first forkchoice received, start downloading from safe block
// hash.
let target = if is_first_forkchoice &&
!state.safe_block_hash.is_zero() &&
self.get_block_number(state.safe_block_hash)?.is_none()
{
PipelineTarget::Safe
} else {
PipelineTarget::Head
};
self.require_pipeline_run(target);
match error {
Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => {
PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
})
.with_latest_valid_hash(H256::zero())
}
_ => PayloadStatus::from_status(PayloadStatusEnum::Syncing),
}
self.on_failed_canonical_forkchoice_update(&state, error, is_first_forkchoice)
}
}
} else {
@ -356,6 +377,50 @@ where
Ok(OnForkChoiceUpdated::valid(status))
}
/// Handler for a failed a forkchoice update due to a canonicalization error.
///
/// This will determine if the state's head is invalid, and if so, return immediately.
///
/// If the newest head is not invalid, then this will trigger a new pipeline run to sync the gap
///
/// See [Self::on_forkchoice_updated] and [BlockchainTreeEngine::make_canonical].
fn on_failed_canonical_forkchoice_update(
&mut self,
state: &ForkchoiceState,
error: Error,
is_first_forkchoice: bool,
) -> PayloadStatus {
warn!(target: "consensus::engine", ?error, ?state, "Error canonicalizing the head hash");
// check if the new head was previously invalidated, if so then we deem this FCU
// as invalid
if let Some(invalid_ancestor) = self.check_invalid_ancestor(state.head_block_hash) {
debug!(target: "consensus::engine", head=?state.head_block_hash, "Head was previously marked as invalid");
return invalid_ancestor
}
// If this is the first forkchoice received, start downloading from safe block
// hash, if we have that block.
let target = if is_first_forkchoice &&
!state.safe_block_hash.is_zero() &&
self.get_block_number(state.safe_block_hash).ok().flatten().is_none()
{
PipelineTarget::Safe
} else {
PipelineTarget::Head
};
self.require_pipeline_run(target);
match error {
Error::Execution(error @ ExecutorError::BlockPreMerge { .. }) => {
PayloadStatus::from_status(PayloadStatusEnum::Invalid {
validation_error: error.to_string(),
})
.with_latest_valid_hash(H256::zero())
}
_ => PayloadStatus::from_status(PayloadStatusEnum::Syncing),
}
}
/// Validates the payload attributes with respect to the header and fork choice state.
fn process_payload_attributes(
&self,
@ -426,6 +491,8 @@ where
}
};
let header = block.header.clone();
let status = if self.is_pipeline_idle() {
match self.blockchain_tree.insert_block_without_senders(block) {
Ok(status) => {
@ -451,6 +518,9 @@ where
PayloadStatus::new(status, latest_valid_hash)
}
Err(error) => {
// payload is deemed invalid, insert it into the cache
self.invalid_headers.insert(header);
let latest_valid_hash =
self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error));
let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() };
@ -458,11 +528,13 @@ where
}
}
} else if let Err(error) = self.blockchain_tree.buffer_block_without_sender(block) {
// received a new payload while we're still syncing to the target
let latest_valid_hash =
self.latest_valid_hash_for_invalid_payload(parent_hash, Some(&error));
let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() };
PayloadStatus::new(status, latest_valid_hash)
} else {
// successfully buffered the block
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
};
trace!(target: "consensus::engine", ?block_hash, block_number, ?status, "Returning payload status");
@ -692,6 +764,29 @@ enum PipelineTarget {
Safe,
}
/// Keeps track of invalid headerst.
struct InvalidHeaderCache {
headers: LruMap<H256, Header>,
}
impl InvalidHeaderCache {
fn new(max_length: u32) -> Self {
Self { headers: LruMap::new(ByLength::new(max_length)) }
}
/// Returns the header if it exists in the cache.
fn get(&mut self, hash: &H256) -> Option<&Header> {
self.headers.get(hash).map(|h| &*h)
}
/// Inserts a new header into the map.
fn insert(&mut self, header: SealedHeader) {
let hash = header.hash;
let header = header.unseal();
self.headers.insert(hash, header);
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -385,6 +385,14 @@ impl PayloadStatusEnum {
}
}
/// Various validation errors
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
pub enum PayloadValidationError {
/// Thrown when a forkchoice update's head links to a previously rejected payload.
#[error("links to previously rejected block")]
LinksToRejectedPayload,
}
#[cfg(test)]
mod tests {
use super::*;