trie: revamp trie updates (#9239)
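In short: this change replaces the flat `TrieUpdates { trie_operations: HashMap<TrieKey, TrieOp> }` with structured per-kind collections, renames `flush` to `write_to_database` (which now reports how many entries were written), and renames the walker's retained `deleted_keys` to `removed_keys`. A reduced sketch of the new shapes, taken from the definitions in the diff below; the type aliases are placeholders so the sketch stands alone (the real types live in reth_trie and reth_primitives):

use std::collections::{HashMap, HashSet};

// Placeholder aliases for illustration only; not the real reth types.
type Nibbles = Vec<u8>;
type B256 = [u8; 32];
type BranchNodeCompact = Vec<u8>;

// The flat TrieKey -> TrieOp map becomes explicit collections.
pub struct TrieUpdates {
    pub(crate) account_nodes: HashMap<Nibbles, BranchNodeCompact>, // updated account trie nodes
    pub(crate) removed_nodes: HashSet<Nibbles>,                    // account trie nodes to delete
    pub(crate) storage_tries: HashMap<B256, StorageTrieUpdates>,   // per-account storage updates
}

// Storage updates are now scoped to a single account's storage trie.
pub struct StorageTrieUpdates {
    pub(crate) is_deleted: bool,                                   // wipe the whole storage trie
    pub(crate) storage_nodes: HashMap<Nibbles, BranchNodeCompact>, // updated storage trie nodes
    pub(crate) removed_nodes: HashSet<Nibbles>,                    // storage trie nodes to delete
}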
@@ -25,7 +25,7 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
use reth_tasks::TaskExecutor;
use reth_trie::{updates::TrieKey, StateRoot};
use reth_trie::StateRoot;
use std::{path::PathBuf, sync::Arc};
use tracing::*;

@@ -187,15 +187,16 @@ impl Command {
// Compare updates
let mut in_mem_mismatched = Vec::new();
let mut incremental_mismatched = Vec::new();
let mut in_mem_updates_iter = in_memory_updates.into_iter().peekable();
let mut incremental_updates_iter = incremental_trie_updates.into_iter().peekable();
let mut in_mem_updates_iter = in_memory_updates.account_nodes_ref().iter().peekable();
let mut incremental_updates_iter =
incremental_trie_updates.account_nodes_ref().iter().peekable();

while in_mem_updates_iter.peek().is_some() || incremental_updates_iter.peek().is_some() {
match (in_mem_updates_iter.next(), incremental_updates_iter.next()) {
(Some(in_mem), Some(incr)) => {
similar_asserts::assert_eq!(in_mem.0, incr.0, "Nibbles don't match");
if in_mem.1 != incr.1 &&
matches!(in_mem.0, TrieKey::AccountNode(ref nibbles) if nibbles.len() > self.skip_node_depth.unwrap_or_default())
in_mem.0.len() > self.skip_node_depth.unwrap_or_default()
{
in_mem_mismatched.push(in_mem);
incremental_mismatched.push(incr);

@@ -86,7 +86,7 @@ impl<DB: Database> Persistence<DB> {
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(hashed_state.clone()).write_to_db(provider_rw.tx_ref())?;
trie_updates.flush(provider_rw.tx_ref())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

// update history indices

@@ -139,7 +139,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
let offset = transitions.len() as u64;

db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| Ok(updates.flush(tx)?)).unwrap();
db.commit(|tx| {
updates.write_to_database(tx)?;
Ok(())
})
.unwrap();

let (transitions, final_state) = random_changeset_range(
&mut rng,

@@ -217,7 +217,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})?;
match progress {
StateRootProgress::Progress(state, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

let checkpoint = MerkleCheckpoint::new(
to_block,
@@ -237,7 +237,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
})
}
StateRootProgress::Complete(root, hashed_entries_walked, updates) => {
updates.flush(tx)?;
updates.write_to_database(tx)?;

entities_checkpoint.processed += hashed_entries_walked as u64;

@@ -252,7 +252,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
error!(target: "sync::stages::merkle", %e, ?current_block_number, ?to_block, "Incremental state root failed! {INVALID_STATE_ROOT_ERROR_MESSAGE}");
StageError::Fatal(Box::new(e))
})?;
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

let total_hashed_entries = (provider.count_entries::<tables::HashedAccounts>()? +
provider.count_entries::<tables::HashedStorages>()?)
@@ -325,7 +325,7 @@ impl<DB: Database> Stage<DB> for MerkleStage {
validate_state_root(block_root, target.seal_slow(), input.unwind_to)?;

// Validation passed, apply unwind changes to the database.
updates.flush(provider.tx_ref())?;
updates.write_to_database(provider.tx_ref())?;

// TODO(alexey): update entities checkpoint
} else {
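Every `flush` call site above (and in the `compute_state_root` hunks that follow) moves to `write_to_database`, whose return value is the number of entries written rather than `()`. A hedged, self-contained sketch of the call-site pattern, with stand-in types in place of reth's transaction and error types:

// Stand-ins for reth's transaction and error types; illustration only.
struct Tx;
#[derive(Debug)]
struct DatabaseError;

struct TrieUpdates;

impl TrieUpdates {
    // Modeled on this diff: consumes the updates, returns entries written.
    fn write_to_database(self, _tx: &Tx) -> Result<usize, DatabaseError> {
        Ok(0)
    }
}

fn persist(updates: TrieUpdates, tx: &Tx) -> Result<usize, DatabaseError> {
    let mut total_flushed_updates = 0;
    let updated_len = updates.write_to_database(tx)?; // was: updates.flush(tx)?
    total_flushed_updates += updated_len;             // callers can now track real counts
    Ok(total_flushed_updates)
}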
@@ -464,19 +464,17 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
.root_with_progress()?
{
StateRootProgress::Progress(state, _, updates) => {
let updates_len = updates.len();
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
last_account_key = %state.last_account_key,
updates_len,
updated_len,
total_flushed_updates,
"Flushing trie updates"
);

intermediate_state = Some(*state);
updates.flush(tx)?;

total_flushed_updates += updates_len;

if total_flushed_updates % SOFT_LIMIT_COUNT_FLUSHED_UPDATES == 0 {
info!(target: "reth::cli",
@@ -486,15 +484,12 @@ fn compute_state_root<DB: Database>(provider: &DatabaseProviderRW<DB>) -> eyre::
}
}
StateRootProgress::Complete(root, _, updates) => {
let updates_len = updates.len();

updates.flush(tx)?;

total_flushed_updates += updates_len;
let updated_len = updates.write_to_database(tx)?;
total_flushed_updates += updated_len;

trace!(target: "reth::cli",
%root,
updates_len = updates_len,
updated_len,
total_flushed_updates,
"State root has been computed"
);

@@ -892,7 +892,7 @@ mod tests {
}

let (_, updates) = StateRoot::from_tx(tx).root_with_updates().unwrap();
updates.flush(tx).unwrap();
updates.write_to_database(tx).unwrap();
})
.unwrap();


@@ -2399,7 +2399,7 @@ impl<TX: DbTxMut + DbTx> HashingWriter for DatabaseProvider<TX> {
block_hash: end_block_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertMerkleTree);

@@ -2595,7 +2595,7 @@ impl<TX: DbTxMut + DbTx> BlockExecutionWriter for DatabaseProvider<TX> {
block_hash: parent_hash,
})))
}
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}

// get blocks
@@ -2797,7 +2797,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
// insert hashes and intermediate merkle nodes
{
HashedStateChanges(hashed_state).write_to_db(&self.tx)?;
trie_updates.flush(&self.tx)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);


@@ -30,7 +30,7 @@ pub fn calculate_state_root(c: &mut Criterion) {
HashedStateChanges(db_state).write_to_db(provider_rw.tx_ref()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.flush(provider_rw.tx_ref()).unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();
provider_rw.commit().unwrap();
}

@@ -166,7 +166,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
@@ -179,7 +179,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,

@@ -148,7 +148,7 @@ where
};

if retain_updates {
trie_updates.extend(updates.into_iter());
trie_updates.insert_storage_updates(hashed_address, updates);
}

account_rlp.clear();
@@ -161,7 +161,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,

@@ -12,6 +12,7 @@ use reth_db_api::transaction::DbTx;
use reth_execution_errors::{StateRootError, StorageRootError};
use reth_primitives::{constants::EMPTY_ROOT_HASH, keccak256, Address, B256};
use reth_trie_common::{proof::ProofRetainer, AccountProof, StorageProof, TrieAccount};

/// A struct for generating merkle proofs.
///
/// Proof generator adds the target address and slots to the prefix set, enables the proof retainer
@@ -226,7 +227,7 @@ mod tests {
let (root, updates) = StateRoot::from_tx(provider.tx_ref())
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
updates.flush(provider.tx_mut())?;
updates.write_to_database(provider.tx_mut())?;

provider.commit()?;


@@ -5,7 +5,7 @@ use crate::{
progress::{IntermediateStateRootState, StateRootProgress},
stats::TrieTracker,
trie_cursor::TrieCursorFactory,
updates::{TrieKey, TrieOp, TrieUpdates},
updates::{StorageTrieUpdates, TrieUpdates},
walker::TrieWalker,
HashBuilder, Nibbles, TrieAccount,
};
@@ -237,6 +237,7 @@ where

let mut account_rlp = Vec::with_capacity(128);
let mut hashed_entries_walked = 0;
let mut updated_storage_nodes = 0;
while let Some(node) = account_node_iter.try_next()? {
match node {
TrieElement::Branch(node) => {
@@ -273,7 +274,9 @@ where
let (root, storage_slots_walked, updates) =
storage_root_calculator.root_with_updates()?;
hashed_entries_walked += storage_slots_walked;
trie_updates.extend(updates);
// We only walk over hashed address once, so it's safe to insert.
updated_storage_nodes += updates.len();
trie_updates.insert_storage_updates(hashed_address, updates);
root
} else {
storage_root_calculator.root()?
@@ -285,12 +288,14 @@ where
hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp);

// Decide if we need to return intermediate progress.
let total_updates_len = trie_updates.len() +
account_node_iter.walker.deleted_keys_len() +
let total_updates_len = updated_storage_nodes +
account_node_iter.walker.removed_keys_len() +
hash_builder.updates_len();
if retain_updates && total_updates_len as u64 >= self.threshold {
let (walker_stack, walker_deleted_keys) = account_node_iter.walker.split();
trie_updates.removed_nodes.extend(walker_deleted_keys);
let (hash_builder, hash_builder_updates) = hash_builder.split();
trie_updates.account_nodes.extend(hash_builder_updates);

let state = IntermediateStateRootState {
hash_builder,
@@ -298,13 +303,6 @@ where
last_account_key: hashed_address,
};

trie_updates.extend(
walker_deleted_keys
.into_iter()
.map(|nibbles| (TrieKey::AccountNode(nibbles), TrieOp::Delete)),
);
trie_updates.extend_with_account_updates(hash_builder_updates);

return Ok(StateRootProgress::Progress(
Box::new(state),
hashed_entries_walked,
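The checkpoint condition in the hunk above now sums a running `updated_storage_nodes` counter with the walker and hash-builder tallies instead of calling `trie_updates.len()` per account. A self-contained sketch of that decision, with the counts passed in as plain numbers:

fn should_checkpoint(
    updated_storage_nodes: usize, // accumulated as storage updates are inserted
    removed_keys_len: usize,      // account_node_iter.walker.removed_keys_len()
    hash_builder_updates: usize,  // hash_builder.updates_len()
    retain_updates: bool,
    threshold: u64,
) -> bool {
    let total_updates_len = updated_storage_nodes + removed_keys_len + hash_builder_updates;
    retain_updates && total_updates_len as u64 >= threshold
}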
@@ -317,7 +315,7 @@ where

let root = hash_builder.root();

trie_updates.finalize_state_updates(
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
self.prefix_sets.destroyed_accounts,
@@ -456,7 +454,7 @@ where
/// # Returns
///
/// The storage root and storage trie updates for a given address.
pub fn root_with_updates(self) -> Result<(B256, usize, TrieUpdates), StorageRootError> {
pub fn root_with_updates(self) -> Result<(B256, usize, StorageTrieUpdates), StorageRootError> {
self.calculate(true)
}

@@ -479,7 +477,7 @@ where
pub fn calculate(
self,
retain_updates: bool,
) -> Result<(B256, usize, TrieUpdates), StorageRootError> {
) -> Result<(B256, usize, StorageTrieUpdates), StorageRootError> {
trace!(target: "trie::storage_root", hashed_address = ?self.hashed_address, "calculating storage root");

let mut hashed_storage_cursor =
@@ -487,11 +485,7 @@ where

// short circuit on empty storage
if hashed_storage_cursor.is_storage_empty()? {
return Ok((
EMPTY_ROOT_HASH,
0,
TrieUpdates::from([(TrieKey::StorageTrie(self.hashed_address), TrieOp::Delete)]),
))
return Ok((EMPTY_ROOT_HASH, 0, StorageTrieUpdates::deleted()))
}

let mut tracker = TrieTracker::default();
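Note how `StorageTrieUpdates::deleted()` interacts with `len` and `is_empty` (both defined later in this diff): a deleted-trie marker is pending work, not an empty update set. A reduced, self-contained model of those semantics:

use std::collections::{HashMap, HashSet};

#[derive(Default)]
struct StorageTrieUpdates {
    is_deleted: bool,
    storage_nodes: HashMap<Vec<u8>, Vec<u8>>, // stand-ins for Nibbles -> BranchNodeCompact
    removed_nodes: HashSet<Vec<u8>>,
}

impl StorageTrieUpdates {
    fn deleted() -> Self {
        Self { is_deleted: true, ..Default::default() }
    }
    fn len(&self) -> usize {
        (self.is_deleted as usize) + self.storage_nodes.len() + self.removed_nodes.len()
    }
    fn is_empty(&self) -> bool {
        !self.is_deleted && self.storage_nodes.is_empty() && self.removed_nodes.is_empty()
    }
}

#[test]
fn deleted_updates_still_carry_work() {
    let updates = StorageTrieUpdates::deleted();
    assert_eq!(updates.len(), 1); // the pending trie wipe counts as one entry
    assert!(!updates.is_empty()); // so the write path will not short-circuit
}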
@@ -520,12 +514,8 @@ where

let root = hash_builder.root();

let mut trie_updates = TrieUpdates::default();
trie_updates.finalize_storage_updates(
self.hashed_address,
storage_node_iter.walker,
hash_builder,
);
let mut trie_updates = StorageTrieUpdates::default();
trie_updates.finalize(storage_node_iter.walker, hash_builder);

let stats = tracker.finish();

@@ -551,11 +541,9 @@ where
mod tests {
use super::*;
use crate::{
hashed_cursor::HashedPostStateCursorFactory,
prefix_set::PrefixSetMut,
test_utils::{state_root, state_root_prehashed, storage_root, storage_root_prehashed},
trie_cursor::InMemoryTrieCursorFactory,
BranchNodeCompact, HashedPostState, HashedStorage, TrieMask,
BranchNodeCompact, TrieMask,
};
use proptest::{prelude::ProptestConfig, proptest};
use proptest_arbitrary_interop::arb;
@@ -569,7 +557,6 @@ mod tests {
use reth_trie_common::triehash::KeccakHasher;
use std::{
collections::{BTreeMap, HashMap},
iter,
ops::Mul,
str::FromStr,
sync::Arc,
@@ -629,7 +616,7 @@ mod tests {
let modified_root = loader.root().unwrap();

// Update the intermediate roots table so that we can run the incremental verification
trie_updates.flush(tx.tx_ref()).unwrap();
trie_updates.write_to_database(tx.tx_ref(), hashed_address).unwrap();

// 3. Calculate the incremental root
let mut storage_changes = PrefixSetMut::default();
@@ -988,14 +975,7 @@ mod tests {
assert_eq!(root, computed_expected_root);

// Check account trie
let mut account_updates = trie_updates
.iter()
.filter_map(|(k, v)| match (k, v) {
(TrieKey::AccountNode(nibbles), TrieOp::Update(node)) => Some((nibbles, node)),
_ => None,
})
.collect::<Vec<_>>();
account_updates.sort_unstable_by(|a, b| a.0.cmp(b.0));
let account_updates = trie_updates.clone().into_sorted().account_nodes;
assert_eq!(account_updates.len(), 2);

let (nibbles1a, node1a) = account_updates.first().unwrap();
@@ -1015,16 +995,13 @@ mod tests {
assert_eq!(node2a.hashes.len(), 1);

// Check storage trie
let storage_updates = trie_updates
.iter()
.filter_map(|entry| match entry {
(TrieKey::StorageNode(_, nibbles), TrieOp::Update(node)) => Some((nibbles, node)),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(storage_updates.len(), 1);
let mut updated_storage_trie =
trie_updates.storage_tries.iter().filter(|(_, u)| !u.storage_nodes.is_empty());
assert_eq!(updated_storage_trie.clone().count(), 1);
let (_, storage_trie_updates) = updated_storage_trie.next().unwrap();
assert_eq!(storage_trie_updates.storage_nodes.len(), 1);

let (nibbles3, node3) = storage_updates.first().unwrap();
let (nibbles3, node3) = storage_trie_updates.storage_nodes.iter().next().unwrap();
assert!(nibbles3.is_empty());
assert_eq!(node3.state_mask, TrieMask::new(0b1010));
assert_eq!(node3.tree_mask, TrieMask::new(0b0000));
@@ -1058,14 +1035,7 @@ mod tests {
.unwrap();
assert_eq!(root, expected_state_root);

let mut account_updates = trie_updates
.iter()
.filter_map(|entry| match entry {
(TrieKey::AccountNode(nibbles), TrieOp::Update(node)) => Some((nibbles, node)),
_ => None,
})
.collect::<Vec<_>>();
account_updates.sort_by(|a, b| a.0.cmp(b.0));
let account_updates = trie_updates.into_sorted().account_nodes;
assert_eq!(account_updates.len(), 2);

let (nibbles1b, node1b) = account_updates.first().unwrap();
@@ -1112,19 +1082,11 @@ mod tests {
.root_with_updates()
.unwrap();
assert_eq!(root, computed_expected_root);
assert_eq!(trie_updates.len(), 7);
assert_eq!(trie_updates.iter().filter(|(_, op)| op.is_update()).count(), 2);
assert_eq!(trie_updates.account_nodes.len() + trie_updates.removed_nodes.len(), 1);

let account_updates = trie_updates
.iter()
.filter_map(|entry| match entry {
(TrieKey::AccountNode(nibbles), TrieOp::Update(node)) => Some((nibbles, node)),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(account_updates.len(), 1);
assert_eq!(trie_updates.account_nodes.len(), 1);

let (nibbles1c, node1c) = account_updates.first().unwrap();
let (nibbles1c, node1c) = trie_updates.account_nodes.iter().next().unwrap();
assert_eq!(nibbles1c[..], [0xB]);

assert_eq!(node1c.state_mask, TrieMask::new(0b1011));
@@ -1171,19 +1133,15 @@ mod tests {
.root_with_updates()
.unwrap();
assert_eq!(root, computed_expected_root);
assert_eq!(trie_updates.len(), 6);
assert_eq!(trie_updates.iter().filter(|(_, op)| op.is_update()).count(), 1); // no storage root update

let account_updates = trie_updates
assert_eq!(trie_updates.account_nodes.len() + trie_updates.removed_nodes.len(), 1);
assert!(!trie_updates
.storage_tries
.iter()
.filter_map(|entry| match entry {
(TrieKey::AccountNode(nibbles), TrieOp::Update(node)) => Some((nibbles, node)),
_ => None,
})
.collect::<Vec<_>>();
assert_eq!(account_updates.len(), 1);
.any(|(_, u)| !u.storage_nodes.is_empty() || !u.removed_nodes.is_empty())); // no storage root update

let (nibbles1d, node1d) = account_updates.first().unwrap();
assert_eq!(trie_updates.account_nodes.len(), 1);

let (nibbles1d, node1d) = trie_updates.account_nodes.iter().next().unwrap();
assert_eq!(nibbles1d[..], [0xB]);

assert_eq!(node1d.state_mask, TrieMask::new(0b1011));
@@ -1207,19 +1165,7 @@ mod tests {

let (got, updates) = StateRoot::from_tx(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);

// Check account trie
let account_updates = updates
.iter()
.filter_map(|entry| match entry {
(TrieKey::AccountNode(nibbles), TrieOp::Update(node)) => {
Some((nibbles.clone(), node.clone()))
}
_ => None,
})
.collect::<HashMap<_, _>>();

assert_trie_updates(&account_updates);
assert_trie_updates(&updates.account_nodes);
}

#[test]
@@ -1231,7 +1177,7 @@ mod tests {

let (got, updates) = StateRoot::from_tx(tx.tx_ref()).root_with_updates().unwrap();
assert_eq!(expected, got);
updates.flush(tx.tx_ref()).unwrap();
updates.write_to_database(tx.tx_ref()).unwrap();

// read the account updates from the db
let mut accounts_trie = tx.tx_ref().cursor_read::<tables::AccountsTrie>().unwrap();
@@ -1278,7 +1224,7 @@ mod tests {
state.iter().map(|(&key, &balance)| (key, (Account { balance, ..Default::default() }, std::iter::empty())))
);
assert_eq!(expected_root, state_root);
trie_updates.flush(tx.tx_ref()).unwrap();
trie_updates.write_to_database(tx.tx_ref()).unwrap();
}
}
}
@@ -1294,26 +1240,14 @@ mod tests {
let (got, _, updates) =
StorageRoot::from_tx_hashed(tx.tx_ref(), hashed_address).root_with_updates().unwrap();
assert_eq!(expected_root, got);

// Check account trie
let storage_updates = updates
.iter()
.filter_map(|entry| match entry {
(TrieKey::StorageNode(_, nibbles), TrieOp::Update(node)) => {
Some((nibbles.clone(), node.clone()))
}
_ => None,
})
.collect::<HashMap<_, _>>();
assert_eq!(expected_updates, storage_updates);

assert_trie_updates(&storage_updates);
assert_eq!(expected_updates, updates);
assert_trie_updates(&updates.storage_nodes);
}

fn extension_node_storage_trie(
tx: &DatabaseProviderRW<Arc<TempDatabase<DatabaseEnv>>>,
hashed_address: B256,
) -> (B256, HashMap<Nibbles, BranchNodeCompact>) {
) -> (B256, StorageTrieUpdates) {
let value = U256::from(1);

let mut hashed_storage = tx.tx_ref().cursor_write::<tables::HashedStorages>().unwrap();
@@ -1336,7 +1270,8 @@ mod tests {

let root = hb.root();
let (_, updates) = hb.split();
(root, updates)
let trie_updates = StorageTrieUpdates { storage_nodes: updates, ..Default::default() };
(root, trie_updates)
}

fn extension_node_trie(tx: &DatabaseProviderRW<Arc<TempDatabase<DatabaseEnv>>>) -> B256 {
||||
assert_eq!(node.root_hash, None);
|
||||
assert_eq!(node.hashes.len(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn trie_updates_across_multiple_iterations() {
|
||||
let address = Address::ZERO;
|
||||
let hashed_address = keccak256(address);
|
||||
|
||||
let factory = create_test_provider_factory();
|
||||
|
||||
let mut hashed_storage = BTreeMap::default();
|
||||
let mut post_state = HashedPostState::default();
|
||||
|
||||
// Block #1
|
||||
// Update specific storage slots
|
||||
let mut modified_storage = BTreeMap::default();
|
||||
|
||||
// 0x0f..
|
||||
let modified_key_prefix = Nibbles::from_nibbles(
|
||||
[0x0, 0xf].into_iter().chain(iter::repeat(0).take(62)).collect::<Vec<_>>(),
|
||||
);
|
||||
|
||||
// 0x0faa0..
|
||||
let mut modified_entry1 = modified_key_prefix.clone();
|
||||
modified_entry1.set_at(2, 0xa);
|
||||
modified_entry1.set_at(3, 0xa);
|
||||
|
||||
// 0x0faaa..
|
||||
let mut modified_entry2 = modified_key_prefix.clone();
|
||||
modified_entry2.set_at(2, 0xa);
|
||||
modified_entry2.set_at(3, 0xa);
|
||||
modified_entry2.set_at(4, 0xa);
|
||||
|
||||
// 0x0fab0..
|
||||
let mut modified_entry3 = modified_key_prefix.clone();
|
||||
modified_entry3.set_at(2, 0xa);
|
||||
modified_entry3.set_at(3, 0xb);
|
||||
|
||||
// 0x0fba0..
|
||||
let mut modified_entry4 = modified_key_prefix;
|
||||
modified_entry4.set_at(2, 0xb);
|
||||
modified_entry4.set_at(3, 0xa);
|
||||
|
||||
[modified_entry1, modified_entry2, modified_entry3.clone(), modified_entry4]
|
||||
.into_iter()
|
||||
.for_each(|key| {
|
||||
modified_storage.insert(B256::from_slice(&key.pack()), U256::from(1));
|
||||
});
|
||||
|
||||
// Update main hashed storage.
|
||||
hashed_storage.extend(modified_storage.clone());
|
||||
post_state.extend(HashedPostState::default().with_storages([(
|
||||
hashed_address,
|
||||
HashedStorage::from_iter(false, modified_storage.clone()),
|
||||
)]));
|
||||
|
||||
let (storage_root, block1_updates) = compute_storage_root(
|
||||
address,
|
||||
factory.provider().unwrap().tx_ref(),
|
||||
&post_state,
|
||||
&TrieUpdates::default(),
|
||||
);
|
||||
assert_eq!(storage_root, storage_root_prehashed(hashed_storage.clone()));
|
||||
|
||||
// Block #2
|
||||
// Set 0x0fab0.. hashed slot to 0
|
||||
modified_storage.insert(B256::from_slice(&modified_entry3.pack()), U256::ZERO);
|
||||
|
||||
// Update main hashed storage.
|
||||
hashed_storage.remove(&B256::from_slice(&modified_entry3.pack()));
|
||||
post_state.extend(HashedPostState::default().with_storages([(
|
||||
hashed_address,
|
||||
HashedStorage::from_iter(false, modified_storage.clone()),
|
||||
)]));
|
||||
|
||||
let (storage_root, block2_updates) = compute_storage_root(
|
||||
address,
|
||||
factory.provider().unwrap().tx_ref(),
|
||||
&post_state,
|
||||
&block1_updates,
|
||||
);
|
||||
assert_eq!(storage_root, storage_root_prehashed(hashed_storage.clone()));
|
||||
|
||||
// Commit trie updates
|
||||
{
|
||||
let mut updates = block1_updates;
|
||||
updates.extend(block2_updates);
|
||||
|
||||
let provider_rw = factory.provider_rw().unwrap();
|
||||
let mut hashed_storage_cursor =
|
||||
provider_rw.tx_ref().cursor_dup_write::<tables::HashedStorages>().unwrap();
|
||||
for (hashed_slot, value) in &hashed_storage {
|
||||
hashed_storage_cursor
|
||||
.upsert(hashed_address, StorageEntry { key: *hashed_slot, value: *value })
|
||||
.unwrap();
|
||||
}
|
||||
updates.flush(provider_rw.tx_ref()).unwrap();
|
||||
provider_rw.commit().unwrap();
|
||||
}
|
||||
|
||||
// Recompute storage root for block #3
|
||||
let storage_root =
|
||||
StorageRoot::from_tx(factory.provider().unwrap().tx_ref(), address).root().unwrap();
|
||||
assert_eq!(storage_root, storage_root_prehashed(hashed_storage.clone()));
|
||||
}
|
||||
|
||||
fn compute_storage_root<TX: DbTx>(
|
||||
address: Address,
|
||||
tx: &TX,
|
||||
post_state: &HashedPostState,
|
||||
update: &TrieUpdates,
|
||||
) -> (B256, TrieUpdates) {
|
||||
let mut prefix_sets = post_state.construct_prefix_sets();
|
||||
let (root, _, updates) = StorageRoot::from_tx(tx, address)
|
||||
.with_hashed_cursor_factory(HashedPostStateCursorFactory::new(
|
||||
tx,
|
||||
&post_state.clone().into_sorted(),
|
||||
))
|
||||
.with_trie_cursor_factory(InMemoryTrieCursorFactory::new(tx, &update.sorted()))
|
||||
.with_prefix_set(prefix_sets.storage_prefix_sets.remove(&keccak256(address)).unwrap())
|
||||
.root_with_updates()
|
||||
.unwrap();
|
||||
(root, updates)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
use super::{TrieCursor, TrieCursorFactory};
use crate::updates::{TrieKey, TrieUpdatesSorted};
use crate::updates::TrieUpdatesSorted;
use reth_db::DatabaseError;
use reth_primitives::B256;
use reth_trie_common::{BranchNodeCompact, Nibbles};
@@ -39,6 +39,7 @@ impl<'a, CF: TrieCursorFactory> TrieCursorFactory for InMemoryTrieCursorFactory<
/// The cursor to iterate over account trie updates and corresponding database entries.
/// It will always give precedence to the data from the trie updates.
#[derive(Debug)]
#[allow(dead_code)]
pub struct InMemoryAccountTrieCursor<'a, C> {
cursor: C,
trie_updates: &'a TrieUpdatesSorted,
@@ -54,57 +55,27 @@ impl<'a, C> InMemoryAccountTrieCursor<'a, C> {
impl<'a, C: TrieCursor> TrieCursor for InMemoryAccountTrieCursor<'a, C> {
fn seek_exact(
&mut self,
key: Nibbles,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
if let Some((nibbles, trie_op)) = self.trie_updates.find_account_node(&key) {
self.last_key = Some(nibbles);
Ok(trie_op.into_update().map(|node| (key, node)))
} else {
let result = self.cursor.seek_exact(key)?;
self.last_key = result.as_ref().map(|(key, _)| key.clone());
Ok(result)
}
unimplemented!()
}

fn seek(
&mut self,
key: Nibbles,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
let trie_update_entry = self
.trie_updates
.trie_operations
.iter()
.find(|(k, _)| matches!(k, TrieKey::AccountNode(nibbles) if nibbles <= &key))
.cloned();

if let Some((trie_key, trie_op)) = trie_update_entry {
let nibbles = match trie_key {
TrieKey::AccountNode(nibbles) => {
self.last_key = Some(nibbles.clone());
nibbles
}
_ => panic!("Invalid trie key"),
};
return Ok(trie_op.into_update().map(|node| (nibbles, node)))
}

let result = self.cursor.seek(key)?;
self.last_key = result.as_ref().map(|(key, _)| key.clone());
Ok(result)
unimplemented!()
}

fn current(&mut self) -> Result<Option<Nibbles>, DatabaseError> {
if self.last_key.is_some() {
Ok(self.last_key.clone())
} else {
self.cursor.current()
}
unimplemented!()
}
}

/// The cursor to iterate over storage trie updates and corresponding database entries.
/// It will always give precedence to the data from the trie updates.
#[derive(Debug)]
#[allow(dead_code)]
pub struct InMemoryStorageTrieCursor<'a, C> {
cursor: C,
trie_update_index: usize,
@@ -122,55 +93,19 @@ impl<'a, C> InMemoryStorageTrieCursor<'a, C> {
impl<'a, C: TrieCursor> TrieCursor for InMemoryStorageTrieCursor<'a, C> {
fn seek_exact(
&mut self,
key: Nibbles,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
if let Some((trie_key, trie_op)) =
self.trie_updates.find_storage_node(&self.hashed_address, &key)
{
self.last_key = Some(trie_key);
Ok(trie_op.into_update().map(|node| (key, node)))
} else {
let result = self.cursor.seek_exact(key)?;
self.last_key = result.as_ref().map(|(key, _)| key.clone());
Ok(result)
}
unimplemented!()
}

fn seek(
&mut self,
key: Nibbles,
_key: Nibbles,
) -> Result<Option<(Nibbles, BranchNodeCompact)>, DatabaseError> {
let mut trie_update_entry = self.trie_updates.trie_operations.get(self.trie_update_index);
while trie_update_entry
.filter(|(k, _)| matches!(k, TrieKey::StorageNode(address, nibbles) if address == &self.hashed_address && nibbles < &key)).is_some()
{
self.trie_update_index += 1;
trie_update_entry = self.trie_updates.trie_operations.get(self.trie_update_index);
}

if let Some((trie_key, trie_op)) =
trie_update_entry.filter(|(k, _)| matches!(k, TrieKey::StorageNode(_, _)))
{
let nibbles = match trie_key {
TrieKey::StorageNode(_, nibbles) => {
self.last_key = Some(nibbles.clone());
nibbles.clone()
}
_ => panic!("this should not happen!"),
};
return Ok(trie_op.as_update().map(|node| (nibbles, node.clone())))
}

let result = self.cursor.seek(key)?;
self.last_key = result.as_ref().map(|(key, _)| key.clone());
Ok(result)
unimplemented!()
}

fn current(&mut self) -> Result<Option<Nibbles>, DatabaseError> {
if self.last_key.is_some() {
Ok(self.last_key.clone())
} else {
self.cursor.current()
}
unimplemented!()
}
}
@@ -2,276 +2,282 @@ use crate::{
walker::TrieWalker, BranchNodeCompact, HashBuilder, Nibbles, StorageTrieEntry,
StoredBranchNode, StoredNibbles, StoredNibblesSubKey,
};
use derive_more::Deref;
use reth_db::tables;
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
transaction::{DbTx, DbTxMut},
};
use reth_primitives::B256;
use std::collections::{hash_map::IntoIter, HashMap, HashSet};

/// The key of a trie node.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TrieKey {
/// A node in the account trie.
AccountNode(Nibbles),
/// A node in the storage trie.
StorageNode(B256, Nibbles),
/// Storage trie of an account.
StorageTrie(B256),
}

impl TrieKey {
/// Returns reference to account node key if the key is for [`Self::AccountNode`].
pub const fn as_account_node_key(&self) -> Option<&Nibbles> {
if let Self::AccountNode(nibbles) = &self {
Some(nibbles)
} else {
None
}
}

/// Returns reference to storage node key if the key is for [`Self::StorageNode`].
pub const fn as_storage_node_key(&self) -> Option<(&B256, &Nibbles)> {
if let Self::StorageNode(key, subkey) = &self {
Some((key, subkey))
} else {
None
}
}

/// Returns reference to storage trie key if the key is for [`Self::StorageTrie`].
pub const fn as_storage_trie_key(&self) -> Option<&B256> {
if let Self::StorageTrie(key) = &self {
Some(key)
} else {
None
}
}
}

/// The operation to perform on the trie.
#[derive(PartialEq, Eq, Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub enum TrieOp {
/// Delete the node entry.
Delete,
/// Update the node entry with the provided value.
Update(BranchNodeCompact),
}

impl TrieOp {
/// Returns `true` if the operation is an update.
pub const fn is_update(&self) -> bool {
matches!(self, Self::Update(..))
}

/// Returns reference to updated branch node if operation is [`Self::Update`].
pub const fn as_update(&self) -> Option<&BranchNodeCompact> {
if let Self::Update(node) = &self {
Some(node)
} else {
None
}
}

/// Returns owned updated branch node if operation is [`Self::Update`].
pub fn into_update(self) -> Option<BranchNodeCompact> {
if let Self::Update(node) = self {
Some(node)
} else {
None
}
}
}
use std::collections::{HashMap, HashSet};

/// The aggregation of trie updates.
#[derive(Debug, Default, Clone, PartialEq, Eq, Deref)]
#[derive(PartialEq, Eq, Clone, Default, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TrieUpdates {
trie_operations: HashMap<TrieKey, TrieOp>,
}

impl<const N: usize> From<[(TrieKey, TrieOp); N]> for TrieUpdates {
fn from(value: [(TrieKey, TrieOp); N]) -> Self {
Self { trie_operations: HashMap::from(value) }
}
}

impl IntoIterator for TrieUpdates {
type Item = (TrieKey, TrieOp);
type IntoIter = IntoIter<TrieKey, TrieOp>;

fn into_iter(self) -> Self::IntoIter {
self.trie_operations.into_iter()
}
pub(crate) account_nodes: HashMap<Nibbles, BranchNodeCompact>,
pub(crate) removed_nodes: HashSet<Nibbles>,
pub(crate) storage_tries: HashMap<B256, StorageTrieUpdates>,
}

impl TrieUpdates {
/// Extend the updates with trie updates.
pub fn extend(&mut self, updates: impl IntoIterator<Item = (TrieKey, TrieOp)>) {
self.trie_operations.extend(updates);
/// Returns `true` if the updates are empty.
pub fn is_empty(&self) -> bool {
self.account_nodes.is_empty() &&
self.removed_nodes.is_empty() &&
self.storage_tries.is_empty()
}

/// Extend the updates with account trie updates.
pub fn extend_with_account_updates(&mut self, updates: HashMap<Nibbles, BranchNodeCompact>) {
self.extend(
updates
.into_iter()
.map(|(nibbles, node)| (TrieKey::AccountNode(nibbles), TrieOp::Update(node))),
);
/// Returns reference to updated account nodes.
pub const fn account_nodes_ref(&self) -> &HashMap<Nibbles, BranchNodeCompact> {
&self.account_nodes
}

/// Insert storage updates for a given hashed address.
pub fn insert_storage_updates(
&mut self,
hashed_address: B256,
storage_updates: StorageTrieUpdates,
) {
let existing = self.storage_tries.insert(hashed_address, storage_updates);
debug_assert!(existing.is_none());
}

/// Finalize state trie updates.
pub fn finalize_state_updates<C>(
pub fn finalize<C>(
&mut self,
walker: TrieWalker<C>,
hash_builder: HashBuilder,
destroyed_accounts: HashSet<B256>,
) {
// Add updates from trie walker.
let (_, deleted_keys) = walker.split();
self.extend(
deleted_keys.into_iter().map(|nibbles| (TrieKey::AccountNode(nibbles), TrieOp::Delete)),
);
// Retrieve deleted keys from trie walker.
let (_, removed_node_keys) = walker.split();
self.removed_nodes.extend(removed_node_keys);

// Add account node updates from hash builder.
let (_, hash_builder_updates) = hash_builder.split();
self.extend_with_account_updates(hash_builder_updates);
// Retrieve updated nodes from hash builder.
let (_, updated_nodes) = hash_builder.split();
self.account_nodes.extend(updated_nodes);

// Add deleted storage tries for destroyed accounts.
self.extend(
destroyed_accounts.into_iter().map(|key| (TrieKey::StorageTrie(key), TrieOp::Delete)),
);
for destroyed in destroyed_accounts {
self.storage_tries.entry(destroyed).or_default().set_deleted(true);
}
}

/// Finalize storage trie updates for a given address.
pub fn finalize_storage_updates<C>(
&mut self,
hashed_address: B256,
walker: TrieWalker<C>,
hash_builder: HashBuilder,
) {
// Add updates from trie walker.
let (_, deleted_keys) = walker.split();
self.extend(
deleted_keys
/// Converts trie updates into [`TrieUpdatesSorted`].
pub fn into_sorted(self) -> TrieUpdatesSorted {
let mut account_nodes = Vec::from_iter(self.account_nodes);
account_nodes.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut storage_tries = Vec::from_iter(
self.storage_tries
.into_iter()
.map(|nibbles| (TrieKey::StorageNode(hashed_address, nibbles), TrieOp::Delete)),
.map(|(hashed_address, updates)| (hashed_address, updates.into_sorted())),
);

// Add storage node updates from hash builder.
let (_, hash_builder_updates) = hash_builder.split();
self.extend(hash_builder_updates.into_iter().map(|(nibbles, node)| {
(TrieKey::StorageNode(hashed_address, nibbles), TrieOp::Update(node))
}));
storage_tries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
TrieUpdatesSorted { removed_nodes: self.removed_nodes, account_nodes, storage_tries }
}
/// Flush all aggregated updates to the database.
pub fn flush(self, tx: &(impl DbTx + DbTxMut)) -> Result<(), reth_db::DatabaseError> {
if self.trie_operations.is_empty() {
return Ok(())
///
/// # Returns
///
/// The number of storage trie entries updated in the database.
pub fn write_to_database<TX>(self, tx: &TX) -> Result<usize, reth_db::DatabaseError>
where
TX: DbTx + DbTxMut,
{
if self.is_empty() {
return Ok(0)
}

// Track the number of inserted entries.
let mut num_entries = 0;

// Merge updated and removed nodes. Updated nodes must take precedence.
let mut account_updates = self
.removed_nodes
.into_iter()
.filter_map(|n| (!self.account_nodes.contains_key(&n)).then_some((n, None)))
.collect::<Vec<_>>();
account_updates
.extend(self.account_nodes.into_iter().map(|(nibbles, node)| (nibbles, Some(node))));
// Sort trie node updates.
account_updates.sort_unstable_by(|a, b| a.0.cmp(&b.0));

let mut account_trie_cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;

let mut trie_operations = Vec::from_iter(self.trie_operations);
trie_operations.sort_unstable_by(|a, b| a.0.cmp(&b.0));
for (key, operation) in trie_operations {
match key {
TrieKey::AccountNode(nibbles) => {
let nibbles = StoredNibbles(nibbles);
match operation {
TrieOp::Delete => {
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
TrieOp::Update(node) => {
if !nibbles.0.is_empty() {
account_trie_cursor.upsert(nibbles, StoredBranchNode(node))?;
}
}
for (key, updated_node) in account_updates {
let nibbles = StoredNibbles(key);
match updated_node {
Some(node) => {
if !nibbles.0.is_empty() {
num_entries += 1;
account_trie_cursor.upsert(nibbles, StoredBranchNode(node))?;
}
}
TrieKey::StorageTrie(hashed_address) => match operation {
TrieOp::Delete => {
if storage_trie_cursor.seek_exact(hashed_address)?.is_some() {
storage_trie_cursor.delete_current_duplicates()?;
}
}
TrieOp::Update(..) => unreachable!("Cannot update full storage trie."),
},
TrieKey::StorageNode(hashed_address, nibbles) => {
if !nibbles.is_empty() {
let nibbles = StoredNibblesSubKey(nibbles);
// Delete the old entry if it exists.
if storage_trie_cursor
.seek_by_key_subkey(hashed_address, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.is_some()
{
storage_trie_cursor.delete_current()?;
}

// The operation is an update, insert new entry.
if let TrieOp::Update(node) = operation {
storage_trie_cursor
.upsert(hashed_address, StorageTrieEntry { nibbles, node })?;
}
None => {
num_entries += 1;
if account_trie_cursor.seek_exact(nibbles)?.is_some() {
account_trie_cursor.delete_current()?;
}
}
};
}
}

Ok(())
}
let mut storage_tries = Vec::from_iter(self.storage_tries);
storage_tries.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut storage_trie_cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
for (hashed_address, storage_trie_updates) in storage_tries {
let updated_storage_entries =
storage_trie_updates.write_with_cursor(&mut storage_trie_cursor, hashed_address)?;
num_entries += updated_storage_entries;
}

/// creates [`TrieUpdatesSorted`] by sorting the `trie_operations`.
pub fn sorted(&self) -> TrieUpdatesSorted {
let mut trie_operations = Vec::from_iter(self.trie_operations.clone());
trie_operations.sort_unstable_by(|a, b| a.0.cmp(&b.0));
TrieUpdatesSorted { trie_operations }
}

/// converts trie updates into [`TrieUpdatesSorted`].
pub fn into_sorted(self) -> TrieUpdatesSorted {
let mut trie_operations = Vec::from_iter(self.trie_operations);
trie_operations.sort_unstable_by(|a, b| a.0.cmp(&b.0));
TrieUpdatesSorted { trie_operations }
Ok(num_entries)
}
}
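The account-node write path above folds `removed_nodes` and `account_nodes` into one key-sorted pass over the cursor, with updates taking precedence over removals of the same key. A standalone sketch of that merge rule, using stand-in types for `Nibbles` and `BranchNodeCompact`:

use std::collections::{HashMap, HashSet};

fn merge_updates(
    updated: HashMap<Vec<u8>, u64>,   // stand-ins for Nibbles -> BranchNodeCompact
    removed: HashSet<Vec<u8>>,
) -> Vec<(Vec<u8>, Option<u64>)> {
    let mut merged: Vec<(Vec<u8>, Option<u64>)> = removed
        .into_iter()
        .filter(|key| !updated.contains_key(key)) // updates take precedence
        .map(|key| (key, None))                   // None => delete from the table
        .collect();
    merged.extend(updated.into_iter().map(|(key, node)| (key, Some(node))));
    merged.sort_unstable_by(|a, b| a.0.cmp(&b.0)); // cursor-friendly key order
    merged
}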
/// The aggregation of trie updates.
#[derive(Debug, Default, Clone, PartialEq, Eq, Deref)]
/// Trie updates for storage trie of a single account.
#[derive(PartialEq, Eq, Clone, Default, Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct StorageTrieUpdates {
/// Flag indicating whether the trie was deleted.
pub(crate) is_deleted: bool,
/// Collection of updated storage trie nodes.
pub(crate) storage_nodes: HashMap<Nibbles, BranchNodeCompact>,
/// Collection of removed storage trie nodes.
pub(crate) removed_nodes: HashSet<Nibbles>,
}

impl StorageTrieUpdates {
/// Returns empty storage trie updates with `deleted` set to `true`.
pub fn deleted() -> Self {
Self {
is_deleted: true,
storage_nodes: HashMap::default(),
removed_nodes: HashSet::default(),
}
}

/// Returns the length of updated nodes.
pub fn len(&self) -> usize {
(self.is_deleted as usize) + self.storage_nodes.len() + self.removed_nodes.len()
}

/// Returns `true` if storage updates are empty.
pub fn is_empty(&self) -> bool {
!self.is_deleted && self.storage_nodes.is_empty() && self.removed_nodes.is_empty()
}

/// Sets `deleted` flag on the storage trie.
pub fn set_deleted(&mut self, deleted: bool) {
self.is_deleted = deleted;
}

/// Finalize storage trie updates by taking updates from the walker and hash builder.
pub fn finalize<C>(&mut self, walker: TrieWalker<C>, hash_builder: HashBuilder) {
// Retrieve deleted keys from trie walker.
let (_, removed_keys) = walker.split();
self.removed_nodes.extend(removed_keys);

// Retrieve updated nodes from hash builder.
let (_, updated_nodes) = hash_builder.split();
self.storage_nodes.extend(updated_nodes);
}

/// Convert storage trie updates into [`StorageTrieUpdatesSorted`].
pub fn into_sorted(self) -> StorageTrieUpdatesSorted {
let mut storage_nodes = Vec::from_iter(self.storage_nodes);
storage_nodes.sort_unstable_by(|a, b| a.0.cmp(&b.0));
StorageTrieUpdatesSorted {
is_deleted: self.is_deleted,
removed_nodes: self.removed_nodes,
storage_nodes,
}
}

/// Initializes a storage trie cursor and writes updates to database.
pub fn write_to_database<TX>(
self,
tx: &TX,
hashed_address: B256,
) -> Result<usize, reth_db::DatabaseError>
where
TX: DbTx + DbTxMut,
{
if self.is_empty() {
return Ok(0)
}

let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
self.write_with_cursor(&mut cursor, hashed_address)
}

/// Writes updates to database.
///
/// # Returns
///
/// The number of storage trie entries updated in the database.
fn write_with_cursor<C>(
self,
cursor: &mut C,
hashed_address: B256,
) -> Result<usize, reth_db::DatabaseError>
where
C: DbCursorRO<tables::StoragesTrie>
+ DbCursorRW<tables::StoragesTrie>
+ DbDupCursorRO<tables::StoragesTrie>
+ DbDupCursorRW<tables::StoragesTrie>,
{
// The storage trie for this account has to be deleted.
if self.is_deleted && cursor.seek_exact(hashed_address)?.is_some() {
cursor.delete_current_duplicates()?;
}

// Merge updated and removed nodes. Updated nodes must take precedence.
let mut storage_updates = self
.removed_nodes
.into_iter()
.filter_map(|n| (!self.storage_nodes.contains_key(&n)).then_some((n, None)))
.collect::<Vec<_>>();
storage_updates
.extend(self.storage_nodes.into_iter().map(|(nibbles, node)| (nibbles, Some(node))));
// Sort trie node updates.
storage_updates.sort_unstable_by(|a, b| a.0.cmp(&b.0));

let mut num_entries = 0;
for (nibbles, maybe_updated) in storage_updates.into_iter().filter(|(n, _)| !n.is_empty()) {
num_entries += 1;
let nibbles = StoredNibblesSubKey(nibbles);
// Delete the old entry if it exists.
if cursor
.seek_by_key_subkey(hashed_address, nibbles.clone())?
.filter(|e| e.nibbles == nibbles)
.is_some()
{
cursor.delete_current()?;
}

// There is an updated version of this node, insert new entry.
if let Some(node) = maybe_updated {
cursor.upsert(hashed_address, StorageTrieEntry { nibbles, node })?;
}
}

Ok(num_entries)
}
}
/// Sorted trie updates used for lookups and insertions.
#[derive(PartialEq, Eq, Clone, Default, Debug)]
pub struct TrieUpdatesSorted {
/// Sorted collection of trie operations.
pub(crate) trie_operations: Vec<(TrieKey, TrieOp)>,
pub(crate) account_nodes: Vec<(Nibbles, BranchNodeCompact)>,
pub(crate) removed_nodes: HashSet<Nibbles>,
pub(crate) storage_tries: Vec<(B256, StorageTrieUpdatesSorted)>,
}

impl TrieUpdatesSorted {
/// Find the account node with the given nibbles.
pub fn find_account_node(&self, key: &Nibbles) -> Option<(Nibbles, TrieOp)> {
self.trie_operations.iter().find_map(|(k, op)| {
k.as_account_node_key()
.filter(|nibbles| nibbles == &key)
.map(|nibbles| (nibbles.clone(), op.clone()))
})
}

/// Find the storage node with the given hashed address and key.
pub fn find_storage_node(
&self,
hashed_address: &B256,
key: &Nibbles,
) -> Option<(Nibbles, TrieOp)> {
self.trie_operations.iter().find_map(|(k, op)| {
k.as_storage_node_key()
.filter(|(address, nibbles)| address == &hashed_address && nibbles == &key)
.map(|(_, nibbles)| (nibbles.clone(), op.clone()))
})
}
/// Sorted trie updates used for lookups and insertions.
#[derive(PartialEq, Eq, Clone, Default, Debug)]
pub struct StorageTrieUpdatesSorted {
pub(crate) is_deleted: bool,
pub(crate) storage_nodes: Vec<(Nibbles, BranchNodeCompact)>,
pub(crate) removed_nodes: HashSet<Nibbles>,
}

@@ -22,15 +22,15 @@ pub struct TrieWalker<C> {
pub can_skip_current_node: bool,
/// A `PrefixSet` representing the changes to be applied to the trie.
pub changes: PrefixSet,
/// The retained trie node keys that need to be deleted.
deleted_keys: Option<HashSet<Nibbles>>,
/// The retained trie node keys that need to be removed.
removed_keys: Option<HashSet<Nibbles>>,
}

impl<C> TrieWalker<C> {
/// Constructs a new `TrieWalker` from existing stack and a cursor.
pub fn from_stack(cursor: C, stack: Vec<CursorSubNode>, changes: PrefixSet) -> Self {
let mut this =
Self { cursor, changes, stack, can_skip_current_node: false, deleted_keys: None };
Self { cursor, changes, stack, can_skip_current_node: false, removed_keys: None };
this.update_skip_node();
this
}
@@ -38,14 +38,14 @@ impl<C> TrieWalker<C> {
/// Sets the flag whether the trie updates should be stored.
pub fn with_deletions_retained(mut self, retained: bool) -> Self {
if retained {
self.deleted_keys = Some(HashSet::default());
self.removed_keys = Some(HashSet::default());
}
self
}

/// Split the walker into stack and trie updates.
pub fn split(mut self) -> (Vec<CursorSubNode>, HashSet<Nibbles>) {
let keys = self.deleted_keys.take();
let keys = self.removed_keys.take();
(self.stack, keys.unwrap_or_default())
}

@@ -58,9 +58,9 @@ impl<C> TrieWalker<C> {
println!("====================== END STACK ======================\n");
}

/// The current length of the deleted keys.
pub fn deleted_keys_len(&self) -> usize {
self.deleted_keys.as_ref().map_or(0, |u| u.len())
/// The current length of the removed keys.
pub fn removed_keys_len(&self) -> usize {
self.removed_keys.as_ref().map_or(0, |u| u.len())
}

/// Returns the current key in the trie.
@@ -112,7 +112,7 @@ impl<C: TrieCursor> TrieWalker<C> {
changes,
stack: vec![CursorSubNode::default()],
can_skip_current_node: false,
deleted_keys: None,
removed_keys: None,
};

// Set up the root node of the trie in the stack, if it exists.
@@ -188,7 +188,7 @@ impl<C: TrieCursor> TrieWalker<C> {
// Delete the current node if it's included in the prefix set or it doesn't contain the root
// hash.
if !self.can_skip_current_node || nibble != -1 {
if let Some((keys, key)) = self.deleted_keys.as_mut().zip(self.cursor.current()?) {
if let Some((keys, key)) = self.removed_keys.as_mut().zip(self.cursor.current()?) {
keys.insert(key);
}
}
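The walker change is purely a rename (`deleted_keys` to `removed_keys`), but the shape is worth noting: the key set is allocated only when deletions are retained, and `split` hands it back to the caller. A minimal self-contained model (the cursor stack carried by the real `split` is elided here):

use std::collections::HashSet;

struct Walker {
    removed_keys: Option<HashSet<Vec<u8>>>, // was `deleted_keys` before this commit
}

impl Walker {
    fn with_deletions_retained(mut self, retained: bool) -> Self {
        if retained {
            self.removed_keys = Some(HashSet::default());
        }
        self
    }

    fn removed_keys_len(&self) -> usize {
        self.removed_keys.as_ref().map_or(0, |keys| keys.len())
    }

    fn split(mut self) -> HashSet<Vec<u8>> {
        self.removed_keys.take().unwrap_or_default()
    }
}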