feat(engine): parallel sparse storage roots (#13269)

This commit is contained in:
Alexey Shekhirin
2024-12-10 18:29:07 +00:00
committed by GitHub
parent 88a9bd72d4
commit 8aada7a243
3 changed files with 58 additions and 25 deletions

View File

@ -1,6 +1,7 @@
//! State root task related functionality. //! State root task related functionality.
use alloy_primitives::map::{HashMap, HashSet}; use alloy_primitives::map::{HashMap, HashSet};
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_evm::system_calls::OnStateHook; use reth_evm::system_calls::OnStateHook;
use reth_provider::{ use reth_provider::{
providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory,
@ -567,30 +568,45 @@ fn update_sparse_trie(
trie.reveal_multiproof(targets, multiproof)?; trie.reveal_multiproof(targets, multiproof)?;
// Update storage slots with new values and calculate storage roots. // Update storage slots with new values and calculate storage roots.
for (address, storage) in state.storages { let (tx, rx) = mpsc::channel();
trace!(target: "engine::root::sparse", ?address, "Updating storage"); state
let storage_trie = trie.storage_trie_mut(&address).ok_or(SparseTrieError::Blind)?; .storages
.into_iter()
.map(|(address, storage)| (address, storage, trie.take_storage_trie(&address)))
.par_bridge()
.map(|(address, storage, storage_trie)| {
trace!(target: "engine::root::sparse", ?address, "Updating storage");
let mut storage_trie = storage_trie.ok_or(SparseTrieError::Blind)?;
if storage.wiped { if storage.wiped {
trace!(target: "engine::root::sparse", ?address, "Wiping storage"); trace!(target: "engine::root::sparse", ?address, "Wiping storage");
storage_trie.wipe(); storage_trie.wipe()?;
}
for (slot, value) in storage.storage {
let slot_nibbles = Nibbles::unpack(slot);
if value.is_zero() {
trace!(target: "engine::root::sparse", ?address, ?slot, "Removing storage slot");
// TODO: handle blinded node error
storage_trie.remove_leaf(&slot_nibbles)?;
} else {
trace!(target: "engine::root::sparse", ?address, ?slot, "Updating storage slot");
storage_trie
.update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
} }
}
storage_trie.root(); for (slot, value) in storage.storage {
let slot_nibbles = Nibbles::unpack(slot);
if value.is_zero() {
trace!(target: "engine::root::sparse", ?address, ?slot, "Removing storage slot");
storage_trie.remove_leaf(&slot_nibbles)?;
} else {
trace!(target: "engine::root::sparse", ?address, ?slot, "Updating storage slot");
storage_trie
.update_leaf(slot_nibbles, alloy_rlp::encode_fixed_size(&value).to_vec())?;
}
}
storage_trie.root();
SparseStateTrieResult::Ok((address, storage_trie))
})
.for_each_init(|| tx.clone(), |tx, result| {
tx.send(result).unwrap()
});
drop(tx);
for result in rx {
let (address, storage_trie) = result?;
trie.insert_storage_trie(address, storage_trie);
} }
// Update accounts with new values // Update accounts with new values

View File

@ -107,14 +107,14 @@ pub enum SparseTrieError {
/// Path to the node. /// Path to the node.
path: Nibbles, path: Nibbles,
/// Node that was at the path when revealing. /// Node that was at the path when revealing.
node: Box<dyn core::fmt::Debug>, node: Box<dyn core::fmt::Debug + Send>,
}, },
/// RLP error. /// RLP error.
#[error(transparent)] #[error(transparent)]
Rlp(#[from] alloy_rlp::Error), Rlp(#[from] alloy_rlp::Error),
/// Other. /// Other.
#[error(transparent)] #[error(transparent)]
Other(#[from] Box<dyn core::error::Error>), Other(#[from] Box<dyn core::error::Error + Send>),
} }
/// Trie witness errors. /// Trie witness errors.

View File

@ -97,9 +97,26 @@ impl<F: BlindedProviderFactory> SparseStateTrie<F> {
/// Returns mutable reference to storage sparse trie if it was revealed. /// Returns mutable reference to storage sparse trie if it was revealed.
pub fn storage_trie_mut( pub fn storage_trie_mut(
&mut self, &mut self,
account: &B256, address: &B256,
) -> Option<&mut RevealedSparseTrie<F::StorageNodeProvider>> { ) -> Option<&mut RevealedSparseTrie<F::StorageNodeProvider>> {
self.storages.get_mut(account).and_then(|e| e.as_revealed_mut()) self.storages.get_mut(address).and_then(|e| e.as_revealed_mut())
}
/// Takes the storage trie for the provided address.
pub fn take_storage_trie(
&mut self,
address: &B256,
) -> Option<SparseTrie<F::StorageNodeProvider>> {
self.storages.remove(address)
}
/// Inserts storage trie for the provided address.
pub fn insert_storage_trie(
&mut self,
address: B256,
storage_trie: SparseTrie<F::StorageNodeProvider>,
) {
self.storages.insert(address, storage_trie);
} }
/// Reveal unknown trie paths from provided leaf path and its proof for the account. /// Reveal unknown trie paths from provided leaf path and its proof for the account.