From 6d1e8a2ecccba6b91ca3052d3ad0b0705830ec03 Mon Sep 17 00:00:00 2001 From: "Supernovahs.eth" <91280922+supernovahs@users.noreply.github.com> Date: Mon, 17 Jul 2023 15:12:28 +0530 Subject: [PATCH] feat:new discovered node record event stream (#3707) Co-authored-by: Matthias Seitz --- crates/net/network/src/discovery.rs | 43 ++++++++++++++++++++++------- crates/net/network/src/manager.rs | 10 ++++++- crates/net/network/src/network.rs | 15 ++++++++-- crates/net/network/src/state.rs | 13 ++++++++- 4 files changed, 67 insertions(+), 14 deletions(-) diff --git a/crates/net/network/src/discovery.rs b/crates/net/network/src/discovery.rs index 0bf64e7c0..5af257956 100644 --- a/crates/net/network/src/discovery.rs +++ b/crates/net/network/src/discovery.rs @@ -1,6 +1,9 @@ //! Discovery support for the network. -use crate::error::{NetworkError, ServiceKind}; +use crate::{ + error::{NetworkError, ServiceKind}, + manager::DiscoveredEvent, +}; use futures::StreamExt; use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry}; use reth_dns_discovery::{ @@ -14,12 +17,14 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::task::JoinHandle; +use tokio::{sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::ReceiverStream; /// An abstraction over the configured discovery protocol. /// -/// Listens for new discovered nodes and emits events for discovered nodes and their address. +/// Listens for new discovered nodes and emits events for discovered nodes and their +/// address.#[derive(Debug, Clone)] + pub struct Discovery { /// All nodes discovered via discovery protocol. /// @@ -41,6 +46,8 @@ pub struct Discovery { _dns_disc_service: Option>, /// Events buffered until polled. queued_events: VecDeque, + /// List of listeners subscribed to discovery events. + discovery_listeners: Vec>, } impl Discovery { @@ -84,6 +91,7 @@ impl Discovery { }; Ok(Self { + discovery_listeners: Default::default(), local_enr, discv4, discv4_updates, @@ -96,6 +104,17 @@ impl Discovery { }) } + /// Registers a listener for receiving [DiscoveryEvent] updates. + pub(crate) fn add_listener(&mut self, tx: mpsc::UnboundedSender) { + self.discovery_listeners.push(tx); + } + + /// Notifies all registered listeners with the provided `event`. + #[inline] + fn notify_listeners(&mut self, event: &DiscoveryEvent) { + self.discovery_listeners.retain_mut(|listener| listener.send(event.clone()).is_ok()); + } + /// Updates the `eth:ForkId` field in discv4. #[allow(unused)] pub(crate) fn update_fork_id(&self, fork_id: ForkId) { @@ -139,11 +158,9 @@ impl Discovery { Entry::Occupied(_entry) => {} Entry::Vacant(entry) => { entry.insert(addr); - self.queued_events.push_back(DiscoveryEvent::Discovered { - peer_id: id, - socket_addr: addr, - fork_id, - }); + self.queued_events.push_back(DiscoveryEvent::NewNode( + DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id }, + )); } } } @@ -174,6 +191,7 @@ impl Discovery { loop { // Drain all buffered events first if let Some(event) = self.queued_events.pop_front() { + self.notify_listeners(&event); return Poll::Ready(event) } @@ -204,6 +222,9 @@ impl Discovery { /// /// NOTE: This instance does nothing pub(crate) fn noop() -> Self { + let (_discovery_listeners, _): (mpsc::UnboundedSender, _) = + mpsc::unbounded_channel(); + Self { discovered_nodes: Default::default(), local_enr: NodeRecord { @@ -219,14 +240,16 @@ impl Discovery { _dns_discovery: None, dns_discovery_updates: None, _dns_disc_service: None, + discovery_listeners: Default::default(), } } } /// Events produced by the [`Discovery`] manager. +#[derive(Debug, Clone)] pub enum DiscoveryEvent { - /// A new node was discovered - Discovered { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, + /// Discovered a node + NewNode(DiscoveredEvent), /// Retrieved a [`ForkId`] from the peer via ENR request, See EnrForkId(PeerId, ForkId), } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 0a997d85a..78ec6d332 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -41,7 +41,7 @@ use reth_eth_wire::{ use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::ReputationChangeKind; -use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256}; +use reth_primitives::{listener::EventListeners, ForkId, NodeRecord, PeerId, H256}; use reth_provider::BlockReader; use reth_rpc_types::{EthProtocolInfo, NetworkStatus}; use std::{ @@ -515,6 +515,9 @@ where NetworkHandleMessage::EventListener(tx) => { self.event_listeners.push_listener(tx); } + NetworkHandleMessage::DiscoveryListener(tx) => { + self.swarm.state_mut().discovery_mut().add_listener(tx); + } NetworkHandleMessage::AnnounceBlock(block, hash) => { if self.handle.mode().is_stake() { // See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p) @@ -919,3 +922,8 @@ pub enum NetworkEvent { /// Event emitted when a new peer is removed PeerRemoved(PeerId), } + +#[derive(Debug, Clone)] +pub enum DiscoveredEvent { + EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option }, +} diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 44337e155..9a3c8926c 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -1,6 +1,6 @@ use crate::{ - config::NetworkMode, manager::NetworkEvent, message::PeerRequest, peers::PeersHandle, - session::PeerInfo, FetchClient, + config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest, + peers::PeersHandle, session::PeerInfo, FetchClient, }; use async_trait::async_trait; use parking_lot::Mutex; @@ -83,6 +83,15 @@ impl NetworkHandle { UnboundedReceiverStream::new(rx) } + /// Returns a new [`DiscoveryEvent`] stream. + /// + /// This stream yields [`DiscoveryEvent`]s for each peer that is discovered. + pub fn discovery_listener(&self) -> UnboundedReceiverStream { + let (tx, rx) = mpsc::unbounded_channel(); + let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx)); + UnboundedReceiverStream::new(rx) + } + /// Returns a new [`FetchClient`] that can be cloned and shared. /// /// The [`FetchClient`] is the entrypoint for sending requests to the network. @@ -320,4 +329,6 @@ pub(crate) enum NetworkHandleMessage { GetReputationById(PeerId, oneshot::Sender>), /// Gracefully shutdown network Shutdown(oneshot::Sender<()>), + /// Add a new listener for `DiscoveryEvent`. + DiscoveryListener(UnboundedSender), } diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index 104f263f4..446a67962 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -4,6 +4,7 @@ use crate::{ cache::LruCache, discovery::{Discovery, DiscoveryEvent}, fetch::{BlockResponseOutcome, FetchAction, StateFetcher}, + manager::DiscoveredEvent, message::{ BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse, PeerResponseResult, @@ -11,6 +12,7 @@ use crate::{ peers::{PeerAction, PeersManager}, FetchClient, }; + use reth_eth_wire::{ capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, }; @@ -95,6 +97,11 @@ where &mut self.peers_manager } + /// Returns mutable access to the [`Discovery`] + pub(crate) fn discovery_mut(&mut self) -> &mut Discovery { + &mut self.discovery + } + /// Returns access to the [`PeersManager`] pub(crate) fn peers(&self) -> &PeersManager { &self.peers_manager @@ -277,7 +284,11 @@ where /// Event hook for events received from the discovery service. fn on_discovery_event(&mut self, event: DiscoveryEvent) { match event { - DiscoveryEvent::Discovered { peer_id, socket_addr, fork_id } => { + DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued { + peer_id, + socket_addr, + fork_id, + }) => { self.queued_messages.push_back(StateAction::DiscoveredNode { peer_id, socket_addr,