fix: TreeState::on_new_head fork and reorg handling (#10232)

This commit is contained in:
Federico Gimenez
2024-08-10 11:45:45 +02:00
committed by GitHub
parent 2840a95785
commit faa38cec4d
4 changed files with 183 additions and 89 deletions

2
Cargo.lock generated
View File

@ -6482,6 +6482,8 @@ dependencies = [
name = "reth-chain-state"
version = "1.0.5"
dependencies = [
"alloy-signer",
"alloy-signer-local",
"auto_impl",
"derive_more",
"metrics",

View File

@ -21,8 +21,6 @@ reth-primitives.workspace = true
reth-storage-api.workspace = true
reth-trie.workspace = true
revm = { workspace = true, optional = true }
# async
tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] }
tokio-stream = { workspace = true, features = ["sync"] }
@ -36,11 +34,23 @@ derive_more.workspace = true
metrics.workspace = true
parking_lot.workspace = true
pin-project.workspace = true
# optional deps for test-utils
alloy-signer = { workspace = true, optional = true }
alloy-signer-local = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
revm = { workspace = true, optional = true }
[dev-dependencies]
alloy-signer.workspace = true
alloy-signer-local.workspace = true
rand.workspace = true
revm.workspace = true
[features]
test-utils = ["rand", "revm"]
test-utils = [
"alloy-signer",
"alloy-signer-local",
"rand",
"revm"
]

View File

@ -2,6 +2,8 @@ use crate::{
in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications,
CanonStateSubscriptions,
};
use alloy_signer::SignerSync;
use alloy_signer_local::PrivateKeySigner;
use rand::{thread_rng, Rng};
use reth_chainspec::ChainSpec;
use reth_execution_types::{Chain, ExecutionOutcome};
@ -26,6 +28,8 @@ use tokio::sync::broadcast::{self, Sender};
pub struct TestBlockBuilder {
/// The account that signs all the block's transactions.
pub signer: Address,
/// Private key for signing.
pub signer_pk: PrivateKeySigner,
/// Keeps track of signer's account info after execution, will be updated in
/// methods related to block execution.
pub signer_execute_account_info: AccountInfo,
@ -39,9 +43,12 @@ pub struct TestBlockBuilder {
impl Default for TestBlockBuilder {
fn default() -> Self {
let initial_account_info = AccountInfo::from_balance(U256::from(10).pow(U256::from(18)));
let signer_pk = PrivateKeySigner::random();
let signer = signer_pk.address();
Self {
chain_spec: ChainSpec::default(),
signer: Address::random(),
signer,
signer_pk,
signer_execute_account_info: initial_account_info.clone(),
signer_build_account_info: initial_account_info,
}
@ -49,9 +56,11 @@ impl Default for TestBlockBuilder {
}
impl TestBlockBuilder {
/// Signer setter.
pub const fn with_signer(mut self, signer: Address) -> Self {
self.signer = signer;
/// Signer pk setter.
pub fn with_signer_pk(mut self, signer_pk: PrivateKeySigner) -> Self {
self.signer = signer_pk.address();
self.signer_pk = signer_pk;
self
}
@ -75,17 +84,25 @@ impl TestBlockBuilder {
let mut rng = thread_rng();
let mock_tx = |nonce: u64| -> TransactionSignedEcRecovered {
let tx = Transaction::Eip1559(TxEip1559 {
chain_id: self.chain_spec.chain.id(),
nonce,
gas_limit: 21_000,
to: Address::random().into(),
max_fee_per_gas: EIP1559_INITIAL_BASE_FEE as u128,
max_priority_fee_per_gas: 1,
..Default::default()
});
let signature_hash = tx.signature_hash();
let signature = self.signer_pk.sign_hash_sync(&signature_hash).unwrap();
TransactionSigned::from_transaction_and_signature(
Transaction::Eip1559(TxEip1559 {
chain_id: self.chain_spec.chain.id(),
nonce,
gas_limit: 21_000,
to: Address::random().into(),
max_fee_per_gas: EIP1559_INITIAL_BASE_FEE as u128,
max_priority_fee_per_gas: 1,
..Default::default()
}),
Signature::default(),
tx,
Signature {
r: signature.r(),
s: signature.s(),
odd_y_parity: signature.v().y_parity(),
},
)
.with_signer(self.signer)
};

View File

@ -221,57 +221,62 @@ impl TreeState {
/// Note: This does not update the tracked state and instead returns the new chain based on the
/// given head.
fn on_new_head(&self, new_head: B256) -> Option<NewCanonicalChain> {
let mut new_chain = Vec::new();
let mut current_hash = new_head;
let mut fork_point = None;
let new_head_block = self.blocks_by_hash.get(&new_head)?;
let new_head_number = new_head_block.block.number;
let current_canonical_number = self.current_canonical_head.number;
// walk back the chain until we reach the canonical block
while current_hash != self.canonical_block_hash() {
let current_block = self.blocks_by_hash.get(&current_hash)?;
new_chain.push(current_block.clone());
let mut new_chain = vec![new_head_block.clone()];
let mut current_hash = new_head_block.block.parent_hash;
let mut current_number = new_head_number - 1;
// check if this block's parent has multiple children
if let Some(children) = self.parent_to_child.get(&current_block.block.parent_hash) {
if children.len() > 1 ||
self.canonical_block_hash() == current_block.block.parent_hash
{
// we've found a fork point
fork_point = Some(current_block.block.parent_hash);
break;
}
// Walk back the new chain until we reach a block we know about
while current_number > current_canonical_number {
if let Some(block) = self.blocks_by_hash.get(&current_hash) {
new_chain.push(block.clone());
current_hash = block.block.parent_hash;
current_number -= 1;
} else {
return None; // We don't have the full chain
}
current_hash = current_block.block.parent_hash;
}
new_chain.reverse();
// if we found a fork point, collect the reorged blocks
let reorged = if let Some(fork_hash) = fork_point {
let mut reorged = Vec::new();
let mut current_hash = self.current_canonical_head.hash;
// walk back the chain up to the fork hash
while current_hash != fork_hash {
if let Some(block) = self.blocks_by_hash.get(&current_hash) {
reorged.push(block.clone());
current_hash = block.block.parent_hash;
} else {
// current hash not found in memory
warn!(target: "consensus::engine", invalid_hash=?current_hash, "Block not found in TreeState while walking back fork");
return None;
}
}
reorged.reverse();
reorged
} else {
Vec::new()
};
if reorged.is_empty() {
Some(NewCanonicalChain::Commit { new: new_chain })
} else {
Some(NewCanonicalChain::Reorg { new: new_chain, old: reorged })
if current_hash == self.current_canonical_head.hash {
// Simple extension of the current chain
return Some(NewCanonicalChain::Commit { new: new_chain });
}
// We have a reorg. Walk back both chains to find the fork point.
let mut old_chain = Vec::new();
let mut old_hash = self.current_canonical_head.hash;
while old_hash != current_hash {
if let Some(block) = self.blocks_by_hash.get(&old_hash) {
old_chain.push(block.clone());
old_hash = block.block.parent_hash;
} else {
// This shouldn't happen as we're walking back the canonical chain
warn!(target: "consensus::engine", invalid_hash=?old_hash, "Canonical block not found in TreeState");
return None;
}
if old_hash == current_hash {
// We've found the fork point
break;
}
if let Some(block) = self.blocks_by_hash.get(&current_hash) {
new_chain.insert(0, block.clone());
current_hash = block.block.parent_hash;
} else {
// This shouldn't happen as we've already walked this path
warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState");
return None;
}
}
Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain })
}
}
@ -2021,7 +2026,7 @@ mod tests {
use reth_chainspec::{ChainSpec, HOLESKY, MAINNET};
use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::test_utils::MockExecutorProvider;
use reth_primitives::{Address, Bytes};
use reth_primitives::Bytes;
use reth_provider::test_utils::MockEthProvider;
use reth_rpc_types_compat::engine::block_to_payload_v1;
use std::{
@ -2076,9 +2081,7 @@ mod tests {
TreeConfig::default(),
);
let block_builder = TestBlockBuilder::default()
.with_chain_spec((*chain_spec).clone())
.with_signer(Address::random());
let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());
Self {
to_tree_tx: tree.incoming_tx.clone(),
tree,
@ -2219,18 +2222,8 @@ mod tests {
) {
for block in chain.clone() {
self.insert_block(block.clone()).unwrap();
// check for CanonicalBlockAdded events, we expect chain.len() blocks added
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
self.check_canon_chain_insertion(chain).await;
}
async fn check_canon_commit(&mut self, hash: B256) {
@ -2262,9 +2255,24 @@ mod tests {
}
}
fn persist_blocks(&mut self, blocks: Vec<SealedBlockWithSenders>) {
self.setup_range_insertion_for_chain(blocks.clone());
async fn check_canon_chain_insertion(
&mut self,
chain: impl IntoIterator<Item = SealedBlockWithSenders> + Clone,
) {
for _ in chain.clone() {
let event = self.from_tree_rx.recv().await.unwrap();
match event {
EngineApiEvent::BeaconConsensus(
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _),
) => {
assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash()));
}
_ => panic!("Unexpected event: {:#?}", event),
}
}
}
fn persist_blocks(&self, blocks: Vec<SealedBlockWithSenders>) {
let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len());
let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len());
@ -2279,8 +2287,13 @@ mod tests {
}
fn setup_range_insertion_for_chain(&mut self, chain: Vec<SealedBlockWithSenders>) {
let mut execution_outcomes = Vec::new();
for block in &chain {
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut chain_rev = chain;
chain_rev.reverse();
let mut execution_outcomes = Vec::with_capacity(chain_rev.len());
for block in &chain_rev {
let execution_outcome = self.block_builder.get_execution_outcome(block.clone());
self.tree.provider.add_state_root(block.state_root);
execution_outcomes.push(execution_outcome);
@ -2877,11 +2890,7 @@ mod tests {
main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect();
test_harness.persist_blocks(backfilled_chain.clone());
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut backfilled_chain_rev = backfilled_chain.clone();
backfilled_chain_rev.reverse();
test_harness.setup_range_insertion_for_chain(backfilled_chain_rev.to_vec());
test_harness.setup_range_insertion_for_chain(backfilled_chain);
// send message to mark backfill finished
test_harness.tree.on_engine_message(FromEngine::Event(
@ -2924,11 +2933,7 @@ mod tests {
.drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len())
.collect();
// setting up execution outcomes for the chain, the blocks will be
// executed starting from the oldest, so we need to reverse.
let mut remaining_rev = remaining.clone();
remaining_rev.reverse();
test_harness.setup_range_insertion_for_chain(remaining_rev.to_vec());
test_harness.setup_range_insertion_for_chain(remaining.clone());
// tell engine block range downloaded
test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone()));
@ -2982,4 +2987,64 @@ mod tests {
test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await;
test_harness.check_canon_head(main_last_hash);
}
#[tokio::test]
async fn test_engine_tree_forks_with_older_canonical_head() {
reth_tracing::init_test_tracing();
let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
// create base chain and setup test harness with it
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
test_harness = test_harness.with_blocks(base_chain.clone());
let old_head = base_chain.first().unwrap().block();
// extend base chain
let extension_chain = test_harness.block_builder.create_fork(old_head, 5);
let fork_block = extension_chain.last().unwrap().block.clone();
test_harness.insert_chain(extension_chain).await;
// fcu to old_head
test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await;
// create two competing chains starting from fork_block
let chain_a = test_harness.block_builder.create_fork(&fork_block, 10);
let chain_b = test_harness.block_builder.create_fork(&fork_block, 10);
// insert chain A blocks using newPayload
test_harness.setup_range_insertion_for_chain(chain_a.clone());
for block in &chain_a {
let payload = block_to_payload_v1(block.block.clone());
test_harness.tree.on_new_payload(payload.into(), None).unwrap();
}
test_harness.check_canon_chain_insertion(chain_a.clone()).await;
// insert chain B blocks using newPayload
test_harness.setup_range_insertion_for_chain(chain_b.clone());
for block in &chain_b {
let payload = block_to_payload_v1(block.block.clone());
test_harness.tree.on_new_payload(payload.into(), None).unwrap();
}
test_harness.check_canon_chain_insertion(chain_b.clone()).await;
// send FCU to make the tip of chain B the new head
let chain_b_tip_hash = chain_b.last().unwrap().hash();
test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
// check for CanonicalChainCommitted event
test_harness.check_canon_commit(chain_b_tip_hash).await;
// verify FCU was processed
test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await;
// verify the new canonical head
test_harness.check_canon_head(chain_b_tip_hash);
// verify that chain A is now considered a fork
assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash()));
}
}