From 46c96a146616ffd440edd7cec1402dfcf2dc207c Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Wed, 15 Mar 2023 02:25:54 +0800
Subject: [PATCH] feat(stages): add table checkpoint to `AccountHashing` and
 `StorageHashing` (#1667)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
---
 Cargo.lock                                  |   1 +
 bin/reth/src/dump_stage/hashing_account.rs  |  29 +--
 bin/reth/src/dump_stage/hashing_storage.rs  |  46 ++++-
 crates/primitives/src/checkpoints.rs        |  42 +++++
 crates/primitives/src/lib.rs                |   3 +-
 crates/primitives/src/proofs.rs             |  19 +-
 crates/stages/Cargo.toml                    |   1 +
 crates/stages/src/stages/hashing_account.rs | 117 ++++++++----
 crates/stages/src/stages/hashing_storage.rs | 195 +++++++++++++-------
 9 files changed, 317 insertions(+), 136 deletions(-)
 create mode 100644 crates/primitives/src/checkpoints.rs

diff --git a/Cargo.lock b/Cargo.lock
index 89b60ecad..5b4fb682d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5099,6 +5099,7 @@ dependencies = [
  "proptest",
  "rand 0.8.5",
  "rayon",
+ "reth-codecs",
  "reth-db",
  "reth-downloaders",
  "reth-eth-wire",
diff --git a/bin/reth/src/dump_stage/hashing_account.rs b/bin/reth/src/dump_stage/hashing_account.rs
index e5f6ccf01..5f43ce61a 100644
--- a/bin/reth/src/dump_stage/hashing_account.rs
+++ b/bin/reth/src/dump_stage/hashing_account.rs
@@ -36,8 +36,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
     unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;
 
     if should_run {
-        println!("\n# AccountHashing stage does not support dry run, so it will actually be committing changes.");
-        run(output_db, to, from).await?;
+        dry_run(output_db, to, from).await?;
     }
 
     Ok(())
@@ -69,7 +68,7 @@ async fn unwind_and_copy<DB: Database>(
 }
 
 /// Try to re-execute the stage straightaway
-async fn run(
+async fn dry_run(
     output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
     to: u64,
     from: u64,
@@ -82,15 +81,21 @@ async fn run(
         ..Default::default()
     };
 
-    exec_stage
-        .execute(
-            &mut tx,
-            reth_stages::ExecInput {
-                previous_stage: Some((StageId("Another"), to)),
-                stage_progress: Some(from),
-            },
-        )
-        .await?;
+    let mut exec_output = false;
+    while !exec_output {
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    previous_stage: Some((StageId("Another"), to)),
+                    stage_progress: Some(from),
+                },
+            )
+            .await?
+            .done;
+    }
+
+    tx.drop()?;
 
     info!(target: "reth::cli", "Success.");
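The loop above is the calling convention this patch introduces for both hashing stages: `execute` now returns after every batch with `ExecOutput { done, .. }`, and the caller keeps re-invoking it until `done` is true, resuming from the persisted checkpoint each time. A minimal, self-contained sketch of that driver pattern (the trait and mock stage below are illustrative stand-ins, not reth's actual `Stage` API):

```rust
/// Output of one `execute` call: how far the stage got and whether it finished.
struct ExecOutput {
    stage_progress: u64,
    done: bool,
}

/// A stage that processes at most a fixed batch per call, persisting a
/// checkpoint so the next call resumes where the last one stopped.
trait ResumableStage {
    fn execute_batch(&mut self) -> ExecOutput;
}

/// Mock stage: pretends to hash `total` items, `batch` at a time.
struct MockStage {
    progress: u64,
    total: u64,
    batch: u64,
}

impl ResumableStage for MockStage {
    fn execute_batch(&mut self) -> ExecOutput {
        self.progress = (self.progress + self.batch).min(self.total);
        ExecOutput { stage_progress: self.progress, done: self.progress == self.total }
    }
}

/// Drive a stage to completion, mirroring the `while !exec_output` loop above.
fn run_to_completion<S: ResumableStage>(stage: &mut S) -> u64 {
    loop {
        let out = stage.execute_batch();
        if out.done {
            return out.stage_progress;
        }
        // Not done: the stage saved its checkpoint; call again to resume.
    }
}

fn main() {
    let mut stage = MockStage { progress: 0, total: 10, batch: 3 };
    assert_eq!(run_to_completion(&mut stage), 10);
}
```

Because a checkpoint is written before each early return, a crash between calls costs at most one batch of repeated work.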
diff --git a/bin/reth/src/dump_stage/hashing_storage.rs b/bin/reth/src/dump_stage/hashing_storage.rs
index 557c6a956..03cbb74a3 100644
--- a/bin/reth/src/dump_stage/hashing_storage.rs
+++ b/bin/reth/src/dump_stage/hashing_storage.rs
@@ -6,8 +6,9 @@ use crate::{
 use eyre::Result;
 use reth_db::{database::Database, table::TableImporter, tables};
 use reth_provider::Transaction;
-use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
+use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
 use std::ops::DerefMut;
+use tracing::info;
 
 pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
     db_tool: &mut DbTool<'_, DB>,
@@ -16,14 +17,14 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
     output_db: &PlatformPath<DbPath>,
     should_run: bool,
 ) -> Result<()> {
-    if should_run {
-        eyre::bail!("StorageHashing stage does not support dry run.")
-    }
-
     let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;
 
     unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;
 
+    if should_run {
+        dry_run(output_db, to, from).await?;
+    }
+
     Ok(())
 }
 
@@ -53,3 +54,38 @@ async fn unwind_and_copy<DB: Database>(
 
     Ok(())
 }
+
+/// Try to re-execute the stage straightaway
+async fn dry_run(
+    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
+    to: u64,
+    from: u64,
+) -> eyre::Result<()> {
+    info!(target: "reth::cli", "Executing stage.");
+
+    let mut tx = Transaction::new(&output_db)?;
+    let mut exec_stage = StorageHashingStage {
+        clean_threshold: 1, // Forces hashing from scratch
+        ..Default::default()
+    };
+
+    let mut exec_output = false;
+    while !exec_output {
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    previous_stage: Some((StageId("Another"), to)),
+                    stage_progress: Some(from),
+                },
+            )
+            .await?
+            .done;
+    }
+
+    tx.drop()?;
+
+    info!(target: "reth::cli", "Success.");
+
+    Ok(())
+}
diff --git a/crates/primitives/src/checkpoints.rs b/crates/primitives/src/checkpoints.rs
new file mode 100644
index 000000000..62a90bfa0
--- /dev/null
+++ b/crates/primitives/src/checkpoints.rs
@@ -0,0 +1,42 @@
+use crate::{Address, H256};
+use reth_codecs::{main_codec, Compact};
+
+/// Saves the progress of MerkleStage
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct ProofCheckpoint {
+    /// The next hashed account to insert into the trie.
+    pub hashed_address: Option<H256>,
+    /// The next storage entry to insert into the trie.
+    pub storage_key: Option<H256>,
+    /// Current intermediate root for `AccountsTrie`.
+    pub account_root: Option<H256>,
+    /// Current intermediate storage root from an account.
+    pub storage_root: Option<H256>,
+}
+
+/// Saves the progress of AccountHashing
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct AccountHashingCheckpoint {
+    /// The next account to start hashing from
+    pub address: Option<Address>,
+    /// Start transition id
+    pub from: u64,
+    /// Last transition id
+    pub to: u64,
+}
+
+/// Saves the progress of StorageHashing
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct StorageHashingCheckpoint {
+    /// The next account to start hashing from
+    pub address: Option<Address>,
+    /// The next storage slot to start hashing from
+    pub storage: Option<H256>,
+    /// Start transition id
+    pub from: u64,
+    /// Last transition id
+    pub to: u64,
+}
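`#[main_codec]` derives reth's `Compact` serialization for these structs, which is how the stages stash them as raw bytes in the database (`to_compact` into a byte buffer, `from_compact` back out, with an empty buffer meaning "no checkpoint yet", as the `get_checkpoint` helpers later in this patch show). A standalone sketch of that roundtrip, with a hand-rolled encoding standing in for the derived impl — the byte layout below is illustrative, not reth's actual `Compact` format:

```rust
/// Stand-in for `AccountHashingCheckpoint`: an optional 20-byte address
/// plus the transition range it was computed for.
#[derive(Debug, Default, PartialEq)]
struct Checkpoint {
    address: Option<[u8; 20]>,
    from: u64,
    to: u64,
}

impl Checkpoint {
    /// Hand-rolled substitute for the derived `to_compact`: a presence
    /// byte for the `Option`, then fixed-width fields.
    fn to_bytes(&self, buf: &mut Vec<u8>) {
        match self.address {
            Some(addr) => {
                buf.push(1);
                buf.extend_from_slice(&addr);
            }
            None => buf.push(0),
        }
        buf.extend_from_slice(&self.from.to_be_bytes());
        buf.extend_from_slice(&self.to.to_be_bytes());
    }

    /// Substitute for `from_compact`: decode exactly what `to_bytes` wrote.
    fn from_bytes(mut buf: &[u8]) -> Self {
        let address = if buf[0] == 1 {
            let mut addr = [0u8; 20];
            addr.copy_from_slice(&buf[1..21]);
            buf = &buf[21..];
            Some(addr)
        } else {
            buf = &buf[1..];
            None
        };
        let from = u64::from_be_bytes(buf[..8].try_into().unwrap());
        let to = u64::from_be_bytes(buf[8..16].try_into().unwrap());
        Checkpoint { address, from, to }
    }
}

fn main() {
    let cp = Checkpoint { address: Some([0xab; 20]), from: 100, to: 200 };
    let mut buf = vec![];
    cp.to_bytes(&mut buf);
    // An empty buffer would mean "no checkpoint saved yet", mirroring the
    // `buf.is_empty()` check in `get_checkpoint`.
    assert_eq!(Checkpoint::from_bytes(&buf), cp);
}
```

The `Option` presence byte is what lets `address: None` (a finished or reset checkpoint) stay distinguishable from a missing table entry.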
diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs
index 6c6585e06..b6b60c376 100644
--- a/crates/primitives/src/lib.rs
+++ b/crates/primitives/src/lib.rs
@@ -14,6 +14,7 @@ mod bits;
 mod block;
 pub mod bloom;
 mod chain;
+mod checkpoints;
 pub mod constants;
 pub mod contract;
 mod error;
@@ -34,7 +35,6 @@ mod withdrawal;
 
 /// Helper function for calculating Merkle proofs and hashes
 pub mod proofs;
-pub use proofs::ProofCheckpoint;
 
 pub use account::{Account, Bytecode};
 pub use bits::H512;
@@ -46,6 +46,7 @@ pub use chain::{
     AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI,
     MAINNET, SEPOLIA,
 };
+pub use checkpoints::{AccountHashingCheckpoint, ProofCheckpoint, StorageHashingCheckpoint};
 pub use constants::{
     EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,
 };
diff --git a/crates/primitives/src/proofs.rs b/crates/primitives/src/proofs.rs
index c94a68aad..28f7f6e8b 100644
--- a/crates/primitives/src/proofs.rs
+++ b/crates/primitives/src/proofs.rs
@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
 use crate::{
     keccak256, Address, Bytes, GenesisAccount, Header, Log, Receipt, TransactionSigned,
     Withdrawal, H256,
@@ -8,8 +6,8 @@ use bytes::BytesMut;
 use hash_db::Hasher;
 use hex_literal::hex;
 use plain_hasher::PlainHasher;
-use reth_codecs::{main_codec, Compact};
 use reth_rlp::Encodable;
+use std::collections::HashMap;
 use triehash::{ordered_trie_root, sec_trie_root};
 
 /// Keccak-256 hash of the RLP of an empty list, KEC("\xc0").
@@ -35,23 +33,8 @@ impl Hasher for KeccakHasher {
     }
 }
 
-/// Saves the progress of MerkleStage
-#[main_codec]
-#[derive(Default, Debug, Copy, Clone, PartialEq)]
-pub struct ProofCheckpoint {
-    /// The next hashed account to insert into the trie.
-    pub hashed_address: Option<H256>,
-    /// The next storage entry to insert into the trie.
-    pub storage_key: Option<H256>,
-    /// Current intermediate root for `AccountsTrie`.
-    pub account_root: Option<H256>,
-    /// Current intermediate storage root from an account.
-    pub storage_root: Option<H256>,
-}
-
 /// Calculate a transaction root.
 ///
-/// Iterates over the given transactions and the merkle merkle trie root of
 /// `(rlp(index), encoded(tx))` pairs.
 pub fn calculate_transaction_root<'a>(
     transactions: impl IntoIterator<Item = &'a TransactionSigned>,
diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml
index b0ec65911..85c10f4c5 100644
--- a/crates/stages/Cargo.toml
+++ b/crates/stages/Cargo.toml
@@ -18,6 +18,7 @@ normal = [
 reth-primitives = { path = "../primitives" }
 reth-interfaces = { path = "../interfaces" }
 reth-db = { path = "../storage/db" }
+reth-codecs = { path = "../storage/codecs" }
 reth-provider = { path = "../storage/provider" }
 reth-metrics-derive = { path = "../metrics/metrics-derive" }
diff --git a/crates/stages/src/stages/hashing_account.rs b/crates/stages/src/stages/hashing_account.rs
index a6ea74375..a129a5bcb 100644
--- a/crates/stages/src/stages/hashing_account.rs
+++ b/crates/stages/src/stages/hashing_account.rs
@@ -1,11 +1,12 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
+use reth_codecs::Compact;
 use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
     tables,
     transaction::{DbTx, DbTxMut},
 };
-use reth_primitives::keccak256;
+use reth_primitives::{keccak256, AccountHashingCheckpoint};
 use reth_provider::Transaction;
 use std::{collections::BTreeMap, fmt::Debug, ops::Range};
 use tracing::*;
@@ -30,6 +31,43 @@ impl Default for AccountHashingStage {
     }
 }
 
+impl AccountHashingStage {
+    /// Saves the hashing progress
+    pub fn save_checkpoint<DB: Database>(
+        &mut self,
+        tx: &Transaction<'_, DB>,
+        checkpoint: AccountHashingCheckpoint,
+    ) -> Result<(), StageError> {
+        debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Saving inner account hashing checkpoint");
+
+        let mut buf = vec![];
+        checkpoint.to_compact(&mut buf);
+
+        Ok(tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), buf)?)
+    }
+
+    /// Gets the hashing progress
+    pub fn get_checkpoint<DB: Database>(
+        &self,
+        tx: &Transaction<'_, DB>,
+    ) -> Result<AccountHashingCheckpoint, StageError> {
+        let buf =
+            tx.get::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into())?.unwrap_or_default();
+
+        if buf.is_empty() {
+            return Ok(AccountHashingCheckpoint::default())
+        }
+
+        let (checkpoint, _) = AccountHashingCheckpoint::from_compact(&buf, buf.len());
+
+        if checkpoint.address.is_some() {
+            debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Continuing inner account hashing checkpoint");
+        }
+
+        Ok(checkpoint)
+    }
+}
+
 #[derive(Clone, Debug)]
 /// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
 /// in unit tests or benchmarks to generate an initial database state for running the
@@ -137,43 +175,58 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset.
         if to_transition - from_transition > self.clean_threshold || stage_progress == 0 {
-            // clear table, load all accounts and hash it
-            tx.clear::<tables::HashedAccount>()?;
-            tx.commit()?;
+            let mut checkpoint = self.get_checkpoint(tx)?;
 
-            let mut first_key = None;
-            loop {
-                let next_key = {
-                    let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
+            if checkpoint.address.is_none() ||
+                // Checkpoint is no longer valid if the range of transitions changed.
+                // An already hashed account may have been changed with the new range, and therefore should be hashed again.
+                checkpoint.to != to_transition ||
+                checkpoint.from != from_transition
+            {
+                // clear table, load all accounts and hash it
+                tx.clear::<tables::HashedAccount>()?;
 
-                    let hashed_batch = accounts
-                        .walk(first_key)?
-                        .take(self.commit_threshold as usize)
-                        .map(|res| res.map(|(address, account)| (keccak256(address), account)))
-                        .collect::<Result<Vec<_>, _>>()?;
+                checkpoint = AccountHashingCheckpoint::default();
+                self.save_checkpoint(tx, checkpoint)?;
+            }
 
-                    let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
+            let start_address = checkpoint.address.take();
+            let next_address = {
+                let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
 
-                    // iterate and put presorted hashed accounts
-                    if first_key.is_none() {
-                        hashed_batch
-                            .into_iter()
-                            .try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
-                    } else {
-                        hashed_batch
-                            .into_iter()
-                            .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
-                    }
+                let hashed_batch = accounts
+                    .walk(start_address)?
+                    .take(self.commit_threshold as usize)
+                    .map(|res| res.map(|(address, account)| (keccak256(address), account)))
+                    .collect::<Result<Vec<_>, _>>()?;
 
-                    // next key of iterator
-                    accounts.next()?
-                };
-                tx.commit()?;
-                if let Some((next_key, _)) = next_key {
-                    first_key = Some(next_key);
-                    continue
+                let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
+
+                // iterate and put presorted hashed accounts
+                if start_address.is_none() {
+                    hashed_batch
+                        .into_iter()
+                        .try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
+                } else {
+                    hashed_batch
+                        .into_iter()
+                        .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
                 }
-                break
+
+                // next key of iterator
+                accounts.next()?
+            };
+
+            if let Some((next_address, _)) = &next_address {
+                checkpoint.address = Some(*next_address);
+                checkpoint.from = from_transition;
+                checkpoint.to = to_transition;
+            }
+
+            self.save_checkpoint(tx, checkpoint)?;
+
+            if next_address.is_some() {
+                return Ok(ExecOutput { stage_progress, done: false })
+            }
         } else {
             // Aggregate all transition changesets and make a list of accounts that have been
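The rewritten hunk above turns the old internal `loop { ... tx.commit() }` into exactly one batch per `execute` call: hash up to `commit_threshold` accounts, record the next plain-state key in the checkpoint, and return `done: false` so the pipeline commits and calls back in. The checkpoint is discarded whenever the transition range changes, since an account hashed under the old range may have changed since. A self-contained sketch of that control flow over an in-memory `BTreeMap` (illustrative only: a `u64` key and a toy transform stand in for addresses, `keccak256`, and the MDBX tables):

```rust
use std::collections::BTreeMap;

/// Illustrative checkpoint: the next key to hash, plus the transition
/// range it was computed for.
#[derive(Default, Clone, PartialEq, Debug)]
struct Checkpoint {
    next_key: Option<u64>,
    from: u64,
    to: u64,
}

/// One `execute` call: hash at most `batch` entries starting at the
/// checkpoint, then record where the next call should resume.
/// Returns true ("done") when the table is exhausted.
fn execute_batch(
    plain: &BTreeMap<u64, String>,
    hashed: &mut BTreeMap<u64, String>,
    cp: &mut Checkpoint,
    (from, to): (u64, u64),
    batch: usize,
) -> bool {
    // A missing or stale checkpoint (the transition range changed) means
    // already-hashed entries may be outdated: clear and start over.
    if cp.next_key.is_none() || cp.from != from || cp.to != to {
        hashed.clear();
        *cp = Checkpoint::default();
    }

    let start = cp.next_key.unwrap_or(0);
    let mut iter = plain.range(start..);
    for (k, v) in iter.by_ref().take(batch) {
        // Stand-in for keccak256(address): any injective key transform works.
        hashed.insert(k.wrapping_mul(0x9e37_79b9), v.clone());
    }

    // Persist the resume point; `None` means the whole table was hashed.
    cp.next_key = iter.next().map(|(k, _)| *k);
    cp.from = from;
    cp.to = to;
    cp.next_key.is_none()
}

fn main() {
    let plain: BTreeMap<u64, String> =
        (0..10).map(|i| (i, format!("account{i}"))).collect();
    let mut hashed = BTreeMap::new();
    let mut cp = Checkpoint::default();
    let mut calls = 0;
    while !execute_batch(&plain, &mut hashed, &mut cp, (0, 100), 4) {
        calls += 1;
    }
    assert_eq!(hashed.len(), 10);
    assert!(calls >= 2); // 10 entries at 4 per call: resumed twice
}
```

One batch per call also means one database commit per call, which is what lets an interrupted clean-sync resume instead of restarting from an empty `HashedAccount` table.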
diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs
index 22d3cac1c..0480e6931 100644
--- a/crates/stages/src/stages/hashing_storage.rs
+++ b/crates/stages/src/stages/hashing_storage.rs
@@ -1,5 +1,6 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
 use num_traits::Zero;
+use reth_codecs::Compact;
 use reth_db::{
     cursor::DbDupCursorRO,
     database::Database,
@@ -7,7 +8,7 @@ use reth_db::{
     tables,
     transaction::{DbTx, DbTxMut},
 };
-use reth_primitives::{keccak256, Address, StorageEntry};
+use reth_primitives::{keccak256, Address, StorageEntry, StorageHashingCheckpoint};
 use reth_provider::Transaction;
 use std::{collections::BTreeMap, fmt::Debug};
 use tracing::*;
@@ -32,6 +33,43 @@ impl Default for StorageHashingStage {
     }
 }
 
+impl StorageHashingStage {
+    /// Saves the hashing progress
+    pub fn save_checkpoint<DB: Database>(
+        &mut self,
+        tx: &Transaction<'_, DB>,
+        checkpoint: StorageHashingCheckpoint,
+    ) -> Result<(), StageError> {
+        debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Saving inner storage hashing checkpoint");
+
+        let mut buf = vec![];
+        checkpoint.to_compact(&mut buf);
+
+        Ok(tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), buf)?)
+    }
+
+    /// Gets the hashing progress
+    pub fn get_checkpoint<DB: Database>(
+        &self,
+        tx: &Transaction<'_, DB>,
+    ) -> Result<StorageHashingCheckpoint, StageError> {
+        let buf =
+            tx.get::<tables::SyncStageProgress>(STORAGE_HASHING.0.into())?.unwrap_or_default();
+
+        if buf.is_empty() {
+            return Ok(StorageHashingCheckpoint::default())
+        }
+
+        let (checkpoint, _) = StorageHashingCheckpoint::from_compact(&buf, buf.len());
+
+        if checkpoint.address.is_some() {
+            debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Continuing inner storage hashing checkpoint");
+        }
+
+        Ok(checkpoint)
+    }
+}
+
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for StorageHashingStage {
     /// Return the id of the stage
@@ -57,77 +95,92 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset, along with their storages.
         if to_transition - from_transition > self.clean_threshold || stage_progress == 0 {
-            tx.clear::<tables::HashedStorage>()?;
-            tx.commit()?;
+            let mut checkpoint = self.get_checkpoint(tx)?;
 
-            let mut current_key = None;
-            let mut current_subkey = None;
+            if checkpoint.address.is_none() ||
+                // Checkpoint is no longer valid if the range of transitions changed.
+                // An already hashed storage may have been changed with the new range, and therefore should be hashed again.
+                checkpoint.to != to_transition ||
+                checkpoint.from != from_transition
+            {
+                tx.clear::<tables::HashedStorage>()?;
+
+                checkpoint = StorageHashingCheckpoint::default();
+                self.save_checkpoint(tx, checkpoint)?;
+            }
+
+            let mut current_key = checkpoint.address.take();
+            let mut current_subkey = checkpoint.storage.take();
             let mut keccak_address = None;
-            loop {
-                let mut hashed_batch = BTreeMap::new();
-                let mut remaining = self.commit_threshold as usize;
-                {
-                    let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
-                    while !remaining.is_zero() {
-                        hashed_batch.extend(
-                            storage
-                                .walk_dup(current_key, current_subkey)?
-                                .take(remaining)
-                                .map(|res| {
-                                    res.map(|(address, slot)| {
-                                        // Address caching for the first iteration when current_key
-                                        // is None
-                                        let keccak_address =
-                                            if let Some(keccak_address) = keccak_address {
-                                                keccak_address
-                                            } else {
-                                                keccak256(address)
-                                            };
+            let mut hashed_batch = BTreeMap::new();
+            let mut remaining = self.commit_threshold as usize;
+            {
+                let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
+                while !remaining.is_zero() {
+                    hashed_batch.extend(
+                        storage
+                            .walk_dup(current_key, current_subkey)?
+                            .take(remaining)
+                            .map(|res| {
+                                res.map(|(address, slot)| {
+                                    // Address caching for the first iteration when current_key
+                                    // is None
+                                    let keccak_address =
+                                        if let Some(keccak_address) = keccak_address {
+                                            keccak_address
+                                        } else {
+                                            keccak256(address)
+                                        };
 
-                                        // TODO cache map keccak256(slot.key) ?
-                                        ((keccak_address, keccak256(slot.key)), slot.value)
-                                    })
+                                    // TODO cache map keccak256(slot.key) ?
+                                    ((keccak_address, keccak256(slot.key)), slot.value)
                                 })
-                                .collect::<Result<Vec<_>, _>>()?,
-                        );
+                            })
+                            .collect::<Result<Vec<_>, _>>()?,
+                    );
 
-                        remaining = self.commit_threshold as usize - hashed_batch.len();
+                    remaining = self.commit_threshold as usize - hashed_batch.len();
 
-                        if let Some((address, slot)) = storage.next_dup()? {
-                            // There's still some remaining elements on this key, so we need to save
-                            // the cursor position for the next
-                            // iteration
+                    if let Some((address, slot)) = storage.next_dup()? {
+                        // There are still remaining elements on this key, so we need to save
+                        // the cursor position for the next iteration
 
-                            current_key = Some(address);
-                            current_subkey = Some(slot.key);
+                        current_key = Some(address);
+                        current_subkey = Some(slot.key);
+                    } else {
+                        // Go to the next key
+                        current_key = storage.next_no_dup()?.map(|(key, _)| key);
+                        current_subkey = None;
+
+                        // Cache keccak256(address) for the next key if it exists
+                        if let Some(address) = current_key {
+                            keccak_address = Some(keccak256(address));
                         } else {
-                            // Go to the next key
-                            current_key = storage.next_no_dup()?.map(|(key, _)| key);
-                            current_subkey = None;
-
-                            // Cache keccak256(address) for the next key if it exists
-                            if let Some(address) = current_key {
-                                keccak_address = Some(keccak256(address));
-                            } else {
-                                // We have reached the end of table
-                                break
-                            }
+                            // We have reached the end of table
+                            break
                         }
                     }
                 }
+            }
 
-                // iterate and put presorted hashed slots
-                hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
-                    tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
-                })?;
+            // iterate and put presorted hashed slots
+            hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
+                tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
+            })?;
 
-                tx.commit()?;
+            if let Some(address) = &current_key {
+                checkpoint.address = Some(*address);
+                checkpoint.storage = current_subkey;
+                checkpoint.from = from_transition;
+                checkpoint.to = to_transition;
+            }
 
-                // We have reached the end of table
-                if current_key.is_none() {
-                    break
-                }
+            self.save_checkpoint(tx, checkpoint)?;
+
+            if current_key.is_some() {
+                return Ok(ExecOutput { stage_progress, done: false })
            }
         } else {
             // Aggregate all transition changesets and make a list of storages that have been
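Storage hashing resumes at a finer grain than account hashing: the checkpoint records both the account (`address`) and the slot within it (`storage`), because a single account's dup-sorted entries can outnumber `commit_threshold`, so a batch may stop mid-account. A sketch of that two-level resume over a sorted slice of `(address, slot)` pairs (illustrative; the slice stands in for the dup-sorted `PlainStorageState` cursor and its `walk_dup`/`next_dup` calls):

```rust
/// Resume point inside a dup-sorted (address, slot) table.
#[derive(Default, Debug)]
struct StorageCheckpoint {
    address: Option<u64>,
    slot: Option<u64>,
}

/// Take up to `batch` entries starting at the checkpoint and return the
/// entries plus the position the next call should resume from.
fn walk_storage_batch(
    entries: &[(u64, u64)], // sorted (address, slot) pairs
    cp: &StorageCheckpoint,
    batch: usize,
) -> (Vec<(u64, u64)>, StorageCheckpoint) {
    let start = (cp.address.unwrap_or(0), cp.slot.unwrap_or(0));
    let from = entries.partition_point(|&e| e < start);
    let taken: Vec<_> = entries[from..].iter().take(batch).copied().collect();

    // The entry after the batch, if any, becomes the new checkpoint:
    // both the account and the slot within it are recorded.
    let next = entries.get(from + taken.len());
    let cp = StorageCheckpoint {
        address: next.map(|&(a, _)| a),
        slot: next.map(|&(_, s)| s),
    };
    (taken, cp)
}

fn main() {
    let entries = vec![(1, 10), (1, 11), (1, 12), (2, 10)];
    let (batch, cp) = walk_storage_batch(&entries, &StorageCheckpoint::default(), 2);
    assert_eq!(batch, vec![(1, 10), (1, 11)]);
    // Resumes mid-account: address 1, slot 12.
    assert_eq!((cp.address, cp.slot), (Some(1), Some(12)));
}
```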
@@ -170,7 +223,6 @@ mod tests {
         stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
         TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
     };
-    use assert_matches::assert_matches;
     use reth_db::{
         cursor::{DbCursorRO, DbCursorRW},
         mdbx::{tx::Tx, WriteMap, RW},
@@ -205,18 +257,25 @@ mod tests {
 
         runner.seed_execution(input).expect("failed to seed execution");
 
-        let rx = runner.execute(input);
+        loop {
+            if let Ok(result) = runner.execute(input).await.unwrap() {
+                if !result.done {
+                    // Continue from checkpoint
+                    continue
+                } else {
+                    assert!(result.stage_progress == previous_stage);
 
-        // Assert the successful result
-        let result = rx.await.unwrap();
-        assert_matches!(
-            result,
-            Ok(ExecOutput { done, stage_progress })
-                if done && stage_progress == previous_stage
-        );
+                    // Validate the stage execution
+                    assert!(
+                        runner.validate_execution(input, Some(result)).is_ok(),
+                        "execution validation"
+                    );
 
-        // Validate the stage execution
-        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
+                    break
+                }
+            }
+            panic!("Failed execution");
+        }
     }
 
     struct StorageHashingTestRunner {