From afebbe64b7153b2f26b4096f7faa870257244d17 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Thu, 30 Nov 2023 02:41:58 -0800 Subject: [PATCH] fix: bench builds (#5635) --- crates/stages/benches/criterion.rs | 16 ++- .../stages/benches/setup/account_hashing.rs | 5 +- crates/stages/benches/setup/mod.rs | 39 ++++--- crates/storage/db/src/lib.rs | 9 +- .../provider/src/providers/database/mod.rs | 101 +++++++++--------- 5 files changed, 88 insertions(+), 82 deletions(-) diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index 2f73ec71f..2a45c2521 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -3,10 +3,9 @@ use criterion::{ BenchmarkGroup, Criterion, }; use pprof::criterion::{Output, PProfProfiler}; -use reth_db::DatabaseEnv; +use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_interfaces::test_utils::TestConsensus; -use reth_primitives::{stage::StageCheckpoint, MAINNET}; -use reth_provider::ProviderFactory; +use reth_primitives::stage::StageCheckpoint; use reth_stages::{ stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage}, test_utils::TestStageDB, @@ -122,22 +121,21 @@ fn measure_stage_with_path( stage_range: StageRange, label: String, ) where - S: Clone + Stage, + S: Clone + Stage>>, F: Fn(S, &TestStageDB, StageRange), { - let tx = TestStageDB::new(&path); + let db = TestStageDB::new(&path); let (input, _) = stage_range; group.bench_function(label, move |b| { b.to_async(FuturesExecutor).iter_with_setup( || { // criterion setup does not support async, so we have to use our own runtime - setup(stage.clone(), &tx, stage_range) + setup(stage.clone(), &db, stage_range) }, |_| async { let mut stage = stage.clone(); - let factory = ProviderFactory::new(tx.factory.db(), MAINNET.clone()); - let provider = factory.provider_rw().unwrap(); + let provider = db.factory.provider_rw().unwrap(); stage .execute_ready(input) .await @@ -156,7 +154,7 @@ fn measure_stage( block_interval: std::ops::Range, label: String, ) where - S: Clone + Stage, + S: Clone + Stage>>, F: Fn(S, &TestStageDB, StageRange), { let path = setup::txs_testdata(block_interval.end); diff --git a/crates/stages/benches/setup/account_hashing.rs b/crates/stages/benches/setup/account_hashing.rs index a94a8250a..175678242 100644 --- a/crates/stages/benches/setup/account_hashing.rs +++ b/crates/stages/benches/setup/account_hashing.rs @@ -33,6 +33,7 @@ fn find_stage_range(db: &Path) -> StageRange { let mut stage_range = None; TestStageDB::new(db) .factory + .db_ref() .view(|tx| { let mut cursor = tx.cursor_read::()?; let from = cursor.first()?.unwrap().0; @@ -62,8 +63,8 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) { // create the dirs std::fs::create_dir_all(&path).unwrap(); println!("Account Hashing testdata not found, generating to {:?}", path.display()); - let tx = TestStageDB::new(&path); - let provider = tx.provider_rw(); + let db = TestStageDB::new(&path); + let provider = db.factory.provider_rw().unwrap(); let _accounts = AccountHashingStage::seed(&provider, opts); provider.commit().expect("failed to commit"); } diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index 3850ca44f..4b972afcf 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -2,6 +2,7 @@ use itertools::concat; use reth_db::{ cursor::DbCursorRO, tables, + test_utils::TempDatabase, transaction::{DbTx, DbTxMut}, DatabaseEnv, }; @@ -12,8 +13,7 @@ use reth_interfaces::test_utils::{ random_eoa_account_range, }, }; -use reth_primitives::{Account, Address, SealedBlock, B256, MAINNET}; -use reth_provider::ProviderFactory; +use reth_primitives::{Account, Address, SealedBlock, B256, U256}; use reth_stages::{ stages::{AccountHashingStage, StorageHashingStage}, test_utils::TestStageDB, @@ -23,6 +23,7 @@ use reth_trie::StateRoot; use std::{ collections::BTreeMap, path::{Path, PathBuf}, + sync::Arc, }; mod constants; @@ -32,7 +33,7 @@ pub use account_hashing::*; pub(crate) type StageRange = (ExecInput, UnwindInput); -pub(crate) fn stage_unwind>( +pub(crate) fn stage_unwind>>>( stage: S, db: &TestStageDB, range: StageRange, @@ -41,8 +42,7 @@ pub(crate) fn stage_unwind>( tokio::runtime::Runtime::new().unwrap().block_on(async { let mut stage = stage.clone(); - let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone()); - let provider = factory.provider_rw().unwrap(); + let provider = db.factory.provider_rw().unwrap(); // Clear previous run stage @@ -50,7 +50,7 @@ pub(crate) fn stage_unwind>( .map_err(|e| { format!( "{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.", - db.path.as_ref().unwrap().display() + db.factory.db_ref().path().display() ) }) .unwrap(); @@ -59,7 +59,7 @@ pub(crate) fn stage_unwind>( }); } -pub(crate) fn unwind_hashes>( +pub(crate) fn unwind_hashes>>>( stage: S, db: &TestStageDB, range: StageRange, @@ -67,8 +67,7 @@ pub(crate) fn unwind_hashes>( let (input, unwind) = range; let mut stage = stage.clone(); - let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone()); - let provider = factory.provider_rw().unwrap(); + let provider = db.factory.provider_rw().unwrap(); StorageHashingStage::default().unwind(&provider, unwind).unwrap(); AccountHashingStage::default().unwind(&provider, unwind).unwrap(); @@ -105,7 +104,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { // create the dirs std::fs::create_dir_all(&path).unwrap(); println!("Transactions testdata not found, generating to {:?}", path.display()); - let tx = TestStageDB::new(&path); + let db = TestStageDB::new(&path); let accounts: BTreeMap = concat([ random_eoa_account_range(&mut rng, 0..n_eoa), @@ -124,11 +123,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { key_range.clone(), ); - tx.insert_accounts_and_storages(start_state.clone()).unwrap(); + db.insert_accounts_and_storages(start_state.clone()).unwrap(); // make first block after genesis have valid state root let (root, updates) = - StateRoot::new(tx.provider_rw().tx_ref()).root_with_updates().unwrap(); + StateRoot::new(db.factory.provider_rw().unwrap().tx_ref()).root_with_updates().unwrap(); let second_block = blocks.get_mut(1).unwrap(); let cloned_second = second_block.clone(); let mut updated_header = cloned_second.header.unseal(); @@ -137,8 +136,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { let offset = transitions.len() as u64; - tx.insert_changesets(transitions, None).unwrap(); - tx.commit(|tx| updates.flush(tx)).unwrap(); + db.insert_changesets(transitions, None).unwrap(); + db.commit(|tx| Ok(updates.flush(tx)?)).unwrap(); let (transitions, final_state) = random_changeset_range( &mut rng, @@ -148,13 +147,13 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { key_range, ); - tx.insert_changesets(transitions, Some(offset)).unwrap(); + db.insert_changesets(transitions, Some(offset)).unwrap(); - tx.insert_accounts_and_storages(final_state).unwrap(); + db.insert_accounts_and_storages(final_state).unwrap(); // make last block have valid state root let root = { - let tx_mut = tx.provider_rw(); + let tx_mut = db.factory.provider_rw().unwrap(); let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap(); tx_mut.commit().unwrap(); root @@ -166,12 +165,12 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf { updated_header.state_root = root; *last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last }; - tx.insert_blocks(blocks.iter(), None).unwrap(); + db.insert_blocks(blocks.iter(), None).unwrap(); // initialize TD - tx.commit(|tx| { + db.commit(|tx| { let (head, _) = tx.cursor_read::()?.first()?.unwrap_or_default(); - tx.put::(head, reth_primitives::U256::from(0).into()) + Ok(tx.put::(head, U256::from(0).into())?) }) .unwrap(); } diff --git a/crates/storage/db/src/lib.rs b/crates/storage/db/src/lib.rs index e813bf0d1..0c8078a69 100644 --- a/crates/storage/db/src/lib.rs +++ b/crates/storage/db/src/lib.rs @@ -182,12 +182,17 @@ pub mod test_utils { } impl TempDatabase { - /// returns the ref of inner db + /// Returns the reference to inner db. pub fn db(&self) -> &DB { self.db.as_ref().unwrap() } - /// returns the inner db + /// Returns the path to the database. + pub fn path(&self) -> &Path { + &self.path + } + + /// Convert temp database into inner. pub fn into_inner_db(mut self) -> DB { self.db.take().unwrap() // take out db to avoid clean path in drop fn } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 0d1ca70ab..f2f6141b8 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -22,7 +22,7 @@ use reth_primitives::{ use revm::primitives::{BlockEnv, CfgEnv}; use std::{ ops::{RangeBounds, RangeInclusive}, - path::PathBuf, + path::{Path, PathBuf}, sync::Arc, }; use tokio::sync::watch; @@ -46,6 +46,57 @@ pub struct ProviderFactory { snapshot_provider: Option>, } +impl Clone for ProviderFactory { + fn clone(&self) -> Self { + Self { + db: self.db.clone(), + chain_spec: Arc::clone(&self.chain_spec), + snapshot_provider: self.snapshot_provider.clone(), + } + } +} + +impl ProviderFactory {} + +impl ProviderFactory { + /// Create new database provider factory. + pub fn new(db: DB, chain_spec: Arc) -> Self { + Self { db, chain_spec, snapshot_provider: None } + } + + /// Create new database provider by passing a path. [`ProviderFactory`] will own the database + /// instance. + pub fn new_with_database_path>( + path: P, + chain_spec: Arc, + log_level: Option, + ) -> RethResult> { + Ok(ProviderFactory:: { + db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?, + chain_spec, + snapshot_provider: None, + }) + } + + /// Database provider that comes with a shared snapshot provider. + pub fn with_snapshots( + mut self, + snapshots_path: PathBuf, + highest_snapshot_tracker: watch::Receiver>, + ) -> Self { + self.snapshot_provider = Some(Arc::new( + SnapshotProvider::new(snapshots_path) + .with_highest_tracker(Some(highest_snapshot_tracker)), + )); + self + } + + /// Returns reference to the underlying database. + pub fn db_ref(&self) -> &DB { + &self.db + } +} + impl ProviderFactory { /// Returns a provider with a created `DbTx` inside, which allows fetching data from the /// database using different types of providers. Example: [`HeaderProvider`] @@ -73,55 +124,7 @@ impl ProviderFactory { Ok(DatabaseProviderRW(provider)) } -} -impl ProviderFactory { - /// create new database provider - pub fn new(db: DB, chain_spec: Arc) -> Self { - Self { db, chain_spec, snapshot_provider: None } - } - - /// database provider comes with a shared snapshot provider - pub fn with_snapshots( - mut self, - snapshots_path: PathBuf, - highest_snapshot_tracker: watch::Receiver>, - ) -> Self { - self.snapshot_provider = Some(Arc::new( - SnapshotProvider::new(snapshots_path) - .with_highest_tracker(Some(highest_snapshot_tracker)), - )); - self - } -} - -impl ProviderFactory { - /// create new database provider by passing a path. [`ProviderFactory`] will own the database - /// instance. - pub fn new_with_database_path>( - path: P, - chain_spec: Arc, - log_level: Option, - ) -> RethResult> { - Ok(ProviderFactory:: { - db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?, - chain_spec, - snapshot_provider: None, - }) - } -} - -impl Clone for ProviderFactory { - fn clone(&self) -> Self { - Self { - db: self.db.clone(), - chain_spec: Arc::clone(&self.chain_spec), - snapshot_provider: self.snapshot_provider.clone(), - } - } -} - -impl ProviderFactory { /// Storage provider for latest block pub fn latest(&self) -> ProviderResult { trace!(target: "providers::db", "Returning latest state provider");