feat: perform canonical in memory state update (#9763)

This commit is contained in:
Matthias Seitz
2024-07-24 16:17:52 +02:00
committed by GitHub
parent 212196522b
commit 7ad93d5ae4
2 changed files with 147 additions and 69 deletions

View File

@ -5,13 +5,13 @@ use crate::{
};
use parking_lot::RwLock;
use reth_chainspec::ChainInfo;
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
Address, BlockNumHash, Header, Receipt, Receipts, SealedBlock, SealedBlockWithSenders,
SealedHeader, B256,
};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{collections::HashMap, ops::Deref, sync::Arc, time::Instant};
use tokio::sync::broadcast;
/// Size of the broadcast channel used to notify canonical state events.
@ -68,6 +68,11 @@ impl InMemoryState {
pub(crate) fn pending_state(&self) -> Option<Arc<BlockState>> {
self.pending.read().as_ref().map(|state| Arc::new(BlockState::new(state.block.clone())))
}
#[cfg(test)]
fn block_count(&self) -> usize {
self.blocks.read().len()
}
}
/// Inner type to provide in memory state. It includes a chain tracker to be
@ -128,6 +133,52 @@ impl CanonicalInMemoryState {
Self { inner: Arc::new(inner) }
}
/// Append new blocks to the in memory state.
fn update_blocks<I>(&self, new_blocks: I, reorged: I)
where
I: IntoIterator<Item = ExecutedBlock>,
{
// acquire all locks
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();
let mut pending = self.inner.in_memory_state.pending.write();
// we first remove the blocks from the reorged chain
for block in reorged {
let hash = block.block().hash();
let number = block.block().number;
blocks.remove(&hash);
numbers.remove(&number);
}
// insert the new blocks
for block in new_blocks {
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state = BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
let hash = block_state.hash();
let number = block_state.number();
// append new blocks
blocks.insert(hash, Arc::new(block_state));
numbers.insert(number, hash);
}
// remove the pending state
pending.take();
}
/// Update the in memory state with the given chain update.
pub fn update_chain(&self, new_chain: NewCanonicalChain) {
match new_chain {
NewCanonicalChain::Commit { new } => {
self.update_blocks(new, vec![]);
}
NewCanonicalChain::Reorg { new, old } => {
self.update_blocks(new, old);
}
}
}
/// Returns in memory state corresponding the given hash.
pub fn state_by_hash(&self, hash: B256) -> Option<Arc<BlockState>> {
self.inner.in_memory_state.state_by_hash(hash)
@ -283,6 +334,11 @@ impl BlockState {
Self { block, parent: None }
}
/// `BlockState` constructor with parent.
pub fn with_parent(block: ExecutedBlock, parent: Option<Self>) -> Self {
Self { block, parent: parent.map(Box::new) }
}
/// Returns the hash and block of the on disk block this state can be traced back to.
pub fn anchor(&self) -> BlockNumHash {
if let Some(parent) = &self.parent {
@ -401,6 +457,65 @@ impl ExecutedBlock {
}
}
/// Non-empty chain of blocks.
#[derive(Debug)]
pub enum NewCanonicalChain {
/// A simple append to the current canonical head
Commit {
/// all blocks that lead back to the canonical head
new: Vec<ExecutedBlock>,
},
/// A reorged chain consists of two chains that trace back to a shared ancestor block at which
/// point they diverge.
Reorg {
/// All blocks of the _new_ chain
new: Vec<ExecutedBlock>,
/// All blocks of the _old_ chain
old: Vec<ExecutedBlock>,
},
}
impl NewCanonicalChain {
/// Converts the new chain into a notification that will be emitted to listeners
pub fn to_chain_notification(&self) -> CanonStateNotification {
// TODO: do we need to merge execution outcome for multiblock commit or reorg?
// implement this properly
match self {
Self::Commit { new } => CanonStateNotification::Commit {
new: Arc::new(Chain::new(
new.iter().map(ExecutedBlock::sealed_block_with_senders),
new.last().unwrap().execution_output.deref().clone(),
None,
)),
},
Self::Reorg { new, old } => CanonStateNotification::Reorg {
new: Arc::new(Chain::new(
new.iter().map(ExecutedBlock::sealed_block_with_senders),
new.last().unwrap().execution_output.deref().clone(),
None,
)),
old: Arc::new(Chain::new(
old.iter().map(ExecutedBlock::sealed_block_with_senders),
old.last().unwrap().execution_output.deref().clone(),
None,
)),
},
}
}
/// Returns the new tip of the chain.
///
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
/// 1 new block.
pub fn tip(&self) -> &SealedBlock {
match self {
Self::Commit { new } | Self::Reorg { new, .. } => {
new.last().expect("non empty blocks").block()
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -546,4 +661,22 @@ mod tests {
assert_eq!(state.receipts(), &receipts);
}
#[test]
fn test_in_memory_state_chain_update() {
let state = CanonicalInMemoryState::new(HashMap::new(), HashMap::new(), None);
let block1 = get_executed_block_with_number(0);
let block2 = get_executed_block_with_number(0);
let chain = NewCanonicalChain::Commit { new: vec![block1.clone()] };
state.update_chain(chain);
assert_eq!(state.head_state().unwrap().block().block().hash(), block1.block().hash());
assert_eq!(state.state_by_number(0).unwrap().block().block().hash(), block1.block().hash());
let chain = NewCanonicalChain::Reorg { new: vec![block2.clone()], old: vec![block1] };
state.update_chain(chain);
assert_eq!(state.head_state().unwrap().block().block().hash(), block2.block().hash());
assert_eq!(state.state_by_number(0).unwrap().block().block().hash(), block2.block().hash());
assert_eq!(state.inner.in_memory_state.block_count(), 1);
}
}

View File

@ -13,7 +13,7 @@ use reth_blockchain_tree::{
error::InsertBlockErrorKind, BlockAttachment, BlockBuffer, BlockStatus,
};
use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk};
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock};
use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
@ -26,8 +26,7 @@ use reth_primitives::{
SealedBlockWithSenders, SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, CanonStateNotification, Chain, ExecutionOutcome, StateProvider,
StateProviderFactory, StateRootProvider,
BlockReader, ExecutionOutcome, StateProvider, StateProviderFactory, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
@ -41,7 +40,6 @@ use reth_stages_api::ControlFlow;
use reth_trie::HashedPostState;
use std::{
collections::{BTreeMap, HashMap},
ops::Deref,
sync::{mpsc::Receiver, Arc},
};
use tokio::sync::{
@ -188,62 +186,6 @@ impl TreeState {
}
}
/// Non-empty chain of blocks.
enum NewCanonicalChain {
/// A simple append to the current canonical head
Commit {
/// all blocks that lead back to the canonical head
new: Vec<ExecutedBlock>,
},
/// A reorged chain consists of two chains that trace back to a shared ancestor block at which
/// point they diverge.
Reorg {
/// All blocks of the _new_ chain
new: Vec<ExecutedBlock>,
/// All blocks of the _old_ chain
old: Vec<ExecutedBlock>,
},
}
impl NewCanonicalChain {
/// Converts the new chain into a notification that will be emitted to listeners
fn to_chain_notification(&self) -> CanonStateNotification {
// TODO: do we need to merge execution outcome for multiblock commit or reorg?
// implement this properly
match self {
Self::Commit { new } => CanonStateNotification::Commit {
new: Arc::new(Chain::new(
new.iter().map(ExecutedBlock::sealed_block_with_senders),
new.last().unwrap().execution_output.deref().clone(),
None,
)),
},
Self::Reorg { new, old } => CanonStateNotification::Reorg {
new: Arc::new(Chain::new(
new.iter().map(ExecutedBlock::sealed_block_with_senders),
new.last().unwrap().execution_output.deref().clone(),
None,
)),
old: Arc::new(Chain::new(
old.iter().map(ExecutedBlock::sealed_block_with_senders),
old.last().unwrap().execution_output.deref().clone(),
None,
)),
},
}
}
/// Returns the new tip of the chain.
///
/// Returns the new tip for [`Self::Reorg`] and [`Self::Commit`] variants which commit at least
/// 1 new block.
fn tip(&self) -> &SealedBlock {
match self {
Self::Commit { new } | Self::Reorg { new, .. } => new.last().unwrap().block(),
}
}
}
/// Tracks the state of the engine api internals.
///
/// This type is not shareable.
@ -1418,16 +1360,19 @@ where
}
// 2. ensure we can apply a new chain update for the head block
if let Some(update) = self.state.tree_state.on_new_head(state.head_block_hash) {
if let Some(chain_update) = self.state.tree_state.on_new_head(state.head_block_hash) {
// update the tracked canonical head
self.state.tree_state.set_canonical_head(update.tip().num_hash());
// TODO
// update inmemory state
self.state.tree_state.set_canonical_head(chain_update.tip().num_hash());
self.canonical_in_memory_state.set_canonical_head(update.tip().header.clone());
let tip = chain_update.tip().header.clone();
let notification = chain_update.to_chain_notification();
// update the tracked in-memory state with the new chain
self.canonical_in_memory_state.update_chain(chain_update);
self.canonical_in_memory_state.set_canonical_head(tip.clone());
// sends an event to all active listeners about the new canonical chain
self.canonical_in_memory_state.notify_canon_state(update.to_chain_notification());
self.canonical_in_memory_state.notify_canon_state(notification);
// update the safe and finalized blocks and ensure their values are valid, but only
// after the head block is made canonical
@ -1437,7 +1382,7 @@ where
}
if let Some(attr) = attrs {
let updated = self.process_payload_attributes(attr, &update.tip().header, state);
let updated = self.process_payload_attributes(attr, &tip, state);
return Ok(TreeOutcome::new(updated))
}