feat: rate limit incoming ips (#12153)

This commit is contained in:
Matthias Seitz
2024-10-29 03:09:11 +01:00
committed by GitHub
parent fbe04625b9
commit cc2a33cfc0
2 changed files with 63 additions and 9 deletions

View File

@ -24,6 +24,9 @@ pub const DEFAULT_MAX_COUNT_PEERS_INBOUND: u32 = 30;
/// This restricts how many outbound dials can be performed concurrently.
pub const DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS: usize = 15;
/// A temporary timeout for ips on incoming connection attempts.
pub const INBOUND_IP_THROTTLE_DURATION: Duration = Duration::from_secs(30);
/// The durations to use when a backoff should be applied to a peer.
///
/// See also [`BackoffKind`].
@ -155,6 +158,11 @@ pub struct PeersConfig {
///
/// The backoff duration increases with number of backoff attempts.
pub backoff_durations: PeerBackoffDurations,
/// How long to temporarily ban ips on incoming connection attempts.
///
/// This acts as an IP based rate limit.
#[cfg_attr(feature = "serde", serde(default, with = "humantime_serde"))]
pub incoming_ip_throttle_duration: Duration,
}
impl Default for PeersConfig {
@ -171,6 +179,7 @@ impl Default for PeersConfig {
trusted_nodes_only: false,
basic_nodes: Default::default(),
max_backoff_count: 5,
incoming_ip_throttle_duration: INBOUND_IP_THROTTLE_DURATION,
}
}
}

View File

@ -84,6 +84,8 @@ pub struct PeersManager {
max_backoff_count: u8,
/// Tracks the connection state of the node
net_connection_state: NetworkConnectionState,
/// How long to temporarily ban ip on an incoming connection attempt.
incoming_ip_throttle_duration: Duration,
}
impl PeersManager {
@ -100,6 +102,7 @@ impl PeersManager {
trusted_nodes_only,
basic_nodes,
max_backoff_count,
incoming_ip_throttle_duration,
} = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
let now = Instant::now();
@ -148,6 +151,7 @@ impl PeersManager {
last_tick: Instant::now(),
max_backoff_count,
net_connection_state: NetworkConnectionState::default(),
incoming_ip_throttle_duration,
}
}
@ -265,6 +269,9 @@ impl PeersManager {
return Err(InboundConnectionError::ExceedsCapacity)
}
// apply the rate limit
self.throttle_incoming_ip(addr);
self.connection_info.inc_pending_in();
Ok(())
}
@ -383,6 +390,12 @@ impl PeersManager {
self.ban_list.ban_ip_until(ip, std::time::Instant::now() + self.ban_duration);
}
/// Bans the IP temporarily to rate limit inbound connection attempts per IP.
fn throttle_incoming_ip(&mut self, ip: IpAddr) {
self.ban_list
.ban_ip_until(ip, std::time::Instant::now() + self.incoming_ip_throttle_duration);
}
/// Temporarily puts the peer in timeout by inserting it into the backedoff peers set
fn backoff_peer_until(&mut self, peer_id: PeerId, until: std::time::Instant) {
trace!(target: "net::peers", ?peer_id, "backing off");
@ -1129,15 +1142,6 @@ impl Display for InboundConnectionError {
#[cfg(test)]
mod tests {
use std::{
future::{poll_fn, Future},
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use alloy_primitives::B512;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PHandshakeError, P2PStreamError},
@ -1149,6 +1153,14 @@ mod tests {
use reth_network_types::{
peers::reputation::DEFAULT_REPUTATION, BackoffKind, ReputationChangeKind,
};
use std::{
future::{poll_fn, Future},
io,
net::{IpAddr, Ipv4Addr, SocketAddr},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use url::Host;
use super::PeersManager;
@ -2330,6 +2342,39 @@ mod tests {
);
}
#[tokio::test]
async fn test_incoming_rate_limit() {
let config = PeersConfig {
incoming_ip_throttle_duration: Duration::from_millis(100),
..PeersConfig::test()
};
let mut peers = PeersManager::new(config);
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(168, 0, 1, 2)), 8009);
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
assert_eq!(
peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
InboundConnectionError::IpBanned
);
peers.release_interval.reset_immediately();
tokio::time::sleep(peers.incoming_ip_throttle_duration).await;
// await unban
poll_fn(|cx| loop {
if peers.poll(cx).is_pending() {
return Poll::Ready(());
}
})
.await;
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
assert_eq!(
peers.on_incoming_pending_session(addr.ip()).unwrap_err(),
InboundConnectionError::IpBanned
);
}
#[tokio::test]
async fn test_tick() {
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2));