feat(trie): write sorted hashed state (#9519)

This commit is contained in:
Roman Krasiuk
2024-07-15 08:15:58 -07:00
committed by GitHub
parent 80b02fec0f
commit 63e5dac0b6
11 changed files with 46 additions and 38 deletions

View File

@ -283,8 +283,7 @@ impl Command {
debug!(target: "reth::cli", ?execution_outcome, "Executed block");
let hashed_post_state = execution_outcome.hash_state_slow();
let (state_root, trie_updates) = execution_outcome
.hash_state_slow()
let (state_root, trie_updates) = hashed_post_state
.state_root_with_updates(provider_factory.provider()?.tx_ref())?;
if state_root != block_with_senders.state_root {
@ -300,7 +299,7 @@ impl Command {
provider_rw.append_blocks_with_state(
Vec::from([block_with_senders]),
execution_outcome,
hashed_post_state,
hashed_post_state.into_sorted(),
trie_updates,
)?;
info!(target: "reth::cli", "Successfully appended built block");

View File

@ -26,6 +26,7 @@ use reth_provider::{
use reth_prune_types::PruneModes;
use reth_stages_api::{MetricEvent, MetricEventsSender};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{hashed_cursor::HashedPostStateCursorFactory, StateRoot};
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
sync::Arc,
@ -1214,6 +1215,8 @@ where
) -> Result<(), CanonicalError> {
let (blocks, state, chain_trie_updates) = chain.into_inner();
let hashed_state = state.hash_state_slow();
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let hashed_state_sorted = hashed_state.into_sorted();
// Compute state root or retrieve cached trie updates before opening write transaction.
let block_hash_numbers =
@ -1233,8 +1236,13 @@ where
// State root calculation can take a while, and we're sure no write transaction
// will be open in parallel. See https://github.com/paradigmxyz/reth/issues/6168.
.disable_long_read_transaction_safety();
let (state_root, trie_updates) = hashed_state
.state_root_with_updates(provider.tx_ref())
let (state_root, trie_updates) = StateRoot::from_tx(provider.tx_ref())
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
provider.tx_ref(),
&hashed_state_sorted,
))
.with_prefix_sets(prefix_sets)
.root_with_updates()
.map_err(Into::<BlockValidationError>::into)?;
let tip = blocks.tip();
if state_root != tip.state_root {
@ -1256,7 +1264,7 @@ where
.append_blocks_with_state(
blocks.into_blocks().collect(),
state,
hashed_state,
hashed_state_sorted,
trie_updates,
)
.map_err(|e| CanonicalError::CanonicalCommit(e.to_string()))?;

View File

@ -85,7 +85,7 @@ impl<DB: Database> DatabaseService<DB> {
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(hashed_state.clone()).write_to_db(&provider_rw)?;
HashedStateChanges(hashed_state).write_to_db(&provider_rw)?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

View File

@ -11,7 +11,7 @@ use reth_prune_types::PruneModes;
use reth_stages::{test_utils::TestStages, ExecOutput, StageError};
use reth_stages_api::Pipeline;
use reth_static_file::StaticFileProducer;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
use revm::db::BundleState;
use std::{collections::VecDeque, ops::Range, sync::Arc};
use tokio::sync::watch;
@ -103,7 +103,7 @@ pub(crate) fn get_executed_block_with_number(block_number: BlockNumber) -> Execu
block_number,
vec![Requests::default()],
)),
Arc::new(HashedPostState::default()),
Arc::new(HashedPostStateSorted::default()),
Arc::new(TrieUpdates::default()),
)
}

View File

@ -32,7 +32,7 @@ use reth_rpc_types::{
},
ExecutionPayload,
};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostState, HashedPostStateSorted};
use std::{
collections::{BTreeMap, HashMap},
marker::PhantomData,
@ -53,7 +53,7 @@ pub struct ExecutedBlock {
block: Arc<SealedBlock>,
senders: Arc<Vec<Address>>,
execution_output: Arc<ExecutionOutcome>,
hashed_state: Arc<HashedPostState>,
hashed_state: Arc<HashedPostStateSorted>,
trie: Arc<TrieUpdates>,
}
@ -62,7 +62,7 @@ impl ExecutedBlock {
block: Arc<SealedBlock>,
senders: Arc<Vec<Address>>,
execution_output: Arc<ExecutionOutcome>,
hashed_state: Arc<HashedPostState>,
hashed_state: Arc<HashedPostStateSorted>,
trie: Arc<TrieUpdates>,
) -> Self {
Self { block, senders, execution_output, hashed_state, trie }
@ -84,7 +84,7 @@ impl ExecutedBlock {
}
/// Returns a reference to the hashed state result of the execution outcome
pub(crate) fn hashed_state(&self) -> &HashedPostState {
pub(crate) fn hashed_state(&self) -> &HashedPostStateSorted {
&self.hashed_state
}
@ -666,7 +666,7 @@ where
block_number,
vec![Requests::from(output.requests)],
)),
hashed_state: Arc::new(hashed_state),
hashed_state: Arc::new(hashed_state.into_sorted()),
trie: Arc::new(trie_output),
};
self.state.tree_state.insert_executed(executed);

View File

@ -382,7 +382,7 @@ mod tests {
use reth_provider::{BlockWriter, ProviderFactory, StaticFileProviderFactory};
use reth_stages_api::StageUnitCheckpoint;
use reth_testing_utils::generators::{self, random_header, random_header_range};
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
use test_runner::HeadersTestRunner;
mod test_runner {
@ -630,7 +630,7 @@ mod tests {
.append_blocks_with_state(
sealed_blocks,
ExecutionOutcome::default(),
HashedPostState::default(),
HashedPostStateSorted::default(),
TrieUpdates::default(),
)
.unwrap();

View File

@ -7,23 +7,22 @@ use reth_db_api::{
DatabaseError,
};
use reth_primitives::{StorageEntry, U256};
use reth_trie::HashedPostState;
use reth_trie::HashedPostStateSorted;
/// Changes to the hashed state.
#[derive(Debug, Default)]
pub struct HashedStateChanges(pub HashedPostState);
#[derive(Debug)]
pub struct HashedStateChanges<'a>(pub &'a HashedPostStateSorted);
impl HashedStateChanges {
impl HashedStateChanges<'_> {
/// Write the bundle state to the database.
pub fn write_to_db<DB>(self, provider: &DatabaseProviderRW<DB>) -> Result<(), DatabaseError>
where
DB: Database,
{
// Write hashed account updates.
let sorted_accounts = self.0.accounts.into_iter().sorted_unstable_by_key(|(key, _)| *key);
let mut hashed_accounts_cursor =
provider.tx_ref().cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in sorted_accounts {
for (hashed_address, account) in self.0.accounts().accounts_sorted() {
if let Some(account) = account {
hashed_accounts_cursor.upsert(hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
@ -32,19 +31,18 @@ impl HashedStateChanges {
}
// Write hashed storage changes.
let sorted_storages = self.0.storages.into_iter().sorted_by_key(|(key, _)| *key);
let sorted_storages = self.0.account_storages().iter().sorted_by_key(|(key, _)| *key);
let mut hashed_storage_cursor =
provider.tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
for (hashed_address, storage) in sorted_storages {
if storage.wiped && hashed_storage_cursor.seek_exact(hashed_address)?.is_some() {
if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_storage_cursor.delete_current_duplicates()?;
}
let sorted_storage = storage.storage.into_iter().sorted_by_key(|(key, _)| *key);
for (hashed_slot, value) in sorted_storage {
for (hashed_slot, value) in storage.storage_slots_sorted() {
let entry = StorageEntry { key: hashed_slot, value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(hashed_address, entry.key)?
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
{
if db_entry.key == entry.key {
hashed_storage_cursor.delete_current()?;
@ -52,7 +50,7 @@ impl HashedStateChanges {
}
if entry.value != U256::ZERO {
hashed_storage_cursor.upsert(hashed_address, entry)?;
hashed_storage_cursor.upsert(*hashed_address, entry)?;
}
}
}
@ -67,7 +65,7 @@ mod tests {
use crate::test_utils::create_test_provider_factory;
use reth_db_api::transaction::DbTx;
use reth_primitives::{keccak256, Account, Address, B256};
use reth_trie::HashedStorage;
use reth_trie::{HashedPostState, HashedStorage};
#[test]
fn wiped_entries_are_removed() {
@ -102,7 +100,10 @@ mod tests {
hashed_state.storages.insert(destroyed_address_hashed, HashedStorage::new(true));
let provider_rw = provider_factory.provider_rw().unwrap();
assert_eq!(HashedStateChanges(hashed_state).write_to_db(&provider_rw), Ok(()));
assert_eq!(
HashedStateChanges(&hashed_state.into_sorted()).write_to_db(&provider_rw),
Ok(())
);
provider_rw.commit().unwrap();
let provider = provider_factory.provider().unwrap();

View File

@ -44,7 +44,7 @@ use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
updates::TrieUpdates,
HashedPostState, Nibbles, StateRoot,
HashedPostStateSorted, Nibbles, StateRoot,
};
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use std::{
@ -3274,7 +3274,7 @@ impl<DB: Database> BlockWriter for DatabaseProviderRW<DB> {
&self,
blocks: Vec<SealedBlockWithSenders>,
execution_outcome: ExecutionOutcome,
hashed_state: HashedPostState,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,
) -> ProviderResult<()> {
if blocks.is_empty() {
@ -3302,7 +3302,7 @@ impl<DB: Database> BlockWriter for DatabaseProviderRW<DB> {
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(self)?;
HashedStateChanges(&hashed_state).write_to_db(self)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);

View File

@ -3,7 +3,7 @@ use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{BlockNumber, SealedBlockWithSenders};
use reth_storage_api::BlockReader;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
use std::ops::RangeInclusive;
/// BlockExecution Writer
@ -61,7 +61,7 @@ pub trait BlockWriter: Send + Sync {
&self,
blocks: Vec<SealedBlockWithSenders>,
execution_outcome: ExecutionOutcome,
hashed_state: HashedPostState,
hashed_state: HashedPostStateSorted,
trie_updates: TrieUpdates,
) -> ProviderResult<()>;
}

View File

@ -27,7 +27,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
let provider_factory = create_test_provider_factory();
{
let provider_rw = provider_factory.provider_rw().unwrap();
HashedStateChanges(db_state).write_to_db(&provider_rw).unwrap();
HashedStateChanges(&db_state.into_sorted()).write_to_db(&provider_rw).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();

View File

@ -322,7 +322,7 @@ impl HashedStorage {
}
/// Sorted hashed post state optimized for iterating during state trie calculation.
#[derive(PartialEq, Eq, Clone, Debug)]
#[derive(PartialEq, Eq, Clone, Default, Debug)]
pub struct HashedPostStateSorted {
/// Updated state of accounts.
pub(crate) accounts: HashedAccountsSorted,
@ -343,7 +343,7 @@ impl HashedPostStateSorted {
}
/// Sorted account state optimized for iterating during state trie calculation.
#[derive(Clone, Eq, PartialEq, Debug)]
#[derive(Clone, Eq, PartialEq, Default, Debug)]
pub struct HashedAccountsSorted {
/// Sorted collection of hashed addresses and their account info.
pub(crate) accounts: Vec<(B256, Account)>,