perf: limit number of new batch of dials (#4530)

This commit is contained in:
Matthias Seitz
2023-09-11 20:35:50 +02:00
committed by GitHub
parent 6beb3c4322
commit 660ea0c937
2 changed files with 46 additions and 11 deletions

View File

@ -2,7 +2,8 @@ use crate::{
error::{BackoffKind, SessionError},
peers::{
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
ReputationChangeWeights, DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND,
ReputationChangeWeights, DEFAULT_MAX_CONCURRENT_DIALS, DEFAULT_MAX_PEERS_INBOUND,
DEFAULT_MAX_PEERS_OUTBOUND,
},
session::{Direction, PendingSessionHandshakeError},
};
@ -336,6 +337,7 @@ impl PeersManager {
}
}
/// Returns the tracked reputation for a peer.
pub(crate) fn get_reputation(&self, peer_id: &PeerId) -> Option<i32> {
self.peers.get(peer_id).map(|peer| peer.reputation)
}
@ -370,17 +372,19 @@ impl PeersManager {
}
}
/// Gracefully disconnected a pending session
/// Gracefully disconnected a pending _outgoing_ session
pub(crate) fn on_pending_session_gracefully_closed(&mut self, peer_id: &PeerId) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.state = PeerConnectionState::Idle;
} else {
return
}
self.connection_info.decr_out()
self.connection_info.decr_out();
}
/// Invoked when a pending outgoing session was closed during authentication or the handshake.
/// Invoked when an _outgoing_ pending session was closed during authentication or the
/// handshake.
pub(crate) fn on_pending_session_dropped(
&mut self,
remote_addr: &SocketAddr,
@ -428,7 +432,8 @@ impl PeersManager {
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::Dropped)
}
/// Called when an attempt to create a pending session failed while setting up a tcp connection.
/// Called when an attempt to create an _outgoing_ pending session failed while setting up a tcp
/// connection.
pub(crate) fn on_outgoing_connection_failure(
&mut self,
remote_addr: &SocketAddr,
@ -513,7 +518,8 @@ impl PeersManager {
self.fill_outbound_slots();
}
/// Invoked if a session was disconnected because there's already a connection to the peer.
/// Invoked if a pending session was disconnected because there's already a connection to the
/// peer.
///
/// If the session was an outgoing connection, this means that the peer initiated a connection
/// to us at the same time and this connection is already established.
@ -580,8 +586,6 @@ impl PeersManager {
// disconnecting, See `on_incoming_session_established`
peer.remove_after_disconnect = false;
}
return
}
Entry::Vacant(entry) => {
trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node");
@ -591,8 +595,6 @@ impl PeersManager {
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
self.fill_outbound_slots();
}
/// Removes the tracked node from the set.
@ -681,6 +683,7 @@ impl PeersManager {
self.tick();
// as long as there a slots available try to fill them with the best peers
let mut new_outbound_dials = 1;
while self.connection_info.has_out_capacity() {
let action = {
let (peer_id, peer) = match self.best_unconnected() {
@ -700,7 +703,13 @@ impl PeersManager {
};
self.connection_info.inc_out();
self.queued_actions.push_back(action);
new_outbound_dials += 1;
if new_outbound_dials > self.connection_info.max_concurrent_outbound_dials {
break
}
}
}
@ -778,7 +787,7 @@ impl Default for PeersManager {
/// Tracks stats about connected nodes
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize), serde(default))]
pub struct ConnectionInfo {
/// Counter for currently occupied slots for active outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
@ -790,6 +799,9 @@ pub struct ConnectionInfo {
max_outbound: usize,
/// Maximum allowed inbound connections.
max_inbound: usize,
/// Maximum allowed concurrent outbound dials.
#[cfg_attr(feature = "serde", serde(default))]
max_concurrent_outbound_dials: usize,
}
// === impl ConnectionInfo ===
@ -837,6 +849,7 @@ impl Default for ConnectionInfo {
num_inbound: 0,
max_outbound: DEFAULT_MAX_PEERS_OUTBOUND,
max_inbound: DEFAULT_MAX_PEERS_INBOUND,
max_concurrent_outbound_dials: DEFAULT_MAX_CONCURRENT_DIALS,
}
}
}
@ -2172,4 +2185,23 @@ mod test {
assert_eq!(peer.state, PeerConnectionState::Idle);
assert!(!peer.remove_after_disconnect);
}
#[tokio::test]
async fn test_max_concurrent_dials() {
let config = PeersConfig::default();
let mut peer_manager = PeersManager::new(config);
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));
let socket_addr = SocketAddr::new(ip, 8008);
for _ in 0..peer_manager.connection_info.max_concurrent_outbound_dials * 2 {
peer_manager.add_peer(PeerId::random(), socket_addr, None);
}
peer_manager.fill_outbound_slots();
let dials = peer_manager
.queued_actions
.iter()
.filter(|ev| matches!(ev, PeerAction::Connect { .. }))
.count();
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);
}
}