fix(stages): add commit threshold to merkle stage v2 (#1656)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
joshieDo
2023-03-14 12:47:16 +08:00
committed by GitHub
parent c5cd236e1a
commit 5b90cbc411
12 changed files with 476 additions and 180 deletions

1
Cargo.lock generated
View File

@ -4851,6 +4851,7 @@ dependencies = [
"itertools 0.10.5",
"parking_lot 0.12.1",
"proptest",
"reth-codecs",
"reth-db",
"reth-interfaces",
"reth-primitives",

View File

@ -44,10 +44,7 @@ pub(crate) async fn dump_merkle_stage<DB: Database>(
unwind_and_copy::<DB>(db_tool, (from, to), tip_block_number, &output_db).await?;
if should_run {
println!(
"\n# Merkle stage does not support dry run, so it will actually be committing changes."
);
run(output_db, to, from).await?;
dry_run(output_db, to, from).await?;
}
Ok(())
@ -113,7 +110,7 @@ async fn unwind_and_copy<DB: Database>(
}
/// Try to re-execute the stage straightaway
async fn run(
async fn dry_run(
output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
to: u64,
from: u64,
@ -121,18 +118,24 @@ async fn run(
info!(target: "reth::cli", "Executing stage.");
let mut tx = Transaction::new(&output_db)?;
MerkleStage::Execution {
clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch
let mut exec_output = false;
while !exec_output {
exec_output = MerkleStage::Execution {
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from
* scratch */
}
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?
.done;
}
.execute(
&mut tx,
reth_stages::ExecInput {
previous_stage: Some((StageId("Another"), to)),
stage_progress: Some(from),
},
)
.await?;
tx.drop()?;
info!(target: "reth::cli", "Success.");

View File

@ -34,6 +34,7 @@ mod withdrawal;
/// Helper function for calculating Merkle proofs and hashes
pub mod proofs;
pub use proofs::ProofCheckpoint;
pub use account::{Account, Bytecode};
pub use bits::H512;

View File

@ -8,6 +8,7 @@ use bytes::BytesMut;
use hash_db::Hasher;
use hex_literal::hex;
use plain_hasher::PlainHasher;
use reth_codecs::{main_codec, Compact};
use reth_rlp::Encodable;
use triehash::{ordered_trie_root, sec_trie_root};
@ -34,6 +35,20 @@ impl Hasher for KeccakHasher {
}
}
/// Saves the progress of MerkleStage
#[main_codec]
#[derive(Default, Debug, Copy, Clone, PartialEq)]
pub struct ProofCheckpoint {
/// The next hashed account to insert into the trie.
pub hashed_address: Option<H256>,
/// The next storage entry to insert into the trie.
pub storage_key: Option<H256>,
/// Current intermediate root for `AccountsTrie`.
pub account_root: Option<H256>,
/// Current intermediate storage root from an account.
pub storage_root: Option<H256>,
}
/// Calculate a transaction root.
///
/// Iterates over the given transactions and the merkle merkle trie root of

View File

@ -123,7 +123,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
tx.insert_accounts_and_storages(start_state.clone()).unwrap();
// make first block after genesis have valid state root
let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap();
let root =
DBTrieLoader::default().calculate_root(&tx.inner()).and_then(|e| e.root()).unwrap();
let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal();
@ -144,7 +145,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// make last block have valid state root
let root = {
let mut tx_mut = tx.inner();
let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap();
let root =
DBTrieLoader::default().calculate_root(&tx_mut).and_then(|e| e.root()).unwrap();
tx_mut.commit().unwrap();
root
};

View File

@ -1,7 +1,10 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
use reth_db::{database::Database, tables, transaction::DbTx};
use reth_interfaces::consensus;
use reth_provider::{trie::DBTrieLoader, Transaction};
use reth_provider::{
trie::{DBTrieLoader, TrieProgress},
Transaction,
};
use std::{fmt::Debug, ops::DerefMut};
use tracing::*;
@ -105,23 +108,32 @@ impl<DB: Database> Stage<DB> for MerkleStage {
let trie_root = if from_transition == to_transition {
block_root
} else if to_transition - from_transition > threshold || stage_progress == 0 {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie");
// if there are more blocks than threshold it is faster to rebuild the trie
DBTrieLoader::new(tx.deref_mut())
.calculate_root()
.map_err(|e| StageError::Fatal(Box::new(e)))?
} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header(stage_progress)?.state_root;
DBTrieLoader::new(tx.deref_mut())
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
let res = if to_transition - from_transition > threshold || stage_progress == 0 {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Rebuilding trie");
// if there are more blocks than threshold it is faster to rebuild the trie
let mut loader = DBTrieLoader::new(tx.deref_mut());
loader.calculate_root().map_err(|e| StageError::Fatal(Box::new(e)))?
} else {
debug!(target: "sync::stages::merkle::exec", current = ?stage_progress, target = ?previous_stage_progress, "Updating trie");
// Iterate over changeset (similar to Hashing stages) and take new values
let current_root = tx.get_header(stage_progress)?.state_root;
let mut loader = DBTrieLoader::new(tx.deref_mut());
loader
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
};
match res {
TrieProgress::Complete(root) => root,
TrieProgress::InProgress(_) => {
return Ok(ExecOutput { stage_progress, done: false })
}
}
};
if block_root != trie_root {
warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?block_root, expected = ?trie_root, "Block's root state failed verification");
warn!(target: "sync::stages::merkle::exec", ?previous_stage_progress, got = ?trie_root, expected = ?block_root, "Block's root state failed verification");
return Err(StageError::Validation {
block: previous_stage_progress,
error: consensus::ConsensusError::BodyStateRootDiff {
@ -156,13 +168,28 @@ impl<DB: Database> Stage<DB> for MerkleStage {
}
let current_root = tx.get_header(input.stage_progress)?.state_root;
let from_transition = tx.get_block_transition(input.unwind_to)?;
let to_transition = tx.get_block_transition(input.stage_progress)?;
let block_root = DBTrieLoader::new(tx.deref_mut())
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?;
let mut loader = DBTrieLoader::new(tx.deref_mut());
let block_root = loop {
match loader
.update_root(current_root, from_transition..to_transition)
.map_err(|e| StageError::Fatal(Box::new(e)))?
{
TrieProgress::Complete(root) => break root,
TrieProgress::InProgress(_) => {
// Save the loader's progress & drop it to allow committing to the database,
// otherwise we're hitting the borrow checker
let progress = loader.current;
let _ = loader;
tx.commit()?;
// Reinstantiate the loader from where it was left off.
loader = DBTrieLoader::new(tx.deref_mut());
loader.current = progress;
}
}
};
if block_root != target_root {
let unwind_to = input.unwind_to;
@ -447,7 +474,10 @@ mod tests {
impl MerkleTestRunner {
fn state_root(&self) -> Result<H256, TestRunnerError> {
Ok(create_trie_loader(&self.tx.inner()).calculate_root().unwrap())
Ok(create_trie_loader(&self.tx.inner())
.calculate_root()
.and_then(|e| e.root())
.unwrap())
}
pub(crate) fn generate_initial_trie(
@ -459,8 +489,10 @@ mod tests {
)?;
let mut tx = self.tx.inner();
let root =
create_trie_loader(&tx).calculate_root().expect("couldn't create initial trie");
let root = create_trie_loader(&tx)
.calculate_root()
.and_then(|e| e.root())
.expect("couldn't create initial trie");
tx.commit()?;
@ -471,7 +503,10 @@ mod tests {
if previous_stage_progress != 0 {
let block_root =
self.tx.inner().get_header(previous_stage_progress).unwrap().state_root;
let root = create_trie_loader(&self.tx.inner()).calculate_root().unwrap();
let root = create_trie_loader(&self.tx().inner())
.calculate_root()
.and_then(|e| e.root())
.unwrap();
assert_eq!(block_root, root);
}
Ok(())

View File

@ -44,7 +44,8 @@ impl_compression_for_compact!(
StoredBlockBody,
StoredBlockOmmers,
StoredBlockWithdrawals,
Bytecode
Bytecode,
ProofCheckpoint
);
impl_compression_for_compact!(AccountBeforeTx, TransactionSigned);
impl_compression_for_compact!(CompactU256);

View File

@ -32,7 +32,7 @@ pub enum TableType {
}
/// Default tables that should be present inside database.
pub const TABLES: [(TableType, &str); 26] = [
pub const TABLES: [(TableType, &str); 27] = [
(TableType::Table, CanonicalHeaders::const_name()),
(TableType::Table, HeaderTD::const_name()),
(TableType::Table, HeaderNumbers::const_name()),
@ -59,6 +59,7 @@ pub const TABLES: [(TableType, &str); 26] = [
(TableType::DupSort, StoragesTrie::const_name()),
(TableType::Table, TxSenders::const_name()),
(TableType::Table, SyncStage::const_name()),
(TableType::Table, SyncStageProgress::const_name()),
];
#[macro_export]
@ -293,6 +294,11 @@ table!(
( SyncStage ) StageId | BlockNumber
);
table!(
/// Stores arbitrary data to keep track of a stage first-sync progress.
( SyncStageProgress ) StageId | Vec<u8>
);
///
/// Alias Types

View File

@ -13,8 +13,9 @@ reth-primitives = { path = "../../primitives" }
reth-interfaces = { path = "../../interfaces" }
reth-revm-primitives = { path = "../../revm/revm-primitives" }
reth-db = { path = "../db" }
reth-tracing = { path = "../../tracing" }
reth-rlp = { path = "../../rlp" }
reth-codecs = { path = "../codecs" }
reth-tracing = {path = "../../tracing"}
reth-rlp = {path = "../../rlp"}
revm-primitives = "1.0.0"

View File

@ -76,7 +76,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX>
.state_root;
let (account_proof, storage_root) = loader
.generate_acount_proof(self.db, root, hashed_address)
.generate_acount_proof(root, hashed_address)
.map_err(|_| ProviderError::StateTree)?;
let account_proof = account_proof.into_iter().map(Bytes::from).collect();
@ -86,7 +86,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for LatestStateProviderRef<'a, 'b, TX>
} else {
let hashed_keys: Vec<H256> = keys.iter().map(keccak256).collect();
loader
.generate_storage_proofs(self.db, storage_root, hashed_address, &hashed_keys)
.generate_storage_proofs(storage_root, hashed_address, &hashed_keys)
.map_err(|_| ProviderError::StateTree)?
.into_iter()
.map(|v| v.into_iter().map(Bytes::from).collect())

View File

@ -308,8 +308,8 @@ where
// merkle tree
{
let current_root = self.get_header(parent_block_number)?.state_root;
let loader = DBTrieLoader::new(self.deref_mut());
let root = loader.update_root(current_root, from..to)?;
let mut loader = DBTrieLoader::new(self.deref_mut());
let root = loader.update_root(current_root, from..to).and_then(|e| e.root())?;
if root != block.state_root {
return Err(TransactionError::StateTrieRootMismatch {
got: root,

View File

@ -1,5 +1,6 @@
use cita_trie::{PatriciaTrie, Trie};
use hasher::HasherKeccak;
use reth_codecs::Compact;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO},
models::{AccountBeforeTx, TransitionIdAddress},
@ -7,8 +8,8 @@ use reth_db::{
transaction::{DbTx, DbTxMut},
};
use reth_primitives::{
keccak256, proofs::EMPTY_ROOT, Account, Address, StorageEntry, StorageTrieEntry, TransitionId,
H256, KECCAK_EMPTY, U256,
keccak256, proofs::EMPTY_ROOT, Account, Address, ProofCheckpoint, StorageEntry,
StorageTrieEntry, TransitionId, H256, KECCAK_EMPTY, U256,
};
use reth_rlp::{
encode_fixed_size, Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable,
@ -38,6 +39,8 @@ pub enum TrieError {
/// Error when encoding/decoding a value.
#[error("{0:?}")]
DecodeError(#[from] DecodeError),
#[error("Trie requires committing a checkpoint.")]
UnexpectedCheckpoint,
}
/// Database wrapper implementing HashDB trait, with a read-write transaction.
@ -207,11 +210,18 @@ where
}
/// Database wrapper implementing HashDB trait, with a read-only transaction.
struct HashDatabase<'tx, 'itx, TX: DbTx<'itx>> {
pub struct HashDatabase<'tx, 'itx, TX: DbTx<'itx>> {
tx: &'tx TX,
_p: PhantomData<&'itx ()>, // to suppress "unused" lifetime 'itx
}
impl<'tx, 'itx, TX: DbTx<'itx>> HashDatabase<'tx, 'itx, TX> {
/// Creates a new Hash database with the given transaction
pub fn new(tx: &'tx TX) -> Self {
Self { tx, _p: Default::default() }
}
}
impl<'tx, 'itx, TX> cita_trie::DB for HashDatabase<'tx, 'itx, TX>
where
TX: DbTx<'itx>,
@ -250,12 +260,19 @@ impl<'tx, 'itx, TX: DbTx<'itx>> HashDatabase<'tx, 'itx, TX> {
}
/// Database wrapper implementing HashDB trait, with a read-only transaction.
struct DupHashDatabase<'tx, 'itx, TX: DbTx<'itx>> {
pub struct DupHashDatabase<'tx, 'itx, TX: DbTx<'itx>> {
tx: &'tx TX,
key: H256,
_p: PhantomData<&'itx ()>, // to suppress "unused" lifetime 'itx
}
impl<'tx, 'itx, TX: DbTx<'itx>> DupHashDatabase<'tx, 'itx, TX> {
/// Creates a new DupHash database with the given transaction and key.
pub fn new(tx: &'tx TX, key: H256) -> Self {
Self { tx, key, _p: Default::default() }
}
}
impl<'tx, 'itx, TX> cita_trie::DB for DupHashDatabase<'tx, 'itx, TX>
where
TX: DbTx<'itx>,
@ -338,14 +355,41 @@ pub type MerkleProof = Vec<Vec<u8>>;
/// Struct for calculating the root of a merkle patricia tree,
/// while populating the database with intermediate hashes.
#[derive(Debug)]
pub struct DBTrieLoader<'tx, TX> {
tx: &'tx TX,
/// The maximum number of keys to insert before committing. Both from `AccountsTrie` and
/// `StoragesTrie`.
pub commit_threshold: u64,
/// The current number of inserted keys from both `AccountsTrie` and `StoragesTrie`.
pub current: u64,
/// The transaction to use for inserting the trie nodes.
pub tx: &'tx TX,
}
/// Status of the trie calculation.
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum TrieProgress {
/// Trie has finished with the passed root.
Complete(H256),
/// Trie has hit its commit threshold.
InProgress(ProofCheckpoint),
}
impl TrieProgress {
/// Consumes the root from its `Complete` variant. If that's not possible, throw
/// `TrieError::UnexpectedCheckpoint`.
pub fn root(self) -> Result<H256, TrieError> {
match self {
Self::Complete(root) => Ok(root),
_ => Err(TrieError::UnexpectedCheckpoint),
}
}
}
impl<'tx, TX> DBTrieLoader<'tx, TX> {
/// Create new instance of trie loader.
pub fn new(tx: &'tx TX) -> Self {
Self { tx }
Self { tx, commit_threshold: 500_000, current: 0 }
}
}
@ -355,151 +399,243 @@ where
TX: DbTxMut<'db> + DbTx<'db> + Send + Sync,
{
/// Calculates the root of the state trie, saving intermediate hashes in the database.
pub fn calculate_root(&self) -> Result<H256, TrieError> {
self.tx.clear::<tables::AccountsTrie>()?;
self.tx.clear::<tables::StoragesTrie>()?;
pub fn calculate_root(&mut self) -> Result<TrieProgress, TrieError> {
let mut checkpoint = self.get_checkpoint()?;
if checkpoint.hashed_address.is_none() {
self.tx.clear::<tables::AccountsTrie>()?;
self.tx.clear::<tables::StoragesTrie>()?;
}
let previous_root = checkpoint.account_root.unwrap_or(EMPTY_ROOT);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = if let Some(root) = checkpoint.account_root {
PatriciaTrie::from(
Arc::new(HashDatabaseMut::from_root(self.tx, root)?),
hasher,
root.as_bytes(),
)?
} else {
PatriciaTrie::new(Arc::new(HashDatabaseMut::new(self.tx)?), hasher)
};
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let mut walker = accounts_cursor.walk(None)?;
let db = Arc::new(HashDatabaseMut::new(self.tx)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher));
let mut walker = accounts_cursor.walk(checkpoint.hashed_address.take())?;
while let Some((hashed_address, account)) = walker.next().transpose()? {
let value = EthAccount::from(account)
.with_storage_root(self.calculate_storage_root(hashed_address)?);
match self.calculate_storage_root(
hashed_address,
checkpoint.storage_key.take(),
checkpoint.storage_root.take(),
)? {
TrieProgress::Complete(root) => {
let value = EthAccount::from(account).with_storage_root(root);
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
trie.insert(hashed_address.as_bytes().to_vec(), out)?;
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
trie.insert(hashed_address.as_bytes().to_vec(), out)?;
if self.has_hit_threshold() {
return self.save_account_checkpoint(
ProofCheckpoint::default(),
self.replace_account_root(&mut trie, previous_root)?,
hashed_address,
)
}
}
TrieProgress::InProgress(checkpoint) => {
return self.save_account_checkpoint(
checkpoint,
self.replace_account_root(&mut trie, previous_root)?,
hashed_address,
)
}
}
}
let root = H256::from_slice(trie.root()?.as_slice());
Ok(root)
// Reset inner stage progress
self.save_checkpoint(ProofCheckpoint::default())?;
Ok(TrieProgress::Complete(self.replace_account_root(&mut trie, previous_root)?))
}
/// Calculate the accounts storage root.
pub fn calculate_storage_root(&self, address: H256) -> Result<H256, TrieError> {
let db = Arc::new(DupHashDatabaseMut::new(self.tx, address)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::new(Arc::clone(&db), Arc::clone(&hasher));
fn calculate_storage_root(
&mut self,
address: H256,
next_storage: Option<H256>,
previous_root: Option<H256>,
) -> Result<TrieProgress, TrieError> {
let mut storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
// Should be able to use walk_dup, but any call to next() causes an assert fail in mdbx.c
// let mut walker = storage_cursor.walk_dup(address, H256::zero())?;
let mut current = storage_cursor.seek_by_key_subkey(address, H256::zero())?;
let hasher = Arc::new(HasherKeccak::new());
let (mut current_entry, mut trie) = if let Some(entry) = next_storage {
(
storage_cursor.seek_by_key_subkey(address, entry)?.filter(|e| e.key == entry),
PatriciaTrie::from(
Arc::new(DupHashDatabaseMut::from_root(
self.tx,
address,
previous_root.expect("is some"),
)?),
hasher,
previous_root.expect("is some").as_bytes(),
)?,
)
} else {
(
storage_cursor.seek_by_key_subkey(address, H256::zero())?,
PatriciaTrie::new(Arc::new(DupHashDatabaseMut::new(self.tx, address)?), hasher),
)
};
while let Some(StorageEntry { key: storage_key, value }) = current {
let previous_root = previous_root.unwrap_or(EMPTY_ROOT);
while let Some(StorageEntry { key: storage_key, value }) = current_entry {
let out = encode_fixed_size(&value).to_vec();
trie.insert(storage_key.to_vec(), out)?;
current = storage_cursor.next_dup()?.map(|(_, v)| v);
// Should be able to use walk_dup, but any call to next() causes an assert fail in
// mdbx.c
current_entry = storage_cursor.next_dup()?.map(|(_, v)| v);
let threshold = self.has_hit_threshold();
if let Some(current_entry) = current_entry {
if threshold {
return Ok(TrieProgress::InProgress(ProofCheckpoint {
storage_root: Some(self.replace_storage_root(
trie,
address,
previous_root,
)?),
storage_key: Some(current_entry.key),
..Default::default()
}))
}
}
}
let root = H256::from_slice(trie.root()?.as_slice());
// if root is empty remove it from db
if root == EMPTY_ROOT {
self.tx.delete::<tables::StoragesTrie>(address, None)?;
}
Ok(root)
Ok(TrieProgress::Complete(self.replace_storage_root(trie, address, previous_root)?))
}
/// Calculates the root of the state trie by updating an existing trie.
pub fn update_root(
&self,
root: H256,
&mut self,
mut previous_root: H256,
tid_range: Range<TransitionId>,
) -> Result<H256, TrieError> {
) -> Result<TrieProgress, TrieError> {
let mut checkpoint = self.get_checkpoint()?;
if let Some(account_root) = checkpoint.account_root.take() {
previous_root = account_root;
}
let next_acc = checkpoint.hashed_address.take();
let changed_accounts = self
.gather_changes(tid_range)?
.into_iter()
.skip_while(|(addr, _)| next_acc.is_some() && next_acc.expect("is some") != *addr);
let mut trie = PatriciaTrie::from(
Arc::new(HashDatabaseMut::from_root(self.tx, previous_root)?),
Arc::new(HasherKeccak::new()),
previous_root.as_bytes(),
)?;
let mut accounts_cursor = self.tx.cursor_read::<tables::HashedAccount>()?;
let changed_accounts = self.gather_changes(tid_range)?;
let db = Arc::new(HashDatabaseMut::from_root(self.tx, root)?);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?;
for (address, changed_storages) in changed_accounts {
let storage_root = if let Some(account) = trie.get(address.as_slice())? {
trie.remove(address.as_bytes())?;
for (hashed_address, changed_storages) in changed_accounts {
let res = if let Some(account) = trie.get(hashed_address.as_slice())? {
trie.remove(hashed_address.as_bytes())?;
let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root;
self.update_storage_root(storage_root, address, changed_storages)?
self.update_storage_root(
checkpoint.storage_root.take().unwrap_or(storage_root),
hashed_address,
changed_storages,
checkpoint.storage_key.take(),
)?
} else {
self.calculate_storage_root(address)?
self.calculate_storage_root(
hashed_address,
checkpoint.storage_key.take(),
checkpoint.storage_root.take(),
)?
};
if let Some((_, account)) = accounts_cursor.seek_exact(address)? {
let storage_root = match res {
TrieProgress::Complete(root) => root,
TrieProgress::InProgress(checkpoint) => {
return self.save_account_checkpoint(
checkpoint,
self.replace_account_root(&mut trie, previous_root)?,
hashed_address,
)
}
};
if let Some((_, account)) = accounts_cursor.seek_exact(hashed_address)? {
let value = EthAccount::from(account).with_storage_root(storage_root);
let mut out = Vec::new();
Encodable::encode(&value, &mut out);
trie.insert(address.as_bytes().to_vec(), out)?;
trie.insert(hashed_address.as_bytes().to_vec(), out)?;
if self.has_hit_threshold() {
return self.save_account_checkpoint(
ProofCheckpoint::default(),
self.replace_account_root(&mut trie, previous_root)?,
hashed_address,
)
}
}
}
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != root {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
if cursor.seek_exact(root)?.is_some() {
cursor.delete_current()?;
}
}
// Reset inner stage progress
self.save_checkpoint(ProofCheckpoint::default())?;
Ok(new_root)
Ok(TrieProgress::Complete(self.replace_account_root(&mut trie, previous_root)?))
}
/// Update the account's storage root
pub fn update_storage_root(
&self,
root: H256,
fn update_storage_root(
&mut self,
previous_root: H256,
address: H256,
changed_storages: BTreeSet<H256>,
) -> Result<H256, TrieError> {
let db = Arc::new(DupHashDatabaseMut::from_root(self.tx, address, root)?);
next_storage: Option<H256>,
) -> Result<TrieProgress, TrieError> {
let mut hashed_storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
let mut trie = PatriciaTrie::new(
Arc::new(DupHashDatabaseMut::from_root(self.tx, address, previous_root)?),
Arc::new(HasherKeccak::new()),
);
let hasher = Arc::new(HasherKeccak::new());
let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?;
let mut storage_cursor = self.tx.cursor_dup_read::<tables::HashedStorage>()?;
let changed_storages = changed_storages
.into_iter()
.skip_while(|k| next_storage.is_some() && *k == next_storage.expect("is some"));
for key in changed_storages {
if let Some(StorageEntry { value, .. }) =
storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key)
hashed_storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key)
{
let out = encode_fixed_size(&value).to_vec();
trie.insert(key.as_bytes().to_vec(), out)?;
if self.has_hit_threshold() {
return Ok(TrieProgress::InProgress(ProofCheckpoint {
storage_root: Some(self.replace_storage_root(
trie,
address,
previous_root,
)?),
storage_key: Some(key),
..Default::default()
}))
}
} else {
trie.remove(key.as_bytes())?;
}
}
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != root {
let mut cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
if cursor
.seek_by_key_subkey(address, root)?
.filter(|entry| entry.hash == root)
.is_some()
{
cursor.delete_current()?;
}
}
// if root is empty remove it from db
if new_root == EMPTY_ROOT {
self.tx.delete::<tables::StoragesTrie>(address, None)?;
}
Ok(new_root)
Ok(TrieProgress::Complete(self.replace_storage_root(trie, address, previous_root)?))
}
fn gather_changes(
@ -537,6 +673,102 @@ where
Ok(hashed_changes)
}
fn save_account_checkpoint(
&mut self,
mut checkpoint: ProofCheckpoint,
root: H256,
hashed_address: H256,
) -> Result<TrieProgress, TrieError> {
checkpoint.account_root = Some(root);
checkpoint.hashed_address = Some(hashed_address);
debug!(target: "sync::stages::merkle::exec", account = ?hashed_address, storage = ?checkpoint.storage_key, "Saving inner trie checkpoint");
self.save_checkpoint(checkpoint)?;
Ok(TrieProgress::InProgress(checkpoint))
}
fn has_hit_threshold(&mut self) -> bool {
self.current += 1;
self.current >= self.commit_threshold
}
/// Saves the trie progress
pub fn save_checkpoint(&mut self, checkpoint: ProofCheckpoint) -> Result<(), TrieError> {
let mut buf = vec![];
checkpoint.to_compact(&mut buf);
// It allows unwind (which commits), to reuse this instance.
self.current = 0;
Ok(self.tx.put::<tables::SyncStageProgress>("TrieLoader".into(), buf)?)
}
/// Gets the trie progress
pub fn get_checkpoint(&self) -> Result<ProofCheckpoint, TrieError> {
let buf =
self.tx.get::<tables::SyncStageProgress>("TrieLoader".into())?.unwrap_or_default();
if buf.is_empty() {
return Ok(ProofCheckpoint::default())
}
let (checkpoint, _) = ProofCheckpoint::from_compact(&buf, buf.len());
if checkpoint.account_root.is_some() {
debug!(target: "sync::stages::merkle::exec", checkpoint = ?checkpoint, "Continuing inner trie checkpoint");
}
Ok(checkpoint)
}
/// Finds the most recent account trie root and removes the previous one if applicable.
fn replace_account_root(
&self,
trie: &mut PatriciaTrie<HashDatabaseMut<'_, TX>, HasherKeccak>,
previous_root: H256,
) -> Result<H256, TrieError> {
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != previous_root {
let mut cursor = self.tx.cursor_write::<tables::AccountsTrie>()?;
if cursor.seek_exact(previous_root)?.is_some() {
cursor.delete_current()?;
}
}
Ok(new_root)
}
/// Finds the most recent storage trie root and removes the previous one if applicable.
fn replace_storage_root(
&self,
mut trie: PatriciaTrie<DupHashDatabaseMut<'_, TX>, HasherKeccak>,
address: H256,
previous_root: H256,
) -> Result<H256, TrieError> {
let new_root = H256::from_slice(trie.root()?.as_slice());
if new_root != previous_root {
let mut trie_cursor = self.tx.cursor_dup_write::<tables::StoragesTrie>()?;
if trie_cursor
.seek_by_key_subkey(address, previous_root)?
.filter(|entry| entry.hash == previous_root)
.is_some()
{
trie_cursor.delete_current()?;
}
}
if new_root == EMPTY_ROOT {
self.tx.delete::<tables::StoragesTrie>(address, None)?;
}
Ok(new_root)
}
}
// Read-only impls
@ -545,13 +777,12 @@ where
TX: DbTx<'db> + Send + Sync,
{
/// Returns a Merkle proof of the given account, plus its storage root hash.
pub fn generate_acount_proof<'itx>(
pub fn generate_acount_proof(
&self,
tx: &'tx impl DbTx<'itx>,
root: H256,
address: H256,
) -> Result<(MerkleProof, H256), TrieError> {
let db = Arc::new(HashDatabase::from_root(tx, root)?);
let db = Arc::new(HashDatabase::from_root(self.tx, root)?);
let hasher = Arc::new(HasherKeccak::new());
let trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?;
@ -565,14 +796,13 @@ where
}
/// Returns a Merkle proof of the given storage keys, starting at the given root hash.
pub fn generate_storage_proofs<'itx>(
pub fn generate_storage_proofs(
&self,
tx: &'tx impl DbTx<'itx>,
storage_root: H256,
address: H256,
keys: &[H256],
) -> Result<Vec<MerkleProof>, TrieError> {
let db = Arc::new(DupHashDatabase::from_root(tx, address, storage_root)?);
let db = Arc::new(DupHashDatabase::from_root(self.tx, address, storage_root)?);
let hasher = Arc::new(HasherKeccak::new());
let trie =
@ -588,6 +818,7 @@ where
#[cfg(test)]
mod tests {
use crate::Transaction;
use std::ops::DerefMut;
use super::*;
use assert_matches::assert_matches;
@ -648,7 +879,7 @@ mod tests {
let tx = Transaction::new(db.as_ref()).unwrap();
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == EMPTY_ROOT
Ok(got) if got.root().unwrap() == EMPTY_ROOT
);
}
@ -664,14 +895,15 @@ mod tests {
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>([(address, encoded_account)]).0);
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
Ok(got) if got.root().unwrap() == expected
);
}
#[test]
fn two_accounts_trie() {
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
let mut tx = Transaction::new(db.as_ref()).unwrap();
let mut trie = DBTrieLoader::new(tx.deref_mut());
let accounts = [
(
@ -684,7 +916,7 @@ mod tests {
),
];
for (address, account) in accounts {
tx.put::<tables::HashedAccount>(keccak256(address), account).unwrap();
trie.tx.put::<tables::HashedAccount>(keccak256(address), account).unwrap();
}
let encoded_accounts = accounts.iter().map(|(k, v)| {
let mut out = Vec::new();
@ -693,26 +925,28 @@ mod tests {
});
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_accounts).0);
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
trie.calculate_root(),
Ok(got) if got.root().unwrap() == expected
);
}
#[test]
fn single_storage_trie() {
let db = create_test_rw_db();
let tx = Transaction::new(db.as_ref()).unwrap();
let mut tx = Transaction::new(db.as_ref()).unwrap();
let mut trie = DBTrieLoader::new(tx.deref_mut());
let address = Address::from_str("9fe4abd71ad081f091bd06dd1c16f7e92927561e").unwrap();
let hashed_address = keccak256(address);
let storage = Vec::from([(H256::from_low_u64_be(2), U256::from(1))]);
for (k, v) in storage.clone() {
tx.put::<tables::HashedStorage>(
hashed_address,
StorageEntry { key: keccak256(k), value: v },
)
.unwrap();
trie.tx
.put::<tables::HashedStorage>(
hashed_address,
StorageEntry { key: keccak256(k), value: v },
)
.unwrap();
}
let encoded_storage = storage.iter().map(|(k, v)| {
let out = encode_fixed_size(v).to_vec();
@ -720,8 +954,8 @@ mod tests {
});
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_storage).0);
assert_matches!(
create_test_loader(&tx).calculate_storage_root(hashed_address),
Ok(got) if got == expected
trie.calculate_storage_root(hashed_address, None, None),
Ok(got) if got.root().unwrap() == expected
);
}
@ -766,7 +1000,7 @@ mod tests {
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>([(address, out)]).0);
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
Ok(got) if got.root().unwrap() == expected
);
}
@ -781,7 +1015,7 @@ mod tests {
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == state_root
Ok(got) if got.root().unwrap() == state_root
);
}
@ -863,7 +1097,7 @@ mod tests {
let expected = H256(sec_trie_root::<KeccakHasher, _, _, _>(encoded_accounts).0);
assert_matches!(
create_test_loader(&tx).calculate_root(),
Ok(got) if got == expected
Ok(got) if got.root().unwrap() == expected
, "where expected is {expected:?}");
}
@ -882,8 +1116,8 @@ mod tests {
load_mainnet_genesis_root(&mut tx);
let root = {
let trie = create_test_loader(&tx);
trie.calculate_root().expect("should be able to load trie")
let mut trie = create_test_loader(&tx);
trie.calculate_root().expect("should be able to load trie").root().unwrap()
};
tx.commit().unwrap();
@ -891,9 +1125,8 @@ mod tests {
let address = Address::from(hex!("000d836201318ec6899a67540690382780743280"));
let trie = create_test_loader(&tx);
let (proof, storage_root) = trie
.generate_acount_proof(&tx.inner().tx().unwrap(), root, keccak256(address))
.expect("failed to generate proof");
let (proof, storage_root) =
trie.generate_acount_proof(root, keccak256(address)).expect("failed to generate proof");
// values extracted from geth via rpc:
// {
@ -947,16 +1180,15 @@ mod tests {
}
let root = {
let trie = create_test_loader(&tx);
trie.calculate_root().expect("should be able to load trie")
let mut trie = create_test_loader(&tx);
trie.calculate_root().expect("should be able to load trie").root().unwrap()
};
tx.commit().unwrap();
let trie = create_test_loader(&tx);
let (account_proof, storage_root) = trie
.generate_acount_proof(&tx.inner().tx().unwrap(), root, hashed_address)
.expect("failed to generate proof");
let (account_proof, storage_root) =
trie.generate_acount_proof(root, hashed_address).expect("failed to generate proof");
// values extracted from geth via rpc:
let expected_account = hex!("f86fa1205126413e7857595763591580306b3f228f999498c4c5dfa74f633364936e7651b84bf849819b8418b0d164a029ff6f4d518044318d75b118cf439d8d3d7249c8afcba06ba9ecdf8959410571a02ce1a85814ad94a94ed2a1abaf7c57e9b64326622c1b8c21b4ba4d0e7df61392").as_slice();
@ -980,7 +1212,6 @@ mod tests {
let storage_proofs = trie
.generate_storage_proofs(
&tx.inner().tx().unwrap(),
storage_root,
hashed_address,
&[keccak256(H256::from_low_u64_be(2)), keccak256(H256::zero())],