Large tx integration test (#4959)

This commit is contained in:
MouseLess.eth
2023-10-12 14:42:56 +03:00
committed by GitHub
parent 3a8c062574
commit d2a967d4b5
4 changed files with 211 additions and 16 deletions

View File

@ -2,13 +2,15 @@
use crate::{ use crate::{
builder::ETH_REQUEST_CHANNEL_CAPACITY, error::NetworkError, eth_requests::EthRequestHandler, builder::ETH_REQUEST_CHANNEL_CAPACITY, error::NetworkError, eth_requests::EthRequestHandler,
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager, transactions::TransactionsManager, NetworkConfig, NetworkConfigBuilder, NetworkEvent,
NetworkHandle, NetworkManager,
}; };
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder}; use reth_eth_wire::{capability::Capability, DisconnectReason, HelloBuilder};
use reth_primitives::PeerId; use reth_primitives::PeerId;
use reth_provider::{test_utils::NoopProvider, BlockReader, HeaderProvider}; use reth_provider::{test_utils::NoopProvider, BlockReader, HeaderProvider};
use reth_transaction_pool::test_utils::TestPool;
use secp256k1::SecretKey; use secp256k1::SecretKey;
use std::{ use std::{
fmt, fmt,
@ -18,7 +20,10 @@ use std::{
task::{Context, Poll}, task::{Context, Poll},
}; };
use tokio::{ use tokio::{
sync::{mpsc::channel, oneshot}, sync::{
mpsc::{channel, unbounded_channel},
oneshot,
},
task::JoinHandle, task::JoinHandle,
}; };
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
@ -93,7 +98,8 @@ where
let PeerConfig { config, client, secret_key } = config; let PeerConfig { config, client, secret_key } = config;
let network = NetworkManager::new(config).await?; let network = NetworkManager::new(config).await?;
let peer = Peer { network, client, secret_key, request_handler: None }; let peer =
Peer { network, client, secret_key, request_handler: None, transactions_manager: None };
self.peers.push(peer); self.peers.push(peer);
Ok(()) Ok(())
} }
@ -212,6 +218,8 @@ pub struct Peer<C> {
network: NetworkManager<C>, network: NetworkManager<C>,
#[pin] #[pin]
request_handler: Option<EthRequestHandler<C>>, request_handler: Option<EthRequestHandler<C>>,
#[pin]
transactions_manager: Option<TransactionsManager<TestPool>>,
client: C, client: C,
secret_key: SecretKey, secret_key: SecretKey,
} }
@ -245,6 +253,14 @@ where
let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx); let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);
self.request_handler = Some(request_handler); self.request_handler = Some(request_handler);
} }
/// Set a new transactions manager that's connected to the peer's network
pub fn install_transactions_manager(&mut self, pool: TestPool) {
let (tx, rx) = unbounded_channel();
self.network.set_transactions(tx);
let transactions_manager = TransactionsManager::new(self.handle(), pool, rx);
self.transactions_manager = Some(transactions_manager);
}
} }
impl<C> Future for Peer<C> impl<C> Future for Peer<C>
@ -260,6 +276,10 @@ where
let _ = request.poll(cx); let _ = request.poll(cx);
} }
if let Some(tx_manager) = this.transactions_manager.as_pin_mut() {
let _ = tx_manager.poll(cx);
}
this.network.poll(cx) this.network.poll(cx)
} }
} }
@ -282,7 +302,8 @@ where
pub async fn launch(self) -> Result<Peer<C>, NetworkError> { pub async fn launch(self) -> Result<Peer<C>, NetworkError> {
let PeerConfig { config, client, secret_key } = self; let PeerConfig { config, client, secret_key } = self;
let network = NetworkManager::new(config).await?; let network = NetworkManager::new(config).await?;
let peer = Peer { network, client, secret_key, request_handler: None }; let peer =
Peer { network, client, secret_key, request_handler: None, transactions_manager: None };
Ok(peer) Ok(peer)
} }

View File

@ -0,0 +1,91 @@
use std::sync::Arc;
use reth_eth_wire::{GetPooledTransactions, PooledTransactions};
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState};
use reth_network::{
test_utils::{NetworkEventStream, Testnet},
PeerRequest,
};
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::{Signature, TransactionSigned, B256};
use reth_provider::test_utils::MockEthProvider;
use reth_transaction_pool::{
test_utils::{testing_pool, MockTransaction},
TransactionOrigin, TransactionPool,
};
use tokio::sync::oneshot;
// peer0: `GetPooledTransactions` requestor
// peer1: `GetPooledTransactions` responder
#[tokio::test(flavor = "multi_thread")]
async fn test_large_tx_req() {
reth_tracing::init_test_tracing();
// create 2000 fake txs
let txs: Vec<MockTransaction> = (0..2000)
.map(|_| {
// replace rng txhash with real txhash
let mut tx = MockTransaction::eip1559();
let ts = TransactionSigned {
hash: Default::default(),
signature: Signature::default(),
transaction: tx.clone().into(),
};
tx.set_hash(ts.recalculate_hash());
tx
})
.collect();
let txs_hashes: Vec<B256> = txs.iter().map(|tx| tx.get_hash()).collect();
// setup testnet
let mock_provider = Arc::new(MockEthProvider::default());
let mut net = Testnet::create_with(2, mock_provider.clone()).await;
// install request handlers
net.for_each_mut(|peer| peer.install_request_handler());
// insert generated txs into responding peer's pool
let pool1 = testing_pool();
pool1.add_transactions(TransactionOrigin::Private, txs).await.unwrap();
// install transactions managers
net.peers_mut()[0].install_transactions_manager(testing_pool());
net.peers_mut()[1].install_transactions_manager(pool1);
// connect peers together and check for connection existance
let handle0 = net.peers()[0].handle();
let handle1 = net.peers()[1].handle();
let mut events0 = NetworkEventStream::new(handle0.event_listener());
let _handle = net.spawn();
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let connected = events0.next_session_established().await.unwrap();
assert_eq!(connected, *handle1.peer_id());
// stop syncing
handle0.update_sync_state(SyncState::Idle);
handle1.update_sync_state(SyncState::Idle);
assert!(!handle0.is_syncing() && !handle1.is_syncing());
// make `GetPooledTransactions` request
let (send, receive) = oneshot::channel();
handle0.send_request(
*handle1.peer_id(),
PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(txs_hashes.clone()),
response: send,
},
);
// check all txs have been received
match receive.await.unwrap() {
Ok(PooledTransactions(txs)) => {
txs.into_iter().for_each(|tx| assert!(txs_hashes.contains(tx.hash())));
}
Err(e) => {
panic!("error: {:?}", e);
}
}
}

View File

@ -1,3 +1,4 @@
mod big_pooled_txs_req;
mod clique; mod clique;
mod connect; mod connect;
mod geth; mod geth;

View File

@ -675,17 +675,7 @@ impl FromRecoveredPooledTransaction for MockTransaction {
impl IntoRecoveredTransaction for MockTransaction { impl IntoRecoveredTransaction for MockTransaction {
fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered { fn to_recovered_transaction(&self) -> TransactionSignedEcRecovered {
let tx = Transaction::Legacy(TxLegacy { let tx = self.clone().into();
chain_id: self.chain_id(),
nonce: self.get_nonce(),
gas_price: self.get_gas_price(),
gas_limit: self.get_gas_limit(),
to: TransactionKind::Call(Address::from_slice(
&hex!("d3e8763675e4c425df46cc3b5c0f6cbdac396046")[..],
)),
value: 693361000000000u64.into(),
input: Default::default(),
});
let signed_tx = TransactionSigned { let signed_tx = TransactionSigned {
hash: *self.hash(), hash: *self.hash(),
@ -697,6 +687,98 @@ impl IntoRecoveredTransaction for MockTransaction {
} }
} }
impl From<MockTransaction> for Transaction {
fn from(mock: MockTransaction) -> Self {
match mock {
MockTransaction::Legacy {
hash,
sender,
nonce,
gas_price,
gas_limit,
to,
value,
input,
} => Self::Legacy(TxLegacy {
chain_id: Some(1),
nonce,
gas_price,
gas_limit,
to,
value: value.into(),
input: input.clone(),
}),
MockTransaction::Eip1559 {
hash,
sender,
nonce,
max_fee_per_gas,
max_priority_fee_per_gas,
gas_limit,
to,
value,
accesslist,
input,
} => Self::Eip1559(TxEip1559 {
chain_id: 1,
nonce,
gas_limit,
max_fee_per_gas,
max_priority_fee_per_gas,
to,
value: value.into(),
access_list: accesslist.clone(),
input: input.clone(),
}),
MockTransaction::Eip4844 {
hash,
sender,
nonce,
max_fee_per_gas,
max_priority_fee_per_gas,
max_fee_per_blob_gas,
gas_limit,
to,
value,
accesslist,
input,
} => Self::Eip4844(TxEip4844 {
chain_id: 1,
nonce,
gas_limit,
max_fee_per_gas,
max_priority_fee_per_gas,
to,
value: value.into(),
access_list: accesslist,
blob_versioned_hashes: vec![hash],
max_fee_per_blob_gas,
input,
}),
MockTransaction::Eip2930 {
hash,
sender,
nonce,
to,
gas_limit,
input,
value,
gas_price,
accesslist,
} => Self::Eip2930(TxEip2930 {
chain_id: 1,
nonce,
gas_price,
gas_limit,
to,
value: value.into(),
access_list: accesslist,
input,
}),
}
}
}
#[cfg(any(test, feature = "arbitrary"))] #[cfg(any(test, feature = "arbitrary"))]
impl proptest::arbitrary::Arbitrary for MockTransaction { impl proptest::arbitrary::Arbitrary for MockTransaction {
type Parameters = (); type Parameters = ();
@ -836,7 +918,7 @@ impl MockTransactionFactory {
} }
} }
#[derive(Clone, Default)] #[derive(Clone, Default, Debug)]
#[non_exhaustive] #[non_exhaustive]
pub struct MockOrdering; pub struct MockOrdering;