perf(provider): compute hashes and trie updates before opening write tx (#5505)

This commit is contained in:
Roman Krasiuk
2023-11-28 00:54:09 -08:00
committed by GitHub
parent 98250f8b50
commit 608f100605
8 changed files with 236 additions and 30 deletions

View File

@ -277,11 +277,26 @@ impl Command {
let state = executor.take_output_state();
debug!(target: "reth::cli", ?state, "Executed block");
let hashed_state = state.hash_state_slow();
let (state_root, trie_updates) = state
.state_root_calculator(provider_factory.provider()?.tx_ref(), &hashed_state)
.root_with_updates()?;
if state_root != block_with_senders.state_root {
eyre::bail!(
"state root mismatch. expected: {}. got: {}",
block_with_senders.state_root,
state_root
);
}
// Attempt to insert new block without committing
let provider_rw = provider_factory.provider_rw()?;
provider_rw.append_blocks_with_bundle_state(
provider_rw.append_blocks_with_state(
Vec::from([block_with_senders]),
state,
hashed_state,
trie_updates,
None,
)?;
info!(target: "reth::cli", "Successfully appended built block");

View File

@ -6,7 +6,7 @@ use crate::{
state::{BlockChainId, TreeState},
AppendableChain, BlockIndices, BlockchainTreeConfig, BundleStateData, TreeExternals,
};
use reth_db::database::Database;
use reth_db::{database::Database, DatabaseError};
use reth_interfaces::{
blockchain_tree::{
error::{BlockchainTreeError, CanonicalError, InsertBlockError, InsertBlockErrorKind},
@ -14,17 +14,18 @@ use reth_interfaces::{
},
consensus::{Consensus, ConsensusError},
executor::{BlockExecutionError, BlockValidationError},
RethResult,
provider::RootMismatch,
RethError, RethResult,
};
use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Hardfork, PruneModes, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, U256,
BlockHash, BlockNumHash, BlockNumber, ForkBlock, GotExpected, Hardfork, PruneModes, Receipt,
SealedBlock, SealedBlockWithSenders, SealedHeader, U256,
};
use reth_provider::{
chain::{ChainSplit, ChainSplitTarget},
BlockExecutionWriter, BlockNumReader, BlockWriter, BundleStateWithReceipts,
CanonStateNotification, CanonStateNotificationSender, CanonStateNotifications, Chain,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider,
ChainSpecProvider, DisplayBlocksChain, ExecutorFactory, HeaderProvider, ProviderError,
};
use reth_stages::{MetricEvent, MetricEventsSender};
use std::{collections::BTreeMap, sync::Arc};
@ -1104,14 +1105,35 @@ impl<DB: Database, EF: ExecutorFactory> BlockchainTree<DB, EF> {
/// Write the given chain to the database as canonical.
fn commit_canonical_to_database(&self, chain: Chain) -> RethResult<()> {
let provider_rw = self.externals.provider_factory.provider_rw()?;
// Compute state root before opening write transaction.
let hashed_state = chain.state().hash_state_slow();
let (state_root, trie_updates) = chain
.state()
.state_root_calculator(
self.externals.provider_factory.provider()?.tx_ref(),
&hashed_state,
)
.root_with_updates()
.map_err(Into::<DatabaseError>::into)?;
let tip = chain.tip();
if state_root != tip.state_root {
return Err(RethError::Provider(ProviderError::StateRootMismatch(Box::new(
RootMismatch {
root: GotExpected { got: state_root, expected: tip.state_root },
block_number: tip.number,
block_hash: tip.hash,
},
))))
}
let (blocks, state) = chain.into_inner();
let provider_rw = self.externals.provider_factory.provider_rw()?;
provider_rw
.append_blocks_with_bundle_state(
.append_blocks_with_state(
blocks.into_blocks().collect(),
state,
hashed_state,
trie_updates,
self.prune_modes.as_ref(),
)
.map_err(|e| BlockExecutionError::CanonicalCommit { inner: e.to_string() })?;

View File

@ -128,7 +128,6 @@ impl BundleStateWithReceipts {
///
/// The hashed post state.
pub fn hash_state_slow(&self) -> HashedPostState {
//let mut storages = BTreeMap::default();
let mut hashed_state = HashedPostState::default();
for (address, account) in self.bundle.state() {
@ -136,7 +135,7 @@ impl BundleStateWithReceipts {
if let Some(account) = &account.info {
hashed_state.insert_account(hashed_address, into_reth_acc(account.clone()))
} else {
hashed_state.insert_cleared_account(hashed_address);
hashed_state.insert_destroyed_account(hashed_address);
}
// insert storage.
@ -155,8 +154,8 @@ impl BundleStateWithReceipts {
hashed_state.sorted()
}
/// Returns [StateRoot] calculator.
fn state_root_calculator<'a, 'b, TX: DbTx>(
/// Returns [StateRoot] calculator based on database and in-memory state.
pub fn state_root_calculator<'a, 'b, TX: DbTx>(
&self,
tx: &'a TX,
hashed_post_state: &'b HashedPostState,
@ -167,6 +166,7 @@ impl BundleStateWithReceipts {
.with_hashed_cursor_factory(hashed_cursor_factory)
.with_changed_account_prefixes(account_prefix_set)
.with_changed_storage_prefixes(storage_prefix_set)
.with_destroyed_accounts(hashed_post_state.destroyed_accounts())
}
/// Calculate the state root for this [BundleState].

View File

@ -0,0 +1,128 @@
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
tables,
transaction::{DbTx, DbTxMut},
DatabaseError,
};
use reth_primitives::{Account, StorageEntry, B256, U256};
use reth_trie::hashed_cursor::HashedPostState;
use std::collections::BTreeMap;
/// Changes to the hashed state.
///
/// Newtype over [HashedPostState] that persists the contained hashed accounts and hashed
/// storages through a database transaction.
#[derive(Debug, Default)]
pub struct HashedStateChanges(pub HashedPostState);

impl HashedStateChanges {
    /// Write the hashed state changes to the database.
    ///
    /// Accounts that carry a value are upserted into [tables::HashedAccount]; accounts without
    /// a value are deleted when an entry for them exists. For [tables::HashedStorage], the
    /// existing entries of a wiped storage are deleted first. Afterwards every changed slot
    /// has its matching database entry removed and is written back only when its value is
    /// non-zero.
    ///
    /// Returns the first [DatabaseError] raised by a cursor operation.
    pub fn write_to_db<TX: DbTxMut + DbTx>(self, tx: &TX) -> Result<(), DatabaseError> {
        // Gather account changes keyed by hashed address. When the same address is yielded
        // more than once, the value yielded last is the one that is kept.
        let mut account_changes: BTreeMap<B256, Option<Account>> = BTreeMap::new();
        self.0.accounts().for_each(|(hashed_address, account)| {
            account_changes.insert(hashed_address, account);
        });

        // Apply the account changes in key order.
        let mut account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
        for (hashed_address, account) in account_changes {
            match account {
                Some(account) => {
                    account_cursor.upsert(hashed_address, account)?;
                }
                None => {
                    // Only delete when the account is actually present in the table.
                    if account_cursor.seek_exact(hashed_address)?.is_some() {
                        account_cursor.delete_current()?;
                    }
                }
            }
        }

        // Gather storage changes per hashed address: the wiped flag plus the changed slots.
        let mut storage_changes: BTreeMap<B256, (bool, BTreeMap<B256, U256>)> = BTreeMap::new();
        for (hashed_address, storage) in self.0.storages() {
            let (wiped, slots) = storage_changes.entry(*hashed_address).or_default();
            *wiped |= storage.wiped();
            storage.storage_slots().for_each(|(hashed_slot, value)| {
                slots.insert(hashed_slot, value);
            });
        }

        // Apply the storage changes in key order.
        let mut storage_cursor = tx.cursor_dup_write::<tables::HashedStorage>()?;
        for (hashed_address, (wiped, slots)) in storage_changes {
            // A wiped storage loses its existing entries before any new slot is written.
            if wiped && storage_cursor.seek_exact(hashed_address)?.is_some() {
                storage_cursor.delete_current_duplicates()?;
            }

            for (hashed_slot, value) in slots {
                // The seek may return an entry for a different slot, so the returned key is
                // compared before deleting.
                let existing = storage_cursor.seek_by_key_subkey(hashed_address, hashed_slot)?;
                if matches!(existing, Some(db_entry) if db_entry.key == hashed_slot) {
                    storage_cursor.delete_current()?;
                }

                // Zero-valued slots are not stored.
                if value != U256::ZERO {
                    storage_cursor.upsert(hashed_address, StorageEntry { key: hashed_slot, value })?;
                }
            }
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::create_test_provider_factory;
    use reth_primitives::{keccak256, Address};
    use reth_trie::hashed_cursor::HashedStorage;

    /// Writing a destroyed account together with a `HashedStorage::new(true)` storage must
    /// remove both the hashed account entry and the hashed storage entry from the database.
    #[test]
    fn wiped_entries_are_removed() {
        let factory = create_test_provider_factory();

        let addresses: Vec<Address> = (0..10).map(|_| Address::random()).collect();
        let destroyed_hashed_address = keccak256(addresses[0]);
        let hashed_slot = keccak256(B256::with_last_byte(1));

        // Seed the database: every address gets one account entry and one storage slot.
        {
            let provider_rw = factory.provider_rw().unwrap();
            let mut account_cursor =
                provider_rw.tx_ref().cursor_write::<tables::HashedAccount>().unwrap();
            let mut slot_cursor =
                provider_rw.tx_ref().cursor_write::<tables::HashedStorage>().unwrap();

            for address in addresses {
                let hashed_address = keccak256(address);
                let account = Account { nonce: 1, ..Default::default() };
                let slot_entry = StorageEntry { key: hashed_slot, value: U256::from(1) };
                account_cursor.insert(hashed_address, account).unwrap();
                slot_cursor.insert(hashed_address, slot_entry).unwrap();
            }
            provider_rw.commit().unwrap();
        }

        // Mark the first account as destroyed and attach a storage created with `new(true)`.
        let mut post_state = HashedPostState::default();
        post_state.insert_destroyed_account(destroyed_hashed_address);
        post_state.insert_hashed_storage(destroyed_hashed_address, HashedStorage::new(true));

        let provider_rw = factory.provider_rw().unwrap();
        let write_result = HashedStateChanges(post_state).write_to_db(provider_rw.tx_ref());
        assert_eq!(write_result, Ok(()));
        provider_rw.commit().unwrap();

        // Neither the account nor its storage slot may be found afterwards.
        let provider = factory.provider().unwrap();
        let stored_account =
            provider.tx_ref().get::<tables::HashedAccount>(destroyed_hashed_address);
        assert_eq!(stored_account, Ok(None));

        let stored_slot = provider
            .tx_ref()
            .cursor_read::<tables::HashedStorage>()
            .unwrap()
            .seek_by_key_subkey(destroyed_hashed_address, hashed_slot);
        assert_eq!(stored_slot, Ok(None));
    }
}

View File

@ -1,11 +1,13 @@
//! Bundle state module.
//! This module contains all the logic related to bundle state.
mod bundle_state_with_receipts;
mod hashed_state_changes;
mod state_changes;
mod state_reverts;
pub use bundle_state_with_receipts::{
AccountRevertInit, BundleStateInit, BundleStateWithReceipts, OriginalValuesKnown, RevertsInit,
};
pub use hashed_state_changes::HashedStateChanges;
pub use state_changes::StateChanges;
pub use state_reverts::StateReverts;

View File

@ -1,5 +1,5 @@
use crate::{
bundle_state::{BundleStateInit, BundleStateWithReceipts, RevertsInit},
bundle_state::{BundleStateInit, BundleStateWithReceipts, HashedStateChanges, RevertsInit},
providers::{database::metrics, SnapshotProvider},
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
@ -43,7 +43,9 @@ use reth_primitives::{
TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, TransactionSignedNoHash,
TxHash, TxNumber, Withdrawal, B256, U256,
};
use reth_trie::{prefix_set::PrefixSetMut, StateRoot};
use reth_trie::{
hashed_cursor::HashedPostState, prefix_set::PrefixSetMut, updates::TrieUpdates, StateRoot,
};
use revm::primitives::{BlockEnv, CfgEnv, SpecId};
use std::{
collections::{hash_map, BTreeMap, BTreeSet, HashMap, HashSet},
@ -2287,10 +2289,12 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
Ok(block_indices)
}
fn append_blocks_with_bundle_state(
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: BundleStateWithReceipts,
hashed_state: HashedPostState,
trie_updates: TrieUpdates,
prune_modes: Option<&PruneModes>,
) -> ProviderResult<()> {
if blocks.is_empty() {
@ -2303,8 +2307,6 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
let last = blocks.last().unwrap();
let last_block_number = last.number;
let last_block_hash = last.hash();
let expected_state_root = last.state_root;
let mut durations_recorder = metrics::DurationsRecorder::default();
@ -2320,7 +2322,11 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
state.write_to_db(self.tx_ref(), OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);
self.insert_hashes(first_number..=last_block_number, last_block_hash, expected_state_root)?;
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(&self.tx)?;
trie_updates.flush(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);
self.update_history_indices(first_number..=last_block_number)?;

View File

@ -10,6 +10,7 @@ use reth_primitives::{
ChainSpec, Header, PruneModes, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader,
B256,
};
use reth_trie::{hashed_cursor::HashedPostState, updates::TrieUpdates};
use std::ops::RangeInclusive;
/// Enum to control transaction hash inclusion.
@ -291,10 +292,12 @@ pub trait BlockWriter: Send + Sync {
/// # Returns
///
/// Returns `Ok(())` on success, or an error if any operation fails.
fn append_blocks_with_bundle_state(
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
state: BundleStateWithReceipts,
hashed_state: HashedPostState,
trie_updates: TrieUpdates,
prune_modes: Option<&PruneModes>,
) -> ProviderResult<()>;
}

View File

@ -32,6 +32,19 @@ impl HashedStorage {
}
}
/// Returns `true` if the storage was wiped.
///
/// This is a plain read of the `wiped` flag; it does not inspect the storage slots.
pub fn wiped(&self) -> bool {
self.wiped
}
/// Returns an iterator over all storage slots as `(hashed slot, value)` pairs.
///
/// Zero-valued slots are yielded first, each paired with [U256::ZERO], followed by the
/// non-zero valued entries as they are currently stored.
pub fn storage_slots(&self) -> impl Iterator<Item = (B256, U256)> + '_ {
    let zeroed = self.zero_valued_slots.iter().map(|hashed_slot| (*hashed_slot, U256::ZERO));
    let non_zero = self.non_zero_valued_storage.iter().cloned();
    zeroed.chain(non_zero)
}
/// Sorts the non zero value storage entries.
pub fn sort_storage(&mut self) {
if !self.sorted {
@ -58,8 +71,8 @@ impl HashedStorage {
pub struct HashedPostState {
/// Map of hashed addresses to account info.
accounts: Vec<(B256, Account)>,
/// Set of cleared accounts.
cleared_accounts: HashSet<B256>,
/// Set of destroyed accounts.
destroyed_accounts: HashSet<B256>,
/// Map of hashed addresses to hashed storage.
storages: HashMap<B256, HashedStorage>,
/// Whether the account and storage entries were sorted or not.
@ -70,7 +83,7 @@ impl Default for HashedPostState {
fn default() -> Self {
Self {
accounts: Vec::new(),
cleared_accounts: HashSet::new(),
destroyed_accounts: HashSet::new(),
storages: HashMap::new(),
sorted: true, // empty is sorted
}
@ -84,6 +97,18 @@ impl HashedPostState {
self
}
/// Returns an iterator over all accounts together with their state.
///
/// Destroyed accounts come first and are yielded as `(hashed_address, None)`. They are
/// followed by the accounts that carry state, yielded as `(hashed_address, Some(account))`.
/// An address present in both collections is yielded twice, the destroyed entry first.
pub fn accounts(&self) -> impl Iterator<Item = (B256, Option<Account>)> + '_ {
    let destroyed =
        self.destroyed_accounts.iter().map(|hashed_address| (*hashed_address, None::<Account>));
    let with_state =
        self.accounts.iter().map(|(hashed_address, account)| (*hashed_address, Some(*account)));
    destroyed.chain(with_state)
}
/// Returns an iterator over all account storages.
///
/// Each item is a `(hashed address, hashed storage)` pair borrowed from the underlying
/// `HashMap`, so the iteration order is unspecified.
pub fn storages(&self) -> impl Iterator<Item = (&B256, &HashedStorage)> {
self.storages.iter()
}
/// Sort account and storage entries.
pub fn sort(&mut self) {
if !self.sorted {
@ -102,9 +127,9 @@ impl HashedPostState {
self.sorted = false;
}
/// Insert cleared hashed account key.
pub fn insert_cleared_account(&mut self, hashed_address: B256) {
self.cleared_accounts.insert(hashed_address);
/// Insert destroyed hashed account key.
pub fn insert_destroyed_account(&mut self, hashed_address: B256) {
self.destroyed_accounts.insert(hashed_address);
}
/// Insert hashed storage entry.
@ -113,6 +138,11 @@ impl HashedPostState {
self.storages.insert(hashed_address, hashed_storage);
}
/// Returns the set of hashed addresses of all destroyed accounts.
///
/// The internal set is cloned on every call, so the caller receives an owned copy.
pub fn destroyed_accounts(&self) -> HashSet<B256> {
self.destroyed_accounts.clone()
}
/// Construct [PrefixSet] from hashed post state.
/// The prefix sets contain the hashed account and storage keys that have been changed in the
/// post state.
@ -125,7 +155,7 @@ impl HashedPostState {
for (hashed_address, _) in &self.accounts {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
}
for hashed_address in &self.cleared_accounts {
for hashed_address in &self.destroyed_accounts {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
}
@ -213,7 +243,7 @@ impl<'b, C> HashedPostStateAccountCursor<'b, C> {
/// This function only checks the post state, not the database, because the latter does not
/// store destroyed accounts.
fn is_account_cleared(&self, account: &B256) -> bool {
self.post_state.cleared_accounts.contains(account)
self.post_state.destroyed_accounts.contains(account)
}
/// Return the account with the lowest hashed account key.
@ -667,7 +697,7 @@ mod tests {
let mut hashed_post_state = HashedPostState::default();
for (hashed_address, account) in accounts.iter().filter(|x| x.0[31] % 2 != 0) {
if removed_keys.contains(hashed_address) {
hashed_post_state.insert_cleared_account(*hashed_address);
hashed_post_state.insert_destroyed_account(*hashed_address);
} else {
hashed_post_state.insert_account(*hashed_address, *account);
}
@ -722,7 +752,7 @@ mod tests {
if let Some(account) = account {
hashed_post_state.insert_account(*hashed_address, *account);
} else {
hashed_post_state.insert_cleared_account(*hashed_address);
hashed_post_state.insert_destroyed_account(*hashed_address);
}
}
hashed_post_state.sort();