feat: extend example with tx handling (#14030)

This commit is contained in:
Matthias Seitz
2025-01-28 11:11:09 +01:00
committed by GitHub
parent cd45a20eeb
commit 1751370a5a

View File

@ -14,8 +14,8 @@ use futures::StreamExt;
use reth_chainspec::DEV; use reth_chainspec::DEV;
use reth_network::{ use reth_network::{
config::rng_secret_key, eth_requests::IncomingEthRequest, p2p::HeadersClient, config::rng_secret_key, eth_requests::IncomingEthRequest, p2p::HeadersClient,
BlockDownloaderProvider, FetchClient, NetworkConfig, NetworkEventListenerProvider, transactions::NetworkTransactionEvent, BlockDownloaderProvider, FetchClient, NetworkConfig,
NetworkHandle, NetworkInfo, NetworkManager, Peers, NetworkEventListenerProvider, NetworkHandle, NetworkInfo, NetworkManager, Peers,
}; };
#[tokio::main] #[tokio::main]
@ -28,13 +28,16 @@ async fn main() -> eyre::Result<()> {
// Configure the network // Configure the network
let config = NetworkConfig::builder(local_key).build_with_noop_provider(DEV.clone()); let config = NetworkConfig::builder(local_key).build_with_noop_provider(DEV.clone());
let (tx, mut rx) = tokio::sync::mpsc::channel(1000); let (requests_tx, mut requests_rx) = tokio::sync::mpsc::channel(1000);
let (transactions_tx, mut transactions_rx) = tokio::sync::mpsc::unbounded_channel();
// create the network instance // create the network instance
let network = NetworkManager::eth(config) let network = NetworkManager::eth(config)
.await? .await?
// install the channel through which the network sends incoming eth requests // install the channel through which the network sends incoming eth requests
.with_eth_request_handler(tx); .with_eth_request_handler(requests_tx)
// install the channel through which the network sends incoming transaction messages
.with_transactions(transactions_tx);
// get a handle to the network to interact with it // get a handle to the network to interact with it
let handle = network.handle().clone(); let handle = network.handle().clone();
@ -57,15 +60,30 @@ async fn main() -> eyre::Result<()> {
run_peer(handle).await.unwrap(); run_peer(handle).await.unwrap();
}); });
while let Some(event) = rx.recv().await { loop {
match event { // receive incoming eth requests and transaction messages
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => { tokio::select! {
println!("Received block headers request: {:?}, {:?}", peer_id, request); eth_request = requests_rx.recv() => {
response.send(Ok(vec![DEV.genesis_header().clone()].into())).unwrap(); let Some(eth_request) = eth_request else {break};
match eth_request {
IncomingEthRequest::GetBlockHeaders { peer_id, request, response } => {
println!("Received block headers request: {:?}, {:?}", peer_id, request);
response.send(Ok(vec![DEV.genesis_header().clone()].into())).unwrap();
}
IncomingEthRequest::GetBlockBodies { .. } => {}
IncomingEthRequest::GetNodeData { .. } => {}
IncomingEthRequest::GetReceipts { .. } => {}
}
}
transaction_message = transactions_rx.recv() => {
let Some(transaction_message) = transaction_message else {break};
match transaction_message {
NetworkTransactionEvent::IncomingTransactions{ .. } => {}
NetworkTransactionEvent::IncomingPooledTransactionHashes{ .. } => {}
NetworkTransactionEvent::GetPooledTransactions{ .. } => {}
NetworkTransactionEvent::GetTransactionsHandle(_) => {}
}
} }
IncomingEthRequest::GetBlockBodies { .. } => {}
IncomingEthRequest::GetNodeData { .. } => {}
IncomingEthRequest::GetReceipts { .. } => {}
} }
} }