diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 78bcde9b2..64376b8ae 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -2,13 +2,15 @@ use crate::{ builder::ETH_REQUEST_CHANNEL_CAPACITY, error::NetworkError, eth_requests::EthRequestHandler, - NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager, + transactions::TransactionsManager, NetworkConfig, NetworkConfigBuilder, NetworkEvent, + NetworkHandle, NetworkManager, }; use futures::{FutureExt, StreamExt}; use pin_project::pin_project; use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder}; use reth_primitives::PeerId; use reth_provider::{test_utils::NoopProvider, BlockReader, HeaderProvider}; +use reth_transaction_pool::test_utils::TestPool; use secp256k1::SecretKey; use std::{ fmt, @@ -18,7 +20,10 @@ use std::{ task::{Context, Poll}, }; use tokio::{ - sync::{mpsc::channel, oneshot}, + sync::{ + mpsc::{channel, unbounded_channel}, + oneshot, + }, task::JoinHandle, }; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -93,7 +98,8 @@ where let PeerConfig { config, client, secret_key } = config; let network = NetworkManager::new(config).await?; - let peer = Peer { network, client, secret_key, request_handler: None }; + let peer = + Peer { network, client, secret_key, request_handler: None, transactions_manager: None }; self.peers.push(peer); Ok(()) } @@ -212,6 +218,8 @@ pub struct Peer { network: NetworkManager, #[pin] request_handler: Option>, + #[pin] + transactions_manager: Option>, client: C, secret_key: SecretKey, } @@ -245,6 +253,14 @@ where let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx); self.request_handler = Some(request_handler); } + + /// Set a new transactions manager that's connected to the peer's network + pub fn install_transactions_manager(&mut self, pool: TestPool) { + let (tx, rx) = unbounded_channel(); + self.network.set_transactions(tx); + let transactions_manager = TransactionsManager::new(self.handle(), pool, rx); + self.transactions_manager = Some(transactions_manager); + } } impl Future for Peer @@ -260,6 +276,10 @@ where let _ = request.poll(cx); } + if let Some(tx_manager) = this.transactions_manager.as_pin_mut() { + let _ = tx_manager.poll(cx); + } + this.network.poll(cx) } } @@ -282,7 +302,8 @@ where pub async fn launch(self) -> Result, NetworkError> { let PeerConfig { config, client, secret_key } = self; let network = NetworkManager::new(config).await?; - let peer = Peer { network, client, secret_key, request_handler: None }; + let peer = + Peer { network, client, secret_key, request_handler: None, transactions_manager: None }; Ok(peer) } diff --git a/crates/net/network/tests/it/big_pooled_txs_req.rs b/crates/net/network/tests/it/big_pooled_txs_req.rs new file mode 100644 index 000000000..5472e492a --- /dev/null +++ b/crates/net/network/tests/it/big_pooled_txs_req.rs @@ -0,0 +1,91 @@ +use std::sync::Arc; + +use reth_eth_wire::{GetPooledTransactions, PooledTransactions}; +use reth_interfaces::sync::{NetworkSyncUpdater, SyncState}; +use reth_network::{ + test_utils::{NetworkEventStream, Testnet}, + PeerRequest, +}; +use reth_network_api::{NetworkInfo, Peers}; +use reth_primitives::{Signature, TransactionSigned, B256}; +use reth_provider::test_utils::MockEthProvider; +use reth_transaction_pool::{ + test_utils::{testing_pool, MockTransaction}, + TransactionOrigin, TransactionPool, +}; +use tokio::sync::oneshot; + +// peer0: `GetPooledTransactions` requestor +// peer1: `GetPooledTransactions` responder +#[tokio::test(flavor = "multi_thread")] +async fn test_large_tx_req() { + reth_tracing::init_test_tracing(); + + // create 2000 fake txs + let txs: Vec = (0..2000) + .map(|_| { + // replace rng txhash with real txhash + let mut tx = MockTransaction::eip1559(); + + let ts = TransactionSigned { + hash: Default::default(), + signature: Signature::default(), + transaction: tx.clone().into(), + }; + tx.set_hash(ts.recalculate_hash()); + tx + }) + .collect(); + let txs_hashes: Vec = txs.iter().map(|tx| tx.get_hash()).collect(); + + // setup testnet + let mock_provider = Arc::new(MockEthProvider::default()); + let mut net = Testnet::create_with(2, mock_provider.clone()).await; + + // install request handlers + net.for_each_mut(|peer| peer.install_request_handler()); + + // insert generated txs into responding peer's pool + let pool1 = testing_pool(); + pool1.add_transactions(TransactionOrigin::Private, txs).await.unwrap(); + + // install transactions managers + net.peers_mut()[0].install_transactions_manager(testing_pool()); + net.peers_mut()[1].install_transactions_manager(pool1); + + // connect peers together and check for connection existance + let handle0 = net.peers()[0].handle(); + let handle1 = net.peers()[1].handle(); + let mut events0 = NetworkEventStream::new(handle0.event_listener()); + + let _handle = net.spawn(); + + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + let connected = events0.next_session_established().await.unwrap(); + assert_eq!(connected, *handle1.peer_id()); + + // stop syncing + handle0.update_sync_state(SyncState::Idle); + handle1.update_sync_state(SyncState::Idle); + assert!(!handle0.is_syncing() && !handle1.is_syncing()); + + // make `GetPooledTransactions` request + let (send, receive) = oneshot::channel(); + handle0.send_request( + *handle1.peer_id(), + PeerRequest::GetPooledTransactions { + request: GetPooledTransactions(txs_hashes.clone()), + response: send, + }, + ); + + // check all txs have been received + match receive.await.unwrap() { + Ok(PooledTransactions(txs)) => { + txs.into_iter().for_each(|tx| assert!(txs_hashes.contains(tx.hash()))); + } + Err(e) => { + panic!("error: {:?}", e); + } + } +} diff --git a/crates/net/network/tests/it/main.rs b/crates/net/network/tests/it/main.rs index 106bf6130..6937d7aed 100644 --- a/crates/net/network/tests/it/main.rs +++ b/crates/net/network/tests/it/main.rs @@ -1,3 +1,4 @@ +mod big_pooled_txs_req; mod clique; mod connect; mod geth; diff --git a/crates/transaction-pool/src/test_utils/mock.rs b/crates/transaction-pool/src/test_utils/mock.rs index f764fad32..9940d6927 100644 --- a/crates/transaction-pool/src/test_utils/mock.rs +++ b/crates/transaction-pool/src/test_utils/mock.rs @@ -675,17 +675,7 @@ impl FromRecoveredPooledTransaction for MockTransaction { impl IntoRecoveredTransaction for MockTransaction { fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered { - let tx = Transaction::Legacy(TxLegacy { - chain_id: self.chain_id(), - nonce: self.get_nonce(), - gas_price: self.get_gas_price(), - gas_limit: self.get_gas_limit(), - to: TransactionKind::Call(Address::from_slice( - &hex!("d3e8763675e4c425df46cc3b5c0f6cbdac396046")[..], - )), - value: 693361000000000u64.into(), - input: Default::default(), - }); + let tx = self.clone().into(); let signed_tx = TransactionSigned { hash: *self.hash(), @@ -697,6 +687,98 @@ impl IntoRecoveredTransaction for MockTransaction { } } +impl From for Transaction { + fn from(mock: MockTransaction) -> Self { + match mock { + MockTransaction::Legacy { + hash, + sender, + nonce, + gas_price, + gas_limit, + to, + value, + input, + } => Self::Legacy(TxLegacy { + chain_id: Some(1), + nonce, + gas_price, + gas_limit, + to, + value: value.into(), + input: input.clone(), + }), + MockTransaction::Eip1559 { + hash, + sender, + nonce, + max_fee_per_gas, + max_priority_fee_per_gas, + gas_limit, + to, + value, + accesslist, + input, + } => Self::Eip1559(TxEip1559 { + chain_id: 1, + nonce, + gas_limit, + max_fee_per_gas, + max_priority_fee_per_gas, + to, + value: value.into(), + access_list: accesslist.clone(), + input: input.clone(), + }), + MockTransaction::Eip4844 { + hash, + sender, + nonce, + max_fee_per_gas, + max_priority_fee_per_gas, + max_fee_per_blob_gas, + gas_limit, + to, + value, + accesslist, + input, + } => Self::Eip4844(TxEip4844 { + chain_id: 1, + nonce, + gas_limit, + max_fee_per_gas, + max_priority_fee_per_gas, + to, + value: value.into(), + access_list: accesslist, + blob_versioned_hashes: vec![hash], + max_fee_per_blob_gas, + input, + }), + MockTransaction::Eip2930 { + hash, + sender, + nonce, + to, + gas_limit, + input, + value, + gas_price, + accesslist, + } => Self::Eip2930(TxEip2930 { + chain_id: 1, + nonce, + gas_price, + gas_limit, + to, + value: value.into(), + access_list: accesslist, + input, + }), + } + } +} + #[cfg(any(test, feature = "arbitrary"))] impl proptest::arbitrary::Arbitrary for MockTransaction { type Parameters = (); @@ -836,7 +918,7 @@ impl MockTransactionFactory { } } -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] #[non_exhaustive] pub struct MockOrdering;