From 9bed8cf46661b9db2d4687d5fe4a72d37f398080 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Sat, 17 Aug 2024 16:49:53 -0700 Subject: [PATCH] feat: improve connecting downloaded blocks (#10368) --- crates/chain-state/src/test_utils.rs | 13 ++- crates/engine/tree/src/tree/mod.rs | 117 ++++++++++++++++++++++----- 2 files changed, 104 insertions(+), 26 deletions(-) diff --git a/crates/chain-state/src/test_utils.rs b/crates/chain-state/src/test_utils.rs index f63d4bbfa..d101b309a 100644 --- a/crates/chain-state/src/test_utils.rs +++ b/crates/chain-state/src/test_utils.rs @@ -5,11 +5,11 @@ use crate::{ use alloy_signer::SignerSync; use alloy_signer_local::PrivateKeySigner; use rand::{thread_rng, Rng}; -use reth_chainspec::ChainSpec; +use reth_chainspec::{ChainSpec, EthereumHardfork}; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{ constants::{EIP1559_INITIAL_BASE_FEE, EMPTY_ROOT_HASH}, - proofs::{calculate_receipt_root, calculate_transaction_root}, + proofs::{calculate_receipt_root, calculate_transaction_root, calculate_withdrawals_root}, Address, BlockNumber, Header, Receipt, Receipts, Requests, SealedBlock, SealedBlockWithSenders, Signature, Transaction, TransactionSigned, TransactionSignedEcRecovered, TxEip1559, B256, U256, }; @@ -156,7 +156,12 @@ impl TestBlockBuilder { ), )])), // use the number as the timestamp so it is monotonically increasing - timestamp: number, + timestamp: number + + EthereumHardfork::Cancun.activation_timestamp(self.chain_spec.chain).unwrap(), + withdrawals_root: Some(calculate_withdrawals_root(&[])), + blob_gas_used: Some(0), + excess_blob_gas: Some(0), + parent_beacon_block_root: Some(B256::random()), ..Default::default() }; @@ -164,7 +169,7 @@ impl TestBlockBuilder { header: header.seal_slow(), body: transactions.into_iter().map(|tx| tx.into_signed()).collect(), ommers: Vec::new(), - withdrawals: None, + withdrawals: Some(vec![].into()), requests: None, }; diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index f1ddebd2f..e88f3cb9e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -646,10 +646,15 @@ where } } else { let mut latest_valid_hash = None; + let num_hash = block.num_hash(); match self.insert_block_without_senders(block) { Ok(status) => { let status = match status { - InsertPayloadOk::Inserted(BlockStatus::Valid(_)) | + InsertPayloadOk::Inserted(BlockStatus::Valid(_)) => { + latest_valid_hash = Some(block_hash); + self.try_connect_buffered_blocks(num_hash); + PayloadStatusEnum::Valid + } InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => { latest_valid_hash = Some(block_hash); PayloadStatusEnum::Valid @@ -2028,7 +2033,7 @@ mod tests { use reth_evm::test_utils::MockExecutorProvider; use reth_primitives::Bytes; use reth_provider::test_utils::MockEthProvider; - use reth_rpc_types_compat::engine::block_to_payload_v1; + use reth_rpc_types_compat::engine::{block_to_payload_v1, payload::block_to_payload_v3}; use std::{ str::FromStr, sync::mpsc::{channel, Sender}, @@ -2216,6 +2221,19 @@ mod tests { } } + async fn send_new_payload(&mut self, block: SealedBlockWithSenders) { + let payload = block_to_payload_v3(block.block.clone()); + self.tree + .on_new_payload( + payload.into(), + Some(CancunPayloadFields { + parent_beacon_block_root: block.parent_beacon_block_root.unwrap(), + versioned_hashes: vec![], + }), + ) + .unwrap(); + } + async fn insert_chain( &mut self, chain: impl IntoIterator + Clone, @@ -2259,16 +2277,20 @@ mod tests { &mut self, chain: impl IntoIterator + Clone, ) { - for _ in chain.clone() { - let event = self.from_tree_rx.recv().await.unwrap(); - match event { - EngineApiEvent::BeaconConsensus( - BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), - ) => { - assert!(chain.clone().into_iter().any(|b| b.hash() == block.hash())); - } - _ => panic!("Unexpected event: {:#?}", event), + for block in chain.clone() { + self.check_canon_block_added(block.hash()).await; + } + } + + async fn check_canon_block_added(&mut self, expected_hash: B256) { + let event = self.from_tree_rx.recv().await.unwrap(); + match event { + EngineApiEvent::BeaconConsensus( + BeaconConsensusEngineEvent::CanonicalBlockAdded(block, _), + ) => { + assert!(block.hash() == expected_hash); } + _ => panic!("Unexpected event: {:#?}", event), } } @@ -2286,16 +2308,29 @@ mod tests { self.provider.extend_headers(headers_data); } - fn setup_range_insertion_for_chain(&mut self, chain: Vec) { + fn setup_range_insertion_for_valid_chain(&mut self, chain: Vec) { + self.setup_range_insertion_for_chain(chain, None) + } + + fn setup_range_insertion_for_chain( + &mut self, + chain: Vec, + invalid_index: Option, + ) { // setting up execution outcomes for the chain, the blocks will be // executed starting from the oldest, so we need to reverse. let mut chain_rev = chain; chain_rev.reverse(); let mut execution_outcomes = Vec::with_capacity(chain_rev.len()); - for block in &chain_rev { + for (index, block) in chain_rev.iter().enumerate() { let execution_outcome = self.block_builder.get_execution_outcome(block.clone()); - self.tree.provider.add_state_root(block.state_root); + let state_root = if invalid_index.is_some() && invalid_index.unwrap() == index { + B256::random() + } else { + block.state_root + }; + self.tree.provider.add_state_root(state_root); execution_outcomes.push(execution_outcome); } self.extend_execution_outcome(execution_outcomes); @@ -2890,7 +2925,7 @@ mod tests { main_chain.clone().drain(0..(MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize).collect(); test_harness.persist_blocks(backfilled_chain.clone()); - test_harness.setup_range_insertion_for_chain(backfilled_chain); + test_harness.setup_range_insertion_for_valid_chain(backfilled_chain); // send message to mark backfill finished test_harness.tree.on_engine_message(FromEngine::Event( @@ -2933,7 +2968,7 @@ mod tests { .drain((MIN_BLOCKS_FOR_PIPELINE_RUN + 1) as usize..main_chain.len()) .collect(); - test_harness.setup_range_insertion_for_chain(remaining.clone()); + test_harness.setup_range_insertion_for_valid_chain(remaining.clone()); // tell engine block range downloaded test_harness.tree.on_engine_message(FromEngine::DownloadedBlocks(remaining.clone())); @@ -3004,6 +3039,8 @@ mod tests { // extend base chain let extension_chain = test_harness.block_builder.create_fork(old_head, 5); let fork_block = extension_chain.last().unwrap().block.clone(); + + test_harness.setup_range_insertion_for_valid_chain(extension_chain.clone()); test_harness.insert_chain(extension_chain).await; // fcu to old_head @@ -3014,19 +3051,17 @@ mod tests { let chain_b = test_harness.block_builder.create_fork(&fork_block, 10); // insert chain A blocks using newPayload - test_harness.setup_range_insertion_for_chain(chain_a.clone()); + test_harness.setup_range_insertion_for_valid_chain(chain_a.clone()); for block in &chain_a { - let payload = block_to_payload_v1(block.block.clone()); - test_harness.tree.on_new_payload(payload.into(), None).unwrap(); + test_harness.send_new_payload(block.clone()).await; } test_harness.check_canon_chain_insertion(chain_a.clone()).await; // insert chain B blocks using newPayload - test_harness.setup_range_insertion_for_chain(chain_b.clone()); + test_harness.setup_range_insertion_for_valid_chain(chain_b.clone()); for block in &chain_b { - let payload = block_to_payload_v1(block.block.clone()); - test_harness.tree.on_new_payload(payload.into(), None).unwrap(); + test_harness.send_new_payload(block.clone()).await; } test_harness.check_canon_chain_insertion(chain_b.clone()).await; @@ -3047,4 +3082,42 @@ mod tests { // verify that chain A is now considered a fork assert!(test_harness.tree.state.tree_state.is_fork(chain_a.last().unwrap().hash())); } + + #[tokio::test] + async fn test_engine_tree_buffered_blocks_are_eventually_connected() { + let chain_spec = MAINNET.clone(); + let mut test_harness = TestHarness::new(chain_spec.clone()); + + let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect(); + test_harness = test_harness.with_blocks(base_chain.clone()); + + // side chain consisting of two blocks, the last will be inserted first + // so that we force it to be buffered + let side_chain = + test_harness.block_builder.create_fork(base_chain.last().unwrap().block(), 2); + + // buffer last block of side chain + let buffered_block = side_chain.last().unwrap(); + let buffered_block_hash = buffered_block.hash(); + + test_harness.setup_range_insertion_for_valid_chain(vec![buffered_block.clone()]); + test_harness.send_new_payload(buffered_block.clone()).await; + + assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_some()); + + let non_buffered_block = side_chain.first().unwrap(); + let non_buffered_block_hash = non_buffered_block.hash(); + + // insert block that continues the canon chain, should not be buffered + test_harness.setup_range_insertion_for_valid_chain(vec![non_buffered_block.clone()]); + test_harness.send_new_payload(non_buffered_block.clone()).await; + assert!(test_harness.tree.state.buffer.block(&non_buffered_block_hash).is_none()); + + // the previously buffered block should be connected now + assert!(test_harness.tree.state.buffer.block(&buffered_block_hash).is_none()); + + // both blocks are added to the canon chain in order + test_harness.check_canon_block_added(non_buffered_block_hash).await; + test_harness.check_canon_block_added(buffered_block_hash).await; + } }