refactor(provider): trie provider (#1723)

This commit is contained in:
Roman Krasiuk
2023-03-13 15:23:47 +02:00
committed by GitHub
parent 139c0c4562
commit c110c91629
3 changed files with 116 additions and 96 deletions

View File

@ -108,15 +108,15 @@ impl<DB: Database> Stage<DB> for MerkleStage {
} else if to_transition - from_transition > threshold || stage_progress == 0 {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie");
// if there are more blocks than threshold it is faster to rebuild the trie
let loader = DBTrieLoader::default();
loader.calculate_root(tx).map_err(|e| StageError::Fatal(Box::new(e)))?
DBTrieLoader::<DB>::new(tx)
.calculate_root()
.map_err(|e| StageError::Fatal(Box::new(e)))?
} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header(stage_progress)?.state_root;
let loader = DBTrieLoader::default();
loader
.update_root(tx, current_root, from_transition..to_transition)
DBTrieLoader::<DB>::new(tx)
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
};
@ -155,14 +155,13 @@ impl<DB: Database> Stage<DB> for MerkleStage {
return Ok(UnwindOutput { stage_progress: input.unwind_to })
}
let loader = DBTrieLoader::default();
let current_root = tx.get_header(input.stage_progress)?.state_root;
let from_transition = tx.get_block_transition(input.unwind_to)?;
let to_transition = tx.get_block_transition(input.stage_progress)?;
let block_root = loader
.update_root(tx, current_root, from_transition..to_transition)
let block_root = DBTrieLoader::<DB>::new(tx)
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?;
if block_root != target_root {
@ -192,6 +191,7 @@ mod tests {
use assert_matches::assert_matches;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
mdbx::{Env, WriteMap},
tables,
transaction::{DbTx, DbTxMut},
};
@ -260,6 +260,12 @@ mod tests {
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
fn create_trie_loader<'tx, 'db>(
tx: &'tx Transaction<'db, Env<WriteMap>>,
) -> DBTrieLoader<'tx, 'db, Env<WriteMap>> {
DBTrieLoader::<Env<WriteMap>>::new(tx)
}
struct MerkleTestRunner {
tx: TestTransaction,
clean_threshold: u64,
@ -440,7 +446,7 @@ mod tests {
impl MerkleTestRunner {
fn state_root(&self) -> Result<H256, TestRunnerError> {
Ok(DBTrieLoader::default().calculate_root(&self.tx.inner()).unwrap())
Ok(create_trie_loader(&self.tx.inner()).calculate_root().unwrap())
}
pub(crate) fn generate_initial_trie(
@ -451,10 +457,9 @@ mod tests {
accounts.into_iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))),
)?;
let loader = DBTrieLoader::default();
let mut tx = self.tx.inner();
let root = loader.calculate_root(&tx).expect("couldn't create initial trie");
let root =
create_trie_loader(&tx).calculate_root().expect("couldn't create initial trie");
tx.commit()?;
@ -465,7 +470,7 @@ mod tests {
if previous_stage_progress != 0 {
let block_root =
self.tx.inner().get_header(previous_stage_progress).unwrap().state_root;
let root = DBTrieLoader::default().calculate_root(&self.tx.inner()).unwrap();
let root = create_trie_loader(&self.tx.inner()).calculate_root().unwrap();
assert_eq!(block_root, root);
}
Ok(())

View File

@ -308,8 +308,8 @@ where
// merkle tree
{
let current_root = self.get_header(parent_block_number)?.state_root;
let loader = DBTrieLoader::default();
let root = loader.update_root(self, current_root, from..to)?;
let loader = DBTrieLoader::<DB>::new(self);
let root = loader.update_root(current_root, from..to)?;
if root != block.state_root {
return Err(TransactionError::StateTrieRootMismatch {
got: root,

View File

@ -1,9 +1,8 @@
use crate::Transaction;
use cita_trie::{PatriciaTrie, Trie};
use hasher::HasherKeccak;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
database::Database,
database::{Database, DatabaseGAT},
models::{AccountBeforeTx, TransitionIdAddress},
tables,
transaction::{DbTx, DbTxMut},
@ -38,11 +37,11 @@ pub enum TrieError {
}
/// Database wrapper implementing HashDB trait.
struct HashDatabase<'tx, 'itx, DB: Database> {
tx: &'tx Transaction<'itx, DB>,
pub struct HashDatabase<'tx, 'db, DB: Database> {
tx: &'tx <DB as DatabaseGAT<'db>>::TXMut,
}
impl<'tx, 'itx, DB> cita_trie::DB for HashDatabase<'tx, 'itx, DB>
impl<'tx, 'db, DB> cita_trie::DB for HashDatabase<'tx, 'db, DB>
where
DB: Database,
{
@ -88,9 +87,9 @@ where
}
}
impl<'tx, 'itx, DB: Database> HashDatabase<'tx, 'itx, DB> {
impl<'tx, 'db, DB: Database> HashDatabase<'tx, 'db, DB> {
/// Instantiates a new Database for the accounts trie, with an empty root
fn new(tx: &'tx Transaction<'itx, DB>) -> Result<Self, TrieError> {
pub fn new(tx: &'tx <DB as DatabaseGAT<'db>>::TXMut) -> Result<Self, TrieError> {
let root = EMPTY_ROOT;
if tx.get::<tables::AccountsTrie>(root)?.is_none() {
tx.put::<tables::AccountsTrie>(root, [EMPTY_STRING_CODE].to_vec())?;
@ -99,7 +98,10 @@ impl<'tx, 'itx, DB: Database> HashDatabase<'tx, 'itx, DB> {
}
/// Instantiates a new Database for the accounts trie, with an existing root
fn from_root(tx: &'tx Transaction<'itx, DB>, root: H256) -> Result<Self, TrieError> {
pub fn from_root(
tx: &'tx <DB as DatabaseGAT<'db>>::TXMut,
root: H256,
) -> Result<Self, TrieError> {
if root == EMPTY_ROOT {
return Self::new(tx)
}
@ -109,12 +111,12 @@ impl<'tx, 'itx, DB: Database> HashDatabase<'tx, 'itx, DB> {
}
/// Database wrapper implementing HashDB trait.
struct DupHashDatabase<'tx, 'itx, DB: Database> {
tx: &'tx Transaction<'itx, DB>,
pub struct DupHashDatabase<'tx, 'db, DB: Database> {
tx: &'tx <DB as DatabaseGAT<'db>>::TXMut,
key: H256,
}
impl<'tx, 'itx, DB> cita_trie::DB for DupHashDatabase<'tx, 'itx, DB>
impl<'tx, 'db, DB> cita_trie::DB for DupHashDatabase<'tx, 'db, DB>
where
DB: Database,
{
@ -170,9 +172,9 @@ where
}
}
impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> {
impl<'tx, 'db, DB: Database> DupHashDatabase<'tx, 'db, DB> {
/// Instantiates a new Database for the storage trie, with an empty root
fn new(tx: &'tx Transaction<'itx, DB>, key: H256) -> Result<Self, TrieError> {
pub fn new(tx: &'tx <DB as DatabaseGAT<'db>>::TXMut, key: H256) -> Result<Self, TrieError> {
let root = EMPTY_ROOT;
let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() {
@ -185,7 +187,11 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> {
}
/// Instantiates a new Database for the storage trie, with an existing root
fn from_root(tx: &'tx Transaction<'itx, DB>, key: H256, root: H256) -> Result<Self, TrieError> {
pub fn from_root(
tx: &'tx <DB as DatabaseGAT<'db>>::TXMut,
key: H256,
root: H256,
) -> Result<Self, TrieError> {
if root == EMPTY_ROOT {
return Self::new(tx, key)
}
@ -199,7 +205,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> {
/// An Ethereum account, for RLP encoding traits deriving.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Default, RlpEncodable, RlpDecodable)]
pub(crate) struct EthAccount {
pub struct EthAccount {
/// Account nonce.
nonce: u64,
/// Account balance.
@ -222,39 +228,47 @@ impl From<Account> for EthAccount {
}
impl EthAccount {
pub(crate) fn from_with_root(acc: Account, storage_root: H256) -> EthAccount {
Self { storage_root, ..Self::from(acc) }
/// Set storage root on account.
pub fn with_storage_root(mut self, storage_root: H256) -> Self {
self.storage_root = storage_root;
self
}
/// Get account's storage root.
pub fn storage_root(&self) -> H256 {
self.storage_root
}
}
/// Struct for calculating the root of a merkle patricia tree,
/// while populating the database with intermediate hashes.
#[derive(Debug, Default)]
pub struct DBTrieLoader;
pub struct DBTrieLoader<'tx, 'db, DB: Database> {
tx: &'tx <DB as DatabaseGAT<'db>>::TXMut,
}
impl<'tx, 'db, DB: Database> DBTrieLoader<'tx, 'db, DB> {
/// Create new instance of trie loader.
pub fn new(tx: &'tx <DB as DatabaseGAT<'db>>::TXMut) -> Self {
Self { tx }
}
impl DBTrieLoader {
/// Calculates the root of the state trie, saving intermediate hashes in the database.
pub fn calculate_root<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
) -> Result<H256, TrieError> {
tx.clear::<tables::AccountsTrie>()?;
tx.clear::<tables::StoragesTrie>()?;
pub fn calculate_root(&self) -> Result<H256, TrieError> {
self.tx.clear::<tables::AccountsTrie>()?;
self.tx.clear::<tables::StoragesTrie>()?;
let mut accounts_cursor = tx.cursor_read::<tables::HashedAccount>()?;
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let mut walker = accounts_cursor.walk(None)?;
let db = Arc::new(HashDatabase::new(tx)?);
let db = Arc::new(HashDatabase::<DB>::new(self.tx)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher));
while let Some((hashed_address, account)) = walker.next().transpose()? {
let value = EthAccount::from_with_root(
account,
self.calculate_storage_root(tx, hashed_address)?,
);
let value = EthAccount::from(account)
.with_storage_root(self.calculate_storage_root(hashed_address)?);
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
@ -265,18 +279,15 @@ impl DBTrieLoader {
Ok(root)
}
fn calculate_storage_root<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
address: H256,
) -> Result<H256, TrieError> {
let db = Arc::new(DupHashDatabase::new(tx, address)?);
/// Calculate the accounts storage root.
pub fn calculate_storage_root(&self, address: H256) -> Result<H256, TrieError> {
let db = Arc::new(DupHashDatabase::<DB>::new(self.tx, address)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher));
let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorage>()?;
let mut storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
// Should be able to use walk_dup, but any call to next() causes an assert fail in mdbx.c
// let mut walker = storage_cursor.walk_dup(address, H256::zero())?;
@ -292,24 +303,23 @@ impl DBTrieLoader {
// if root is empty remove it from db
if root == EMPTY_ROOT {
tx.delete::<tables::StoragesTrie>(address, None)?;
self.tx.delete::<tables::StoragesTrie>(address, None)?;
}
Ok(root)
}
/// Calculates the root of the state trie by updating an existing trie.
pub fn update_root<DB: Database>(
pub fn update_root(
&self,
tx: &Transaction<'_, DB>,
root: H256,
tid_range: Range<TransitionId>,
) -> Result<H256, TrieError> {
let mut accounts_cursor = tx.cursor_read::<tables::HashedAccount>()?;
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let changed_accounts = self.gather_changes(tx, tid_range)?;
let changed_accounts = self.gather_changes(tid_range)?;
let db = Arc::new(HashDatabase::from_root(tx, root)?);
let db = Arc::new(HashDatabase::<DB>::from_root(self.tx, root)?);
let hasher = Arc::new(HasherKeccak::new());
@ -320,13 +330,13 @@ impl DBTrieLoader {
trie.remove(address.as_bytes())?;
let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root;
self.update_storage_root(tx, storage_root, address, changed_storages)?
self.update_storage_root(storage_root, address, changed_storages)?
} else {
self.calculate_storage_root(tx, address)?
self.calculate_storage_root(address)?
};
if let Some((_, account)) = accounts_cursor.seek_exact(address)? {
let value = EthAccount::from_with_root(account, storage_root);
let value = EthAccount::from(account).with_storage_root(storage_root);
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
@ -336,7 +346,7 @@ impl DBTrieLoader {
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != root {
let mut cursor = tx.cursor_write::<tables::AccountsTrie>()?;
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
if cursor.seek_exact(root)?.is_some() {
cursor.delete_current()?;
}
@ -345,19 +355,19 @@ impl DBTrieLoader {
Ok(new_root)
}
fn update_storage_root<DB: Database>(
/// Update the account's storage root
pub fn update_storage_root(
&self,
tx: &Transaction<'_, DB>,
root: H256,
address: H256,
changed_storages: BTreeSet<H256>,
) -> Result<H256, TrieError> {
let db = Arc::new(DupHashDatabase::from_root(tx, address, root)?);
let db = Arc::new(DupHashDatabase::<DB>::from_root(self.tx, address, root)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?;
let mut storage_cursor = tx.cursor_dup_read::<tables::HashedStorage>()?;
let mut storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
for key in changed_storages {
if let Some(StorageEntry { value, .. }) =
@ -372,7 +382,7 @@ impl DBTrieLoader {
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != root {
let mut cursor = tx.cursor_dup_write::<tables::StoragesTrie>()?;
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
if cursor
.seek_by_key_subkey(address, root)?
.filter(|entry| entry.hash == root)
@ -384,18 +394,17 @@ impl DBTrieLoader {
// if root is empty remove it from db
if new_root == EMPTY_ROOT {
tx.delete::<tables::StoragesTrie>(address, None)?;
self.tx.delete::<tables::StoragesTrie>(address, None)?;
}
Ok(new_root)
}
fn gather_changes<DB: Database>(
fn gather_changes(
&self,
tx: &Transaction<'_, DB>,
tid_range: Range<TransitionId>,
) -> Result<BTreeMap<H256, BTreeSet<H256>>, TrieError> {
let mut account_cursor = tx.cursor_read::<tables::AccountChangeSet>()?;
let mut account_cursor = self.tx.cursor_read::<tables::AccountChangeSet>()?;
let mut account_changes: BTreeMap<Address, BTreeSet<H256>> = BTreeMap::new();
@ -405,7 +414,7 @@ impl DBTrieLoader {
account_changes.insert(address, Default::default());
}
let mut storage_cursor = tx.cursor_dup_read::<tables::StorageChangeSet>()?;
let mut storage_cursor = self.tx.cursor_dup_read::<tables::StorageChangeSet>()?;
let start = TransitionIdAddress((tid_range.start, Address::zero()));
let end = TransitionIdAddress((tid_range.end, Address::zero()));
@ -430,30 +439,43 @@ impl DBTrieLoader {
#[cfg(test)]
mod tests {
use crate::Transaction;
use super::*;
use assert_matches::assert_matches;
use proptest::{prelude::ProptestConfig, proptest};
use reth_db::{mdbx::test_utils::create_test_rw_db, tables, transaction::DbTxMut};
use reth_db::{
mdbx::{test_utils::create_test_rw_db, Env, WriteMap},
tables,
transaction::DbTxMut,
};
use reth_primitives::{
hex_literal::hex,
keccak256,
proofs::{genesis_state_root, KeccakHasher, EMPTY_ROOT},
Address, ChainSpec, MAINNET,
};
use std::{collections::HashMap, str::FromStr};
use std::{collections::HashMap, ops::Deref, str::FromStr};
use triehash::sec_trie_root;
fn create_test_loader<'tx, 'db>(
tx: &'tx Transaction<'db, Env<WriteMap>>,
) -> DBTrieLoader<'tx, 'db, Arc<Env<WriteMap>>> {
DBTrieLoader::new(tx.deref())
}
#[test]
fn empty_trie() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
assert_matches!(trie.calculate_root(&tx), Ok(got) if got == EMPTY_ROOT);
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == EMPTY_ROOT
);
}
#[test]
fn single_account_trie() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
let address = Address::from_str("9fe4abd71ad081f091bd06dd1c16f7e92927561e").unwrap();
@ -463,14 +485,13 @@ mod tests {
EthAccount::from(account).encode(&mut encoded_account);
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>([(address, encoded_account)]).0);
assert_matches!(
trie.calculate_root(&tx),
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
);
}
#[test]
fn two_accounts_trie() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
@ -494,14 +515,13 @@ mod tests {
});
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_accounts).0);
assert_matches!(
trie.calculate_root(&tx),
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
);
}
#[test]
fn single_storage_trie() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
@ -522,14 +542,13 @@ mod tests {
});
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_storage).0);
assert_matches!(
trie.calculate_storage_root(&tx, hashed_address),
create_test_loader(&tx).calculate_storage_root(hashed_address),
Ok(got) if got == expected
);
}
#[test]
fn single_account_with_storage_trie() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
@ -562,22 +581,19 @@ mod tests {
(k, out)
});
let eth_account = EthAccount::from_with_root(
account,
H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_storage).0),
);
let storage_root = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_storage).0);
let eth_account = EthAccount::from(account).with_storage_root(storage_root);
eth_account.encode(&mut out);
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>([(address, out)]).0);
assert_matches!(
trie.calculate_root(&tx),
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
);
}
#[test]
fn verify_genesis() {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let mut tx = Transaction::new(db.as_ref()).unwrap();
let ChainSpec { genesis, .. } = MAINNET.clone();
@ -599,7 +615,7 @@ mod tests {
let state_root = genesis_state_root(&genesis.alloc);
assert_matches!(
trie.calculate_root(&tx),
create_test_loader(&tx).calculate_root(),
Ok(got) if got == state_root
);
}
@ -643,13 +659,12 @@ mod tests {
BTreeSet::from([keccak256(H256::zero()), keccak256(H256::from_low_u64_be(2))]),
)]);
assert_matches!(
DBTrieLoader::default().gather_changes(&tx, 32..33),
create_test_loader(&tx).gather_changes(32..33),
Ok(got) if got == expected
);
}
fn test_with_accounts(accounts: BTreeMap<Address, (Account, BTreeSet<StorageEntry>)>) {
let trie = DBTrieLoader::default();
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
@ -675,14 +690,14 @@ mod tests {
EMPTY_ROOT
};
let mut out = Vec::new();
EthAccount::from_with_root(account, storage_root).encode(&mut out);
EthAccount::from(account).with_storage_root(storage_root).encode(&mut out);
(address, out)
})
.collect::<Vec<(Address, Vec<u8>)>>();
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_accounts).0);
assert_matches!(
trie.calculate_root(&tx),
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
, "where expected is {expected:?}");
}