feat(trie): integrate TrieInput into ParallelStateRoot & AsyncStateRoot (#10930)

This commit is contained in:
Roman Krasiuk
2024-09-16 12:05:16 +02:00
committed by GitHub
parent 06dbd3a610
commit 605b93a205
6 changed files with 71 additions and 94 deletions

View File

@ -20,7 +20,7 @@ use reth_provider::{
FullExecutionDataProvider, ProviderError, StateRootProvider, TryIntoHistoricalStateProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::BTreeMap,
@ -224,13 +224,9 @@ impl AppendableChain {
let mut execution_outcome =
provider.block_execution_data_provider.execution_outcome().clone();
execution_outcome.extend(initial_execution_outcome.clone());
let hashed_state = execution_outcome.hash_state_slow();
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
ParallelStateRoot::new(
consistent_view,
Default::default(),
hashed_state,
prefix_sets,
TrieInput::from_state(execution_outcome.hash_state_slow()),
)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))

View File

@ -40,7 +40,7 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
cmp::Ordering,
@ -2263,29 +2263,23 @@ where
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut trie_nodes = TrieUpdates::default();
let mut state = HashedPostState::default();
let mut prefix_sets = TriePrefixSetsMut::default();
let mut input = TrieInput::default();
if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
// Retrieve revert state for historical block.
let revert_state = consistent_view.revert_state(historical)?;
prefix_sets.extend(revert_state.construct_prefix_sets());
state.extend(revert_state);
input.append(revert_state);
// Extend with contents of parent in-memory blocks.
for block in blocks.iter().rev() {
trie_nodes.extend_ref(block.trie.as_ref());
state.extend_ref(block.hashed_state.as_ref());
input.append_cached_ref(block.trie_updates(), block.hashed_state())
}
}
// Extend with block we are validating root for.
prefix_sets.extend(hashed_state.construct_prefix_sets());
state.extend_ref(hashed_state);
input.append_ref(hashed_state);
Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze())
.incremental_root_with_updates()?)
Ok(ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()?)
}
/// Handles an error that occurred while inserting a block.

View File

@ -11,6 +11,7 @@ use reth_provider::{
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::HashedPostStateCursorFactory, HashedPostState, HashedStorage, StateRoot,
TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseStateRoot};
use reth_trie_parallel::{async_root::AsyncStateRoot, parallel_root::ParallelStateRoot};
@ -65,9 +66,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
|| {
ParallelStateRoot::new(
view.clone(),
Default::default(),
updated_state.clone(),
updated_state.construct_prefix_sets().freeze(),
TrieInput::from_state(updated_state.clone()),
)
},
|calculator| async { calculator.incremental_root() },
@ -81,9 +80,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
AsyncStateRoot::new(
view.clone(),
blocking_pool.clone(),
Default::default(),
updated_state.clone(),
updated_state.construct_prefix_sets().freeze(),
TrieInput::from_state(updated_state.clone()),
)
},
|calculator| calculator.incremental_root(),

View File

@ -12,11 +12,10 @@ use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
prefix_set::TriePrefixSets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
walker::TrieWalker,
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::{collections::HashMap, sync::Arc};
@ -42,12 +41,8 @@ pub struct AsyncStateRoot<Factory> {
view: ConsistentDbView<Factory>,
/// Blocking task pool.
blocking_pool: BlockingTaskPool,
/// Cached trie nodes.
trie_nodes: TrieUpdates,
/// Changed hashed state.
hashed_state: HashedPostState,
/// A set of prefix sets that have changed.
prefix_sets: TriePrefixSets,
/// Trie input.
input: TrieInput,
/// Parallel state root metrics.
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics,
@ -58,16 +53,12 @@ impl<Factory> AsyncStateRoot<Factory> {
pub fn new(
view: ConsistentDbView<Factory>,
blocking_pool: BlockingTaskPool,
trie_nodes: TrieUpdates,
hashed_state: HashedPostState,
prefix_sets: TriePrefixSets,
input: TrieInput,
) -> Self {
Self {
view,
blocking_pool,
trie_nodes,
hashed_state,
prefix_sets,
input,
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics::default(),
}
@ -95,14 +86,12 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = Arc::new(self.trie_nodes.into_sorted());
let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());
let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
let prefix_sets = self.input.prefix_sets.freeze();
let storage_root_targets = StorageRootTargets::new(
self.prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
self.prefix_sets.storage_prefix_sets,
prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())),
prefix_sets.storage_prefix_sets,
);
// Pre-calculate storage roots async for accounts which were changed.
@ -156,7 +145,7 @@ where
let walker = TrieWalker::new(
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
self.prefix_sets.account_prefix_set,
prefix_sets.account_prefix_set,
)
.with_deletions_retained(retain_updates);
let mut account_node_iter = TrieNodeIter::new(
@ -208,7 +197,7 @@ where
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
self.prefix_sets.destroyed_accounts,
prefix_sets.destroyed_accounts,
);
let stats = tracker.finish();
@ -255,7 +244,7 @@ mod tests {
use rayon::ThreadPoolBuilder;
use reth_primitives::{keccak256, Account, Address, StorageEntry, U256};
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
use reth_trie::{test_utils, HashedStorage};
use reth_trie::{test_utils, HashedPostState, HashedStorage};
#[tokio::test]
async fn random_async_root() {
@ -309,8 +298,6 @@ mod tests {
consistent_view.clone(),
blocking_pool.clone(),
Default::default(),
HashedPostState::default(),
Default::default(),
)
.incremental_root()
.await
@ -343,14 +330,11 @@ mod tests {
}
}
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
assert_eq!(
AsyncStateRoot::new(
consistent_view.clone(),
blocking_pool.clone(),
Default::default(),
hashed_state,
prefix_sets
TrieInput::from_state(hashed_state)
)
.incremental_root()
.await

View File

@ -11,11 +11,10 @@ use reth_provider::{
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
prefix_set::TriePrefixSets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
walker::TrieWalker,
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
};
use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
use std::collections::HashMap;
@ -38,12 +37,8 @@ use tracing::*;
pub struct ParallelStateRoot<Factory> {
/// Consistent view of the database.
view: ConsistentDbView<Factory>,
/// Cached trie nodes.
trie_nodes: TrieUpdates,
/// Changed hashed state.
hashed_state: HashedPostState,
/// A set of prefix sets that have changed.
prefix_sets: TriePrefixSets,
/// Trie input.
input: TrieInput,
/// Parallel state root metrics.
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics,
@ -51,17 +46,10 @@ pub struct ParallelStateRoot<Factory> {
impl<Factory> ParallelStateRoot<Factory> {
/// Create new parallel state root calculator.
pub fn new(
view: ConsistentDbView<Factory>,
trie_nodes: TrieUpdates,
hashed_state: HashedPostState,
prefix_sets: TriePrefixSets,
) -> Self {
pub fn new(view: ConsistentDbView<Factory>, input: TrieInput) -> Self {
Self {
view,
trie_nodes,
hashed_state,
prefix_sets,
input,
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics::default(),
}
@ -89,14 +77,12 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let trie_nodes_sorted = self.trie_nodes.into_sorted();
let hashed_state_sorted = self.hashed_state.into_sorted();
let trie_nodes_sorted = self.input.nodes.into_sorted();
let hashed_state_sorted = self.input.state.into_sorted();
let prefix_sets = self.input.prefix_sets.freeze();
let storage_root_targets = StorageRootTargets::new(
self.prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
self.prefix_sets.storage_prefix_sets,
prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())),
prefix_sets.storage_prefix_sets,
);
// Pre-calculate storage roots in parallel for accounts which were changed.
@ -142,7 +128,7 @@ where
let walker = TrieWalker::new(
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
self.prefix_sets.account_prefix_set,
prefix_sets.account_prefix_set,
)
.with_deletions_retained(retain_updates);
let mut account_node_iter = TrieNodeIter::new(
@ -192,7 +178,7 @@ where
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
self.prefix_sets.destroyed_accounts,
prefix_sets.destroyed_accounts,
);
let stats = tracker.finish();
@ -243,7 +229,7 @@ mod tests {
use rand::Rng;
use reth_primitives::{keccak256, Account, Address, StorageEntry, U256};
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
use reth_trie::{test_utils, HashedStorage};
use reth_trie::{test_utils, HashedPostState, HashedStorage};
#[tokio::test]
async fn random_parallel_root() {
@ -291,14 +277,9 @@ mod tests {
}
assert_eq!(
ParallelStateRoot::new(
consistent_view.clone(),
Default::default(),
HashedPostState::default(),
Default::default()
)
.incremental_root()
.unwrap(),
ParallelStateRoot::new(consistent_view.clone(), Default::default())
.incremental_root()
.unwrap(),
test_utils::state_root(state.clone())
);
@ -327,9 +308,8 @@ mod tests {
}
}
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
assert_eq!(
ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets)
ParallelStateRoot::new(consistent_view, TrieInput::from_state(hashed_state))
.incremental_root()
.unwrap(),
test_utils::state_root(state)

View File

@ -31,19 +31,45 @@ impl TrieInput {
Self { nodes: TrieUpdates::default(), state, prefix_sets }
}
/// Prepend state to the input and extend the prefix sets
/// Prepend state to the input and extend the prefix sets.
pub fn prepend(&mut self, mut state: HashedPostState) {
self.prefix_sets.extend(state.construct_prefix_sets());
std::mem::swap(&mut self.state, &mut state);
self.state.extend(state);
}
/// Prepend intermediate nodes and state to the input. Prefix sets for incoming state will be
/// ignored.
/// Prepend intermediate nodes and state to the input.
/// Prefix sets for incoming state will be ignored.
pub fn prepend_cached(&mut self, mut nodes: TrieUpdates, mut state: HashedPostState) {
std::mem::swap(&mut self.nodes, &mut nodes);
self.nodes.extend(nodes);
std::mem::swap(&mut self.state, &mut state);
self.state.extend(state);
}
/// Append state to the input and extend the prefix sets.
pub fn append(&mut self, state: HashedPostState) {
self.prefix_sets.extend(state.construct_prefix_sets());
self.state.extend(state);
}
/// Append state to the input by reference and extend the prefix sets.
pub fn append_ref(&mut self, state: &HashedPostState) {
self.prefix_sets.extend(state.construct_prefix_sets());
self.state.extend_ref(state);
}
/// Append intermediate nodes and state to the input.
/// Prefix sets for incoming state will be ignored.
pub fn append_cached(&mut self, nodes: TrieUpdates, state: HashedPostState) {
self.nodes.extend(nodes);
self.state.extend(state);
}
/// Append intermediate nodes and state to the input by reference.
/// Prefix sets for incoming state will be ignored.
pub fn append_cached_ref(&mut self, nodes: &TrieUpdates, state: &HashedPostState) {
self.nodes.extend_ref(nodes);
self.state.extend_ref(state);
}
}