fix: bench builds (#5635)

This commit is contained in:
Roman Krasiuk
2023-11-30 02:41:58 -08:00
committed by GitHub
parent 2c5a748c55
commit afebbe64b7
5 changed files with 88 additions and 82 deletions

View File

@ -3,10 +3,9 @@ use criterion::{
BenchmarkGroup, Criterion,
};
use pprof::criterion::{Output, PProfProfiler};
use reth_db::DatabaseEnv;
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::{stage::StageCheckpoint, MAINNET};
use reth_provider::ProviderFactory;
use reth_primitives::stage::StageCheckpoint;
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
test_utils::TestStageDB,
@ -122,22 +121,21 @@ fn measure_stage_with_path<F, S>(
stage_range: StageRange,
label: String,
) where
S: Clone + Stage<DatabaseEnv>,
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let tx = TestStageDB::new(&path);
let db = TestStageDB::new(&path);
let (input, _) = stage_range;
group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
setup(stage.clone(), &tx, stage_range)
setup(stage.clone(), &db, stage_range)
},
|_| async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();
stage
.execute_ready(input)
.await
@ -156,7 +154,7 @@ fn measure_stage<F, S>(
block_interval: std::ops::Range<u64>,
label: String,
) where
S: Clone + Stage<DatabaseEnv>,
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let path = setup::txs_testdata(block_interval.end);

View File

@ -33,6 +33,7 @@ fn find_stage_range(db: &Path) -> StageRange {
let mut stage_range = None;
TestStageDB::new(db)
.factory
.db_ref()
.view(|tx| {
let mut cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let from = cursor.first()?.unwrap().0;
@ -62,8 +63,8 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display());
let tx = TestStageDB::new(&path);
let provider = tx.provider_rw();
let db = TestStageDB::new(&path);
let provider = db.factory.provider_rw().unwrap();
let _accounts = AccountHashingStage::seed(&provider, opts);
provider.commit().expect("failed to commit");
}

View File

@ -2,6 +2,7 @@ use itertools::concat;
use reth_db::{
cursor::DbCursorRO,
tables,
test_utils::TempDatabase,
transaction::{DbTx, DbTxMut},
DatabaseEnv,
};
@ -12,8 +13,7 @@ use reth_interfaces::test_utils::{
random_eoa_account_range,
},
};
use reth_primitives::{Account, Address, SealedBlock, B256, MAINNET};
use reth_provider::ProviderFactory;
use reth_primitives::{Account, Address, SealedBlock, B256, U256};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::TestStageDB,
@ -23,6 +23,7 @@ use reth_trie::StateRoot;
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
sync::Arc,
};
mod constants;
@ -32,7 +33,7 @@ pub use account_hashing::*;
pub(crate) type StageRange = (ExecInput, UnwindInput);
pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
stage: S,
db: &TestStageDB,
range: StageRange,
@ -41,8 +42,7 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();
// Clear previous run
stage
@ -50,7 +50,7 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
.map_err(|e| {
format!(
"{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.",
db.path.as_ref().unwrap().display()
db.factory.db_ref().path().display()
)
})
.unwrap();
@ -59,7 +59,7 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
});
}
pub(crate) fn unwind_hashes<S: Clone + Stage<DatabaseEnv>>(
pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
stage: S,
db: &TestStageDB,
range: StageRange,
@ -67,8 +67,7 @@ pub(crate) fn unwind_hashes<S: Clone + Stage<DatabaseEnv>>(
let (input, unwind) = range;
let mut stage = stage.clone();
let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();
StorageHashingStage::default().unwind(&provider, unwind).unwrap();
AccountHashingStage::default().unwind(&provider, unwind).unwrap();
@ -105,7 +104,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let tx = TestStageDB::new(&path);
let db = TestStageDB::new(&path);
let accounts: BTreeMap<Address, Account> = concat([
random_eoa_account_range(&mut rng, 0..n_eoa),
@ -124,11 +123,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
key_range.clone(),
);
tx.insert_accounts_and_storages(start_state.clone()).unwrap();
db.insert_accounts_and_storages(start_state.clone()).unwrap();
// make first block after genesis have valid state root
let (root, updates) =
StateRoot::new(tx.provider_rw().tx_ref()).root_with_updates().unwrap();
StateRoot::new(db.factory.provider_rw().unwrap().tx_ref()).root_with_updates().unwrap();
let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal();
@ -137,8 +136,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
let offset = transitions.len() as u64;
tx.insert_changesets(transitions, None).unwrap();
tx.commit(|tx| updates.flush(tx)).unwrap();
db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| Ok(updates.flush(tx)?)).unwrap();
let (transitions, final_state) = random_changeset_range(
&mut rng,
@ -148,13 +147,13 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
key_range,
);
tx.insert_changesets(transitions, Some(offset)).unwrap();
db.insert_changesets(transitions, Some(offset)).unwrap();
tx.insert_accounts_and_storages(final_state).unwrap();
db.insert_accounts_and_storages(final_state).unwrap();
// make last block have valid state root
let root = {
let tx_mut = tx.provider_rw();
let tx_mut = db.factory.provider_rw().unwrap();
let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap();
tx_mut.commit().unwrap();
root
@ -166,12 +165,12 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
updated_header.state_root = root;
*last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last };
tx.insert_blocks(blocks.iter(), None).unwrap();
db.insert_blocks(blocks.iter(), None).unwrap();
// initialize TD
tx.commit(|tx| {
db.commit(|tx| {
let (head, _) = tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default();
tx.put::<tables::HeaderTD>(head, reth_primitives::U256::from(0).into())
Ok(tx.put::<tables::HeaderTD>(head, U256::from(0).into())?)
})
.unwrap();
}

View File

@ -182,12 +182,17 @@ pub mod test_utils {
}
impl<DB> TempDatabase<DB> {
/// returns the ref of inner db
/// Returns the reference to inner db.
pub fn db(&self) -> &DB {
// `db` is stored as an `Option` so it can be moved out (see `into_inner_db`,
// which consumes `self`); a live `TempDatabase` is expected to always hold
// `Some`, so this unwrap should not fail.
self.db.as_ref().unwrap()
}
/// returns the inner db
/// Returns the path to the database.
pub fn path(&self) -> &Path {
&self.path
}
/// Converts the temp database into its inner db, consuming `self`.
pub fn into_inner_db(mut self) -> DB {
self.db.take().unwrap() // take out db to avoid clean path in drop fn
}

View File

@ -22,7 +22,7 @@ use reth_primitives::{
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
ops::{RangeBounds, RangeInclusive},
path::PathBuf,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::watch;
@ -46,6 +46,57 @@ pub struct ProviderFactory<DB> {
snapshot_provider: Option<Arc<SnapshotProvider>>,
}
// Manual `Clone` impl for `ProviderFactory<DB>`, bounded only on `DB: Clone`.
// `chain_spec` and `snapshot_provider` are `Arc`-backed, so cloning them is a
// cheap reference-count bump; the cost of `clone` is dominated by `db.clone()`.
impl<DB: Clone> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
chain_spec: Arc::clone(&self.chain_spec),
snapshot_provider: self.snapshot_provider.clone(),
}
}
}
// NOTE(review): empty impl block — appears to be a leftover from the refactor
// in this commit; consider removing it.
impl<DB: Database> ProviderFactory<DB> {}
// Constructors and builder-style configuration for `ProviderFactory` that do
// not require `DB: Database`.
impl<DB> ProviderFactory<DB> {
/// Create new database provider factory.
pub fn new(db: DB, chain_spec: Arc<ChainSpec>) -> Self {
Self { db, chain_spec, snapshot_provider: None }
}
/// Create new database provider by passing a path. [`ProviderFactory`] will own the database
/// instance.
// NOTE: always produces a `ProviderFactory<DatabaseEnv>` regardless of this
// impl's `DB` parameter, because the database is opened here via `init_db`.
pub fn new_with_database_path<P: AsRef<Path>>(
path: P,
chain_spec: Arc<ChainSpec>,
log_level: Option<LogLevel>,
) -> RethResult<ProviderFactory<DatabaseEnv>> {
Ok(ProviderFactory::<DatabaseEnv> {
// init_db errors are stringified into RethError::Custom.
db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?,
chain_spec,
snapshot_provider: None,
})
}
/// Database provider that comes with a shared snapshot provider.
// Builder-style: consumes `self` and returns it with the snapshot provider set.
pub fn with_snapshots(
mut self,
snapshots_path: PathBuf,
highest_snapshot_tracker: watch::Receiver<Option<HighestSnapshots>>,
) -> Self {
self.snapshot_provider = Some(Arc::new(
SnapshotProvider::new(snapshots_path)
.with_highest_tracker(Some(highest_snapshot_tracker)),
));
self
}
/// Returns reference to the underlying database.
pub fn db_ref(&self) -> &DB {
&self.db
}
}
impl<DB: Database> ProviderFactory<DB> {
/// Returns a provider with a created `DbTx` inside, which allows fetching data from the
/// database using different types of providers. Example: [`HeaderProvider`]
@ -73,55 +124,7 @@ impl<DB: Database> ProviderFactory<DB> {
Ok(DatabaseProviderRW(provider))
}
}
// NOTE(review): removed side of diff hunk `@ -73,55 +124,7` — the
// pre-refactor constructor/builder impl that this commit consolidates.
impl<DB> ProviderFactory<DB> {
/// create new database provider
pub fn new(db: DB, chain_spec: Arc<ChainSpec>) -> Self {
Self { db, chain_spec, snapshot_provider: None }
}
/// database provider comes with a shared snapshot provider
pub fn with_snapshots(
mut self,
snapshots_path: PathBuf,
highest_snapshot_tracker: watch::Receiver<Option<HighestSnapshots>>,
) -> Self {
self.snapshot_provider = Some(Arc::new(
SnapshotProvider::new(snapshots_path)
.with_highest_tracker(Some(highest_snapshot_tracker)),
));
self
}
}
// NOTE(review): removed side of diff hunk `@ -73,55 +124,7` — superseded
// pre-refactor location of `new_with_database_path`.
impl<DB: Database> ProviderFactory<DB> {
/// create new database provider by passing a path. [`ProviderFactory`] will own the database
/// instance.
pub fn new_with_database_path<P: AsRef<std::path::Path>>(
path: P,
chain_spec: Arc<ChainSpec>,
log_level: Option<LogLevel>,
) -> RethResult<ProviderFactory<DatabaseEnv>> {
Ok(ProviderFactory::<DatabaseEnv> {
db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?,
chain_spec,
snapshot_provider: None,
})
}
}
// NOTE(review): removed side of diff hunk `@ -73,55 +124,7` — the `Clone`
// impl was moved (unchanged) earlier in the file by this commit.
impl<DB: Clone> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
chain_spec: Arc::clone(&self.chain_spec),
snapshot_provider: self.snapshot_provider.clone(),
}
}
}
impl<DB: Database> ProviderFactory<DB> {
/// Storage provider for latest block
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
trace!(target: "providers::db", "Returning latest state provider");