feat: add network bench (#5728)

This commit is contained in:
Matthias Seitz
2023-12-11 15:20:45 +01:00
committed by GitHub
parent b83afd4b76
commit c4c07b86bb
4 changed files with 119 additions and 0 deletions

View File

@ -89,6 +89,10 @@ enr = { workspace = true, features = ["serde", "rust-secp256k1"] }
serial_test.workspace = true
tempfile.workspace = true
## Benchmarks
pprof = { workspace = true, features = ["criterion", "flamegraph"] }
criterion = { workspace = true, features = ["async_tokio", "html_reports"] }
[features]
default = ["serde"]
serde = ["dep:serde", "dep:humantime-serde", "secp256k1/serde", "enr?/serde", "dep:serde_json"]
@ -101,3 +105,8 @@ optimism = [
"reth-network-api/optimism",
"reth-rpc-types/optimism",
]
[[bench]]
name = "bench"
required-features = ["test-utils"]
harness = false

View File

@ -0,0 +1,89 @@
use criterion::*;
use futures::StreamExt;
use pprof::criterion::{Output, PProfProfiler};
use rand::thread_rng;
use reth_network::{test_utils::Testnet, NetworkEvents};
use reth_network_api::Peers;
use reth_primitives::U256;
use reth_provider::test_utils::{ExtendedAccount, MockEthProvider};
use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction};
use std::sync::Arc;
use tokio::{runtime::Runtime as TokioRuntime, sync::mpsc::unbounded_channel};
criterion_group!(
name = brodcast_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = broadcast_ingress_bench
);
pub fn broadcast_ingress_bench(c: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let mut group = c.benchmark_group("Broadcast Ingress");
group.sample_size(10);
group.bench_function("receive_broadcasts", move |b| {
b.to_async(&rt).iter_with_setup(
|| {
// `b.to_async(rt)` automatically enters the
// runtime context and simply calling `block_on` here will cause the code to panic.
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
let provider = MockEthProvider::default();
let mut net = Testnet::create_with(2, provider.clone()).await;
let mut peer0 = net.remove_peer(0);
let (tx, transactions_rx) = unbounded_channel();
peer0.network_mut().set_transactions(tx);
let mut events0 = peer0.handle().event_listener();
let net = net.with_eth_pool();
let handle = net.spawn();
let peer1 = handle.peers()[0].network().clone();
let peer0_id = peer0.peer_id();
peer1.add_peer(peer0_id, peer0.local_addr());
// await connection
tokio::select! {
_ = events0.next() => {}
_ = &mut peer0 => {}
}
// prepare some transactions
let mut gen = TransactionGenerator::new(thread_rng());
let num_broadcasts = 10;
for _ in 0..num_broadcasts {
for _ in 0..2 {
let mut txs = Vec::new();
let tx = gen.gen_eip1559_pooled();
// ensure the sender has balance
provider.add_account(
tx.sender(),
ExtendedAccount::new(0, U256::from(100_000_000)),
);
txs.push(Arc::new(tx.transaction().clone().into_signed()));
peer1.send_transactions(peer0_id, txs);
}
}
(num_broadcasts, transactions_rx, peer0, handle)
})
})
},
|(num_txs, mut transactions_rx, mut peer0, _handle)| async move {
let mut count = 0;
loop {
tokio::select! {
_ = transactions_rx.recv() => {
count += 1;
if count == num_txs {
break;
}
},
_ = &mut peer0 => {
}
}
}
},
)
});
}
criterion_main!(brodcast_benches);

View File

@ -96,6 +96,14 @@ where
&self.peers
}
/// Remove a peer from the [`Testnet`] and return it.
///
/// # Panics
/// If the index is out of bounds.
pub fn remove_peer(&mut self, index: usize) -> Peer<C, Pool> {
self.peers.remove(index)
}
/// Return a mutable iterator over all peers.
pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C, Pool>> + '_ {
self.peers.iter_mut()
@ -346,6 +354,16 @@ where
self.network.local_addr()
}
/// The [PeerId] of this peer.
pub fn peer_id(&self) -> PeerId {
*self.network.peer_id()
}
/// Returns mutable access to the network.
pub fn network_mut(&mut self) -> &mut NetworkManager<C> {
&mut self.network
}
/// Returns the [`NetworkHandle`] of this peer.
pub fn handle(&self) -> NetworkHandle {
self.network.handle().clone()