chore: discv4 touchups (#6639)

Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
This commit is contained in:
Matthias Seitz
2024-02-18 02:13:14 +01:00
committed by GitHub
parent cde7a1db79
commit 2b92948501
2 changed files with 81 additions and 21 deletions

View File

@ -34,7 +34,7 @@ pub struct Discv4Config {
pub ping_interval: Duration,
/// The duration of we consider a ping timed out.
pub ping_expiration: Duration,
/// The rate at which lookups should be triggered.
/// The rate at which new random lookups should be triggered.
pub lookup_interval: Duration,
/// The duration of we consider a FindNode request timed out.
pub request_timeout: Duration,

View File

@ -47,7 +47,7 @@ use secp256k1::SecretKey;
use std::{
cell::RefCell,
collections::{btree_map, hash_map::Entry, BTreeMap, HashMap, VecDeque},
io,
fmt, io,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4},
pin::Pin,
rc::Rc,
@ -130,6 +130,12 @@ const ENDPOINT_PROOF_EXPIRATION: Duration = Duration::from_secs(24 * 60 * 60);
/// Duration used to expire nodes from the routing table 1hr
const EXPIRE_DURATION: Duration = Duration::from_secs(60 * 60);
// Restricts how many udp messages can be processed in a single [Discv4Service::poll] call.
//
// This will act as a manual yield point when draining the socket messages where the most CPU
// expensive part is handling outgoing messages: encoding and hashing the packet
const UDP_MESSAGE_POLL_LOOP_BUDGET: i32 = 4;
type EgressSender = mpsc::Sender<(Bytes, SocketAddr)>;
type EgressReceiver = mpsc::Receiver<(Bytes, SocketAddr)>;
@ -139,6 +145,10 @@ pub(crate) type IngressReceiver = mpsc::Receiver<IngressEvent>;
type NodeRecordSender = OneshotSender<Vec<NodeRecord>>;
/// The Discv4 frontend
///
/// This communicates with the [Discv4Service] by sending commands over a channel.
///
/// See also [Discv4::spawn]
#[derive(Debug, Clone)]
pub struct Discv4 {
/// The address of the udp socket
@ -397,8 +407,10 @@ impl Discv4 {
}
/// Manages discv4 peer discovery over UDP.
///
/// This is a [Stream] to handles incoming and outgoing discv4 messages and emits updates via:
/// [Discv4Service::update_stream].
#[must_use = "Stream does nothing unless polled"]
#[allow(missing_debug_implementations)]
pub struct Discv4Service {
/// Local address of the UDP socket.
local_address: SocketAddr,
@ -419,8 +431,12 @@ pub struct Discv4Service {
/// The routing table.
kbuckets: KBucketsTable<NodeKey, NodeEntry>,
/// Receiver for incoming messages
///
/// Receives incoming messages from the UDP task.
ingress: IngressReceiver,
/// Sender for sending outgoing messages
///
/// Sends outgoind messages to the UDP task.
egress: EgressSender,
/// Buffered pending pings to apply backpressure.
///
@ -446,7 +462,7 @@ pub struct Discv4Service {
commands_rx: mpsc::UnboundedReceiver<Discv4Command>,
/// All subscribers for table updates
update_listeners: Vec<mpsc::Sender<DiscoveryUpdate>>,
/// The interval when to trigger lookups
/// The interval when to trigger random lookups
lookup_interval: Interval,
/// Used to rotate targets to lookup
lookup_rotator: LookupTargetRotator,
@ -577,7 +593,7 @@ impl Discv4Service {
}
}
/// Returns the current enr sequence
/// Returns the current enr sequence of the local record.
fn enr_seq(&self) -> Option<u64> {
(self.config.enable_eip868).then(|| self.local_eip_868_enr.seq())
}
@ -601,19 +617,19 @@ impl Discv4Service {
}
/// Returns the [PeerId] that identifies this node
pub fn local_peer_id(&self) -> &PeerId {
pub const fn local_peer_id(&self) -> &PeerId {
&self.local_node_record.id
}
/// Returns the address of the UDP socket
pub fn local_addr(&self) -> SocketAddr {
pub const fn local_addr(&self) -> SocketAddr {
self.local_address
}
/// Returns the ENR of this service.
///
/// Note: this will include the external address if resolved.
pub fn local_enr(&self) -> NodeRecord {
pub const fn local_enr(&self) -> NodeRecord {
self.local_node_record
}
@ -680,7 +696,7 @@ impl Discv4Service {
})
}
/// Creates a new channel for [`DiscoveryUpdate`]s
/// Creates a new bounded channel for [`DiscoveryUpdate`]s.
pub fn update_stream(&mut self) -> ReceiverStream<DiscoveryUpdate> {
let (tx, rx) = mpsc::channel(512);
self.update_listeners.push(tx);
@ -762,7 +778,9 @@ impl Discv4Service {
self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
}
/// Notifies all listeners
/// Notifies all listeners.
///
/// Removes all listeners that are closed.
fn notify(&mut self, update: DiscoveryUpdate) {
self.update_listeners.retain_mut(|listener| match listener.try_send(update.clone()) {
Ok(()) => true,
@ -855,6 +873,7 @@ impl Discv4Service {
}
}
(Some(_), None) => {
// got an ENR
self.send_enr_request(record);
}
_ => {}
@ -949,6 +968,8 @@ impl Discv4Service {
}
_ => return false,
}
// send the initial ping to the _new_ node
self.try_ping(record, PingReason::InitialInsert);
true
}
@ -1540,10 +1561,11 @@ impl Discv4Service {
}
// trigger self lookup
if self.config.enable_lookup && self.lookup_interval.poll_tick(cx).is_ready() {
let _ = self.lookup_interval.poll_tick(cx);
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
if self.config.enable_lookup {
while self.lookup_interval.poll_tick(cx).is_ready() {
let target = self.lookup_rotator.next(&self.local_node_record.id);
self.lookup_with(target, None);
}
}
// re-ping some peers
@ -1558,7 +1580,7 @@ impl Discv4Service {
self.set_external_ip_addr(ip);
}
// process all incoming commands, this channel can never close
// drain all incoming `Discv4` commands, this channel can never close
while let Poll::Ready(Some(cmd)) = self.commands_rx.poll_recv(cx) {
match cmd {
Discv4Command::Add(enr) => {
@ -1608,6 +1630,9 @@ impl Discv4Service {
}
}
// restricts how many messages we process in a single poll before yielding back control
let mut udp_message_budget = UDP_MESSAGE_POLL_LOOP_BUDGET;
// process all incoming datagrams
while let Poll::Ready(Some(event)) = self.ingress.poll_recv(cx) {
match event {
@ -1649,6 +1674,17 @@ impl Discv4Service {
self.queued_events.push_back(event);
}
}
udp_message_budget -= 1;
if udp_message_budget < 0 {
trace!(target: "discv4", budget=UDP_MESSAGE_POLL_LOOP_BUDGET, "exhausted message poll budget");
if self.queued_events.is_empty() {
// we've exceeded the message budget and have no events to process
// this will make sure we're woken up again
cx.waker().wake_by_ref();
}
break
}
}
// try resending buffered pings
@ -1656,12 +1692,12 @@ impl Discv4Service {
// evict expired nodes
while self.evict_expired_requests_interval.poll_tick(cx).is_ready() {
self.evict_expired_requests(Instant::now())
self.evict_expired_requests(Instant::now());
}
// evict expired nodes
while self.expire_interval.poll_tick(cx).is_ready() {
self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION)
self.received_pongs.evict_expired(Instant::now(), EXPIRE_DURATION);
}
if self.queued_events.is_empty() {
@ -1686,6 +1722,20 @@ impl Stream for Discv4Service {
}
}
impl fmt::Debug for Discv4Service {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Discv4Service")
.field("local_address", &self.local_address)
.field("local_peer_id", &self.local_peer_id())
.field("local_node_record", &self.local_node_record)
.field("queued_pings", &self.queued_pings)
.field("pending_lookup", &self.pending_lookup)
.field("pending_find_nodes", &self.pending_find_nodes)
.field("lookup_interval", &self.lookup_interval)
.finish_non_exhaustive()
}
}
/// The Event type the Service stream produces.
///
/// This is mainly used for testing purposes and represents messages the service processed
@ -1763,7 +1813,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
}
}
/// The commands sent from the frontend to the service
/// The commands sent from the frontend [Discv4] to the service [Discv4Service].
enum Discv4Command {
Add(NodeRecord),
SetTcpPort(u16),
@ -1779,6 +1829,7 @@ enum Discv4Command {
}
/// Event type receiver produces
#[derive(Debug)]
pub(crate) enum IngressEvent {
/// Encountered an error when reading a datagram message.
RecvError(io::Error),
@ -1789,6 +1840,7 @@ pub(crate) enum IngressEvent {
}
/// Tracks a sent ping
#[derive(Debug)]
struct PingRequest {
// Timestamp when the request was sent.
sent_at: Instant,
@ -1953,6 +2005,9 @@ struct LookupContextInner {
/// The closest nodes
closest_nodes: RefCell<BTreeMap<Distance, QueryNode>>,
/// A listener for all the nodes retrieved in this lookup
///
/// This is present if the lookup was triggered manually via [Discv4] and we want to return all
/// the nodes once the lookup finishes.
listener: Option<NodeRecordSender>,
}
@ -1981,6 +2036,7 @@ struct QueryNode {
responded: bool,
}
#[derive(Debug)]
struct FindNodeRequest {
// Timestamp when the request was sent.
sent_at: Instant,
@ -2000,6 +2056,7 @@ impl FindNodeRequest {
}
}
#[derive(Debug)]
struct EnrRequestState {
// Timestamp when the request was sent.
sent_at: Instant,
@ -2081,6 +2138,7 @@ impl NodeEntry {
}
/// Represents why a ping is issued
#[derive(Debug)]
enum PingReason {
/// Initial ping to a previously unknown peer that was inserted into the table.
InitialInsert,
@ -2089,7 +2147,7 @@ enum PingReason {
EstablishBond,
/// Re-ping a peer.
RePing,
/// Part of a lookup to ensure endpoint is proven.
/// Part of a lookup to ensure endpoint is proven before we can send a FindNode request.
Lookup(NodeRecord, LookupContext),
}
@ -2212,9 +2270,10 @@ mod tests {
}
}
#[tokio::test]
// Bootstraps with mainnet boot nodes
#[tokio::test(flavor = "multi_thread")]
#[ignore]
async fn test_lookup() {
async fn test_mainnet_lookup() {
reth_tracing::init_test_tracing();
let fork_id = ForkId { hash: ForkHash(hex!("743f3d89")), next: 16191202 };
@ -2396,6 +2455,7 @@ mod tests {
// done
assert_eq!(service.pending_find_nodes.len(), 2);
}
#[tokio::test]
async fn test_no_local_in_closest() {
reth_tracing::init_test_tracing();