mirror of https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
perf(engine): parallel storage roots (#10666)
Cargo.lock (generated)
@@ -6977,6 +6977,7 @@ dependencies = [
  "reth-tasks",
  "reth-tracing",
  "reth-trie",
+ "reth-trie-parallel",
  "thiserror",
  "tokio",
  "tracing",
@@ -225,10 +225,16 @@ impl AppendableChain {
                 provider.block_execution_data_provider.execution_outcome().clone();
             execution_outcome.extend(initial_execution_outcome.clone());
             let hashed_state = execution_outcome.hash_state_slow();
-            ParallelStateRoot::new(consistent_view, hashed_state)
-                .incremental_root_with_updates()
-                .map(|(root, updates)| (root, Some(updates)))
-                .map_err(ProviderError::from)?
+            let prefix_sets = hashed_state.construct_prefix_sets().freeze();
+            ParallelStateRoot::new(
+                consistent_view,
+                Default::default(),
+                hashed_state,
+                prefix_sets,
+            )
+            .incremental_root_with_updates()
+            .map(|(root, updates)| (root, Some(updates)))
+            .map_err(ProviderError::from)?
         } else {
             let hashed_state =
                 HashedPostState::from_bundle_state(&initial_execution_outcome.state().state);
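Note: in the hunk above, the two new constructor inputs are a set of cached in-memory trie nodes (empty at this call site) and the frozen prefix sets derived from the hashed state. A minimal sketch with the types spelled out, reusing `consistent_view` and `hashed_state` from the hunk; this is an illustration, not additional code in the commit:

    // Sketch only; mirrors the call above with an explicit type instead of Default::default().
    let trie_nodes = TrieUpdates::default(); // no cached in-memory trie nodes at this call site
    let prefix_sets = hashed_state.construct_prefix_sets().freeze(); // changed accounts/slots
    let (root, trie_updates) =
        ParallelStateRoot::new(consistent_view, trie_nodes, hashed_state, prefix_sets)
            .incremental_root_with_updates()
            .map_err(ProviderError::from)?;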
@@ -31,8 +31,9 @@ reth-revm.workspace = true
 reth-rpc-types.workspace = true
 reth-stages-api.workspace = true
 reth-tasks.workspace = true
-reth-trie.workspace = true
 reth-node-types.workspace = true
+reth-trie.workspace = true
+reth-trie-parallel.workspace = true

 # common
 futures.workspace = true
@@ -27,8 +27,9 @@ use reth_primitives::{
     SealedHeader, B256, U256,
 };
 use reth_provider::{
-    BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
-    StateReader, StateRootProvider, TransactionVariant,
+    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
+    ProviderError, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
+    TransactionVariant,
 };
 use reth_revm::database::StateProviderDatabase;
 use reth_rpc_types::{
@@ -39,7 +40,8 @@ use reth_rpc_types::{
     ExecutionPayload,
 };
 use reth_stages_api::ControlFlow;
-use reth_trie::{updates::TrieUpdates, HashedPostState};
+use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
+use reth_trie_parallel::parallel_root::ParallelStateRoot;
 use std::{
     cmp::Ordering,
     collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
@@ -518,7 +520,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr

 impl<P, E, T> EngineApiTreeHandler<P, E, T>
 where
-    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
+    P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
+    <P as DatabaseProviderFactory>::Provider: BlockReader,
     E: BlockExecutorProvider,
     T: EngineTypes,
 {
@@ -2167,8 +2170,34 @@ where
         let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

         let root_time = Instant::now();
-        let (state_root, trie_output) =
-            state_provider.state_root_with_updates(hashed_state.clone())?;
+        let mut state_root_result = None;
+
+        // We attempt to compute state root in parallel if we are currently not persisting anything
+        // to database. This is safe, because the database state cannot change until we
+        // finish parallel computation. It is important that nothing is being persisted as
+        // we are computing in parallel, because we initialize a different database transaction
+        // per thread and it might end up with a different view of the database.
+        let persistence_in_progress = self.persistence_state.in_progress();
+        if !persistence_in_progress {
+            state_root_result = match self
+                .compute_state_root_in_parallel(block.parent_hash, &hashed_state)
+            {
+                Ok((state_root, trie_output)) => Some((state_root, trie_output)),
+                Err(ProviderError::ConsistentView(error)) => {
+                    debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
+                    None
+                }
+                Err(error) => return Err(error.into()),
+            };
+        }
+
+        let (state_root, trie_output) = if let Some(result) = state_root_result {
+            result
+        } else {
+            debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel");
+            state_provider.state_root_with_updates(hashed_state.clone())?
+        };

         if state_root != block.state_root {
             // call post-block hook
             self.invalid_block_hook.on_invalid_block(
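Note: the safety comment above leans on ConsistentDbView. Each parallel worker opens its own read-only transaction through provider_ro(), which re-checks the tip captured by new_with_latest_tip and errors out if persistence has moved it; that error is what the match arm downgrades to a sequential fallback. A hypothetical sketch of that check from a caller's perspective (`provider_factory` and `num_workers` are assumed names, not from the diff):

    // Hypothetical sketch: each parallel worker re-validates the captured tip.
    let view = ConsistentDbView::new_with_latest_tip(provider_factory.clone())?;
    for _ in 0..num_workers {
        // Fails with a consistent-view error if the database tip changed, which the
        // engine above treats as "fall back to the sequential state root".
        let _provider_ro = view.provider_ro()?;
    }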
@@ -2220,6 +2249,45 @@ where
         Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
     }

+    /// Compute state root for the given hashed post state in parallel.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(_)` if computed successfully.
+    /// Returns `Err(_)` if error was encountered during computation.
+    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
+    /// should be used instead.
+    fn compute_state_root_in_parallel(
+        &self,
+        parent_hash: B256,
+        hashed_state: &HashedPostState,
+    ) -> ProviderResult<(B256, TrieUpdates)> {
+        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
+        let mut trie_nodes = TrieUpdates::default();
+        let mut state = HashedPostState::default();
+        let mut prefix_sets = TriePrefixSetsMut::default();
+
+        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
+            // Retrieve revert state for historical block.
+            let revert_state = consistent_view.revert_state(historical)?;
+            prefix_sets.extend(revert_state.construct_prefix_sets());
+            state.extend(revert_state);
+
+            // Extend with contents of parent in-memory blocks.
+            for block in blocks.iter().rev() {
+                trie_nodes.extend_ref(block.trie.as_ref());
+                state.extend_ref(block.hashed_state.as_ref());
+            }
+        }
+
+        // Extend with block we are validating root for.
+        prefix_sets.extend(hashed_state.construct_prefix_sets());
+        state.extend_ref(hashed_state);
+
+        Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze())
+            .incremental_root_with_updates()?)
+    }
+
     /// Handles an error that occurred while inserting a block.
     ///
     /// If this is a validation error this will mark the block as invalid.
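Note: the overlay above is assembled oldest-to-newest so that later extensions win: first the revert state for the deepest persisted ancestor, then the in-memory ancestor blocks (`blocks.iter().rev()` walks them oldest-first), and finally the block whose root is being validated. A toy sketch of that "last write wins" use of `HashedPostState::extend`, with the variable names invented for illustration:

    // Toy sketch (assumed names): extending a HashedPostState lets newer entries
    // replace older ones for the same hashed address, which is why overlays are
    // applied oldest-to-newest above.
    let mut overlay = HashedPostState::default();
    overlay.extend(older_revert_state);  // ancestor reverts go in first
    overlay.extend(newer_block_state);   // newer block entries overwrite them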
@@ -1,7 +1,10 @@
 use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
+use reth_errors::ProviderError;
 use reth_primitives::{GotExpected, B256};
-use reth_storage_api::BlockReader;
+use reth_storage_api::{BlockReader, DBProvider};
 use reth_storage_errors::provider::ProviderResult;
+use reth_trie::HashedPostState;
+use reth_trie_db::DatabaseHashedPostState;

 pub use reth_storage_errors::provider::ConsistentViewError;
@@ -43,6 +46,21 @@ where
         Ok(Self::new(provider, tip))
     }

+    /// Retrieve revert hashed state down to the given block hash.
+    pub fn revert_state(&self, block_hash: B256) -> ProviderResult<HashedPostState> {
+        let provider = self.provider_ro()?;
+        let block_number = provider
+            .block_number(block_hash)?
+            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
+        if block_number == provider.best_block_number()? &&
+            block_number == provider.last_block_number()?
+        {
+            Ok(HashedPostState::default())
+        } else {
+            Ok(HashedPostState::from_reverts(provider.tx_ref(), block_number + 1)?)
+        }
+    }
+
     /// Creates new read-only provider and performs consistency checks on the current tip.
     pub fn provider_ro(&self) -> ProviderResult<Factory::Provider> {
         // Create a new provider.
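Note: `revert_state` answers "what hashed changes must be undone to view the database as of `block_hash`": it is empty when that hash is already the persisted tip, and otherwise collects the reverts from `block_number + 1` upward. A hypothetical call, with `provider_factory` and `ancestor_hash` assumed rather than taken from the diff:

    // Hypothetical usage sketch of the new helper.
    let view = ConsistentDbView::new_with_latest_tip(provider_factory.clone())?;
    // Empty when `ancestor_hash` is the current tip; otherwise the hashed reverts
    // needed to roll the database view back to that block.
    let revert_state: HashedPostState = view.revert_state(ancestor_hash)?;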
@@ -1,26 +1,29 @@
 use crate::{
     traits::{BlockSource, ReceiptProvider},
     AccountReader, BlockExecutionReader, BlockHashReader, BlockIdReader, BlockNumReader,
-    BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
-    HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider, StateProviderBox,
-    StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, TransactionsProvider,
-    WithdrawalsProvider,
+    BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProvider,
+    EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider,
+    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
+    TransactionsProvider, WithdrawalsProvider,
 };
 use parking_lot::Mutex;
 use reth_chainspec::{ChainInfo, ChainSpec};
 use reth_db::mock::{DatabaseMock, TxMock};
 use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
 use reth_evm::ConfigureEvmEnv;
 use reth_execution_types::{Chain, ExecutionOutcome};
 use reth_primitives::{
     keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber,
-    BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, Header, Receipt, SealedBlock,
+    BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, GotExpected, Header, Receipt, SealedBlock,
     SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
     TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256,
     U256,
 };
 use reth_stages_types::{StageCheckpoint, StageId};
-use reth_storage_api::{StageCheckpointReader, StateProofProvider, StorageRootProvider};
-use reth_storage_errors::provider::{ProviderError, ProviderResult};
+use reth_storage_api::{
+    DatabaseProviderFactory, StageCheckpointReader, StateProofProvider, StorageRootProvider,
+};
+use reth_storage_errors::provider::{ConsistentViewError, ProviderError, ProviderResult};
 use reth_trie::{
     prefix_set::TriePrefixSetsMut, updates::TrieUpdates, AccountProof, HashedPostState,
     HashedStorage,
@@ -141,6 +144,15 @@ impl MockEthProvider {
     }
 }

+impl DatabaseProviderFactory for MockEthProvider {
+    type DB = DatabaseMock;
+    type Provider = DatabaseProvider<TxMock>;
+
+    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
+        Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into())
+    }
+}
+
 impl HeaderProvider for MockEthProvider {
     fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
         let lock = self.headers.lock();
@@ -62,7 +62,14 @@ pub fn calculate_state_root(c: &mut Criterion) {
         // parallel root
         group.bench_function(BenchmarkId::new("parallel root", size), |b| {
             b.to_async(&runtime).iter_with_setup(
-                || ParallelStateRoot::new(view.clone(), updated_state.clone()),
+                || {
+                    ParallelStateRoot::new(
+                        view.clone(),
+                        Default::default(),
+                        updated_state.clone(),
+                        updated_state.construct_prefix_sets().freeze(),
+                    )
+                },
                 |calculator| async { calculator.incremental_root() },
             );
         });
@@ -11,7 +11,8 @@ use reth_provider::{
 use reth_trie::{
     hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
     node_iter::{TrieElement, TrieNodeIter},
-    trie_cursor::TrieCursorFactory,
+    prefix_set::TriePrefixSets,
+    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
     updates::TrieUpdates,
     walker::TrieWalker,
     HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
@@ -37,8 +38,12 @@ use tracing::*;
 pub struct ParallelStateRoot<Factory> {
     /// Consistent view of the database.
     view: ConsistentDbView<Factory>,
+    /// Cached trie nodes.
+    trie_nodes: TrieUpdates,
     /// Changed hashed state.
     hashed_state: HashedPostState,
+    /// A set of prefix sets that have changed.
+    prefix_sets: TriePrefixSets,
     /// Parallel state root metrics.
     #[cfg(feature = "metrics")]
     metrics: ParallelStateRootMetrics,
@@ -46,10 +51,17 @@ pub struct ParallelStateRoot<Factory> {

 impl<Factory> ParallelStateRoot<Factory> {
     /// Create new parallel state root calculator.
-    pub fn new(view: ConsistentDbView<Factory>, hashed_state: HashedPostState) -> Self {
+    pub fn new(
+        view: ConsistentDbView<Factory>,
+        trie_nodes: TrieUpdates,
+        hashed_state: HashedPostState,
+        prefix_sets: TriePrefixSets,
+    ) -> Self {
         Self {
             view,
+            trie_nodes,
             hashed_state,
+            prefix_sets,
             #[cfg(feature = "metrics")]
             metrics: ParallelStateRootMetrics::default(),
         }
@@ -77,12 +89,15 @@ where
         retain_updates: bool,
     ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
         let mut tracker = ParallelTrieTracker::default();
-        let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
-        let storage_root_targets = StorageRootTargets::new(
-            self.hashed_state.accounts.keys().copied(),
-            prefix_sets.storage_prefix_sets,
-        );
+        let trie_nodes_sorted = self.trie_nodes.into_sorted();
         let hashed_state_sorted = self.hashed_state.into_sorted();
+        let storage_root_targets = StorageRootTargets::new(
+            self.prefix_sets
+                .account_prefix_set
+                .iter()
+                .map(|nibbles| B256::from_slice(&nibbles.pack())),
+            self.prefix_sets.storage_prefix_sets,
+        );

         // Pre-calculate storage roots in parallel for accounts which were changed.
         tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
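Note: storage-root targets are now derived from the caller-supplied account prefix set rather than from `hashed_state.accounts`, so (in the engine path above) accounts touched only by the revert state or by in-memory ancestor blocks are targeted as well. Each prefix-set entry holds the full nibbles of a hashed address, so packing it recovers the 32-byte key. A small sketch of that round trip, with a made-up example value:

    // Sketch: a full-length Nibbles entry packs back into the original hashed address.
    use reth_trie::Nibbles;

    let hashed_address = B256::repeat_byte(0x42); // made-up example value
    let nibbles = Nibbles::unpack(hashed_address);
    assert_eq!(B256::from_slice(&nibbles.pack()), hashed_address);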
@@ -91,7 +106,10 @@ where
             .into_par_iter()
             .map(|(hashed_address, prefix_set)| {
                 let provider_ro = self.view.provider_ro()?;
-                let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());
+                let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+                    DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+                    &trie_nodes_sorted,
+                );
                 let hashed_cursor_factory = HashedPostStateCursorFactory::new(
                     DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
                     &hashed_state_sorted,
@@ -113,15 +131,18 @@ where
         let mut trie_updates = TrieUpdates::default();

         let provider_ro = self.view.provider_ro()?;
+        let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+            &trie_nodes_sorted,
+        );
         let hashed_cursor_factory = HashedPostStateCursorFactory::new(
             DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
             &hashed_state_sorted,
         );
-        let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());

         let walker = TrieWalker::new(
             trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
-            prefix_sets.account_prefix_set,
+            self.prefix_sets.account_prefix_set,
         )
         .with_deletions_retained(retain_updates);
         let mut account_node_iter = TrieNodeIter::new(
@@ -171,7 +192,7 @@ where
         trie_updates.finalize(
             account_node_iter.walker,
             hash_builder,
-            prefix_sets.destroyed_accounts,
+            self.prefix_sets.destroyed_accounts,
         );

         let stats = tracker.finish();
@@ -270,9 +291,14 @@ mod tests {
         }

         assert_eq!(
-            ParallelStateRoot::new(consistent_view.clone(), HashedPostState::default())
-                .incremental_root()
-                .unwrap(),
+            ParallelStateRoot::new(
+                consistent_view.clone(),
+                Default::default(),
+                HashedPostState::default(),
+                Default::default()
+            )
+            .incremental_root()
+            .unwrap(),
             test_utils::state_root(state.clone())
         );
@@ -301,8 +327,11 @@ mod tests {
             }
         }

+        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
         assert_eq!(
-            ParallelStateRoot::new(consistent_view, hashed_state).incremental_root().unwrap(),
+            ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets)
+                .incremental_root()
+                .unwrap(),
             test_utils::state_root(state)
         );
     }
@@ -25,7 +25,7 @@ pub struct StateRoot<T, H> {
     pub trie_cursor_factory: T,
     /// The factory for hashed cursors.
     pub hashed_cursor_factory: H,
-    /// A set of prefix sets that have changes.
+    /// A set of prefix sets that have changed.
     pub prefix_sets: TriePrefixSets,
     /// Previous intermediate state.
     previous_state: Option<IntermediateStateRootState>,