From 2e47e9fb0d72c90857c88953aaffcb2cf34b9526 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 27 May 2024 14:27:34 +0200 Subject: [PATCH] feat: add udp packet ratelimiting (#8406) Co-authored-by: Federico Gimenez --- Cargo.lock | 1 + crates/net/discv4/Cargo.toml | 1 + crates/net/discv4/src/lib.rs | 90 +++++++++++++++++++++++++++++++++- crates/net/discv4/src/proto.rs | 2 +- 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9aaedd530..60f18e1d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6701,6 +6701,7 @@ dependencies = [ "reth-network-types", "reth-primitives", "reth-tracing", + "schnellru", "secp256k1 0.28.2", "serde", "thiserror", diff --git a/crates/net/discv4/Cargo.toml b/crates/net/discv4/Cargo.toml index 13ef81f44..719ec83a6 100644 --- a/crates/net/discv4/Cargo.toml +++ b/crates/net/discv4/Cargo.toml @@ -33,6 +33,7 @@ tokio = { workspace = true, features = ["io-util", "net", "time"] } tokio-stream.workspace = true # misc +schnellru.workspace = true tracing.workspace = true thiserror.workspace = true parking_lot.workspace = true diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index 2a8990dee..8e2ff1251 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -45,7 +45,9 @@ use secp256k1::SecretKey; use std::{ cell::RefCell, collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque}, - fmt, io, + fmt, + future::poll_fn, + io, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}, pin::Pin, rc::Rc, @@ -1796,7 +1798,13 @@ pub(crate) async fn send_loop(udp: Arc, rx: EgressReceiver) { } } +/// Rate limits the number of incoming packets from individual IPs to 1 packet/second +const MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP: usize = 60usize; + /// Continuously awaits new incoming messages and sends them back through the channel. +/// +/// The receive loop enforce primitive rate limiting for ips to prevent message spams from +/// individual IPs pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_id: PeerId) { let send = |event: IngressEvent| async { let _ = tx.send(event).await.map_err(|err| { @@ -1808,6 +1816,12 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i }); }; + let mut cache = ReceiveCache::default(); + + // tick at half the rate of the limit + let tick = MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP / 2; + let mut interval = tokio::time::interval(Duration::from_secs(tick as u64)); + let mut buf = [0; MAX_PACKET_SIZE]; loop { let res = udp.recv_from(&mut buf).await; @@ -1817,6 +1831,12 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i send(IngressEvent::RecvError(err)).await; } Ok((read, remote_addr)) => { + // rate limit incoming packets by IP + if cache.inc_ip(remote_addr.ip()) > MAX_INCOMING_PACKETS_PER_MINUTE_BY_IP { + trace!(target: "discv4", ?remote_addr, "Too many incoming packets from IP."); + continue + } + let packet = &buf[..read]; match Message::decode(packet) { Ok(packet) => { @@ -1825,6 +1845,13 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i debug!(target: "discv4", ?remote_addr, "Received own packet."); continue } + + // skip if we've already received the same packet + if cache.contains_packet(packet.hash) { + debug!(target: "discv4", ?remote_addr, "Received duplicate packet."); + continue + } + send(IngressEvent::Packet(remote_addr, packet)).await; } Err(err) => { @@ -1834,6 +1861,67 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i } } } + + // reset the tracked ips if the interval has passed + if poll_fn(|cx| match interval.poll_tick(cx) { + Poll::Ready(_) => Poll::Ready(true), + Poll::Pending => Poll::Ready(false), + }) + .await + { + cache.tick_ips(tick); + } + } +} + +/// A cache for received packets and their source address. +/// +/// This is used to discard duplicated packets and rate limit messages from the same source. +struct ReceiveCache { + /// keeps track of how many messages we've received from a given IP address since the last + /// tick. + /// + /// This is used to count the number of messages received from a given IP address within an + /// interval. + ip_messages: HashMap, + // keeps track of unique packet hashes + unique_packets: schnellru::LruMap, +} + +impl ReceiveCache { + /// Updates the counter for each IP address and removes IPs that have exceeded the limit. + /// + /// This will decrement the counter for each IP address and remove IPs that have reached 0. + fn tick_ips(&mut self, tick: usize) { + self.ip_messages.retain(|_, count| { + if let Some(reset) = count.checked_sub(tick) { + *count = reset; + true + } else { + false + } + }); + } + + /// Increases the counter for the given IP address and returns the new count. + fn inc_ip(&mut self, ip: IpAddr) -> usize { + let ctn = self.ip_messages.entry(ip).or_default(); + *ctn = ctn.saturating_add(1); + *ctn + } + + /// Returns true if we previously received the packet + fn contains_packet(&mut self, hash: B256) -> bool { + !self.unique_packets.insert(hash, ()) + } +} + +impl Default for ReceiveCache { + fn default() -> Self { + Self { + ip_messages: Default::default(), + unique_packets: schnellru::LruMap::new(schnellru::ByLength::new(32)), + } } } diff --git a/crates/net/discv4/src/proto.rs b/crates/net/discv4/src/proto.rs index 62dd9235d..be26487a6 100644 --- a/crates/net/discv4/src/proto.rs +++ b/crates/net/discv4/src/proto.rs @@ -72,7 +72,7 @@ pub enum Message { impl Message { /// Returns the id for this type - pub fn msg_type(&self) -> MessageId { + pub const fn msg_type(&self) -> MessageId { match self { Message::Ping(_) => MessageId::Ping, Message::Pong(_) => MessageId::Pong,