fix(disc): also emit discovered node when buckets are full (#414)

This commit is contained in:
Matthias Seitz
2022-12-13 18:53:53 +01:00
committed by GitHub
parent 2ace52a8dd
commit 02be8121e0
2 changed files with 32 additions and 26 deletions

View File

@ -16,7 +16,7 @@
//! This implementation consists of a [`Discv4`] and [`Discv4Service`] pair. The service manages the
//! state and drives the UDP socket. The (optional) [`Discv4`] serves as the frontend to interact
//! with the service via a channel. Whenever the underlying table changes service produces a
//! [`TableUpdate`] that listeners will receive.
//! [`DiscoveryUpdate`] that listeners will receive.
use crate::{
error::{DecodePacketError, Discv4Error},
node::{kad_key, NodeKey},
@ -25,7 +25,7 @@ use crate::{
use bytes::Bytes;
use discv5::{
kbucket::{
Distance, Entry as BucketEntry, InsertResult, KBucketsTable, NodeStatus,
Distance, Entry as BucketEntry, FailureReason, InsertResult, KBucketsTable, NodeStatus,
MAX_NODES_PER_BUCKET,
},
ConnectionDirection, ConnectionState,
@ -227,8 +227,8 @@ impl Discv4 {
let _ = self.to_service.try_send(cmd);
}
/// Returns the receiver half of new listener channel that streams [`TableUpdate`]s.
pub async fn update_stream(&self) -> Result<ReceiverStream<TableUpdate>, Discv4Error> {
/// Returns the receiver half of new listener channel that streams [`DiscoveryUpdate`]s.
pub async fn update_stream(&self) -> Result<ReceiverStream<DiscoveryUpdate>, Discv4Error> {
let (tx, rx) = oneshot::channel();
let cmd = Discv4Command::Updates(tx);
self.to_service.send(cmd).await?;
@ -273,7 +273,7 @@ pub struct Discv4Service {
/// Commands listener
commands_rx: Option<mpsc::Receiver<Discv4Command>>,
/// All subscribers for table updates
update_listeners: Vec<mpsc::Sender<TableUpdate>>,
update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
/// The interval when to trigger lookups
lookup_interval: Interval,
/// Used to rotate targets to lookup
@ -383,10 +383,10 @@ impl Discv4Service {
/// in the DHT and populates its routing table with the closest proven neighbours.
///
/// This is similar to adding all bootnodes via [`Self::add_node`], but does not fire a
/// [`TableUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
/// [`DiscoveryUpdate::Added`] event for the given bootnodes. So boot nodes don't appear in the
/// update stream, which is usually desirable, since bootnodes should not be connected to.
///
/// If adding the configured boodnodes should result in a [`TableUpdate::Added`], see
/// If adding the configured boodnodes should result in a [`DiscoveryUpdate::Added`], see
/// [`Self::add_all_nodes`].
///
/// **Note:** This is a noop if there are no bootnodes.
@ -422,8 +422,8 @@ impl Discv4Service {
})
}
/// Creates a new channel for [`TableUpdate`]s
pub fn update_stream(&mut self) -> ReceiverStream<TableUpdate> {
/// Creates a new channel for [`DiscoveryUpdate`]s
pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
let (tx, rx) = mpsc::channel(512);
self.update_listeners.push(tx);
ReceiverStream::new(rx)
@ -495,7 +495,7 @@ impl Discv4Service {
}
/// Notifies all listeners
fn notify(&mut self, update: TableUpdate) {
fn notify(&mut self, update: DiscoveryUpdate) {
self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
Ok(()) => true,
Err(err) => match err {
@ -513,7 +513,7 @@ impl Discv4Service {
let key = kad_key(node_id);
let removed = self.kbuckets.remove(&key);
if removed {
self.notify(TableUpdate::Removed(node_id));
self.notify(DiscoveryUpdate::Removed(node_id));
}
removed
}
@ -535,11 +535,15 @@ impl Discv4Service {
) {
InsertResult::Inserted => {
debug!(target : "net::disc",?record, "inserted new record to table");
self.notify(TableUpdate::Added(record));
self.notify(DiscoveryUpdate::Added(record));
}
InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => {
trace!(target : "net::disc",?record, "updated record");
}
InsertResult::Failed(FailureReason::BucketFull) => {
debug!(target : "net::disc", ?record, "discovered new record but bucket is full");
self.notify(DiscoveryUpdate::Discovered(record));
}
res => {
warn!(target : "net::disc",?record, ?res, "failed to insert");
}
@ -1029,7 +1033,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
/// The commands sent from the frontend to the service
enum Discv4Command {
Lookup { node_id: Option<PeerId>, tx: Option<NodeRecordSender> },
Updates(OneshotSender<ReceiverStream<TableUpdate>>),
Updates(OneshotSender<ReceiverStream<DiscoveryUpdate>>),
}
/// Event type receiver produces
@ -1272,15 +1276,17 @@ enum PingReason {
Lookup(NodeRecord, LookupContext),
}
/// Represents state changes in the underlying node table
/// Represents node related updates state changes in the underlying node table
#[derive(Debug, Clone)]
pub enum TableUpdate {
/// A new node was inserted to the table.
pub enum DiscoveryUpdate {
/// A new node was discovered _and_ added to the table.
Added(NodeRecord),
/// A new node was discovered but _not_ added to the table because the bucket is full.
Discovered(NodeRecord),
/// Node that was removed from the table
Removed(PeerId),
/// A series of updates
Batch(Vec<TableUpdate>),
Batch(Vec<DiscoveryUpdate>),
}
#[cfg(test)]
@ -1340,13 +1346,13 @@ mod tests {
let mut table = HashMap::new();
while let Some(update) = updates.next().await {
match update {
TableUpdate::Added(record) => {
DiscoveryUpdate::Added(record) => {
table.insert(record.id, record);
}
TableUpdate::Removed(id) => {
DiscoveryUpdate::Removed(id) => {
table.remove(&id);
}
TableUpdate::Batch(_) => {}
_ => {}
}
println!("total peers {}", table.len());
}

View File

@ -2,7 +2,7 @@
use crate::error::NetworkError;
use futures::StreamExt;
use reth_discv4::{Discv4, Discv4Config, NodeRecord, TableUpdate};
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, NodeRecord};
use reth_primitives::PeerId;
use secp256k1::SecretKey;
use std::{
@ -26,7 +26,7 @@ pub struct Discovery {
/// Handler to interact with the Discovery v4 service
_discv4: Discv4,
/// All KAD table updates from the discv4 service.
discv4_updates: ReceiverStream<TableUpdate>,
discv4_updates: ReceiverStream<DiscoveryUpdate>,
/// The initial config for the discv4 service
_dsicv4_config: Discv4Config,
/// Events buffered until polled.
@ -71,9 +71,9 @@ impl Discovery {
self.local_enr.id
}
fn on_discv4_update(&mut self, update: TableUpdate) {
fn on_discv4_update(&mut self, update: DiscoveryUpdate) {
match update {
TableUpdate::Added(node) => {
DiscoveryUpdate::Added(node) | DiscoveryUpdate::Discovered(node) => {
let id = node.id;
let addr = node.tcp_addr();
match self.discovered_nodes.entry(id) {
@ -84,10 +84,10 @@ impl Discovery {
}
}
}
TableUpdate::Removed(node) => {
DiscoveryUpdate::Removed(node) => {
self.discovered_nodes.remove(&node);
}
TableUpdate::Batch(updates) => {
DiscoveryUpdate::Batch(updates) => {
for update in updates {
self.on_discv4_update(update);
}