diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 166cbbe7a..f3bbf17fd 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -515,7 +515,10 @@ where .sessions_mut() .send_message(&peer_id, PeerMessage::PooledTransactions(msg)), NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => { - self.swarm.state_mut().add_peer_kind(peer, kind, addr); + // only add peer if we are not shutting down + if !self.swarm.is_shutting_down() { + self.swarm.state_mut().add_peer_kind(peer, kind, addr); + } } NetworkHandleMessage::RemovePeer(peer_id, kind) => { self.swarm.state_mut().remove_peer(peer_id, kind); @@ -731,6 +734,10 @@ where if let Some(reason) = reason { this.disconnect_metrics.increment(reason); } + this.metrics.backed_off_peers.set( + this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) + as f64, + ); this.event_listeners .send(NetworkEvent::SessionClosed { peer_id, reason }); } @@ -761,6 +768,10 @@ where this.metrics .incoming_connections .set(this.swarm.state().peers().num_inbound_connections() as f64); + this.metrics.backed_off_peers.set( + this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) + as f64, + ); } SwarmEvent::OutgoingPendingSessionClosed { remote_addr, @@ -795,6 +806,10 @@ where this.metrics .outgoing_connections .set(this.swarm.state().peers().num_outbound_connections() as f64); + this.metrics.backed_off_peers.set( + this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) + as f64, + ); } SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => { trace!( @@ -814,6 +829,10 @@ where this.metrics .outgoing_connections .set(this.swarm.state().peers().num_outbound_connections() as f64); + this.metrics.backed_off_peers.set( + this.swarm.state().peers().num_backed_off_peers().saturating_sub(1) + as f64, + ); } SwarmEvent::BadMessage { peer_id } => { this.swarm.state_mut().peers_mut().apply_reputation_change( diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index de692533a..1c111e4b4 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -9,6 +9,9 @@ pub struct NetworkMetrics { /// Number of currently connected peers pub(crate) connected_peers: Gauge, + /// Number of currently backed off peers + pub(crate) backed_off_peers: Gauge, + /// Number of peers known to the node pub(crate) tracked_peers: Gauge, diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 88fede007..16eae888d 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -1,7 +1,7 @@ use crate::{ error::{BackoffKind, SessionError}, peers::{ - reputation::{is_banned_reputation, BACKOFF_REPUTATION_CHANGE, DEFAULT_REPUTATION}, + reputation::{is_banned_reputation, DEFAULT_REPUTATION}, ReputationChangeWeights, DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND, }, session::{Direction, PendingSessionHandshakeError}, @@ -95,10 +95,12 @@ pub(crate) struct PeersManager { reputation_weights: ReputationChangeWeights, /// Tracks current slot stats. connection_info: ConnectionInfo, - /// Tracks unwanted ips/peer ids, + /// Tracks unwanted ips/peer ids. ban_list: BanList, - /// Interval at which to check for peers to unban. - unban_interval: Interval, + /// Tracks currently backed off peers. + backed_off_peers: HashMap, + /// Interval at which to check for peers to unban and release from the backoff map. + release_interval: Interval, /// How long to ban bad peers. ban_duration: Duration, /// How long peers to which we could not connect for non-fatal reasons, e.g. @@ -151,9 +153,10 @@ impl PeersManager { now + refill_slots_interval, refill_slots_interval, ), - unban_interval: tokio::time::interval_at(now + unban_interval, unban_interval), + release_interval: tokio::time::interval_at(now + unban_interval, unban_interval), connection_info, ban_list, + backed_off_peers: Default::default(), ban_duration, backoff_durations, connect_trusted_nodes_only, @@ -189,6 +192,12 @@ impl PeersManager { self.connection_info.num_outbound } + /// Returns the number of currently backed off peers. + #[inline] + pub(crate) fn num_backed_off_peers(&self) -> usize { + self.backed_off_peers.len() + } + /// Invoked when a new _incoming_ tcp connection is accepted. /// /// returns an error if the inbound ip address is on the ban list or @@ -245,7 +254,7 @@ impl PeersManager { /// be scheduled. pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) { // we only need to check the peer id here as the ip address will have been checked at - // on_inbound_pending_session + // on_inbound_pending_session. We also check if the peer is in the backoff list here. if self.ban_list.is_banned_peer(&peer_id) { self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id }); return @@ -289,7 +298,10 @@ impl PeersManager { fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) { trace!(target: "net::peers", ?peer_id, "backing off"); - self.ban_list.ban_peer_until(peer_id, until); + if let Some(peer) = self.peers.get_mut(&peer_id) { + peer.backed_off = true; + self.backed_off_peers.insert(peer_id, until); + } } /// Unbans the peer @@ -455,7 +467,7 @@ impl PeersManager { let mut backoff_until = None; if let Some(mut peer) = self.peers.get_mut(peer_id) { - let reputation_change = if let Some(kind) = err.should_backoff() { + if let Some(kind) = err.should_backoff() { // Increment peer.backoff_counter if kind.is_severe() { peer.severe_backoff_counter += 1; @@ -464,19 +476,18 @@ impl PeersManager { let backoff_time = self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter); - backoff_until = Some(backoff_time); - // The peer has signaled that it is currently unable to process any more // connections, so we will hold off on attempting any new connections for a // while - BACKOFF_REPUTATION_CHANGE.into() + backoff_until = Some(backoff_time); } else { - self.reputation_weights.change(reputation_change) + // If the error was not a backoff error, we reduce the peer's reputation + let reputation_change = self.reputation_weights.change(reputation_change); + peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); }; self.connection_info.decr_state(peer.state); peer.state = PeerConnectionState::Idle; - peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32()); } if let Some(backoff_until) = backoff_until { self.backoff_peer_until(*peer_id, backoff_until); @@ -610,7 +621,7 @@ impl PeersManager { /// Returns the idle peer with the highest reputation. /// /// Peers that are `trusted`, see [PeerKind], are prioritized as long as they're not currently - /// marked as banned. + /// marked as banned or backed off. /// /// If `connect_trusted_nodes_only` is enabled, see [PeersConfig], then this will only consider /// `trusted` peers. @@ -620,6 +631,7 @@ impl PeersManager { let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| { peer.state.is_unconnected() && !peer.is_banned() && + !peer.is_backed_off() && (!self.connect_trusted_nodes_only || peer.is_trusted()) }); @@ -705,8 +717,9 @@ impl PeersManager { } } - if self.unban_interval.poll_tick(cx).is_ready() { - let (_, unbanned_peers) = self.ban_list.evict(std::time::Instant::now()); + if self.release_interval.poll_tick(cx).is_ready() { + let now = std::time::Instant::now(); + let (_, unbanned_peers) = self.ban_list.evict(now); for peer_id in unbanned_peers { if let Some(peer) = self.peers.get_mut(&peer_id) { @@ -716,6 +729,18 @@ impl PeersManager { } self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id }); } + + // clear the backoff list of expired backoffs, and mark the relevant peers as + // ready to be dialed + self.backed_off_peers.retain(|peer_id, until| { + if now > *until { + if let Some(peer) = self.peers.get_mut(peer_id) { + peer.backed_off = false; + } + return false + } + true + }) } if self.refill_slots_interval.poll_tick(cx).is_ready() { @@ -817,6 +842,8 @@ pub struct Peer { remove_after_disconnect: bool, /// The kind of peer kind: PeerKind, + /// Whether the peer is currently backed off. + backed_off: bool, /// Counts number of times the peer was backed off due to a severe [BackoffKind]. severe_backoff_counter: u32, } @@ -840,6 +867,7 @@ impl Peer { fork_id: None, remove_after_disconnect: false, kind: Default::default(), + backed_off: false, severe_backoff_counter: 0, } } @@ -886,6 +914,11 @@ impl Peer { is_banned_reputation(self.reputation) } + #[inline] + fn is_backed_off(&self) -> bool { + self.backed_off + } + /// Unbans the peer by resetting its reputation #[inline] fn unban(&mut self) { @@ -1362,24 +1395,20 @@ mod test { }) .await; - assert!(peers.ban_list.is_banned_peer(&peer)); - assert!(peers.peers.get(&peer).is_some()); + assert!(peers.backed_off_peers.contains_key(&peer)); + assert!(peers.peers.get(&peer).unwrap().is_backed_off()); tokio::time::sleep(backoff_durations.low).await; - match event!(peers) { - PeerAction::UnBanPeer { peer_id, .. } => { - assert_eq!(peer_id, peer); - } - _ => unreachable!(), - } - match event!(peers) { PeerAction::Connect { peer_id, .. } => { assert_eq!(peer_id, peer); } _ => unreachable!(), } + + assert!(!peers.backed_off_peers.contains_key(&peer)); + assert!(!peers.peers.get(&peer).unwrap().is_backed_off()); } #[tokio::test] @@ -1429,24 +1458,20 @@ mod test { }) .await; - assert!(peers.ban_list.is_banned_peer(&peer)); - assert!(peers.peers.get(&peer).is_some()); + assert!(peers.backed_off_peers.contains_key(&peer)); + assert!(peers.peers.get(&peer).unwrap().is_backed_off()); tokio::time::sleep(backoff_durations.high).await; - match event!(peers) { - PeerAction::UnBanPeer { peer_id, .. } => { - assert_eq!(peer_id, peer); - } - _ => unreachable!(), - } - match event!(peers) { PeerAction::Connect { peer_id, .. } => { assert_eq!(peer_id, peer); } _ => unreachable!(), } + + assert!(!peers.backed_off_peers.contains_key(&peer)); + assert!(!peers.peers.get(&peer).unwrap().is_backed_off()); } #[tokio::test] diff --git a/crates/net/network/src/peers/reputation.rs b/crates/net/network/src/peers/reputation.rs index 671d2c874..a8fc04030 100644 --- a/crates/net/network/src/peers/reputation.rs +++ b/crates/net/network/src/peers/reputation.rs @@ -26,10 +26,6 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 16 * REPUTATION_UNIT; /// The reputation change to apply to a peer which violates protocol rules: minimal reputation const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN; -/// A reputation change to apply to backoff the peer. This has the same effect as marking the peer -/// as banned. -pub(crate) const BACKOFF_REPUTATION_CHANGE: i32 = i32::MIN; - /// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold #[inline] pub(crate) fn is_banned_reputation(reputation: i32) -> bool { diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index 32d635bda..b6a4f1ed5 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -284,7 +284,7 @@ where /// Checks if the node's network connection state is 'ShuttingDown' #[inline] - fn is_shutting_down(&self) -> bool { + pub(crate) fn is_shutting_down(&self) -> bool { matches!(self.net_connection_state, NetworkConnectionState::ShuttingDown) } } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index 0561b6bd1..5549ba615 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -532,6 +532,7 @@ async fn test_shutdown() { let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]); let mut listener0 = NetworkEventStream::new(handle0.event_listener()); + let mut listener1 = NetworkEventStream::new(handle1.event_listener()); // Before shutting down, we have two connected peers let peer1 = listener0.next_session_established().await.unwrap(); @@ -549,10 +550,13 @@ async fn test_shutdown() { assert!(expected_connections.remove(&peer1)); assert!(expected_connections.remove(&peer2)); - // New connections are rejected + // Connected peers receive a shutdown signal + let (_peer, reason) = listener1.next_session_closed().await.unwrap(); + assert_eq!(reason, Some(DisconnectReason::ClientQuitting)); + + // New connections ignored handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); - let (_peer, reason) = listener0.next_session_closed().await.unwrap(); - assert_eq!(reason, Some(DisconnectReason::DisconnectRequested)); + assert_eq!(handle0.num_connected_peers(), 0); } #[tokio::test(flavor = "multi_thread")]