From 2884eae0755ea2809c4d70dd39863997ca55dfa4 Mon Sep 17 00:00:00 2001
From: joshieDo <93316087+joshieDo@users.noreply.github.com>
Date: Wed, 1 Mar 2023 14:20:00 +0800
Subject: [PATCH] perf: bench merkle stage (#1497)

---
 bin/reth/src/dump_stage/merkle.rs             | 137 +++++++++++++++
 bin/reth/src/dump_stage/mod.rs                |   8 +
 crates/interfaces/Cargo.toml                  |  16 +-
 .../interfaces/src/test_utils/generators.rs   | 116 ++++++++++++-
 crates/primitives/src/storage.rs              |   6 +
 crates/stages/benches/criterion.rs            | 138 +++++++++------
 .../stages/benches/setup/account_hashing.rs   |  10 +-
 crates/stages/benches/setup/mod.rs            | 151 ++++++++++++++--
 crates/stages/src/lib.rs                      |   1 +
 crates/stages/src/stages/hashing_storage.rs   |  41 +++--
 crates/stages/src/stages/merkle.rs            | 161 +++++------------
 crates/stages/src/test_utils/test_db.rs       | 151 +++++++++++-----
 crates/stages/src/trie/mod.rs                 | 106 ++++++++----
 crates/storage/db/benches/utils.rs            |   4 +
 14 files changed, 766 insertions(+), 280 deletions(-)
 create mode 100644 bin/reth/src/dump_stage/merkle.rs

diff --git a/bin/reth/src/dump_stage/merkle.rs b/bin/reth/src/dump_stage/merkle.rs
new file mode 100644
index 000000000..8e4b2b561
--- /dev/null
+++ b/bin/reth/src/dump_stage/merkle.rs
@@ -0,0 +1,137 @@
+use crate::{
+    db::DbTool,
+    dirs::{DbPath, PlatformPath},
+    dump_stage::setup,
+};
+use eyre::Result;
+use reth_db::{database::Database, table::TableImporter, tables, transaction::DbTx};
+use reth_primitives::MAINNET;
+use reth_provider::Transaction;
+use reth_stages::{
+    stages::{AccountHashingStage, ExecutionStage, MerkleStage, StorageHashingStage},
+    DefaultDB, Stage, StageId, UnwindInput,
+};
+use std::ops::DerefMut;
+use tracing::info;
+
+pub(crate) async fn dump_merkle_stage<DB: Database>(
+    db_tool: &mut DbTool<'_, DB>,
+    from: u64,
+    to: u64,
+    output_db: &PlatformPath<DbPath>,
+    should_run: bool,
+) -> Result<()> {
+    let (output_db, tip_block_number) = setup::<DB>(from, to, output_db, db_tool)?;
+
+    output_db.update(|tx| {
+        tx.import_table_with_range::<tables::BlockTransitionIndex, _>(&db_tool.db.tx()?, Some(from), to)
+    })??;
+
+    let tx = db_tool.db.tx()?;
+    let from_transition_rev =
+        tx.get::<tables::BlockTransitionIndex>(from)?.expect("there should be at least one.");
+    let to_transition_rev =
+        tx.get::<tables::BlockTransitionIndex>(to)?.expect("there should be at least one.");
+
+    output_db.update(|tx| {
+        tx.import_table_with_range::<tables::AccountChangeSet, _>(
+            &db_tool.db.tx()?,
+            Some(from_transition_rev),
+            to_transition_rev,
+        )
+    })??;
+
+    unwind_and_copy::<DB>(db_tool, (from, to), tip_block_number, &output_db).await?;
+
+    if should_run {
+        println!(
+            "\n# Merkle stage does not support dry run, so it will actually commit changes."
+        );
+        run(output_db, to, from).await?;
+    }
+
+    Ok(())
+}
+
+/// Dry-run an unwind to FROM block and copy the necessary table data to the new database.
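+///
+/// The unwind/re-execute dance below leaves the hashed tables at TO and the
+/// trie tables at FROM before copying, which is exactly the pre-state an
+/// incremental FROM..TO merkle run expects.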
+async fn unwind_and_copy<DB: Database>(
+    db_tool: &mut DbTool<'_, DB>,
+    range: (u64, u64),
+    tip_block_number: u64,
+    output_db: &reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
+) -> eyre::Result<()> {
+    let (from, to) = range;
+    let mut unwind_tx = Transaction::new(db_tool.db)?;
+    let unwind = UnwindInput { unwind_to: from, stage_progress: tip_block_number, bad_block: None };
+    let execute_input = reth_stages::ExecInput {
+        previous_stage: Some((StageId("Another"), to)),
+        stage_progress: Some(from),
+    };
+
+    // Unwind hashes all the way to FROM
+    StorageHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap();
+    AccountHashingStage::default().unwind(&mut unwind_tx, unwind).await.unwrap();
+
+    MerkleStage::default_unwind().unwind(&mut unwind_tx, unwind).await?;
+
+    // Bring Plainstate to TO (hashing stage execution requires it)
+    let mut exec_stage: ExecutionStage<'_, DefaultDB<'_>> = ExecutionStage::from(MAINNET.clone());
+    exec_stage.commit_threshold = u64::MAX;
+    exec_stage
+        .unwind(
+            &mut unwind_tx,
+            UnwindInput { unwind_to: to, stage_progress: tip_block_number, bad_block: None },
+        )
+        .await?;
+
+    // Bring hashes to TO
+    AccountHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
+        .execute(&mut unwind_tx, execute_input)
+        .await
+        .unwrap();
+    StorageHashingStage { clean_threshold: u64::MAX, commit_threshold: u64::MAX }
+        .execute(&mut unwind_tx, execute_input)
+        .await
+        .unwrap();
+
+    let unwind_inner_tx = unwind_tx.deref_mut();
+
+    // TODO optimize we can actually just get the entries we need
+    output_db.update(|tx| tx.import_dupsort::<tables::StorageChangeSet, _>(unwind_inner_tx))??;
+
+    output_db.update(|tx| tx.import_table::<tables::HashedAccount, _>(unwind_inner_tx))??;
+    output_db.update(|tx| tx.import_dupsort::<tables::HashedStorage, _>(unwind_inner_tx))??;
+    output_db.update(|tx| tx.import_table::<tables::AccountsTrie, _>(unwind_inner_tx))??;
+    output_db.update(|tx| tx.import_dupsort::<tables::StoragesTrie, _>(unwind_inner_tx))??;
+
+    unwind_tx.drop()?;
+
+    Ok(())
+}
+
+/// Try to re-execute the stage straight away
+async fn run(
+    output_db: reth_db::mdbx::Env<reth_db::mdbx::WriteMap>,
+    to: u64,
+    from: u64,
+) -> eyre::Result<()> {
+    info!(target: "reth::cli", "Executing stage.");
+
+    let mut tx = Transaction::new(&output_db)?;
+
+    MerkleStage::Execution {
+        clean_threshold: u64::MAX, // Forces updating the root instead of calculating from scratch
+    }
+    .execute(
+        &mut tx,
+        reth_stages::ExecInput {
+            previous_stage: Some((StageId("Another"), to)),
+            stage_progress: Some(from),
+        },
+    )
+    .await?;
+
+    info!(target: "reth::cli", "Success.");
+
+    Ok(())
+}
diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs
index 0bc3863a3..4eddb3817 100644
--- a/bin/reth/src/dump_stage/mod.rs
+++ b/bin/reth/src/dump_stage/mod.rs
@@ -8,6 +8,9 @@ use hashing_account::dump_hashing_account_stage;
 mod execution;
 use execution::dump_execution_stage;
 
+mod merkle;
+use merkle::dump_merkle_stage;
+
 use crate::{
     db::DbTool,
     dirs::{DbPath, PlatformPath},
@@ -45,6 +48,8 @@ pub enum Stages {
     StorageHashing(StageCommand),
     /// AccountHashing stage.
     AccountHashing(StageCommand),
+    /// Merkle stage.
+    Merkle(StageCommand),
 }
 
 /// Stage command that takes a range
@@ -94,6 +99,9 @@ impl Command {
             Stages::AccountHashing(StageCommand { output_db, from, to, dry_run, .. }) => {
                 dump_hashing_account_stage(&mut tool, *from, *to, output_db, *dry_run).await?
             }
+            Stages::Merkle(StageCommand { output_db, from, to, dry_run, .. }) => {
+                dump_merkle_stage(&mut tool, *from, *to, output_db, *dry_run).await?
+            }
         }
 
         Ok(())
diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml
index 9448840bf..a1b2dfcf2 100644
--- a/crates/interfaces/Cargo.toml
+++ b/crates/interfaces/Cargo.toml
@@ -10,7 +10,7 @@ readme = "README.md"
 reth-codecs = { path = "../storage/codecs" }
 reth-primitives = { path = "../primitives" }
 reth-rpc-types = { path = "../rpc/rpc-types" }
-reth-network-api = { path = "../net/network-api"}
+reth-network-api = { path = "../net/network-api" }
 revm-primitives = "1.0"
 async-trait = "0.1.57"
 thiserror = "1.0.37"
@@ -26,16 +26,24 @@ futures = "0.3"
 tokio-stream = "0.1.11"
 rand = "0.8.5"
 arbitrary = { version = "1.1.7", features = ["derive"], optional = true }
-secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true }
+secp256k1 = { version = "0.24.2", default-features = false, features = [
+    "alloc",
+    "recovery",
+    "rand",
+], optional = true }
 modular-bitfield = "0.11.2"
 
 [dev-dependencies]
 reth-db = { path = "../storage/db", features = ["test-utils"] }
 tokio = { version = "1.21.2", features = ["full"] }
 tokio-stream = { version = "0.1.11", features = ["sync"] }
-arbitrary = { version = "1.1.7", features = ["derive"]}
+arbitrary = { version = "1.1.7", features = ["derive"] }
 hex-literal = "0.3"
-secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] }
+secp256k1 = { version = "0.24.2", default-features = false, features = [
+    "alloc",
+    "recovery",
+    "rand",
+] }
 
 [features]
 bench = []
diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs
index 4d0b7fa07..d7e17aee6 100644
--- a/crates/interfaces/src/test_utils/generators.rs
+++ b/crates/interfaces/src/test_utils/generators.rs
@@ -1,9 +1,10 @@
-use rand::{distributions::uniform::SampleRange, thread_rng, Rng};
+use rand::{distributions::uniform::SampleRange, seq::SliceRandom, thread_rng, Rng};
 use reth_primitives::{
-    proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, Transaction,
-    TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
+    proofs, Account, Address, Bytes, Header, SealedBlock, SealedHeader, Signature, StorageEntry,
+    Transaction, TransactionKind, TransactionSigned, TxLegacy, H160, H256, U256,
 };
 use secp256k1::{KeyPair, Message as SecpMessage, Secp256k1, SecretKey};
+use std::{collections::BTreeMap, ops::Sub};
 
 // TODO(onbjerg): Maybe we should split this off to its own crate, or move the helpers to the
 // relevant crates?
@@ -165,6 +166,115 @@ pub fn random_block_range(
     blocks
 }
 
+type Transition = Vec<(Address, Account, Vec<StorageEntry>)>;
+type AccountState = (Account, Vec<StorageEntry>);
+
+/// Generate a range of transitions for given blocks and accounts.
+/// Assumes all accounts start with an empty storage.
+///
+/// Returns a `Vec` of account and storage changes for each transition,
+/// along with the final state of all accounts and storages.
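+///
+/// Rough usage sketch (illustrative only; the inputs come from the other
+/// generators in this module):
+///
+/// ```ignore
+/// let blocks = random_block_range(0..10, H256::zero(), 0..3);
+/// let accounts = random_eoa_account_range(0..5)
+///     .into_iter()
+///     .map(|(addr, acc)| (addr, (acc, Vec::new())));
+/// // One transition is generated per transaction across all blocks.
+/// let (transitions, final_state) =
+///     random_transition_range(blocks.iter(), accounts, 0..3, 0..100);
+/// ```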
+pub fn random_transition_range<'a, IBlk, IAcc>(
+    blocks: IBlk,
+    accounts: IAcc,
+    n_changes: std::ops::Range<u64>,
+    key_range: std::ops::Range<u64>,
+) -> (Vec<Transition>, BTreeMap<Address, AccountState>)
+where
+    IBlk: IntoIterator<Item = &'a SealedBlock>,
+    IAcc: IntoIterator<Item = (Address, (Account, Vec<StorageEntry>))>,
+{
+    let mut rng = rand::thread_rng();
+    let mut state: BTreeMap<_, _> = accounts
+        .into_iter()
+        .map(|(addr, (acc, st))| (addr, (acc, st.into_iter().map(|e| (e.key, e.value)).collect())))
+        .collect();
+
+    let valid_addresses = state.keys().copied().collect();
+
+    let num_transitions: usize = blocks.into_iter().map(|block| block.body.len()).sum();
+    let mut transitions = Vec::with_capacity(num_transitions);
+
+    (0..num_transitions).for_each(|_| {
+        let mut transition = Vec::new();
+        let (from, to, mut transfer, new_entries) =
+            random_account_change(&valid_addresses, n_changes.clone(), key_range.clone());
+
+        // extract from sending account
+        let (prev_from, _) = state.get_mut(&from).unwrap();
+        transition.push((from, *prev_from, Vec::new()));
+
+        transfer = transfer.min(prev_from.balance).max(U256::from(1));
+        prev_from.balance = prev_from.balance.wrapping_sub(transfer);
+
+        // deposit in receiving account and update storage
+        let (prev_to, storage): &mut (Account, BTreeMap<H256, U256>) = state.get_mut(&to).unwrap();
+
+        let old_entries = new_entries
+            .into_iter()
+            .filter_map(|entry| {
+                let old = if entry.value != U256::ZERO {
+                    storage.insert(entry.key, entry.value)
+                } else {
+                    let old = storage.remove(&entry.key);
+                    if matches!(old, Some(U256::ZERO)) {
+                        return None
+                    }
+                    old
+                };
+                Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry })
+            })
+            .collect();
+
+        transition.push((to, *prev_to, old_entries));
+
+        prev_to.balance = prev_to.balance.wrapping_add(transfer);
+
+        transitions.push(transition);
+    });
+
+    let final_state = state
+        .into_iter()
+        .map(|(addr, (acc, storage))| {
+            (addr, (acc, storage.into_iter().map(|v| v.into()).collect()))
+        })
+        .collect();
+    (transitions, final_state)
+}
+
+/// Generate a random account change.
+///
+/// Returns two addresses, a `balance_change`, and a `Vec` of new storage entries.
+pub fn random_account_change(
+    valid_addresses: &Vec<Address>,
+    n_changes: std::ops::Range<u64>,
+    key_range: std::ops::Range<u64>,
+) -> (Address, Address, U256, Vec<StorageEntry>) {
+    let mut rng = rand::thread_rng();
+    let mut addresses = valid_addresses.choose_multiple(&mut rng, 2).cloned();
+
+    let addr_from = addresses.next().unwrap_or_else(Address::random);
+    let addr_to = addresses.next().unwrap_or_else(Address::random);
+
+    let balance_change = U256::from(rng.gen::<u64>());
+
+    let storage_changes = (0..n_changes.sample_single(&mut rng))
+        .map(|_| random_storage_entry(key_range.clone()))
+        .collect();
+
+    (addr_from, addr_to, balance_change, storage_changes)
+}
+
+/// Generate a random storage change.
+pub fn random_storage_entry(key_range: std::ops::Range<u64>) -> StorageEntry {
+    let mut rng = rand::thread_rng();
+
+    let key = H256::from_low_u64_be(key_range.sample_single(&mut rng));
+    let value = U256::from(rng.gen::<u64>());
+
+    StorageEntry { key, value }
+}
+
 /// Generate random Externally Owned Account (EOA account without contract).
 pub fn random_eoa_account() -> (Address, Account) {
     let nonce: u64 = rand::random();
diff --git a/crates/primitives/src/storage.rs b/crates/primitives/src/storage.rs
index f64554c88..67e027c34 100644
--- a/crates/primitives/src/storage.rs
+++ b/crates/primitives/src/storage.rs
@@ -12,6 +12,12 @@ pub struct StorageEntry {
     pub value: U256,
 }
 
+impl From<(H256, U256)> for StorageEntry {
+    fn from((key, value): (H256, U256)) -> Self {
+        StorageEntry { key, value }
+    }
+}
+
 // NOTE: Removing main_codec and manually encode subkey
 // and compress second part of the value. If we have compression
 // over whole value (Even SubKey) that would mess up fetching of values with seek_by_key_subkey
diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs
index 3fa21ee6f..0a34cbb43 100644
--- a/crates/stages/benches/criterion.rs
+++ b/crates/stages/benches/criterion.rs
@@ -5,21 +5,24 @@ use criterion::{
 use pprof::criterion::{Output, PProfProfiler};
 use reth_db::mdbx::{Env, WriteMap};
 use reth_stages::{
-    stages::{SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
+    stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
     test_utils::TestTransaction,
     ExecInput, Stage, StageId, UnwindInput,
 };
 use std::path::PathBuf;
 
 mod setup;
+use setup::StageRange;
 
 criterion_group!
{ name = benches; - config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); - targets = transaction_lookup, account_hashing, senders, total_difficulty + config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); + targets = transaction_lookup, account_hashing, senders, total_difficulty, merkle } criterion_main!(benches); +const DEFAULT_NUM_BLOCKS: u64 = 10_000; + fn account_hashing(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); @@ -29,32 +32,42 @@ fn account_hashing(c: &mut Criterion) { let num_blocks = 10_000; let (path, stage, execution_range) = setup::prepare_account_hashing(num_blocks); - measure_stage_with_path(&mut group, stage, path, "AccountHashing".to_string(), execution_range); + measure_stage_with_path( + path, + &mut group, + setup::stage_unwind, + stage, + execution_range, + "AccountHashing".to_string(), + ); } fn senders(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - // don't need to run each stage for that many times group.sample_size(10); for batch in [1000usize, 10_000, 100_000, 250_000] { - let num_blocks = 10_000; - let stage = SenderRecoveryStage { commit_threshold: num_blocks, ..Default::default() }; + let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS }; let label = format!("SendersRecovery-batch-{batch}"); - measure_stage(&mut group, stage, num_blocks, label); + + measure_stage(&mut group, setup::stage_unwind, stage, 0..DEFAULT_NUM_BLOCKS, label); } } fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); - // don't need to run each stage for that many times group.sample_size(10); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS); - let num_blocks = 10_000; - let stage = TransactionLookupStage::new(num_blocks); - measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string()); + measure_stage( + &mut group, + setup::stage_unwind, + stage, + 0..DEFAULT_NUM_BLOCKS, + "TransactionLookup".to_string(), + ); } fn total_difficulty(c: &mut Criterion) { @@ -63,44 +76,60 @@ fn total_difficulty(c: &mut Criterion) { group.warm_up_time(std::time::Duration::from_millis(2000)); // don't need to run each stage for that many times group.sample_size(10); - - let num_blocks = 10_000; let stage = TotalDifficultyStage::default(); - measure_stage(&mut group, stage, num_blocks, "TotalDifficulty".to_string()); + + measure_stage( + &mut group, + setup::stage_unwind, + stage, + 0..DEFAULT_NUM_BLOCKS, + "TotalDifficulty".to_string(), + ); } -fn measure_stage_with_path>>( - group: &mut BenchmarkGroup, - stage: S, +fn merkle(c: &mut Criterion) { + let mut group = c.benchmark_group("Stages"); + // don't need to run each stage for that many times + group.sample_size(10); + + let stage = MerkleStage::Both { clean_threshold: u64::MAX }; + measure_stage( + &mut group, + setup::unwind_hashes, + stage, + 1..DEFAULT_NUM_BLOCKS + 1, + "Merkle-incremental".to_string(), + ); + + let stage = MerkleStage::Both { clean_threshold: 0 }; + measure_stage( + &mut group, + setup::unwind_hashes, + stage, + 1..DEFAULT_NUM_BLOCKS + 1, + "Merkle-fullhash".to_string(), + ); +} + +fn measure_stage_with_path( path: PathBuf, + group: &mut BenchmarkGroup, + setup: F, + stage: S, + stage_range: StageRange, label: String, - stage_range: (ExecInput, UnwindInput), -) { +) where + S: Clone + Stage>, + F: Fn(S, &TestTransaction, StageRange), +{ let tx = TestTransaction::new(&path); - let (input, unwind) = stage_range; + let (input, _) = 
stage_range; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - tokio::runtime::Runtime::new().unwrap().block_on(async { - let mut stage = stage.clone(); - let mut db_tx = tx.inner(); - - // Clear previous run - stage - .unwind(&mut db_tx, unwind) - .await - .map_err(|e| { - eyre::eyre!(format!( - "{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.", - path.display() - )) - }) - .unwrap(); - - db_tx.commit().unwrap(); - }); + setup(stage.clone(), &tx, stage_range) }, |_| async { let mut stage = stage.clone(); @@ -112,25 +141,34 @@ fn measure_stage_with_path>>( }); } -fn measure_stage>>( +fn measure_stage( group: &mut BenchmarkGroup, + setup: F, stage: S, - num_blocks: u64, + block_interval: std::ops::Range, label: String, -) { - let path = setup::txs_testdata(num_blocks as usize); +) where + S: Clone + Stage>, + F: Fn(S, &TestTransaction, StageRange), +{ + let path = setup::txs_testdata(block_interval.end); measure_stage_with_path( - group, - stage, path, - label, + group, + setup, + stage, ( ExecInput { - previous_stage: Some((StageId("Another"), num_blocks)), - ..Default::default() + previous_stage: Some((StageId("Another"), block_interval.end)), + stage_progress: Some(block_interval.start), + }, + UnwindInput { + stage_progress: block_interval.end, + unwind_to: block_interval.start, + bad_block: None, }, - UnwindInput::default(), ), - ) + label, + ); } diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index ec408c358..033b71da9 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -1,4 +1,4 @@ -use super::constants; +use super::{constants, StageRange}; use reth_db::{ cursor::DbCursorRO, database::Database, tables, transaction::DbTx, Error as DbError, }; @@ -15,9 +15,7 @@ use std::path::{Path, PathBuf}; /// generate its own random data. /// /// Returns the path to the database file, stage and range of stage execution if it exists. 
-pub fn prepare_account_hashing( - num_blocks: u64, -) -> (PathBuf, AccountHashingStage, (ExecInput, UnwindInput)) { +pub fn prepare_account_hashing(num_blocks: u64) -> (PathBuf, AccountHashingStage, StageRange) { let (path, stage_range) = match std::env::var(constants::ACCOUNT_HASHING_DB) { Ok(db) => { let path = Path::new(&db).to_path_buf(); @@ -30,7 +28,7 @@ pub fn prepare_account_hashing( (path, AccountHashingStage::default(), stage_range) } -fn find_stage_range(db: &Path) -> (ExecInput, UnwindInput) { +fn find_stage_range(db: &Path) -> StageRange { let mut stage_range = None; TestTransaction::new(db) .tx @@ -54,7 +52,7 @@ fn find_stage_range(db: &Path) -> (ExecInput, UnwindInput) { stage_range.expect("Could not find the stage range from the external DB.") } -fn generate_testdata_db(num_blocks: u64) -> (PathBuf, (ExecInput, UnwindInput)) { +fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { let opts = SeedOpts { blocks: 0..num_blocks + 1, accounts: 0..10_000, diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 7f7932bfe..34876d031 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -1,34 +1,165 @@ +use itertools::concat; use reth_db::{ cursor::DbCursorRO, + mdbx::{Env, WriteMap}, tables, transaction::{DbTx, DbTxMut}, }; -use reth_interfaces::test_utils::generators::random_block_range; -use reth_primitives::H256; -use reth_stages::test_utils::TestTransaction; -use std::path::{Path, PathBuf}; +use reth_interfaces::test_utils::generators::{ + random_block_range, random_contract_account_range, random_eoa_account_range, + random_transition_range, +}; +use reth_primitives::{Account, Address, SealedBlock, H256}; +use reth_stages::{ + stages::{AccountHashingStage, StorageHashingStage}, + test_utils::TestTransaction, + DBTrieLoader, ExecInput, Stage, UnwindInput, +}; +use std::{ + collections::BTreeMap, + path::{Path, PathBuf}, +}; mod constants; mod account_hashing; pub use account_hashing::*; -// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). -// Returns the path to the database file and the number of blocks written. 
-pub fn txs_testdata(num_blocks: usize) -> PathBuf { +pub(crate) type StageRange = (ExecInput, UnwindInput); + +pub(crate) fn stage_unwind>>( + stage: S, + tx: &TestTransaction, + range: StageRange, +) { + let (_, unwind) = range; + + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + // Clear previous run + stage + .unwind(&mut db_tx, unwind) + .await + .map_err(|e| { + eyre::eyre!(format!( + "{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.", + tx.path.as_ref().unwrap().display() + )) + }) + .unwrap(); + + db_tx.commit().unwrap(); + }); +} + +pub(crate) fn unwind_hashes>>( + stage: S, + tx: &TestTransaction, + range: StageRange, +) { + let (input, unwind) = range; + + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + StorageHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap(); + AccountHashingStage::default().unwind(&mut db_tx, unwind).await.unwrap(); + + let target_root = db_tx.get_header(unwind.unwind_to).unwrap().state_root; + let _ = db_tx.delete::(target_root, None); + + // Clear previous run + stage.unwind(&mut db_tx, unwind).await.unwrap(); + + AccountHashingStage::default().execute(&mut db_tx, input).await.unwrap(); + StorageHashingStage::default().execute(&mut db_tx, input).await.unwrap(); + + db_tx.commit().unwrap(); + }); +} + +// Helper for generating testdata for the benchmarks. +// Returns the path to the database file. +pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); let txs_range = 100..150; + // number of storage changes per transition + let n_changes = 0..3; + + // range of possible values for a storage key + let key_range = 0..300; + + // number of accounts + let n_eoa = 131; + let n_contract = 31; + if !path.exists() { // create the dirs std::fs::create_dir_all(&path).unwrap(); println!("Transactions testdata not found, generating to {:?}", path.display()); let tx = TestTransaction::new(&path); - // This takes a while because it does sig recovery internally - let blocks = random_block_range(0..num_blocks as u64 + 1, H256::zero(), txs_range); + let accounts: BTreeMap = concat([ + random_eoa_account_range(0..n_eoa), + random_contract_account_range(&mut (0..n_contract)), + ]) + .into_iter() + .collect(); + + let mut blocks = random_block_range(0..num_blocks + 1, H256::zero(), txs_range); + + let (transitions, start_state) = random_transition_range( + blocks.iter().take(2), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + n_changes.clone(), + key_range.clone(), + ); + + tx.insert_accounts_and_storages(start_state.clone()).unwrap(); + + // make first block after genesis have valid state root + let root = DBTrieLoader::default().calculate_root(&tx.inner()).unwrap(); + let second_block = blocks.get_mut(1).unwrap(); + let cloned_second = second_block.clone(); + let mut updated_header = cloned_second.header.unseal(); + updated_header.state_root = root; + *second_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_second }; + + let offset = transitions.len() as u64; + + tx.insert_transitions(transitions, None).unwrap(); + + let (transitions, final_state) = + random_transition_range(blocks.iter().skip(2), start_state, n_changes, key_range); + + tx.insert_transitions(transitions, Some(offset)).unwrap(); + + 
tx.insert_accounts_and_storages(final_state).unwrap(); + + // make last block have valid state root + let root = { + let mut tx_mut = tx.inner(); + let root = DBTrieLoader::default().calculate_root(&tx_mut).unwrap(); + tx_mut.commit().unwrap(); + root + }; + + tx.query(|tx| { + assert!(tx.get::(root)?.is_some()); + Ok(()) + }) + .unwrap(); + + let last_block = blocks.last_mut().unwrap(); + let cloned_last = last_block.clone(); + let mut updated_header = cloned_last.header.unseal(); + updated_header.state_root = root; + *last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last }; - // insert all blocks tx.insert_blocks(blocks.iter(), None).unwrap(); // initialize TD diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 59e6c075e..21dd5b57c 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -77,6 +77,7 @@ pub use error::*; pub use id::*; pub use pipeline::*; pub use stage::*; +pub use trie::DBTrieLoader; // NOTE: Needed so the link in the module-level rustdoc works. #[allow(unused_extern_crates)] diff --git a/crates/stages/src/stages/hashing_storage.rs b/crates/stages/src/stages/hashing_storage.rs index 2427d90a5..e7b5d3c49 100644 --- a/crates/stages/src/stages/hashing_storage.rs +++ b/crates/stages/src/stages/hashing_storage.rs @@ -160,33 +160,40 @@ impl Stage for StorageHashingStage { // Assumption we are okay with is that plain state represent // `previous_stage_progress` state. .map(|(address, storage)| { - storage - .into_iter() - .map(|key| { - plain_storage - .seek_by_key_subkey(address, key) - .map(|ret| (keccak256(key), ret.map(|e| e.value))) - }) - .collect::, _>>() - .map(|storage| (keccak256(address), storage)) + let res = ( + keccak256(address), + storage + .into_iter() + .map(|key| { + Ok::, reth_db::Error>( + plain_storage + .seek_by_key_subkey(address, key)? + .filter(|v| v.key == key) + .map(|ret| (keccak256(key), ret.value)), + ) + }) + .collect::>, _>>()? + .into_iter() + .flatten() + .collect::>(), + ); + Ok::<_, reth_db::Error>(res) }) .collect::, _>>()? .into_iter() // Hash the address and key and apply them to HashedStorage (if Storage is None // just remove it); - .try_for_each(|(address, storage)| { + .try_for_each(|(hashed_address, storage)| { storage.into_iter().try_for_each(|(key, val)| -> Result<(), StageError> { if hashed_storage - .seek_by_key_subkey(address, key)? + .seek_by_key_subkey(hashed_address, key)? .filter(|entry| entry.key == key) .is_some() { hashed_storage.delete_current()?; } - if let Some(value) = val { - hashed_storage.upsert(address, StorageEntry { key, value })?; - } + hashed_storage.upsert(hashed_address, StorageEntry { key, value: val })?; Ok(()) }) })?; @@ -232,9 +239,9 @@ impl Stage for StorageHashingStage { .collect::>() .into_iter() // Apply values to HashedStorage (if Value is zero just remove it); - .try_for_each(|((address, key), value)| -> Result<(), StageError> { + .try_for_each(|((hashed_address, key), value)| -> Result<(), StageError> { if hashed_storage - .seek_by_key_subkey(address, key)? + .seek_by_key_subkey(hashed_address, key)? 
.filter(|entry| entry.key == key) .is_some() { @@ -242,7 +249,7 @@ impl Stage for StorageHashingStage { } if value != U256::ZERO { - hashed_storage.upsert(address, StorageEntry { key, value })?; + hashed_storage.upsert(hashed_address, StorageEntry { key, value })?; } Ok(()) })?; diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index d86470bff..884c137a8 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -14,6 +14,9 @@ pub const MERKLE_EXECUTION: StageId = StageId("MerkleExecute"); /// The [`StageId`] of the merkle hashing unwind stage. pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); +/// The [`StageId`] of the merkle hashing unwind and execution stage. +pub const MERKLE_BOTH: StageId = StageId("MerkleBoth"); + /// The merkle hashing stage uses input from /// [`AccountHashingStage`][crate::stages::AccountHashingStage] and /// [`StorageHashingStage`][crate::stages::AccountHashingStage] to calculate intermediate hashes @@ -35,7 +38,7 @@ pub const MERKLE_UNWIND: StageId = StageId("MerkleUnwind"); /// - [`AccountHashingStage`][crate::stages::AccountHashingStage] /// - [`StorageHashingStage`][crate::stages::StorageHashingStage] /// - [`MerkleStage::Execution`] -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum MerkleStage { /// The execution portion of the merkle stage. Execution { @@ -46,7 +49,9 @@ pub enum MerkleStage { /// The unwind portion of the merkle stage. Unwind, - #[cfg(test)] + /// Able to execute and unwind. Used for tests + #[cfg(any(test, feature = "test-utils"))] + #[allow(missing_docs)] Both { clean_threshold: u64 }, } @@ -69,8 +74,8 @@ impl Stage for MerkleStage { match self { MerkleStage::Execution { .. } => MERKLE_EXECUTION, MerkleStage::Unwind => MERKLE_UNWIND, - #[cfg(test)] - MerkleStage::Both { .. } => unreachable!(), + #[cfg(any(test, feature = "test-utils"))] + MerkleStage::Both { .. 
} => MERKLE_BOTH, } } @@ -89,7 +94,7 @@ impl Stage for MerkleStage { }) } MerkleStage::Execution { clean_threshold } => *clean_threshold, - #[cfg(test)] + #[cfg(any(test, feature = "test-utils"))] MerkleStage::Both { clean_threshold } => *clean_threshold, }; @@ -156,10 +161,22 @@ impl Stage for MerkleStage { let from_transition = tx.get_block_transition(input.unwind_to)?; let to_transition = tx.get_block_transition(input.stage_progress)?; - loader + let block_root = loader .update_root(tx, current_root, from_transition..to_transition) .map_err(|e| StageError::Fatal(Box::new(e)))?; + if block_root != target_root { + let unwind_to = input.unwind_to; + warn!(target: "sync::stages::merkle::unwind", ?unwind_to, got = ?block_root, expected = ?target_root, "Block's root state failed verification"); + return Err(StageError::Validation { + block: unwind_to, + error: consensus::Error::BodyStateRootDiff { + got: block_root, + expected: target_root, + }, + }) + } + info!(target: "sync::stages::merkle::unwind", "Stage finished"); Ok(UnwindOutput { stage_progress: input.unwind_to }) } @@ -175,12 +192,11 @@ mod tests { use assert_matches::assert_matches; use reth_db::{ cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW}, - models::{AccountBeforeTx, StoredBlockBody}, tables, transaction::{DbTx, DbTxMut}, }; use reth_interfaces::test_utils::generators::{ - random_block, random_block_range, random_contract_account_range, + random_block, random_block_range, random_contract_account_range, random_transition_range, }; use reth_primitives::{keccak256, Account, Address, SealedBlock, StorageEntry, H256, U256}; use std::collections::BTreeMap; @@ -276,12 +292,16 @@ mod tests { let end = input.previous_stage_progress() + 1; let n_accounts = 31; - let mut accounts = random_contract_account_range(&mut (0..n_accounts)); + let accounts = random_contract_account_range(&mut (0..n_accounts)) + .into_iter() + .collect::>(); let SealedBlock { header, body, ommers, withdrawals } = random_block(stage_progress, None, Some(0), None); let mut header = header.unseal(); - header.state_root = self.generate_initial_trie(&accounts)?; + + header.state_root = + self.generate_initial_trie(accounts.iter().map(|(k, v)| (*k, *v)))?; let sealed_head = SealedBlock { header: header.seal_slow(), body, ommers, withdrawals }; let head_hash = sealed_head.hash(); @@ -289,64 +309,18 @@ mod tests { blocks.extend(random_block_range((stage_progress + 1)..end, head_hash, 0..3)); - self.tx.insert_headers(blocks.iter().map(|block| &block.header))?; + self.tx.insert_blocks(blocks.iter(), None)?; - let (mut transition_id, mut tx_id) = (0, 0); + let (transitions, final_state) = random_transition_range( + blocks.iter(), + accounts.into_iter().map(|(addr, acc)| (addr, (acc, Vec::new()))), + 0..3, + 0..256, + ); - let mut storages: BTreeMap> = BTreeMap::new(); + self.tx.insert_transitions(transitions, None)?; - for progress in blocks.iter() { - // Insert last progress data - self.tx.commit(|tx| { - let body = StoredBlockBody { - start_tx_id: tx_id, - tx_count: progress.body.len() as u64, - }; - - progress.body.iter().try_for_each(|transaction| { - tx.put::(transaction.hash(), tx_id)?; - tx.put::(tx_id, transaction.clone())?; - tx.put::(tx_id, transition_id)?; - - // seed account changeset - let (addr, prev_acc) = accounts - .get_mut(rand::random::() % n_accounts as usize) - .unwrap(); - let acc_before_tx = - AccountBeforeTx { address: *addr, info: Some(*prev_acc) }; - - tx.put::(transition_id, acc_before_tx)?; - - prev_acc.nonce += 1; - 
prev_acc.balance = prev_acc.balance.wrapping_add(U256::from(1)); - - let new_entry = StorageEntry { - key: keccak256([rand::random::()]), - value: U256::from(rand::random::() % 30 + 1), - }; - let storage = storages.entry(*addr).or_default(); - let old_value = storage.entry(new_entry.key).or_default(); - - tx.put::( - (transition_id, *addr).into(), - StorageEntry { key: new_entry.key, value: *old_value }, - )?; - - *old_value = new_entry.value; - - tx_id += 1; - transition_id += 1; - - Ok(()) - })?; - - tx.put::(progress.number, transition_id)?; - tx.put::(progress.number, body) - })?; - } - - self.insert_accounts(&accounts)?; - self.insert_storages(&storages)?; + self.tx.insert_accounts_and_storages(final_state)?; let last_block_number = end - 1; let root = self.state_root()?; @@ -471,9 +445,11 @@ mod tests { pub(crate) fn generate_initial_trie( &self, - accounts: &[(Address, Account)], + accounts: impl IntoIterator, ) -> Result { - self.insert_accounts(accounts)?; + self.tx.insert_accounts_and_storages( + accounts.into_iter().map(|(addr, acc)| (addr, (acc, std::iter::empty()))), + )?; let loader = DBTrieLoader::default(); @@ -485,57 +461,6 @@ mod tests { Ok(root) } - pub(crate) fn insert_accounts( - &self, - accounts: &[(Address, Account)], - ) -> Result<(), TestRunnerError> { - for (addr, acc) in accounts.iter() { - self.tx.commit(|tx| { - tx.put::(*addr, *acc)?; - tx.put::(keccak256(addr), *acc)?; - Ok(()) - })?; - } - - Ok(()) - } - - fn insert_storages( - &self, - storages: &BTreeMap>, - ) -> Result<(), TestRunnerError> { - self.tx - .commit(|tx| { - storages.iter().try_for_each(|(&addr, storage)| { - storage.iter().try_for_each(|(&key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - storages - .iter() - .map(|(addr, storage)| { - ( - keccak256(addr), - storage - .iter() - .filter(|(_, &value)| value != U256::ZERO) - .map(|(key, value)| (keccak256(key), value)), - ) - }) - .collect::>() - .into_iter() - .try_for_each(|(addr, storage)| { - storage.into_iter().try_for_each(|(key, &value)| { - let entry = StorageEntry { key, value }; - tx.put::(addr, entry) - }) - })?; - Ok(()) - }) - .map_err(|e| e.into()) - } - fn check_root(&self, previous_stage_progress: u64) -> Result<(), TestRunnerError> { if previous_stage_progress != 0 { let block_root = diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index a040c6513..bf1a0043f 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -1,19 +1,26 @@ use reth_db::{ - cursor::{DbCursorRO, DbCursorRW}, + cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}, mdbx::{ test_utils::{create_test_db, create_test_db_with_path}, tx::Tx, Env, EnvKind, WriteMap, RW, }, - models::StoredBlockBody, + models::{AccountBeforeTx, BlockNumHash, StoredBlockBody}, table::Table, tables, transaction::{DbTx, DbTxMut}, Error as DbError, }; -use reth_primitives::{BlockNumber, SealedBlock, SealedHeader, U256}; +use reth_primitives::{ + keccak256, Account, Address, BlockNumber, SealedBlock, SealedHeader, StorageEntry, H256, U256, +}; use reth_provider::Transaction; -use std::{borrow::Borrow, path::Path, sync::Arc}; +use std::{ + borrow::Borrow, + collections::BTreeMap, + path::{Path, PathBuf}, + sync::Arc, +}; /// The [TestTransaction] is used as an internal /// database for testing stage implementation. 
@@ -26,18 +33,22 @@ use std::{borrow::Borrow, path::Path, sync::Arc}; pub struct TestTransaction { /// WriteMap DB pub tx: Arc>, + pub path: Option, } impl Default for TestTransaction { /// Create a new instance of [TestTransaction] fn default() -> Self { - Self { tx: create_test_db::(EnvKind::RW) } + Self { tx: create_test_db::(EnvKind::RW), path: None } } } impl TestTransaction { pub fn new(path: &Path) -> Self { - Self { tx: Arc::new(create_test_db_with_path::(EnvKind::RW, path)) } + Self { + tx: Arc::new(create_test_db_with_path::(EnvKind::RW, path)), + path: Some(path.to_path_buf()), + } } /// Return a database wrapped in [Transaction]. @@ -177,23 +188,20 @@ impl TestTransaction { }) } + /// Inserts a single [SealedHeader] into the corresponding tables of the headers stage. + fn insert_header(tx: &mut Tx<'_, RW, WriteMap>, header: &SealedHeader) -> Result<(), DbError> { + tx.put::(header.number, header.hash())?; + tx.put::(header.hash(), header.number)?; + tx.put::(header.number, header.clone().unseal()) + } + /// Insert ordered collection of [SealedHeader] into the corresponding tables /// that are supposed to be populated by the headers stage. pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> where I: Iterator, { - self.commit(|tx| { - let headers = headers.collect::>(); - - for header in headers { - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(header.number, header.clone().unseal())?; - } - - Ok(()) - }) + self.commit(|tx| headers.into_iter().try_for_each(|header| Self::insert_header(tx, header))) } /// Inserts total difficulty of headers into the corresponding tables. @@ -204,23 +212,19 @@ impl TestTransaction { I: Iterator, { self.commit(|tx| { - let headers = headers.collect::>(); - let mut td = U256::ZERO; - for header in headers { + headers.into_iter().try_for_each(|header| { + Self::insert_header(tx, header)?; td += header.difficulty; - tx.put::(header.number, td.into())?; - tx.put::(header.number, header.hash())?; - tx.put::(header.hash(), header.number)?; - tx.put::(header.number, header.clone().unseal())?; - } - - Ok(()) + tx.put::(header.number, td.into()) + }) }) } /// Insert ordered collection of [SealedBlock] into corresponding tables. /// Superset functionality of [TestTransaction::insert_headers]. + /// + /// Assumes that there's a single transition for each transaction (i.e. no block rewards). pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> Result<(), DbError> where I: Iterator, @@ -228,12 +232,8 @@ impl TestTransaction { self.commit(|tx| { let mut current_tx_id = tx_offset.unwrap_or_default(); - for block in blocks { - // Insert into header tables. - tx.put::(block.number, block.hash())?; - tx.put::(block.hash(), block.number)?; - tx.put::(block.number, block.header.clone().unseal())?; - + blocks.into_iter().try_for_each(|block| { + Self::insert_header(tx, &block.header)?; // Insert into body tables. tx.put::( block.number, @@ -242,13 +242,88 @@ impl TestTransaction { tx_count: block.body.len() as u64, }, )?; - for body_tx in block.body.clone() { - tx.put::(current_tx_id, body_tx)?; + block.body.iter().try_for_each(|body_tx| { + tx.put::(current_tx_id, current_tx_id)?; + tx.put::(current_tx_id, body_tx.clone())?; current_tx_id += 1; - } - } + Ok(()) + })?; + tx.put::(block.number, current_tx_id) + }) + }) + } - Ok(()) + /// Insert collection of ([Address], [Account]) into corresponding tables. 
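+    /// Writes both the plain and the hashed representation of each account and
+    /// storage slot; zero-valued storage entries are skipped.
+    ///
+    /// Rough usage sketch (illustrative only):
+    ///
+    /// ```ignore
+    /// tx.insert_accounts_and_storages([(address, (account, vec![entry]))])?;
+    /// ```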
+ pub fn insert_accounts_and_storages(&self, accounts: I) -> Result<(), DbError> + where + I: IntoIterator, + S: IntoIterator, + { + self.commit(|tx| { + accounts.into_iter().try_for_each(|(address, (account, storage))| { + let hashed_address = keccak256(address); + + // Insert into account tables. + tx.put::(address, account)?; + tx.put::(hashed_address, account)?; + + // Insert into storage tables. + storage.into_iter().filter(|e| e.value != U256::ZERO).try_for_each(|entry| { + let hashed_entry = StorageEntry { key: keccak256(entry.key), ..entry }; + + let mut cursor = tx.cursor_dup_write::()?; + if let Some(e) = cursor + .seek_by_key_subkey(address, entry.key)? + .filter(|e| e.key == entry.key) + { + cursor.delete_current()?; + } + cursor.upsert(address, entry)?; + + let mut cursor = tx.cursor_dup_write::()?; + if let Some(e) = cursor + .seek_by_key_subkey(hashed_address, hashed_entry.key)? + .filter(|e| e.key == hashed_entry.key) + { + cursor.delete_current()?; + } + cursor.upsert(hashed_address, hashed_entry)?; + + Ok(()) + }) + }) + }) + } + + /// Insert collection of Vec<([Address], [Account], Vec<[StorageEntry]>)> into + /// corresponding tables. + pub fn insert_transitions( + &self, + transitions: I, + transition_offset: Option, + ) -> Result<(), DbError> + where + I: IntoIterator)>>, + { + let offset = transition_offset.unwrap_or_default(); + self.commit(|tx| { + transitions.into_iter().enumerate().try_for_each(|(transition_id, changes)| { + changes.into_iter().try_for_each(|(address, old_account, old_storage)| { + let tid = offset + transition_id as u64; + // Insert into account changeset. + tx.put::( + tid, + AccountBeforeTx { address, info: Some(old_account) }, + )?; + + let tid_address = (tid, address).into(); + + // Insert into storage changeset. + old_storage.into_iter().try_for_each(|entry| { + tx.put::(tid_address, entry) + }) + }) + }) }) } } diff --git a/crates/stages/src/trie/mod.rs b/crates/stages/src/trie/mod.rs index d5f4cb94b..199df9f84 100644 --- a/crates/stages/src/trie/mod.rs +++ b/crates/stages/src/trie/mod.rs @@ -24,7 +24,7 @@ use std::{ use tracing::*; #[derive(Debug, thiserror::Error)] -pub(crate) enum TrieError { +pub enum TrieError { #[error("Some error occurred: {0}")] InternalError(#[from] cita_trie::TrieError), #[error("The root node wasn't found in the DB")] @@ -54,17 +54,33 @@ where Ok(::get(self, key)?.is_some()) } - fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { - // Caching and bulk inserting shouldn't be needed, as the data is ordered - self.tx.put::(H256::from_slice(key.as_slice()), value)?; + fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { + unreachable!("Use batch instead."); + } + + // Insert a batch of data into the cache. 
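+    // `cita_trie` is expected to route all of its commit-time writes through the
+    // batch methods, so the single-key `insert`/`remove` are left unreachable to
+    // flag any accidental non-batched write.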
+ fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_write::()?; + for (key, value) in keys.into_iter().zip(values.into_iter()) { + cursor.upsert(H256::from_slice(key.as_slice()), value)?; + } Ok(()) } - fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { - self.tx.delete::(H256::from_slice(key), None)?; + fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_write::()?; + for key in keys { + if cursor.seek_exact(H256::from_slice(key.as_slice()))?.is_some() { + cursor.delete_current()?; + } + } Ok(()) } + fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { + unreachable!("Use batch instead."); + } + fn flush(&self) -> Result<(), Self::Error> { Ok(()) } @@ -104,31 +120,49 @@ where fn get(&self, key: &[u8]) -> Result>, Self::Error> { let mut cursor = self.tx.cursor_dup_read::()?; - Ok(cursor.seek_by_key_subkey(self.key, H256::from_slice(key))?.map(|entry| entry.node)) + let subkey = H256::from_slice(key); + Ok(cursor + .seek_by_key_subkey(self.key, subkey)? + .filter(|entry| entry.hash == subkey) + .map(|entry| entry.node)) } fn contains(&self, key: &[u8]) -> Result { Ok(::get(self, key)?.is_some()) } - fn insert(&self, key: Vec, value: Vec) -> Result<(), Self::Error> { - // Caching and bulk inserting shouldn't be needed, as the data is ordered - self.tx.put::( - self.key, - StorageTrieEntry { hash: H256::from_slice(key.as_slice()), node: value }, - )?; + fn insert(&self, _key: Vec, _value: Vec) -> Result<(), Self::Error> { + unreachable!("Use batch instead."); + } + + /// Insert a batch of data into the cache. + fn insert_batch(&self, keys: Vec>, values: Vec>) -> Result<(), Self::Error> { + let mut cursor = self.tx.cursor_dup_write::()?; + for (key, node) in keys.into_iter().zip(values.into_iter()) { + let hash = H256::from_slice(key.as_slice()); + if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { + cursor.delete_current()?; + } + cursor.upsert(self.key, StorageTrieEntry { hash, node })?; + } Ok(()) } - fn remove(&self, key: &[u8]) -> Result<(), Self::Error> { + fn remove_batch(&self, keys: &[Vec]) -> Result<(), Self::Error> { let mut cursor = self.tx.cursor_dup_write::()?; - cursor - .seek_by_key_subkey(self.key, H256::from_slice(key))? - .map(|_| cursor.delete_current()) - .transpose()?; + for key in keys { + let hash = H256::from_slice(key.as_slice()); + if cursor.seek_by_key_subkey(self.key, hash)?.filter(|e| e.hash == hash).is_some() { + cursor.delete_current()?; + } + } Ok(()) } + fn remove(&self, _key: &[u8]) -> Result<(), Self::Error> { + unreachable!("Use batch instead."); + } + fn flush(&self) -> Result<(), Self::Error> { Ok(()) } @@ -139,7 +173,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> { fn new(tx: &'tx Transaction<'itx, DB>, key: H256) -> Result { let root = EMPTY_ROOT; let mut cursor = tx.cursor_dup_write::()?; - if cursor.seek_by_key_subkey(key, root)?.is_none() { + if cursor.seek_by_key_subkey(key, root)?.filter(|entry| entry.hash == root).is_none() { tx.put::( key, StorageTrieEntry { hash: root, node: [EMPTY_STRING_CODE].to_vec() }, @@ -155,6 +189,7 @@ impl<'tx, 'itx, DB: Database> DupHashDatabase<'tx, 'itx, DB> { } tx.cursor_dup_read::()? .seek_by_key_subkey(key, root)? 
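+            // A dup-sorted seek can land on the next subkey if the exact one is
+            // missing, so only an exact root match counts here.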
+ .filter(|entry| entry.hash == root) .ok_or(TrieError::MissingRoot(root))?; Ok(Self { tx, key }) } @@ -190,12 +225,14 @@ impl EthAccount { } } +/// Struct for calculating the root of a merkle patricia tree, +/// while populating the database with intermediate hashes. #[derive(Debug, Default)] -pub(crate) struct DBTrieLoader; +pub struct DBTrieLoader; impl DBTrieLoader { /// Calculates the root of the state trie, saving intermediate hashes in the database. - pub(crate) fn calculate_root( + pub fn calculate_root( &self, tx: &Transaction<'_, DB>, ) -> Result { @@ -255,7 +292,7 @@ impl DBTrieLoader { } /// Calculates the root of the state trie by updating an existing trie. - pub(crate) fn update_root( + pub fn update_root( &self, tx: &Transaction<'_, DB>, root: H256, @@ -272,20 +309,21 @@ impl DBTrieLoader { let mut trie = PatriciaTrie::from(Arc::clone(&db), Arc::clone(&hasher), root.as_bytes())?; for (address, changed_storages) in changed_accounts { - if let Some(account) = trie.get(address.as_slice())? { - let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root; + let storage_root = if let Some(account) = trie.get(address.as_slice())? { trie.remove(address.as_bytes())?; - if let Some((_, account)) = accounts_cursor.seek_exact(address)? { - let value = EthAccount::from_with_root( - account, - self.update_storage_root(tx, storage_root, address, changed_storages)?, - ); + let storage_root = EthAccount::decode(&mut account.as_slice())?.storage_root; + self.update_storage_root(tx, storage_root, address, changed_storages)? + } else { + self.calculate_storage_root(tx, address)? + }; - let mut out = Vec::new(); - Encodable::encode(&value, &mut out); - trie.insert(address.as_bytes().to_vec(), out)?; - } + if let Some((_, account)) = accounts_cursor.seek_exact(address)? { + let value = EthAccount::from_with_root(account, storage_root); + + let mut out = Vec::new(); + Encodable::encode(&value, &mut out); + trie.insert(address.as_bytes().to_vec(), out)?; } } @@ -310,7 +348,7 @@ impl DBTrieLoader { for key in changed_storages { if let Some(StorageEntry { value, .. }) = - storage_cursor.seek_by_key_subkey(address, key)? + storage_cursor.seek_by_key_subkey(address, key)?.filter(|e| e.key == key) { let out = encode_fixed_size(&value).to_vec(); trie.insert(key.as_bytes().to_vec(), out)?; diff --git a/crates/storage/db/benches/utils.rs b/crates/storage/db/benches/utils.rs index 08d88a9ba..29561dfe6 100644 --- a/crates/storage/db/benches/utils.rs +++ b/crates/storage/db/benches/utils.rs @@ -1,3 +1,4 @@ +#[allow(unused_imports)] use reth_db::{ database::Database, mdbx::{test_utils::create_test_db_with_path, EnvKind, WriteMap}, @@ -7,12 +8,15 @@ use reth_db::{ use std::path::Path; /// Path where the DB is initialized for benchmarks. +#[allow(unused)] const BENCH_DB_PATH: &str = "/tmp/reth-benches"; /// Used for RandomRead and RandomWrite benchmarks. +#[allow(unused)] const RANDOM_INDEXES: [usize; 10] = [23, 2, 42, 5, 3, 99, 54, 0, 33, 64]; /// Returns bench vectors in the format: `Vec<(Key, EncodedKey, Value, CompressedValue)>`. +#[allow(unused)] fn load_vectors() -> Vec<(T::Key, bytes::Bytes, T::Value, bytes::Bytes)> where T: Default,