diff --git a/crates/net/discv4/src/config.rs b/crates/net/discv4/src/config.rs
index c934f3361..38467304d 100644
--- a/crates/net/discv4/src/config.rs
+++ b/crates/net/discv4/src/config.rs
@@ -23,7 +23,7 @@ pub struct Discv4Config {
     pub udp_egress_message_buffer: usize,
     /// Size of the channel buffer for incoming messages.
     pub udp_ingress_message_buffer: usize,
-    /// The number of allowed failures for `FindNode` requests. Default: 5.
+    /// The number of allowed consecutive failures for `FindNode` requests. Default: 5.
     pub max_find_node_failures: u8,
     /// The interval to use when checking for expired nodes that need to be re-pinged. Default:
     /// 10min.
diff --git a/crates/net/discv4/src/lib.rs b/crates/net/discv4/src/lib.rs
index e955e45df..779c7ee63 100644
--- a/crates/net/discv4/src/lib.rs
+++ b/crates/net/discv4/src/lib.rs
@@ -743,7 +743,8 @@ impl Discv4Service {
         trace!(target: "discv4", ?target, "Starting lookup");
         let target_key = kad_key(target);
 
-        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes
+        // Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes to which we have
+        // a valid endpoint proof
         let ctx = LookupContext::new(
             target_key.clone(),
             self.kbuckets
@@ -772,7 +773,10 @@ impl Discv4Service {
         trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
 
         for node in closest {
-            self.find_node(&node, ctx.clone());
+            // here we still want to check against previous request failures and if necessary
+            // re-establish a new endpoint proof because it can be the case that the other node lost
+            // our entry and no longer has an endpoint proof on their end
+            self.find_node_checked(&node, ctx.clone());
         }
     }
 
@@ -788,6 +792,22 @@ impl Discv4Service {
         self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
     }
 
+    /// Sends a new `FindNode` packet to the node with `target` as the lookup target but checks
+    /// whether we should send a new ping first to renew the endpoint proof by checking the
+    /// previously failed findNode requests. It could be that the node is no longer reachable or
+    /// lost our entry.
+    fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
+        let max_failures = self.config.max_find_node_failures;
+        let needs_ping = self
+            .on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
+            .unwrap_or(true);
+        if needs_ping {
+            self.try_ping(*node, PingReason::Lookup(*node, ctx))
+        } else {
+            self.find_node(node, ctx)
+        }
+    }
+
     /// Notifies all listeners.
     ///
     /// Removes all listeners that are closed.
@@ -860,7 +880,7 @@ impl Discv4Service {
         self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
     }
 
-    /// Check if the peer has a bond
+    /// Check if the peer has an active bond.
     fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
         if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
             if timestamp.elapsed() < self.config.bond_expiration {
@@ -870,6 +890,19 @@ impl Discv4Service {
         false
     }
 
+    /// Applies a closure on the pending or present [`NodeEntry`].
+    fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
+    where
+        F: FnOnce(&NodeEntry) -> R,
+    {
+        let key = kad_key(peer_id);
+        match self.kbuckets.entry(&key) {
+            BucketEntry::Present(entry, _) => Some(f(entry.value())),
+            BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
+            _ => None,
+        }
+    }
+
     /// Update the entry on RE-ping.
     ///
     /// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
@@ -929,7 +962,7 @@ impl Discv4Service {
         match self.kbuckets.entry(&key) {
             kbucket::Entry::Present(mut entry, old_status) => {
                 // endpoint is now proven
-                entry.value_mut().has_endpoint_proof = true;
+                entry.value_mut().establish_proof();
                 entry.value_mut().update_with_enr(last_enr_seq);
 
                 if !old_status.is_connected() {
@@ -945,7 +978,7 @@ impl Discv4Service {
             }
             kbucket::Entry::Pending(mut entry, mut status) => {
                 // endpoint is now proven
-                entry.value().has_endpoint_proof = true;
+                entry.value().establish_proof();
                 entry.value().update_with_enr(last_enr_seq);
 
                 if !status.is_connected() {
@@ -1129,6 +1162,8 @@ impl Discv4Service {
                 // try to send it
                 ctx.unmark_queried(record.id);
             } else {
+                // we just received a ping from that peer so we can send a find node request
+                // directly
                 self.find_node(&record, ctx);
             }
         }
@@ -1419,14 +1454,28 @@ impl Discv4Service {
                 BucketEntry::SelfEntry => {
                     // we received our own node entry
                 }
-                BucketEntry::Present(mut entry, _) => {
-                    if entry.value_mut().has_endpoint_proof {
-                        self.find_node(&closest, ctx.clone());
+                BucketEntry::Present(entry, _) => {
+                    if entry.value().has_endpoint_proof {
+                        if entry
+                            .value()
+                            .exceeds_find_node_failures(self.config.max_find_node_failures)
+                        {
+                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
+                        } else {
+                            self.find_node(&closest, ctx.clone());
+                        }
                     }
                 }
                 BucketEntry::Pending(mut entry, _) => {
                     if entry.value().has_endpoint_proof {
-                        self.find_node(&closest, ctx.clone());
+                        if entry
+                            .value()
+                            .exceeds_find_node_failures(self.config.max_find_node_failures)
+                        {
+                            self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
+                        } else {
+                            self.find_node(&closest, ctx.clone());
+                        }
                     }
                 }
             }
@@ -1486,27 +1535,27 @@ impl Discv4Service {
             self.remove_node(node_id);
         }
 
-        self.evict_failed_neighbours(now);
+        self.evict_failed_find_nodes(now);
     }
 
     /// Handles failed responses to `FindNode`
-    fn evict_failed_neighbours(&mut self, now: Instant) {
-        let mut failed_neighbours = Vec::new();
+    fn evict_failed_find_nodes(&mut self, now: Instant) {
+        let mut failed_find_nodes = Vec::new();
         self.pending_find_nodes.retain(|node_id, find_node_request| {
             if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
                 if !find_node_request.answered {
                     // node actually responded but with fewer entries than expected, but we don't
                     // treat this as a hard error since it responded.
-                    failed_neighbours.push(*node_id);
+                    failed_find_nodes.push(*node_id);
                 }
                 return false
             }
             true
         });
 
-        trace!(target: "discv4", num=%failed_neighbours.len(), "processing failed neighbours");
+        trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
 
-        for node_id in failed_neighbours {
+        for node_id in failed_find_nodes {
             let key = kad_key(node_id);
             let failures = match self.kbuckets.entry(&key) {
                 kbucket::Entry::Present(mut entry, _) => {
@@ -1523,7 +1572,7 @@ impl Discv4Service {
             // if the node failed to respond anything useful multiple times, remove the node from
             // the table, but only if there are enough other nodes in the bucket (bucket must be at
             // least half full)
-            if failures > (self.config.max_find_node_failures as usize) {
+            if failures > self.config.max_find_node_failures {
                 self.soft_remove_node(node_id);
             }
         }
@@ -2216,8 +2265,8 @@ struct NodeEntry {
     last_enr_seq: Option<u64>,
     /// `ForkId` if retrieved via ENR requests.
     fork_id: Option<ForkId>,
-    /// Counter for failed findNode requests.
-    find_node_failures: usize,
+    /// Counter for failed _consecutive_ findNode requests.
+    find_node_failures: u8,
     /// Whether the endpoint of the peer is proven.
     has_endpoint_proof: bool,
 }
@@ -2244,6 +2293,17 @@ impl NodeEntry {
         node
     }
 
+    /// Marks the entry with an established proof and resets the consecutive failure counter.
+    fn establish_proof(&mut self) {
+        self.has_endpoint_proof = true;
+        self.find_node_failures = 0;
+    }
+
+    /// Returns true if the tracked find node failures exceed the max amount
+    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
+        self.find_node_failures >= max_failures
+    }
+
     /// Updates the last timestamp and sets the enr seq
     fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
         self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
     }
@@ -2660,6 +2720,45 @@ mod tests {
         assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
     }
 
+    #[tokio::test]
+    async fn test_reping_on_find_node_failures() {
+        reth_tracing::init_test_tracing();
+
+        let config = Discv4Config::builder().build();
+        let (_discv4, mut service) = create_discv4_with_config(config).await;
+
+        let target = PeerId::random();
+
+        let id = PeerId::random();
+        let key = kad_key(id);
+        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
+
+        let mut entry = NodeEntry::new_proven(record);
+        entry.find_node_failures = u8::MAX;
+        let _ = service.kbuckets.insert_or_update(
+            &key,
+            entry,
+            NodeStatus {
+                direction: ConnectionDirection::Incoming,
+                state: ConnectionState::Connected,
+            },
+        );
+
+        service.lookup(target);
+        assert_eq!(service.pending_find_nodes.len(), 0);
+        assert_eq!(service.pending_pings.len(), 1);
+
+        service.update_on_pong(record, None);
+
+        service
+            .on_entry(record.id, |entry| {
+                // reset on pong
+                assert_eq!(entry.find_node_failures, 0);
+                assert!(entry.has_endpoint_proof);
+            })
+            .unwrap();
+    }
+
     #[tokio::test]
     async fn test_service_commands() {
         reth_tracing::init_test_tracing();
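
The behavioral core of this patch can be restated outside the service: a node's counter of consecutive failed `FindNode` requests decides whether the next lookup step may send another `FindNode` or must first re-ping the node to renew the endpoint proof, and a successful pong resets the counter via `establish_proof()`. Below is a minimal, self-contained sketch of that rule; `EntryState`, `Action`, and `next_lookup_step` are hypothetical stand-ins for `NodeEntry` and `find_node_checked`, not reth APIs, while the two small methods mirror the diff above.

// Minimal sketch of the failure-gated lookup step introduced by this patch.
// `EntryState`, `Action`, and `next_lookup_step` are illustrative stand-ins;
// only the two methods mirror the `NodeEntry` changes in the diff.
struct EntryState {
    /// Counter for failed _consecutive_ findNode requests (as in `NodeEntry`).
    find_node_failures: u8,
    /// Whether the endpoint of the peer is proven.
    has_endpoint_proof: bool,
}

impl EntryState {
    /// Mirrors `NodeEntry::establish_proof`: proof established, failure streak cleared.
    fn establish_proof(&mut self) {
        self.has_endpoint_proof = true;
        self.find_node_failures = 0;
    }

    /// Mirrors `NodeEntry::exceeds_find_node_failures`.
    const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
        self.find_node_failures >= max_failures
    }
}

#[derive(Debug, PartialEq)]
enum Action {
    /// Re-ping first to renew the endpoint proof (`try_ping` with `PingReason::Lookup`).
    Ping,
    /// Safe to send the `FindNode` packet directly.
    FindNode,
}

/// Hypothetical equivalent of `find_node_checked`: unknown entries and entries with too
/// many consecutive failures get pinged first; everything else gets a `FindNode`.
fn next_lookup_step(entry: Option<&EntryState>, max_failures: u8) -> Action {
    let needs_ping = entry.map(|e| e.exceeds_find_node_failures(max_failures)).unwrap_or(true);
    if needs_ping {
        Action::Ping
    } else {
        Action::FindNode
    }
}

fn main() {
    let max_failures = 5; // documented default for `max_find_node_failures`

    let mut entry = EntryState { find_node_failures: u8::MAX, has_endpoint_proof: true };
    // Too many consecutive failures: the lookup re-pings instead of sending FindNode.
    assert_eq!(next_lookup_step(Some(&entry), max_failures), Action::Ping);

    // A pong re-establishes the proof and clears the streak ...
    entry.establish_proof();
    // ... so the next lookup step can send FindNode directly again.
    assert_eq!(next_lookup_step(Some(&entry), max_failures), Action::FindNode);
}

Tracking a consecutive counter that is reset on every pong, rather than a lifetime total, means a single successful re-ping is enough to resume normal `FindNode` traffic toward that peer.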