fix: make eth requests channel bounded (#2811)

This commit is contained in:
Sanket Shanbhag
2023-05-24 22:45:23 +05:30
committed by GitHub
parent 0b81096f8b
commit 718dbfc971
5 changed files with 38 additions and 19 deletions

View File

@ -7,6 +7,10 @@ use crate::{
use reth_transaction_pool::TransactionPool;
use tokio::sync::mpsc;
/// We set the max channel capacity of the EthRequestHandler to 256
/// 256 requests with malicious 10MB body requests is 2.6GB which can be absorbed by the node.
pub(crate) const ETH_REQUEST_CHANNEL_CAPACITY: usize = 256;
/// A builder that can configure all components of the network.
pub struct NetworkBuilder<C, Tx, Eth> {
pub(crate) network: NetworkManager<C>,
@ -49,7 +53,7 @@ impl<C, Tx, Eth> NetworkBuilder<C, Tx, Eth> {
client: Client,
) -> NetworkBuilder<C, Tx, EthRequestHandler<Client>> {
let NetworkBuilder { mut network, transactions, .. } = self;
let (tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::channel(ETH_REQUEST_CHANNEL_CAPACITY);
network.set_eth_request_handler(tx);
let peers = network.handle().peers_handle().clone();
let request_handler = EthRequestHandler::new(client, peers, rx);

View File

@ -16,8 +16,8 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::{mpsc::Receiver, oneshot};
use tokio_stream::wrappers::ReceiverStream;
// Limits: <https://github.com/ethereum/go-ethereum/blob/b0d44338bbcefee044f1f635a84487cbbd8f0538/eth/protocols/eth/handler.go#L34-L56>
@ -54,7 +54,7 @@ pub struct EthRequestHandler<C> {
// TODO use to report spammers
peers: PeersHandle,
/// Incoming request from the [NetworkManager](crate::NetworkManager).
incoming_requests: UnboundedReceiverStream<IncomingEthRequest>,
incoming_requests: ReceiverStream<IncomingEthRequest>,
/// Metrics for the eth request handler.
metrics: EthRequestHandlerMetrics,
}
@ -62,13 +62,9 @@ pub struct EthRequestHandler<C> {
// === impl EthRequestHandler ===
impl<C> EthRequestHandler<C> {
/// Create a new instance
pub fn new(
client: C,
peers: PeersHandle,
incoming: UnboundedReceiver<IncomingEthRequest>,
) -> Self {
pub fn new(client: C, peers: PeersHandle, incoming: Receiver<IncomingEthRequest>) -> Self {
let metrics = Default::default();
Self { client, peers, incoming_requests: UnboundedReceiverStream::new(incoming), metrics }
Self { client, peers, incoming_requests: ReceiverStream::new(incoming), metrics }
}
}

View File

@ -52,7 +52,7 @@ use std::{
},
task::{Context, Poll},
};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, trace};
/// Manages the _entire_ state of the network.
@ -100,7 +100,18 @@ pub struct NetworkManager<C> {
to_transactions_manager: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>,
/// Sender half to send events to the
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
to_eth_request_handler: Option<mpsc::UnboundedSender<IncomingEthRequest>>,
///
/// The channel that originally receives and bundles all requests from all sessions is already
/// bounded. However, since handling an eth request is more I/O intensive than delegating
/// them from the bounded channel to the eth-request channel, it is possible that this
/// builds up if the node is flooded with requests.
///
/// Even though nonmalicious requests are relatively cheap, it's possible to craft
/// body requests with bogus data up until the allowed max message size limit.
/// Thus, we use a bounded channel here to avoid unbounded build up if the node is flooded with
/// requests. This channel size is set at
/// [`ETH_REQUEST_CHANNEL_CAPACITY`](crate::builder::ETH_REQUEST_CHANNEL_CAPACITY)
to_eth_request_handler: Option<mpsc::Sender<IncomingEthRequest>>,
/// Tracks the number of active session (connected peers).
///
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
@ -122,7 +133,7 @@ impl<C> NetworkManager<C> {
/// Sets the dedicated channel for events indented for the
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler).
pub fn set_eth_request_handler(&mut self, tx: mpsc::UnboundedSender<IncomingEthRequest>) {
pub fn set_eth_request_handler(&mut self, tx: mpsc::Sender<IncomingEthRequest>) {
self.to_eth_request_handler = Some(tx);
}
@ -363,7 +374,12 @@ where
/// configured.
fn delegate_eth_request(&self, event: IncomingEthRequest) {
if let Some(ref reqs) = self.to_eth_request_handler {
let _ = reqs.send(event);
let _ = reqs.try_send(event).map_err(|e| {
if let TrySendError::Full(_) = e {
debug!(target:"net", "EthRequestHandler channel is full!");
self.metrics.total_dropped_eth_requests_at_full_capacity.increment(1);
}
});
}
}

View File

@ -35,6 +35,9 @@ pub struct NetworkMetrics {
/// Number of invalid/malformed messages received from peers
pub(crate) invalid_messages_received: Counter,
/// Number of Eth Requests dropped due to channel being at full capacity
pub(crate) total_dropped_eth_requests_at_full_capacity: Counter,
}
/// Metrics for the TransactionsManager

View File

@ -1,8 +1,8 @@
//! A network implementation for testing purposes.
use crate::{
error::NetworkError, eth_requests::EthRequestHandler, NetworkConfig, NetworkConfigBuilder,
NetworkEvent, NetworkHandle, NetworkManager,
builder::ETH_REQUEST_CHANNEL_CAPACITY, error::NetworkError, eth_requests::EthRequestHandler,
NetworkConfig, NetworkConfigBuilder, NetworkEvent, NetworkHandle, NetworkManager,
};
use futures::{FutureExt, StreamExt};
use pin_project::pin_project;
@ -18,7 +18,7 @@ use std::{
task::{Context, Poll},
};
use tokio::{
sync::{mpsc::unbounded_channel, oneshot},
sync::{mpsc::channel, oneshot},
task::JoinHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
@ -237,9 +237,9 @@ where
self.network.handle().clone()
}
/// Set a new request handler that's connected tot the peer's network
/// Set a new request handler that's connected to the peer's network
pub fn install_request_handler(&mut self) {
let (tx, rx) = unbounded_channel();
let (tx, rx) = channel(ETH_REQUEST_CHANNEL_CAPACITY);
self.network.set_eth_request_handler(tx);
let peers = self.network.peers_handle();
let request_handler = EthRequestHandler::new(self.client.clone(), peers, rx);