feat(stages): add table checkpoint to AccountHashing and StorageHashing (#1667)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
joshieDo
2023-03-15 02:25:54 +08:00
committed by GitHub
parent 237fd5ce6e
commit 46c96a1466
9 changed files with 317 additions and 136 deletions
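The change in a nutshell: each hashing stage now persists its progress into tables::SyncStageProgress and returns ExecOutput { done: false } after every commit_threshold-sized batch, so a caller simply re-executes the stage until it reports done. A minimal sketch of such a driver loop (hypothetical helper name and signature; it mirrors the dry_run helpers added in this commit, not a new API):

use reth_db::database::Database;
use reth_provider::Transaction;
use reth_stages::{stages::AccountHashingStage, ExecInput, Stage, StageId};

/// Hypothetical driver: keep executing the stage until it reports `done`.
async fn drive_until_done<DB: Database>(
    stage: &mut AccountHashingStage,
    tx: &mut Transaction<'_, DB>,
    from: u64,
    to: u64,
) -> eyre::Result<()> {
    let mut done = false;
    while !done {
        // Each pass hashes one batch, persists a checkpoint, and resumes from it next time.
        done = stage
            .execute(
                tx,
                ExecInput {
                    previous_stage: Some((StageId("Another"), to)),
                    stage_progress: Some(from),
                },
            )
            .await?
            .done;
    }
    Ok(())
}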

Cargo.lock (generated)
View File

@@ -5099,6 +5099,7 @@ dependencies = [
  "proptest",
  "rand 0.8.5",
  "rayon",
+ "reth-codecs",
  "reth-db",
  "reth-downloaders",
  "reth-eth-wire",

View File

@@ -36,8 +36,7 @@ pub(crate) async fn dump_hashing_account_stage<DB: Database>(
     unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

     if should_run {
-        println!("\n# AccountHashing stage does not support dry run, so it will actually be committing changes.");
-        run(output_db, to, from).await?;
+        dry_run(output_db, to, from).await?;
     }

     Ok(())
@@ -69,7 +68,7 @@ async fn unwind_and_copy<DB: Database>(
 }

 /// Try to re-execute the stage straightaway
-async fn run(
+async fn dry_run(
     output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
     to: u64,
     from: u64,
@@ -82,15 +81,21 @@ async fn run(
         ..Default::default()
     };

-    exec_stage
-        .execute(
-            &mut tx,
-            reth_stages::ExecInput {
-                previous_stage: Some((StageId("Another"), to)),
-                stage_progress: Some(from),
-            },
-        )
-        .await?;
+    let mut exec_output = false;
+    while !exec_output {
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    previous_stage: Some((StageId("Another"), to)),
+                    stage_progress: Some(from),
+                },
+            )
+            .await?
+            .done;
+    }
+
+    tx.drop()?;

     info!(target: "reth::cli", "Success.");

View File

@@ -6,8 +6,9 @@ use crate::{
 use eyre::Result;
 use reth_db::{database::Database, table::TableImporter, tables};
 use reth_provider::Transaction;
-use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput};
+use reth_stages::{stages::StorageHashingStage, Stage, StageId, UnwindInput};
 use std::ops::DerefMut;
+use tracing::info;

 pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
     db_tool: &mut DbTool<'_, DB>,
@@ -16,14 +17,14 @@ pub(crate) async fn dump_hashing_storage_stage<DB: Database>(
     output_db: &PlatformPath<DbPath>,
     should_run: bool,
 ) -> Result<()> {
-    if should_run {
-        eyre::bail!("StorageHashing stage does not support dry run.")
-    }
-
     let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;

     unwind_and_copy::<DB>(db_tool, from, tip_block_number, &output_db).await?;

+    if should_run {
+        dry_run(output_db, to, from).await?;
+    }
+
     Ok(())
 }
@@ -53,3 +54,38 @@ async fn unwind_and_copy<DB: Database>(
     Ok(())
 }
+
+/// Try to re-execute the stage straightaway
+async fn dry_run(
+    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
+    to: u64,
+    from: u64,
+) -> eyre::Result<()> {
+    info!(target: "reth::cli", "Executing stage.");
+    let mut tx = Transaction::new(&output_db)?;
+
+    let mut exec_stage = StorageHashingStage {
+        clean_threshold: 1, // Forces hashing from scratch
+        ..Default::default()
+    };
+
+    let mut exec_output = false;
+    while !exec_output {
+        exec_output = exec_stage
+            .execute(
+                &mut tx,
+                reth_stages::ExecInput {
+                    previous_stage: Some((StageId("Another"), to)),
+                    stage_progress: Some(from),
+                },
+            )
+            .await?
+            .done;
+    }
+
+    tx.drop()?;
+
+    info!(target: "reth::cli", "Success.");
+    Ok(())
+}

View File

@@ -0,0 +1,42 @@
+use crate::{Address, H256};
+use reth_codecs::{main_codec, Compact};
+
+/// Saves the progress of MerkleStage
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct ProofCheckpoint {
+    /// The next hashed account to insert into the trie.
+    pub hashed_address: Option<H256>,
+    /// The next storage entry to insert into the trie.
+    pub storage_key: Option<H256>,
+    /// Current intermediate root for `AccountsTrie`.
+    pub account_root: Option<H256>,
+    /// Current intermediate storage root from an account.
+    pub storage_root: Option<H256>,
+}
+
+/// Saves the progress of AccountHashing
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct AccountHashingCheckpoint {
+    /// The next account to start hashing from
+    pub address: Option<Address>,
+    /// Start transition id
+    pub from: u64,
+    /// Last transition id
+    pub to: u64,
+}
+
+/// Saves the progress of StorageHashing
+#[main_codec]
+#[derive(Default, Debug, Copy, Clone, PartialEq)]
+pub struct StorageHashingCheckpoint {
+    /// The next account to start hashing from
+    pub address: Option<Address>,
+    /// The next storage slot to start hashing from
+    pub storage: Option<H256>,
+    /// Start transition id
+    pub from: u64,
+    /// Last transition id
+    pub to: u64,
+}

View File

@@ -14,6 +14,7 @@ mod bits;
 mod block;
 pub mod bloom;
 mod chain;
+mod checkpoints;
 pub mod constants;
 pub mod contract;
 mod error;
@@ -34,7 +35,6 @@ mod withdrawal;

 /// Helper function for calculating Merkle proofs and hashes
 pub mod proofs;
-pub use proofs::ProofCheckpoint;

 pub use account::{Account, Bytecode};
 pub use bits::H512;
@@ -46,6 +46,7 @@ pub use chain::{
     AllGenesisFormats, Chain, ChainInfo, ChainSpec, ChainSpecBuilder, ForkCondition, GOERLI,
     MAINNET, SEPOLIA,
 };
+pub use checkpoints::{AccountHashingCheckpoint, ProofCheckpoint, StorageHashingCheckpoint};
 pub use constants::{
     EMPTY_OMMER_ROOT, GOERLI_GENESIS, KECCAK_EMPTY, MAINNET_GENESIS, SEPOLIA_GENESIS,
 };

View File

@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
 use crate::{
     keccak256, Address, Bytes, GenesisAccount, Header, Log, Receipt, TransactionSigned, Withdrawal,
     H256,
@@ -8,8 +6,8 @@ use bytes::BytesMut;
 use hash_db::Hasher;
 use hex_literal::hex;
 use plain_hasher::PlainHasher;
-use reth_codecs::{main_codec, Compact};
 use reth_rlp::Encodable;
+use std::collections::HashMap;
 use triehash::{ordered_trie_root, sec_trie_root};

 /// Keccak-256 hash of the RLP of an empty list, KEC("\xc0").
@@ -35,23 +33,8 @@ impl Hasher for KeccakHasher {
     }
 }

-/// Saves the progress of MerkleStage
-#[main_codec]
-#[derive(Default, Debug, Copy, Clone, PartialEq)]
-pub struct ProofCheckpoint {
-    /// The next hashed account to insert into the trie.
-    pub hashed_address: Option<H256>,
-    /// The next storage entry to insert into the trie.
-    pub storage_key: Option<H256>,
-    /// Current intermediate root for `AccountsTrie`.
-    pub account_root: Option<H256>,
-    /// Current intermediate storage root from an account.
-    pub storage_root: Option<H256>,
-}
-
 /// Calculate a transaction root.
 ///
 /// Iterates over the given transactions and the merkle merkle trie root of
 /// `(rlp(index), encoded(tx))` pairs.
 pub fn calculate_transaction_root<'a>(
     transactions: impl IntoIterator<Item = &'a TransactionSigned>,

View File

@@ -18,6 +18,7 @@ normal = [
 reth-primitives = { path = "../primitives" }
 reth-interfaces = { path = "../interfaces" }
 reth-db = { path = "../storage/db" }
+reth-codecs = { path = "../storage/codecs" }
 reth-provider = { path = "../storage/provider" }
 reth-metrics-derive = { path = "../metrics/metrics-derive" }

View File

@@ -1,11 +1,12 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
+use reth_codecs::Compact;
 use reth_db::{
     cursor::{DbCursorRO, DbCursorRW},
     database::Database,
     tables,
     transaction::{DbTx, DbTxMut},
 };
-use reth_primitives::keccak256;
+use reth_primitives::{keccak256, AccountHashingCheckpoint};
 use reth_provider::Transaction;
 use std::{collections::BTreeMap, fmt::Debug, ops::Range};
 use tracing::*;
@@ -30,6 +31,43 @@ impl Default for AccountHashingStage {
     }
 }

+impl AccountHashingStage {
+    /// Saves the hashing progress
+    pub fn save_checkpoint<DB: Database>(
+        &mut self,
+        tx: &Transaction<'_, DB>,
+        checkpoint: AccountHashingCheckpoint,
+    ) -> Result<(), StageError> {
+        debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Saving inner account hashing checkpoint");
+
+        let mut buf = vec![];
+        checkpoint.to_compact(&mut buf);
+
+        Ok(tx.put::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into(), buf)?)
+    }
+
+    /// Gets the hashing progress
+    pub fn get_checkpoint<DB: Database>(
+        &self,
+        tx: &Transaction<'_, DB>,
+    ) -> Result<AccountHashingCheckpoint, StageError> {
+        let buf =
+            tx.get::<tables::SyncStageProgress>(ACCOUNT_HASHING.0.into())?.unwrap_or_default();
+
+        if buf.is_empty() {
+            return Ok(AccountHashingCheckpoint::default())
+        }
+
+        let (checkpoint, _) = AccountHashingCheckpoint::from_compact(&buf, buf.len());
+
+        if checkpoint.address.is_some() {
+            debug!(target: "sync::stages::account_hashing::exec", checkpoint = ?checkpoint, "Continuing inner account hashing checkpoint");
+        }
+
+        Ok(checkpoint)
+    }
+}
+
 #[derive(Clone, Debug)]
 /// `SeedOpts` provides configuration parameters for calling `AccountHashingStage::seed`
 /// in unit tests or benchmarks to generate an initial database state for running the
@@ -137,43 +175,58 @@ impl<DB: Database> Stage<DB> for AccountHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset.
         if to_transition - from_transition > self.clean_threshold || stage_progress == 0 {
-            // clear table, load all accounts and hash it
-            tx.clear::<tables::HashedAccount>()?;
-            tx.commit()?;
-
-            let mut first_key = None;
-            loop {
-                let next_key = {
-                    let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
-
-                    let hashed_batch = accounts
-                        .walk(first_key)?
-                        .take(self.commit_threshold as usize)
-                        .map(|res| res.map(|(address, account)| (keccak256(address), account)))
-                        .collect::<Result<BTreeMap<_, _>, _>>()?;
-
-                    let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
-
-                    // iterate and put presorted hashed accounts
-                    if first_key.is_none() {
-                        hashed_batch
-                            .into_iter()
-                            .try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
-                    } else {
-                        hashed_batch
-                            .into_iter()
-                            .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
-                    }
-
-                    // next key of iterator
-                    accounts.next()?
-                };
-                tx.commit()?;
-                if let Some((next_key, _)) = next_key {
-                    first_key = Some(next_key);
-                    continue
-                }
-                break
-            }
+            let mut checkpoint = self.get_checkpoint(tx)?;
+
+            if checkpoint.address.is_none() ||
+                // Checkpoint is no longer valid if the range of transitions changed.
+                // An already hashed account may have been changed with the new range, and therefore should be hashed again.
+                checkpoint.to != to_transition ||
+                checkpoint.from != from_transition
+            {
+                // clear table, load all accounts and hash it
+                tx.clear::<tables::HashedAccount>()?;
+
+                checkpoint = AccountHashingCheckpoint::default();
+                self.save_checkpoint(tx, checkpoint)?;
+            }
+
+            let start_address = checkpoint.address.take();
+            let next_address = {
+                let mut accounts = tx.cursor_read::<tables::PlainAccountState>()?;
+
+                let hashed_batch = accounts
+                    .walk(start_address)?
+                    .take(self.commit_threshold as usize)
+                    .map(|res| res.map(|(address, account)| (keccak256(address), account)))
+                    .collect::<Result<BTreeMap<_, _>, _>>()?;
+
+                let mut hashed_account_cursor = tx.cursor_write::<tables::HashedAccount>()?;
+
+                // iterate and put presorted hashed accounts
+                if start_address.is_none() {
+                    hashed_batch
+                        .into_iter()
+                        .try_for_each(|(k, v)| hashed_account_cursor.append(k, v))?;
+                } else {
+                    hashed_batch
+                        .into_iter()
+                        .try_for_each(|(k, v)| hashed_account_cursor.insert(k, v))?;
+                }

+                // next key of iterator
+                accounts.next()?
+            };
+
+            if let Some((next_address, _)) = &next_address {
+                checkpoint.address = Some(*next_address);
+                checkpoint.from = from_transition;
+                checkpoint.to = to_transition;
+            }
+
+            self.save_checkpoint(tx, checkpoint)?;
+
+            if next_address.is_some() {
+                return Ok(ExecOutput { stage_progress, done: false })
+            }
         } else {
             // Aggregate all transition changesets and and make list of account that have been

View File

@@ -1,5 +1,6 @@
 use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput};
 use num_traits::Zero;
+use reth_codecs::Compact;
 use reth_db::{
     cursor::DbDupCursorRO,
     database::Database,
@@ -7,7 +8,7 @@ use reth_db::{
     tables,
     transaction::{DbTx, DbTxMut},
 };
-use reth_primitives::{keccak256, Address, StorageEntry};
+use reth_primitives::{keccak256, Address, StorageEntry, StorageHashingCheckpoint};
 use reth_provider::Transaction;
 use std::{collections::BTreeMap, fmt::Debug};
 use tracing::*;
@@ -32,6 +33,43 @@ impl Default for StorageHashingStage {
     }
 }

+impl StorageHashingStage {
+    /// Saves the hashing progress
+    pub fn save_checkpoint<DB: Database>(
+        &mut self,
+        tx: &Transaction<'_, DB>,
+        checkpoint: StorageHashingCheckpoint,
+    ) -> Result<(), StageError> {
+        debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Saving inner storage hashing checkpoint");
+
+        let mut buf = vec![];
+        checkpoint.to_compact(&mut buf);
+
+        Ok(tx.put::<tables::SyncStageProgress>(STORAGE_HASHING.0.into(), buf)?)
+    }
+
+    /// Gets the hashing progress
+    pub fn get_checkpoint<DB: Database>(
+        &self,
+        tx: &Transaction<'_, DB>,
+    ) -> Result<StorageHashingCheckpoint, StageError> {
+        let buf =
+            tx.get::<tables::SyncStageProgress>(STORAGE_HASHING.0.into())?.unwrap_or_default();
+
+        if buf.is_empty() {
+            return Ok(StorageHashingCheckpoint::default())
+        }
+
+        let (checkpoint, _) = StorageHashingCheckpoint::from_compact(&buf, buf.len());
+
+        if checkpoint.address.is_some() {
+            debug!(target: "sync::stages::storage_hashing::exec", checkpoint = ?checkpoint, "Continuing inner storage hashing checkpoint");
+        }
+
+        Ok(checkpoint)
+    }
+}
+
 #[async_trait::async_trait]
 impl<DB: Database> Stage<DB> for StorageHashingStage {
     /// Return the id of the stage
@@ -57,77 +95,92 @@ impl<DB: Database> Stage<DB> for StorageHashingStage {
         // AccountHashing table. Also, if we start from genesis, we need to hash from scratch, as
         // genesis accounts are not in changeset, along with their storages.
         if to_transition - from_transition > self.clean_threshold || stage_progress == 0 {
-            tx.clear::<tables::HashedStorage>()?;
-            tx.commit()?;
-
-            let mut current_key = None;
-            let mut current_subkey = None;
-            let mut keccak_address = None;
-            loop {
-                let mut hashed_batch = BTreeMap::new();
-                let mut remaining = self.commit_threshold as usize;
-                {
-                    let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
-                    while !remaining.is_zero() {
-                        hashed_batch.extend(
-                            storage
-                                .walk_dup(current_key, current_subkey)?
-                                .take(remaining)
-                                .map(|res| {
-                                    res.map(|(address, slot)| {
-                                        // Address caching for the first iteration when current_key
-                                        // is None
-                                        let keccak_address =
-                                            if let Some(keccak_address) = keccak_address {
-                                                keccak_address
-                                            } else {
-                                                keccak256(address)
-                                            };
-
-                                        // TODO cache map keccak256(slot.key) ?
-                                        ((keccak_address, keccak256(slot.key)), slot.value)
-                                    })
-                                })
-                                .collect::<Result<BTreeMap<_, _>, _>>()?,
-                        );
-
-                        remaining = self.commit_threshold as usize - hashed_batch.len();
-
-                        if let Some((address, slot)) = storage.next_dup()? {
-                            // There's still some remaining elements on this key, so we need to save
-                            // the cursor position for the next
-                            // iteration
-                            current_key = Some(address);
-                            current_subkey = Some(slot.key);
-                        } else {
-                            // Go to the next key
-                            current_key = storage.next_no_dup()?.map(|(key, _)| key);
-                            current_subkey = None;
-
-                            // Cache keccak256(address) for the next key if it exists
-                            if let Some(address) = current_key {
-                                keccak_address = Some(keccak256(address));
-                            } else {
-                                // We have reached the end of table
-                                break
-                            }
-                        }
-                    }
-                }
-
-                // iterate and put presorted hashed slots
-                hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
-                    tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
-                })?;
-                tx.commit()?;
-
-                // We have reached the end of table
-                if current_key.is_none() {
-                    break
-                }
-            }
+            let mut checkpoint = self.get_checkpoint(tx)?;
+
+            if checkpoint.address.is_none() ||
+                // Checkpoint is no longer valid if the range of transitions changed.
+                // An already hashed storage may have been changed with the new range, and therefore should be hashed again.
+                checkpoint.to != to_transition ||
+                checkpoint.from != from_transition
+            {
+                tx.clear::<tables::HashedStorage>()?;
+
+                checkpoint = StorageHashingCheckpoint::default();
+                self.save_checkpoint(tx, checkpoint)?;
+            }
+
+            let mut current_key = checkpoint.address.take();
+            let mut current_subkey = checkpoint.storage.take();
+            let mut keccak_address = None;
+
+            let mut hashed_batch = BTreeMap::new();
+            let mut remaining = self.commit_threshold as usize;
+            {
+                let mut storage = tx.cursor_dup_read::<tables::PlainStorageState>()?;
+                while !remaining.is_zero() {
+                    hashed_batch.extend(
+                        storage
+                            .walk_dup(current_key, current_subkey)?
+                            .take(remaining)
+                            .map(|res| {
+                                res.map(|(address, slot)| {
+                                    // Address caching for the first iteration when current_key
+                                    // is None
+                                    let keccak_address =
+                                        if let Some(keccak_address) = keccak_address {
+                                            keccak_address
+                                        } else {
+                                            keccak256(address)
+                                        };
+
+                                    // TODO cache map keccak256(slot.key) ?
+                                    ((keccak_address, keccak256(slot.key)), slot.value)
+                                })
+                            })
+                            .collect::<Result<BTreeMap<_, _>, _>>()?,
+                    );
+
+                    remaining = self.commit_threshold as usize - hashed_batch.len();
+
+                    if let Some((address, slot)) = storage.next_dup()? {
+                        // There's still some remaining elements on this key, so we need to save
+                        // the cursor position for the next
+                        // iteration
+                        current_key = Some(address);
+                        current_subkey = Some(slot.key);
+                    } else {
+                        // Go to the next key
+                        current_key = storage.next_no_dup()?.map(|(key, _)| key);
+                        current_subkey = None;
+
+                        // Cache keccak256(address) for the next key if it exists
+                        if let Some(address) = current_key {
+                            keccak_address = Some(keccak256(address));
+                        } else {
+                            // We have reached the end of table
+                            break
+                        }
+                    }
+                }
+            }
+
+            // iterate and put presorted hashed slots
+            hashed_batch.into_iter().try_for_each(|((addr, key), value)| {
+                tx.put::<tables::HashedStorage>(addr, StorageEntry { key, value })
+            })?;
+
+            if let Some(address) = &current_key {
+                checkpoint.address = Some(*address);
+                checkpoint.storage = current_subkey;
+                checkpoint.from = from_transition;
+                checkpoint.to = to_transition;
+            }
+
+            self.save_checkpoint(tx, checkpoint)?;
+
+            if current_key.is_some() {
+                return Ok(ExecOutput { stage_progress, done: false })
+            }
         } else {
             // Aggregate all transition changesets and and make list of storages that have been
@@ -170,7 +223,6 @@ mod tests {
         stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
         TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
     };
-    use assert_matches::assert_matches;
    use reth_db::{
        cursor::{DbCursorRO, DbCursorRW},
        mdbx::{tx::Tx, WriteMap, RW},
@@ -205,18 +257,25 @@
         runner.seed_execution(input).expect("failed to seed execution");

-        let rx = runner.execute(input);
-
-        // Assert the successful result
-        let result = rx.await.unwrap();
-        assert_matches!(
-            result,
-            Ok(ExecOutput { done, stage_progress })
-                if done && stage_progress == previous_stage
-        );
-
-        // Validate the stage execution
-        assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
+        loop {
+            if let Ok(result) = runner.execute(input).await.unwrap() {
+                if !result.done {
+                    // Continue from checkpoint
+                    continue
+                } else {
+                    assert!(result.stage_progress == previous_stage);
+
+                    // Validate the stage execution
+                    assert!(
+                        runner.validate_execution(input, Some(result)).is_ok(),
+                        "execution validation"
+                    );
+
+                    break
+                }
+            }
+            panic!("Failed execution");
+        }
     }

     struct StorageHashingTestRunner {