Mirror of https://github.com/hl-archive-node/nanoreth.git, synced 2025-12-06 19:09:54 +00:00
fix(disc) Send find_node request only after verifying our endpoint proof (#4909)
@@ -432,6 +432,11 @@ pub struct Discv4Service {
     queued_pings: VecDeque<(NodeRecord, PingReason)>,
     /// Currently active pings to specific nodes.
     pending_pings: HashMap<PeerId, PingRequest>,
+    /// Currently active endpoint proof verification lookups to specific nodes.
+    ///
+    /// Entries here mean we've proven the peer's endpoint but haven't completed our end of the
+    /// endpoint proof.
+    pending_lookup: HashMap<PeerId, (Instant, LookupContext)>,
     /// Currently active FindNode requests
     pending_find_nodes: HashMap<PeerId, FindNodeRequest>,
     /// Currently active ENR requests
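For orientation: the new pending_lookup table records, per peer, when we started waiting for the peer to verify our endpoint, along with the lookup that is parked on that verification. A minimal, self-contained sketch of that lifecycle, using hypothetical stand-ins for the crate's PeerId and LookupContext types (not the crate's API):

use std::collections::HashMap;
use std::time::Instant;

// Hypothetical stand-ins for the crate's types, for illustration only.
type PeerId = u64;
#[derive(Debug)]
struct LookupContext;

fn main() {
    // Lookups parked until the peer completes our side of the endpoint proof.
    let mut pending_lookup: HashMap<PeerId, (Instant, LookupContext)> = HashMap::new();

    // on_pong for a lookup ping: the peer's endpoint is proven to us, but it has
    // not verified ours yet, so park the lookup together with a start time.
    let peer: PeerId = 42;
    pending_lookup.insert(peer, (Instant::now(), LookupContext));

    // Later, on_ping from that already-proven peer: both directions are verified,
    // so the parked lookup can be resumed and its FindNode sent.
    if let Some((sent_at, ctx)) = pending_lookup.remove(&peer) {
        println!("resume lookup {ctx:?}, parked for {:?}", sent_at.elapsed());
    }
}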
@@ -546,6 +551,7 @@ impl Discv4Service {
             egress: egress_tx,
             queued_pings: Default::default(),
             pending_pings: Default::default(),
+            pending_lookup: Default::default(),
             pending_find_nodes: Default::default(),
             pending_enr_requests: Default::default(),
             commands_rx,
@@ -988,10 +994,17 @@ impl Discv4Service {
         // the ping interval
         let mut is_new_insert = false;
         let mut needs_bond = false;
+        let mut is_proven = false;
 
         let old_enr = match self.kbuckets.entry(&key) {
-            kbucket::Entry::Present(mut entry, _) => entry.value_mut().update_with_enr(ping.enr_sq),
-            kbucket::Entry::Pending(mut entry, _) => entry.value().update_with_enr(ping.enr_sq),
+            kbucket::Entry::Present(mut entry, _) => {
+                is_proven = entry.value().has_endpoint_proof;
+                entry.value_mut().update_with_enr(ping.enr_sq)
+            }
+            kbucket::Entry::Pending(mut entry, _) => {
+                is_proven = entry.value().has_endpoint_proof;
+                entry.value().update_with_enr(ping.enr_sq)
+            }
             kbucket::Entry::Absent(entry) => {
                 let mut node = NodeEntry::new(record);
                 node.last_enr_seq = ping.enr_sq;
@@ -1044,6 +1057,19 @@ impl Discv4Service {
             self.try_ping(record, PingReason::InitialInsert);
         } else if needs_bond {
             self.try_ping(record, PingReason::EstablishBond);
+        } else if is_proven {
+            // if node has been proven, this means we've received a pong and verified its endpoint
+            // proof. We've also sent a pong above to verify our endpoint proof, so we can now
+            // send our find_nodes request if PingReason::Lookup
+            if let Some((_, ctx)) = self.pending_lookup.remove(&record.id) {
+                if self.pending_find_nodes.contains_key(&record.id) {
+                    // there's already another pending request, unmark it so the next round can
+                    // try to send it
+                    ctx.unmark_queried(record.id);
+                } else {
+                    self.find_node(&record, ctx);
+                }
+            }
         } else {
             // Request ENR if included in the ping
             match (ping.enr_sq, old_enr) {
@@ -1156,13 +1182,11 @@ impl Discv4Service {
             }
             PingReason::Lookup(node, ctx) => {
                 self.update_on_pong(node, pong.enr_sq);
-                if self.pending_find_nodes.contains_key(&node.id) {
-                    // there's already another pending request, unmark it so the next round can try
-                    // to send it
-                    ctx.unmark_queried(node.id);
-                } else {
-                    self.find_node(&node, ctx);
-                }
+                // insert node and assoc. lookup_context into the pending_lookup table to complete
+                // our side of the endpoint proof verification.
+                // Start the lookup timer here - and evict accordingly. Note that this is a separate
+                // timer from the ping_request timer.
+                self.pending_lookup.insert(node.id, (Instant::now(), ctx));
             }
         }
     }
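Taken together, the on_ping and on_pong changes above defer the FindNode until both halves of the endpoint proof are complete. A rough standalone sketch of that ordering, with illustrative names rather than the crate's API:

// Tracks which directions of the endpoint proof have been verified so far.
#[derive(Default)]
struct ProofState {
    // We received the peer's Pong: its endpoint is proven to us.
    their_endpoint_proven: bool,
    // The peer pinged us back and got our Pong: our endpoint is proven to it.
    our_endpoint_proven: bool,
}

impl ProofState {
    // on_pong for a lookup ping: record the peer's proof, but don't query yet.
    fn on_pong(&mut self) {
        self.their_endpoint_proven = true;
    }

    // on_ping from a peer we already proved: our side is now complete as well.
    fn on_ping_from_proven_peer(&mut self) {
        self.our_endpoint_proven = true;
    }

    // Only once both halves are done may the parked FindNode go out.
    fn can_send_find_node(&self) -> bool {
        self.their_endpoint_proven && self.our_endpoint_proven
    }
}

fn main() {
    let mut proof = ProofState::default();
    proof.on_pong();
    assert!(!proof.can_send_find_node()); // previously the FindNode went out here
    proof.on_ping_from_proven_peer();
    assert!(proof.can_send_find_node()); // now it is deferred to this point
}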
@@ -1279,7 +1303,7 @@ impl Discv4Service {
         };
 
         // This is the recursive lookup step where we initiate new FindNode requests for new nodes
-        // that where discovered.
+        // that were discovered.
         for node in msg.nodes.into_iter().map(NodeRecord::into_ipv4_mapped) {
             // prevent banned peers from being added to the context
             if self.config.ban_list.is_banned(&node.id, &node.address) {
@@ -1299,7 +1323,8 @@ impl Discv4Service {
             match self.kbuckets.entry(&key) {
                 BucketEntry::Absent(entry) => {
                     // the node's endpoint is not proven yet, so we need to ping it first, on
-                    // success, it will initiate a `FindNode` request.
+                    // success, we will add the node to the pending_lookup table, and wait to send
+                    // back a Pong before initiating a FindNode request.
                     // In order to prevent that this node is selected again on subsequent responses,
                     // while the ping is still active, we always mark it as queried.
                     ctx.mark_queried(closest.id);
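The mark_queried / unmark_queried pair referenced above keeps a candidate from being selected twice while its ping is still in flight, and rolls the mark back if the FindNode cannot be sent so a later round may retry. A small sketch of that idea with a plain HashSet (hypothetical names, not the crate's API):

use std::collections::HashSet;

type PeerId = u64;

// Candidates already chosen as FindNode targets for the current lookup.
#[derive(Default)]
struct Queried(HashSet<PeerId>);

impl Queried {
    // Mark a candidate while its ping / endpoint proof is in flight so later
    // Neighbours responses don't select it again.
    fn mark(&mut self, id: PeerId) -> bool {
        self.0.insert(id)
    }

    // Roll the mark back if the FindNode could not be sent, so the next round
    // is allowed to retry this candidate.
    fn unmark(&mut self, id: PeerId) {
        self.0.remove(&id);
    }
}

fn main() {
    let mut queried = Queried::default();
    assert!(queried.mark(7));
    assert!(!queried.mark(7)); // a second selection is suppressed
    queried.unmark(7);
    assert!(queried.mark(7)); // retry allowed after unmarking
}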
@@ -1365,6 +1390,21 @@ impl Discv4Service {
             self.remove_node(node_id);
         }
 
+        let mut failed_lookups = Vec::new();
+        self.pending_lookup.retain(|node_id, (lookup_sent_at, _)| {
+            if now.duration_since(*lookup_sent_at) > self.config.ping_expiration {
+                failed_lookups.push(*node_id);
+                return false
+            }
+            true
+        });
+        debug!(target: "discv4", num=%failed_lookups.len(), "evicting nodes due to failed lookup");
+
+        // remove nodes that failed the e2e lookup process, so we can restart it
+        for node_id in failed_lookups {
+            self.remove_node(node_id);
+        }
+
         self.evict_failed_neighbours(now);
     }
 
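Eviction of stale pending_lookup entries follows the usual retain-and-collect pattern over a timestamped map. A self-contained sketch of the pattern; the expiration value here is illustrative, the service itself uses config.ping_expiration:

use std::collections::HashMap;
use std::thread::sleep;
use std::time::{Duration, Instant};

type PeerId = u64;

fn main() {
    // Illustrative expiration; the service uses config.ping_expiration.
    let ping_expiration = Duration::from_millis(10);

    let mut pending_lookup: HashMap<PeerId, Instant> = HashMap::new();
    pending_lookup.insert(1, Instant::now());
    sleep(Duration::from_millis(20)); // entry 1 is now stale
    pending_lookup.insert(2, Instant::now()); // entry 2 is fresh

    let now = Instant::now();
    let mut failed_lookups = Vec::new();
    // Keep fresh entries, collect the ids of stale ones for removal.
    pending_lookup.retain(|node_id, sent_at| {
        if now.duration_since(*sent_at) > ping_expiration {
            failed_lookups.push(*node_id);
            return false;
        }
        true
    });

    assert_eq!(failed_lookups, vec![1]);
    // The service would now call remove_node() for each failed id so the
    // end-to-end lookup can be restarted from scratch.
}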
@@ -1802,7 +1842,7 @@ impl LookupTargetRotator {
 /// Tracks lookups across multiple `FindNode` requests.
 ///
 /// If this type is dropped by all
-#[derive(Clone)]
+#[derive(Clone, Debug)]
 struct LookupContext {
     inner: Rc<LookupContextInner>,
 }
@@ -1905,7 +1945,7 @@ impl LookupContext {
 // guaranteed that there's only 1 owner ([`Discv4Service`]) of all possible [`Rc`] clones of
 // [`LookupContext`].
 unsafe impl Send for LookupContext {}
 
+#[derive(Debug)]
 struct LookupContextInner {
     /// The target to lookup.
     target: discv5::Key<NodeKey>,
@@ -2272,7 +2312,7 @@ mod tests {
         reth_tracing::init_test_tracing();
 
         let config = Discv4Config::builder().build();
-        let (_discv4, mut service) = create_discv4_with_config(config).await;
+        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
 
         let id = PeerId::random();
         let key = kad_key(id);
@@ -2296,9 +2336,65 @@ mod tests {
 
             Poll::Ready(())
         })
-        .await
+        .await;
     }
 
     #[tokio::test]
+    async fn test_on_neighbours_recursive_lookup() {
+        reth_tracing::init_test_tracing();
+
+        let config = Discv4Config::builder().build();
+        let (_discv4, mut service) = create_discv4_with_config(config.clone()).await;
+        let (_discv4, mut service2) = create_discv4_with_config(config).await;
+
+        let id = PeerId::random();
+        let key = kad_key(id);
+        let record = NodeRecord::new("0.0.0.0:0".parse().unwrap(), id);
+
+        let _ = service.kbuckets.insert_or_update(
+            &key,
+            NodeEntry::new_proven(record),
+            NodeStatus {
+                direction: ConnectionDirection::Incoming,
+                state: ConnectionState::Connected,
+            },
+        );
+        // Needed in this test to populate self.pending_find_nodes as a prereq to a valid
+        // on_neighbours request
+        service.lookup_self();
+        assert_eq!(service.pending_find_nodes.len(), 1);
+
+        poll_fn(|cx| {
+            let _ = service.poll(cx);
+            assert_eq!(service.pending_find_nodes.len(), 1);
+
+            Poll::Ready(())
+        })
+        .await;
+
+        let expiry = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs() +
+            10000000000000;
+        let msg = Neighbours { nodes: vec![service2.local_node_record], expire: expiry };
+        service.on_neighbours(msg, record.tcp_addr(), id);
+        // wait for the processed ping
+        let event = poll_fn(|cx| service2.poll(cx)).await;
+        assert_eq!(event, Discv4Event::Ping);
+        // assert that no find_node req has been added here on top of the initial one, since both
+        // sides of the endpoint proof are not completed here
+        assert_eq!(service.pending_find_nodes.len(), 1);
+        // we now wait for PONG
+        let event = poll_fn(|cx| service.poll(cx)).await;
+        assert_eq!(event, Discv4Event::Pong);
+        // Ideally we want to assert against service.pending_lookup.len() here - but because
+        // service2 sends Pong and Ping consecutively in on_ping(), the pending_lookup table gets
+        // drained almost immediately - and there is no way to grab the handle to its intermediary
+        // state here :(
+        let event = poll_fn(|cx| service.poll(cx)).await;
+        assert_eq!(event, Discv4Event::Ping);
+        // assert that we've added the find_node req here after both sides of the endpoint proof
+        // are done
+        assert_eq!(service.pending_find_nodes.len(), 2);
+    }
+
+    #[tokio::test]
     async fn test_no_local_in_closest() {
         reth_tracing::init_test_tracing();
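The new test drives each service one event at a time with poll_fn, which wraps a single call to the service's poll into an awaitable future. A minimal illustration of that pattern outside the crate; the Counter type is made up, and only std::future::poll_fn plus a tokio runtime are assumed:

use std::future::poll_fn;
use std::task::Poll;

// A made-up poll-based state machine standing in for the discovery service.
struct Counter(u32);

impl Counter {
    // Each poll yields exactly one "event", like the service's poll in the tests.
    fn poll_event(&mut self) -> Poll<u32> {
        self.0 += 1;
        Poll::Ready(self.0)
    }
}

#[tokio::main]
async fn main() {
    let mut service = Counter(0);
    // Mirrors `let event = poll_fn(|cx| service.poll(cx)).await;` above:
    // the await resolves as soon as the service yields its next event.
    let event = poll_fn(|_cx| service.poll_event()).await;
    assert_eq!(event, 1);
    let event = poll_fn(|_cx| service.poll_event()).await;
    assert_eq!(event, 2);
}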
@@ -2463,14 +2559,12 @@ mod tests {
         // we now wait for PONG
         let event = poll_fn(|cx| service_2.poll(cx)).await;
 
-        // Since the endpoint was already proven from 1 POV it can already send a FindNode so the
-        // next event is either the PONG or Find Node
         match event {
-            Discv4Event::FindNode | Discv4Event::EnrRequest => {
+            Discv4Event::EnrRequest => {
                 // since we support enr in the ping it may also request the enr
                 let event = poll_fn(|cx| service_2.poll(cx)).await;
                 match event {
-                    Discv4Event::FindNode | Discv4Event::EnrRequest => {
+                    Discv4Event::EnrRequest => {
                         let event = poll_fn(|cx| service_2.poll(cx)).await;
                         assert_eq!(event, Discv4Event::Pong);
                     }