From 9312424db05b066cee6fae40471dc679ed35cef8 Mon Sep 17 00:00:00 2001 From: Abner Zheng Date: Fri, 15 Mar 2024 19:16:17 +0800 Subject: [PATCH] Always accept incoming connections from trusted peers (#7140) --- crates/net/network/src/peers/manager.rs | 58 +++++++++--- crates/net/network/src/session/mod.rs | 39 -------- crates/net/network/src/swarm.rs | 11 +-- crates/net/network/tests/it/connect.rs | 117 ++++++++++++++++++++++++ 4 files changed, 164 insertions(+), 61 deletions(-) diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 4dd309b58..4bf31fad9 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -213,8 +213,7 @@ impl PeersManager { /// Invoked when a new _incoming_ tcp connection is accepted. /// - /// returns an error if the inbound ip address is on the ban list or - /// we have reached our limit for max inbound connections + /// returns an error if the inbound ip address is on the ban list pub(crate) fn on_incoming_pending_session( &mut self, addr: IpAddr, @@ -222,10 +221,6 @@ impl PeersManager { if self.ban_list.is_banned_ip(&addr) { return Err(InboundConnectionError::IpBanned) } - if !self.connection_info.has_in_capacity() { - return Err(InboundConnectionError::ExceedsLimit(self.connection_info.max_inbound)) - } - // keep track of new connection self.connection_info.inc_in(); Ok(()) } @@ -284,6 +279,14 @@ impl PeersManager { return } value.state = PeerConnectionState::In; + // if a peer is not trusted and we don't have capacity for more inbound connections, + // disconnecting the peer + if !value.is_trusted() && !self.connection_info.has_in_capacity() { + self.queued_actions.push_back(PeerAction::Disconnect { + peer_id, + reason: Some(DisconnectReason::TooManyPeers), + }); + } } Entry::Vacant(entry) => { // peer is missing in the table, we add it but mark it as to be removed after @@ -292,6 +295,14 @@ impl PeersManager { peer.remove_after_disconnect = true; entry.insert(peer); self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); + + // disconnect the peer if we don't have capacity for more inbound connections + if !self.connection_info.has_in_capacity() { + self.queued_actions.push_back(PeerAction::Disconnect { + peer_id, + reason: Some(DisconnectReason::TooManyPeers), + }); + } } } } @@ -573,7 +584,10 @@ impl PeersManager { /// to us at the same time and this connection is already established. pub(crate) fn on_already_connected(&mut self, direction: Direction) { match direction { - Direction::Incoming => {} + Direction::Incoming => { + // need to decrement the ingoing counter + self.connection_info.decr_in(); + } Direction::Outgoing(_) => { // need to decrement the outgoing counter self.connection_info.decr_out(); @@ -875,7 +889,7 @@ impl ConnectionInfo { /// Returns `true` if there's still capacity for a new incoming connection. fn has_in_capacity(&self) -> bool { - self.num_inbound < self.max_inbound + self.num_inbound <= self.max_inbound } fn decr_state(&mut self, state: PeerConnectionState) { @@ -1420,7 +1434,6 @@ impl Default for PeerBackoffDurations { #[derive(Debug, Error)] pub enum InboundConnectionError { - ExceedsLimit(usize), IpBanned, } @@ -1449,7 +1462,7 @@ mod tests { DisconnectReason, }; use reth_net_common::ban_list::BanList; - use reth_network_api::ReputationChangeKind; + use reth_network_api::{Direction, ReputationChangeKind}; use reth_primitives::{PeerId, B512}; use std::{ collections::HashSet, @@ -1989,6 +2002,28 @@ mod tests { } } + #[tokio::test] + async fn test_already_connected() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + peers.on_incoming_session_established(peer, socket_addr); + + // peer should have been added and num_inbound should have been increased + let p = peers.peers.get_mut(&peer).expect("peer not found"); + assert_eq!(p.addr, socket_addr); + assert_eq!(peers.connection_info.num_inbound, 1); + + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + peers.on_already_connected(Direction::Incoming); + + // peer should not be connected and num_inbound should not have been increased + let p = peers.peers.get_mut(&peer).expect("peer not found"); + assert_eq!(p.addr, socket_addr); + assert_eq!(peers.connection_info.num_inbound, 1); + } + #[tokio::test] async fn test_reputation_change_trusted_peer() { let peer = PeerId::random(); @@ -2166,9 +2201,6 @@ mod tests { Ok(_) => panic!(), Err(err) => match err { super::InboundConnectionError::IpBanned {} => {} - super::InboundConnectionError::ExceedsLimit { .. } => { - panic!() - } }, } } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index ff2984928..775c9ede7 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -53,9 +53,6 @@ pub use handle::{ use reth_eth_wire::multiplex::RlpxProtocolMultiplexer; pub use reth_network_api::{Direction, PeerInfo}; -/// Maximum allowed graceful disconnects at a time. -const MAX_GRACEFUL_DISCONNECTS: usize = 15; - /// Internal identifier for active sessions. #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] pub struct SessionId(usize); @@ -113,8 +110,6 @@ pub struct SessionManager { bandwidth_meter: BandwidthMeter, /// Metrics for the session manager. metrics: SessionManagerMetrics, - /// Tracks the number of active graceful disconnects for incoming connections. - graceful_disconnects_counter: GracefulDisconnects, } // === impl SessionManager === @@ -156,7 +151,6 @@ impl SessionManager { bandwidth_meter, extra_protocols, metrics: Default::default(), - graceful_disconnects_counter: Default::default(), } } @@ -310,27 +304,6 @@ impl SessionManager { } } - /// Sends a disconnect message to the peer with the given [DisconnectReason]. - pub(crate) fn disconnect_incoming_connection( - &mut self, - stream: TcpStream, - reason: DisconnectReason, - ) { - let counter = self.graceful_disconnects_counter.clone(); - if counter.exceeds_limit() { - // simply drop the connection if there are too many active disconnects already - return - } - let secret_key = self.secret_key; - - self.spawn(async move { - if let Ok(stream) = get_eciess_stream(stream, secret_key, Direction::Incoming).await { - let _ = UnauthedP2PStream::new(stream).send_disconnect(reason).await; - } - drop(counter) - }); - } - /// Initiates a shutdown of all sessions. /// /// It will trigger the disconnect on all the session tasks to gracefully terminate. The result @@ -635,18 +608,6 @@ impl SessionManager { } } -/// Keep track of graceful disconnects for incoming connections. -#[derive(Debug, Clone, Default)] -struct GracefulDisconnects(Arc<()>); - -impl GracefulDisconnects { - /// Returns true if the number of graceful disconnects exceeds the limit - /// [MAX_GRACEFUL_DISCONNECTS] - fn exceeds_limit(&self) -> bool { - Arc::strong_count(&self.0) > MAX_GRACEFUL_DISCONNECTS - } -} - /// Events produced by the [`SessionManager`] #[derive(Debug)] pub enum SessionEvent { diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 021425b1b..41b546c7b 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -10,7 +10,7 @@ use futures::Stream; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, errors::EthStreamError, - DisconnectReason, EthVersion, Status, + EthVersion, Status, }; use reth_primitives::PeerId; use reth_provider::{BlockNumReader, BlockReader}; @@ -29,7 +29,7 @@ use tracing::trace; /// /// A swarm emits [`SwarmEvent`]s when polled. /// -/// The manages the [`ConnectionListener`] and delegates new incoming connections to the +/// It manages the [`ConnectionListener`] and delegates new incoming connections to the /// [`SessionManager`]. Outgoing connections are either initiated on demand or triggered by the /// [`NetworkState`] and also delegated to the [`NetworkState`]. /// @@ -203,13 +203,6 @@ where InboundConnectionError::IpBanned => { trace!(target: "net", ?remote_addr, "The incoming ip address is in the ban list"); } - InboundConnectionError::ExceedsLimit(limit) => { - trace!(target: "net", %limit, ?remote_addr, "Exceeded incoming connection limit; disconnecting"); - self.sessions.disconnect_incoming_connection( - stream, - DisconnectReason::TooManyPeers, - ); - } } return None } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index fccb7fac0..454e4b359 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -585,6 +585,123 @@ async fn test_disconnect_incoming_when_exceeded_incoming_connections() { net_handle.terminate().await; } +#[tokio::test(flavor = "multi_thread")] +async fn test_always_accept_incoming_connections_from_trusted_peers() { + reth_tracing::init_test_tracing(); + let other_peer1 = new_random_peer(10, HashSet::new()).await; + let other_peer2 = new_random_peer(10, HashSet::new()).await; + let other_peer3 = new_random_peer(0, HashSet::new()).await; + + // setup the peer with max_inbound = 1, and add other_peer_3 as trust nodes + let peer = new_random_peer( + 1, + HashSet::from([NodeRecord::new(other_peer3.local_addr(), *other_peer3.peer_id())]), + ) + .await; + + let handle = peer.handle().clone(); + let other_peer_handle1 = other_peer1.handle().clone(); + let other_peer_handle2 = other_peer2.handle().clone(); + let other_peer_handle3 = other_peer3.handle().clone(); + + tokio::task::spawn(peer); + tokio::task::spawn(other_peer1); + tokio::task::spawn(other_peer2); + tokio::task::spawn(other_peer3); + + let mut events = NetworkEventStream::new(handle.event_listener()); + let mut events2 = NetworkEventStream::new(other_peer_handle2.event_listener()); + + // though we added other_peer3 as a trust node, the incoming connection should fail because + // peer3 doesn't allow inbound connections + let (peer_id, reason) = events.next_session_closed().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle3.peer_id()); + assert_eq!(reason, Some(DisconnectReason::TooManyPeers)); + + // incoming connection should succeed + other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr()); + let peer_id = events.next_session_established().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle1.peer_id()); + assert_eq!(handle.num_connected_peers(), 1); + + // incoming connection should fail because exceeding max_inbound + other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr()); + let (peer_id, reason) = events.next_session_closed().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle2.peer_id()); + // fixme: this should be `Some(DisconnectReason::TooManyPeers)` but `None` + assert_eq!(reason, None); + + let (peer_id, reason) = events2.next_session_closed().await.unwrap(); + assert_eq!(peer_id, *handle.peer_id()); + assert_eq!(reason, Some(DisconnectReason::TooManyPeers)); + + // outbound connection from `other_peer3` should succeed + other_peer_handle3.add_peer(*handle.peer_id(), handle.local_addr()); + let peer_id = events.next_session_established().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle3.peer_id()); + + // sleep is needed because the disconnect event happened after session_established event + tokio::time::sleep(Duration::from_secs(3)).await; + assert_eq!(handle.num_connected_peers(), 2); +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_rejected_by_already_connect() { + reth_tracing::init_test_tracing(); + let other_peer1 = new_random_peer(10, HashSet::new()).await; + let other_peer2 = new_random_peer(10, HashSet::new()).await; + + // setup the peer with max_inbound = 2 + let peer = new_random_peer(2, HashSet::new()).await; + + let handle = peer.handle().clone(); + let other_peer_handle1 = other_peer1.handle().clone(); + let other_peer_handle2 = other_peer2.handle().clone(); + + tokio::task::spawn(peer); + tokio::task::spawn(other_peer1); + tokio::task::spawn(other_peer2); + + let mut events = NetworkEventStream::new(handle.event_listener()); + + // incoming connection should succeed + other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr()); + let peer_id = events.next_session_established().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle1.peer_id()); + assert_eq!(handle.num_connected_peers(), 1); + + // incoming connection from the same peer should be rejected by already connected + // and num_inbount should still be 1 + other_peer_handle1.add_peer(*handle.peer_id(), handle.local_addr()); + tokio::time::sleep(Duration::from_secs(1)).await; + + // incoming connection from other_peer2 should succeed + other_peer_handle2.add_peer(*handle.peer_id(), handle.local_addr()); + let peer_id = events.next_session_established().await.unwrap(); + assert_eq!(peer_id, *other_peer_handle2.peer_id()); + + // wait 2 seconds and check that other_peer2 is not rejected by TooManyPeers + tokio::time::sleep(Duration::from_secs(2)).await; + assert_eq!(handle.num_connected_peers(), 2); +} + +async fn new_random_peer( + max_in_bound: usize, + trusted_nodes: HashSet, +) -> NetworkManager { + let secret_key = SecretKey::new(&mut rand::thread_rng()); + let peers_config = + PeersConfig::default().with_max_inbound(max_in_bound).with_trusted_nodes(trusted_nodes); + + let config = NetworkConfigBuilder::new(secret_key) + .listener_port(0) + .disable_discovery() + .peer_config(peers_config) + .build(NoopProvider::default()); + + NetworkManager::new(config).await.unwrap() +} + #[tokio::test(flavor = "multi_thread")] async fn test_connect_many() { reth_tracing::init_test_tracing();