fix: check failed find nodes requests before sending new ones (#11997)

Matthias Seitz
2024-10-23 15:37:39 +02:00
committed by GitHub
parent 252cdf7f35
commit 55d98bbc6e
2 changed files with 118 additions and 19 deletions


@ -23,7 +23,7 @@ pub struct Discv4Config {
pub udp_egress_message_buffer: usize,
/// Size of the channel buffer for incoming messages.
pub udp_ingress_message_buffer: usize,
/// The number of allowed failures for `FindNode` requests. Default: 5.
/// The number of allowed consecutive failures for `FindNode` requests. Default: 5.
pub max_find_node_failures: u8,
/// The interval to use when checking for expired nodes that need to be re-pinged. Default:
/// 10min.
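A hedged illustration of how this setting might be tuned, assuming only what the diff shows: the public `max_find_node_failures` field above and the `Discv4Config::builder()` used in the tests further down. Since the counter now tracks consecutive failures, lowering it makes a stale peer fall back to a fresh ping sooner.

// Sketch only; relies on the field being `pub` as declared above.
let mut config = Discv4Config::builder().build();
// Allow at most 3 consecutive unanswered `FindNode` requests before requiring a
// renewed endpoint proof (the documented default is 5).
config.max_find_node_failures = 3;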


@ -743,7 +743,8 @@ impl Discv4Service {
trace!(target: "discv4", ?target, "Starting lookup");
let target_key = kad_key(target);
// Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes
// Start a lookup context with the 16 (MAX_NODES_PER_BUCKET) closest nodes for which we have
// a valid endpoint proof
let ctx = LookupContext::new(
target_key.clone(),
self.kbuckets
@ -772,7 +773,10 @@ impl Discv4Service {
trace!(target: "discv4", ?target, num = closest.len(), "Start lookup closest nodes");
for node in closest {
self.find_node(&node, ctx.clone());
// here we still want to check against previous request failures and, if necessary,
// re-establish the endpoint proof, because the other node may have lost our entry and
// no longer has an endpoint proof on its end
self.find_node_checked(&node, ctx.clone());
}
}
@ -788,6 +792,22 @@ impl Discv4Service {
self.pending_find_nodes.insert(node.id, FindNodeRequest::new(ctx));
}
/// Sends a new `FindNode` packet to the node with `target` as the lookup target, but first
/// checks the previously failed `FindNode` requests to decide whether we should send a new
/// ping to renew the endpoint proof. It could be that the node is no longer reachable or
/// lost our entry.
fn find_node_checked(&mut self, node: &NodeRecord, ctx: LookupContext) {
let max_failures = self.config.max_find_node_failures;
let needs_ping = self
.on_entry(node.id, |entry| entry.exceeds_find_node_failures(max_failures))
.unwrap_or(true);
if needs_ping {
self.try_ping(*node, PingReason::Lookup(*node, ctx))
} else {
self.find_node(node, ctx)
}
}
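The choice made by `find_node_checked` reduces to a single threshold branch; a self-contained sketch of the same logic with illustrative names (not part of the reth API):

/// Illustrative stand-in for the action `find_node_checked` picks.
enum LookupAction {
    /// Renew the endpoint proof first; the peer may have dropped our entry.
    Ping,
    /// The peer has been answering, so send the `FindNode` packet directly.
    FindNode,
}

/// Mirrors the check above: once the consecutive failure count reaches the
/// configured maximum, fall back to a ping instead of another `FindNode`.
fn next_lookup_action(consecutive_failures: u8, max_failures: u8) -> LookupAction {
    if consecutive_failures >= max_failures {
        LookupAction::Ping
    } else {
        LookupAction::FindNode
    }
}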
/// Notifies all listeners.
///
/// Removes all listeners that are closed.
@ -860,7 +880,7 @@ impl Discv4Service {
self.kbuckets.buckets_iter().fold(0, |count, bucket| count + bucket.num_connected())
}
/// Check if the peer has a bond
/// Check if the peer has an active bond.
fn has_bond(&self, remote_id: PeerId, remote_ip: IpAddr) -> bool {
if let Some(timestamp) = self.received_pongs.last_pong(remote_id, remote_ip) {
if timestamp.elapsed() < self.config.bond_expiration {
@ -870,6 +890,19 @@ impl Discv4Service {
false
}
/// Applies a closure on the pending or present [`NodeEntry`].
fn on_entry<F, R>(&mut self, peer_id: PeerId, f: F) -> Option<R>
where
F: FnOnce(&NodeEntry) -> R,
{
let key = kad_key(peer_id);
match self.kbuckets.entry(&key) {
BucketEntry::Present(entry, _) => Some(f(entry.value())),
BucketEntry::Pending(mut entry, _) => Some(f(entry.value())),
_ => None,
}
}
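Returning `Option<R>` lets callers pick a default for peers that are not in the table, which is how `find_node_checked` falls back to pinging first. The same pattern reduced to a plain `HashMap`, purely for illustration (hypothetical helper, not the reth API):

use std::collections::HashMap;

/// Applies `f` to the stored value for `key`, if any, and returns its result.
fn on_value<K: std::hash::Hash + Eq, V, R>(
    map: &HashMap<K, V>,
    key: &K,
    f: impl FnOnce(&V) -> R,
) -> Option<R> {
    map.get(key).map(f)
}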
/// Update the entry on RE-ping.
///
/// Invoked when we received the Pong to our [`PingReason::RePing`] ping.
@ -929,7 +962,7 @@ impl Discv4Service {
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, old_status) => {
// endpoint is now proven
entry.value_mut().has_endpoint_proof = true;
entry.value_mut().establish_proof();
entry.value_mut().update_with_enr(last_enr_seq);
if !old_status.is_connected() {
@ -945,7 +978,7 @@ impl Discv4Service {
}
kbucket::Entry::Pending(mut entry, mut status) => {
// endpoint is now proven
entry.value().has_endpoint_proof = true;
entry.value().establish_proof();
entry.value().update_with_enr(last_enr_seq);
if !status.is_connected() {
@ -1129,6 +1162,8 @@ impl Discv4Service {
// try to send it
ctx.unmark_queried(record.id);
} else {
// we just received a ping from that peer so we can send a find node request
// directly
self.find_node(&record, ctx);
}
}
@ -1419,14 +1454,28 @@ impl Discv4Service {
BucketEntry::SelfEntry => {
// we received our own node entry
}
BucketEntry::Present(mut entry, _) => {
if entry.value_mut().has_endpoint_proof {
self.find_node(&closest, ctx.clone());
BucketEntry::Present(entry, _) => {
if entry.value().has_endpoint_proof {
if entry
.value()
.exceeds_find_node_failures(self.config.max_find_node_failures)
{
self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
} else {
self.find_node(&closest, ctx.clone());
}
}
}
BucketEntry::Pending(mut entry, _) => {
if entry.value().has_endpoint_proof {
self.find_node(&closest, ctx.clone());
if entry
.value()
.exceeds_find_node_failures(self.config.max_find_node_failures)
{
self.try_ping(closest, PingReason::Lookup(closest, ctx.clone()))
} else {
self.find_node(&closest, ctx.clone());
}
}
}
}
@ -1486,27 +1535,27 @@ impl Discv4Service {
self.remove_node(node_id);
}
self.evict_failed_neighbours(now);
self.evict_failed_find_nodes(now);
}
/// Handles failed responses to `FindNode`
fn evict_failed_neighbours(&mut self, now: Instant) {
let mut failed_neighbours = Vec::new();
fn evict_failed_find_nodes(&mut self, now: Instant) {
let mut failed_find_nodes = Vec::new();
self.pending_find_nodes.retain(|node_id, find_node_request| {
if now.duration_since(find_node_request.sent_at) > self.config.neighbours_expiration {
if !find_node_request.answered {
// node actually responded but with fewer entries than expected, but we don't
// treat this as a hard error since it responded.
failed_neighbours.push(*node_id);
failed_find_nodes.push(*node_id);
}
return false
}
true
});
trace!(target: "discv4", num=%failed_neighbours.len(), "processing failed neighbours");
trace!(target: "discv4", num=%failed_find_nodes.len(), "processing failed find nodes");
for node_id in failed_neighbours {
for node_id in failed_find_nodes {
let key = kad_key(node_id);
let failures = match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, _) => {
@ -1523,7 +1572,7 @@ impl Discv4Service {
// if the node failed to respond anything useful multiple times, remove the node from
// the table, but only if there are enough other nodes in the bucket (bucket must be at
// least half full)
if failures > (self.config.max_find_node_failures as usize) {
if failures > self.config.max_find_node_failures {
self.soft_remove_node(node_id);
}
}
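The eviction above follows a retain-and-collect shape; a minimal standalone version with illustrative types (not the reth structures) that times out pending requests and reports the ones that went unanswered:

use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Stripped-down pending `FindNode` bookkeeping.
struct PendingRequest {
    sent_at: Instant,
    answered: bool,
}

/// Drops expired requests from `pending` and returns the ids whose requests
/// expired without a (full) answer, mirroring `evict_failed_find_nodes`.
fn evict_expired(
    pending: &mut HashMap<u64, PendingRequest>,
    now: Instant,
    expiration: Duration,
) -> Vec<u64> {
    let mut failed = Vec::new();
    pending.retain(|id, req| {
        if now.duration_since(req.sent_at) > expiration {
            if !req.answered {
                failed.push(*id);
            }
            return false
        }
        true
    });
    failed
}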
@ -2216,8 +2265,8 @@ struct NodeEntry {
last_enr_seq: Option<u64>,
/// `ForkId` if retrieved via ENR requests.
fork_id: Option<ForkId>,
/// Counter for failed findNode requests.
find_node_failures: usize,
/// Counter for _consecutive_ failed `FindNode` requests.
find_node_failures: u8,
/// Whether the endpoint of the peer is proven.
has_endpoint_proof: bool,
}
@ -2244,6 +2293,17 @@ impl NodeEntry {
node
}
/// Marks the entry with an established proof and resets the consecutive failure counter.
fn establish_proof(&mut self) {
self.has_endpoint_proof = true;
self.find_node_failures = 0;
}
/// Returns `true` if the tracked `FindNode` failures have reached the given maximum.
const fn exceeds_find_node_failures(&self, max_failures: u8) -> bool {
self.find_node_failures >= max_failures
}
/// Updates the last timestamp and sets the enr seq
fn update_with_enr(&mut self, last_enr_seq: Option<u64>) -> Option<u64> {
self.update_now(|s| std::mem::replace(&mut s.last_enr_seq, last_enr_seq))
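Taken together, the new helpers give the failure counter a simple lifecycle: bump on a timed-out `FindNode`, clear when the endpoint proof is (re-)established, and consult the threshold before the next lookup. A compact sketch with a stripped-down stand-in type (illustrative only, not the full `NodeEntry`):

/// Minimal stand-in carrying only the fields this commit touches.
struct EntryState {
    find_node_failures: u8,
    has_endpoint_proof: bool,
}

impl EntryState {
    /// A failed `FindNode` bumps the consecutive-failure streak (saturating to stay in `u8`).
    fn on_find_node_failure(&mut self) {
        self.find_node_failures = self.find_node_failures.saturating_add(1);
    }
    /// A pong re-establishes the proof and clears the streak, as in `establish_proof`.
    fn on_pong(&mut self) {
        self.has_endpoint_proof = true;
        self.find_node_failures = 0;
    }
    /// Same threshold check as `exceeds_find_node_failures`.
    fn needs_reping(&self, max_failures: u8) -> bool {
        self.find_node_failures >= max_failures
    }
}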
@ -2660,6 +2720,45 @@ mod tests {
assert_eq!(ctx.inner.closest_nodes.borrow().len(), 1);
}
#[tokio::test]
async fn test_reping_on_find_node_failures() {
reth_tracing::init_test_tracing();
let config = Discv4Config::builder().build();
let (_discv4, mut service) = create_discv4_with_config(config).await;
let target = PeerId::random();
let id = PeerId::random();
let key = kad_key(id);
let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
let mut entry = NodeEntry::new_proven(record);
entry.find_node_failures = u8::MAX;
let _ = service.kbuckets.insert_or_update(
&key,
entry,
NodeStatus {
direction: ConnectionDirection::Incoming,
state: ConnectionState::Connected,
},
);
service.lookup(target);
assert_eq!(service.pending_find_nodes.len(), 0);
assert_eq!(service.pending_pings.len(), 1);
service.update_on_pong(record, None);
service
.on_entry(record.id, |entry| {
// reset on pong
assert_eq!(entry.find_node_failures, 0);
assert!(entry.has_endpoint_proof);
})
.unwrap();
}
#[tokio::test]
async fn test_service_commands() {
reth_tracing::init_test_tracing();