diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 192df3329..18d75623d 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -27,14 +27,12 @@ use crate::{ session::SessionManager, state::NetworkState, swarm::{Swarm, SwarmEvent}, + transactions::NetworkTransactionEvent, }; use futures::{Future, StreamExt}; use parking_lot::Mutex; -use reth_eth_wire::{ - capability::{Capabilities, CapabilityMessage}, - GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, -}; -use reth_interfaces::{p2p::error::RequestResult, provider::BlockProvider}; +use reth_eth_wire::capability::{Capabilities, CapabilityMessage}; +use reth_interfaces::provider::BlockProvider; use reth_primitives::PeerId; use std::{ net::SocketAddr, @@ -45,7 +43,7 @@ use std::{ }, task::{Context, Poll}, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{error, trace}; @@ -85,6 +83,8 @@ pub struct NetworkManager { listener_address: Arc>, /// All listeners for [`Network`] events. event_listeners: NetworkEventListeners, + /// Sender half to send events to the [`TransactionsManager`] task, if configured. + to_transactions: Option>, /// Tracks the number of active session (connected peers). /// /// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`] @@ -153,11 +153,17 @@ where block_import, listener_address, event_listeners: Default::default(), + to_transactions: None, num_active_peers, local_node_id, }) } + /// Sets the dedicated channel for events indented for the [`TransactionsManager`] + pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender) { + self.to_transactions = Some(tx); + } + /// Returns the [`NetworkHandle`] that can be cloned and shared. /// /// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`] @@ -176,17 +182,23 @@ where // TODO: disconnect? } + /// Sends an event to the [`TransactionsManager`] if configured + fn notify_tx_manager(&self, event: NetworkTransactionEvent) { + if let Some(ref tx) = self.to_transactions { + let _ = tx.send(event); + } + } + /// Handle an incoming request from the peer fn on_eth_request(&mut self, peer_id: PeerId, req: PeerRequest) { match req { PeerRequest::GetBlockHeaders { .. } => {} PeerRequest::GetBlockBodies { .. } => {} PeerRequest::GetPooledTransactions { request, response } => { - // notify listeners about this request - self.event_listeners.send(NetworkEvent::GetPooledTransactions { + self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions { peer_id, request, - response: Arc::new(response), + response, }); } PeerRequest::GetNodeData { .. } => {} @@ -249,11 +261,16 @@ where }); } PeerMessage::PooledTransactions(msg) => { - self.event_listeners - .send(NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg }); + self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes { + peer_id, + msg, + }); } PeerMessage::Transactions(msg) => { - self.event_listeners.send(NetworkEvent::IncomingTransactions { peer_id, msg }); + self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions { + peer_id, + msg, + }); } PeerMessage::EthRequest(req) => { self.on_eth_request(peer_id, req); @@ -383,9 +400,10 @@ where } } -/// Events emitted by the network that are of interest for subscribers. +/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers. /// -/// This includes any event types that may be relevant to tasks +/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers +/// etc. #[derive(Debug, Clone)] pub enum NetworkEvent { /// Closed the peer session. @@ -396,16 +414,6 @@ pub enum NetworkEvent { capabilities: Arc, messages: PeerRequestSender, }, - /// Received list of transactions to the given peer. - IncomingTransactions { peer_id: PeerId, msg: Arc }, - /// Received list of transactions hashes to the given peer. - IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc }, - /// Incoming `GetPooledTransactions` request from a peer. - GetPooledTransactions { - peer_id: PeerId, - request: GetPooledTransactions, - response: Arc>>, - }, } /// Bundles all listeners for [`NetworkEvent`]s. diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index d952439be..b08ba1b05 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -8,7 +8,9 @@ use crate::{ NetworkHandle, }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; -use reth_eth_wire::{GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions}; +use reth_eth_wire::{ + GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions, +}; use reth_interfaces::p2p::error::RequestResult; use reth_primitives::{ FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, @@ -22,7 +24,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc, oneshot, oneshot::Sender}; +use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; /// Cache limit of transactions to keep track of for a single peer. @@ -80,6 +82,8 @@ pub struct TransactionsManager { command_rx: UnboundedReceiverStream, /// Incoming commands from [`TransactionsHandle`]. pending_transactions: ReceiverStream, + /// Incoming events from the [`NetworkManager`] + transaction_events: UnboundedReceiverStream, } // === impl TransactionsManager === @@ -90,7 +94,13 @@ where ::Transaction: IntoRecoveredTransaction, { /// Sets up a new instance. - pub fn new(network: NetworkHandle, pool: Pool) -> Self { + /// + /// Note: This expects an existing [`NetworkManager`] instance. + pub fn new( + network: NetworkHandle, + pool: Pool, + from_network: mpsc::UnboundedReceiver, + ) -> Self { let network_events = network.event_listener(); let (command_tx, command_rx) = mpsc::unbounded_channel(); @@ -108,6 +118,7 @@ where command_tx, command_rx: UnboundedReceiverStream::new(command_rx), pending_transactions: ReceiverStream::new(pending), + transaction_events: UnboundedReceiverStream::new(from_network), } } @@ -121,7 +132,7 @@ where &mut self, peer_id: PeerId, request: GetPooledTransactions, - response: Sender>, + response: oneshot::Sender>, ) { if let Some(peer) = self.peers.get_mut(&peer_id) { let transactions = self @@ -171,8 +182,24 @@ where } } - /// Handles a received event - fn on_event(&mut self, event: NetworkEvent) { + /// Handles dedicated transaction events related tot the `eth` protocol. + fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { + match event { + NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => { + let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()); + self.import_transactions(peer_id, transactions.0); + } + NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => { + self.on_new_pooled_transactions(peer_id, msg) + } + NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => { + self.on_get_pooled_transactions(peer_id, request, response) + } + } + } + + /// Handles a received event related to common network events. + fn on_network_event(&mut self, event: NetworkEvent) { match event { NetworkEvent::SessionClosed { peer_id } => { // remove the peer @@ -198,20 +225,6 @@ where msg, }) } - NetworkEvent::IncomingTransactions { peer_id, msg } => { - let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone()); - self.import_transactions(peer_id, transactions.0); - } - NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg } => { - self.on_new_pooled_transactions(peer_id, msg) - } - NetworkEvent::GetPooledTransactions { peer_id, request, response } => { - if let Ok(response) = Arc::try_unwrap(response) { - // TODO(mattsse): there should be a dedicated channel for the transaction - // manager instead - self.on_get_pooled_transactions(peer_id, request, response) - } - } } } @@ -278,6 +291,15 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = self.get_mut(); + // drain incoming events + while let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) { + this.on_network_tx_event(event); + } + + while let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) { + this.on_network_event(event); + } + // Advance all imports while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) { match import_res { @@ -338,3 +360,18 @@ struct Peer { enum TransactionsCommand { Propagate(H256), } + +/// All events related to transactions emitted by the network +#[derive(Debug)] +pub enum NetworkTransactionEvent { + /// Received list of transactions to the given peer. + IncomingTransactions { peer_id: PeerId, msg: Arc }, + /// Received list of transactions hashes to the given peer. + IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc }, + /// Incoming `GetPooledTransactions` request from a peer. + GetPooledTransactions { + peer_id: PeerId, + request: GetPooledTransactions, + response: oneshot::Sender>, + }, +}