diff --git a/examples/network-proxy/src/main.rs b/examples/network-proxy/src/main.rs index 6baa9233b..f875b7f78 100644 --- a/examples/network-proxy/src/main.rs +++ b/examples/network-proxy/src/main.rs @@ -14,8 +14,8 @@ use futures::StreamExt; use reth_chainspec::DEV; use reth_network::{ config::rng_secret_key, eth_requests::IncomingEthRequest, p2p::HeadersClient, - BlockDownloaderProvider, FetchClient, NetworkConfig, NetworkEventListenerProvider, - NetworkHandle, NetworkInfo, NetworkManager, Peers, + transactions::NetworkTransactionEvent, BlockDownloaderProvider, FetchClient, NetworkConfig, + NetworkEventListenerProvider, NetworkHandle, NetworkInfo, NetworkManager, Peers, }; #[tokio::main] @@ -28,13 +28,16 @@ async fn main() -> eyre::Result<()> { // Configure the network let config = NetworkConfig::builder(local_key).build_with_noop_provider(DEV.clone()); - let (tx, mut rx) = tokio::sync::mpsc::channel(1000); + let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1000); + let (transactions_tx, mut transactions_rx) = tokio::sync::mpsc::unbounded_channel(); // create the network instance let network = NetworkManager::eth(config) .await? // install the channel through which the network sends incoming eth requests - .with_eth_request_handler(tx); + .with_eth_request_handler(requests_tx) + // install the channel through which the network sends incoming transaction messages + .with_transactions(transactions_tx); // get a handle to the network to interact with it let handle = network.handle().clone(); @@ -57,15 +60,30 @@ async fn main() -> eyre::Result<()> { run_peer(handle).await.unwrap(); }); - while let Some(event) = rx.recv().await { - match event { - IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => { - println!("Received block headers request: {:?}, {:?}", peer_id, request); - response.send(Ok(vec![DEV.genesis_header().clone()].into())).unwrap(); + loop { + // receive incoming eth requests and transaction messages + tokio::select! { + eth_request = requests_rx.recv() => { + let Some(eth_request) = eth_request else {break}; + match eth_request { + IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => { + println!("Received block headers request: {:?}, {:?}", peer_id, request); + response.send(Ok(vec![DEV.genesis_header().clone()].into())).unwrap(); + } + IncomingEthRequest::GetBlockBodies { .. } => {} + IncomingEthRequest::GetNodeData { .. } => {} + IncomingEthRequest::GetReceipts { .. } => {} + } + } + transaction_message = transactions_rx.recv() => { + let Some(transaction_message) = transaction_message else {break}; + match transaction_message { + NetworkTransactionEvent::IncomingTransactions{ .. } => {} + NetworkTransactionEvent::IncomingPooledTransactionHashes{ .. } => {} + NetworkTransactionEvent::GetPooledTransactions{ .. } => {} + NetworkTransactionEvent::GetTransactionsHandle(_) => {} + } } - IncomingEthRequest::GetBlockBodies { .. } => {} - IncomingEthRequest::GetNodeData { .. } => {} - IncomingEthRequest::GetReceipts { .. } => {} } }