mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
refactor(disc): use discv4 as log target (#423)
This commit is contained in:
@ -176,7 +176,7 @@ impl Discv4 {
|
|||||||
let socket = UdpSocket::bind(local_address).await?;
|
let socket = UdpSocket::bind(local_address).await?;
|
||||||
let local_addr = socket.local_addr()?;
|
let local_addr = socket.local_addr()?;
|
||||||
local_enr.udp_port = local_addr.port();
|
local_enr.udp_port = local_addr.port();
|
||||||
trace!( target : "net::disc", ?local_addr,"opened UDP socket");
|
trace!( target : "discv4", ?local_addr,"opened UDP socket");
|
||||||
|
|
||||||
// We don't expect many commands, so the buffer can be quite small here.
|
// We don't expect many commands, so the buffer can be quite small here.
|
||||||
let (to_service, rx) = mpsc::channel(5);
|
let (to_service, rx) = mpsc::channel(5);
|
||||||
@ -392,7 +392,7 @@ impl Discv4Service {
|
|||||||
/// **Note:** This is a noop if there are no bootnodes.
|
/// **Note:** This is a noop if there are no bootnodes.
|
||||||
pub fn bootstrap(&mut self) {
|
pub fn bootstrap(&mut self) {
|
||||||
for record in self.config.bootstrap_nodes.clone() {
|
for record in self.config.bootstrap_nodes.clone() {
|
||||||
debug!(target : "net::disc", ?record, "Adding bootstrap node");
|
debug!(target : "discv4", ?record, "Adding bootstrap node");
|
||||||
let key = kad_key(record.id);
|
let key = kad_key(record.id);
|
||||||
let entry = NodeEntry { record, last_seen: Instant::now() };
|
let entry = NodeEntry { record, last_seen: Instant::now() };
|
||||||
|
|
||||||
@ -417,7 +417,7 @@ impl Discv4Service {
|
|||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
self.bootstrap();
|
self.bootstrap();
|
||||||
while let Some(event) = self.next().await {
|
while let Some(event) = self.next().await {
|
||||||
trace!(target : "net::disc", ?event, "processed");
|
trace!(target : "discv4", ?event, "processed");
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -481,7 +481,7 @@ impl Discv4Service {
|
|||||||
|
|
||||||
/// Sends a new `FindNode` packet to the node with `target` as the lookup target.
|
/// Sends a new `FindNode` packet to the node with `target` as the lookup target.
|
||||||
fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
|
fn find_node(&mut self, node: &NodeRecord, ctx: LookupContext) {
|
||||||
trace!(target : "net::disc", ?node, lookup=?ctx.target(), "Sending FindNode");
|
trace!(target : "discv4", ?node, lookup=?ctx.target(), "Sending FindNode");
|
||||||
ctx.mark_queried(node.id);
|
ctx.mark_queried(node.id);
|
||||||
let id = ctx.target();
|
let id = ctx.target();
|
||||||
let msg = Message::FindNode(FindNode { id, expire: self.find_node_timeout() });
|
let msg = Message::FindNode(FindNode { id, expire: self.find_node_timeout() });
|
||||||
@ -534,18 +534,18 @@ impl Discv4Service {
|
|||||||
},
|
},
|
||||||
) {
|
) {
|
||||||
InsertResult::Inserted => {
|
InsertResult::Inserted => {
|
||||||
debug!(target : "net::disc",?record, "inserted new record to table");
|
debug!(target : "discv4",?record, "inserted new record to table");
|
||||||
self.notify(DiscoveryUpdate::Added(record));
|
self.notify(DiscoveryUpdate::Added(record));
|
||||||
}
|
}
|
||||||
InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => {
|
InsertResult::ValueUpdated { .. } | InsertResult::Updated { .. } => {
|
||||||
trace!(target : "net::disc",?record, "updated record");
|
trace!(target : "discv4",?record, "updated record");
|
||||||
}
|
}
|
||||||
InsertResult::Failed(FailureReason::BucketFull) => {
|
InsertResult::Failed(FailureReason::BucketFull) => {
|
||||||
debug!(target : "net::disc", ?record, "discovered new record but bucket is full");
|
debug!(target : "discv4", ?record, "discovered new record but bucket is full");
|
||||||
self.notify(DiscoveryUpdate::Discovered(record));
|
self.notify(DiscoveryUpdate::Discovered(record));
|
||||||
}
|
}
|
||||||
res => {
|
res => {
|
||||||
warn!(target : "net::disc",?record, ?res, "failed to insert");
|
warn!(target : "discv4",?record, ?res, "failed to insert");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -575,7 +575,7 @@ impl Discv4Service {
|
|||||||
/// Encodes the packet, sends it and returns the hash.
|
/// Encodes the packet, sends it and returns the hash.
|
||||||
pub(crate) fn send_packet(&mut self, msg: Message, to: SocketAddr) -> H256 {
|
pub(crate) fn send_packet(&mut self, msg: Message, to: SocketAddr) -> H256 {
|
||||||
let (payload, hash) = msg.encode(&self.secret_key);
|
let (payload, hash) = msg.encode(&self.secret_key);
|
||||||
trace!(target : "net::disc", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
|
trace!(target : "discv4", r#type=?msg.msg_type(), ?to, ?hash, "sending packet");
|
||||||
let _ = self.egress.try_send((payload, to));
|
let _ = self.egress.try_send((payload, to));
|
||||||
hash
|
hash
|
||||||
}
|
}
|
||||||
@ -624,7 +624,7 @@ impl Discv4Service {
|
|||||||
let id = node.id;
|
let id = node.id;
|
||||||
let ping =
|
let ping =
|
||||||
Ping { from: self.local_enr.into(), to: node.into(), expire: self.ping_timeout() };
|
Ping { from: self.local_enr.into(), to: node.into(), expire: self.ping_timeout() };
|
||||||
trace!(target : "net::disc", ?ping, "sending ping");
|
trace!(target : "discv4", ?ping, "sending ping");
|
||||||
let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
|
let echo_hash = self.send_packet(Message::Ping(ping), remote_addr);
|
||||||
|
|
||||||
self.pending_pings
|
self.pending_pings
|
||||||
@ -643,7 +643,7 @@ impl Discv4Service {
|
|||||||
{
|
{
|
||||||
let request = entry.get();
|
let request = entry.get();
|
||||||
if request.echo_hash != pong.echo {
|
if request.echo_hash != pong.echo {
|
||||||
debug!( target : "net::disc", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
|
debug!( target : "discv4", from=?remote_addr, expected=?request.echo_hash, echo_hash=?pong.echo,"Got unexpected Pong");
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -710,7 +710,7 @@ impl Discv4Service {
|
|||||||
if total <= MAX_NODES_PER_BUCKET {
|
if total <= MAX_NODES_PER_BUCKET {
|
||||||
request.response_count = total;
|
request.response_count = total;
|
||||||
} else {
|
} else {
|
||||||
debug!(target : "net::disc", total, from=?remote_addr, "Got oversized Neighbors packet");
|
debug!(target : "discv4", total, from=?remote_addr, "Got oversized Neighbors packet");
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -724,7 +724,7 @@ impl Discv4Service {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Entry::Vacant(_) => {
|
Entry::Vacant(_) => {
|
||||||
debug!( target : "net::disc", from=?remote_addr, "Received unsolicited Neighbours");
|
debug!( target : "discv4", from=?remote_addr, "Received unsolicited Neighbours");
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -761,7 +761,7 @@ impl Discv4Service {
|
|||||||
|
|
||||||
for nodes in all_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
|
for nodes in all_nodes.chunks(SAFE_MAX_DATAGRAM_NEIGHBOUR_RECORDS) {
|
||||||
let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
|
let nodes = nodes.iter().map(|node| node.value.record).collect::<Vec<NodeRecord>>();
|
||||||
trace!( target : "net::disc", len = nodes.len(), to=?to,"Sent neighbours packet");
|
trace!( target : "discv4", len = nodes.len(), to=?to,"Sent neighbours packet");
|
||||||
let msg = Message::Neighbours(Neighbours { nodes, expire });
|
let msg = Message::Neighbours(Neighbours { nodes, expire });
|
||||||
self.send_packet(msg, to);
|
self.send_packet(msg, to);
|
||||||
}
|
}
|
||||||
@ -770,7 +770,7 @@ impl Discv4Service {
|
|||||||
/// Returns the current status of the node
|
/// Returns the current status of the node
|
||||||
fn node_status(&mut self, node: PeerId, addr: SocketAddr) -> NodeEntryStatus {
|
fn node_status(&mut self, node: PeerId, addr: SocketAddr) -> NodeEntryStatus {
|
||||||
if node == self.local_enr.id {
|
if node == self.local_enr.id {
|
||||||
debug!( target : "net::disc", ?node,"Got an incoming discovery request from self");
|
debug!( target : "discv4", ?node,"Got an incoming discovery request from self");
|
||||||
return NodeEntryStatus::IsLocal
|
return NodeEntryStatus::IsLocal
|
||||||
}
|
}
|
||||||
let key = kad_key(node);
|
let key = kad_key(node);
|
||||||
@ -843,7 +843,7 @@ impl Discv4Service {
|
|||||||
fn ensure_timestamp(&self, expiration: u64) -> Result<(), ()> {
|
fn ensure_timestamp(&self, expiration: u64) -> Result<(), ()> {
|
||||||
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
|
let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs();
|
||||||
if self.check_timestamps && expiration < now {
|
if self.check_timestamps && expiration < now {
|
||||||
debug!(target: "net::disc", "Expired packet");
|
debug!(target: "discv4", "Expired packet");
|
||||||
return Err(())
|
return Err(())
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -925,10 +925,10 @@ impl Discv4Service {
|
|||||||
match event {
|
match event {
|
||||||
IngressEvent::RecvError(_) => {}
|
IngressEvent::RecvError(_) => {}
|
||||||
IngressEvent::BadPacket(from, err, data) => {
|
IngressEvent::BadPacket(from, err, data) => {
|
||||||
warn!(target : "net::disc", ?from, ?err, packet=?hex::encode(&data), "bad packet");
|
warn!(target : "discv4", ?from, ?err, packet=?hex::encode(&data), "bad packet");
|
||||||
}
|
}
|
||||||
IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
|
IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => {
|
||||||
trace!( target : "net::disc", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
|
trace!( target : "discv4", r#type=?msg.msg_type(), from=?remote_addr,"received packet");
|
||||||
return match msg {
|
return match msg {
|
||||||
Message::Ping(ping) => {
|
Message::Ping(ping) => {
|
||||||
self.on_ping(ping, remote_addr, node_id, hash);
|
self.on_ping(ping, remote_addr, node_id, hash);
|
||||||
@ -988,10 +988,10 @@ pub(crate) async fn send_loop(udp: Arc<UdpSocket>, rx: EgressReceiver) {
|
|||||||
while let Some((payload, to)) = stream.next().await {
|
while let Some((payload, to)) = stream.next().await {
|
||||||
match udp.send_to(&payload, to).await {
|
match udp.send_to(&payload, to).await {
|
||||||
Ok(size) => {
|
Ok(size) => {
|
||||||
trace!( target : "net::disc", ?to, ?size,"sent payload");
|
trace!( target : "discv4", ?to, ?size,"sent payload");
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!( target : "net::disc", ?to, ?err,"Failed to send datagram.");
|
warn!( target : "discv4", ?to, ?err,"Failed to send datagram.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1004,7 +1004,7 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
|
|||||||
let res = udp.recv_from(&mut buf).await;
|
let res = udp.recv_from(&mut buf).await;
|
||||||
match res {
|
match res {
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!(target : "net::disc", ?err, "Failed to read datagram.");
|
warn!(target : "discv4", ?err, "Failed to read datagram.");
|
||||||
let _ = tx.send(IngressEvent::RecvError(err)).await;
|
let _ = tx.send(IngressEvent::RecvError(err)).await;
|
||||||
}
|
}
|
||||||
Ok((read, remote_addr)) => {
|
Ok((read, remote_addr)) => {
|
||||||
@ -1013,13 +1013,13 @@ pub(crate) async fn receive_loop(udp: Arc<UdpSocket>, tx: IngressSender, local_i
|
|||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
if packet.node_id == local_id {
|
if packet.node_id == local_id {
|
||||||
// received our own message
|
// received our own message
|
||||||
warn!(target : "net::disc", ?remote_addr, "Received own packet.");
|
warn!(target : "discv4", ?remote_addr, "Received own packet.");
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
let _ = tx.send(IngressEvent::Packet(remote_addr, packet)).await;
|
let _ = tx.send(IngressEvent::Packet(remote_addr, packet)).await;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!( target : "net::disc", ?err,"Failed to decode packet");
|
warn!( target : "discv4", ?err,"Failed to decode packet");
|
||||||
let _ = tx
|
let _ = tx
|
||||||
.send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec()))
|
.send(IngressEvent::BadPacket(remote_addr, err, packet.to_vec()))
|
||||||
.await;
|
.await;
|
||||||
|
|||||||
@ -149,7 +149,7 @@ impl Stream for MockDiscovery {
|
|||||||
match event {
|
match event {
|
||||||
IngressEvent::RecvError(_) => {}
|
IngressEvent::RecvError(_) => {}
|
||||||
IngressEvent::BadPacket(from, err, data) => {
|
IngressEvent::BadPacket(from, err, data) => {
|
||||||
error!( target : "net::disc", ?from, ?err, packet=?hex::encode(&data), "bad packet");
|
error!( target : "discv4", ?from, ?err, packet=?hex::encode(&data), "bad packet");
|
||||||
}
|
}
|
||||||
IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => match msg {
|
IngressEvent::Packet(remote_addr, Packet { msg, node_id, hash }) => match msg {
|
||||||
Message::Ping(ping) => {
|
Message::Ping(ping) => {
|
||||||
|
|||||||
Reference in New Issue
Block a user