diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index a4eff9d3a..9e8c59226 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -141,6 +141,11 @@ test-utils = [ ] [[bench]] -name = "bench" +name = "broadcast" +required-features = ["test-utils"] +harness = false + +[[bench]] +name = "tx_manager_hash_fetching" required-features = ["test-utils"] harness = false diff --git a/crates/net/network/benches/bench.rs b/crates/net/network/benches/broadcast.rs similarity index 100% rename from crates/net/network/benches/bench.rs rename to crates/net/network/benches/broadcast.rs diff --git a/crates/net/network/benches/tx_manager_hash_fetching.rs b/crates/net/network/benches/tx_manager_hash_fetching.rs new file mode 100644 index 000000000..1ab9b8fc4 --- /dev/null +++ b/crates/net/network/benches/tx_manager_hash_fetching.rs @@ -0,0 +1,97 @@ +#![allow(missing_docs)] +use alloy_primitives::U256; +use criterion::*; +use pprof::criterion::{Output, PProfProfiler}; +use rand::thread_rng; +use reth_network::{ + test_utils::Testnet, + transactions::{ + TransactionFetcherConfig, TransactionPropagationMode::Max, TransactionsManagerConfig, + }, +}; +use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; +use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; +use tokio::runtime::Runtime as TokioRuntime; + +criterion_group!( + name = tx_fetch_benches; + config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))); + targets = tx_fetch_bench +); + +pub fn tx_fetch_bench(c: &mut Criterion) { + let rt = TokioRuntime::new().unwrap(); + + let mut group = c.benchmark_group("Transaction Fetch"); + group.sample_size(10); + + group.bench_function("fetch_transactions", |b| { + b.to_async(&rt).iter_with_setup( + || { + tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { + let tx_manager_config = TransactionsManagerConfig { + propagation_mode: Max(0), + transaction_fetcher_config: TransactionFetcherConfig { + max_inflight_requests: 1, + ..Default::default() + }, + ..Default::default() + }; + + let provider = MockEthProvider::default(); + let num_peers = 10; + let net = Testnet::create_with(num_peers, provider.clone()).await; + + // install request handlers + let net = net.with_eth_pool_config(tx_manager_config); + let handle = net.spawn(); + + // connect all the peers first + handle.connect_peers().await; + + let listening_peer = &handle.peers()[num_peers - 1]; + let listening_peer_tx_listener = + listening_peer.pool().unwrap().pending_transactions_listener(); + + let num_tx_per_peer = 10; + + for i in 1..num_peers { + let peer = &handle.peers()[i]; + let peer_pool = peer.pool().unwrap(); + + for _ in 0..num_tx_per_peer { + let mut gen = TransactionGenerator::new(thread_rng()); + let tx = gen.gen_eip1559_pooled(); + let sender = tx.sender(); + provider.add_account( + sender, + ExtendedAccount::new(0, U256::from(100_000_000)), + ); + peer_pool.add_external_transaction(tx.clone()).await.unwrap(); + } + } + + // Total expected transactions + let total_expected_tx = num_tx_per_peer * (num_peers - 1); + + (listening_peer_tx_listener, total_expected_tx) + }) + }) + }, + |(mut listening_peer_tx_listener, total_expected_tx)| async move { + let mut received_tx = 0; + while listening_peer_tx_listener.recv().await.is_some() { + received_tx += 1; + if received_tx >= total_expected_tx { + break; + } + } + }, + ) + }); + + group.finish(); +} + +criterion_main!(tx_fetch_benches); diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 3a50d890e..7fd9f690f 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -194,6 +194,27 @@ where )) }) } + + /// Installs an eth pool on each peer with custom transaction manager config + pub fn with_eth_pool_config( + self, + tx_manager_config: TransactionsManagerConfig, + ) -> Testnet> { + self.map_pool(|peer| { + let blob_store = InMemoryBlobStore::default(); + let pool = TransactionValidationTaskExecutor::eth( + peer.client.clone(), + MAINNET.clone(), + blob_store.clone(), + TokioTaskExecutor::default(), + ); + + peer.map_transactions_manager_with_config( + EthTransactionPool::eth_pool(pool, blob_store, Default::default()), + tx_manager_config.clone(), + ) + }) + } } impl Testnet @@ -463,6 +484,36 @@ where secret_key, } } + + /// Map transactions manager with custom config + pub fn map_transactions_manager_with_config

( + self, + pool: P, + config: TransactionsManagerConfig, + ) -> Peer + where + P: TransactionPool, + { + let Self { mut network, request_handler, client, secret_key, .. } = self; + let (tx, rx) = unbounded_channel(); + network.set_transactions(tx); + + let transactions_manager = TransactionsManager::new( + network.handle().clone(), + pool.clone(), + rx, + config, // Use provided config + ); + + Peer { + network, + request_handler, + transactions_manager: Some(transactions_manager), + pool: Some(pool), + client, + secret_key, + } + } } impl Peer @@ -682,7 +733,7 @@ impl NetworkEventStream { /// Awaits the next `num` events for an established session pub async fn take_session_established(&mut self, mut num: usize) -> Vec { if num == 0 { - return Vec::new() + return Vec::new(); } let mut peers = Vec::with_capacity(num); while let Some(ev) = self.inner.next().await { @@ -691,7 +742,7 @@ impl NetworkEventStream { peers.push(peer_id); num -= 1; if num == 0 { - return peers + return peers; } } _ => continue, diff --git a/crates/net/network/tests/it/main.rs b/crates/net/network/tests/it/main.rs index ede445510..dd98c9624 100644 --- a/crates/net/network/tests/it/main.rs +++ b/crates/net/network/tests/it/main.rs @@ -6,6 +6,7 @@ mod multiplex; mod requests; mod session; mod startup; +mod transaction_hash_fetching; mod txgossip; const fn main() {} diff --git a/crates/net/network/tests/it/transaction_hash_fetching.rs b/crates/net/network/tests/it/transaction_hash_fetching.rs new file mode 100644 index 000000000..7f1d7593a --- /dev/null +++ b/crates/net/network/tests/it/transaction_hash_fetching.rs @@ -0,0 +1,68 @@ +use alloy_primitives::U256; +use rand::thread_rng; +use reth_network::{ + test_utils::Testnet, + transactions::{TransactionPropagationMode::Max, TransactionsManagerConfig}, +}; +use reth_provider::test_utils::{ExtendedAccount, MockEthProvider}; +use reth_tracing::init_test_tracing; +use reth_transaction_pool::{test_utils::TransactionGenerator, PoolTransaction, TransactionPool}; +use tokio::time::Duration; + +#[tokio::test(flavor = "multi_thread")] +#[ignore] +async fn transaction_hash_fetching() { + init_test_tracing(); + + let mut config = TransactionsManagerConfig { propagation_mode: Max(0), ..Default::default() }; + config.transaction_fetcher_config.max_inflight_requests = 1; + + let provider = MockEthProvider::default(); + let num_peers = 10; + let net = Testnet::create_with(num_peers, provider.clone()).await; + + // install request handlers + let net = net.with_eth_pool_config(config); + let handle = net.spawn(); + + // connect all the peers first + handle.connect_peers().await; + + let listening_peer = &handle.peers()[num_peers - 1]; + let mut listening_peer_tx_listener = + listening_peer.pool().unwrap().pending_transactions_listener(); + + let num_tx_per_peer = 10; + + // Generate transactions for peers + for i in 1..num_peers { + let peer = &handle.peers()[i]; + let peer_pool = peer.pool().unwrap(); + + for _ in 0..num_tx_per_peer { + let mut gen = TransactionGenerator::new(thread_rng()); + let tx = gen.gen_eip1559_pooled(); + let sender = tx.sender(); + provider.add_account(sender, ExtendedAccount::new(0, U256::from(100_000_000))); + peer_pool.add_external_transaction(tx).await.unwrap(); + } + } + + // Total expected transactions + let total_expected_tx = num_tx_per_peer * (num_peers - 1); + let mut received_tx = 0; + + loop { + tokio::select! { + Some(_) = listening_peer_tx_listener.recv() => { + received_tx += 1; + if received_tx >= total_expected_tx { + break; + } + } + _ = tokio::time::sleep(Duration::from_secs(10)) => { + panic!("Timed out waiting for transactions. Received {received_tx}/{total_expected_tx}"); + } + } + } +}