From faa38cec4d612794e0c0e23c346a32ff6723c815 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Sat, 10 Aug 2024 11:45:45 +0200 Subject: [PATCH] fix: TreeState::on_new_head fork and reorg handling (#10232) --- Cargo.lock | 2 + crates/chain-state/Cargo.toml | 16 +- crates/chain-state/src/test_utils.rs | 45 ++++-- crates/engine/tree/src/tree/mod.rs | 209 ++++++++++++++++++--------- 4 files changed, 183 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 99e69276f..7b3820ac9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6482,6 +6482,8 @@ dependencies = [ name = "reth-chain-state" version = "1.0.5" dependencies = [ + "alloy-signer", + "alloy-signer-local", "auto_impl", "derive_more", "metrics", diff --git a/crates/chain-state/Cargo.toml b/crates/chain-state/Cargo.toml index 2a2911cb9..9f0706a74 100644 --- a/crates/chain-state/Cargo.toml +++ b/crates/chain-state/Cargo.toml @@ -21,8 +21,6 @@ reth-primitives.workspace = true reth-storage-api.workspace = true reth-trie.workspace = true -revm = { workspace = true, optional = true } - # async tokio = { workspace = true, features = ["sync", "macros", "rt-multi-thread"] } tokio-stream = { workspace = true, features = ["sync"] } @@ -36,11 +34,23 @@ derive_more.workspace = true metrics.workspace = true parking_lot.workspace = true pin-project.workspace = true + +# optional deps for test-utils +alloy-signer = { workspace = true, optional = true } +alloy-signer-local = { workspace = true, optional = true } rand = { workspace = true, optional = true } +revm = { workspace = true, optional = true } [dev-dependencies] +alloy-signer.workspace = true +alloy-signer-local.workspace = true rand.workspace = true revm.workspace = true [features] -test-utils = ["rand", "revm"] +test-utils = [ + "alloy-signer", + "alloy-signer-local", + "rand", + "revm" +] diff --git a/crates/chain-state/src/test_utils.rs b/crates/chain-state/src/test_utils.rs index 1548b0af5..f63d4bbfa 100644 --- a/crates/chain-state/src/test_utils.rs 
+++ b/crates/chain-state/src/test_utils.rs @@ -2,6 +2,8 @@ use crate::{ in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications, CanonStateSubscriptions, }; +use alloy_signer::SignerSync; +use alloy_signer_local::PrivateKeySigner; use rand::{thread_rng, Rng}; use reth_chainspec::ChainSpec; use reth_execution_types::{Chain, ExecutionOutcome}; @@ -26,6 +28,8 @@ use tokio::sync::broadcast::{self, Sender}; pub struct TestBlockBuilder { /// The account that signs all the block's transactions. pub signer: Address, + /// Private key for signing. + pub signer_pk: PrivateKeySigner, /// Keeps track of signer's account info after execution, will be updated in /// methods related to block execution. pub signer_execute_account_info: AccountInfo, @@ -39,9 +43,12 @@ pub struct TestBlockBuilder { impl Default for TestBlockBuilder { fn default() -> Self { let initial_account_info = AccountInfo::from_balance(U256::from(10).pow(U256::from(18))); + let signer_pk = PrivateKeySigner::random(); + let signer = signer_pk.address(); Self { chain_spec: ChainSpec::default(), - signer: Address::random(), + signer, + signer_pk, signer_execute_account_info: initial_account_info.clone(), signer_build_account_info: initial_account_info, } @@ -49,9 +56,11 @@ impl Default for TestBlockBuilder { } impl TestBlockBuilder { - /// Signer setter. - pub const fn with_signer(mut self, signer: Address) -> Self { - self.signer = signer; + /// Signer pk setter. 
+ pub fn with_signer_pk(mut self, signer_pk: PrivateKeySigner) -> Self { + self.signer = signer_pk.address(); + self.signer_pk = signer_pk; + self } @@ -75,17 +84,25 @@ impl TestBlockBuilder { let mut rng = thread_rng(); let mock_tx = |nonce: u64| -> TransactionSignedEcRecovered { + let tx = Transaction::Eip1559(TxEip1559 { + chain_id: self.chain_spec.chain.id(), + nonce, + gas_limit: 21_000, + to: Address::random().into(), + max_fee_per_gas: EIP1559_INITIAL_BASE_FEE as u128, + max_priority_fee_per_gas: 1, + ..Default::default() + }); + let signature_hash = tx.signature_hash(); + let signature = self.signer_pk.sign_hash_sync(&signature_hash).unwrap(); + TransactionSigned::from_transaction_and_signature( - Transaction::Eip1559(TxEip1559 { - chain_id: self.chain_spec.chain.id(), - nonce, - gas_limit: 21_000, - to: Address::random().into(), - max_fee_per_gas: EIP1559_INITIAL_BASE_FEE as u128, - max_priority_fee_per_gas: 1, - ..Default::default() - }), - Signature::default(), + tx, + Signature { + r: signature.r(), + s: signature.s(), + odd_y_parity: signature.v().y_parity(), + }, ) .with_signer(self.signer) }; diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 3329835c8..f1ddebd2f 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -221,57 +221,62 @@ impl TreeState { /// Note: This does not update the tracked state and instead returns the new chain based on the /// given head. 
fn on_new_head(&self, new_head: B256) -> Option<NewCanonicalChain> { - let mut new_chain = Vec::new(); - let mut current_hash = new_head; - let mut fork_point = None; + let new_head_block = self.blocks_by_hash.get(&new_head)?; + let new_head_number = new_head_block.block.number; + let current_canonical_number = self.current_canonical_head.number; - // walk back the chain until we reach the canonical block - while current_hash != self.canonical_block_hash() { - let current_block = self.blocks_by_hash.get(&current_hash)?; - new_chain.push(current_block.clone()); + let mut new_chain = vec![new_head_block.clone()]; + let mut current_hash = new_head_block.block.parent_hash; + let mut current_number = new_head_number - 1; - // check if this block's parent has multiple children - if let Some(children) = self.parent_to_child.get(&current_block.block.parent_hash) { - if children.len() > 1 || - self.canonical_block_hash() == current_block.block.parent_hash - { - // we've found a fork point - fork_point = Some(current_block.block.parent_hash); - break; - } + // Walk back the new chain until we reach a block we know about + while current_number > current_canonical_number { + if let Some(block) = self.blocks_by_hash.get(&current_hash) { + new_chain.push(block.clone()); + current_hash = block.block.parent_hash; + current_number -= 1; + } else { + return None; // We don't have the full chain } - - current_hash = current_block.block.parent_hash; } new_chain.reverse(); - // if we found a fork point, collect the reorged blocks - let reorged = if let Some(fork_hash) = fork_point { - let mut reorged = Vec::new(); - let mut current_hash = self.current_canonical_head.hash; - // walk back the chain up to the fork hash - while current_hash != fork_hash { - if let Some(block) = self.blocks_by_hash.get(&current_hash) { - reorged.push(block.clone()); - current_hash = block.block.parent_hash; - } else { - // current hash not found in memory - warn!(target: "consensus::engine", invalid_hash=?current_hash, "Block not found in TreeState 
while walking back fork"); - return None; - } - } - reorged.reverse(); - reorged - } else { - Vec::new() - }; - - if reorged.is_empty() { - Some(NewCanonicalChain::Commit { new: new_chain }) - } else { - Some(NewCanonicalChain::Reorg { new: new_chain, old: reorged }) + if current_hash == self.current_canonical_head.hash { + // Simple extension of the current chain + return Some(NewCanonicalChain::Commit { new: new_chain }); } + + // We have a reorg. Walk back both chains to find the fork point. + let mut old_chain = Vec::new(); + let mut old_hash = self.current_canonical_head.hash; + + while old_hash != current_hash { + if let Some(block) = self.blocks_by_hash.get(&old_hash) { + old_chain.push(block.clone()); + old_hash = block.block.parent_hash; + } else { + // This shouldn't happen as we're walking back the canonical chain + warn!(target: "consensus::engine", invalid_hash=?old_hash, "Canonical block not found in TreeState"); + return None; + } + + if old_hash == current_hash { + // We've found the fork point + break; + } + + if let Some(block) = self.blocks_by_hash.get(&current_hash) { + new_chain.insert(0, block.clone()); + current_hash = block.block.parent_hash; + } else { + // This shouldn't happen as we've already walked this path + warn!(target: "consensus::engine", invalid_hash=?current_hash, "New chain block not found in TreeState"); + return None; + } + } + + Some(NewCanonicalChain::Reorg { new: new_chain, old: old_chain }) } } @@ -2021,7 +2026,7 @@ mod tests { use reth_chainspec::{ChainSpec, HOLESKY, MAINNET}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::test_utils::MockExecutorProvider; - use reth_primitives::{Address, Bytes}; + use reth_primitives::Bytes; use reth_provider::test_utils::MockEthProvider; use reth_rpc_types_compat::engine::block_to_payload_v1; use std::{ @@ -2076,9 +2081,7 @@ mod tests { TreeConfig::default(), ); - let block_builder = TestBlockBuilder::default() - .with_chain_spec((*chain_spec).clone()) - 
.with_signer(Address::random()); + let block_builder = TestBlockBuilder::default().with_chain_spec((*chain_spec).clone()); Self { to_tree_tx: tree.incoming_tx.clone(), tree, @@ -2219,18 +2222,8 @@ mod tests { ) { for block in chain.clone() { self.insert_block(block.clone()).unwrap(); - - // check for CanonicalBlockAdded events, we expect chain.len() blocks added - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), - ) => { - assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash())); - } - _ => panic!("Unexpected event: {:#?}", event), - } } + self.check_canon_chain_insertion(chain).await; } async fn check_canon_commit(&mut self, hash: B256) { @@ -2262,9 +2255,24 @@ mod tests { } } - fn persist_blocks(&mut self, blocks: Vec) { - self.setup_range_insertion_for_chain(blocks.clone()); + async fn check_canon_chain_insertion( + &mut self, + chain: impl IntoIterator + Clone, + ) { + for _ in chain.clone() { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus( + BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), + ) => { + assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash())); + } + _ => panic!("Unexpected event: {:#?}", event), + } + } + } + fn persist_blocks(&self, blocks: Vec) { let mut block_data: Vec<(B256, Block)> = Vec::with_capacity(blocks.len()); let mut headers_data: Vec<(B256, Header)> = Vec::with_capacity(blocks.len()); @@ -2279,8 +2287,13 @@ mod tests { } fn setup_range_insertion_for_chain(&mut self, chain: Vec) { - let mut execution_outcomes = Vec::new(); - for block in &chain { + // setting up execution outcomes for the chain, the blocks will be + // executed starting from the oldest, so we need to reverse. 
+ let mut chain_rev = chain; + chain_rev.reverse(); + + let mut execution_outcomes = Vec::with_capacity(chain_rev.len()); + for block in &chain_rev { let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); self.tree.provider.add_state_root(block.state_root); execution_outcomes.push(execution_outcome); @@ -2877,11 +2890,7 @@ mod tests { main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect(); test_harness.persist_blocks(backfilled_chain.clone()); - // setting up execution outcomes for the chain, the blocks will be - // executed starting from the oldest, so we need to reverse. - let mut backfilled_chain_rev = backfilled_chain.clone(); - backfilled_chain_rev.reverse(); - test_harness.setup_range_insertion_for_chain(backfilled_chain_rev.to_vec()); + test_harness.setup_range_insertion_for_chain(backfilled_chain); // send message to mark backfill finished test_harness.tree.on_engine_message(FromEngine::Event( @@ -2924,11 +2933,7 @@ mod tests { .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len()) .collect(); - // setting up execution outcomes for the chain, the blocks will be - // executed starting from the oldest, so we need to reverse. 
- let mut remaining_rev = remaining.clone(); - remaining_rev.reverse(); - test_harness.setup_range_insertion_for_chain(remaining_rev.to_vec()); + test_harness.setup_range_insertion_for_chain(remaining.clone()); // tell engine block range downloaded test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())); @@ -2982,4 +2987,64 @@ mod tests { test_harness.check_fcu(main_last_hash, ForkchoiceStatus::Valid).await; test_harness.check_canon_head(main_last_hash); } + + #[tokio::test] + async fn test_engine_tree_forks_with_older_canonical_head() { + reth_tracing::init_test_tracing(); + + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + // create base chain and setup test harness with it + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + let old_head = base_chain.first().unwrap().block(); + + // extend base chain + let extension_chain = test_harness.block_builder.create_fork(old_head, 5); + let fork_block = extension_chain.last().unwrap().block.clone(); + test_harness.insert_chain(extension_chain).await; + + // fcu to old_head + test_harness.fcu_to(old_head.hash(), ForkchoiceStatus::Valid).await; + + // create two competing chains starting from fork_block + let chain_a = test_harness.block_builder.create_fork(&fork_block, 10); + let chain_b = test_harness.block_builder.create_fork(&fork_block, 10); + + // insert chain A blocks using newPayload + test_harness.setup_range_insertion_for_chain(chain_a.clone()); + for block in &chain_a { + let payload = block_to_payload_v1(block.block.clone()); + test_harness.tree.on_new_payload(payload.into(), None).unwrap(); + } + + test_harness.check_canon_chain_insertion(chain_a.clone()).await; + + // insert chain B blocks using newPayload + test_harness.setup_range_insertion_for_chain(chain_b.clone()); + for block in &chain_b { + let payload = 
block_to_payload_v1(block.block.clone()); + test_harness.tree.on_new_payload(payload.into(), None).unwrap(); + } + + test_harness.check_canon_chain_insertion(chain_b.clone()).await; + + // send FCU to make the tip of chain B the new head + let chain_b_tip_hash = chain_b.last().unwrap().hash(); + test_harness.send_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; + + // check for CanonicalChainCommitted event + test_harness.check_canon_commit(chain_b_tip_hash).await; + + // verify FCU was processed + test_harness.check_fcu(chain_b_tip_hash, ForkchoiceStatus::Valid).await; + + // verify the new canonical head + test_harness.check_canon_head(chain_b_tip_hash); + + // verify that chain A is now considered a fork + assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash())); + } }