feat: add in memory state container (#9574)

This commit is contained in:
Federico Gimenez
2024-07-17 18:37:27 +02:00
committed by GitHub
parent 281ecf9627
commit 0994d36c4e
4 changed files with 261 additions and 69 deletions

1
Cargo.lock generated
View File

@ -6947,6 +6947,7 @@ dependencies = [
"futures", "futures",
"metrics", "metrics",
"parking_lot 0.12.3", "parking_lot 0.12.3",
"rand 0.8.5",
"reth-beacon-consensus", "reth-beacon-consensus",
"reth-blockchain-tree", "reth-blockchain-tree",
"reth-blockchain-tree-api", "reth-blockchain-tree-api",

View File

@ -59,6 +59,7 @@ tracing.workspace = true
# optional deps for test-utils # optional deps for test-utils
reth-stages = { workspace = true, optional = true } reth-stages = { workspace = true, optional = true }
reth-tracing = { workspace = true, optional = true } reth-tracing = { workspace = true, optional = true }
rand = { workspace = true, optional = true }
[dev-dependencies] [dev-dependencies]
# reth # reth
@ -73,11 +74,13 @@ reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true reth-tracing.workspace = true
assert_matches.workspace = true assert_matches.workspace = true
rand.workspace = true
[features] [features]
test-utils = [ test-utils = [
"reth-db/test-utils", "reth-db/test-utils",
"reth-network-p2p/test-utils", "reth-network-p2p/test-utils",
"reth-stages/test-utils", "reth-stages/test-utils",
"reth-tracing" "reth-tracing",
"rand"
] ]

View File

@ -1,4 +1,5 @@
use crate::tree::ExecutedBlock; use crate::tree::ExecutedBlock;
use rand::Rng;
use reth_chainspec::ChainSpec; use reth_chainspec::ChainSpec;
use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase}; use reth_db::{mdbx::DatabaseEnv, test_utils::TempDatabase};
use reth_network_p2p::test_utils::TestFullBlockClient; use reth_network_p2p::test_utils::TestFullBlockClient;
@ -82,7 +83,7 @@ pub(crate) fn insert_headers_into_client(
} }
} }
pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock { fn get_executed_block(block_number: BlockNumber, receipts: Receipts) -> ExecutedBlock {
let mut block = Block::default(); let mut block = Block::default();
let mut header = block.header.clone(); let mut header = block.header.clone();
header.number = block_number; header.number = block_number;
@ -99,7 +100,7 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu
Arc::new(sealed_with_senders.senders), Arc::new(sealed_with_senders.senders),
Arc::new(ExecutionOutcome::new( Arc::new(ExecutionOutcome::new(
BundleState::default(), BundleState::default(),
Receipts { receipt_vec: vec![vec![]] }, receipts,
block_number, block_number,
vec![Requests::default()], vec![Requests::default()],
)), )),
@ -108,6 +109,16 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu
) )
} }
pub(crate) fn get_executed_block_with_receipts(receipts: Receipts) -> ExecutedBlock {
let number = rand::thread_rng().gen::<u64>();
get_executed_block(number, receipts)
}
pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> ExecutedBlock {
get_executed_block(block_number, Receipts { receipt_vec: vec![vec![]] })
}
pub(crate) fn get_executed_blocks(range: Range<u64>) -> impl Iterator<Item = ExecutedBlock> { pub(crate) fn get_executed_blocks(range: Range<u64>) -> impl Iterator<Item = ExecutedBlock> {
range.map(get_executed_block_with_number) range.map(get_executed_block_with_number)
} }

View File

@ -152,27 +152,43 @@ impl TreeState {
} }
} }
impl InMemoryState for TreeState { /// Container type for in memory state data.
fn in_memory_state_by_hash(&self, hash: B256) -> Option<Arc<State>> { #[derive(Debug, Default)]
let sealed_block = self.block_by_hash(hash)?; pub struct InMemoryStateImpl {
Some(Arc::new(State::new(sealed_block))) blocks: HashMap<B256, Arc<State>>,
numbers: HashMap<u64, B256>,
pending: Option<State>,
}
impl InMemoryStateImpl {
const fn new(
blocks: HashMap<B256, Arc<State>>,
numbers: HashMap<u64, B256>,
pending: Option<State>,
) -> Self {
Self { blocks, numbers, pending }
}
}
impl InMemoryState for InMemoryStateImpl {
fn state_by_hash(&self, hash: B256) -> Option<Arc<State>> {
self.blocks.get(&hash).cloned()
} }
fn in_memory_state_by_number(&self, number: u64) -> Option<Arc<State>> { fn state_by_number(&self, number: u64) -> Option<Arc<State>> {
let sealed_block = self.block_by_number(number)?; self.numbers.get(&number).and_then(|hash| self.blocks.get(hash).cloned())
Some(Arc::new(State::new(sealed_block)))
} }
fn in_memory_current_head(&self) -> Option<(BlockNumber, B256)> { fn current_head(&self) -> Option<(BlockNumber, B256)> {
self.current_head self.numbers.iter().max_by_key(|(&number, _)| number).map(|(&number, &hash)| (number, hash))
} }
fn in_memory_pending_state(&self) -> Option<Arc<State>> { fn pending_block_hash(&self) -> Option<B256> {
self.pending.clone() self.pending.as_ref().map(|state| state.hash())
} }
fn in_memory_pending_block_hash(&self) -> Option<B256> { fn pending_state(&self) -> Option<Arc<State>> {
self.pending.as_ref().map(|state| state.block_hash) self.pending.as_ref().map(|state| Arc::new(State(state.0.clone())))
} }
} }
@ -300,6 +316,7 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
persistence_state: PersistenceState, persistence_state: PersistenceState,
/// (tmp) The flag indicating whether the pipeline is active. /// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool, is_pipeline_active: bool,
canonical_in_memory_state: InMemoryStateImpl,
_marker: PhantomData<T>, _marker: PhantomData<T>,
} }
@ -331,6 +348,7 @@ where
persistence_state: PersistenceState::default(), persistence_state: PersistenceState::default(),
is_pipeline_active: false, is_pipeline_active: false,
state, state,
canonical_in_memory_state: InMemoryStateImpl::default(),
_marker: PhantomData, _marker: PhantomData,
} }
} }
@ -914,54 +932,63 @@ impl PersistenceState {
/// Represents the tree state kept in memory. /// Represents the tree state kept in memory.
trait InMemoryState: Send + Sync { trait InMemoryState: Send + Sync {
/// Returns the state for a given block hash. /// Returns the state for a given block hash.
fn in_memory_state_by_hash(&self, hash: B256) -> Option<Arc<State>>; fn state_by_hash(&self, hash: B256) -> Option<Arc<State>>;
/// Returns the state for a given block number. /// Returns the state for a given block number.
fn in_memory_state_by_number(&self, number: u64) -> Option<Arc<State>>; fn state_by_number(&self, number: u64) -> Option<Arc<State>>;
/// Returns the current chain head. /// Returns the current chain head.
fn in_memory_current_head(&self) -> Option<(BlockNumber, B256)>; fn current_head(&self) -> Option<(BlockNumber, B256)>;
/// Returns the pending block hash. /// Returns the pending block hash.
fn in_memory_pending_block_hash(&self) -> Option<B256>; fn pending_block_hash(&self) -> Option<B256>;
/// Returns the pending state corresponding to the current head plus one, /// Returns the pending state corresponding to the current head plus one,
/// from the payload received in newPayload that does not have a FCU yet. /// from the payload received in newPayload that does not have a FCU yet.
fn in_memory_pending_state(&self) -> Option<Arc<State>>; fn pending_state(&self) -> Option<Arc<State>>;
} }
/// State composed of a block hash, and the receipts, state and transactions root /// State after applying the given block.
/// after executing it.
#[derive(Debug, PartialEq, Eq)] #[derive(Debug, PartialEq, Eq)]
pub struct State { pub struct State(ExecutedBlock);
/// Block hash defining the state.
block_hash: B256,
/// Block number defining the state.
block_number: BlockNumber,
/// State root after applying the block.
state_root: B256,
/// Transactions root of the block.
transactions_root: B256,
/// Receipts root after applying the block.
receipts_root: B256,
}
impl State { impl State {
fn new(sealed_block: Arc<SealedBlock>) -> Self { const fn new(executed_block: ExecutedBlock) -> Self {
Self { Self(executed_block)
block_hash: sealed_block.hash(), }
block_number: sealed_block.number,
state_root: sealed_block.state_root, fn block(&self) -> ExecutedBlock {
transactions_root: sealed_block.transactions_root, self.0.clone()
receipts_root: sealed_block.receipts_root, }
}
fn hash(&self) -> B256 {
self.0.block().hash()
}
fn number(&self) -> u64 {
self.0.block().number
}
fn state_root(&self) -> B256 {
self.0.block().header.state_root
}
fn receipts(&self) -> &Receipts {
&self.0.execution_outcome().receipts
} }
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{static_files::StaticFileAction, test_utils::get_executed_blocks}; use crate::{
static_files::StaticFileAction,
test_utils::{
get_executed_block_with_number, get_executed_block_with_receipts, get_executed_blocks,
},
};
use rand::Rng;
use reth_beacon_consensus::EthBeaconConsensus; use reth_beacon_consensus::EthBeaconConsensus;
use reth_chainspec::{ChainSpecBuilder, MAINNET}; use reth_chainspec::{ChainSpecBuilder, MAINNET};
use reth_ethereum_engine_primitives::EthEngineTypes; use reth_ethereum_engine_primitives::EthEngineTypes;
use reth_evm::test_utils::MockExecutorProvider; use reth_evm::test_utils::MockExecutorProvider;
use reth_primitives::Receipt;
use reth_provider::test_utils::MockEthProvider; use reth_provider::test_utils::MockEthProvider;
use std::sync::mpsc::{channel, Sender}; use std::sync::mpsc::{channel, Sender};
use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::unbounded_channel;
@ -978,12 +1005,16 @@ mod tests {
let mut blocks_by_hash = HashMap::new(); let mut blocks_by_hash = HashMap::new();
let mut blocks_by_number = BTreeMap::new(); let mut blocks_by_number = BTreeMap::new();
let mut state_by_hash = HashMap::new();
let mut hash_by_number = HashMap::new();
for block in &blocks { for block in &blocks {
blocks_by_hash.insert(block.block().hash(), block.clone()); let sealed_block = block.block();
blocks_by_number let hash = sealed_block.hash();
.entry(block.block().number) let number = sealed_block.number;
.or_insert_with(Vec::new) blocks_by_hash.insert(hash, block.clone());
.push(block.clone()); blocks_by_number.entry(number).or_insert_with(Vec::new).push(block.clone());
state_by_hash.insert(hash, Arc::new(State(block.clone())));
hash_by_number.insert(number, hash);
} }
let tree_state = TreeState { blocks_by_hash, blocks_by_number, ..Default::default() }; let tree_state = TreeState { blocks_by_hash, blocks_by_number, ..Default::default() };
@ -1016,21 +1047,26 @@ mod tests {
forkchoice_state_tracker: ForkchoiceStateTracker::default(), forkchoice_state_tracker: ForkchoiceStateTracker::default(),
}; };
TestHarness { let mut tree = EngineApiTreeHandlerImpl::new(
tree: EngineApiTreeHandlerImpl::new( provider,
provider, executor_factory,
executor_factory, consensus,
consensus, payload_validator,
payload_validator, to_tree_rx,
to_tree_rx, from_tree_tx,
from_tree_tx, engine_api_tree_state,
engine_api_tree_state, persistence_handle,
persistence_handle, );
), let last_executed_block = blocks.last().unwrap().clone();
to_tree_tx, let pending = Some(State::new(last_executed_block));
blocks, tree.canonical_in_memory_state =
sf_action_rx, InMemoryStateImpl::new(state_by_hash, hash_by_number, pending);
}
TestHarness { tree, to_tree_tx, blocks, sf_action_rx }
}
fn create_mock_state(block_number: u64) -> State {
State::new(get_executed_block_with_number(block_number))
} }
#[tokio::test] #[tokio::test]
@ -1055,8 +1091,8 @@ mod tests {
} }
} }
#[test] #[tokio::test]
fn test_in_memory_state_trait_impl() { async fn test_in_memory_state_trait_impl() {
let TestHarness { tree, to_tree_tx, sf_action_rx, blocks } = get_default_test_harness(10); let TestHarness { tree, to_tree_tx, sf_action_rx, blocks } = get_default_test_harness(10);
let head_block = blocks.last().unwrap().block(); let head_block = blocks.last().unwrap().block();
@ -1065,15 +1101,156 @@ mod tests {
for executed_block in blocks { for executed_block in blocks {
let sealed_block = executed_block.block(); let sealed_block = executed_block.block();
let expected_state = State::new(Arc::new(sealed_block.clone())); let expected_state = State::new(executed_block.clone());
let actual_state_by_hash = let actual_state_by_hash =
tree.state.tree_state.in_memory_state_by_hash(sealed_block.hash()).unwrap(); tree.canonical_in_memory_state.state_by_hash(sealed_block.hash()).unwrap();
assert_eq!(expected_state, *actual_state_by_hash); assert_eq!(expected_state, *actual_state_by_hash);
let actual_state_by_number = let actual_state_by_number =
tree.state.tree_state.in_memory_state_by_number(sealed_block.number).unwrap(); tree.canonical_in_memory_state.state_by_number(sealed_block.number).unwrap();
assert_eq!(expected_state, *actual_state_by_number); assert_eq!(expected_state, *actual_state_by_number);
} }
} }
#[tokio::test]
async fn test_in_memory_state_impl_state_by_hash() {
let mut state_by_hash = HashMap::new();
let number = rand::thread_rng().gen::<u64>();
let state = Arc::new(create_mock_state(number));
state_by_hash.insert(state.hash(), state.clone());
let in_memory_state = InMemoryStateImpl::new(state_by_hash, HashMap::new(), None);
assert_eq!(in_memory_state.state_by_hash(state.hash()), Some(state));
assert_eq!(in_memory_state.state_by_hash(B256::random()), None);
}
#[tokio::test]
async fn test_in_memory_state_impl_state_by_number() {
let mut state_by_hash = HashMap::new();
let mut hash_by_number = HashMap::new();
let number = rand::thread_rng().gen::<u64>();
let state = Arc::new(create_mock_state(number));
let hash = state.hash();
state_by_hash.insert(hash, state.clone());
hash_by_number.insert(number, hash);
let in_memory_state = InMemoryStateImpl::new(state_by_hash, hash_by_number, None);
assert_eq!(in_memory_state.state_by_number(number), Some(state));
assert_eq!(in_memory_state.state_by_number(number + 1), None);
}
#[tokio::test]
async fn test_in_memory_state_impl_current_head() {
let mut hash_by_number = HashMap::new();
let hash1 = B256::random();
let hash2 = B256::random();
hash_by_number.insert(1, hash1);
hash_by_number.insert(2, hash2);
let in_memory_state = InMemoryStateImpl::new(HashMap::new(), hash_by_number, None);
assert_eq!(in_memory_state.current_head(), Some((2, hash2)));
}
#[tokio::test]
async fn test_in_memory_state_impl_pending_block_hash() {
let number = rand::thread_rng().gen::<u64>();
let pending_state = create_mock_state(number);
let pending_hash = pending_state.hash();
let in_memory_state =
InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state));
assert_eq!(in_memory_state.pending_block_hash(), Some(pending_hash));
}
#[tokio::test]
async fn test_in_memory_state_impl_pending_state() {
let pending_number = rand::thread_rng().gen::<u64>();
let pending_state = create_mock_state(pending_number);
let pending_hash = pending_state.hash();
let in_memory_state =
InMemoryStateImpl::new(HashMap::new(), HashMap::new(), Some(pending_state));
let result = in_memory_state.pending_state();
assert!(result.is_some());
let actual_pending_state = result.unwrap();
assert_eq!(actual_pending_state.0.block().hash(), pending_hash);
assert_eq!(actual_pending_state.0.block().number, pending_number);
}
#[tokio::test]
async fn test_in_memory_state_impl_no_pending_state() {
let in_memory_state = InMemoryStateImpl::new(HashMap::new(), HashMap::new(), None);
assert_eq!(in_memory_state.pending_block_hash(), None);
assert_eq!(in_memory_state.pending_state(), None);
}
#[tokio::test]
async fn test_state_new() {
let number = rand::thread_rng().gen::<u64>();
let block = get_executed_block_with_number(number);
let state = State::new(block.clone());
assert_eq!(state.0, block);
}
#[tokio::test]
async fn test_state_block() {
let number = rand::thread_rng().gen::<u64>();
let block = get_executed_block_with_number(number);
let state = State::new(block.clone());
assert_eq!(state.block(), block);
}
#[tokio::test]
async fn test_state_hash() {
let number = rand::thread_rng().gen::<u64>();
let block = get_executed_block_with_number(number);
let state = State::new(block.clone());
assert_eq!(state.hash(), block.block().hash());
}
#[tokio::test]
async fn test_state_number() {
let number = rand::thread_rng().gen::<u64>();
let block = get_executed_block_with_number(number);
let state = State::new(block);
assert_eq!(state.number(), number);
}
#[tokio::test]
async fn test_state_state_root() {
let number = rand::thread_rng().gen::<u64>();
let block = get_executed_block_with_number(number);
let state = State::new(block.clone());
assert_eq!(state.state_root(), block.block().state_root);
}
#[tokio::test]
async fn test_state_receipts() {
let receipts = Receipts { receipt_vec: vec![vec![Some(Receipt::default())]] };
let block = get_executed_block_with_receipts(receipts.clone());
let state = State::new(block);
assert_eq!(state.receipts(), &receipts);
}
} }