mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)
chore(trie): replace ParallelStateRoot with AsyncStateRoot (#11213)
@@ -25,7 +25,7 @@ reth-execution-types.workspace = true
 reth-stages-api.workspace = true
 reth-trie = { workspace = true, features = ["metrics"] }
 reth-trie-db = { workspace = true, features = ["metrics"] }
-reth-trie-parallel = { workspace = true, features = ["parallel"] }
+reth-trie-parallel.workspace = true
 reth-network.workspace = true
 reth-consensus.workspace = true
 reth-node-types.workspace = true
@@ -40,7 +40,7 @@ use reth_provider::{
 use reth_revm::database::StateProviderDatabase;
 use reth_stages_api::ControlFlow;
 use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput};
-use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError};
+use reth_trie_parallel::parallel_root::{ParallelStateRoot, ParallelStateRootError};
 use std::{
     cmp::Ordering,
     collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
@@ -2195,11 +2195,11 @@ where
         let persistence_in_progress = self.persistence_state.in_progress();
         if !persistence_in_progress {
             state_root_result = match self
-                .compute_state_root_async(block.parent_hash, &hashed_state)
+                .compute_state_root_parallel(block.parent_hash, &hashed_state)
             {
                 Ok((state_root, trie_output)) => Some((state_root, trie_output)),
-                Err(AsyncStateRootError::Provider(ProviderError::ConsistentView(error))) => {
-                    debug!(target: "engine::tree", %error, "Async state root computation failed consistency check, falling back");
+                Err(ParallelStateRootError::Provider(ProviderError::ConsistentView(error))) => {
+                    debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
                     None
                 }
                 Err(error) => return Err(InsertBlockErrorKindTwo::Other(Box::new(error))),
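
For orientation, the control flow this hunk preserves in the engine's block-insertion path: try the parallel computation first, treat a consistent-view failure as a benign signal to fall back to the slower non-parallel path (outside this hunk), and propagate everything else. A minimal, self-contained sketch of that pattern; `RootError`, `compute_parallel`, and `compute_sync` are hypothetical stand-ins, not the reth API:

    #[allow(dead_code)]
    #[derive(Debug)]
    enum RootError {
        // Stands in for ParallelStateRootError::Provider(ProviderError::ConsistentView(_)).
        ConsistentView(String),
        // Everything else (database failures, etc.).
        Other(String),
    }

    fn compute_parallel() -> Result<u64, RootError> {
        // Pretend a block was persisted mid-computation and the view is stale.
        Err(RootError::ConsistentView("tip moved".to_string()))
    }

    fn compute_sync() -> Result<u64, RootError> {
        Ok(42) // the slower but always-consistent path
    }

    fn state_root() -> Result<u64, RootError> {
        match compute_parallel() {
            Ok(root) => Ok(root),
            // Only the consistency failure is swallowed; it just means the
            // snapshot raced persistence, so downgrade to the fallback path.
            Err(RootError::ConsistentView(reason)) => {
                eprintln!("parallel state root failed consistency check ({reason}), falling back");
                compute_sync()
            }
            // Any other error aborts block insertion.
            Err(other) => Err(other),
        }
    }

    fn main() {
        println!("root = {:?}", state_root());
    }

The key property is that only the consistency variant is downgraded; every other error still aborts the insert.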
@@ -2265,7 +2265,7 @@ where
         Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
     }

-    /// Compute state root for the given hashed post state asynchronously.
+    /// Compute state root for the given hashed post state in parallel.
     ///
     /// # Returns
     ///
@@ -2273,11 +2273,11 @@ where
     /// Returns `Err(_)` if error was encountered during computation.
     /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
     /// should be used instead.
-    fn compute_state_root_async(
+    fn compute_state_root_parallel(
        &self,
        parent_hash: B256,
        hashed_state: &HashedPostState,
-    ) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
+    ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
        let mut input = TrieInput::default();

@@ -2299,7 +2299,7 @@ where
         // Extend with block we are validating root for.
         input.append_ref(hashed_state);

-        AsyncStateRoot::new(consistent_view, input).incremental_root_with_updates()
+        ParallelStateRoot::new(consistent_view, input).incremental_root_with_updates()
     }

     /// Handles an error that occurred while inserting a block.
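
`ConsistentDbView::new_with_latest_tip` pins the computation to the database tip observed at construction; providers opened later fail if the tip has since moved, which is exactly the `ConsistentView` error the engine code above treats as a fallback signal. A toy model of that contract (illustrative names and behavior inferred from this diff, not the reth API):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    #[derive(Debug)]
    struct ConsistentViewError {
        expected: u64,
        actual: u64,
    }

    struct Db {
        tip: AtomicU64,
    }

    struct ConsistentView {
        db: Arc<Db>,
        tip_at_creation: u64,
    }

    impl ConsistentView {
        // Capture whatever the tip is right now.
        fn new_with_latest_tip(db: Arc<Db>) -> Self {
            let tip = db.tip.load(Ordering::Acquire);
            Self { db, tip_at_creation: tip }
        }

        // Every read-only provider re-checks that the tip has not moved.
        fn provider_ro(&self) -> Result<u64, ConsistentViewError> {
            let actual = self.db.tip.load(Ordering::Acquire);
            if actual == self.tip_at_creation {
                Ok(actual)
            } else {
                Err(ConsistentViewError { expected: self.tip_at_creation, actual })
            }
        }
    }

    fn main() {
        let db = Arc::new(Db { tip: AtomicU64::new(100) });
        let view = ConsistentView::new_with_latest_tip(db.clone());
        assert!(view.provider_ro().is_ok());

        db.tip.store(101, Ordering::Release); // persistence advances the tip mid-computation
        if let Err(e) = view.provider_ro() {
            println!("consistency check failed: expected tip {}, got {}", e.expected, e.actual);
        }
    }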
@@ -31,13 +31,8 @@ tracing.workspace = true
 # misc
 thiserror.workspace = true
 derive_more.workspace = true
-
-# `async` feature
-tokio = { workspace = true, optional = true, default-features = false }
-itertools = { workspace = true, optional = true }
-
-# `parallel` feature
-rayon = { workspace = true, optional = true }
+rayon.workspace = true
+itertools.workspace = true

 # `metrics` feature
 reth-metrics = { workspace = true, optional = true }
@@ -58,12 +53,9 @@ proptest.workspace = true
 proptest-arbitrary-interop.workspace = true

 [features]
-default = ["metrics", "async", "parallel"]
+default = ["metrics"]
 metrics = ["reth-metrics", "dep:metrics", "reth-trie/metrics"]
-async = ["tokio/sync", "itertools"]
-parallel = ["rayon"]

 [[bench]]
 name = "root"
-required-features = ["async", "parallel"]
 harness = false
@@ -13,7 +13,7 @@ use reth_trie::{
     TrieInput,
 };
 use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseStateRoot};
-use reth_trie_parallel::{async_root::AsyncStateRoot, parallel_root::ParallelStateRoot};
+use reth_trie_parallel::parallel_root::ParallelStateRoot;
 use std::collections::HashMap;

 pub fn calculate_state_root(c: &mut Criterion) {
@@ -70,14 +70,6 @@ pub fn calculate_state_root(c: &mut Criterion) {
                     |calculator| async { calculator.incremental_root() },
                 );
             });
-
-            // async root
-            group.bench_function(BenchmarkId::new("async root", size), |b| {
-                b.iter_with_setup(
-                    || AsyncStateRoot::new(view.clone(), TrieInput::from_state(updated_state.clone())),
-                    |calculator| calculator.incremental_root(),
-                );
-            });
         }
     }
 }
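
The removed bench relied on Criterion's `iter_with_setup`, which runs the setup closure (building a fresh calculator) outside the timed section of every iteration. A self-contained bench of the same shape, assuming `criterion` as a dev-dependency and `harness = false`, with a toy computation standing in for the trie machinery:

    use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

    fn toy_root(input: &[u64]) -> u64 {
        input.iter().fold(0u64, |acc, x| acc.wrapping_mul(31).wrapping_add(*x))
    }

    pub fn calculate(c: &mut Criterion) {
        let mut group = c.benchmark_group("toy root");
        for size in [1_000usize, 10_000] {
            let data: Vec<u64> = (0..size as u64).collect();
            group.bench_function(BenchmarkId::new("fold", size), |b| {
                // The clone (setup) runs outside the timed closure each iteration.
                b.iter_with_setup(|| data.clone(), |input| toy_root(&input));
            });
        }
        group.finish();
    }

    criterion_group!(benches, calculate);
    criterion_main!(benches);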
@@ -1,328 +0,0 @@
-#[cfg(feature = "metrics")]
-use crate::metrics::ParallelStateRootMetrics;
-use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
-use alloy_primitives::B256;
-use alloy_rlp::{BufMut, Encodable};
-use itertools::Itertools;
-use reth_execution_errors::StorageRootError;
-use reth_provider::{
-    providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
-};
-use reth_trie::{
-    hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
-    node_iter::{TrieElement, TrieNodeIter},
-    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
-    updates::TrieUpdates,
-    walker::TrieWalker,
-    HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
-};
-use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
-use std::{collections::HashMap, sync::Arc};
-use thiserror::Error;
-use tracing::*;
-
-/// Async state root calculator.
-///
-/// The calculator starts off by launching tasks to compute storage roots.
-/// Then, it immediately starts walking the state trie updating the necessary trie
-/// nodes in the process. Upon encountering a leaf node, it will poll the storage root
-/// task for the corresponding hashed address.
-///
-/// Internally, the calculator uses [`ConsistentDbView`] since
-/// it needs to rely on database state saying the same until
-/// the last transaction is open.
-/// See docs of using [`ConsistentDbView`] for caveats.
-///
-/// For sync usage, take a look at `ParallelStateRoot`.
-#[derive(Debug)]
-pub struct AsyncStateRoot<Factory> {
-    /// Consistent view of the database.
-    view: ConsistentDbView<Factory>,
-    /// Trie input.
-    input: TrieInput,
-    /// Parallel state root metrics.
-    #[cfg(feature = "metrics")]
-    metrics: ParallelStateRootMetrics,
-}
-
-impl<Factory> AsyncStateRoot<Factory> {
-    /// Create new async state root calculator.
-    pub fn new(view: ConsistentDbView<Factory>, input: TrieInput) -> Self {
-        Self {
-            view,
-            input,
-            #[cfg(feature = "metrics")]
-            metrics: ParallelStateRootMetrics::default(),
-        }
-    }
-}
-
-impl<Factory> AsyncStateRoot<Factory>
-where
-    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
-{
-    /// Calculate incremental state root asynchronously.
-    pub fn incremental_root(self) -> Result<B256, AsyncStateRootError> {
-        self.calculate(false).map(|(root, _)| root)
-    }
-
-    /// Calculate incremental state root with updates asynchronously.
-    pub fn incremental_root_with_updates(self) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
-        self.calculate(true)
-    }
-
-    fn calculate(self, retain_updates: bool) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
-        let mut tracker = ParallelTrieTracker::default();
-        let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
-        let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
-        let prefix_sets = self.input.prefix_sets.freeze();
-        let storage_root_targets = StorageRootTargets::new(
-            prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())),
-            prefix_sets.storage_prefix_sets,
-        );
-
-        // Pre-calculate storage roots async for accounts which were changed.
-        tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
-        debug!(target: "trie::async_state_root", len = storage_root_targets.len(), "pre-calculating storage roots");
-        let mut storage_roots = HashMap::with_capacity(storage_root_targets.len());
-        for (hashed_address, prefix_set) in
-            storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
-        {
-            let view = self.view.clone();
-            let hashed_state_sorted = hashed_state_sorted.clone();
-            let trie_nodes_sorted = trie_nodes_sorted.clone();
-            #[cfg(feature = "metrics")]
-            let metrics = self.metrics.storage_trie.clone();
-
-            let (tx, rx) = std::sync::mpsc::sync_channel(1);
-
-            rayon::spawn_fifo(move || {
-                let result = (|| -> Result<_, AsyncStateRootError> {
-                    let provider_ro = view.provider_ro()?;
-                    let trie_cursor_factory = InMemoryTrieCursorFactory::new(
-                        DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-                        &trie_nodes_sorted,
-                    );
-                    let hashed_state = HashedPostStateCursorFactory::new(
-                        DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-                        &hashed_state_sorted,
-                    );
-                    Ok(StorageRoot::new_hashed(
-                        trie_cursor_factory,
-                        hashed_state,
-                        hashed_address,
-                        #[cfg(feature = "metrics")]
-                        metrics,
-                    )
-                    .with_prefix_set(prefix_set)
-                    .calculate(retain_updates)?)
-                })();
-                let _ = tx.send(result);
-            });
-            storage_roots.insert(hashed_address, rx);
-        }
-
-        trace!(target: "trie::async_state_root", "calculating state root");
-        let mut trie_updates = TrieUpdates::default();
-
-        let provider_ro = self.view.provider_ro()?;
-        let trie_cursor_factory = InMemoryTrieCursorFactory::new(
-            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-            &trie_nodes_sorted,
-        );
-        let hashed_cursor_factory = HashedPostStateCursorFactory::new(
-            DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-            &hashed_state_sorted,
-        );
-
-        let walker = TrieWalker::new(
-            trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
-            prefix_sets.account_prefix_set,
-        )
-        .with_deletions_retained(retain_updates);
-        let mut account_node_iter = TrieNodeIter::new(
-            walker,
-            hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?,
-        );
-
-        let mut hash_builder = HashBuilder::default().with_updates(retain_updates);
-        let mut account_rlp = Vec::with_capacity(128);
-        while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
-            match node {
-                TrieElement::Branch(node) => {
-                    hash_builder.add_branch(node.key, node.value, node.children_are_in_trie);
-                }
-                TrieElement::Leaf(hashed_address, account) => {
-                    let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
-                        Some(rx) => rx.recv().map_err(|_| {
-                            AsyncStateRootError::StorageRootChannelClosed { hashed_address }
-                        })??,
-                        // Since we do not store all intermediate nodes in the database, there might
-                        // be a possibility of re-adding a non-modified leaf to the hash builder.
-                        None => {
-                            tracker.inc_missed_leaves();
-                            StorageRoot::new_hashed(
-                                trie_cursor_factory.clone(),
-                                hashed_cursor_factory.clone(),
-                                hashed_address,
-                                #[cfg(feature = "metrics")]
-                                self.metrics.storage_trie.clone(),
-                            )
-                            .calculate(retain_updates)?
-                        }
-                    };
-
-                    if retain_updates {
-                        trie_updates.insert_storage_updates(hashed_address, updates);
-                    }
-
-                    account_rlp.clear();
-                    let account = TrieAccount::from((account, storage_root));
-                    account.encode(&mut account_rlp as &mut dyn BufMut);
-                    hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);
-                }
-            }
-        }
-
-        let root = hash_builder.root();
-
-        trie_updates.finalize(
-            account_node_iter.walker,
-            hash_builder,
-            prefix_sets.destroyed_accounts,
-        );
-
-        let stats = tracker.finish();
-
-        #[cfg(feature = "metrics")]
-        self.metrics.record_state_trie(stats);
-
-        trace!(
-            target: "trie::async_state_root",
-            %root,
-            duration = ?stats.duration(),
-            branches_added = stats.branches_added(),
-            leaves_added = stats.leaves_added(),
-            missed_leaves = stats.missed_leaves(),
-            precomputed_storage_roots = stats.precomputed_storage_roots(),
-            "calculated state root"
-        );
-
-        Ok((root, trie_updates))
-    }
-}
-
-/// Error during async state root calculation.
-#[derive(Error, Debug)]
-pub enum AsyncStateRootError {
-    /// Storage root channel for a given address was closed.
-    #[error("storage root channel for {hashed_address} got closed")]
-    StorageRootChannelClosed {
-        /// The hashed address for which channel was closed.
-        hashed_address: B256,
-    },
-    /// Receive error
-    #[error(transparent)]
-    Receive(#[from] std::sync::mpsc::RecvError),
-    /// Error while calculating storage root.
-    #[error(transparent)]
-    StorageRoot(#[from] StorageRootError),
-    /// Provider error.
-    #[error(transparent)]
-    Provider(#[from] ProviderError),
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use alloy_primitives::{keccak256, Address, U256};
-    use rand::Rng;
-    use reth_primitives::{Account, StorageEntry};
-    use reth_provider::{test_utils::create_test_provider_factory, HashingWriter};
-    use reth_trie::{test_utils, HashedPostState, HashedStorage};
-
-    #[tokio::test]
-    async fn random_async_root() {
-        let factory = create_test_provider_factory();
-        let consistent_view = ConsistentDbView::new(factory.clone(), None);
-
-        let mut rng = rand::thread_rng();
-        let mut state = (0..100)
-            .map(|_| {
-                let address = Address::random();
-                let account =
-                    Account { balance: U256::from(rng.gen::<u64>()), ..Default::default() };
-                let mut storage = HashMap::<B256, U256>::default();
-                let has_storage = rng.gen_bool(0.7);
-                if has_storage {
-                    for _ in 0..100 {
-                        storage.insert(
-                            B256::from(U256::from(rng.gen::<u64>())),
-                            U256::from(rng.gen::<u64>()),
-                        );
-                    }
-                }
-                (address, (account, storage))
-            })
-            .collect::<HashMap<_, _>>();
-
-        {
-            let provider_rw = factory.provider_rw().unwrap();
-            provider_rw
-                .insert_account_for_hashing(
-                    state.iter().map(|(address, (account, _))| (*address, Some(*account))),
-                )
-                .unwrap();
-            provider_rw
-                .insert_storage_for_hashing(state.iter().map(|(address, (_, storage))| {
-                    (
-                        *address,
-                        storage
-                            .iter()
-                            .map(|(slot, value)| StorageEntry { key: *slot, value: *value }),
-                    )
-                }))
-                .unwrap();
-            provider_rw.commit().unwrap();
-        }
-
-        assert_eq!(
-            AsyncStateRoot::new(consistent_view.clone(), Default::default(),)
-                .incremental_root()
-                .unwrap(),
-            test_utils::state_root(state.clone())
-        );
-
-        let mut hashed_state = HashedPostState::default();
-        for (address, (account, storage)) in &mut state {
-            let hashed_address = keccak256(address);
-
-            let should_update_account = rng.gen_bool(0.5);
-            if should_update_account {
-                *account = Account { balance: U256::from(rng.gen::<u64>()), ..*account };
-                hashed_state.accounts.insert(hashed_address, Some(*account));
-            }
-
-            let should_update_storage = rng.gen_bool(0.3);
-            if should_update_storage {
-                for (slot, value) in storage.iter_mut() {
-                    let hashed_slot = keccak256(slot);
-                    *value = U256::from(rng.gen::<u64>());
-                    hashed_state
-                        .storages
-                        .entry(hashed_address)
-                        .or_insert_with(|| HashedStorage::new(false))
-                        .storage
-                        .insert(hashed_slot, *value);
-                }
-            }
-        }
-
-        assert_eq!(
-            AsyncStateRoot::new(consistent_view, TrieInput::from_state(hashed_state))
-                .incremental_root()
-                .unwrap(),
-            test_utils::state_root(state)
-        );
-    }
-}
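
The doc comment of the calculator deleted above describes the pattern that survives this commit inside `ParallelStateRoot`: fan out one rayon task per changed account, give each a bounded channel, and have the account-trie walk block on a receiver only when it actually reaches that account's leaf. A self-contained sketch of that fan-out/fan-in shape, with toy types in place of the reth trie machinery (requires the `rayon` crate):

    use std::collections::HashMap;
    use std::sync::mpsc;

    // Stand-in for StorageRoot::calculate(): some per-account work.
    fn storage_root(account: u64) -> u64 {
        account.wrapping_mul(0x9e37_79b9_7f4a_7c15)
    }

    fn main() {
        // Fan out: one detached task per changed account, with a bounded
        // channel of capacity 1 to hand the result back.
        let mut pending: HashMap<u64, mpsc::Receiver<u64>> = HashMap::new();
        for account in 0..8u64 {
            let (tx, rx) = mpsc::sync_channel(1);
            rayon::spawn_fifo(move || {
                // The real code sends a Result and ignores send errors the
                // same way (`let _ = tx.send(result);`).
                let _ = tx.send(storage_root(account));
            });
            pending.insert(account, rx);
        }

        // Fan in: the "account trie walk" blocks on each receiver only at
        // the moment it reaches that account's leaf.
        for account in 0..8u64 {
            let root = pending
                .remove(&account)
                .expect("spawned above")
                .recv()
                .expect("worker sends before dropping tx");
            println!("account {account}: storage root {root:#x}");
        }
    }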
@@ -13,12 +13,7 @@ pub use storage_root_targets::StorageRootTargets
 /// Parallel trie calculation stats.
 pub mod stats;

-/// Implementation of async state root computation.
-#[cfg(feature = "async")]
-pub mod async_root;
-
 /// Implementation of parallel state root computation.
-#[cfg(feature = "parallel")]
 pub mod parallel_root;

 /// Parallel state root metrics.
@@ -1,10 +1,10 @@
 #[cfg(feature = "metrics")]
 use crate::metrics::ParallelStateRootMetrics;
 use crate::{stats::ParallelTrieTracker, storage_root_targets::StorageRootTargets};
+use alloy_primitives::B256;
 use alloy_rlp::{BufMut, Encodable};
-use rayon::prelude::*;
+use itertools::Itertools;
 use reth_execution_errors::StorageRootError;
-use reth_primitives::B256;
 use reth_provider::{
     providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError,
 };
@@ -17,22 +17,21 @@ use reth_trie::{
     HashBuilder, Nibbles, StorageRoot, TrieAccount, TrieInput,
 };
 use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory};
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::Arc};
 use thiserror::Error;
 use tracing::*;

 /// Parallel incremental state root calculator.
 ///
-/// The calculator starts off by pre-computing storage roots of changed
-/// accounts in parallel. Once that's done, it proceeds to walking the state
-/// trie retrieving the pre-computed storage roots when needed.
+/// The calculator starts off by launching tasks to compute storage roots.
+/// Then, it immediately starts walking the state trie updating the necessary trie
+/// nodes in the process. Upon encountering a leaf node, it will poll the storage root
+/// task for the corresponding hashed address.
 ///
 /// Internally, the calculator uses [`ConsistentDbView`] since
 /// it needs to rely on database state saying the same until
 /// the last transaction is open.
 /// See docs of using [`ConsistentDbView`] for caveats.
-///
-/// If possible, use more optimized `AsyncStateRoot` instead.
 #[derive(Debug)]
 pub struct ParallelStateRoot<Factory> {
     /// Consistent view of the database.
@@ -58,7 +57,7 @@ impl<Factory> ParallelStateRoot<Factory> {

 impl<Factory> ParallelStateRoot<Factory>
 where
-    Factory: DatabaseProviderFactory<Provider: BlockReader> + Send + Sync,
+    Factory: DatabaseProviderFactory<Provider: BlockReader> + Clone + Send + Sync + 'static,
 {
     /// Calculate incremental state root in parallel.
     pub fn incremental_root(self) -> Result<B256, ParallelStateRootError> {
@@ -77,8 +76,8 @@ where
         retain_updates: bool,
     ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
         let mut tracker = ParallelTrieTracker::default();
-        let trie_nodes_sorted = self.input.nodes.into_sorted();
-        let hashed_state_sorted = self.input.state.into_sorted();
+        let trie_nodes_sorted = Arc::new(self.input.nodes.into_sorted());
+        let hashed_state_sorted = Arc::new(self.input.state.into_sorted());
         let prefix_sets = self.input.prefix_sets.freeze();
         let storage_root_targets = StorageRootTargets::new(
             prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())),
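
The `Arc::new(...)` wrappers exist because the sorted overlays are now shared with detached `rayon::spawn_fifo` closures, which must be `'static`; the borrow-based sharing of the old code only worked while `into_par_iter` kept the caller on the stack. A minimal illustration of that ownership change (toy data, not the reth types):

    use std::sync::Arc;

    fn main() {
        // The sorted overlay, shared read-only by every storage-root task.
        let sorted_nodes: Arc<Vec<u64>> = Arc::new((0..1_000).collect());

        for task in 0..4u32 {
            let nodes = Arc::clone(&sorted_nodes); // refcount bump, no deep copy
            rayon::spawn_fifo(move || {
                // Owning an Arc is what lets this closure satisfy the
                // 'static bound that rayon::spawn_fifo requires.
                let sum: u64 = nodes.iter().sum();
                println!("task {task}: sum = {sum}");
            });
        }

        // Toy-only: give the detached tasks a moment before main exits.
        std::thread::sleep(std::time::Duration::from_millis(100));
    }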
@@ -88,30 +87,43 @@ where
         // Pre-calculate storage roots in parallel for accounts which were changed.
         tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
         debug!(target: "trie::parallel_state_root", len = storage_root_targets.len(), "pre-calculating storage roots");
-        let mut storage_roots = storage_root_targets
-            .into_par_iter()
-            .map(|(hashed_address, prefix_set)| {
-                let provider_ro = self.view.provider_ro()?;
-                let trie_cursor_factory = InMemoryTrieCursorFactory::new(
-                    DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
-                    &trie_nodes_sorted,
-                );
-                let hashed_cursor_factory = HashedPostStateCursorFactory::new(
-                    DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
-                    &hashed_state_sorted,
-                );
-                let storage_root_result = StorageRoot::new_hashed(
-                    trie_cursor_factory,
-                    hashed_cursor_factory,
-                    hashed_address,
-                    #[cfg(feature = "metrics")]
-                    self.metrics.storage_trie.clone(),
-                )
-                .with_prefix_set(prefix_set)
-                .calculate(retain_updates);
-                Ok((hashed_address, storage_root_result?))
-            })
-            .collect::<Result<HashMap<_, _>, ParallelStateRootError>>()?;
+        let mut storage_roots = HashMap::with_capacity(storage_root_targets.len());
+        for (hashed_address, prefix_set) in
+            storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address)
+        {
+            let view = self.view.clone();
+            let hashed_state_sorted = hashed_state_sorted.clone();
+            let trie_nodes_sorted = trie_nodes_sorted.clone();
+            #[cfg(feature = "metrics")]
+            let metrics = self.metrics.storage_trie.clone();
+
+            let (tx, rx) = std::sync::mpsc::sync_channel(1);
+
+            rayon::spawn_fifo(move || {
+                let result = (|| -> Result<_, ParallelStateRootError> {
+                    let provider_ro = view.provider_ro()?;
+                    let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+                        DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+                        &trie_nodes_sorted,
+                    );
+                    let hashed_state = HashedPostStateCursorFactory::new(
+                        DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
+                        &hashed_state_sorted,
+                    );
+                    Ok(StorageRoot::new_hashed(
+                        trie_cursor_factory,
+                        hashed_state,
+                        hashed_address,
+                        #[cfg(feature = "metrics")]
+                        metrics,
+                    )
+                    .with_prefix_set(prefix_set)
+                    .calculate(retain_updates)?)
+                })();
+                let _ = tx.send(result);
+            });
+            storage_roots.insert(hashed_address, rx);
+        }

         trace!(target: "trie::parallel_state_root", "calculating state root");
         let mut trie_updates = TrieUpdates::default();
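
The substance of the rewrite above: the old `into_par_iter().map(...).collect::<Result<HashMap<_, _>, _>>()` blocked until every storage root finished before the account walk could start, while the new spawn-plus-channel form starts the walk immediately and blocks per account. A toy comparison of the two scheduling strategies, with sleeps in place of real storage-root work (requires `rayon`):

    use rayon::prelude::*;
    use std::collections::HashMap;
    use std::sync::mpsc;
    use std::time::{Duration, Instant};

    fn work(i: u64) -> u64 {
        std::thread::sleep(Duration::from_millis(10)); // pretend storage-root work
        i * i
    }

    fn main() {
        // Old shape: nothing downstream starts until the slowest task is done.
        let t = Instant::now();
        let _all: HashMap<u64, u64> = (0..8u64).into_par_iter().map(|i| (i, work(i))).collect();
        println!("eager collect finished in {:?}", t.elapsed());

        // New shape: consumers block only on the result they need next, so
        // the account walk overlaps with the remaining storage-root tasks.
        let t = Instant::now();
        let mut rxs = HashMap::new();
        for i in 0..8u64 {
            let (tx, rx) = mpsc::sync_channel(1);
            rayon::spawn_fifo(move || {
                let _ = tx.send(work(i));
            });
            rxs.insert(i, rx);
        }
        let first = rxs.remove(&0).unwrap().recv().unwrap();
        println!("first result ({first}) available after {:?}; rest still in flight", t.elapsed());
    }

Spawning in `sorted_unstable_by_key` order also matters: the account walk visits leaves in hashed-address order, so FIFO-spawned tasks tend to finish in the order they are consumed.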
@@ -145,7 +157,13 @@ where
                 }
                 TrieElement::Leaf(hashed_address, account) => {
                     let (storage_root, _, updates) = match storage_roots.remove(&hashed_address) {
-                        Some(result) => result,
+                        Some(rx) => rx.recv().map_err(|_| {
+                            ParallelStateRootError::StorageRoot(StorageRootError::Database(
+                                reth_db::DatabaseError::Other(format!(
+                                    "channel closed for {hashed_address}"
+                                )),
+                            ))
+                        })??,
                         // Since we do not store all intermediate nodes in the database, there might
                         // be a possibility of re-adding a non-modified leaf to the hash builder.
                         None => {
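
Since `ParallelStateRootError` has no dedicated channel-closed variant (unlike the deleted `AsyncStateRootError::StorageRootChannelClosed`), a dropped sender is folded into the existing storage-root/database variant, and the double `??` then unwraps both the receive and the worker's own `Result`. A sketch of that double unwrap with hypothetical error types:

    use std::sync::mpsc;

    #[derive(Debug)]
    enum RootError {
        // Mirrors folding RecvError into the database-error variant above.
        ChannelClosed,
        // Mirrors the worker-side StorageRootError.
        Storage(String),
    }

    fn receive_root(rx: mpsc::Receiver<Result<u64, RootError>>) -> Result<u64, RootError> {
        // Outer `?`: the channel receive, mapped if the sender was dropped.
        // Inner `?`: the Result the worker actually sent.
        let root = rx.recv().map_err(|_| RootError::ChannelClosed)??;
        Ok(root)
    }

    fn main() {
        let (tx, rx) = mpsc::sync_channel(1);
        tx.send(Ok(7u64)).unwrap();
        println!("{:?}", receive_root(rx)); // Ok(7)

        let (tx, rx) = mpsc::sync_channel(1);
        tx.send(Err(RootError::Storage("missing node".into()))).unwrap();
        println!("{:?}", receive_root(rx)); // Err(Storage("missing node"))

        let (tx, rx) = mpsc::sync_channel::<Result<u64, RootError>>(1);
        drop(tx);
        println!("{:?}", receive_root(rx)); // Err(ChannelClosed)
    }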
@@ -36,7 +36,6 @@ impl IntoIterator for StorageRootTargets
     }
 }

-#[cfg(feature = "parallel")]
 impl rayon::iter::IntoParallelIterator for StorageRootTargets {
     type Iter = rayon::collections::hash_map::IntoIter<B256, PrefixSet>;
     type Item = (B256, PrefixSet);