From 05ee75f32c95600f0010455ccaf66ed6187b2c59 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 28 Oct 2024 23:31:10 +0100 Subject: [PATCH] fix: restrict concurrent incoming connections (#12150) --- crates/net/network-types/src/peers/state.rs | 6 ++ crates/net/network/src/peers.rs | 62 +++++++++++++++++++-- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/crates/net/network-types/src/peers/state.rs b/crates/net/network-types/src/peers/state.rs index f6ab1a39f..1e2466c80 100644 --- a/crates/net/network-types/src/peers/state.rs +++ b/crates/net/network-types/src/peers/state.rs @@ -31,6 +31,12 @@ impl PeerConnectionState { } } + /// Returns true if this is the idle state. + #[inline] + pub const fn is_idle(&self) -> bool { + matches!(self, Self::Idle) + } + /// Returns true if this is an active incoming connection. #[inline] pub const fn is_incoming(&self) -> bool { diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 3d5ff7a0d..016e7fa8d 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -218,6 +218,11 @@ impl PeersManager { self.backed_off_peers.len() } + /// Returns the number of idle trusted peers. + fn num_idle_trusted_peers(&self) -> usize { + self.peers.iter().filter(|(_, peer)| peer.kind.is_trusted() && peer.state.is_idle()).count() + } + /// Invoked when a new _incoming_ tcp connection is accepted. /// /// returns an error if the inbound ip address is on the ban list @@ -229,9 +234,34 @@ impl PeersManager { return Err(InboundConnectionError::IpBanned) } - if !self.connection_info.has_in_capacity() && self.trusted_peer_ids.is_empty() { - // if we don't have any inbound slots and no trusted peers, we don't accept any new - // connections + // check if we even have slots for a new incoming connection + if !self.connection_info.has_in_capacity() { + if self.trusted_peer_ids.is_empty() { + // if we don't have any incoming slots and no trusted peers, we don't accept any new + // connections + return Err(InboundConnectionError::ExceedsCapacity) + } + + // there's an edge case here where no incoming connections besides from trusted peers + // are allowed (max_inbound == 0), in which case we still need to allow new pending + // incoming connections until all trusted peers are connected. + let num_idle_trusted_peers = self.num_idle_trusted_peers(); + if num_idle_trusted_peers <= self.trusted_peer_ids.len() { + // we still want to limit concurrent pending connections + let max_inbound = + self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound); + if self.connection_info.num_pending_in <= max_inbound { + self.connection_info.inc_pending_in(); + } + return Ok(()) + } + + // all trusted peers are either connected or connecting + return Err(InboundConnectionError::ExceedsCapacity) + } + + // also cap the incoming connections we can process at once + if !self.connection_info.has_in_pending_capacity() { return Err(InboundConnectionError::ExceedsCapacity) } @@ -968,17 +998,22 @@ impl ConnectionInfo { Self { config, num_outbound: 0, num_pending_out: 0, num_inbound: 0, num_pending_in: 0 } } - /// Returns `true` if there's still capacity for a new outgoing connection. + /// Returns `true` if there's still capacity to perform an outgoing connection. const fn has_out_capacity(&self) -> bool { self.num_pending_out < self.config.max_concurrent_outbound_dials && self.num_outbound < self.config.max_outbound } - /// Returns `true` if there's still capacity for a new incoming connection. + /// Returns `true` if there's still capacity to accept a new incoming connection. const fn has_in_capacity(&self) -> bool { self.num_inbound < self.config.max_inbound } + /// Returns `true` if we can handle an additional incoming pending connection. + const fn has_in_pending_capacity(&self) -> bool { + self.num_pending_in < self.config.max_inbound + } + fn decr_state(&mut self, state: PeerConnectionState) { match state { PeerConnectionState::Idle => {} @@ -1597,6 +1632,23 @@ mod tests { assert_eq!(peers.connection_info.num_pending_in, 0); } + #[tokio::test] + async fn test_reject_incoming_at_pending_capacity() { + let mut peers = PeersManager::default(); + + for count in 1..=peers.connection_info.config.max_inbound { + let socket_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, count as u8)), 8008); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_pending_in, count); + } + assert!(peers.connection_info.has_in_capacity()); + assert!(!peers.connection_info.has_in_pending_capacity()); + + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 100)), 8008); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err()); + } + #[tokio::test] async fn test_closed_incoming() { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);