fix(net): temporarily backoff busy peers (#548)

* fix(net): temporarily backoff busy peers

* chore: rustfmt
This commit is contained in:
Matthias Seitz
2022-12-20 22:19:06 +01:00
committed by GitHub
parent ed4213fdff
commit b2abcddeda
3 changed files with 107 additions and 11 deletions

View File

@ -50,6 +50,9 @@ pub(crate) fn is_fatal_protocol_error(err: &EthStreamError) -> bool {
P2PStreamError::EmptyProtocolMessage |
P2PStreamError::ParseVersionError(_) |
P2PStreamError::Disconnected(DisconnectReason::UselessPeer) |
P2PStreamError::Disconnected(
DisconnectReason::IncompatibleP2PProtocolVersion
) |
P2PStreamError::MismatchedProtocolVersion { .. }
)
}

View File

@ -18,6 +18,7 @@ use std::{
time::Duration,
};
use crate::peers::reputation::BACKOFF_REPUTATION_CHANGE;
use thiserror::Error;
use tokio::{
sync::{mpsc, oneshot},
@ -91,6 +92,9 @@ pub(crate) struct PeersManager {
unban_interval: Interval,
/// How long to ban bad peers.
ban_duration: Duration,
/// How long peers to which we could not connect for non-fatal reasons, e.g.
/// [`DisconnectReason::TooManyPeers`], are put in time out.
backoff_duration: Duration,
}
impl PeersManager {
@ -102,9 +106,14 @@ impl PeersManager {
reputation_weights,
ban_list,
ban_duration,
backoff_duration,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
// We use half of the interval to decrease the max duration to `150%` in worst case
let unban_interval = ban_duration.min(backoff_duration) / 2;
Self {
peers: Default::default(),
manager_tx,
@ -115,11 +124,11 @@ impl PeersManager {
now + refill_slots_interval,
refill_slots_interval,
),
// Use half of ban duration for interval
unban_interval: tokio::time::interval_at(now + ban_duration, ban_duration / 2),
unban_interval: tokio::time::interval_at(now + unban_interval, unban_interval),
connection_info,
ban_list,
ban_duration,
backoff_duration,
}
}
@ -196,6 +205,11 @@ impl PeersManager {
self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
}
/// Temporarily puts the peer in timeout
fn backoff_peer(&mut self, peer_id: PeerId) {
self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.backoff_duration);
}
/// Unbans the peer
fn unban_peer(&mut self, peer_id: PeerId) {
self.ban_list.unban_peer(&peer_id);
@ -288,11 +302,23 @@ impl PeersManager {
ip_addr: remote_addr.ip(),
})
}
} else if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped);
peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32());
} else {
let reputation_change = if let Some(DisconnectReason::TooManyPeers) =
err.as_disconnected()
{
// The peer has signaled that it is currently unable to process any more
// connections, so we will hold off on attempting any new connections for a while
self.backoff_peer(*peer_id);
BACKOFF_REPUTATION_CHANGE.into()
} else {
self.reputation_weights.change(ReputationChangeKind::Dropped)
};
if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
}
}
self.fill_outbound_slots();
@ -440,10 +466,6 @@ impl PeersManager {
}
}
if self.refill_slots_interval.poll_tick(cx).is_ready() {
self.fill_outbound_slots();
}
if self.unban_interval.poll_tick(cx).is_ready() {
for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
@ -455,6 +477,10 @@ impl PeersManager {
}
}
if self.refill_slots_interval.poll_tick(cx).is_ready() {
self.fill_outbound_slots();
}
if self.queued_actions.is_empty() {
return Poll::Pending
}
@ -701,6 +727,9 @@ pub struct PeersConfig {
pub ban_list: BanList,
/// How long to ban bad peers.
pub ban_duration: Duration,
/// How long to backoff peers that are we failed to connect to for non-fatal reasons, such as
/// [`DisconnectReason::TooManyPeers`].
pub backoff_duration: Duration,
}
impl Default for PeersConfig {
@ -712,6 +741,8 @@ impl Default for PeersConfig {
ban_list: Default::default(),
// Ban peers for 12h
ban_duration: Duration::from_secs(60 * 60 * 12),
// backoff peers for 1h
backoff_duration: Duration::from_secs(60 * 60),
}
}
}
@ -786,6 +817,7 @@ mod test {
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
struct PeerActionFuture<'a> {
@ -844,6 +876,63 @@ mod test {
.await;
}
#[tokio::test]
async fn test_backoff_on_busy() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let backoff_duration = Duration::from_secs(3);
let config = PeersConfig { backoff_duration, ..Default::default() };
let mut peers = PeersManager::new(config);
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
peers.on_connection_dropped(
&socket_addr,
&peer,
&EthStreamError::P2PStreamError(P2PStreamError::Disconnected(
DisconnectReason::TooManyPeers,
)),
);
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert!(peers.ban_list.is_banned_peer(&peer));
assert!(peers.peers.get(&peer).is_some());
tokio::time::sleep(backoff_duration).await;
match event!(peers) {
PeerAction::UnBanPeer { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_ban_on_drop() {
let peer = PeerId::random();

View File

@ -27,6 +27,10 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;
/// A reputation change to apply to backoff the peer. This has the same effect as marking the peer
/// as banned.
pub(crate) const BACKOFF_REPUTATION_CHANGE: i32 = i32::MIN;
/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) fn is_banned_reputation(reputation: i32) -> bool {