refactor(net): add dedicated transaction event channel (#214)

This commit is contained in:
Matthias Seitz
2022-11-16 21:06:53 +01:00
committed by GitHub
parent 09ba65fa75
commit ee7140f91e
2 changed files with 89 additions and 44 deletions

View File

@ -27,14 +27,12 @@ use crate::{
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
transactions::NetworkTransactionEvent,
};
use futures::{Future, StreamExt};
use parking_lot::Mutex;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
};
use reth_interfaces::{p2p::error::RequestResult, provider::BlockProvider};
use reth_eth_wire::capability::{Capabilities, CapabilityMessage};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::PeerId;
use std::{
net::SocketAddr,
@ -45,7 +43,7 @@ use std::{
},
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{error, trace};
@ -85,6 +83,8 @@ pub struct NetworkManager<C> {
listener_address: Arc<Mutex<SocketAddr>>,
/// All listeners for [`Network`] events.
event_listeners: NetworkEventListeners,
/// Sender half to send events to the [`TransactionsManager`] task, if configured.
to_transactions: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>,
/// Tracks the number of active session (connected peers).
///
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
@ -153,11 +153,17 @@ where
block_import,
listener_address,
event_listeners: Default::default(),
to_transactions: None,
num_active_peers,
local_node_id,
})
}
/// Sets the dedicated channel for events indented for the [`TransactionsManager`]
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
self.to_transactions = Some(tx);
}
/// Returns the [`NetworkHandle`] that can be cloned and shared.
///
/// The [`NetworkHandle`] can be used to interact with this [`NetworkManager`]
@ -176,17 +182,23 @@ where
// TODO: disconnect?
}
/// Sends an event to the [`TransactionsManager`] if configured
fn notify_tx_manager(&self, event: NetworkTransactionEvent) {
if let Some(ref tx) = self.to_transactions {
let _ = tx.send(event);
}
}
/// Handle an incoming request from the peer
fn on_eth_request(&mut self, peer_id: PeerId, req: PeerRequest) {
match req {
PeerRequest::GetBlockHeaders { .. } => {}
PeerRequest::GetBlockBodies { .. } => {}
PeerRequest::GetPooledTransactions { request, response } => {
// notify listeners about this request
self.event_listeners.send(NetworkEvent::GetPooledTransactions {
self.notify_tx_manager(NetworkTransactionEvent::GetPooledTransactions {
peer_id,
request,
response: Arc::new(response),
response,
});
}
PeerRequest::GetNodeData { .. } => {}
@ -249,11 +261,16 @@ where
});
}
PeerMessage::PooledTransactions(msg) => {
self.event_listeners
.send(NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg });
self.notify_tx_manager(NetworkTransactionEvent::IncomingPooledTransactionHashes {
peer_id,
msg,
});
}
PeerMessage::Transactions(msg) => {
self.event_listeners.send(NetworkEvent::IncomingTransactions { peer_id, msg });
self.notify_tx_manager(NetworkTransactionEvent::IncomingTransactions {
peer_id,
msg,
});
}
PeerMessage::EthRequest(req) => {
self.on_eth_request(peer_id, req);
@ -383,9 +400,10 @@ where
}
}
/// Events emitted by the network that are of interest for subscribers.
/// (Non-exhaustive) Events emitted by the network that are of interest for subscribers.
///
/// This includes any event types that may be relevant to tasks
/// This includes any event types that may be relevant to tasks, for metrics, keep track of peers
/// etc.
#[derive(Debug, Clone)]
pub enum NetworkEvent {
/// Closed the peer session.
@ -396,16 +414,6 @@ pub enum NetworkEvent {
capabilities: Arc<Capabilities>,
messages: PeerRequestSender,
},
/// Received list of transactions to the given peer.
IncomingTransactions { peer_id: PeerId, msg: Arc<Transactions> },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,
request: GetPooledTransactions,
response: Arc<oneshot::Sender<RequestResult<PooledTransactions>>>,
},
}
/// Bundles all listeners for [`NetworkEvent`]s.

View File

@ -8,7 +8,9 @@ use crate::{
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use reth_eth_wire::{GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions};
use reth_eth_wire::{
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
};
use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
@ -22,7 +24,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, oneshot, oneshot::Sender};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
/// Cache limit of transactions to keep track of for a single peer.
@ -80,6 +82,8 @@ pub struct TransactionsManager<Pool> {
command_rx: UnboundedReceiverStream<TransactionsCommand>,
/// Incoming commands from [`TransactionsHandle`].
pending_transactions: ReceiverStream<TxHash>,
/// Incoming events from the [`NetworkManager`]
transaction_events: UnboundedReceiverStream<NetworkTransactionEvent>,
}
// === impl TransactionsManager ===
@ -90,7 +94,13 @@ where
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
/// Sets up a new instance.
pub fn new(network: NetworkHandle, pool: Pool) -> Self {
///
/// Note: This expects an existing [`NetworkManager`] instance.
pub fn new(
network: NetworkHandle,
pool: Pool,
from_network: mpsc::UnboundedReceiver<NetworkTransactionEvent>,
) -> Self {
let network_events = network.event_listener();
let (command_tx, command_rx) = mpsc::unbounded_channel();
@ -108,6 +118,7 @@ where
command_tx,
command_rx: UnboundedReceiverStream::new(command_rx),
pending_transactions: ReceiverStream::new(pending),
transaction_events: UnboundedReceiverStream::new(from_network),
}
}
@ -121,7 +132,7 @@ where
&mut self,
peer_id: PeerId,
request: GetPooledTransactions,
response: Sender<RequestResult<PooledTransactions>>,
response: oneshot::Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
let transactions = self
@ -171,8 +182,24 @@ where
}
}
/// Handles a received event
fn on_event(&mut self, event: NetworkEvent) {
/// Handles dedicated transaction events related tot the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone());
self.import_transactions(peer_id, transactions.0);
}
NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transactions(peer_id, msg)
}
NetworkTransactionEvent::GetPooledTransactions { peer_id, request, response } => {
self.on_get_pooled_transactions(peer_id, request, response)
}
}
}
/// Handles a received event related to common network events.
fn on_network_event(&mut self, event: NetworkEvent) {
match event {
NetworkEvent::SessionClosed { peer_id } => {
// remove the peer
@ -198,20 +225,6 @@ where
msg,
})
}
NetworkEvent::IncomingTransactions { peer_id, msg } => {
let transactions = Arc::try_unwrap(msg).unwrap_or_else(|arc| (*arc).clone());
self.import_transactions(peer_id, transactions.0);
}
NetworkEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transactions(peer_id, msg)
}
NetworkEvent::GetPooledTransactions { peer_id, request, response } => {
if let Ok(response) = Arc::try_unwrap(response) {
// TODO(mattsse): there should be a dedicated channel for the transaction
// manager instead
self.on_get_pooled_transactions(peer_id, request, response)
}
}
}
}
@ -278,6 +291,15 @@ where
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
// drain incoming events
while let Poll::Ready(Some(event)) = this.transaction_events.poll_next_unpin(cx) {
this.on_network_tx_event(event);
}
while let Poll::Ready(Some(event)) = this.network_events.poll_next_unpin(cx) {
this.on_network_event(event);
}
// Advance all imports
while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) {
match import_res {
@ -338,3 +360,18 @@ struct Peer {
enum TransactionsCommand {
Propagate(H256),
}
/// All events related to transactions emitted by the network
#[derive(Debug)]
pub enum NetworkTransactionEvent {
/// Received list of transactions to the given peer.
IncomingTransactions { peer_id: PeerId, msg: Arc<Transactions> },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: Arc<NewPooledTransactionHashes> },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,
request: GetPooledTransactions,
response: oneshot::Sender<RequestResult<PooledTransactions>>,
},
}