From b232e05b2645e11c21be61e114fba14bea1045fe Mon Sep 17 00:00:00 2001
From: Matthias Seitz
Date: Wed, 16 Aug 2023 18:17:20 +0200
Subject: [PATCH] fix: track established bonds (#4229)

---
 crates/net/discv4/src/config.rs | 15 ++++--
 crates/net/discv4/src/lib.rs    | 91 +++++++++++++++++++++------------
 crates/net/discv4/src/table.rs  | 35 +++++++++++++
 3 files changed, 106 insertions(+), 35 deletions(-)
 create mode 100644 crates/net/discv4/src/table.rs

diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs
index 924a7b623..7a3addc39 100644
--- a/crates/net/discv4/src/config.rs
+++ b/crates/net/discv4/src/config.rs
@@ -31,7 +31,7 @@ pub struct Discv4Config {
     /// The number of allowed failures for `FindNode` requests. Default: 5.
     pub max_find_node_failures: u8,
     /// The interval to use when checking for expired nodes that need to be re-pinged. Default:
-    /// 300sec, 5min.
+    /// 10min.
     pub ping_interval: Duration,
     /// The duration of we consider a ping timed out.
     pub ping_expiration: Duration,
@@ -69,6 +69,8 @@ pub struct Discv4Config {
     /// If configured and a `external_ip_resolver` is configured, try to resolve the external ip
     /// using this interval.
     pub resolve_external_ip_interval: Option<Duration>,
+    /// The duration after which we consider a bond expired.
+    pub bond_expiration: Duration,
 }

 impl Discv4Config {
@@ -121,16 +123,17 @@ impl Default for Discv4Config {
             /// Every outgoing request will eventually lead to an incoming response
             udp_ingress_message_buffer: 1024,
             max_find_node_failures: 5,
-            ping_interval: Duration::from_secs(300),
+            ping_interval: Duration::from_secs(60 * 10),
             /// unified expiration and timeout durations, mirrors geth's `expiration` duration
             ping_expiration: Duration::from_secs(20),
+            bond_expiration: Duration::from_secs(60 * 60),
             enr_expiration: Duration::from_secs(20),
             neighbours_expiration: Duration::from_secs(20),
             request_timeout: Duration::from_secs(20),
             lookup_interval: Duration::from_secs(20),
             ban_list: Default::default(),
-            ban_duration: Some(Duration::from_secs(3600)), // 1 hour
+            ban_duration: Some(Duration::from_secs(60 * 60)), // 1 hour
             bootstrap_nodes: Default::default(),
             enable_dht_random_walk: true,
             enable_lookup: true,
@@ -200,6 +203,12 @@ impl Discv4ConfigBuilder {
         self
     }

+    /// Sets the expiration duration for a bond with a peer
+    pub fn bond_expiration(&mut self, duration: Duration) -> &mut Self {
+        self.config.bond_expiration = duration;
+        self
+    }
+
     /// Whether to discover random nodes in the DHT.
     pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self {
         self.config.enable_dht_random_walk = enable_dht_random_walk;
diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs
index f5bff545f..1ddedb1b3 100644
--- a/crates/net/discv4/src/lib.rs
+++ b/crates/net/discv4/src/lib.rs
@@ -77,12 +77,15 @@ pub use config::{Discv4Config, Discv4ConfigBuilder};
 mod node;
 use node::{kad_key, NodeKey};

+mod table;
+
 // reexport NodeRecord primitive
 pub use reth_primitives::NodeRecord;

 #[cfg(any(test, feature = "test-utils"))]
 pub mod test_utils;

+use crate::table::PongTable;
 use reth_net_nat::ResolveNatInterval;
 /// reexport to get public ip.
 pub use reth_net_nat::{external_ip, NatResolver};
@@ -117,6 +120,9 @@ const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
 /// Mirrors geth's `bondExpiration` of 24h
 const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);

+/// Duration used to expire nodes from the routing table 1hr
+const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
+
 type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
 type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
@@ -405,6 +411,10 @@ pub struct Discv4Service {
     config: Discv4Config,
     /// Buffered events populated during poll.
     queued_events: VecDeque<Discv4Event>,
+    /// Keeps track of nodes from which we have received a `Pong` message.
+    received_pongs: PongTable,
+    /// Interval used to expire additionally tracked nodes
+    expire_interval: Interval,
 }

 impl Discv4Service {
@@ -498,6 +508,8 @@ impl Discv4Service {
             resolve_external_ip_interval: config.resolve_external_ip_interval(),
             config,
             queued_events: Default::default(),
+            received_pongs: Default::default(),
+            expire_interval: tokio::time::interval(EXPIRE_DURATION),
         }
     }

@@ -582,7 +594,7 @@ impl Discv4Service {
         ) {
             InsertResult::Failed(_) => {}
             _ => {
-                self.try_ping(record, PingReason::Initial);
+                self.try_ping(record, PingReason::InitialInsert);
             }
         }
     }
@@ -735,6 +747,16 @@ impl Discv4Service {
         self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
     }

+    /// Check if the peer has a bond
+    fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
+        if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
+            if timestamp.elapsed() < self.config.bond_expiration {
+                return true
+            }
+        }
+        false
+    }
+
     /// Update the entry on RE-ping
     ///
     /// On re-ping we check for a changed enr_seq if eip868 is enabled and when it changed we sent a
@@ -860,7 +882,7 @@ impl Discv4Service {
             }
             _ => return false,
         }
-        self.try_ping(record, PingReason::Initial);
+        self.try_ping(record, PingReason::InitialInsert);

         true
     }
@@ -903,6 +925,7 @@ impl Discv4Service {
         // Note: we only mark if the node is absent because the `last 12h` condition is handled by
         // the ping interval
         let mut is_new_insert = false;
+        let mut needs_bond = false;

         let old_enr = match self.kbuckets.entry(&key) {
             kbucket::Entry::Present(mut entry, _) => entry.value_mut().update_with_enr(ping.enr_sq),
@@ -928,13 +951,14 @@ impl Discv4Service {
                         // full, we can't add any additional peers to that bucket, but we still want
                         // to emit an event that we discovered the node
                         debug!(target : "discv4", ?record, "discovered new record but bucket is full");
-                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record))
+                        self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
+                        needs_bond = true;
                     }
-                    BucketInsertResult::FailedFilter |
-                    BucketInsertResult::TooManyIncoming |
-                    BucketInsertResult::NodeExists => {
+                    BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
+                        needs_bond = true;
                         // insert unsuccessful but we still want to send the pong
                     }
+                    BucketInsertResult::FailedFilter => return,
                 }

                 None
@@ -955,7 +979,9 @@ impl Discv4Service {

         // if node was absent also send a ping to establish the endpoint proof from our end
         if is_new_insert {
-            self.try_ping(record, PingReason::Initial);
+            self.try_ping(record, PingReason::InitialInsert);
+        } else if needs_bond {
+            self.try_ping(record, PingReason::EstablishBond);
         } else {
             // Request ENR if included in the ping
             match (ping.enr_sq, old_enr) {
@@ -1053,10 +1079,16 @@ impl Discv4Service {
             Entry::Vacant(_) => return,
         };

+        // keep track of the pong
+        self.received_pongs.on_pong(remote_id, remote_addr.ip());
+
         match reason {
-            PingReason::Initial => {
+            PingReason::InitialInsert => {
                 self.update_on_pong(node, pong.enr_sq);
             }
+            PingReason::EstablishBond => {
+                // nothing to do here
+            }
             PingReason::RePing => {
                 self.update_on_reping(node, pong.enr_sq);
             }
@@ -1079,25 +1111,13 @@ impl Discv4Service {
             // ping's expiration timestamp is in the past
             return
         }
+        if node_id == *self.local_peer_id() {
+            // ignore find node requests to ourselves
+            return
+        }

-        let key = kad_key(node_id);
-
-        match self.kbuckets.entry(&key) {
-            kbucket::Entry::Present(_, status) => {
-                if status.is_connected() {
-                    self.respond_closest(msg.id, remote_addr)
-                }
-            }
-            kbucket::Entry::Pending(_, status) => {
-                if status.is_connected() {
-                    self.respond_closest(msg.id, remote_addr)
-                }
-            }
-            kbucket::Entry::Absent(_) => {
-                // no existing endpoint proof
-                // > To guard against traffic amplification attacks, Neighbors replies should only be sent if the sender of FindNode has been verified by the endpoint proof procedure.
-            }
-            kbucket::Entry::SelfEntry => {}
+        if self.has_bond(node_id, remote_addr.ip()) {
+            self.respond_closest(msg.id, remote_addr)
         }
     }
@@ -1144,8 +1164,7 @@ impl Discv4Service {
             return
         }

-        let key = kad_key(id);
-        if let BucketEntry::Present(_, _) = self.kbuckets.entry(&key) {
+        if self.has_bond(id, remote_addr.ip()) {
             self.send_packet(
                 Message::EnrResponse(EnrResponse {
                     request_hash,
@@ -1543,10 +1562,15 @@ impl Discv4Service {
             self.ping_buffered();

             // evict expired nodes
-            if self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
+            while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
                 self.evict_expired_requests(Instant::now())
             }

+            // evict expired nodes
+            while self.expire_interval.poll_tick(cx).is_ready() {
+                self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION)
+            }
+
             if self.queued_events.is_empty() {
                 return Poll::Pending
             }
@@ -1956,8 +1980,11 @@ impl NodeEntry {

 /// Represents why a ping is issued
 enum PingReason {
-    /// Initial ping to a previously unknown peer.
-    Initial,
+    /// Initial ping to a previously unknown peer that was inserted into the table.
+    InitialInsert,
+    /// Initial ping to a previously unknown peer that didn't fit into the table. But we still want
+    /// to establish a bond.
+    EstablishBond,
     /// Re-ping a peer..
     RePing,
     /// Part of a lookup to ensure endpoint is proven.
@@ -2315,7 +2342,7 @@ mod tests {
             sent_at: Instant::now(),
             node: service_2.local_node_record,
             echo_hash,
-            reason: PingReason::Initial,
+            reason: PingReason::InitialInsert,
         };

         service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);
diff --git a/crates/net/discv4/src/table.rs b/crates/net/discv4/src/table.rs
new file mode 100644
index 000000000..c7d75778c
--- /dev/null
+++ b/crates/net/discv4/src/table.rs
@@ -0,0 +1,35 @@
+//! Additional support for tracking nodes.
+
+use reth_primitives::PeerId;
+use std::{collections::HashMap, net::IpAddr, time::Instant};
+
+/// Keeps track of nodes from which we have received a `Pong` message.
+#[derive(Debug, Clone, Default)]
+pub(crate) struct PongTable {
+    /// The nodes we have received a `Pong` from.
+    nodes: HashMap<NodeKey, Instant>,
+}
+
+impl PongTable {
+    /// Updates the timestamp we received a `Pong` from the given node.
+    pub(crate) fn on_pong(&mut self, remote_id: PeerId, remote_ip: IpAddr) {
+        let key = NodeKey { remote_id, remote_ip };
+        self.nodes.insert(key, Instant::now());
+    }
+
+    /// Returns the timestamp we received a `Pong` from the given node.
+    pub(crate) fn last_pong(&self, remote_id: PeerId, remote_ip: IpAddr) -> Option<Instant> {
+        self.nodes.get(&NodeKey { remote_id, remote_ip }).copied()
+    }
+
+    /// Removes all nodes from the table that have not sent a `Pong` for at least `timeout`.
+    pub(crate) fn evict_expired(&mut self, now: Instant, timeout: std::time::Duration) {
+        self.nodes.retain(|_, last_pong| now - *last_pong < timeout);
+    }
+}
+
+#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
+pub(crate) struct NodeKey {
+    pub(crate) remote_id: PeerId,
+    pub(crate) remote_ip: IpAddr,
+}
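The patch above keeps bond state outside the Kademlia buckets: every received `Pong` is recorded in the new `PongTable`, keyed by peer id and IP; `has_bond` treats an entry younger than `bond_expiration` as a valid endpoint proof before `FindNode` or `EnrRequest` is answered; and a dedicated interval evicts stale entries. The sketch below mirrors that logic as a minimal, self-contained Rust program; `BondTracker` and the `u64` node id are illustrative stand-ins for this sketch, not reth types or APIs.

use std::{
    collections::HashMap,
    net::{IpAddr, Ipv4Addr},
    time::{Duration, Instant},
};

/// Placeholder node identifier; reth uses its `PeerId` type here.
type NodeId = u64;

/// Minimal stand-in for the patch's `PongTable`: remembers when we last
/// received a `Pong` from a given (node id, ip) pair.
#[derive(Debug, Default)]
struct BondTracker {
    last_pong: HashMap<(NodeId, IpAddr), Instant>,
}

impl BondTracker {
    /// Record an incoming `Pong` (the patch does this in its pong handler).
    fn on_pong(&mut self, id: NodeId, ip: IpAddr) {
        self.last_pong.insert((id, ip), Instant::now());
    }

    /// A bond exists if we saw a `Pong` from this endpoint within `expiration`;
    /// the patch consults the analogous check before serving `FindNode` and `EnrRequest`.
    fn has_bond(&self, id: NodeId, ip: IpAddr, expiration: Duration) -> bool {
        self.last_pong
            .get(&(id, ip))
            .map(|ts| ts.elapsed() < expiration)
            .unwrap_or(false)
    }

    /// Drop entries older than `timeout`; the patch drives this from a
    /// dedicated tokio interval in the service's poll loop.
    fn evict_expired(&mut self, now: Instant, timeout: Duration) {
        self.last_pong.retain(|_, ts| now.duration_since(*ts) < timeout);
    }
}

fn main() {
    let mut bonds = BondTracker::default();
    let ip = IpAddr::V4(Ipv4Addr::LOCALHOST);
    let expiration = Duration::from_secs(60 * 60); // mirrors the default `bond_expiration`

    bonds.on_pong(1, ip);
    assert!(bonds.has_bond(1, ip, expiration));
    assert!(!bonds.has_bond(2, ip, expiration));

    // periodic cleanup, as done once per `EXPIRE_DURATION` tick in the patch
    bonds.evict_expired(Instant::now(), expiration);
}

Keeping this table separate from the buckets is what lets the service answer peers that were discovered but could not be inserted (bucket full, too many incoming, or already present): such peers are pinged with the new `PingReason::EstablishBond`, and once their `Pong` is recorded, their `FindNode`/`EnrRequest` can be served without the traffic-amplification risk the old bucket-status check guarded against.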