feat: move hashed state and trie writing to provider (#9636)

This commit is contained in:
Dan Cline
2024-07-24 10:26:03 -04:00
committed by GitHub
parent 7ad93d5ae4
commit 21335d65d0
24 changed files with 1382 additions and 1273 deletions

View File

@ -19,9 +19,9 @@ use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StateWriter,
StaticFileProviderFactory, StorageReader,
writer::StorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
@ -168,7 +168,8 @@ impl Command {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
provider_rw.insert_storage_for_hashing(storages)?;

View File

@ -16,8 +16,8 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::full_block::FullBlockClient;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider, LatestStateProviderRef,
OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
writer::StorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
@ -150,7 +150,10 @@ impl Command {
),
));
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
executor.finalize().write_to_storage(&provider_rw, None, OriginalValuesKnown::Yes)?;
let execution_outcome = executor.finalize();
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
let checkpoint = Some(StageCheckpoint::new(
block_number.checked_sub(1).ok_or(eyre::eyre!("GenesisBlockHasNoParent"))?,

View File

@ -78,16 +78,15 @@ impl<DB: Database> PersistenceService<DB> {
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
// TODO: do we provide a static file producer here?
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
// TODO: use single storage writer in task when sf / db tasks are combined
let storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
storage_writer.write_trie_updates(&trie_updates)?;
}
// update history indices
@ -186,7 +185,7 @@ impl<DB: Database> PersistenceService<DB> {
let receipts_writer =
provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
let storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
let mut storage_writer = StorageWriter::new(Some(&provider_rw), Some(receipts_writer));
let receipts_iter = blocks.iter().map(|block| {
let receipts = block.execution_outcome().receipts().receipt_vec.clone();
debug_assert!(receipts.len() == 1);

View File

@ -16,8 +16,8 @@ use reth_node_core::version::SHORT_VERSION;
use reth_optimism_primitives::bedrock_import::is_dup_tx;
use reth_primitives::Receipts;
use reth_provider::{
OriginalValuesKnown, ProviderFactory, StageCheckpointReader, StateWriter,
StaticFileProviderFactory, StaticFileWriter, StatsReader,
writer::StorageWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader,
};
use reth_stages::StageId;
use reth_static_file_types::StaticFileSegment;
@ -140,7 +140,7 @@ where
);
// We're reusing receipt writing code internal to
// `ExecutionOutcome::write_to_storage`, so we just use a default empty
// `StorageWriter::append_receipts_from_blocks`, so we just use a default empty
// `BundleState`.
let execution_outcome =
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
@ -149,11 +149,8 @@ where
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;
// finally, write the receipts
execution_outcome.write_to_storage(
&provider,
Some(static_file_producer),
OriginalValuesKnown::Yes,
)?;
let mut storage_writer = StorageWriter::new(Some(&provider), Some(static_file_producer));
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
}
provider.commit()?;

View File

@ -6,6 +6,7 @@ use reth_db_api::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{Account, Address, SealedBlock, B256, U256};
use reth_provider::TrieWriter;
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::{StorageKind, TestStageDB},
@ -139,12 +140,10 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let offset = transitions.len() as u64;
let provider_rw = db.factory.provider_rw().unwrap();
db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| {
updates.write_to_database(tx)?;
Ok(())
})
.unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.commit().unwrap();
let (transitions, final_state) = random_changeset_range(
&mut rng,

View File

@ -10,6 +10,7 @@ use reth_primitives::{BlockNumber, Header, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::StorageWriter,
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateWriter, StatsReader, TransactionVariant,
};
@ -358,8 +359,11 @@ where
}
let time = Instant::now();
// write output
state.write_to_storage(provider, static_file_producer, OriginalValuesKnown::Yes)?;
let mut writer = StorageWriter::new(Some(provider), static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;
let db_write_duration = time.elapsed();
debug!(
target: "sync::stages::execution",

View File

@ -7,8 +7,8 @@ use reth_db_api::{
};
use reth_primitives::{BlockNumber, GotExpected, SealedHeader, B256};
use reth_provider::{
DatabaseProviderRW, HeaderProvider, ProviderError, StageCheckpointReader,
StageCheckpointWriter, StatsReader,
writer::StorageWriter, DatabaseProviderRW, HeaderProvider, ProviderError,
StageCheckpointReader, StageCheckpointWriter, StatsReader,
};
use reth_stages_api::{
BlockErrorKind, EntitiesCheckpoint, ExecInput, ExecOutput, MerkleCheckpoint, Stage,
@ -218,7 +218,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.write_to_database(tx)?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;
let checkpoint = MerkleCheckpoint::new(
to_block,
@ -238,7 +239,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.write_to_database(tx)?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;
entities_checkpoint.processed += hashed_entries_walked as u64;
@ -253,7 +255,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
updates.write_to_database(provider.tx_ref())?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;
let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
provider.count_entries::<tables::HashedStorages>()?)
@ -326,7 +329,8 @@ impl<DB: Database> Stage<DB> for MerkleStage {
validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;
// Validation passed, apply unwind changes to the database.
updates.write_to_database(provider.tx_ref())?;
let writer = StorageWriter::new(Some(provider), None);
writer.write_trie_updates(&updates)?;
// TODO(alexey): update entities checkpoint
} else {

View File

@ -11,12 +11,13 @@ use reth_primitives::{
Account, Address, Bytecode, Receipts, StaticFileSegment, StorageEntry, B256, U256,
};
use reth_provider::{
bundle_state::{BundleStateInit, RevertsInit},
errors::provider::ProviderResult,
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, BlockNumReader, ChainSpecProvider, DatabaseProviderRW, ExecutionOutcome,
HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError, ProviderFactory,
StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
writer::StorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DatabaseProviderRW,
ExecutionOutcome, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory, RevertsInit, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
TrieWriter,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_trie::{IntermediateStateRootState, StateRoot as StateRootComputer, StateRootProgress};
@ -202,7 +203,8 @@ pub fn insert_state<'a, 'b, DB: Database>(
Vec::new(),
);
execution_outcome.write_to_storage(provider, None, OriginalValuesKnown::Yes)?;
let mut storage_writer = StorageWriter::new(Some(provider), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
trace!(target: "reth::cli", "Inserted state");
@ -462,7 +464,7 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
.root_with_progress()?
{
StateRootProgress::Progress(state, _, updates) => {
let updated_len = updates.write_to_database(tx)?;
let updated_len = provider.write_trie_updates(&updates)?;
total_flushed_updates += updated_len;
trace!(target: "reth::cli",
@ -482,7 +484,7 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
}
}
StateRootProgress::Complete(root, _, updates) => {
let updated_len = updates.write_to_database(tx)?;
let updated_len = provider.write_trie_updates(&updates)?;
total_flushed_updates += updated_len;
trace!(target: "reth::cli",

View File

@ -75,9 +75,4 @@ rand.workspace = true
[features]
optimism = ["reth-primitives/optimism", "reth-execution-types/optimism"]
serde = ["reth-execution-types/serde"]
test-utils = [
"alloy-rlp",
"reth-db/test-utils",
"reth-nippy-jar/test-utils",
"reth-chain-state/test-utils"
]
test-utils = ["alloy-rlp", "reth-db/test-utils", "reth-nippy-jar/test-utils", "reth-trie/test-utils", "reth-chain-state/test-utils", "reth-db/test-utils"]

File diff suppressed because it is too large Load Diff

View File

@ -1,10 +1,8 @@
//! Bundle state module.
//! This module contains all the logic related to bundle state.
mod execution_outcome;
mod state_changes;
mod state_reverts;
pub use execution_outcome::{AccountRevertInit, BundleStateInit, OriginalValuesKnown, RevertsInit};
pub use state_changes::StateChanges;
pub use state_reverts::{StateReverts, StorageRevertsIter};

View File

@ -34,7 +34,10 @@ pub use reth_storage_errors::provider::{ProviderError, ProviderResult};
pub use reth_execution_types::*;
pub mod bundle_state;
pub use bundle_state::{OriginalValuesKnown, StateChanges, StateReverts};
pub use bundle_state::{StateChanges, StateReverts};
/// Re-export `OriginalValuesKnown`
pub use revm::db::states::OriginalValuesKnown;
/// Writer standalone type.
pub mod writer;

View File

@ -1,5 +1,4 @@
use crate::{
bundle_state::{BundleStateInit, RevertsInit},
providers::{database::metrics, static_file::StaticFileWriter, StaticFileProvider},
to_range,
traits::{
@ -7,12 +6,13 @@ use crate::{
},
writer::StorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter,
HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider,
HistoryWriter, LatestStateProvider, OriginalValuesKnown, ProviderError, PruneCheckpointReader,
PruneCheckpointWriter, RequestsProvider, StageCheckpointReader, StateProviderBox, StateWriter,
StatsReader, StorageReader, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
WithdrawalsProvider,
BlockReader, BlockWriter, BundleStateInit, EvmEnvProvider, FinalizedBlockReader,
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
StageCheckpointReader, StateProviderBox, StateWriter, StatsReader, StorageReader,
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
TrieWriter, WithdrawalsProvider,
};
use itertools::{izip, Itertools};
use reth_chainspec::{ChainInfo, ChainSpec, EthereumHardforks};
@ -44,8 +44,9 @@ use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
updates::TrieUpdates,
HashedPostStateSorted, Nibbles, StateRoot,
trie_cursor::DatabaseStorageTrieCursor,
updates::{StorageTrieUpdates, TrieUpdates},
HashedPostStateSorted, Nibbles, StateRoot, StoredNibbles,
};
use reth_trie_db::DatabaseStateRoot;
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
@ -2627,6 +2628,93 @@ impl<TX: DbTx> StorageReader for DatabaseProvider<TX> {
}
}
impl<TX: DbTxMut + DbTx> TrieWriter for DatabaseProvider<TX> {
/// Writes trie updates. Returns the number of entries modified.
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize> {
if trie_updates.is_empty() {
return Ok(0)
}
// Track the number of inserted entries.
let mut num_entries = 0;
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut account_updates = trie_updates
.removed_nodes_ref()
.iter()
.filter_map(|n| {
(!trie_updates.account_nodes_ref().contains_key(n)).then_some((n, None))
})
.collect::<Vec<_>>();
account_updates.extend(
trie_updates.account_nodes_ref().iter().map(|(nibbles, node)| (nibbles, Some(node))),
);
// Sort trie node updates.
account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let tx = self.tx_ref();
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
for (key, updated_node) in account_updates {
let nibbles = StoredNibbles(key.clone());
match updated_node {
Some(node) => {
if !nibbles.0.is_empty() {
num_entries += 1;
account_trie_cursor.upsert(nibbles, node.clone())?;
}
}
None => {
num_entries += 1;
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
}
}
num_entries += self.write_storage_trie_updates(trie_updates.storage_tries_ref())?;
Ok(num_entries)
}
}
impl<TX: DbTxMut + DbTx> StorageTrieWriter for DatabaseProvider<TX> {
/// Writes storage trie updates from the given storage trie map. First sorts the storage trie
/// updates by the hashed address, writing in sorted order.
fn write_storage_trie_updates(
&self,
storage_tries: &HashMap<B256, StorageTrieUpdates>,
) -> ProviderResult<usize> {
let mut num_entries = 0;
let mut storage_tries = Vec::from_iter(storage_tries);
storage_tries.sort_unstable_by(|a, b| a.0.cmp(b.0));
let mut cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
for (hashed_address, storage_trie_updates) in storage_tries {
let mut db_storage_trie_cursor =
DatabaseStorageTrieCursor::new(cursor, *hashed_address);
num_entries +=
db_storage_trie_cursor.write_storage_trie_updates(storage_trie_updates)?;
cursor = db_storage_trie_cursor.cursor;
}
Ok(num_entries)
}
fn write_individual_storage_trie_updates(
&self,
hashed_address: B256,
updates: &StorageTrieUpdates,
) -> ProviderResult<usize> {
if updates.is_empty() {
return Ok(0)
}
let cursor = self.tx_ref().cursor_dup_write::<tables::StoragesTrie>()?;
let mut trie_db_cursor = DatabaseStorageTrieCursor::new(cursor, hashed_address);
Ok(trie_db_cursor.write_storage_trie_updates(updates)?)
}
}
impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
fn unwind_account_hashing(
&self,
@ -2822,7 +2910,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
block_hash: end_block_hash,
})))
}
trie_updates.write_to_database(&self.tx)?;
self.write_trie_updates(&trie_updates)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);
@ -3031,7 +3119,7 @@ impl<DB: Database> BlockExecutionWriter for DatabaseProviderRW<DB> {
block_hash: parent_hash,
})))
}
trie_updates.write_to_database(&self.tx)?;
self.write_trie_updates(&trie_updates)?;
// get blocks
let blocks = self.take_block_range(range.clone())?;
@ -3119,7 +3207,7 @@ impl<DB: Database> BlockExecutionWriter for DatabaseProviderRW<DB> {
block_hash: parent_hash,
})))
}
trie_updates.write_to_database(&self.tx)?;
self.write_trie_updates(&trie_updates)?;
// get blocks
let blocks = self.take_block_range(range.clone())?;
@ -3333,14 +3421,16 @@ impl<DB: Database> BlockWriter for DatabaseProviderRW<DB> {
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
execution_outcome.write_to_storage(self, None, OriginalValuesKnown::No)?;
// TODO: should _these_ be moved to storagewriter? seems like storagewriter should be
// _above_ db provider
let mut storage_writer = StorageWriter::new(Some(self), None);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);
// insert hashes and intermediate merkle nodes
{
let storage_writer = StorageWriter::new(Some(self), None);
storage_writer.write_hashed_state(&hashed_state)?;
trie_updates.write_to_database(&self.tx)?;
self.write_trie_updates(&trie_updates)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);

View File

@ -1,9 +1,13 @@
use crate::{providers::StaticFileProvider, ProviderFactory};
use crate::{providers::StaticFileProvider, HashingWriter, ProviderFactory, TrieWriter};
use reth_chainspec::{ChainSpec, MAINNET};
use reth_db::{
test_utils::{create_test_rw_db, create_test_static_files_dir, TempDatabase},
DatabaseEnv,
Database, DatabaseEnv,
};
use reth_errors::ProviderResult;
use reth_primitives::{Account, StorageEntry, B256};
use reth_trie::StateRoot;
use reth_trie_db::DatabaseStateRoot;
use std::sync::Arc;
pub mod blocks;
@ -31,3 +35,39 @@ pub fn create_test_provider_factory_with_chain_spec(
StaticFileProvider::read_write(static_dir.into_path()).expect("static file provider"),
)
}
/// Inserts the genesis alloc from the provided chain spec into the trie.
pub fn insert_genesis<DB: Database>(
provider_factory: &ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
) -> ProviderResult<B256> {
let provider = provider_factory.provider_rw()?;
// Hash accounts and insert them into hashing table.
let genesis = chain_spec.genesis();
let alloc_accounts = genesis
.alloc
.iter()
.map(|(addr, account)| (*addr, Some(Account::from_genesis_account(account))));
provider.insert_account_for_hashing(alloc_accounts).unwrap();
let alloc_storage = genesis.alloc.clone().into_iter().filter_map(|(addr, account)| {
// Only return `Some` if there is storage.
account.storage.map(|storage| {
(
addr,
storage.into_iter().map(|(key, value)| StorageEntry { key, value: value.into() }),
)
})
});
provider.insert_storage_for_hashing(alloc_storage)?;
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
provider.write_trie_updates(&updates).unwrap();
provider.commit()?;
Ok(root)
}

View File

@ -24,6 +24,9 @@ pub use spec::ChainSpecProvider;
mod hashing;
pub use hashing::HashingWriter;
mod trie;
pub use trie::{StorageTrieWriter, TrieWriter};
mod history;
pub use history::HistoryWriter;

View File

@ -1,19 +1,14 @@
use crate::{providers::StaticFileProviderRWRefMut, DatabaseProviderRW};
use reth_db::Database;
use reth_execution_types::ExecutionOutcome;
use reth_storage_errors::provider::ProviderResult;
use revm::db::OriginalValuesKnown;
/// A helper trait for [`ExecutionOutcome`](reth_execution_types::ExecutionOutcome) to
/// write state and receipts to storage.
/// A helper trait for [`ExecutionOutcome`] to write state and receipts to storage.
pub trait StateWriter {
/// Write the data and receipts to the database or static files if `static_file_producer` is
/// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts.
fn write_to_storage<DB>(
self,
provider_rw: &DatabaseProviderRW<DB>,
static_file_producer: Option<StaticFileProviderRWRefMut<'_>>,
fn write_to_storage(
&mut self,
execution_outcome: ExecutionOutcome,
is_value_known: OriginalValuesKnown,
) -> ProviderResult<()>
where
DB: Database;
) -> ProviderResult<()>;
}

View File

@ -0,0 +1,36 @@
use std::collections::HashMap;
use auto_impl::auto_impl;
use reth_primitives::B256;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::updates::{StorageTrieUpdates, TrieUpdates};
/// Trie Writer
#[auto_impl(&, Arc, Box)]
pub trait TrieWriter: Send + Sync {
/// Writes trie updates to the database.
///
/// Returns the number of entries modified.
fn write_trie_updates(&self, trie_updates: &TrieUpdates) -> ProviderResult<usize>;
}
/// Storage Trie Writer
#[auto_impl(&, Arc, Box)]
pub trait StorageTrieWriter: Send + Sync {
/// Writes storage trie updates from the given storage trie map.
///
/// First sorts the storage trie updates by the hashed address key, writing in sorted order.
///
/// Returns the number of entries modified.
fn write_storage_trie_updates(
&self,
storage_tries: &HashMap<B256, StorageTrieUpdates>,
) -> ProviderResult<usize>;
/// Writes storage trie updates for the given hashed address.
fn write_individual_storage_trie_updates(
&self,
hashed_address: B256,
updates: &StorageTrieUpdates,
) -> ProviderResult<usize>;
}

File diff suppressed because it is too large Load Diff

View File

@ -73,6 +73,6 @@ similar-asserts.workspace = true
criterion.workspace = true
[features]
metrics = ["reth-metrics", "dep:metrics"]
metrics = ["reth-metrics", "reth-trie/metrics", "dep:metrics"]
serde = ["dep:serde"]
test-utils = ["triehash", "reth-trie-common/test-utils"]

View File

@ -4,7 +4,9 @@ use reth_db_api::database::Database;
use reth_primitives::{
constants::EMPTY_ROOT_HASH, keccak256, Account, Address, Bytes, StorageEntry, B256, U256,
};
use reth_provider::{test_utils::create_test_provider_factory, HashingWriter, ProviderFactory};
use reth_provider::{
test_utils::create_test_provider_factory, HashingWriter, ProviderFactory, TrieWriter,
};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{proof::Proof, Nibbles, StateRoot};
use reth_trie_common::{AccountProof, StorageProof};
@ -40,7 +42,7 @@ fn insert_genesis<DB: Database>(
provider_factory: &ProviderFactory<DB>,
chain_spec: Arc<ChainSpec>,
) -> ProviderResult<B256> {
let mut provider = provider_factory.provider_rw()?;
let provider = provider_factory.provider_rw()?;
// Hash accounts and insert them into hashing table.
let genesis = chain_spec.genesis();
@ -64,7 +66,7 @@ fn insert_genesis<DB: Database>(
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
updates.write_to_database(provider.tx_mut())?;
provider.write_trie_updates(&updates).unwrap();
provider.commit()?;

View File

@ -6,7 +6,9 @@ use reth_db_api::{
transaction::DbTxMut,
};
use reth_primitives::{hex_literal::hex, Account, StorageEntry, U256};
use reth_provider::{test_utils::create_test_provider_factory, DatabaseProviderRW};
use reth_provider::{
test_utils::create_test_provider_factory, DatabaseProviderRW, StorageTrieWriter, TrieWriter,
};
use reth_trie::{
prefix_set::PrefixSetMut,
test_utils::{state_root, state_root_prehashed, storage_root, storage_root_prehashed},
@ -82,7 +84,7 @@ fn incremental_vs_full_root(inputs: &[&str], modified: &str) {
let modified_root = loader.root().unwrap();
// Update the intermediate roots table so that we can run the incremental verification
trie_updates.write_to_database(tx.tx_ref(), hashed_address).unwrap();
tx.write_individual_storage_trie_updates(hashed_address, &trie_updates).unwrap();
// 3. Calculate the incremental root
let mut storage_changes = PrefixSetMut::default();
@ -637,7 +639,7 @@ fn account_trie_around_extension_node_with_dbtrie() {
let (got, updates) = StateRoot::from_tx(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);
updates.write_to_database(tx.tx_ref()).unwrap();
tx.write_trie_updates(&updates).unwrap();
// read the account updates from the db
let mut accounts_trie = tx.tx_ref().cursor_read::<tables::AccountsTrie>().unwrap();
@ -684,7 +686,7 @@ proptest! {
state.iter().map(|(&key, &balance)| (key, (Account { balance, ..Default::default() }, std::iter::empty())))
);
assert_eq!(expected_root, state_root);
trie_updates.write_to_database(tx.tx_ref()).unwrap();
tx.write_trie_updates(&trie_updates).unwrap();
}
}
}

View File

@ -6,6 +6,7 @@ use rayon::ThreadPoolBuilder;
use reth_primitives::{Account, B256, U256};
use reth_provider::{
providers::ConsistentDbView, test_utils::create_test_provider_factory, writer::StorageWriter,
TrieWriter,
};
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
@ -32,7 +33,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
storage_writer.write_hashed_state(&db_state.into_sorted()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();
provider_rw.write_trie_updates(&updates).unwrap();
provider_rw.commit().unwrap();
}

View File

@ -1,11 +1,17 @@
use super::{TrieCursor, TrieCursorFactory};
use crate::{BranchNodeCompact, Nibbles, StoredNibbles, StoredNibblesSubKey};
use reth_db::{tables, DatabaseError};
use crate::{
updates::StorageTrieUpdates, BranchNodeCompact, Nibbles, StoredNibbles, StoredNibblesSubKey,
};
use reth_db::{
cursor::{DbCursorRW, DbDupCursorRW},
tables, DatabaseError,
};
use reth_db_api::{
cursor::{DbCursorRO, DbDupCursorRO},
transaction::DbTx,
};
use reth_primitives::B256;
use reth_trie_common::StorageTrieEntry;
/// Implementation of the trie cursor factory for a database transaction.
impl<'a, TX: DbTx> TrieCursorFactory for &'a TX {
@ -86,6 +92,62 @@ impl<C> DatabaseStorageTrieCursor<C> {
}
}
impl<C> DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie>
+ DbCursorRW<tables::StoragesTrie>
+ DbDupCursorRO<tables::StoragesTrie>
+ DbDupCursorRW<tables::StoragesTrie>,
{
/// Writes storage updates
pub fn write_storage_trie_updates(
&mut self,
updates: &StorageTrieUpdates,
) -> Result<usize, DatabaseError> {
// The storage trie for this account has to be deleted.
if updates.is_deleted && self.cursor.seek_exact(self.hashed_address)?.is_some() {
self.cursor.delete_current_duplicates()?;
}
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut storage_updates = updates
.removed_nodes
.iter()
.filter_map(|n| (!updates.storage_nodes.contains_key(n)).then_some((n, None)))
.collect::<Vec<_>>();
storage_updates
.extend(updates.storage_nodes.iter().map(|(nibbles, node)| (nibbles, Some(node))));
// Sort trie node updates.
storage_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let mut num_entries = 0;
for (nibbles, maybe_updated) in storage_updates.into_iter().filter(|(n, _)| !n.is_empty()) {
num_entries += 1;
let nibbles = StoredNibblesSubKey(nibbles.clone());
// Delete the old entry if it exists.
if self
.cursor
.seek_by_key_subkey(self.hashed_address, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.is_some()
{
self.cursor.delete_current()?;
}
// There is an updated version of this node, insert new entry.
if let Some(node) = maybe_updated {
self.cursor.upsert(
self.hashed_address,
StorageTrieEntry { nibbles, node: node.clone() },
)?;
}
}
Ok(num_entries)
}
}
impl<C> TrieCursor for DatabaseStorageTrieCursor<C>
where
C: DbCursorRO<tables::StoragesTrie> + DbDupCursorRO<tables::StoragesTrie> + Send + Sync,

View File

@ -1,12 +1,4 @@
use crate::{
walker::TrieWalker, BranchNodeCompact, HashBuilder, Nibbles, StorageTrieEntry, StoredNibbles,
StoredNibblesSubKey,
};
use reth_db::tables;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::{DbTx, DbTxMut},
};
use crate::{walker::TrieWalker, BranchNodeCompact, HashBuilder, Nibbles};
use reth_primitives::B256;
use std::collections::{HashMap, HashSet};
@ -84,64 +76,6 @@ impl TrieUpdates {
.collect();
TrieUpdatesSorted { removed_nodes: self.removed_nodes, account_nodes, storage_tries }
}
/// Flush updates all aggregated updates to the database.
///
/// # Returns
///
/// The number of storage trie entries updated in the database.
pub fn write_to_database<TX>(self, tx: &TX) -> Result<usize, reth_db::DatabaseError>
where
TX: DbTx + DbTxMut,
{
if self.is_empty() {
return Ok(0)
}
// Track the number of inserted entries.
let mut num_entries = 0;
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut account_updates = self
.removed_nodes
.into_iter()
.filter_map(|n| (!self.account_nodes.contains_key(&n)).then_some((n, None)))
.collect::<Vec<_>>();
account_updates
.extend(self.account_nodes.into_iter().map(|(nibbles, node)| (nibbles, Some(node))));
// Sort trie node updates.
account_updates.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
for (key, updated_node) in account_updates {
let nibbles = StoredNibbles(key);
match updated_node {
Some(node) => {
if !nibbles.0.is_empty() {
num_entries += 1;
account_trie_cursor.upsert(nibbles, node)?;
}
}
None => {
num_entries += 1;
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
}
}
let mut storage_tries = Vec::from_iter(self.storage_tries);
storage_tries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
for (hashed_address, storage_trie_updates) in storage_tries {
let updated_storage_entries =
storage_trie_updates.write_with_cursor(&mut storage_trie_cursor, hashed_address)?;
num_entries += updated_storage_entries;
}
Ok(num_entries)
}
}
/// Trie updates for storage trie of a single account.
@ -225,77 +159,6 @@ impl StorageTrieUpdates {
storage_nodes,
}
}
/// Initializes a storage trie cursor and writes updates to database.
pub fn write_to_database<TX>(
self,
tx: &TX,
hashed_address: B256,
) -> Result<usize, reth_db::DatabaseError>
where
TX: DbTx + DbTxMut,
{
if self.is_empty() {
return Ok(0)
}
let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
self.write_with_cursor(&mut cursor, hashed_address)
}
/// Writes updates to database.
///
/// # Returns
///
/// The number of storage trie entries updated in the database.
fn write_with_cursor<C>(
self,
cursor: &mut C,
hashed_address: B256,
) -> Result<usize, reth_db::DatabaseError>
where
C: DbCursorRO<tables::StoragesTrie>
+ DbCursorRW<tables::StoragesTrie>
+ DbDupCursorRO<tables::StoragesTrie>
+ DbDupCursorRW<tables::StoragesTrie>,
{
// The storage trie for this account has to be deleted.
if self.is_deleted && cursor.seek_exact(hashed_address)?.is_some() {
cursor.delete_current_duplicates()?;
}
// Merge updated and removed nodes. Updated nodes must take precedence.
let mut storage_updates = self
.removed_nodes
.into_iter()
.filter_map(|n| (!self.storage_nodes.contains_key(&n)).then_some((n, None)))
.collect::<Vec<_>>();
storage_updates
.extend(self.storage_nodes.into_iter().map(|(nibbles, node)| (nibbles, Some(node))));
// Sort trie node updates.
storage_updates.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut num_entries = 0;
for (nibbles, maybe_updated) in storage_updates.into_iter().filter(|(n, _)| !n.is_empty()) {
num_entries += 1;
let nibbles = StoredNibblesSubKey(nibbles);
// Delete the old entry if it exists.
if cursor
.seek_by_key_subkey(hashed_address, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.is_some()
{
cursor.delete_current()?;
}
// There is an updated version of this node, insert new entry.
if let Some(node) = maybe_updated {
cursor.upsert(hashed_address, StorageTrieEntry { nibbles, node })?;
}
}
Ok(num_entries)
}
}
/// Sorted trie updates used for lookups and insertions.