From 660ea0c9376c4b03793995f700ee36182cfdf781 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 11 Sep 2023 20:35:50 +0200 Subject: [PATCH] perf: limit number of new batch of dials (#4530) --- crates/net/network/src/peers/manager.rs | 54 ++++++++++++++++++++----- crates/net/network/src/peers/mod.rs | 3 ++ 2 files changed, 46 insertions(+), 11 deletions(-) diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 1c4dc44dc..a9a7e48bd 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -2,7 +2,8 @@ use crate::{ error::{BackoffKind, SessionError}, peers::{ reputation::{is_banned_reputation, DEFAULT_REPUTATION}, - ReputationChangeWeights, DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND, + ReputationChangeWeights, DEFAULT_MAX_CONCURRENT_DIALS, DEFAULT_MAX_PEERS_INBOUND, + DEFAULT_MAX_PEERS_OUTBOUND, }, session::{Direction, PendingSessionHandshakeError}, }; @@ -336,6 +337,7 @@ impl PeersManager { } } + /// Returns the tracked reputation for a peer. pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option { self.peers.get(peer_id).map(|peer| peer.reputation) } @@ -370,17 +372,19 @@ impl PeersManager { } } - /// Gracefully disconnected a pending session + /// Gracefully disconnected a pending _outgoing_ session pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) { if let Some(peer) = self.peers.get_mut(peer_id) { peer.state = PeerConnectionState::Idle; } else { return } - self.connection_info.decr_out() + + self.connection_info.decr_out(); } - /// Invoked when a pending outgoing session was closed during authentication or the handshake. + /// Invoked when an _outgoing_ pending session was closed during authentication or the + /// handshake. pub(crate) fn on_pending_session_dropped( &mut self, remote_addr: &SocketAddr, @@ -428,7 +432,8 @@ impl PeersManager { self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped) } - /// Called when an attempt to create a pending session failed while setting up a tcp connection. + /// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp + /// connection. pub(crate) fn on_outgoing_connection_failure( &mut self, remote_addr: &SocketAddr, @@ -513,7 +518,8 @@ impl PeersManager { self.fill_outbound_slots(); } - /// Invoked if a session was disconnected because there's already a connection to the peer. + /// Invoked if a pending session was disconnected because there's already a connection to the + /// peer. /// /// If the session was an outgoing connection, this means that the peer initiated a connection /// to us at the same time and this connection is already established. @@ -580,8 +586,6 @@ impl PeersManager { // disconnecting, See `on_incoming_session_established` peer.remove_after_disconnect = false; } - - return } Entry::Vacant(entry) => { trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node"); @@ -591,8 +595,6 @@ impl PeersManager { self.queued_actions.push_back(PeerAction::PeerAdded(peer_id)); } } - - self.fill_outbound_slots(); } /// Removes the tracked node from the set. @@ -681,6 +683,7 @@ impl PeersManager { self.tick(); // as long as there a slots available try to fill them with the best peers + let mut new_outbound_dials = 1; while self.connection_info.has_out_capacity() { let action = { let (peer_id, peer) = match self.best_unconnected() { @@ -700,7 +703,13 @@ impl PeersManager { }; self.connection_info.inc_out(); + self.queued_actions.push_back(action); + + new_outbound_dials += 1; + if new_outbound_dials > self.connection_info.max_concurrent_outbound_dials { + break + } } } @@ -778,7 +787,7 @@ impl Default for PeersManager { /// Tracks stats about connected nodes #[derive(Debug, Clone, PartialEq)] -#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))] pub struct ConnectionInfo { /// Counter for currently occupied slots for active outbound connections. #[cfg_attr(feature = "serde", serde(skip))] @@ -790,6 +799,9 @@ pub struct ConnectionInfo { max_outbound: usize, /// Maximum allowed inbound connections. max_inbound: usize, + /// Maximum allowed concurrent outbound dials. + #[cfg_attr(feature = "serde", serde(default))] + max_concurrent_outbound_dials: usize, } // === impl ConnectionInfo === @@ -837,6 +849,7 @@ impl Default for ConnectionInfo { num_inbound: 0, max_outbound: DEFAULT_MAX_PEERS_OUTBOUND, max_inbound: DEFAULT_MAX_PEERS_INBOUND, + max_concurrent_outbound_dials: DEFAULT_MAX_CONCURRENT_DIALS, } } } @@ -2172,4 +2185,23 @@ mod test { assert_eq!(peer.state, PeerConnectionState::Idle); assert!(!peer.remove_after_disconnect); } + + #[tokio::test] + async fn test_max_concurrent_dials() { + let config = PeersConfig::default(); + let mut peer_manager = PeersManager::new(config); + let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)); + let socket_addr = SocketAddr::new(ip, 8008); + for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 { + peer_manager.add_peer(PeerId::random(), socket_addr, None); + } + + peer_manager.fill_outbound_slots(); + let dials = peer_manager + .queued_actions + .iter() + .filter(|ev| matches!(ev, PeerAction::Connect { .. })) + .count(); + assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials); + } } diff --git a/crates/net/network/src/peers/mod.rs b/crates/net/network/src/peers/mod.rs index 80f58b01b..fb28d4013 100644 --- a/crates/net/network/src/peers/mod.rs +++ b/crates/net/network/src/peers/mod.rs @@ -13,3 +13,6 @@ pub(crate) const DEFAULT_MAX_PEERS_OUTBOUND: usize = 100; /// Maximum number of available slots for inbound sessions. pub(crate) const DEFAULT_MAX_PEERS_INBOUND: usize = 30; + +/// Maximum number of available slots concurrent outgoing dials. +pub(crate) const DEFAULT_MAX_CONCURRENT_DIALS: usize = 10;