fix: track established bonds (#4229)

This commit is contained in:
Matthias Seitz
2023-08-16 18:17:20 +02:00
committed by GitHub
parent d643d03030
commit b232e05b26
3 changed files with 106 additions and 35 deletions

View File

@ -31,7 +31,7 @@ pub struct Discv4Config {
/// The number of allowed failures for `FindNode` requests. Default: 5.
pub max_find_node_failures: u8,
/// The interval to use when checking for expired nodes that need to be re-pinged. Default:
/// 300sec, 5min.
/// 10min.
pub ping_interval: Duration,
/// The duration of we consider a ping timed out.
pub ping_expiration: Duration,
@ -69,6 +69,8 @@ pub struct Discv4Config {
/// If configured and a `external_ip_resolver` is configured, try to resolve the external ip
/// using this interval.
pub resolve_external_ip_interval: Option<Duration>,
/// The duration after which we consider a bond expired.
pub bond_expiration: Duration,
}
impl Discv4Config {
@ -121,16 +123,17 @@ impl Default for Discv4Config {
/// Every outgoing request will eventually lead to an incoming response
udp_ingress_message_buffer: 1024,
max_find_node_failures: 5,
ping_interval: Duration::from_secs(300),
ping_interval: Duration::from_secs(60 * 10),
/// unified expiration and timeout durations, mirrors geth's `expiration` duration
ping_expiration: Duration::from_secs(20),
bond_expiration: Duration::from_secs(60 * 60),
enr_expiration: Duration::from_secs(20),
neighbours_expiration: Duration::from_secs(20),
request_timeout: Duration::from_secs(20),
lookup_interval: Duration::from_secs(20),
ban_list: Default::default(),
ban_duration: Some(Duration::from_secs(3600)), // 1 hour
ban_duration: Some(Duration::from_secs(60 * 60)), // 1 hour
bootstrap_nodes: Default::default(),
enable_dht_random_walk: true,
enable_lookup: true,
@ -200,6 +203,12 @@ impl Discv4ConfigBuilder {
self
}
/// Sets the expiration duration for a bond with a peer
pub fn bond_expiration(&mut self, duration: Duration) -> &mut Self {
self.config.bond_expiration = duration;
self
}
/// Whether to discover random nodes in the DHT.
pub fn enable_dht_random_walk(&mut self, enable_dht_random_walk: bool) -> &mut Self {
self.config.enable_dht_random_walk = enable_dht_random_walk;

View File

@ -77,12 +77,15 @@ pub use config::{Discv4Config, Discv4ConfigBuilder};
mod node;
use node::{kad_key, NodeKey};
mod table;
// reexport NodeRecord primitive
pub use reth_primitives::NodeRecord;
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;
use crate::table::PongTable;
use reth_net_nat::ResolveNatInterval;
/// reexport to get public ip.
pub use reth_net_nat::{external_ip, NatResolver};
@ -117,6 +120,9 @@ const SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS: usize = (MAX_PACKET_SIZE - 109) / 91;
/// Mirrors geth's `bondExpiration` of 24h
const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
/// Duration used to expire nodes from the routing table 1hr
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
@ -405,6 +411,10 @@ pub struct Discv4Service {
config: Discv4Config,
/// Buffered events populated during poll.
queued_events: VecDeque<Discv4Event>,
/// Keeps track of nodes from which we have received a `Pong` message.
received_pongs: PongTable,
/// Interval used to expire additionally tracked nodes
expire_interval: Interval,
}
impl Discv4Service {
@ -498,6 +508,8 @@ impl Discv4Service {
resolve_external_ip_interval: config.resolve_external_ip_interval(),
config,
queued_events: Default::default(),
received_pongs: Default::default(),
expire_interval: tokio::time::interval(EXPIRE_DURATION),
}
}
@ -582,7 +594,7 @@ impl Discv4Service {
) {
InsertResult::Failed(_) => {}
_ => {
self.try_ping(record, PingReason::Initial);
self.try_ping(record, PingReason::InitialInsert);
}
}
}
@ -735,6 +747,16 @@ impl Discv4Service {
self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
}
/// Check if the peer has a bond
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
if timestamp.elapsed() < self.config.bond_expiration {
return true
}
}
false
}
/// Update the entry on RE-ping
///
/// On re-ping we check for a changed enr_seq if eip868 is enabled and when it changed we sent a
@ -860,7 +882,7 @@ impl Discv4Service {
}
_ => return false,
}
self.try_ping(record, PingReason::Initial);
self.try_ping(record, PingReason::InitialInsert);
true
}
@ -903,6 +925,7 @@ impl Discv4Service {
// Note: we only mark if the node is absent because the `last 12h` condition is handled by
// the ping interval
let mut is_new_insert = false;
let mut needs_bond = false;
let old_enr = match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => entry.value_mut().update_with_enr(ping.enr_sq),
@ -928,13 +951,14 @@ impl Discv4Service {
// full, we can't add any additional peers to that bucket, but we still want
// to emit an event that we discovered the node
debug!(target : "discv4", ?record, "discovered new record but bucket is full");
self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record))
self.notify(DiscoveryUpdate::DiscoveredAtCapacity(record));
needs_bond = true;
}
BucketInsertResult::FailedFilter |
BucketInsertResult::TooManyIncoming |
BucketInsertResult::NodeExists => {
BucketInsertResult::TooManyIncoming | BucketInsertResult::NodeExists => {
needs_bond = true;
// insert unsuccessful but we still want to send the pong
}
BucketInsertResult::FailedFilter => return,
}
None
@ -955,7 +979,9 @@ impl Discv4Service {
// if node was absent also send a ping to establish the endpoint proof from our end
if is_new_insert {
self.try_ping(record, PingReason::Initial);
self.try_ping(record, PingReason::InitialInsert);
} else if needs_bond {
self.try_ping(record, PingReason::EstablishBond);
} else {
// Request ENR if included in the ping
match (ping.enr_sq, old_enr) {
@ -1053,10 +1079,16 @@ impl Discv4Service {
Entry::Vacant(_) => return,
};
// keep track of the pong
self.received_pongs.on_pong(remote_id, remote_addr.ip());
match reason {
PingReason::Initial => {
PingReason::InitialInsert => {
self.update_on_pong(node, pong.enr_sq);
}
PingReason::EstablishBond => {
// nothing to do here
}
PingReason::RePing => {
self.update_on_reping(node, pong.enr_sq);
}
@ -1079,25 +1111,13 @@ impl Discv4Service {
// ping's expiration timestamp is in the past
return
}
if node_id == *self.local_peer_id() {
// ignore find node requests to ourselves
return
}
let key = kad_key(node_id);
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(_, status) => {
if status.is_connected() {
self.respond_closest(msg.id, remote_addr)
}
}
kbucket::Entry::Pending(_, status) => {
if status.is_connected() {
self.respond_closest(msg.id, remote_addr)
}
}
kbucket::Entry::Absent(_) => {
// no existing endpoint proof
// > To guard against traffic amplification attacks, Neighbors replies should only be sent if the sender of FindNode has been verified by the endpoint proof procedure.
}
kbucket::Entry::SelfEntry => {}
if self.has_bond(node_id, remote_addr.ip()) {
self.respond_closest(msg.id, remote_addr)
}
}
@ -1144,8 +1164,7 @@ impl Discv4Service {
return
}
let key = kad_key(id);
if let BucketEntry::Present(_, _) = self.kbuckets.entry(&key) {
if self.has_bond(id, remote_addr.ip()) {
self.send_packet(
Message::EnrResponse(EnrResponse {
request_hash,
@ -1543,10 +1562,15 @@ impl Discv4Service {
self.ping_buffered();
// evict expired nodes
if self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
self.evict_expired_requests(Instant::now())
}
// evict expired nodes
while self.expire_interval.poll_tick(cx).is_ready() {
self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION)
}
if self.queued_events.is_empty() {
return Poll::Pending
}
@ -1956,8 +1980,11 @@ impl NodeEntry {
/// Represents why a ping is issued
enum PingReason {
/// Initial ping to a previously unknown peer.
Initial,
/// Initial ping to a previously unknown peer that was inserted into the table.
InitialInsert,
/// Initial ping to a previously unknown peer that didn't fit into the table. But we still want
/// to establish a bond.
EstablishBond,
/// Re-ping a peer..
RePing,
/// Part of a lookup to ensure endpoint is proven.
@ -2315,7 +2342,7 @@ mod tests {
sent_at: Instant::now(),
node: service_2.local_node_record,
echo_hash,
reason: PingReason::Initial,
reason: PingReason::InitialInsert,
};
service_1.pending_pings.insert(*service_2.local_peer_id(), ping_request);

View File

@ -0,0 +1,35 @@
//! Additional support for tracking nodes.
use reth_primitives::PeerId;
use std::{collections::HashMap, net::IpAddr, time::Instant};
/// Keeps track of nodes from which we have received a `Pong` message.
#[derive(Debug, Clone, Default)]
pub(crate) struct PongTable {
/// The nodes we have received a `Pong` from.
nodes: HashMap<NodeKey, Instant>,
}
impl PongTable {
/// Updates the timestamp we received a `Pong` from the given node.
pub(crate) fn on_pong(&mut self, remote_id: PeerId, remote_ip: IpAddr) {
let key = NodeKey { remote_id, remote_ip };
self.nodes.insert(key, Instant::now());
}
/// Returns the timestamp we received a `Pong` from the given node.
pub(crate) fn last_pong(&self, remote_id: PeerId, remote_ip: IpAddr) -> Option<Instant> {
self.nodes.get(&NodeKey { remote_id, remote_ip }).copied()
}
/// Removes all nodes from the table that have not sent a `Pong` for at least `timeout`.
pub(crate) fn evict_expired(&mut self, now: Instant, timeout: std::time::Duration) {
self.nodes.retain(|_, last_pong| now - *last_pong < timeout);
}
}
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub(crate) struct NodeKey {
pub(crate) remote_id: PeerId,
pub(crate) remote_ip: IpAddr,
}