mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat:new discovered node record event stream (#3707)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -1,6 +1,9 @@
|
|||||||
//! Discovery support for the network.
|
//! Discovery support for the network.
|
||||||
|
|
||||||
use crate::error::{NetworkError, ServiceKind};
|
use crate::{
|
||||||
|
error::{NetworkError, ServiceKind},
|
||||||
|
manager::DiscoveredEvent,
|
||||||
|
};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry};
|
use reth_discv4::{DiscoveryUpdate, Discv4, Discv4Config, EnrForkIdEntry};
|
||||||
use reth_dns_discovery::{
|
use reth_dns_discovery::{
|
||||||
@ -14,12 +17,14 @@ use std::{
|
|||||||
sync::Arc,
|
sync::Arc,
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
};
|
};
|
||||||
use tokio::task::JoinHandle;
|
use tokio::{sync::mpsc, task::JoinHandle};
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
|
||||||
/// An abstraction over the configured discovery protocol.
|
/// An abstraction over the configured discovery protocol.
|
||||||
///
|
///
|
||||||
/// Listens for new discovered nodes and emits events for discovered nodes and their address.
|
/// Listens for new discovered nodes and emits events for discovered nodes and their
|
||||||
|
/// address.#[derive(Debug, Clone)]
|
||||||
|
|
||||||
pub struct Discovery {
|
pub struct Discovery {
|
||||||
/// All nodes discovered via discovery protocol.
|
/// All nodes discovered via discovery protocol.
|
||||||
///
|
///
|
||||||
@ -41,6 +46,8 @@ pub struct Discovery {
|
|||||||
_dns_disc_service: Option<JoinHandle<()>>,
|
_dns_disc_service: Option<JoinHandle<()>>,
|
||||||
/// Events buffered until polled.
|
/// Events buffered until polled.
|
||||||
queued_events: VecDeque<DiscoveryEvent>,
|
queued_events: VecDeque<DiscoveryEvent>,
|
||||||
|
/// List of listeners subscribed to discovery events.
|
||||||
|
discovery_listeners: Vec<mpsc::UnboundedSender<DiscoveryEvent>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Discovery {
|
impl Discovery {
|
||||||
@ -84,6 +91,7 @@ impl Discovery {
|
|||||||
};
|
};
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
discovery_listeners: Default::default(),
|
||||||
local_enr,
|
local_enr,
|
||||||
discv4,
|
discv4,
|
||||||
discv4_updates,
|
discv4_updates,
|
||||||
@ -96,6 +104,17 @@ impl Discovery {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Registers a listener for receiving [DiscoveryEvent] updates.
|
||||||
|
pub(crate) fn add_listener(&mut self, tx: mpsc::UnboundedSender<DiscoveryEvent>) {
|
||||||
|
self.discovery_listeners.push(tx);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Notifies all registered listeners with the provided `event`.
|
||||||
|
#[inline]
|
||||||
|
fn notify_listeners(&mut self, event: &DiscoveryEvent) {
|
||||||
|
self.discovery_listeners.retain_mut(|listener| listener.send(event.clone()).is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
/// Updates the `eth:ForkId` field in discv4.
|
/// Updates the `eth:ForkId` field in discv4.
|
||||||
#[allow(unused)]
|
#[allow(unused)]
|
||||||
pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
|
pub(crate) fn update_fork_id(&self, fork_id: ForkId) {
|
||||||
@ -139,11 +158,9 @@ impl Discovery {
|
|||||||
Entry::Occupied(_entry) => {}
|
Entry::Occupied(_entry) => {}
|
||||||
Entry::Vacant(entry) => {
|
Entry::Vacant(entry) => {
|
||||||
entry.insert(addr);
|
entry.insert(addr);
|
||||||
self.queued_events.push_back(DiscoveryEvent::Discovered {
|
self.queued_events.push_back(DiscoveryEvent::NewNode(
|
||||||
peer_id: id,
|
DiscoveredEvent::EventQueued { peer_id: id, socket_addr: addr, fork_id },
|
||||||
socket_addr: addr,
|
));
|
||||||
fork_id,
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -174,6 +191,7 @@ impl Discovery {
|
|||||||
loop {
|
loop {
|
||||||
// Drain all buffered events first
|
// Drain all buffered events first
|
||||||
if let Some(event) = self.queued_events.pop_front() {
|
if let Some(event) = self.queued_events.pop_front() {
|
||||||
|
self.notify_listeners(&event);
|
||||||
return Poll::Ready(event)
|
return Poll::Ready(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -204,6 +222,9 @@ impl Discovery {
|
|||||||
///
|
///
|
||||||
/// NOTE: This instance does nothing
|
/// NOTE: This instance does nothing
|
||||||
pub(crate) fn noop() -> Self {
|
pub(crate) fn noop() -> Self {
|
||||||
|
let (_discovery_listeners, _): (mpsc::UnboundedSender<DiscoveryEvent>, _) =
|
||||||
|
mpsc::unbounded_channel();
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
discovered_nodes: Default::default(),
|
discovered_nodes: Default::default(),
|
||||||
local_enr: NodeRecord {
|
local_enr: NodeRecord {
|
||||||
@ -219,14 +240,16 @@ impl Discovery {
|
|||||||
_dns_discovery: None,
|
_dns_discovery: None,
|
||||||
dns_discovery_updates: None,
|
dns_discovery_updates: None,
|
||||||
_dns_disc_service: None,
|
_dns_disc_service: None,
|
||||||
|
discovery_listeners: Default::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Events produced by the [`Discovery`] manager.
|
/// Events produced by the [`Discovery`] manager.
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
pub enum DiscoveryEvent {
|
pub enum DiscoveryEvent {
|
||||||
/// A new node was discovered
|
/// Discovered a node
|
||||||
Discovered { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
|
NewNode(DiscoveredEvent),
|
||||||
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
|
/// Retrieved a [`ForkId`] from the peer via ENR request, See <https://eips.ethereum.org/EIPS/eip-868>
|
||||||
EnrForkId(PeerId, ForkId),
|
EnrForkId(PeerId, ForkId),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -41,7 +41,7 @@ use reth_eth_wire::{
|
|||||||
use reth_metrics::common::mpsc::UnboundedMeteredSender;
|
use reth_metrics::common::mpsc::UnboundedMeteredSender;
|
||||||
use reth_net_common::bandwidth_meter::BandwidthMeter;
|
use reth_net_common::bandwidth_meter::BandwidthMeter;
|
||||||
use reth_network_api::ReputationChangeKind;
|
use reth_network_api::ReputationChangeKind;
|
||||||
use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256};
|
use reth_primitives::{listener::EventListeners, ForkId, NodeRecord, PeerId, H256};
|
||||||
use reth_provider::BlockReader;
|
use reth_provider::BlockReader;
|
||||||
use reth_rpc_types::{EthProtocolInfo, NetworkStatus};
|
use reth_rpc_types::{EthProtocolInfo, NetworkStatus};
|
||||||
use std::{
|
use std::{
|
||||||
@ -515,6 +515,9 @@ where
|
|||||||
NetworkHandleMessage::EventListener(tx) => {
|
NetworkHandleMessage::EventListener(tx) => {
|
||||||
self.event_listeners.push_listener(tx);
|
self.event_listeners.push_listener(tx);
|
||||||
}
|
}
|
||||||
|
NetworkHandleMessage::DiscoveryListener(tx) => {
|
||||||
|
self.swarm.state_mut().discovery_mut().add_listener(tx);
|
||||||
|
}
|
||||||
NetworkHandleMessage::AnnounceBlock(block, hash) => {
|
NetworkHandleMessage::AnnounceBlock(block, hash) => {
|
||||||
if self.handle.mode().is_stake() {
|
if self.handle.mode().is_stake() {
|
||||||
// See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
|
// See [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
|
||||||
@ -919,3 +922,8 @@ pub enum NetworkEvent {
|
|||||||
/// Event emitted when a new peer is removed
|
/// Event emitted when a new peer is removed
|
||||||
PeerRemoved(PeerId),
|
PeerRemoved(PeerId),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum DiscoveredEvent {
|
||||||
|
EventQueued { peer_id: PeerId, socket_addr: SocketAddr, fork_id: Option<ForkId> },
|
||||||
|
}
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
config::NetworkMode, manager::NetworkEvent, message::PeerRequest, peers::PeersHandle,
|
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
|
||||||
session::PeerInfo, FetchClient,
|
peers::PeersHandle, session::PeerInfo, FetchClient,
|
||||||
};
|
};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use parking_lot::Mutex;
|
use parking_lot::Mutex;
|
||||||
@ -83,6 +83,15 @@ impl NetworkHandle {
|
|||||||
UnboundedReceiverStream::new(rx)
|
UnboundedReceiverStream::new(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a new [`DiscoveryEvent`] stream.
|
||||||
|
///
|
||||||
|
/// This stream yields [`DiscoveryEvent`]s for each peer that is discovered.
|
||||||
|
pub fn discovery_listener(&self) -> UnboundedReceiverStream<DiscoveryEvent> {
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
let _ = self.manager().send(NetworkHandleMessage::DiscoveryListener(tx));
|
||||||
|
UnboundedReceiverStream::new(rx)
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns a new [`FetchClient`] that can be cloned and shared.
|
/// Returns a new [`FetchClient`] that can be cloned and shared.
|
||||||
///
|
///
|
||||||
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
|
/// The [`FetchClient`] is the entrypoint for sending requests to the network.
|
||||||
@ -320,4 +329,6 @@ pub(crate) enum NetworkHandleMessage {
|
|||||||
GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
|
GetReputationById(PeerId, oneshot::Sender<Option<Reputation>>),
|
||||||
/// Gracefully shutdown network
|
/// Gracefully shutdown network
|
||||||
Shutdown(oneshot::Sender<()>),
|
Shutdown(oneshot::Sender<()>),
|
||||||
|
/// Add a new listener for `DiscoveryEvent`.
|
||||||
|
DiscoveryListener(UnboundedSender<DiscoveryEvent>),
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ use crate::{
|
|||||||
cache::LruCache,
|
cache::LruCache,
|
||||||
discovery::{Discovery, DiscoveryEvent},
|
discovery::{Discovery, DiscoveryEvent},
|
||||||
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
|
fetch::{BlockResponseOutcome, FetchAction, StateFetcher},
|
||||||
|
manager::DiscoveredEvent,
|
||||||
message::{
|
message::{
|
||||||
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
|
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
|
||||||
PeerResponseResult,
|
PeerResponseResult,
|
||||||
@ -11,6 +12,7 @@ use crate::{
|
|||||||
peers::{PeerAction, PeersManager},
|
peers::{PeerAction, PeersManager},
|
||||||
FetchClient,
|
FetchClient,
|
||||||
};
|
};
|
||||||
|
|
||||||
use reth_eth_wire::{
|
use reth_eth_wire::{
|
||||||
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
|
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
|
||||||
};
|
};
|
||||||
@ -95,6 +97,11 @@ where
|
|||||||
&mut self.peers_manager
|
&mut self.peers_manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns mutable access to the [`Discovery`]
|
||||||
|
pub(crate) fn discovery_mut(&mut self) -> &mut Discovery {
|
||||||
|
&mut self.discovery
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns access to the [`PeersManager`]
|
/// Returns access to the [`PeersManager`]
|
||||||
pub(crate) fn peers(&self) -> &PeersManager {
|
pub(crate) fn peers(&self) -> &PeersManager {
|
||||||
&self.peers_manager
|
&self.peers_manager
|
||||||
@ -277,7 +284,11 @@ where
|
|||||||
/// Event hook for events received from the discovery service.
|
/// Event hook for events received from the discovery service.
|
||||||
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
|
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
|
||||||
match event {
|
match event {
|
||||||
DiscoveryEvent::Discovered { peer_id, socket_addr, fork_id } => {
|
DiscoveryEvent::NewNode(DiscoveredEvent::EventQueued {
|
||||||
|
peer_id,
|
||||||
|
socket_addr,
|
||||||
|
fork_id,
|
||||||
|
}) => {
|
||||||
self.queued_messages.push_back(StateAction::DiscoveredNode {
|
self.queued_messages.push_back(StateAction::DiscoveredNode {
|
||||||
peer_id,
|
peer_id,
|
||||||
socket_addr,
|
socket_addr,
|
||||||
|
|||||||
Reference in New Issue
Block a user