perf(SendersRecovery): re-use Secp256K1 context for >2x speedup and add benches (#1171)

This commit is contained in:
Georgios Konstantopoulos
2023-02-04 17:09:32 -08:00
committed by GitHub
parent 75692bc5f3
commit 533e7c9cc5
11 changed files with 167 additions and 43 deletions

5
.gitignore vendored
View File

@ -19,5 +19,8 @@ target/
# Generated test-vectors for DB # Generated test-vectors for DB
testdata/micro/db testdata/micro/db
# Generated data for stage benchmarks
crates/stages/testdata
# prometheus data dir # prometheus data dir
data/ data/

2
Cargo.lock generated
View File

@ -895,6 +895,7 @@ dependencies = [
"ciborium", "ciborium",
"clap 3.2.23", "clap 3.2.23",
"criterion-plot", "criterion-plot",
"futures",
"itertools 0.10.5", "itertools 0.10.5",
"lazy_static", "lazy_static",
"num-traits", "num-traits",
@ -4691,6 +4692,7 @@ dependencies = [
"assert_matches", "assert_matches",
"async-trait", "async-trait",
"cita_trie", "cita_trie",
"criterion",
"futures-util", "futures-util",
"hasher", "hasher",
"itertools 0.10.5", "itertools 0.10.5",

View File

@ -4,7 +4,7 @@ pub(crate) mod secp256k1 {
use super::*; use super::*;
use ::secp256k1::{ use ::secp256k1::{
ecdsa::{RecoverableSignature, RecoveryId}, ecdsa::{RecoverableSignature, RecoveryId},
Error, Message, Secp256k1, Error, Message, SECP256K1,
}; };
/// secp256k1 signer recovery /// secp256k1 signer recovery
@ -12,8 +12,7 @@ pub(crate) mod secp256k1 {
let sig = let sig =
RecoverableSignature::from_compact(&sig[0..64], RecoveryId::from_i32(sig[64] as i32)?)?; RecoverableSignature::from_compact(&sig[0..64], RecoveryId::from_i32(sig[64] as i32)?)?;
let secp = Secp256k1::new(); let public = SECP256K1.recover_ecdsa(&Message::from_slice(&msg[..32])?, &sig)?;
let public = secp.recover_ecdsa(&Message::from_slice(&msg[..32])?, &sig)?;
let hash = keccak256(&public.serialize_uncompressed()[1..]); let hash = keccak256(&public.serialize_uncompressed()[1..]);
Ok(Address::from_slice(&hash[12..])) Ok(Address::from_slice(&hash[12..]))
} }

View File

@ -49,6 +49,7 @@ proptest = { version = "1.0", optional = true }
[dev-dependencies] [dev-dependencies]
# reth # reth
reth-primitives = { path = "../primitives", features = ["arbitrary"]}
reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-downloaders = { path = "../net/downloaders" } reth-downloaders = { path = "../net/downloaders" }
@ -59,9 +60,10 @@ assert_matches = "1.5.0"
rand = "0.8.5" rand = "0.8.5"
paste = "1.0" paste = "1.0"
# arbitrary utils # Stage benchmarks
arbitrary = { version = "1.1.7", features = ["derive"] } criterion = { version = "0.4.0", features = ["async_futures"] }
proptest = { version = "1.0" } proptest = { version = "1.0" }
arbitrary = { version = "1.1.7", features = ["derive"] }
# trie # trie
reth-staged-sync = { path = "../staged-sync" } reth-staged-sync = { path = "../staged-sync" }
@ -70,3 +72,10 @@ triehash = "0.8"
[features] [features]
default = ["serde"] default = ["serde"]
serde = ["dep:serde"] serde = ["dep:serde"]
test-utils = []
[[bench]]
name = "criterion"
harness = false
required-features = ["test-utils"]

View File

@ -0,0 +1,8 @@
# Stage Benchmarks
Test vectors are automatically generated if they cannot be found.
## Usage
```
cargo bench --package reth-stages --bench criterion --features test-utils
```

View File

@ -0,0 +1,103 @@
use criterion::{
async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime,
BenchmarkGroup, Criterion,
};
use reth_db::mdbx::{Env, WriteMap};
use reth_primitives::H256;
use reth_stages::{
stages::{SenderRecoveryStage, TransactionLookupStage},
test_utils::TestTransaction,
ExecInput, Stage, StageId, UnwindInput,
};
use std::path::{Path, PathBuf};
criterion_group!(benches, tx_lookup, senders);
criterion_main!(benches);
fn senders(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
group.measurement_time(std::time::Duration::from_millis(2000));
group.warm_up_time(std::time::Duration::from_millis(2000));
// don't need to run each stage for that many times
group.sample_size(10);
for batch in [1000usize, 10_000, 100_000, 250_000] {
let num_blocks = 10_000;
let mut stage = SenderRecoveryStage::default();
stage.batch_size = batch;
stage.commit_threshold = num_blocks;
let label = format!("SendersRecovery-batch-{}", batch);
measure_stage(&mut group, stage, num_blocks - 1 /* why do we need - 1 here? */, label);
}
}
fn tx_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
group.measurement_time(std::time::Duration::from_millis(2000));
group.warm_up_time(std::time::Duration::from_millis(2000));
// don't need to run each stage for that many times
group.sample_size(10);
let num_blocks = 10_000;
let stage = TransactionLookupStage::new(num_blocks);
measure_stage(&mut group, stage, num_blocks, "TransactionLookup".to_string());
}
fn measure_stage<S: Clone + Default + Stage<Env<WriteMap>>>(
group: &mut BenchmarkGroup<WallTime>,
stage: S,
num_blocks: u64,
label: String,
) {
let path = txs_testdata(num_blocks as usize);
let tx = TestTransaction::new(&path);
let mut input = ExecInput::default();
input.previous_stage = Some((StageId("Another"), num_blocks));
group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
// Clear previous run
stage.unwind(&mut db_tx, UnwindInput::default()).await.unwrap();
db_tx.commit().unwrap();
});
},
|_| async {
let mut stage = stage.clone();
let mut db_tx = tx.inner();
stage.execute(&mut db_tx, input.clone()).await.unwrap();
db_tx.commit().unwrap();
},
)
});
}
use reth_interfaces::test_utils::generators::random_block_range;
// Helper for generating testdata for the sender recovery stage and tx lookup stages (512MB).
// Returns the path to the database file and the number of blocks written.
fn txs_testdata(num_blocks: usize) -> PathBuf {
let path = Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata").join("txs-bench");
let txs_range = 100..150;
if !path.exists() {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let tx = TestTransaction::new(&path);
// This takes a while because it does sig recovery internally
let blocks = random_block_range(0..num_blocks as u64, H256::zero(), txs_range);
tx.insert_blocks(blocks.iter(), None).unwrap();
tx.inner().commit().unwrap();
}
path
}

View File

@ -57,8 +57,9 @@ mod stage;
mod trie; mod trie;
mod util; mod util;
#[cfg(test)] #[allow(missing_docs)]
mod test_utils; #[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
/// A re-export of common structs and traits. /// A re-export of common structs and traits.
pub mod prelude; pub mod prelude;

View File

@ -21,7 +21,7 @@ const SENDER_RECOVERY: StageId = StageId("SenderRecovery");
/// The sender recovery stage iterates over existing transactions, /// The sender recovery stage iterates over existing transactions,
/// recovers the transaction signer and stores them /// recovers the transaction signer and stores them
/// in [`TxSenders`][reth_db::tables::TxSenders] table. /// in [`TxSenders`][reth_db::tables::TxSenders] table.
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct SenderRecoveryStage { pub struct SenderRecoveryStage {
/// The size of the chunk for parallel sender recovery /// The size of the chunk for parallel sender recovery
pub batch_size: usize, pub batch_size: usize,
@ -32,7 +32,7 @@ pub struct SenderRecoveryStage {
impl Default for SenderRecoveryStage { impl Default for SenderRecoveryStage {
fn default() -> Self { fn default() -> Self {
Self { batch_size: 1000, commit_threshold: 5000 } Self { batch_size: 250000, commit_threshold: 10000 }
} }
} }

View File

@ -17,7 +17,7 @@ const TRANSACTION_LOOKUP: StageId = StageId("TransactionLookup");
/// This stage walks over the bodies table, and sets the transaction hash of each transaction in a /// This stage walks over the bodies table, and sets the transaction hash of each transaction in a
/// block to the corresponding `TransitionId` at each block. This is written to the /// block to the corresponding `TransitionId` at each block. This is written to the
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. /// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct TransactionLookupStage { pub struct TransactionLookupStage {
/// The number of table entries to commit at once /// The number of table entries to commit at once
commit_threshold: u64, commit_threshold: u64,

View File

@ -1,3 +1,4 @@
#![allow(unused)]
use crate::StageId; use crate::StageId;
mod macros; mod macros;
@ -8,7 +9,7 @@ pub(crate) use macros::*;
pub(crate) use runner::{ pub(crate) use runner::{
ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, UnwindStageTestRunner,
}; };
pub(crate) use test_db::TestTransaction; pub use test_db::TestTransaction;
/// The previous test stage id mock used for testing /// The previous test stage id mock used for testing
pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage"); pub(crate) const PREV_STAGE_ID: StageId = StageId("PrevStage");

View File

@ -1,6 +1,10 @@
use reth_db::{ use reth_db::{
cursor::{DbCursorRO, DbCursorRW}, cursor::{DbCursorRO, DbCursorRW},
mdbx::{test_utils::create_test_db, tx::Tx, Env, EnvKind, WriteMap, RW}, mdbx::{
test_utils::{create_test_db, create_test_db_with_path},
tx::Tx,
Env, EnvKind, WriteMap, RW,
},
models::{BlockNumHash, StoredBlockBody}, models::{BlockNumHash, StoredBlockBody},
table::Table, table::Table,
tables, tables,
@ -8,20 +12,21 @@ use reth_db::{
Error as DbError, Error as DbError,
}; };
use reth_primitives::{BlockNumber, SealedBlock, SealedHeader}; use reth_primitives::{BlockNumber, SealedBlock, SealedHeader};
use std::{borrow::Borrow, sync::Arc}; use std::{borrow::Borrow, path::Path, sync::Arc};
use crate::db::Transaction; use crate::db::Transaction;
/// The [TestTransaction] is used as an internal /// The [TestTransaction] is used as an internal
/// database for testing stage implementation. /// database for testing stage implementation.
/// ///
/// ```rust /// ```rust,ignore
/// let tx = TestTransaction::default(); /// let tx = TestTransaction::default();
/// stage.execute(&mut tx.container(), input); /// stage.execute(&mut tx.container(), input);
/// ``` /// ```
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct TestTransaction { pub struct TestTransaction {
tx: Arc<Env<WriteMap>>, /// WriteMap DB
pub tx: Arc<Env<WriteMap>>,
} }
impl Default for TestTransaction { impl Default for TestTransaction {
@ -32,18 +37,22 @@ impl Default for TestTransaction {
} }
impl TestTransaction { impl TestTransaction {
pub fn new(path: &Path) -> Self {
Self { tx: Arc::new(create_test_db_with_path::<WriteMap>(EnvKind::RW, path)) }
}
/// Return a database wrapped in [Transaction]. /// Return a database wrapped in [Transaction].
pub(crate) fn inner(&self) -> Transaction<'_, Env<WriteMap>> { pub fn inner(&self) -> Transaction<'_, Env<WriteMap>> {
Transaction::new(self.tx.borrow()).expect("failed to create db container") Transaction::new(self.tx.borrow()).expect("failed to create db container")
} }
/// Get a pointer to an internal database. /// Get a pointer to an internal database.
pub(crate) fn inner_raw(&self) -> Arc<Env<WriteMap>> { pub fn inner_raw(&self) -> Arc<Env<WriteMap>> {
self.tx.clone() self.tx.clone()
} }
/// Invoke a callback with transaction committing it afterwards /// Invoke a callback with transaction committing it afterwards
pub(crate) fn commit<F>(&self, f: F) -> Result<(), DbError> pub fn commit<F>(&self, f: F) -> Result<(), DbError>
where where
F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>, F: FnOnce(&mut Tx<'_, RW, WriteMap>) -> Result<(), DbError>,
{ {
@ -54,7 +63,7 @@ impl TestTransaction {
} }
/// Invoke a callback with a read transaction /// Invoke a callback with a read transaction
pub(crate) fn query<F, R>(&self, f: F) -> Result<R, DbError> pub fn query<F, R>(&self, f: F) -> Result<R, DbError>
where where
F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result<R, DbError>, F: FnOnce(&Tx<'_, RW, WriteMap>) -> Result<R, DbError>,
{ {
@ -62,15 +71,16 @@ impl TestTransaction {
} }
/// Check if the table is empty /// Check if the table is empty
pub(crate) fn table_is_empty<T: Table>(&self) -> Result<bool, DbError> { pub fn table_is_empty<T: Table>(&self) -> Result<bool, DbError> {
self.query(|tx| { self.query(|tx| {
let last = tx.cursor_read::<T>()?.last()?; let last = tx.cursor_read::<T>()?.last()?;
Ok(last.is_none()) Ok(last.is_none())
}) })
} }
#[allow(clippy::type_complexity)]
/// Return full table as Vec /// Return full table as Vec
pub(crate) fn table<T: Table>(&self) -> Result<Vec<(T::Key, T::Value)>, DbError> pub fn table<T: Table>(&self) -> Result<Vec<(T::Key, T::Value)>, DbError>
where where
T::Key: Default + Ord, T::Key: Default + Ord,
{ {
@ -82,12 +92,12 @@ impl TestTransaction {
/// Map a collection of values and store them in the database. /// Map a collection of values and store them in the database.
/// This function commits the transaction before exiting. /// This function commits the transaction before exiting.
/// ///
/// ```rust /// ```rust,ignore
/// let tx = TestTransaction::default(); /// let tx = TestTransaction::default();
/// tx.map_put::<Table, _, _>(&items, |item| item)?; /// tx.map_put::<Table, _, _>(&items, |item| item)?;
/// ``` /// ```
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), DbError> pub fn map_put<T, S, F>(&self, values: &[S], mut map: F) -> Result<(), DbError>
where where
T: Table, T: Table,
S: Clone, S: Clone,
@ -106,16 +116,12 @@ impl TestTransaction {
/// optional last element that was stored. /// optional last element that was stored.
/// This function commits the transaction before exiting. /// This function commits the transaction before exiting.
/// ///
/// ```rust /// ```rust,ignore
/// let tx = TestTransaction::default(); /// let tx = TestTransaction::default();
/// tx.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?; /// tx.transform_append::<Table, _, _>(&items, |prev, item| prev.unwrap_or_default() + item)?;
/// ``` /// ```
#[allow(dead_code)] #[allow(dead_code)]
pub(crate) fn transform_append<T, S, F>( pub fn transform_append<T, S, F>(&self, values: &[S], mut transform: F) -> Result<(), DbError>
&self,
values: &[S],
mut transform: F,
) -> Result<(), DbError>
where where
T: Table, T: Table,
<T as Table>::Value: Clone, <T as Table>::Value: Clone,
@ -135,11 +141,7 @@ impl TestTransaction {
/// Check that there is no table entry above a given /// Check that there is no table entry above a given
/// number by [Table::Key] /// number by [Table::Key]
pub(crate) fn ensure_no_entry_above<T, F>( pub fn ensure_no_entry_above<T, F>(&self, num: u64, mut selector: F) -> Result<(), DbError>
&self,
num: u64,
mut selector: F,
) -> Result<(), DbError>
where where
T: Table, T: Table,
F: FnMut(T::Key) -> BlockNumber, F: FnMut(T::Key) -> BlockNumber,
@ -155,7 +157,7 @@ impl TestTransaction {
/// Check that there is no table entry above a given /// Check that there is no table entry above a given
/// number by [Table::Value] /// number by [Table::Value]
pub(crate) fn ensure_no_entry_above_by_value<T, F>( pub fn ensure_no_entry_above_by_value<T, F>(
&self, &self,
num: u64, num: u64,
mut selector: F, mut selector: F,
@ -176,7 +178,7 @@ impl TestTransaction {
/// Insert ordered collection of [SealedHeader] into the corresponding tables /// Insert ordered collection of [SealedHeader] into the corresponding tables
/// that are supposed to be populated by the headers stage. /// that are supposed to be populated by the headers stage.
pub(crate) fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError> pub fn insert_headers<'a, I>(&self, headers: I) -> Result<(), DbError>
where where
I: Iterator<Item = &'a SealedHeader>, I: Iterator<Item = &'a SealedHeader>,
{ {
@ -197,11 +199,7 @@ impl TestTransaction {
/// Insert ordered collection of [SealedBlock] into corresponding tables. /// Insert ordered collection of [SealedBlock] into corresponding tables.
/// Superset functionality of [TestTransaction::insert_headers]. /// Superset functionality of [TestTransaction::insert_headers].
pub(crate) fn insert_blocks<'a, I>( pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option<u64>) -> Result<(), DbError>
&self,
blocks: I,
tx_offset: Option<u64>,
) -> Result<(), DbError>
where where
I: Iterator<Item = &'a SealedBlock>, I: Iterator<Item = &'a SealedBlock>,
{ {