feat: graceful incoming connection closing (#12282)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
greg
2024-11-05 15:58:16 +01:00
committed by GitHub
parent 0e83203658
commit 39a667bbfe
2 changed files with 54 additions and 3 deletions

View File

@ -110,6 +110,8 @@ pub struct SessionManager {
active_session_rx: ReceiverStream<ActiveSessionMessage>,
/// Additional `RLPx` sub-protocols to be used by the session manager.
extra_protocols: RlpxSubProtocols,
/// Tracks the ongoing graceful disconnections attempts for incoming connections.
disconnections_counter: DisconnectionsCounter,
/// Metrics for the session manager.
metrics: SessionManagerMetrics,
}
@ -151,6 +153,7 @@ impl SessionManager {
active_session_tx: MeteredPollSender::new(active_session_tx, "network_active_session"),
active_session_rx: ReceiverStream::new(active_session_rx),
extra_protocols,
disconnections_counter: Default::default(),
metrics: Default::default(),
}
}
@ -376,6 +379,35 @@ impl SessionManager {
Some(session)
}
/// Try to gracefully disconnect an incoming connection by initiating a ECIES connection and
/// sending a disconnect. If [`SessionManager`] is at capacity for ongoing disconnections, will
/// simply drop the incoming connection.
pub(crate) fn try_disconnect_incoming_connection(
&self,
stream: TcpStream,
reason: DisconnectReason,
) {
if !self.disconnections_counter.has_capacity() {
// drop the connection if we don't have capacity for gracefully disconnecting
return
}
let guard = self.disconnections_counter.clone();
let secret_key = self.secret_key;
self.spawn(async move {
trace!(
target: "net::session",
"gracefully disconnecting incoming connection"
);
if let Ok(stream) = get_ecies_stream(stream, secret_key, Direction::Incoming).await {
let mut unauth = UnauthedP2PStream::new(stream);
let _ = unauth.send_disconnect(reason).await;
drop(guard);
}
});
}
/// This polls all the session handles and returns [`SessionEvent`].
///
/// Active sessions are prioritized.
@ -615,6 +647,20 @@ impl SessionManager {
}
}
/// A counter for ongoing graceful disconnections attempts.
#[derive(Default, Debug, Clone)]
struct DisconnectionsCounter(Arc<()>);
impl DisconnectionsCounter {
const MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS: usize = 15;
/// Returns true if the [`DisconnectionsCounter`] still has capacity
/// for an additional graceful disconnection.
fn has_capacity(&self) -> bool {
Arc::strong_count(&self.0) <= Self::MAX_CONCURRENT_GRACEFUL_DISCONNECTIONS
}
}
/// Events produced by the [`SessionManager`]
#[derive(Debug)]
pub enum SessionEvent {

View File

@ -8,7 +8,8 @@ use std::{
use futures::Stream;
use reth_eth_wire::{
capability::CapabilityMessage, errors::EthStreamError, Capabilities, EthVersion, Status,
capability::CapabilityMessage, errors::EthStreamError, Capabilities, DisconnectReason,
EthVersion, Status,
};
use reth_network_api::PeerRequestSender;
use reth_network_peers::PeerId;
@ -32,7 +33,7 @@ use crate::{
/// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
///
/// Following diagram gives displays the dataflow contained in the [`Swarm`]
/// Following diagram displays the dataflow contained in the [`Swarm`]
///
/// The [`ConnectionListener`] yields incoming [`TcpStream`]s from peers that are spawned as session
/// tasks. After a successful `RLPx` authentication, the task is ready to accept ETH requests or
@ -70,7 +71,7 @@ impl Swarm {
Self { incoming, sessions, state }
}
/// Adds an additional protocol handler to the `RLPx` sub-protocol list.
/// Adds a protocol handler to the `RLPx` sub-protocol list.
pub(crate) fn add_rlpx_sub_protocol(&mut self, protocol: impl IntoRlpxSubProtocol) {
self.sessions_mut().add_rlpx_sub_protocol(protocol);
}
@ -201,6 +202,10 @@ impl Swarm {
}
InboundConnectionError::ExceedsCapacity => {
trace!(target: "net", ?remote_addr, "No capacity for incoming connection");
self.sessions.try_disconnect_incoming_connection(
stream,
DisconnectReason::TooManyPeers,
);
}
}
return None