From 533e7c9cc546c26f34472af2433bc092429f8913 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Sat, 4 Feb 2023 17:09:32 -0800 Subject: [PATCH] perf(SendersRecovery): re-use Secp256K1 context for >2x speedup and add benches (#1171) --- .gitignore | 5 +- Cargo.lock | 2 + crates/primitives/src/transaction/util.rs | 5 +- crates/stages/Cargo.toml | 13 ++- crates/stages/benches/README.md | 8 ++ crates/stages/benches/criterion.rs | 103 ++++++++++++++++++++ crates/stages/src/lib.rs | 5 +- crates/stages/src/stages/sender_recovery.rs | 4 +- crates/stages/src/stages/tx_lookup.rs | 2 +- crates/stages/src/test_utils/mod.rs | 3 +- crates/stages/src/test_utils/test_db.rs | 60 ++++++------ 11 files changed, 167 insertions(+), 43 deletions(-) create mode 100644 crates/stages/benches/README.md create mode 100644 crates/stages/benches/criterion.rs diff --git a/.gitignore b/.gitignore index d2b552757..400b66a6b 100644 --- a/.gitignore +++ b/.gitignore @@ -19,5 +19,8 @@ target/ # Generated test-vectors for DB testdata/micro/db +# Generated data for stage benchmarks +crates/stages/testdata + # prometheus data dir -data/ \ No newline at end of file +data/ diff --git a/Cargo.lock b/Cargo.lock index 439f4c88a..c30903ea4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -895,6 +895,7 @@ dependencies = [ "ciborium", "clap 3.2.23", "criterion-plot", + "futures", "itertools 0.10.5", "lazy_static", "num-traits", @@ -4691,6 +4692,7 @@ dependencies = [ "assert_matches", "async-trait", "cita_trie", + "criterion", "futures-util", "hasher", "itertools 0.10.5", diff --git a/crates/primitives/src/transaction/util.rs b/crates/primitives/src/transaction/util.rs index 58bfc7427..5bb11d41c 100644 --- a/crates/primitives/src/transaction/util.rs +++ b/crates/primitives/src/transaction/util.rs @@ -4,7 +4,7 @@ pub(crate) mod secp256k1 { use super::*; use ::secp256k1::{ ecdsa::{RecoverableSignature, RecoveryId}, - Error, Message, Secp256k1, + Error, Message, SECP256K1, }; /// secp256k1 signer recovery @@ -12,8 +12,7 @@ pub(crate) mod secp256k1 { let sig = RecoverableSignature::from_compact(&sig[0..64], RecoveryId::from_i32(sig[64] as i32)?)?; - let secp = Secp256k1::new(); - let public = secp.recover_ecdsa(&Message::from_slice(&msg[..32])?, &sig)?; + let public = SECP256K1.recover_ecdsa(&Message::from_slice(&msg[..32])?, &sig)?; let hash = keccak256(&public.serialize_uncompressed()[1..]); Ok(Address::from_slice(&hash[12..])) } diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 2b44416e3..2b41be94a 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -49,6 +49,7 @@ proptest = { version = "1.0", optional = true } [dev-dependencies] # reth +reth-primitives = { path = "../primitives", features = ["arbitrary"]} reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-downloaders = { path = "../net/downloaders" } @@ -59,9 +60,10 @@ assert_matches = "1.5.0" rand = "0.8.5" paste = "1.0" -# arbitrary utils -arbitrary = { version = "1.1.7", features = ["derive"] } +# Stage benchmarks +criterion = { version = "0.4.0", features = ["async_futures"] } proptest = { version = "1.0" } +arbitrary = { version = "1.1.7", features = ["derive"] } # trie reth-staged-sync = { path = "../staged-sync" } @@ -70,3 +72,10 @@ triehash = "0.8" [features] default = ["serde"] serde = ["dep:serde"] +test-utils = [] + + +[[bench]] +name = "criterion" +harness = false +required-features = ["test-utils"] diff --git a/crates/stages/benches/README.md b/crates/stages/benches/README.md new file mode 100644 index 000000000..085994614 --- /dev/null +++ b/crates/stages/benches/README.md @@ -0,0 +1,8 @@ +# Stage Benchmarks + +Test vectors are automatically generated if they cannot be found. + +## Usage +``` +cargo bench --package reth-stages --bench criterion --features test-utils +``` \ No newline at end of file diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs new file mode 100644 index 000000000..8df2f666c --- /dev/null +++ b/crates/stages/benches/criterion.rs @@ -0,0 +1,103 @@ +use criterion::{ + async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime, + BenchmarkGroup, Criterion, +}; +use reth_db::mdbx::{Env, WriteMap}; +use reth_primitives::H256; +use reth_stages::{ + stages::{SenderRecoveryStage, TransactionLookupStage}, + test_utils::TestTransaction, + ExecInput, Stage, StageId, UnwindInput, +}; +use std::path::{Path, PathBuf}; + +criterion_group!(benches, tx_lookup, senders); +criterion_main!(benches); + +fn senders(c: &mut Criterion) { + let mut group = c.benchmark_group("Stages"); + group.measurement_time(std::time::Duration::from_millis(2000)); + group.warm_up_time(std::time::Duration::from_millis(2000)); + // don't need to run each stage for that many times + group.sample_size(10); + + for batch in [1000usize, 10_000, 100_000, 250_000] { + let num_blocks = 10_000; + let mut stage = SenderRecoveryStage::default(); + stage.batch_size = batch; + stage.commit_threshold = num_blocks; + let label = format!("SendersRecovery-batch-{}", batch); + measure_stage(&mut group, stage, num_blocks - 1 /* why do we need - 1 here? */, label); + } +} + +fn tx_lookup(c: &mut Criterion) { + let mut group = c.benchmark_group("Stages"); + group.measurement_time(std::time::Duration::from_millis(2000)); + group.warm_up_time(std::time::Duration::from_millis(2000)); + // don't need to run each stage for that many times + group.sample_size(10); + + let num_blocks = 10_000; + let stage = TransactionLookupStage::new(num_blocks); + measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string()); +} + +fn measure_stage>>( + group: &mut BenchmarkGroup, + stage: S, + num_blocks: u64, + label: String, +) { + let path = txs_testdata(num_blocks as usize); + let tx = TestTransaction::new(&path); + + let mut input = ExecInput::default(); + input.previous_stage = Some((StageId("Another"), num_blocks)); + + group.bench_function(label, move |b| { + b.to_async(FuturesExecutor).iter_with_setup( + || { + // criterion setup does not support async, so we have to use our own runtime + tokio::runtime::Runtime::new().unwrap().block_on(async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + + // Clear previous run + stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap(); + + db_tx.commit().unwrap(); + }); + }, + |_| async { + let mut stage = stage.clone(); + let mut db_tx = tx.inner(); + stage.execute(&mut db_tx, input.clone()).await.unwrap(); + db_tx.commit().unwrap(); + }, + ) + }); +} + +use reth_interfaces::test_utils::generators::random_block_range; + +// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB). +// Returns the path to the database file and the number of blocks written. +fn txs_testdata(num_blocks: usize) -> PathBuf { + let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench"); + let txs_range = 100..150; + + if !path.exists() { + // create the dirs + std::fs::create_dir_all(&path).unwrap(); + println!("Transactions testdata not found, generating to {:?}", path.display()); + let tx = TestTransaction::new(&path); + + // This takes a while because it does sig recovery internally + let blocks = random_block_range(0..num_blocks as u64, H256::zero(), txs_range); + tx.insert_blocks(blocks.iter(), None).unwrap(); + tx.inner().commit().unwrap(); + } + + path +} diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 29203e1bf..211b2bd12 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -57,8 +57,9 @@ mod stage; mod trie; mod util; -#[cfg(test)] -mod test_utils; +#[allow(missing_docs)] +#[cfg(any(test, feature = "test-utils"))] +pub mod test_utils; /// A re-export of common structs and traits. pub mod prelude; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 6f18da67e..8f8264877 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -21,7 +21,7 @@ const SENDER_RECOVERY: StageId = StageId("SenderRecovery"); /// The sender recovery stage iterates over existing transactions, /// recovers the transaction signer and stores them /// in [`TxSenders`][reth_db::tables::TxSenders] table. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct SenderRecoveryStage { /// The size of the chunk for parallel sender recovery pub batch_size: usize, @@ -32,7 +32,7 @@ pub struct SenderRecoveryStage { impl Default for SenderRecoveryStage { fn default() -> Self { - Self { batch_size: 1000, commit_threshold: 5000 } + Self { batch_size: 250000, commit_threshold: 10000 } } } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 19e33ff62..807815496 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -17,7 +17,7 @@ const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup"); /// This stage walks over the bodies table, and sets the transaction hash of each transaction in a /// block to the corresponding `TransitionId` at each block. This is written to the /// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct TransactionLookupStage { /// The number of table entries to commit at once commit_threshold: u64, diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index 4b82df6cd..7ec5b2e84 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -1,3 +1,4 @@ +#![allow(unused)] use crate::StageId; mod macros; @@ -8,7 +9,7 @@ pub(crate) use macros::*; pub(crate) use runner::{ ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, }; -pub(crate) use test_db::TestTransaction; +pub use test_db::TestTransaction; /// The previous test stage id mock used for testing pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage"); diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 998ce4dff..29d065db2 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -1,6 +1,10 @@ use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, - mdbx::{test_utils::create_test_db, tx::Tx, Env, EnvKind, WriteMap, RW}, + mdbx::{ + test_utils::{create_test_db, create_test_db_with_path}, + tx::Tx, + Env, EnvKind, WriteMap, RW, + }, models::{BlockNumHash, StoredBlockBody}, table::Table, tables, @@ -8,20 +12,21 @@ use reth_db::{ Error as DbError, }; use reth_primitives::{BlockNumber, SealedBlock, SealedHeader}; -use std::{borrow::Borrow, sync::Arc}; +use std::{borrow::Borrow, path::Path, sync::Arc}; use crate::db::Transaction; /// The [TestTransaction] is used as an internal /// database for testing stage implementation. /// -/// ```rust +/// ```rust,ignore /// let tx = TestTransaction::default(); /// stage.execute(&mut tx.container(), input); /// ``` #[derive(Debug)] -pub(crate) struct TestTransaction { - tx: Arc>, +pub struct TestTransaction { + /// WriteMap DB + pub tx: Arc>, } impl Default for TestTransaction { @@ -32,18 +37,22 @@ impl Default for TestTransaction { } impl TestTransaction { + pub fn new(path: &Path) -> Self { + Self { tx: Arc::new(create_test_db_with_path::(EnvKind::RW, path)) } + } + /// Return a database wrapped in [Transaction]. - pub(crate) fn inner(&self) -> Transaction<'_, Env> { + pub fn inner(&self) -> Transaction<'_, Env> { Transaction::new(self.tx.borrow()).expect("failed to create db container") } /// Get a pointer to an internal database. - pub(crate) fn inner_raw(&self) -> Arc> { + pub fn inner_raw(&self) -> Arc> { self.tx.clone() } /// Invoke a callback with transaction committing it afterwards - pub(crate) fn commit(&self, f: F) -> Result<(), DbError> + pub fn commit(&self, f: F) -> Result<(), DbError> where F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>, { @@ -54,7 +63,7 @@ impl TestTransaction { } /// Invoke a callback with a read transaction - pub(crate) fn query(&self, f: F) -> Result + pub fn query(&self, f: F) -> Result where F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result, { @@ -62,15 +71,16 @@ impl TestTransaction { } /// Check if the table is empty - pub(crate) fn table_is_empty(&self) -> Result { + pub fn table_is_empty(&self) -> Result { self.query(|tx| { let last = tx.cursor_read::()?.last()?; Ok(last.is_none()) }) } + #[allow(clippy::type_complexity)] /// Return full table as Vec - pub(crate) fn table(&self) -> Result, DbError> + pub fn table(&self) -> Result, DbError> where T::Key: Default + Ord, { @@ -82,12 +92,12 @@ impl TestTransaction { /// Map a collection of values and store them in the database. /// This function commits the transaction before exiting. /// - /// ```rust + /// ```rust,ignore /// let tx = TestTransaction::default(); /// tx.map_put::(&items, |item| item)?; /// ``` #[allow(dead_code)] - pub(crate) fn map_put(&self, values: &[S], mut map: F) -> Result<(), DbError> + pub fn map_put(&self, values: &[S], mut map: F) -> Result<(), DbError> where T: Table, S: Clone, @@ -106,16 +116,12 @@ impl TestTransaction { /// optional last element that was stored. /// This function commits the transaction before exiting. /// - /// ```rust + /// ```rust,ignore /// let tx = TestTransaction::default(); /// tx.transform_append::(&items, |prev, item| prev.unwrap_or_default() + item)?; /// ``` #[allow(dead_code)] - pub(crate) fn transform_append( - &self, - values: &[S], - mut transform: F, - ) -> Result<(), DbError> + pub fn transform_append(&self, values: &[S], mut transform: F) -> Result<(), DbError> where T: Table, ::Value: Clone, @@ -135,11 +141,7 @@ impl TestTransaction { /// Check that there is no table entry above a given /// number by [Table::Key] - pub(crate) fn ensure_no_entry_above( - &self, - num: u64, - mut selector: F, - ) -> Result<(), DbError> + pub fn ensure_no_entry_above(&self, num: u64, mut selector: F) -> Result<(), DbError> where T: Table, F: FnMut(T::Key) -> BlockNumber, @@ -155,7 +157,7 @@ impl TestTransaction { /// Check that there is no table entry above a given /// number by [Table::Value] - pub(crate) fn ensure_no_entry_above_by_value( + pub fn ensure_no_entry_above_by_value( &self, num: u64, mut selector: F, @@ -176,7 +178,7 @@ impl TestTransaction { /// Insert ordered collection of [SealedHeader] into the corresponding tables /// that are supposed to be populated by the headers stage. - pub(crate) fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> + pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> where I: Iterator, { @@ -197,11 +199,7 @@ impl TestTransaction { /// Insert ordered collection of [SealedBlock] into corresponding tables. /// Superset functionality of [TestTransaction::insert_headers]. - pub(crate) fn insert_blocks<'a, I>( - &self, - blocks: I, - tx_offset: Option, - ) -> Result<(), DbError> + pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> Result<(), DbError> where I: Iterator, {