From 605b93a2056cacdb3defb9caf38d41c8165431b3 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 16 Sep 2024 12:05:16 +0200 Subject: [PATCH] feat(trie): integrate `TrieInput` into `ParallelStateRoot` & `AsyncStateRoot` (#10930) --- crates/blockchain-tree/src/chain.rs | 8 +--- crates/engine/tree/src/tree/mod.rs | 18 +++----- crates/trie/parallel/benches/root.rs | 9 ++-- crates/trie/parallel/src/async_root.rs | 44 ++++++------------ crates/trie/parallel/src/parallel_root.rs | 54 +++++++---------------- crates/trie/trie/src/input.rs | 32 ++++++++++++-- 6 files changed, 71 insertions(+), 94 deletions(-) diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index b070fd2aa..596458e20 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -20,7 +20,7 @@ use reth_provider::{ FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider, }; use reth_revm::database::StateProviderDatabase; -use reth_trie::{updates::TrieUpdates, HashedPostState}; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::parallel_root::ParallelStateRoot; use std::{ collections::BTreeMap, @@ -224,13 +224,9 @@ impl AppendableChain { let mut execution_outcome = provider.block_execution_data_provider.execution_outcome().clone(); execution_outcome.extend(initial_execution_outcome.clone()); - let hashed_state = execution_outcome.hash_state_slow(); - let prefix_sets = hashed_state.construct_prefix_sets().freeze(); ParallelStateRoot::new( consistent_view, - Default::default(), - hashed_state, - prefix_sets, + TrieInput::from_state(execution_outcome.hash_state_slow()), ) .incremental_root_with_updates() .map(|(root, updates)| (root, Some(updates))) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index b20075505..f095098ac 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -40,7 +40,7 @@ use reth_rpc_types::{ ExecutionPayload, }; use reth_stages_api::ControlFlow; -use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState}; +use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; use reth_trie_parallel::parallel_root::ParallelStateRoot; use std::{ cmp::Ordering, @@ -2263,29 +2263,23 @@ where hashed_state: &HashedPostState, ) -> ProviderResult<(B256, TrieUpdates)> { let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; - let mut trie_nodes = TrieUpdates::default(); - let mut state = HashedPostState::default(); - let mut prefix_sets = TriePrefixSetsMut::default(); + let mut input = TrieInput::default(); if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) { // Retrieve revert state for historical block. let revert_state = consistent_view.revert_state(historical)?; - prefix_sets.extend(revert_state.construct_prefix_sets()); - state.extend(revert_state); + input.append(revert_state); // Extend with contents of parent in-memory blocks. for block in blocks.iter().rev() { - trie_nodes.extend_ref(block.trie.as_ref()); - state.extend_ref(block.hashed_state.as_ref()); + input.append_cached_ref(block.trie_updates(), block.hashed_state()) } } // Extend with block we are validating root for. - prefix_sets.extend(hashed_state.construct_prefix_sets()); - state.extend_ref(hashed_state); + input.append_ref(hashed_state); - Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze()) - .incremental_root_with_updates()?) + Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?) } /// Handles an error that occurred while inserting a block. diff --git a/crates/trie/parallel/benches/root.rs b/crates/trie/parallel/benches/root.rs index 744683b9a..8f5e27480 100644 --- a/crates/trie/parallel/benches/root.rs +++ b/crates/trie/parallel/benches/root.rs @@ -11,6 +11,7 @@ use reth_provider::{ use reth_tasks::pool::BlockingTaskPool; use reth_trie::{ hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot, + TrieInput, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseStateRoot}; use reth_trie_parallel::{async_root::AsyncStateRoot, parallel_root::ParallelStateRoot}; @@ -65,9 +66,7 @@ pub fn calculate_state_root(c: &mut Criterion) { || { ParallelStateRoot::new( view.clone(), - Default::default(), - updated_state.clone(), - updated_state.construct_prefix_sets().freeze(), + TrieInput::from_state(updated_state.clone()), ) }, |calculator| async { calculator.incremental_root() }, @@ -81,9 +80,7 @@ pub fn calculate_state_root(c: &mut Criterion) { AsyncStateRoot::new( view.clone(), blocking_pool.clone(), - Default::default(), - updated_state.clone(), - updated_state.construct_prefix_sets().freeze(), + TrieInput::from_state(updated_state.clone()), ) }, |calculator| calculator.incremental_root(), diff --git a/crates/trie/parallel/src/async_root.rs b/crates/trie/parallel/src/async_root.rs index 600a3b9a2..f9206815c 100644 --- a/crates/trie/parallel/src/async_root.rs +++ b/crates/trie/parallel/src/async_root.rs @@ -12,11 +12,10 @@ use reth_tasks::pool::BlockingTaskPool; use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, node_iter::{TrieElement, TrieNodeIter}, - prefix_set::TriePrefixSets, trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, updates::TrieUpdates, walker::TrieWalker, - HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount, + HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use std::{collections::HashMap, sync::Arc}; @@ -42,12 +41,8 @@ pub struct AsyncStateRoot { view: ConsistentDbView, /// Blocking task pool. blocking_pool: BlockingTaskPool, - /// Cached trie nodes. - trie_nodes: TrieUpdates, - /// Changed hashed state. - hashed_state: HashedPostState, - /// A set of prefix sets that have changed. - prefix_sets: TriePrefixSets, + /// Trie input. + input: TrieInput, /// Parallel state root metrics. #[cfg(feature = "metrics")] metrics: ParallelStateRootMetrics, @@ -58,16 +53,12 @@ impl AsyncStateRoot { pub fn new( view: ConsistentDbView, blocking_pool: BlockingTaskPool, - trie_nodes: TrieUpdates, - hashed_state: HashedPostState, - prefix_sets: TriePrefixSets, + input: TrieInput, ) -> Self { Self { view, blocking_pool, - trie_nodes, - hashed_state, - prefix_sets, + input, #[cfg(feature = "metrics")] metrics: ParallelStateRootMetrics::default(), } @@ -95,14 +86,12 @@ where retain_updates: bool, ) -> Result<(B256, TrieUpdates), AsyncStateRootError> { let mut tracker = ParallelTrieTracker::default(); - let trie_nodes_sorted = Arc::new(self.trie_nodes.into_sorted()); - let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted()); + let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted()); + let hashed_state_sorted = Arc::new(self.input.state.into_sorted()); + let prefix_sets = self.input.prefix_sets.freeze(); let storage_root_targets = StorageRootTargets::new( - self.prefix_sets - .account_prefix_set - .iter() - .map(|nibbles| B256::from_slice(&nibbles.pack())), - self.prefix_sets.storage_prefix_sets, + prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())), + prefix_sets.storage_prefix_sets, ); // Pre-calculate storage roots async for accounts which were changed. @@ -156,7 +145,7 @@ where let walker = TrieWalker::new( trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, - self.prefix_sets.account_prefix_set, + prefix_sets.account_prefix_set, ) .with_deletions_retained(retain_updates); let mut account_node_iter = TrieNodeIter::new( @@ -208,7 +197,7 @@ where trie_updates.finalize( account_node_iter.walker, hash_builder, - self.prefix_sets.destroyed_accounts, + prefix_sets.destroyed_accounts, ); let stats = tracker.finish(); @@ -255,7 +244,7 @@ mod tests { use rayon::ThreadPoolBuilder; use reth_primitives::{keccak256, Account, Address, StorageEntry, U256}; use reth_provider::{test_utils::create_test_provider_factory, HashingWriter}; - use reth_trie::{test_utils, HashedStorage}; + use reth_trie::{test_utils, HashedPostState, HashedStorage}; #[tokio::test] async fn random_async_root() { @@ -309,8 +298,6 @@ mod tests { consistent_view.clone(), blocking_pool.clone(), Default::default(), - HashedPostState::default(), - Default::default(), ) .incremental_root() .await @@ -343,14 +330,11 @@ mod tests { } } - let prefix_sets = hashed_state.construct_prefix_sets().freeze(); assert_eq!( AsyncStateRoot::new( consistent_view.clone(), blocking_pool.clone(), - Default::default(), - hashed_state, - prefix_sets + TrieInput::from_state(hashed_state) ) .incremental_root() .await diff --git a/crates/trie/parallel/src/parallel_root.rs b/crates/trie/parallel/src/parallel_root.rs index b38ea2f7f..65d2e386f 100644 --- a/crates/trie/parallel/src/parallel_root.rs +++ b/crates/trie/parallel/src/parallel_root.rs @@ -11,11 +11,10 @@ use reth_provider::{ use reth_trie::{ hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory}, node_iter::{TrieElement, TrieNodeIter}, - prefix_set::TriePrefixSets, trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, updates::TrieUpdates, walker::TrieWalker, - HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount, + HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput, }; use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; use std::collections::HashMap; @@ -38,12 +37,8 @@ use tracing::*; pub struct ParallelStateRoot { /// Consistent view of the database. view: ConsistentDbView, - /// Cached trie nodes. - trie_nodes: TrieUpdates, - /// Changed hashed state. - hashed_state: HashedPostState, - /// A set of prefix sets that have changed. - prefix_sets: TriePrefixSets, + /// Trie input. + input: TrieInput, /// Parallel state root metrics. #[cfg(feature = "metrics")] metrics: ParallelStateRootMetrics, @@ -51,17 +46,10 @@ pub struct ParallelStateRoot { impl ParallelStateRoot { /// Create new parallel state root calculator. - pub fn new( - view: ConsistentDbView, - trie_nodes: TrieUpdates, - hashed_state: HashedPostState, - prefix_sets: TriePrefixSets, - ) -> Self { + pub fn new(view: ConsistentDbView, input: TrieInput) -> Self { Self { view, - trie_nodes, - hashed_state, - prefix_sets, + input, #[cfg(feature = "metrics")] metrics: ParallelStateRootMetrics::default(), } @@ -89,14 +77,12 @@ where retain_updates: bool, ) -> Result<(B256, TrieUpdates), ParallelStateRootError> { let mut tracker = ParallelTrieTracker::default(); - let trie_nodes_sorted = self.trie_nodes.into_sorted(); - let hashed_state_sorted = self.hashed_state.into_sorted(); + let trie_nodes_sorted = self.input.nodes.into_sorted(); + let hashed_state_sorted = self.input.state.into_sorted(); + let prefix_sets = self.input.prefix_sets.freeze(); let storage_root_targets = StorageRootTargets::new( - self.prefix_sets - .account_prefix_set - .iter() - .map(|nibbles| B256::from_slice(&nibbles.pack())), - self.prefix_sets.storage_prefix_sets, + prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())), + prefix_sets.storage_prefix_sets, ); // Pre-calculate storage roots in parallel for accounts which were changed. @@ -142,7 +128,7 @@ where let walker = TrieWalker::new( trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, - self.prefix_sets.account_prefix_set, + prefix_sets.account_prefix_set, ) .with_deletions_retained(retain_updates); let mut account_node_iter = TrieNodeIter::new( @@ -192,7 +178,7 @@ where trie_updates.finalize( account_node_iter.walker, hash_builder, - self.prefix_sets.destroyed_accounts, + prefix_sets.destroyed_accounts, ); let stats = tracker.finish(); @@ -243,7 +229,7 @@ mod tests { use rand::Rng; use reth_primitives::{keccak256, Account, Address, StorageEntry, U256}; use reth_provider::{test_utils::create_test_provider_factory, HashingWriter}; - use reth_trie::{test_utils, HashedStorage}; + use reth_trie::{test_utils, HashedPostState, HashedStorage}; #[tokio::test] async fn random_parallel_root() { @@ -291,14 +277,9 @@ mod tests { } assert_eq!( - ParallelStateRoot::new( - consistent_view.clone(), - Default::default(), - HashedPostState::default(), - Default::default() - ) - .incremental_root() - .unwrap(), + ParallelStateRoot::new(consistent_view.clone(), Default::default()) + .incremental_root() + .unwrap(), test_utils::state_root(state.clone()) ); @@ -327,9 +308,8 @@ mod tests { } } - let prefix_sets = hashed_state.construct_prefix_sets().freeze(); assert_eq!( - ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets) + ParallelStateRoot::new(consistent_view, TrieInput::from_state(hashed_state)) .incremental_root() .unwrap(), test_utils::state_root(state) diff --git a/crates/trie/trie/src/input.rs b/crates/trie/trie/src/input.rs index 3364f626a..18f9ada2f 100644 --- a/crates/trie/trie/src/input.rs +++ b/crates/trie/trie/src/input.rs @@ -31,19 +31,45 @@ impl TrieInput { Self { nodes: TrieUpdates::default(), state, prefix_sets } } - /// Prepend state to the input and extend the prefix sets + /// Prepend state to the input and extend the prefix sets. pub fn prepend(&mut self, mut state: HashedPostState) { self.prefix_sets.extend(state.construct_prefix_sets()); std::mem::swap(&mut self.state, &mut state); self.state.extend(state); } - /// Prepend intermediate nodes and state to the input. Prefix sets for incoming state will be - /// ignored. + /// Prepend intermediate nodes and state to the input. + /// Prefix sets for incoming state will be ignored. pub fn prepend_cached(&mut self, mut nodes: TrieUpdates, mut state: HashedPostState) { std::mem::swap(&mut self.nodes, &mut nodes); self.nodes.extend(nodes); std::mem::swap(&mut self.state, &mut state); self.state.extend(state); } + + /// Append state to the input and extend the prefix sets. + pub fn append(&mut self, state: HashedPostState) { + self.prefix_sets.extend(state.construct_prefix_sets()); + self.state.extend(state); + } + + /// Append state to the input by reference and extend the prefix sets. + pub fn append_ref(&mut self, state: &HashedPostState) { + self.prefix_sets.extend(state.construct_prefix_sets()); + self.state.extend_ref(state); + } + + /// Append intermediate nodes and state to the input. + /// Prefix sets for incoming state will be ignored. + pub fn append_cached(&mut self, nodes: TrieUpdates, state: HashedPostState) { + self.nodes.extend(nodes); + self.state.extend(state); + } + + /// Append intermediate nodes and state to the input by reference. + /// Prefix sets for incoming state will be ignored. + pub fn append_cached_ref(&mut self, nodes: &TrieUpdates, state: &HashedPostState) { + self.nodes.extend_ref(nodes); + self.state.extend_ref(state); + } }