feat: use buffered ancestor to determine sync target (#2802)

Author: Dan Cline
Date: 2023-05-31 14:06:16 -04:00 (committed by GitHub)
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
parent 9cd32e516a
commit 1641f555f2
16 changed files with 196 additions and 63 deletions
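
At a high level, this change makes the engine resolve a disconnected head through the block buffer: it walks buffered parent links down to the lowest buffered ancestor and targets that block's (missing) parent for the pipeline or full-block download, instead of the head itself. A minimal sketch of the idea, assuming a simplified buffer keyed by hash; the `Hash` alias, `BufferedBlock` struct, and map layout below are illustrative stand-ins, not reth's actual types:

use std::collections::HashMap;

type Hash = [u8; 32];

/// Illustrative stand-in for a buffered block; reth's buffer stores full
/// sealed blocks with senders.
struct BufferedBlock {
    parent_hash: Hash,
}

/// Walk parent links through the buffer to the lowest buffered ancestor of
/// `hash`: the earliest buffered block on that chain, or the block itself if
/// none of its parents are buffered.
fn lowest_buffered_ancestor<'a>(
    buffer: &'a HashMap<Hash, BufferedBlock>,
    hash: &Hash,
) -> Option<&'a BufferedBlock> {
    let mut current = buffer.get(hash)?;
    while let Some(parent) = buffer.get(&current.parent_hash) {
        current = parent;
    }
    Some(current)
}

/// Choose the sync target: the parent of the lowest buffered ancestor when the
/// head has buffered ancestry, otherwise the head hash itself.
fn sync_target(buffer: &HashMap<Hash, BufferedBlock>, head: Hash) -> Hash {
    lowest_buffered_ancestor(buffer, &head)
        .map(|block| block.parent_hash)
        .unwrap_or(head)
}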


@ -185,8 +185,8 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
// check if block is disconnected
if self.buffered_blocks.block(block).is_some() {
return Ok(Some(BlockStatus::Disconnected))
if let Some(block) = self.buffered_blocks.block(block) {
return Ok(Some(BlockStatus::Disconnected { missing_parent: block.parent_num_hash() }))
}
Ok(None)
@ -323,8 +323,20 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
// insert block inside unconnected block buffer. Delaying its execution.
self.buffered_blocks.insert_block(block);
Ok(BlockStatus::Disconnected)
self.buffered_blocks.insert_block(block.clone());
// find the lowest ancestor of the block in the buffer to return as the missing parent
// this shouldn't return None because that only happens if the block was evicted, which
// shouldn't happen right after insertion
let lowest_ancestor =
self.buffered_blocks.lowest_ancestor(&block.hash).ok_or_else(|| {
InsertBlockError::tree_error(
BlockchainTreeError::BlockBufferingFailed { block_hash: block.hash },
block.block,
)
})?;
Ok(BlockStatus::Disconnected { missing_parent: lowest_ancestor.parent_num_hash() })
}
/// This tries to append the given block to the canonical chain.
@ -570,10 +582,7 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTree<DB, C, EF>
}
/// Gets the lowest ancestor for the given block in the block buffer.
pub fn lowest_buffered_ancestor(
&mut self,
hash: &BlockHash,
) -> Option<&SealedBlockWithSenders> {
pub fn lowest_buffered_ancestor(&self, hash: &BlockHash) -> Option<&SealedBlockWithSenders> {
self.buffered_blocks.lowest_ancestor(hash)
}
@ -1196,7 +1205,10 @@ mod tests {
tree.finalize_block(10);
// block 2 parent is not known, block2 is buffered.
assert_eq!(tree.insert_block(block2.clone()).unwrap(), BlockStatus::Disconnected);
assert_eq!(
tree.insert_block(block2.clone()).unwrap(),
BlockStatus::Disconnected { missing_parent: block2.parent_num_hash() }
);
// Buffered block: [block2]
// Trie state:
@ -1213,7 +1225,7 @@ mod tests {
assert_eq!(
tree.is_block_known(block2.num_hash()).unwrap(),
Some(BlockStatus::Disconnected)
Some(BlockStatus::Disconnected { missing_parent: block2.parent_num_hash() })
);
// check if random block is known
@ -1477,7 +1489,11 @@ mod tests {
block2b.hash = H256([0x99; 32]);
block2b.parent_hash = H256([0x88; 32]);
assert_eq!(tree.insert_block(block2b.clone()).unwrap(), BlockStatus::Disconnected);
assert_eq!(
tree.insert_block(block2b.clone()).unwrap(),
BlockStatus::Disconnected { missing_parent: block2b.parent_num_hash() }
);
TreeTester::default()
.with_buffered_blocks(BTreeMap::from([(
block2b.number,


@ -108,6 +108,11 @@ impl<DB: Database, C: Consensus, EF: ExecutorFactory> BlockchainTreeViewer
None
}
fn lowest_buffered_ancestor(&self, hash: BlockHash) -> Option<SealedBlockWithSenders> {
trace!(target: "blockchain_tree", ?hash, "Returning lowest buffered ancestor");
self.tree.read().lowest_buffered_ancestor(&hash).cloned()
}
fn canonical_tip(&self) -> BlockNumHash {
trace!(target: "blockchain_tree", "Returning canonical tip");
self.tree.read().block_indices().canonical_tip()


@ -41,6 +41,16 @@ impl OnForkChoiceUpdated {
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
/// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the
/// forkchoice update failed due to an invalid payload.
pub(crate) fn with_invalid(status: PayloadStatus) -> Self {
Self {
is_valid_update: false,
fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))),
}
}
/// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the
/// given state is considered invalid
pub(crate) fn invalid_state() -> Self {


@ -27,7 +27,7 @@ use reth_rpc_types::engine::{
ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum,
PayloadValidationError,
};
use reth_stages::Pipeline;
use reth_stages::{ControlFlow, Pipeline};
use reth_tasks::TaskSpawner;
use schnellru::{ByLength, LruMap};
use std::{
@ -386,6 +386,12 @@ where
return Ok(OnForkChoiceUpdated::invalid_state())
}
let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash);
if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu) {
return Ok(OnForkChoiceUpdated::with_invalid(status))
}
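// `check_invalid_ancestor` is not part of this diff; per the comments further
// down in this file, it consults the `invalid_headers` cache and, on a hit,
// answers with an INVALID payload status whose `latestValidHash` is the parent
// of the cached bad header. A rough sketch under those assumptions (the exact
// body in reth may differ):
//
//   fn check_invalid_ancestor(&mut self, check: H256) -> Option<PayloadStatus> {
//       // not tracked as invalid, nothing to report
//       let header = self.invalid_headers.get(&check)?;
//       Some(
//           PayloadStatus::from_status(PayloadStatusEnum::Invalid {
//               validation_error: PayloadValidationError::LinksToRejectedPayload
//                   .to_string(),
//           })
//           .with_latest_valid_hash(header.parent_hash),
//       )
//   }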
// TODO: check PoW / EIP-3675 terminal block conditions for the fork choice head
// TODO: ensure validity of the payload (is this satisfied already?)
@ -554,15 +560,40 @@ where
} else {
state.head_block_hash
};
self.sync.set_pipeline_sync_target(target);
// we need to first check the buffer for the head and its ancestors
let lowest_unknown_hash = self.lowest_buffered_ancestor_or(target);
trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering pipeline with target instead of downloading");
self.sync.set_pipeline_sync_target(lowest_unknown_hash);
} else {
// trigger a full block download for the _missing_ new head
self.sync.download_full_block(state.head_block_hash);
// we need to first check the buffer for the head and its ancestors
let lowest_unknown_hash = self.lowest_buffered_ancestor_or(state.head_block_hash);
trace!(target: "consensus::engine", request=?lowest_unknown_hash, "Triggering full block download for missing ancestors of the new head");
// trigger a full block download for missing hash, or the parent of its lowest buffered
// ancestor
self.sync.download_full_block(lowest_unknown_hash);
}
PayloadStatus::from_status(PayloadStatusEnum::Syncing)
}
/// Return the parent hash of the lowest buffered ancestor for the requested block, if there
/// are any buffered ancestors. If there are no buffered ancestors, and the block itself does
/// not exist in the buffer, this returns the hash that is passed in.
///
/// Returns the parent hash of the block itself if the block is buffered and has no other
/// buffered ancestors.
fn lowest_buffered_ancestor_or(&self, hash: H256) -> H256 {
self.blockchain
.lowest_buffered_ancestor(hash)
.map(|block| block.parent_hash)
.unwrap_or_else(|| hash)
}
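// Worked example (hypothetical blocks, not taken from this diff): suppose the
// canonical chain ends at B2, blocks B4 and B5 (B4 <- B5) sit in the buffer,
// and B3 -- the parent of B4 -- was never received.
//
//   lowest_buffered_ancestor_or(B5.hash) == B4.parent_hash == B3.hash
//   lowest_buffered_ancestor_or(B3.hash) == B3.hash   // nothing buffered, input returned
//
// The pipeline target or full-block download above is therefore pointed at B3
// rather than at the head B5, so the gap below the buffered chain is closed first.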
/// Validates the payload attributes with respect to the header and fork choice state.
///
/// Note: At this point, the fork choice update is considered to be VALID, however, we can still
@ -636,6 +667,12 @@ where
};
let block_hash = block.hash();
// TODO: see other notes about checking entire invalid parent chain
// check that the payload parent is not invalid
if let Some(status) = self.check_invalid_ancestor(block.parent_hash) {
return Ok(status)
}
let res = if self.sync.is_pipeline_idle() {
// we can only insert new payloads if the pipeline is _not_ running, because it holds
// exclusive access to the database
@ -736,7 +773,7 @@ where
self.listeners.notify(BeaconConsensusEngineEvent::ForkBlockAdded(block));
PayloadStatusEnum::Accepted
}
BlockStatus::Disconnected => PayloadStatusEnum::Syncing,
BlockStatus::Disconnected { .. } => PayloadStatusEnum::Syncing,
};
Ok(PayloadStatus::new(status, latest_valid_hash))
}
@ -808,6 +845,7 @@ where
) -> Option<Result<(), BeaconConsensusEngineError>> {
match ev {
EngineSyncEvent::FetchedFullBlock(block) => {
trace!(target: "consensus::engine", hash=?block.hash, "Fetched full block");
// it is guaranteed that the pipeline is not active at this point.
// TODO(mattsse): better error handling and start closing the gap if there's any by
@ -836,6 +874,7 @@ where
return Some(Err(BeaconConsensusEngineError::PipelineChannelClosed))
}
EngineSyncEvent::PipelineFinished { result, reached_max_block } => {
trace!(target: "consensus::engine", ?result, ?reached_max_block, "Pipeline finished");
match result {
Ok(ctrl) => {
if reached_max_block {
@ -854,9 +893,11 @@ where
}
};
if ctrl.is_unwind() {
// Attempt to sync to the head block after unwind.
self.sync.set_pipeline_sync_target(current_state.head_block_hash);
if let ControlFlow::Unwind { bad_block, .. } = ctrl {
trace!(target: "consensus::engine", hash=?bad_block.hash, "Bad block detected in unwind");
// update the `invalid_headers` cache with the new invalid headers
self.invalid_headers.insert(bad_block);
return None
}
@ -881,14 +922,39 @@ where
self.blockchain.set_canonical_head(max_header);
}
// Update the state and hashes of the blockchain tree if possible.
match self.restore_tree_if_possible(current_state) {
Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle),
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
return Some(Err(error.into()))
}
};
// TODO: figure out how to make this less complex:
// restore_tree_if_possible will run the pipeline if the current_state head
// hash is missing. This can arise if we buffer the forkchoice head, and if
// the head is an ancestor of an invalid block. In this case we won't have
// the head hash in the database, so we would set the pipeline sync target
// to a known-invalid head.
//
// This is why we check the invalid header cache here.
// This might be incorrect, because we need to check exactly the invalid
// block here, which is not necessarily the head hash! We might need to
// insert all ancestors into the invalid block cache.
//
// We would need to accompany this change with a change to the invalid
// header cache, because currently we return the parent of the checked
// invalid header as the `latestValidHash`, which could be incorrect if
// there are other parents in the invalid header cache.
//
// Here, we check if the parent of the lowest buffered ancestor is invalid
// (if it exists), or if the head is invalid. Ideally we want to answer
// "is any descendant of this block invalid?"
let lowest_buffered_ancestor =
self.lowest_buffered_ancestor_or(current_state.head_block_hash);
if self.invalid_headers.get(&lowest_buffered_ancestor).is_none() {
// Update the state and hashes of the blockchain tree if possible.
match self.restore_tree_if_possible(current_state) {
Ok(_) => self.sync_state_updater.update_sync_state(SyncState::Idle),
Err(error) => {
error!(target: "consensus::engine", ?error, "Error restoring blockchain tree");
return Some(Err(error.into()))
}
};
}
}
// Any pipeline error at this point is fatal.
Err(error) => return Some(Err(error.into())),


@ -25,6 +25,9 @@ pub enum BlockchainTreeError {
BlockNumberNotFoundInChain { block_number: BlockNumber },
#[error("Block hash {block_hash} not found in blockchain tree chain")]
BlockHashNotFoundInChain { block_hash: BlockHash },
// Thrown if the block failed to buffer
#[error("Block with hash {block_hash:?} failed to buffer")]
BlockBufferingFailed { block_hash: BlockHash },
}
/// Error thrown when inserting a block failed because the block is considered invalid.


@ -124,7 +124,10 @@ pub enum BlockStatus {
/// (It is a side chain) or hasn't been fully validated but ancestors of a payload are known.
Accepted,
/// If the block is not connected to the canonical chain.
Disconnected,
Disconnected {
/// The lowest parent block that is not connected to the canonical chain.
missing_parent: BlockNumHash,
},
}
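// With the added `missing_parent` field, callers can act on the exact gap
// instead of a bare "disconnected" signal. A hedged sketch of such a caller
// (the `sync` handle and surrounding shape are illustrative, not the engine's
// exact fields):
//
//   match status {
//       BlockStatus::Disconnected { missing_parent } => {
//           // request the missing ancestor rather than re-requesting the tip
//           sync.download_full_block(missing_parent.hash);
//       }
//       _ => { /* Valid / Accepted: nothing to download */ }
//   }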
/// Allows read only functionality on the blockchain tree.
@ -168,6 +171,12 @@ pub trait BlockchainTreeViewer: Send + Sync {
/// Note: this could be the given `parent_hash` if it's already canonical.
fn find_canonical_ancestor(&self, parent_hash: BlockHash) -> Option<BlockHash>;
/// Given the hash of a block, this checks the buffered blocks for the lowest ancestor in the
/// buffer.
///
/// If there is a buffered block with the given hash, this returns the block itself.
fn lowest_buffered_ancestor(&self, hash: BlockHash) -> Option<SealedBlockWithSenders>;
/// Return BlockchainTree best known canonical chain tip (BlockHash, BlockNumber)
fn canonical_tip(&self) -> BlockNumHash;


@ -5,6 +5,7 @@ use thiserror::Error;
#[allow(missing_docs)]
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum BlockExecutionError {
// === validation errors ===
#[error("EVM reported invalid transaction ({hash:?}): {message}")]
EVM { hash: H256, message: String },
#[error("Failed to recover sender for transaction")]
@ -20,8 +21,22 @@ pub enum BlockExecutionError {
},
#[error("Block gas used {got} is different from expected gas used {expected}.")]
BlockGasUsed { got: u64, expected: u64 },
#[error("Block {hash:?} is pre merge")]
BlockPreMerge { hash: H256 },
#[error("Missing total difficulty")]
MissingTotalDifficulty { hash: H256 },
// === misc provider error ===
#[error("Provider error")]
ProviderError,
// === transaction errors ===
#[error("Transaction error on revert: {inner:?}")]
CanonicalRevert { inner: String },
#[error("Transaction error on commit: {inner:?}")]
CanonicalCommit { inner: String },
// === tree errors ===
// TODO(mattsse): move this to tree error
#[error("Block hash {block_hash} not found in blockchain tree chain")]
BlockHashNotFoundInChain { block_hash: BlockHash },
@ -29,14 +44,6 @@ pub enum BlockExecutionError {
"Appending chain on fork (other_chain_fork:?) is not possible as the tip is {chain_tip:?}"
)]
AppendChainDoesntConnect { chain_tip: BlockNumHash, other_chain_fork: BlockNumHash },
#[error("Transaction error on revert: {inner:?}")]
CanonicalRevert { inner: String },
#[error("Transaction error on commit: {inner:?}")]
CanonicalCommit { inner: String },
#[error("Block {hash:?} is pre merge")]
BlockPreMerge { hash: H256 },
#[error("Missing total difficulty")]
MissingTotalDifficulty { hash: H256 },
/// Only used for TestExecutor
///


@ -113,7 +113,7 @@ impl Signature {
})
}
/// Recover signature from hash.
/// Recover signer address from message hash.
pub fn recover_signer(&self, hash: H256) -> Option<Address> {
let mut sig: [u8; 65] = [0; 65];


@ -10,12 +10,17 @@ pub(crate) mod secp256k1 {
};
use revm_primitives::{B256, U256};
/// secp256k1 signer recovery
/// Recovers the address of the sender using secp256k1 pubkey recovery.
///
/// Converts the public key into an ethereum address by hashing the public key with keccak256.
pub fn recover_signer(sig: &[u8; 65], msg: &[u8; 32]) -> Result<Address, Error> {
let sig =
RecoverableSignature::from_compact(&sig[0..64], RecoveryId::from_i32(sig[64] as i32)?)?;
let public = SECP256K1.recover_ecdsa(&Message::from_slice(&msg[..32])?, &sig)?;
// strip out the first byte because that should be the SECP256K1_TAG_PUBKEY_UNCOMPRESSED
// tag returned by libsecp's uncompressed pubkey serialization
let hash = keccak256(&public.serialize_uncompressed()[1..]);
Ok(Address::from_slice(&hash[12..]))
}
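
A test-style usage sketch for this helper, assuming `Secp256k1`, `SecretKey`, `PublicKey`, and `Message` from the `secp256k1` crate are in scope, with arbitrary key and message bytes (not taken from this diff):

#[test]
fn recover_signer_roundtrip() {
    let secp = Secp256k1::new();
    let secret = SecretKey::from_slice(&[0x01; 32]).unwrap();
    let msg_bytes = [0x42u8; 32];
    let message = Message::from_slice(&msg_bytes).unwrap();

    // Sign with the recoverable ECDSA API, then pack r || s || recovery_id
    // into the 65-byte layout `recover_signer` expects.
    let (rec_id, data) = secp.sign_ecdsa_recoverable(&message, &secret).serialize_compact();
    let mut sig = [0u8; 65];
    sig[..64].copy_from_slice(&data);
    sig[64] = rec_id.to_i32() as u8;

    // Recovery should yield the keccak256-derived address of the signing key,
    // i.e. the last 20 bytes of keccak256(uncompressed pubkey without the tag byte).
    let recovered = recover_signer(&sig, &msg_bytes).expect("recovery should succeed");
    let pubkey = PublicKey::from_secret_key(&secp, &secret);
    let expected = Address::from_slice(&keccak256(&pubkey.serialize_uncompressed()[1..])[12..]);
    assert_eq!(recovered, expected);
}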


@ -3,7 +3,7 @@ use reth_interfaces::{
consensus, db::DatabaseError as DbError, executor, p2p::error::DownloadError,
provider::ProviderError,
};
use reth_primitives::BlockNumber;
use reth_primitives::SealedHeader;
use reth_provider::TransactionError;
use thiserror::Error;
use tokio::sync::mpsc::error::SendError;
@ -12,10 +12,10 @@ use tokio::sync::mpsc::error::SendError;
#[derive(Error, Debug)]
pub enum StageError {
/// The stage encountered a state validation error.
#[error("Stage encountered a validation error in block {block}: {error}.")]
#[error("Stage encountered a validation error in block {number}: {error}.", number = block.number)]
Validation {
/// The block that failed validation.
block: BlockNumber,
block: SealedHeader,
/// The underlying consensus error.
#[source]
error: consensus::ConsensusError,
@ -23,12 +23,12 @@ pub enum StageError {
/// The stage encountered a database error.
#[error("An internal database error occurred: {0}")]
Database(#[from] DbError),
#[error("Stage encountered a execution error in block {block}: {error}.")]
#[error("Stage encountered a execution error in block {number}: {error}.", number = block.number)]
/// The stage encountered a execution error
// TODO: Probably redundant, should be rolled into `Validation`
ExecutionError {
/// The block that failed execution.
block: BlockNumber,
block: SealedHeader,
/// The underlying execution error.
#[source]
error: executor::BlockExecutionError,
@ -71,7 +71,6 @@ impl StageError {
StageError::Download(_) |
StageError::DatabaseIntegrity(_) |
StageError::StageCheckpoint(_) |
StageError::ExecutionError { .. } |
StageError::ChannelClosed |
StageError::Fatal(_) |
StageError::Transaction(_)


@ -1,4 +1,4 @@
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, SealedHeader};
/// Determines the control flow during pipeline execution.
#[derive(Debug, Eq, PartialEq)]
@ -8,7 +8,7 @@ pub enum ControlFlow {
/// The block to unwind to.
target: BlockNumber,
/// The block that caused the unwind.
bad_block: Option<BlockNumber>,
bad_block: SealedHeader,
},
/// The pipeline is allowed to continue executing stages.
Continue {


@ -221,7 +221,7 @@ where
}
ControlFlow::Continue { progress } => self.progress.update(progress),
ControlFlow::Unwind { target, bad_block } => {
self.unwind(target, bad_block).await?;
self.unwind(target, Some(bad_block.number)).await?;
return Ok(ControlFlow::Unwind { target, bad_block })
}
}
@ -371,7 +371,7 @@ where
warn!(
target: "sync::pipeline",
stage = %stage_id,
bad_block = %block,
bad_block = %block.number,
"Stage encountered a validation error: {error}"
);
@ -380,7 +380,7 @@ where
// beginning.
Ok(ControlFlow::Unwind {
target: prev_checkpoint.unwrap_or_default().block_number,
bad_block: Some(block),
bad_block: block,
})
} else if err.is_fatal() {
error!(
@ -422,7 +422,9 @@ mod tests {
use crate::{test_utils::TestStage, UnwindOutput};
use assert_matches::assert_matches;
use reth_db::mdbx::{self, test_utils, EnvKind};
use reth_interfaces::{consensus, provider::ProviderError};
use reth_interfaces::{
consensus, provider::ProviderError, test_utils::generators::random_header,
};
use tokio_stream::StreamExt;
#[test]
@ -676,7 +678,7 @@ mod tests {
.add_stage(
TestStage::new(StageId::Other("B"))
.add_exec(Err(StageError::Validation {
block: 5,
block: random_header(5, Default::default()),
error: consensus::ConsensusError::BaseFeeMissing,
}))
.add_unwind(Ok(UnwindOutput { checkpoint: StageCheckpoint::new(0) }))


@ -156,7 +156,10 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
let (block, senders) = block.into_components();
let block_state = executor
.execute_and_verify_receipt(&block, td, Some(senders))
.map_err(|error| StageError::ExecutionError { block: block_number, error })?;
.map_err(|error| StageError::ExecutionError {
block: block.header.clone().seal_slow(),
error,
})?;
// Gas metrics
self.metrics


@ -10,7 +10,7 @@ use reth_primitives::{
hex,
stage::{MerkleCheckpoint, StageCheckpoint, StageId},
trie::StoredSubNode,
BlockNumber, H256,
BlockNumber, SealedHeader, H256,
};
use reth_provider::Transaction;
use reth_trie::{IntermediateStateRootState, StateRoot, StateRootProgress};
@ -66,20 +66,23 @@ impl MerkleStage {
Self::Unwind
}
/// Check that the computed state root matches the expected.
/// Check that the computed state root matches the root in the expected header.
fn validate_state_root(
&self,
got: H256,
expected: H256,
expected: SealedHeader,
target_block: BlockNumber,
) -> Result<(), StageError> {
if got == expected {
if got == expected.state_root {
Ok(())
} else {
warn!(target: "sync::stages::merkle", ?target_block, ?got, ?expected, "Block's root state failed verification");
Err(StageError::Validation {
block: target_block,
error: consensus::ConsensusError::BodyStateRootDiff { got, expected },
block: expected.clone(),
error: consensus::ConsensusError::BodyStateRootDiff {
got,
expected: expected.state_root,
},
})
}
}
@ -154,7 +157,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let (from_block, to_block) = range.clone().into_inner();
let current_block = input.previous_stage_checkpoint().block_number;
let block_root = tx.get_header(current_block)?.state_root;
let block = tx.get_header(current_block)?;
let block_root = block.state_root;
let mut checkpoint = self.get_execution_checkpoint(tx)?;
@ -219,7 +223,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
// Reset the checkpoint
self.save_execution_checkpoint(tx, None)?;
self.validate_state_root(trie_root, block_root, to_block)?;
self.validate_state_root(trie_root, block.seal_slow(), to_block)?;
info!(target: "sync::stages::merkle::exec", stage_progress = to_block, is_final_range = true, "Stage iteration finished");
Ok(ExecOutput { checkpoint: StageCheckpoint::new(to_block), done: true })
@ -251,8 +255,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
.map_err(|e| StageError::Fatal(Box::new(e)))?;
// Validate the calculated state root
let target_root = tx.get_header(input.unwind_to)?.state_root;
self.validate_state_root(block_root, target_root, input.unwind_to)?;
let target = tx.get_header(input.unwind_to)?;
self.validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;
// Validation passed, apply unwind changes to the database.
updates.flush(tx.deref_mut())?;


@ -78,7 +78,7 @@ impl<DB: Database> Stage<DB> for TotalDifficultyStage {
self.consensus
.validate_header_with_total_difficulty(&header, td)
.map_err(|error| StageError::Validation { block: header.number, error })?;
.map_err(|error| StageError::Validation { block: header.seal_slow(), error })?;
cursor_td.append(block_number, td.into())?;
}
info!(target: "sync::stages::total_difficulty", stage_progress = end_block, is_final_range, "Stage iteration finished");


@ -487,6 +487,10 @@ where
self.tree.find_canonical_ancestor(hash)
}
fn lowest_buffered_ancestor(&self, hash: BlockHash) -> Option<SealedBlockWithSenders> {
self.tree.lowest_buffered_ancestor(hash)
}
fn canonical_tip(&self) -> BlockNumHash {
self.tree.canonical_tip()
}