feat(trie): async root intermediate nodes (#10920)

Author: Roman Krasiuk
Date: 2024-09-16 10:37:18 +02:00
Committed by: GitHub
Parent: b0eb78ced6
Commit: 59a7798750
2 changed files with 55 additions and 20 deletions

File 1 of 2: state root benchmark

@@ -77,7 +77,15 @@ pub fn calculate_state_root(c: &mut Criterion) {
         // async root
         group.bench_function(BenchmarkId::new("async root", size), |b| {
             b.to_async(&runtime).iter_with_setup(
-                || AsyncStateRoot::new(view.clone(), blocking_pool.clone(), updated_state.clone()),
+                || {
+                    AsyncStateRoot::new(
+                        view.clone(),
+                        blocking_pool.clone(),
+                        Default::default(),
+                        updated_state.clone(),
+                        updated_state.construct_prefix_sets().freeze(),
+                    )
+                },
                 |calculator| calculator.incremental_root(),
             );
         });
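Not part of the diff, but useful context for the new setup closure: the two extra arguments are an (initially empty) cache of intermediate trie nodes and the prefix sets frozen from the changed hashed state. A minimal sketch of producing them, using only types the diff itself imports; the helper name is illustrative:

use reth_trie::{prefix_set::TriePrefixSets, updates::TrieUpdates, HashedPostState};

/// Illustrative helper (not repo code): derive the two new constructor inputs
/// from a changed hashed post state, mirroring the benchmark setup above.
fn async_root_inputs(updated_state: &HashedPostState) -> (TrieUpdates, TriePrefixSets) {
    // Nothing has been computed yet, so the node cache starts out empty.
    let trie_nodes = TrieUpdates::default();
    // Mark every changed account and storage slot so the trie walker revisits it.
    let prefix_sets = updated_state.construct_prefix_sets().freeze();
    (trie_nodes, prefix_sets)
}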

File 2 of 2: async state root implementation

@@ -12,7 +12,8 @@ use reth_tasks::pool::BlockingTaskPool;
 use reth_trie::{
     hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
     node_iter::{TrieElement, TrieNodeIter},
-    trie_cursor::TrieCursorFactory,
+    prefix_set::TriePrefixSets,
+    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
     updates::TrieUpdates,
     walker::TrieWalker,
     HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
@@ -41,8 +42,12 @@ pub struct AsyncStateRoot<Factory> {
     view: ConsistentDbView<Factory>,
     /// Blocking task pool.
     blocking_pool: BlockingTaskPool,
+    /// Cached trie nodes.
+    trie_nodes: TrieUpdates,
     /// Changed hashed state.
     hashed_state: HashedPostState,
+    /// A set of prefix sets that have changed.
+    prefix_sets: TriePrefixSets,
     /// Parallel state root metrics.
     #[cfg(feature = "metrics")]
     metrics: ParallelStateRootMetrics,
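For readers unfamiliar with `TriePrefixSets`, the parts of it this file relies on, listed only from the accesses that appear later in this diff, are:

// prefix_sets.account_prefix_set   - changed account paths; drives StorageRootTargets
//                                    and the account-level TrieWalker
// prefix_sets.storage_prefix_sets  - per-account changed storage paths
// prefix_sets.destroyed_accounts   - accounts deleted in this transition; passed to
//                                    trie_updates.finalize(..)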
@@ -53,12 +58,16 @@ impl<Factory> AsyncStateRoot<Factory> {
     pub fn new(
         view: ConsistentDbView<Factory>,
         blocking_pool: BlockingTaskPool,
+        trie_nodes: TrieUpdates,
         hashed_state: HashedPostState,
+        prefix_sets: TriePrefixSets,
     ) -> Self {
         Self {
             view,
             blocking_pool,
+            trie_nodes,
             hashed_state,
+            prefix_sets,
             #[cfg(feature = "metrics")]
             metrics: ParallelStateRootMetrics::default(),
         }
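A hedged usage sketch, not something this diff contains: given the "Cached trie nodes" field and the `Result<(B256, TrieUpdates), AsyncStateRootError>` return type in the next hunk, one plausible pattern is to feed the updates retained by one run back in as the cache for the next. This assumes the calculator exposes an `incremental_root_with_updates` entry point alongside `incremental_root`, and that `view`, `pool`, and the two hashed post states satisfy the calculator's bounds in the surrounding async context:

// First run: nothing cached yet; retain the trie updates it produces.
let (first_root, cached_nodes) = AsyncStateRoot::new(
    view.clone(),
    pool.clone(),
    TrieUpdates::default(),
    first_state.clone(),
    first_state.construct_prefix_sets().freeze(),
)
.incremental_root_with_updates() // assumed entry point that returns TrieUpdates
.await?;

// Second run: seed the calculator with the nodes produced above so unchanged
// subtries can be served from memory instead of the database.
let (second_root, _) = AsyncStateRoot::new(
    view,
    pool,
    cached_nodes,
    second_state.clone(),
    second_state.construct_prefix_sets().freeze(),
)
.incremental_root_with_updates()
.await?;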
@@ -86,12 +95,15 @@ where
         retain_updates: bool,
     ) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
         let mut tracker = ParallelTrieTracker::default();
-        let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
-        let storage_root_targets = StorageRootTargets::new(
-            self.hashed_state.accounts.keys().copied(),
-            prefix_sets.storage_prefix_sets,
-        );
+        let trie_nodes_sorted = Arc::new(self.trie_nodes.into_sorted());
         let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());
+        let storage_root_targets = StorageRootTargets::new(
+            self.prefix_sets
+                .account_prefix_set
+                .iter()
+                .map(|nibbles| B256::from_slice(&nibbles.pack())),
+            self.prefix_sets.storage_prefix_sets,
+        );

         // Pre-calculate storage roots async for accounts which were changed.
         tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
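The storage-root targets are now derived from the caller-supplied account prefix set instead of `hashed_state.accounts`; each entry in that set is a full 64-nibble key path, so packing it recovers the hashed address. A tiny self-contained check of that round trip (the address value is made up):

use alloy_primitives::B256;
use reth_trie::Nibbles;

fn main() {
    // Hypothetical hashed account key, for illustration only.
    let hashed_address = B256::repeat_byte(0x11);

    // Prefix-set entries for accounts are full key paths: two nibbles per byte.
    let path = Nibbles::unpack(hashed_address);
    assert_eq!(path.len(), 64);

    // Packing the nibbles reproduces the 32-byte hashed address, which is exactly
    // what `B256::from_slice(&nibbles.pack())` does in the hunk above.
    assert_eq!(B256::from_slice(&path.pack()), hashed_address);
}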
@@ -102,14 +114,18 @@ where
         {
             let view = self.view.clone();
             let hashed_state_sorted = hashed_state_sorted.clone();
+            let trie_nodes_sorted = trie_nodes_sorted.clone();
             #[cfg(feature = "metrics")]
             let metrics = self.metrics.storage_trie.clone();
             let handle =
                 self.blocking_pool.spawn_fifo(move || -> Result<_, AsyncStateRootError> {
-                    let provider = view.provider_ro()?;
-                    let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider.tx_ref());
+                    let provider_ro = view.provider_ro()?;
+                    let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+                        DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+                        &trie_nodes_sorted,
+                    );
                     let hashed_state = HashedPostStateCursorFactory::new(
-                        DatabaseHashedCursorFactory::new(provider.tx_ref()),
+                        DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
                         &hashed_state_sorted,
                     );
                     Ok(StorageRoot::new_hashed(
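The heart of the change in this hunk (and in the account-level hunk below) is the cursor-factory layering. An annotated restatement of the construction above; the cache-first, database-second lookup order is inferred from how `InMemoryTrieCursorFactory` is used here:

// Cursors built from this factory consult the caller-provided cache of
// intermediate nodes first and fall back to the read-only database transaction
// for anything that is not cached.
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
    DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), // on-disk trie tables
    &trie_nodes_sorted,                                    // in-memory intermediate nodes
);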
@@ -129,16 +145,18 @@ where
         let mut trie_updates = TrieUpdates::default();

         let provider_ro = self.view.provider_ro()?;
-        let tx = provider_ro.tx_ref();
-        let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
+        let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+            &trie_nodes_sorted,
+        );
         let hashed_cursor_factory = HashedPostStateCursorFactory::new(
-            DatabaseHashedCursorFactory::new(tx),
+            DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
             &hashed_state_sorted,
         );

         let walker = TrieWalker::new(
             trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
-            prefix_sets.account_prefix_set,
+            self.prefix_sets.account_prefix_set,
         )
         .with_deletions_retained(retain_updates);
         let mut account_node_iter = TrieNodeIter::new(
@@ -190,7 +208,7 @@ where
         trie_updates.finalize(
             account_node_iter.walker,
             hash_builder,
-            prefix_sets.destroyed_accounts,
+            self.prefix_sets.destroyed_accounts,
         );

         let stats = tracker.finish();
@@ -290,7 +308,9 @@ mod tests {
                 AsyncStateRoot::new(
                     consistent_view.clone(),
                     blocking_pool.clone(),
-                    HashedPostState::default()
+                    Default::default(),
+                    HashedPostState::default(),
+                    Default::default(),
                 )
                 .incremental_root()
                 .await
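Given the new constructor signature, the two `Default::default()` arguments in this test resolve by type inference to an empty node cache and empty prefix sets:

let trie_nodes: TrieUpdates = Default::default();     // no cached intermediate nodes
let prefix_sets: TriePrefixSets = Default::default(); // nothing marked as changed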
@@ -323,8 +343,15 @@
             }
         }

+        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
         assert_eq!(
-            AsyncStateRoot::new(consistent_view.clone(), blocking_pool.clone(), hashed_state)
+            AsyncStateRoot::new(
+                consistent_view.clone(),
+                blocking_pool.clone(),
+                Default::default(),
+                hashed_state,
+                prefix_sets
+            )
             .incremental_root()
             .await
             .unwrap(),