feat(net): fix peer backoffs to not interfere with reputation (#2422)

This commit is contained in:
mempirate
2023-04-27 15:53:12 +02:00
committed by GitHub
parent e3afe02b07
commit 93d3bd9dcd
6 changed files with 90 additions and 43 deletions

View File

@ -515,7 +515,10 @@ where
.sessions_mut()
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
self.swarm.state_mut().add_peer_kind(peer, kind, addr);
// only add peer if we are not shutting down
if !self.swarm.is_shutting_down() {
self.swarm.state_mut().add_peer_kind(peer, kind, addr);
}
}
NetworkHandleMessage::RemovePeer(peer_id, kind) => {
self.swarm.state_mut().remove_peer(peer_id, kind);
@ -731,6 +734,10 @@ where
if let Some(reason) = reason {
this.disconnect_metrics.increment(reason);
}
this.metrics.backed_off_peers.set(
this.swarm.state().peers().num_backed_off_peers().saturating_sub(1)
as f64,
);
this.event_listeners
.send(NetworkEvent::SessionClosed { peer_id, reason });
}
@ -761,6 +768,10 @@ where
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
this.metrics.backed_off_peers.set(
this.swarm.state().peers().num_backed_off_peers().saturating_sub(1)
as f64,
);
}
SwarmEvent::OutgoingPendingSessionClosed {
remote_addr,
@ -795,6 +806,10 @@ where
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
this.metrics.backed_off_peers.set(
this.swarm.state().peers().num_backed_off_peers().saturating_sub(1)
as f64,
);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
@ -814,6 +829,10 @@ where
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
this.metrics.backed_off_peers.set(
this.swarm.state().peers().num_backed_off_peers().saturating_sub(1)
as f64,
);
}
SwarmEvent::BadMessage { peer_id } => {
this.swarm.state_mut().peers_mut().apply_reputation_change(

View File

@ -9,6 +9,9 @@ pub struct NetworkMetrics {
/// Number of currently connected peers
pub(crate) connected_peers: Gauge,
/// Number of currently backed off peers
pub(crate) backed_off_peers: Gauge,
/// Number of peers known to the node
pub(crate) tracked_peers: Gauge,

View File

@ -1,7 +1,7 @@
use crate::{
error::{BackoffKind, SessionError},
peers::{
reputation::{is_banned_reputation, BACKOFF_REPUTATION_CHANGE, DEFAULT_REPUTATION},
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
ReputationChangeWeights, DEFAULT_MAX_PEERS_INBOUND, DEFAULT_MAX_PEERS_OUTBOUND,
},
session::{Direction, PendingSessionHandshakeError},
@ -95,10 +95,12 @@ pub(crate) struct PeersManager {
reputation_weights: ReputationChangeWeights,
/// Tracks current slot stats.
connection_info: ConnectionInfo,
/// Tracks unwanted ips/peer ids,
/// Tracks unwanted ips/peer ids.
ban_list: BanList,
/// Interval at which to check for peers to unban.
unban_interval: Interval,
/// Tracks currently backed off peers.
backed_off_peers: HashMap<PeerId, std::time::Instant>,
/// Interval at which to check for peers to unban and release from the backoff map.
release_interval: Interval,
/// How long to ban bad peers.
ban_duration: Duration,
/// How long peers to which we could not connect for non-fatal reasons, e.g.
@ -151,9 +153,10 @@ impl PeersManager {
now + refill_slots_interval,
refill_slots_interval,
),
unban_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
release_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
connection_info,
ban_list,
backed_off_peers: Default::default(),
ban_duration,
backoff_durations,
connect_trusted_nodes_only,
@ -189,6 +192,12 @@ impl PeersManager {
self.connection_info.num_outbound
}
/// Returns the number of currently backed off peers.
#[inline]
pub(crate) fn num_backed_off_peers(&self) -> usize {
self.backed_off_peers.len()
}
/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
@ -245,7 +254,7 @@ impl PeersManager {
/// be scheduled.
pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
// we only need to check the peer id here as the ip address will have been checked at
// on_inbound_pending_session
// on_inbound_pending_session. We also check if the peer is in the backoff list here.
if self.ban_list.is_banned_peer(&peer_id) {
self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
return
@ -289,7 +298,10 @@ impl PeersManager {
fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
trace!(target: "net::peers", ?peer_id, "backing off");
self.ban_list.ban_peer_until(peer_id, until);
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.backed_off = true;
self.backed_off_peers.insert(peer_id, until);
}
}
/// Unbans the peer
@ -455,7 +467,7 @@ impl PeersManager {
let mut backoff_until = None;
if let Some(mut peer) = self.peers.get_mut(peer_id) {
let reputation_change = if let Some(kind) = err.should_backoff() {
if let Some(kind) = err.should_backoff() {
// Increment peer.backoff_counter
if kind.is_severe() {
peer.severe_backoff_counter += 1;
@ -464,19 +476,18 @@ impl PeersManager {
let backoff_time =
self.backoff_durations.backoff_until(kind, peer.severe_backoff_counter);
backoff_until = Some(backoff_time);
// The peer has signaled that it is currently unable to process any more
// connections, so we will hold off on attempting any new connections for a
// while
BACKOFF_REPUTATION_CHANGE.into()
backoff_until = Some(backoff_time);
} else {
self.reputation_weights.change(reputation_change)
// If the error was not a backoff error, we reduce the peer's reputation
let reputation_change = self.reputation_weights.change(reputation_change);
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
};
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
}
if let Some(backoff_until) = backoff_until {
self.backoff_peer_until(*peer_id, backoff_until);
@ -610,7 +621,7 @@ impl PeersManager {
/// Returns the idle peer with the highest reputation.
///
/// Peers that are `trusted`, see [PeerKind], are prioritized as long as they're not currently
/// marked as banned.
/// marked as banned or backed off.
///
/// If `connect_trusted_nodes_only` is enabled, see [PeersConfig], then this will only consider
/// `trusted` peers.
@ -620,6 +631,7 @@ impl PeersManager {
let mut unconnected = self.peers.iter_mut().filter(|(_, peer)| {
peer.state.is_unconnected() &&
!peer.is_banned() &&
!peer.is_backed_off() &&
(!self.connect_trusted_nodes_only || peer.is_trusted())
});
@ -705,8 +717,9 @@ impl PeersManager {
}
}
if self.unban_interval.poll_tick(cx).is_ready() {
let (_, unbanned_peers) = self.ban_list.evict(std::time::Instant::now());
if self.release_interval.poll_tick(cx).is_ready() {
let now = std::time::Instant::now();
let (_, unbanned_peers) = self.ban_list.evict(now);
for peer_id in unbanned_peers {
if let Some(peer) = self.peers.get_mut(&peer_id) {
@ -716,6 +729,18 @@ impl PeersManager {
}
self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
// clear the backoff list of expired backoffs, and mark the relevant peers as
// ready to be dialed
self.backed_off_peers.retain(|peer_id, until| {
if now > *until {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.backed_off = false;
}
return false
}
true
})
}
if self.refill_slots_interval.poll_tick(cx).is_ready() {
@ -817,6 +842,8 @@ pub struct Peer {
remove_after_disconnect: bool,
/// The kind of peer
kind: PeerKind,
/// Whether the peer is currently backed off.
backed_off: bool,
/// Counts number of times the peer was backed off due to a severe [BackoffKind].
severe_backoff_counter: u32,
}
@ -840,6 +867,7 @@ impl Peer {
fork_id: None,
remove_after_disconnect: false,
kind: Default::default(),
backed_off: false,
severe_backoff_counter: 0,
}
}
@ -886,6 +914,11 @@ impl Peer {
is_banned_reputation(self.reputation)
}
#[inline]
fn is_backed_off(&self) -> bool {
self.backed_off
}
/// Unbans the peer by resetting its reputation
#[inline]
fn unban(&mut self) {
@ -1362,24 +1395,20 @@ mod test {
})
.await;
assert!(peers.ban_list.is_banned_peer(&peer));
assert!(peers.peers.get(&peer).is_some());
assert!(peers.backed_off_peers.contains_key(&peer));
assert!(peers.peers.get(&peer).unwrap().is_backed_off());
tokio::time::sleep(backoff_durations.low).await;
match event!(peers) {
PeerAction::UnBanPeer { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
assert!(!peers.backed_off_peers.contains_key(&peer));
assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
}
#[tokio::test]
@ -1429,24 +1458,20 @@ mod test {
})
.await;
assert!(peers.ban_list.is_banned_peer(&peer));
assert!(peers.peers.get(&peer).is_some());
assert!(peers.backed_off_peers.contains_key(&peer));
assert!(peers.peers.get(&peer).unwrap().is_backed_off());
tokio::time::sleep(backoff_durations.high).await;
match event!(peers) {
PeerAction::UnBanPeer { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
assert!(!peers.backed_off_peers.contains_key(&peer));
assert!(!peers.peers.get(&peer).unwrap().is_backed_off());
}
#[tokio::test]

View File

@ -26,10 +26,6 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 16 * REPUTATION_UNIT;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;
/// A reputation change to apply to backoff the peer. This has the same effect as marking the peer
/// as banned.
pub(crate) const BACKOFF_REPUTATION_CHANGE: i32 = i32::MIN;
/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) fn is_banned_reputation(reputation: i32) -> bool {

View File

@ -284,7 +284,7 @@ where
/// Checks if the node's network connection state is 'ShuttingDown'
#[inline]
fn is_shutting_down(&self) -> bool {
pub(crate) fn is_shutting_down(&self) -> bool {
matches!(self.net_connection_state, NetworkConnectionState::ShuttingDown)
}
}

View File

@ -532,6 +532,7 @@ async fn test_shutdown() {
let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]);
let mut listener0 = NetworkEventStream::new(handle0.event_listener());
let mut listener1 = NetworkEventStream::new(handle1.event_listener());
// Before shutting down, we have two connected peers
let peer1 = listener0.next_session_established().await.unwrap();
@ -549,10 +550,13 @@ async fn test_shutdown() {
assert!(expected_connections.remove(&peer1));
assert!(expected_connections.remove(&peer2));
// New connections are rejected
// Connected peers receive a shutdown signal
let (_peer, reason) = listener1.next_session_closed().await.unwrap();
assert_eq!(reason, Some(DisconnectReason::ClientQuitting));
// New connections ignored
handle0.add_peer(*handle1.peer_id(), handle1.local_addr());
let (_peer, reason) = listener0.next_session_closed().await.unwrap();
assert_eq!(reason, Some(DisconnectReason::DisconnectRequested));
assert_eq!(handle0.num_connected_peers(), 0);
}
#[tokio::test(flavor = "multi_thread")]