bench: fix reth-stages criterion benchmarks (#7786)

This commit is contained in:
Oliver Nordbjerg
2024-04-22 12:00:46 +02:00
committed by GitHub
parent c0926ba10e
commit ce2f1602a1
3 changed files with 60 additions and 33 deletions

View File

@ -59,7 +59,7 @@ paste.workspace = true
tempfile.workspace = true tempfile.workspace = true
# Stage benchmarks # Stage benchmarks
criterion = { workspace = true, features = ["async_futures"] } criterion = { workspace = true, features = ["async_tokio"] }
# io # io
serde_json.workspace = true serde_json.workspace = true

View File

@ -1,8 +1,5 @@
#![allow(missing_docs)] #![allow(missing_docs)]
use criterion::{ use criterion::{criterion_main, measurement::WallTime, BenchmarkGroup, Criterion};
async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime,
BenchmarkGroup, Criterion,
};
#[cfg(not(target_os = "windows"))] #[cfg(not(target_os = "windows"))]
use pprof::criterion::{Output, PProfProfiler}; use pprof::criterion::{Output, PProfProfiler};
use reth_config::config::EtlConfig; use reth_config::config::EtlConfig;
@ -15,29 +12,37 @@ use reth_stages::{
}; };
use reth_stages_api::{ExecInput, Stage, StageExt, UnwindInput}; use reth_stages_api::{ExecInput, Stage, StageExt, UnwindInput};
use std::{ops::RangeInclusive, sync::Arc}; use std::{ops::RangeInclusive, sync::Arc};
use tokio::runtime::Runtime;
mod setup; mod setup;
use setup::StageRange; use setup::StageRange;
#[cfg(not(target_os = "windows"))] // Expanded form of `criterion_group!`
criterion_group! { //
name = benches; // This is currently needed to only instantiate the tokio runtime once.
config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None))); fn benches() {
targets = transaction_lookup, account_hashing, senders, merkle #[cfg(not(target_os = "windows"))]
} let mut criterion = Criterion::default()
.with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None)))
.configure_from_args();
#[cfg(target_os = "windows")] let runtime = Runtime::new().unwrap();
criterion_group! { let _guard = runtime.enter();
name = benches;
config = Criterion::default(); #[cfg(target_os = "windows")]
targets = transaction_lookup, account_hashing, senders, merkle let mut criterion = Criterion::default().configure_from_args();
transaction_lookup(&mut criterion, &runtime);
account_hashing(&mut criterion, &runtime);
senders(&mut criterion, &runtime);
merkle(&mut criterion, &runtime);
} }
criterion_main!(benches); criterion_main!(benches);
const DEFAULT_NUM_BLOCKS: u64 = 10_000; const DEFAULT_NUM_BLOCKS: u64 = 10_000;
fn account_hashing(c: &mut Criterion) { fn account_hashing(c: &mut Criterion, runtime: &Runtime) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
@ -46,25 +51,39 @@ fn account_hashing(c: &mut Criterion) {
let num_blocks = 10_000; let num_blocks = 10_000;
let (db, stage, range) = setup::prepare_account_hashing(num_blocks); let (db, stage, range) = setup::prepare_account_hashing(num_blocks);
measure_stage(&mut group, &db, setup::stage_unwind, stage, range, "AccountHashing".to_string()); measure_stage(
runtime,
&mut group,
&db,
setup::stage_unwind,
stage,
range,
"AccountHashing".to_string(),
);
} }
fn senders(c: &mut Criterion) { fn senders(c: &mut Criterion, runtime: &Runtime) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
group.sample_size(10); group.sample_size(10);
let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);
for batch in [1000usize, 10_000, 100_000, 250_000] { let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let stage = SenderRecoveryStage { commit_threshold: DEFAULT_NUM_BLOCKS };
let label = format!("SendersRecovery-batch-{batch}");
measure_stage(&mut group, &db, setup::stage_unwind, stage, 0..=DEFAULT_NUM_BLOCKS, label); measure_stage(
} runtime,
&mut group,
&db,
setup::stage_unwind,
stage,
0..=DEFAULT_NUM_BLOCKS,
"SendersRecovery".to_string(),
);
} }
fn transaction_lookup(c: &mut Criterion) { fn transaction_lookup(c: &mut Criterion, runtime: &Runtime) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
group.sample_size(10); group.sample_size(10);
@ -73,6 +92,7 @@ fn transaction_lookup(c: &mut Criterion) {
let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);
measure_stage( measure_stage(
runtime,
&mut group, &mut group,
&db, &db,
setup::stage_unwind, setup::stage_unwind,
@ -82,7 +102,7 @@ fn transaction_lookup(c: &mut Criterion) {
); );
} }
fn merkle(c: &mut Criterion) { fn merkle(c: &mut Criterion, runtime: &Runtime) {
let mut group = c.benchmark_group("Stages"); let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times // don't need to run each stage for that many times
group.sample_size(10); group.sample_size(10);
@ -91,6 +111,7 @@ fn merkle(c: &mut Criterion) {
let stage = MerkleStage::Both { clean_threshold: u64::MAX }; let stage = MerkleStage::Both { clean_threshold: u64::MAX };
measure_stage( measure_stage(
runtime,
&mut group, &mut group,
&db, &db,
setup::unwind_hashes, setup::unwind_hashes,
@ -101,6 +122,7 @@ fn merkle(c: &mut Criterion) {
let stage = MerkleStage::Both { clean_threshold: 0 }; let stage = MerkleStage::Both { clean_threshold: 0 };
measure_stage( measure_stage(
runtime,
&mut group, &mut group,
&db, &db,
setup::unwind_hashes, setup::unwind_hashes,
@ -111,6 +133,7 @@ fn merkle(c: &mut Criterion) {
} }
fn measure_stage<F, S>( fn measure_stage<F, S>(
runtime: &Runtime,
group: &mut BenchmarkGroup<'_, WallTime>, group: &mut BenchmarkGroup<'_, WallTime>,
db: &TestStageDB, db: &TestStageDB,
setup: F, setup: F,
@ -135,7 +158,7 @@ fn measure_stage<F, S>(
let (input, _) = stage_range; let (input, _) = stage_range;
group.bench_function(label, move |b| { group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup( b.to_async(runtime).iter_with_setup(
|| { || {
// criterion setup does not support async, so we have to use our own runtime // criterion setup does not support async, so we have to use our own runtime
setup(stage.clone(), db, stage_range) setup(stage.clone(), db, stage_range)

View File

@ -21,6 +21,7 @@ use reth_stages::{
}; };
use reth_trie::StateRoot; use reth_trie::StateRoot;
use std::{collections::BTreeMap, path::Path, sync::Arc}; use std::{collections::BTreeMap, path::Path, sync::Arc};
use tokio::runtime::Handle;
mod constants; mod constants;
@ -37,12 +38,14 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
) { ) {
let (_, unwind) = range; let (_, unwind) = range;
tokio::runtime::Runtime::new().unwrap().block_on(async { // NOTE(onbjerg): This is unfortunately needed because Criterion does not support async setup
let mut stage = stage.clone(); tokio::task::block_in_place(move || {
let provider = db.factory.provider_rw().unwrap(); Handle::current().block_on(async move {
let mut stage = stage.clone();
let provider = db.factory.provider_rw().unwrap();
// Clear previous run // Clear previous run
stage stage
.unwind(&provider, unwind) .unwind(&provider, unwind)
.map_err(|e| { .map_err(|e| {
format!( format!(
@ -52,7 +55,8 @@ pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
}) })
.unwrap(); .unwrap();
provider.commit().unwrap(); provider.commit().unwrap();
})
}); });
} }