feat(net): temporarily ban bad peers (#492)

* feat(net): temporarily ban bad peers

* use half duration interval

* Update crates/net/network/src/peers/manager.rs

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>

* fix bad test

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2022-12-16 18:22:25 +01:00
committed by GitHub
parent 3989d5d3e0
commit c1f124d3e3
6 changed files with 301 additions and 39 deletions

View File

@ -32,6 +32,21 @@ impl BanList {
Self { banned_ips, banned_peers }
}
/// Removes all peers that are no longer banned.
pub fn evict_peers(&mut self, now: Instant) -> Vec<PeerId> {
let mut evicted = Vec::new();
self.banned_peers.retain(|peer, until| {
if let Some(until) = until {
if now > *until {
evicted.push(*peer);
return false
}
}
true
});
evicted
}
/// Removes all entries that should no longer be banned.
pub fn evict(&mut self, now: Instant) {
self.banned_ips.retain(|_, until| {

View File

@ -23,6 +23,8 @@ pub use client::FetchClient;
///
/// This type is hooked into the staged sync pipeline and delegates download request to available
/// peers and sends the response once ready.
///
/// This type maintains a list of connected peers that are available for requests.
pub struct StateFetcher {
/// Currently active [`GetBlockHeaders`] requests
inflight_headers_requests:
@ -30,7 +32,7 @@ pub struct StateFetcher {
/// Currently active [`GetBlockBodies`] requests
inflight_bodies_requests:
HashMap<PeerId, Request<Vec<H256>, PeerRequestResult<Vec<BlockBody>>>>,
/// The list of available peers for requests.
/// The list of _available_ peers for requests.
peers: HashMap<PeerId, Peer>,
/// The handle to the peers manager
peers_handle: PeersHandle,
@ -63,6 +65,9 @@ impl StateFetcher {
self.peers.insert(peer_id, Peer { state: PeerState::Idle, best_hash, best_number });
}
/// Removes the peer from the peer list, after which it is no longer available for future
/// requests.
///
/// Invoked when an active session was closed.
///
/// This cancels als inflight request and sends an error to the receiver.
@ -97,7 +102,7 @@ impl StateFetcher {
}
}
/// Returns the next idle peer that's ready to accept a request
/// Returns the _next_ idle peer that's ready to accept a request.
fn next_peer(&mut self) -> Option<(&PeerId, &mut Peer)> {
self.peers.iter_mut().find(|(_, peer)| peer.state.is_idle())
}
@ -181,7 +186,7 @@ impl StateFetcher {
/// Returns a new followup request for the peer.
///
/// Caution: this expects that the peer is _not_ closed
/// Caution: this expects that the peer is _not_ closed.
fn followup_request(&mut self, peer_id: PeerId) -> Option<BlockResponseOutcome> {
let req = self.queued_requests.pop_front()?;
let req = self.prepare_block_request(peer_id, req);

View File

@ -603,9 +603,9 @@ where
?error,
"Outgoing pending session failed"
);
let swarm = this.swarm.state_mut().peers_mut();
swarm.on_closed_outgoing_pending_session();
swarm.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
let peers = this.swarm.state_mut().peers_mut();
peers.on_closed_outgoing_pending_session(&peer_id);
peers.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
if error.map(|err| err.merits_discovery_ban()).unwrap_or_default() {
this.swarm.state_mut().ban_discovery(peer_id, remote_addr.ip());

View File

@ -1,6 +1,9 @@
use crate::{
error::{error_merits_discovery_ban, is_fatal_protocol_error},
peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights},
peers::{
reputation::{is_banned_reputation, DEFAULT_REPUTATION},
ReputationChangeKind, ReputationChangeWeights,
},
};
use futures::StreamExt;
use reth_eth_wire::{error::EthStreamError, DisconnectReason};
@ -19,7 +22,7 @@ use tokio::{
time::{Instant, Interval},
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};
use tracing::trace;
/// A communication channel to the [`PeersManager`] to apply manual changes to the peer set.
#[derive(Clone, Debug)]
@ -82,14 +85,24 @@ pub(crate) struct PeersManager {
connection_info: ConnectionInfo,
/// Tracks unwanted ips/peer ids,
ban_list: BanList,
/// Interval at which to check for peers to unban.
unban_interval: Interval,
/// How long to ban bad peers.
ban_duration: Duration,
}
impl PeersManager {
/// Create a new instance with the given config
pub(crate) fn new(config: PeersConfig) -> Self {
let PeersConfig { refill_slots_interval, connection_info, reputation_weights, ban_list } =
config;
let PeersConfig {
refill_slots_interval,
connection_info,
reputation_weights,
ban_list,
ban_duration,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
Self {
peers: Default::default(),
manager_tx,
@ -97,11 +110,14 @@ impl PeersManager {
queued_actions: Default::default(),
reputation_weights,
refill_slots_interval: tokio::time::interval_at(
Instant::now() + refill_slots_interval,
now + refill_slots_interval,
refill_slots_interval,
),
// Use half of ban duration for interval
unban_interval: tokio::time::interval_at(now + ban_duration, ban_duration / 2),
connection_info,
ban_list,
ban_duration,
}
}
@ -135,8 +151,11 @@ impl PeersManager {
self.connection_info.decr_in()
}
/// Invoked when a pending session was closed.
pub(crate) fn on_closed_outgoing_pending_session(&mut self) {
/// Invoked when a pending outgoing session was closed.
pub(crate) fn on_closed_outgoing_pending_session(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.get_mut(peer_id) {
peer.state = PeerConnectionState::Idle;
}
self.connection_info.decr_out()
}
@ -169,29 +188,40 @@ impl PeersManager {
}
}
/// Bans the peer temporarily with the given timeout
fn ban_peer(&mut self, peer_id: PeerId) {
self.ban_list.ban_peer_until(peer_id, std::time::Instant::now() + self.ban_duration);
self.queued_actions.push_back(PeerAction::BanPeer { peer_id });
}
/// Unbans the peer
fn unban_peer(&mut self, peer_id: PeerId) {
self.ban_list.unban_peer(&peer_id);
self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
/// Apply the corresponding reputation change to the given peer
pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
let reputation_change = self.reputation_weights.change(rep);
let should_disconnect = if let Some(mut peer) = self.peers.get_mut(peer_id) {
// we add reputation since negative reputation change decrease total reputation
peer.reputation = peer.reputation.saturating_add(reputation_change.as_i32());
trace!(target: "net::peers", repuation=%peer.reputation, banned=%peer.is_banned(), "applied reputation change");
let should_disconnect = peer.state.is_connected() && peer.is_banned();
if should_disconnect {
debug!(target: "net::peers", repuation=%peer.reputation, "disconnecting peer on reputation change");
peer.state.disconnect();
}
should_disconnect
let outcome = if let Some(peer) = self.peers.get_mut(peer_id) {
peer.apply_reputation(reputation_change.as_i32())
} else {
false
return
};
if should_disconnect {
// start the disconnect process
self.queued_actions
.push_back(PeerAction::Disconnect { peer_id: *peer_id, reason: None })
match outcome {
ReputationChangeOutcome::None => {}
ReputationChangeOutcome::Ban => {
self.ban_peer(*peer_id);
}
ReputationChangeOutcome::Unban => self.unban_peer(*peer_id),
ReputationChangeOutcome::DisconnectAndBan => {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id: *peer_id,
reason: Some(DisconnectReason::DisconnectRequested),
});
self.ban_peer(*peer_id);
}
}
}
@ -375,6 +405,17 @@ impl PeersManager {
self.fill_outbound_slots();
}
if self.unban_interval.poll_tick(cx).is_ready() {
for peer_id in self.ban_list.evict_peers(std::time::Instant::now()) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.unban();
} else {
continue
}
self.queued_actions.push_back(PeerAction::UnBanPeer { peer_id });
}
}
if self.queued_actions.is_empty() {
return Poll::Pending
}
@ -382,6 +423,12 @@ impl PeersManager {
}
}
impl Default for PeersManager {
fn default() -> Self {
PeersManager::new(Default::default())
}
}
/// Tracks stats about connected nodes
#[derive(Debug)]
pub struct ConnectionInfo {
@ -439,8 +486,8 @@ impl Default for ConnectionInfo {
}
}
#[derive(Debug, Clone)]
/// Tracks info about a single peer.
#[derive(Debug, Clone)]
pub struct Peer {
/// Where to reach the peer
addr: SocketAddr,
@ -460,18 +507,60 @@ impl Peer {
}
fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self {
Self { addr, state, reputation: 0, fork_id: None }
Self { addr, state, reputation: DEFAULT_REPUTATION, fork_id: None }
}
/// Applies a reputation change to the peer and returns what action should be taken.
fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome {
let previous = self.reputation;
// we add reputation since negative reputation change decrease total reputation
self.reputation = previous.saturating_add(reputation);
trace!(target: "net::peers", repuation=%self.reputation, banned=%self.is_banned(), "applied reputation change");
if self.state.is_connected() && self.is_banned() {
self.state.disconnect();
return ReputationChangeOutcome::DisconnectAndBan
}
if self.is_banned() && !is_banned_reputation(previous) {
return ReputationChangeOutcome::Ban
}
if !self.is_banned() && is_banned_reputation(previous) {
return ReputationChangeOutcome::Unban
}
ReputationChangeOutcome::None
}
/// Returns true if the peer's reputation is below the banned threshold.
#[inline]
fn is_banned(&self) -> bool {
self.reputation < BANNED_REPUTATION
is_banned_reputation(self.reputation)
}
/// Unbans the peer by resetting its reputation
#[inline]
fn unban(&mut self) {
self.reputation = DEFAULT_REPUTATION
}
}
/// Outcomes when a reputation change is applied to a peer
enum ReputationChangeOutcome {
/// Nothing to do.
None,
/// Ban the peer.
Ban,
/// Ban and disconnect
DisconnectAndBan,
/// Unban the peer
Unban,
}
/// Represents the kind of connection established to the peer, if any
#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum PeerConnectionState {
/// Not connected currently.
#[default]
@ -546,19 +635,25 @@ pub enum PeerAction {
},
/// Ban the peer in discovery.
DiscoveryBan { peer_id: PeerId, ip_addr: IpAddr },
/// Ban the peer temporarily
BanPeer { peer_id: PeerId },
/// Unban the peer temporarily
UnBanPeer { peer_id: PeerId },
}
/// Config type for initiating a [`PeersManager`] instance
#[derive(Debug)]
pub struct PeersConfig {
/// How often to recheck free slots for outbound connections
/// How often to recheck free slots for outbound connections.
pub refill_slots_interval: Duration,
/// Restrictions on connections
/// Restrictions on connections.
pub connection_info: ConnectionInfo,
/// How to weigh reputation changes
/// How to weigh reputation changes.
pub reputation_weights: ReputationChangeWeights,
/// Restrictions on PeerIds and Ips
/// Restrictions on PeerIds and Ips.
pub ban_list: BanList,
/// How long to ban bad peers.
pub ban_duration: Duration,
}
impl Default for PeersConfig {
@ -568,6 +663,8 @@ impl Default for PeersConfig {
connection_info: Default::default(),
reputation_weights: Default::default(),
ban_list: Default::default(),
// Ban peers for 12h
ban_duration: Duration::from_secs(60 * 60 * 12),
}
}
}
@ -626,7 +723,7 @@ mod test {
use crate::{
peers::{
manager::{ConnectionInfo, PeerConnectionState},
PeerAction,
PeerAction, ReputationChangeKind,
},
PeersConfig,
};
@ -634,9 +731,143 @@ mod test {
use reth_primitives::{PeerId, H512};
use std::{
collections::HashSet,
future::{poll_fn, Future},
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
};
struct PeerActionFuture<'a> {
peers: &'a mut PeersManager,
}
impl<'a> Future for PeerActionFuture<'a> {
type Output = PeerAction;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().peers.poll(cx)
}
}
macro_rules! event {
($peers:expr) => {
PeerActionFuture { peers: &mut $peers }.await
};
}
#[tokio::test]
async fn test_insert() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_ban() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
}
#[tokio::test]
async fn test_reputation_change() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
let p = peers.peers.get(&peer).unwrap();
assert!(p.is_banned());
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => {
unreachable!()
}
}
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_reputation_change_connected() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, remote_addr } => {
assert_eq!(peer_id, peer);
assert_eq!(remote_addr, socket_addr);
}
_ => unreachable!(),
}
let p = peers.peers.get_mut(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
peers.apply_reputation_change(&peer, ReputationChangeKind::BadProtocol);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
assert!(p.is_banned());
peers.on_disconnected(&peer);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Idle);
assert!(p.is_banned());
match event!(peers) {
PeerAction::Disconnect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
#[tokio::test]
async fn test_discovery_ban_list() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));

View File

@ -3,6 +3,9 @@
/// The type that tracks the reputation score.
pub(crate) type Reputation = i32;
/// The default reputation of a peer
pub(crate) const DEFAULT_REPUTATION: Reputation = 0;
/// The minimal unit we're measuring reputation
const REPUTATION_UNIT: i32 = -1024;
@ -24,6 +27,12 @@ const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;
/// Returns `true` if the given reputation is below the [`BANNED_REPUTATION`] threshold
#[inline]
pub(crate) fn is_banned_reputation(reputation: i32) -> bool {
reputation < BANNED_REPUTATION
}
/// Various kinds of reputation changes.
#[derive(Debug, Copy, Clone)]
pub enum ReputationChangeKind {

View File

@ -285,6 +285,8 @@ where
self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
}
PeerAction::DiscoveryBan { peer_id, ip_addr } => self.ban_discovery(peer_id, ip_addr),
PeerAction::BanPeer { .. } => {}
PeerAction::UnBanPeer { .. } => {}
}
}