From 02be8121e023c1a68bb24a7a961d852c571edf81 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 13 Dec 2022 18:53:53 +0100 Subject: [PATCH] fix(disc): also emit discovered node when buckets are full (#414) --- crates/net/discv4/src/lib.rs | 46 ++++++++++++++++------------- crates/net/network/src/discovery.rs | 12 ++++---- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs index b801a8780..209e3c602 100644 --- a/crates/net/discv4/src/lib.rs +++ b/crates/net/discv4/src/lib.rs @@ -16,7 +16,7 @@ //! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the //! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact //! with the service via a channel. Whenever the underlying table changes service produces a -//! [`TableUpdate`] that listeners will receive. +//! [`DiscoveryUpdate`] that listeners will receive. use crate::{ error::{DecodePacketError, Discv4Error}, node::{kad_key, NodeKey}, @@ -25,7 +25,7 @@ use crate::{ use bytes::Bytes; use discv5::{ kbucket::{ - Distance, Entry as BucketEntry, InsertResult, KBucketsTable, NodeStatus, + Distance, Entry as BucketEntry, FailureReason, InsertResult, KBucketsTable, NodeStatus, MAX_NODES_PER_BUCKET, }, ConnectionDirection, ConnectionState, @@ -227,8 +227,8 @@ impl Discv4 { let _ = self.to_service.try_send(cmd); } - /// Returns the receiver half of new listener channel that streams [`TableUpdate`]s. - pub async fn update_stream(&self) -> Result, Discv4Error> { + /// Returns the receiver half of new listener channel that streams [`DiscoveryUpdate`]s. + pub async fn update_stream(&self) -> Result, Discv4Error> { let (tx, rx) = oneshot::channel(); let cmd = Discv4Command::Updates(tx); self.to_service.send(cmd).await?; @@ -273,7 +273,7 @@ pub struct Discv4Service { /// Commands listener commands_rx: Option>, /// All subscribers for table updates - update_listeners: Vec>, + update_listeners: Vec>, /// The interval when to trigger lookups lookup_interval: Interval, /// Used to rotate targets to lookup @@ -383,10 +383,10 @@ impl Discv4Service { /// in the DHT and populates its routing table with the closest proven neighbours. /// /// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a - /// [`TableUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the + /// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the /// update stream, which is usually desirable, since bootnodes should not be connected to. /// - /// If adding the configured boodnodes should result in a [`TableUpdate::Added`], see + /// If adding the configured boodnodes should result in a [`DiscoveryUpdate::Added`], see /// [`Self::add_all_nodes`]. /// /// **Note:** This is a noop if there are no bootnodes. @@ -422,8 +422,8 @@ impl Discv4Service { }) } - /// Creates a new channel for [`TableUpdate`]s - pub fn update_stream(&mut self) -> ReceiverStream { + /// Creates a new channel for [`DiscoveryUpdate`]s + pub fn update_stream(&mut self) -> ReceiverStream { let (tx, rx) = mpsc::channel(512); self.update_listeners.push(tx); ReceiverStream::new(rx) @@ -495,7 +495,7 @@ impl Discv4Service { } /// Notifies all listeners - fn notify(&mut self, update: TableUpdate) { + fn notify(&mut self, update: DiscoveryUpdate) { self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) { Ok(()) => true, Err(err) => match err { @@ -513,7 +513,7 @@ impl Discv4Service { let key = kad_key(node_id); let removed = self.kbuckets.remove(&key); if removed { - self.notify(TableUpdate::Removed(node_id)); + self.notify(DiscoveryUpdate::Removed(node_id)); } removed } @@ -535,11 +535,15 @@ impl Discv4Service { ) { InsertResult::Inserted => { debug!(target : "net::disc",?record, "inserted new record to table"); - self.notify(TableUpdate::Added(record)); + self.notify(DiscoveryUpdate::Added(record)); } InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => { trace!(target : "net::disc",?record, "updated record"); } + InsertResult::Failed(FailureReason::BucketFull) => { + debug!(target : "net::disc", ?record, "discovered new record but bucket is full"); + self.notify(DiscoveryUpdate::Discovered(record)); + } res => { warn!(target : "net::disc",?record, ?res, "failed to insert"); } @@ -1029,7 +1033,7 @@ pub(crate) async fn receive_loop(udp: Arc, tx: IngressSender, local_i /// The commands sent from the frontend to the service enum Discv4Command { Lookup { node_id: Option, tx: Option }, - Updates(OneshotSender>), + Updates(OneshotSender>), } /// Event type receiver produces @@ -1272,15 +1276,17 @@ enum PingReason { Lookup(NodeRecord, LookupContext), } -/// Represents state changes in the underlying node table +/// Represents node related updates state changes in the underlying node table #[derive(Debug, Clone)] -pub enum TableUpdate { - /// A new node was inserted to the table. +pub enum DiscoveryUpdate { + /// A new node was discovered _and_ added to the table. Added(NodeRecord), + /// A new node was discovered but _not_ added to the table because the bucket is full. + Discovered(NodeRecord), /// Node that was removed from the table Removed(PeerId), /// A series of updates - Batch(Vec), + Batch(Vec), } #[cfg(test)] @@ -1340,13 +1346,13 @@ mod tests { let mut table = HashMap::new(); while let Some(update) = updates.next().await { match update { - TableUpdate::Added(record) => { + DiscoveryUpdate::Added(record) => { table.insert(record.id, record); } - TableUpdate::Removed(id) => { + DiscoveryUpdate::Removed(id) => { table.remove(&id); } - TableUpdate::Batch(_) => {} + _ => {} } println!("total peers {}", table.len()); } diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 7affd9eb7..ee50f2e79 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -2,7 +2,7 @@ use crate::error::NetworkError; use futures::StreamExt; -use reth_discv4::{Discv4, Discv4Config, NodeRecord, TableUpdate}; +use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, NodeRecord}; use reth_primitives::PeerId; use secp256k1::SecretKey; use std::{ @@ -26,7 +26,7 @@ pub struct Discovery { /// Handler to interact with the Discovery v4 service _discv4: Discv4, /// All KAD table updates from the discv4 service. - discv4_updates: ReceiverStream, + discv4_updates: ReceiverStream, /// The initial config for the discv4 service _dsicv4_config: Discv4Config, /// Events buffered until polled. @@ -71,9 +71,9 @@ impl Discovery { self.local_enr.id } - fn on_discv4_update(&mut self, update: TableUpdate) { + fn on_discv4_update(&mut self, update: DiscoveryUpdate) { match update { - TableUpdate::Added(node) => { + DiscoveryUpdate::Added(node) | DiscoveryUpdate::Discovered(node) => { let id = node.id; let addr = node.tcp_addr(); match self.discovered_nodes.entry(id) { @@ -84,10 +84,10 @@ impl Discovery { } } } - TableUpdate::Removed(node) => { + DiscoveryUpdate::Removed(node) => { self.discovered_nodes.remove(&node); } - TableUpdate::Batch(updates) => { + DiscoveryUpdate::Batch(updates) => { for update in updates { self.on_discv4_update(update); }