diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs index c84b6905e..3614bc7e4 100644 --- a/crates/chain-state/src/in_memory.rs +++ b/crates/chain-state/src/in_memory.rs @@ -5,13 +5,13 @@ use crate::{ }; use parking_lot::RwLock; use reth_chainspec::ChainInfo; -use reth_execution_types::ExecutionOutcome; +use reth_execution_types::{Chain, ExecutionOutcome}; use reth_primitives::{ Address, BlockNumHash, Header, Receipt, Receipts, SealedBlock, SealedBlockWithSenders, SealedHeader, B256, }; use reth_trie::{updates::TrieUpdates, HashedPostState}; -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{collections::HashMap, ops::Deref, sync::Arc, time::Instant}; use tokio::sync::broadcast; /// Size of the broadcast channel used to notify canonical state events. @@ -68,6 +68,11 @@ impl InMemoryState { pub(crate) fn pending_state(&self) -> Option> { self.pending.read().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone()))) } + + #[cfg(test)] + fn block_count(&self) -> usize { + self.blocks.read().len() + } } /// Inner type to provide in memory state. It includes a chain tracker to be @@ -128,6 +133,52 @@ impl CanonicalInMemoryState { Self { inner: Arc::new(inner) } } + /// Append new blocks to the in memory state. + fn update_blocks(&self, new_blocks: I, reorged: I) + where + I: IntoIterator, + { + // acquire all locks + let mut blocks = self.inner.in_memory_state.blocks.write(); + let mut numbers = self.inner.in_memory_state.numbers.write(); + let mut pending = self.inner.in_memory_state.pending.write(); + + // we first remove the blocks from the reorged chain + for block in reorged { + let hash = block.block().hash(); + let number = block.block().number; + blocks.remove(&hash); + numbers.remove(&number); + } + + // insert the new blocks + for block in new_blocks { + let parent = blocks.get(&block.block().parent_hash).cloned(); + let block_state = BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone())); + let hash = block_state.hash(); + let number = block_state.number(); + + // append new blocks + blocks.insert(hash, Arc::new(block_state)); + numbers.insert(number, hash); + } + + // remove the pending state + pending.take(); + } + + /// Update the in memory state with the given chain update. + pub fn update_chain(&self, new_chain: NewCanonicalChain) { + match new_chain { + NewCanonicalChain::Commit { new } => { + self.update_blocks(new, vec![]); + } + NewCanonicalChain::Reorg { new, old } => { + self.update_blocks(new, old); + } + } + } + /// Returns in memory state corresponding the given hash. pub fn state_by_hash(&self, hash: B256) -> Option> { self.inner.in_memory_state.state_by_hash(hash) @@ -283,6 +334,11 @@ impl BlockState { Self { block, parent: None } } + /// `BlockState` constructor with parent. + pub fn with_parent(block: ExecutedBlock, parent: Option) -> Self { + Self { block, parent: parent.map(Box::new) } + } + /// Returns the hash and block of the on disk block this state can be traced back to. pub fn anchor(&self) -> BlockNumHash { if let Some(parent) = &self.parent { @@ -401,6 +457,65 @@ impl ExecutedBlock { } } +/// Non-empty chain of blocks. +#[derive(Debug)] +pub enum NewCanonicalChain { + /// A simple append to the current canonical head + Commit { + /// all blocks that lead back to the canonical head + new: Vec, + }, + /// A reorged chain consists of two chains that trace back to a shared ancestor block at which + /// point they diverge. + Reorg { + /// All blocks of the _new_ chain + new: Vec, + /// All blocks of the _old_ chain + old: Vec, + }, +} + +impl NewCanonicalChain { + /// Converts the new chain into a notification that will be emitted to listeners + pub fn to_chain_notification(&self) -> CanonStateNotification { + // TODO: do we need to merge execution outcome for multiblock commit or reorg? + // implement this properly + match self { + Self::Commit { new } => CanonStateNotification::Commit { + new: Arc::new(Chain::new( + new.iter().map(ExecutedBlock::sealed_block_with_senders), + new.last().unwrap().execution_output.deref().clone(), + None, + )), + }, + Self::Reorg { new, old } => CanonStateNotification::Reorg { + new: Arc::new(Chain::new( + new.iter().map(ExecutedBlock::sealed_block_with_senders), + new.last().unwrap().execution_output.deref().clone(), + None, + )), + old: Arc::new(Chain::new( + old.iter().map(ExecutedBlock::sealed_block_with_senders), + old.last().unwrap().execution_output.deref().clone(), + None, + )), + }, + } + } + + /// Returns the new tip of the chain. + /// + /// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least + /// 1 new block. + pub fn tip(&self) -> &SealedBlock { + match self { + Self::Commit { new } | Self::Reorg { new, .. } => { + new.last().expect("non empty blocks").block() + } + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -546,4 +661,22 @@ mod tests { assert_eq!(state.receipts(), &receipts); } + + #[test] + fn test_in_memory_state_chain_update() { + let state = CanonicalInMemoryState::new(HashMap::new(), HashMap::new(), None); + let block1 = get_executed_block_with_number(0); + let block2 = get_executed_block_with_number(0); + let chain = NewCanonicalChain::Commit { new: vec![block1.clone()] }; + state.update_chain(chain); + assert_eq!(state.head_state().unwrap().block().block().hash(), block1.block().hash()); + assert_eq!(state.state_by_number(0).unwrap().block().block().hash(), block1.block().hash()); + + let chain = NewCanonicalChain::Reorg { new: vec![block2.clone()], old: vec![block1] }; + state.update_chain(chain); + assert_eq!(state.head_state().unwrap().block().block().hash(), block2.block().hash()); + assert_eq!(state.state_by_number(0).unwrap().block().block().hash(), block2.block().hash()); + + assert_eq!(state.inner.in_memory_state.block_count(), 1); + } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 7b96713ff..b953d3472 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -13,7 +13,7 @@ use reth_blockchain_tree::{ error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus, }; use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; -use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock}; +use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain}; use reth_consensus::{Consensus, PostExecutionInput}; use reth_engine_primitives::EngineTypes; use reth_errors::{ConsensusError, ProviderResult}; @@ -26,8 +26,7 @@ use reth_primitives::{ SealedBlockWithSenders, SealedHeader, B256, U256, }; use reth_provider::{ - BlockReader, CanonStateNotification, Chain, ExecutionOutcome, StateProvider, - StateProviderFactory, StateRootProvider, + BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_rpc_types::{ @@ -41,7 +40,6 @@ use reth_stages_api::ControlFlow; use reth_trie::HashedPostState; use std::{ collections::{BTreeMap, HashMap}, - ops::Deref, sync::{mpsc::Receiver, Arc}, }; use tokio::sync::{ @@ -188,62 +186,6 @@ impl TreeState { } } -/// Non-empty chain of blocks. -enum NewCanonicalChain { - /// A simple append to the current canonical head - Commit { - /// all blocks that lead back to the canonical head - new: Vec, - }, - /// A reorged chain consists of two chains that trace back to a shared ancestor block at which - /// point they diverge. - Reorg { - /// All blocks of the _new_ chain - new: Vec, - /// All blocks of the _old_ chain - old: Vec, - }, -} - -impl NewCanonicalChain { - /// Converts the new chain into a notification that will be emitted to listeners - fn to_chain_notification(&self) -> CanonStateNotification { - // TODO: do we need to merge execution outcome for multiblock commit or reorg? - // implement this properly - match self { - Self::Commit { new } => CanonStateNotification::Commit { - new: Arc::new(Chain::new( - new.iter().map(ExecutedBlock::sealed_block_with_senders), - new.last().unwrap().execution_output.deref().clone(), - None, - )), - }, - Self::Reorg { new, old } => CanonStateNotification::Reorg { - new: Arc::new(Chain::new( - new.iter().map(ExecutedBlock::sealed_block_with_senders), - new.last().unwrap().execution_output.deref().clone(), - None, - )), - old: Arc::new(Chain::new( - old.iter().map(ExecutedBlock::sealed_block_with_senders), - old.last().unwrap().execution_output.deref().clone(), - None, - )), - }, - } - } - - /// Returns the new tip of the chain. - /// - /// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least - /// 1 new block. - fn tip(&self) -> &SealedBlock { - match self { - Self::Commit { new } | Self::Reorg { new, .. } => new.last().unwrap().block(), - } - } -} - /// Tracks the state of the engine api internals. /// /// This type is not shareable. @@ -1418,16 +1360,19 @@ where } // 2. ensure we can apply a new chain update for the head block - if let Some(update) = self.state.tree_state.on_new_head(state.head_block_hash) { + if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) { // update the tracked canonical head - self.state.tree_state.set_canonical_head(update.tip().num_hash()); - // TODO - // update inmemory state + self.state.tree_state.set_canonical_head(chain_update.tip().num_hash()); - self.canonical_in_memory_state.set_canonical_head(update.tip().header.clone()); + let tip = chain_update.tip().header.clone(); + let notification = chain_update.to_chain_notification(); + + // update the tracked in-memory state with the new chain + self.canonical_in_memory_state.update_chain(chain_update); + self.canonical_in_memory_state.set_canonical_head(tip.clone()); // sends an event to all active listeners about the new canonical chain - self.canonical_in_memory_state.notify_canon_state(update.to_chain_notification()); + self.canonical_in_memory_state.notify_canon_state(notification); // update the safe and finalized blocks and ensure their values are valid, but only // after the head block is made canonical @@ -1437,7 +1382,7 @@ where } if let Some(attr) = attrs { - let updated = self.process_payload_attributes(attr, &update.tip().header, state); + let updated = self.process_payload_attributes(attr, &tip, state); return Ok(TreeOutcome::new(updated)) }