From 0994d36c4ef022d143195d91b85dd06aeedd1d2b Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 17 Jul 2024 18:37:27 +0200 Subject: [PATCH] feat: add in memory state container (#9574) --- Cargo.lock | 1 + crates/engine/tree/Cargo.toml | 5 +- crates/engine/tree/src/test_utils.rs | 15 +- crates/engine/tree/src/tree/mod.rs | 309 +++++++++++++++++++++------ 4 files changed, 261 insertions(+), 69 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 612863dee..c2b718634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6947,6 +6947,7 @@ dependencies = [ "futures", "metrics", "parking_lot 0.12.3", + "rand 0.8.5", "reth-beacon-consensus", "reth-blockchain-tree", "reth-blockchain-tree-api", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index a8d29b020..475aa1c45 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -59,6 +59,7 @@ tracing.workspace = true # optional deps for test-utils reth-stages = { workspace = true, optional = true } reth-tracing = { workspace = true, optional = true } +rand = { workspace = true, optional = true } [dev-dependencies] # reth @@ -73,11 +74,13 @@ reth-stages = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true assert_matches.workspace = true +rand.workspace = true [features] test-utils = [ "reth-db/test-utils", "reth-network-p2p/test-utils", "reth-stages/test-utils", - "reth-tracing" + "reth-tracing", + "rand" ] diff --git a/crates/engine/tree/src/test_utils.rs b/crates/engine/tree/src/test_utils.rs index 399caa4f7..f946f2259 100644 --- a/crates/engine/tree/src/test_utils.rs +++ b/crates/engine/tree/src/test_utils.rs @@ -1,4 +1,5 @@ use crate::tree::ExecutedBlock; +use rand::Rng; use reth_chainspec::ChainSpec; use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; use reth_network_p2p::test_utils::TestFullBlockClient; @@ -82,7 +83,7 @@ pub(crate) fn insert_headers_into_client( } } -pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock { +fn get_executed_block(block_number: BlockNumber, receipts: Receipts) -> ExecutedBlock { let mut block = Block::default(); let mut header = block.header.clone(); header.number = block_number; @@ -99,7 +100,7 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu Arc::new(sealed_with_senders.senders), Arc::new(ExecutionOutcome::new( BundleState::default(), - Receipts { receipt_vec: vec![vec![]] }, + receipts, block_number, vec![Requests::default()], )), @@ -108,6 +109,16 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu ) } +pub(crate) fn get_executed_block_with_receipts(receipts: Receipts) -> ExecutedBlock { + let number = rand::thread_rng().gen::(); + + get_executed_block(number, receipts) +} + +pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock { + get_executed_block(block_number, Receipts { receipt_vec: vec![vec![]] }) +} + pub(crate) fn get_executed_blocks(range: Range) -> impl Iterator { range.map(get_executed_block_with_number) } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 4b6c8e1b1..5f603d56d 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -152,27 +152,43 @@ impl TreeState { } } -impl InMemoryState for TreeState { - fn in_memory_state_by_hash(&self, hash: B256) -> Option> { - let sealed_block = self.block_by_hash(hash)?; - Some(Arc::new(State::new(sealed_block))) +/// Container type for in memory state data. +#[derive(Debug, Default)] +pub struct InMemoryStateImpl { + blocks: HashMap>, + numbers: HashMap, + pending: Option, +} + +impl InMemoryStateImpl { + const fn new( + blocks: HashMap>, + numbers: HashMap, + pending: Option, + ) -> Self { + Self { blocks, numbers, pending } + } +} + +impl InMemoryState for InMemoryStateImpl { + fn state_by_hash(&self, hash: B256) -> Option> { + self.blocks.get(&hash).cloned() } - fn in_memory_state_by_number(&self, number: u64) -> Option> { - let sealed_block = self.block_by_number(number)?; - Some(Arc::new(State::new(sealed_block))) + fn state_by_number(&self, number: u64) -> Option> { + self.numbers.get(&number).and_then(|hash| self.blocks.get(hash).cloned()) } - fn in_memory_current_head(&self) -> Option<(BlockNumber, B256)> { - self.current_head + fn current_head(&self) -> Option<(BlockNumber, B256)> { + self.numbers.iter().max_by_key(|(&number, _)| number).map(|(&number, &hash)| (number, hash)) } - fn in_memory_pending_state(&self) -> Option> { - self.pending.clone() + fn pending_block_hash(&self) -> Option { + self.pending.as_ref().map(|state| state.hash()) } - fn in_memory_pending_block_hash(&self) -> Option { - self.pending.as_ref().map(|state| state.block_hash) + fn pending_state(&self) -> Option> { + self.pending.as_ref().map(|state| Arc::new(State(state.0.clone()))) } } @@ -300,6 +316,7 @@ pub struct EngineApiTreeHandlerImpl { persistence_state: PersistenceState, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, + canonical_in_memory_state: InMemoryStateImpl, _marker: PhantomData, } @@ -331,6 +348,7 @@ where persistence_state: PersistenceState::default(), is_pipeline_active: false, state, + canonical_in_memory_state: InMemoryStateImpl::default(), _marker: PhantomData, } } @@ -914,54 +932,63 @@ impl PersistenceState { /// Represents the tree state kept in memory. trait InMemoryState: Send + Sync { /// Returns the state for a given block hash. - fn in_memory_state_by_hash(&self, hash: B256) -> Option>; + fn state_by_hash(&self, hash: B256) -> Option>; /// Returns the state for a given block number. - fn in_memory_state_by_number(&self, number: u64) -> Option>; + fn state_by_number(&self, number: u64) -> Option>; /// Returns the current chain head. - fn in_memory_current_head(&self) -> Option<(BlockNumber, B256)>; + fn current_head(&self) -> Option<(BlockNumber, B256)>; /// Returns the pending block hash. - fn in_memory_pending_block_hash(&self) -> Option; + fn pending_block_hash(&self) -> Option; /// Returns the pending state corresponding to the current head plus one, /// from the payload received in newPayload that does not have a FCU yet. - fn in_memory_pending_state(&self) -> Option>; + fn pending_state(&self) -> Option>; } -/// State composed of a block hash, and the receipts, state and transactions root -/// after executing it. +/// State after applying the given block. #[derive(Debug, PartialEq, Eq)] -pub struct State { - /// Block hash defining the state. - block_hash: B256, - /// Block number defining the state. - block_number: BlockNumber, - /// State root after applying the block. - state_root: B256, - /// Transactions root of the block. - transactions_root: B256, - /// Receipts root after applying the block. - receipts_root: B256, -} +pub struct State(ExecutedBlock); impl State { - fn new(sealed_block: Arc) -> Self { - Self { - block_hash: sealed_block.hash(), - block_number: sealed_block.number, - state_root: sealed_block.state_root, - transactions_root: sealed_block.transactions_root, - receipts_root: sealed_block.receipts_root, - } + const fn new(executed_block: ExecutedBlock) -> Self { + Self(executed_block) + } + + fn block(&self) -> ExecutedBlock { + self.0.clone() + } + + fn hash(&self) -> B256 { + self.0.block().hash() + } + + fn number(&self) -> u64 { + self.0.block().number + } + + fn state_root(&self) -> B256 { + self.0.block().header.state_root + } + + fn receipts(&self) -> &Receipts { + &self.0.execution_outcome().receipts } } #[cfg(test)] mod tests { use super::*; - use crate::{static_files::StaticFileAction, test_utils::get_executed_blocks}; + use crate::{ + static_files::StaticFileAction, + test_utils::{ + get_executed_block_with_number, get_executed_block_with_receipts, get_executed_blocks, + }, + }; + use rand::Rng; use reth_beacon_consensus::EthBeaconConsensus; use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_ethereum_engine_primitives::EthEngineTypes; use reth_evm::test_utils::MockExecutorProvider; + use reth_primitives::Receipt; use reth_provider::test_utils::MockEthProvider; use std::sync::mpsc::{channel, Sender}; use tokio::sync::mpsc::unbounded_channel; @@ -978,12 +1005,16 @@ mod tests { let mut blocks_by_hash = HashMap::new(); let mut blocks_by_number = BTreeMap::new(); + let mut state_by_hash = HashMap::new(); + let mut hash_by_number = HashMap::new(); for block in &blocks { - blocks_by_hash.insert(block.block().hash(), block.clone()); - blocks_by_number - .entry(block.block().number) - .or_insert_with(Vec::new) - .push(block.clone()); + let sealed_block = block.block(); + let hash = sealed_block.hash(); + let number = sealed_block.number; + blocks_by_hash.insert(hash, block.clone()); + blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone()); + state_by_hash.insert(hash, Arc::new(State(block.clone()))); + hash_by_number.insert(number, hash); } let tree_state = TreeState { blocks_by_hash, blocks_by_number, ..Default::default() }; @@ -1016,21 +1047,26 @@ mod tests { forkchoice_state_tracker: ForkchoiceStateTracker::default(), }; - TestHarness { - tree: EngineApiTreeHandlerImpl::new( - provider, - executor_factory, - consensus, - payload_validator, - to_tree_rx, - from_tree_tx, - engine_api_tree_state, - persistence_handle, - ), - to_tree_tx, - blocks, - sf_action_rx, - } + let mut tree = EngineApiTreeHandlerImpl::new( + provider, + executor_factory, + consensus, + payload_validator, + to_tree_rx, + from_tree_tx, + engine_api_tree_state, + persistence_handle, + ); + let last_executed_block = blocks.last().unwrap().clone(); + let pending = Some(State::new(last_executed_block)); + tree.canonical_in_memory_state = + InMemoryStateImpl::new(state_by_hash, hash_by_number, pending); + + TestHarness { tree, to_tree_tx, blocks, sf_action_rx } + } + + fn create_mock_state(block_number: u64) -> State { + State::new(get_executed_block_with_number(block_number)) } #[tokio::test] @@ -1055,8 +1091,8 @@ mod tests { } } - #[test] - fn test_in_memory_state_trait_impl() { + #[tokio::test] + async fn test_in_memory_state_trait_impl() { let TestHarness { tree, to_tree_tx, sf_action_rx, blocks } = get_default_test_harness(10); let head_block = blocks.last().unwrap().block(); @@ -1065,15 +1101,156 @@ mod tests { for executed_block in blocks { let sealed_block = executed_block.block(); - let expected_state = State::new(Arc::new(sealed_block.clone())); + let expected_state = State::new(executed_block.clone()); let actual_state_by_hash = - tree.state.tree_state.in_memory_state_by_hash(sealed_block.hash()).unwrap(); + tree.canonical_in_memory_state.state_by_hash(sealed_block.hash()).unwrap(); assert_eq!(expected_state, *actual_state_by_hash); let actual_state_by_number = - tree.state.tree_state.in_memory_state_by_number(sealed_block.number).unwrap(); + tree.canonical_in_memory_state.state_by_number(sealed_block.number).unwrap(); assert_eq!(expected_state, *actual_state_by_number); } } + + #[tokio::test] + async fn test_in_memory_state_impl_state_by_hash() { + let mut state_by_hash = HashMap::new(); + let number = rand::thread_rng().gen::(); + let state = Arc::new(create_mock_state(number)); + state_by_hash.insert(state.hash(), state.clone()); + + let in_memory_state = InMemoryStateImpl::new(state_by_hash, HashMap::new(), None); + + assert_eq!(in_memory_state.state_by_hash(state.hash()), Some(state)); + assert_eq!(in_memory_state.state_by_hash(B256::random()), None); + } + + #[tokio::test] + async fn test_in_memory_state_impl_state_by_number() { + let mut state_by_hash = HashMap::new(); + let mut hash_by_number = HashMap::new(); + + let number = rand::thread_rng().gen::(); + let state = Arc::new(create_mock_state(number)); + let hash = state.hash(); + + state_by_hash.insert(hash, state.clone()); + hash_by_number.insert(number, hash); + + let in_memory_state = InMemoryStateImpl::new(state_by_hash, hash_by_number, None); + + assert_eq!(in_memory_state.state_by_number(number), Some(state)); + assert_eq!(in_memory_state.state_by_number(number + 1), None); + } + + #[tokio::test] + async fn test_in_memory_state_impl_current_head() { + let mut hash_by_number = HashMap::new(); + let hash1 = B256::random(); + let hash2 = B256::random(); + hash_by_number.insert(1, hash1); + hash_by_number.insert(2, hash2); + + let in_memory_state = InMemoryStateImpl::new(HashMap::new(), hash_by_number, None); + + assert_eq!(in_memory_state.current_head(), Some((2, hash2))); + } + + #[tokio::test] + async fn test_in_memory_state_impl_pending_block_hash() { + let number = rand::thread_rng().gen::(); + let pending_state = create_mock_state(number); + let pending_hash = pending_state.hash(); + + let in_memory_state = + InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state)); + + assert_eq!(in_memory_state.pending_block_hash(), Some(pending_hash)); + } + + #[tokio::test] + async fn test_in_memory_state_impl_pending_state() { + let pending_number = rand::thread_rng().gen::(); + let pending_state = create_mock_state(pending_number); + let pending_hash = pending_state.hash(); + + let in_memory_state = + InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state)); + + let result = in_memory_state.pending_state(); + assert!(result.is_some()); + let actual_pending_state = result.unwrap(); + assert_eq!(actual_pending_state.0.block().hash(), pending_hash); + assert_eq!(actual_pending_state.0.block().number, pending_number); + } + + #[tokio::test] + async fn test_in_memory_state_impl_no_pending_state() { + let in_memory_state = InMemoryStateImpl::new(HashMap::new(), HashMap::new(), None); + + assert_eq!(in_memory_state.pending_block_hash(), None); + assert_eq!(in_memory_state.pending_state(), None); + } + + #[tokio::test] + async fn test_state_new() { + let number = rand::thread_rng().gen::(); + let block = get_executed_block_with_number(number); + + let state = State::new(block.clone()); + + assert_eq!(state.0, block); + } + + #[tokio::test] + async fn test_state_block() { + let number = rand::thread_rng().gen::(); + let block = get_executed_block_with_number(number); + + let state = State::new(block.clone()); + + assert_eq!(state.block(), block); + } + + #[tokio::test] + async fn test_state_hash() { + let number = rand::thread_rng().gen::(); + let block = get_executed_block_with_number(number); + + let state = State::new(block.clone()); + + assert_eq!(state.hash(), block.block().hash()); + } + + #[tokio::test] + async fn test_state_number() { + let number = rand::thread_rng().gen::(); + let block = get_executed_block_with_number(number); + + let state = State::new(block); + + assert_eq!(state.number(), number); + } + + #[tokio::test] + async fn test_state_state_root() { + let number = rand::thread_rng().gen::(); + let block = get_executed_block_with_number(number); + + let state = State::new(block.clone()); + + assert_eq!(state.state_root(), block.block().state_root); + } + + #[tokio::test] + async fn test_state_receipts() { + let receipts = Receipts { receipt_vec: vec![vec![Some(Receipt::default())]] }; + + let block = get_executed_block_with_receipts(receipts.clone()); + + let state = State::new(block); + + assert_eq!(state.receipts(), &receipts); + } }