From 9946f232a7a9ac16f900ae18a636010258db7502 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 14 Dec 2022 12:48:28 +0100 Subject: [PATCH] chore(net): improve session error traces (#435) --- crates/net/network/src/session/active.rs | 25 +++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 02f3301ed..2f8ae5cd6 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -33,7 +33,7 @@ use tokio::{ time::Interval, }; use tokio_stream::wrappers::ReceiverStream; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; /// The type that advances an established session by listening for incoming messages (from local /// node or read from connection) and emitting events back to the [`SessionsManager`]. @@ -89,8 +89,8 @@ impl ActiveSession { /// Handle a message read from the connection. /// - /// Returns an error if the message is considered to be in violation of the protocol - fn on_incoming(&mut self, msg: EthMessage) -> Option { + /// Returns an error if the message is considered to be in violation of the protocol. + fn on_incoming(&mut self, msg: EthMessage) -> Option<(EthStreamError, EthMessage)> { /// A macro that handles an incoming request /// This creates a new channel and tries to send the sender half to the session while /// storing to receiver half internally so the pending response can be polled. @@ -134,8 +134,11 @@ impl ActiveSession { } match msg { - EthMessage::Status(_) => { - return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake)) + msg @ EthMessage::Status(_) => { + return Some(( + EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake), + msg, + )) } EthMessage::NewBlockHashes(msg) => { self.emit_message(PeerMessage::NewBlockHashes(msg)); @@ -300,6 +303,7 @@ impl ActiveSession { } } for id in timedout { + warn!(target: "net::session", remote_peer_id=?self.remote_peer_id, "timed out outgoing request"); let req = self.inflight_requests.remove(&id).expect("exists; qed"); req.request.send_err_response(RequestError::Timeout); } @@ -380,10 +384,14 @@ impl Future for ActiveSession { OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg), OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg), }; - if let Err(_err) = res { + if let Err(err) = res { + error!(target: "net::session", ?err, remote_peer_id=?this.remote_peer_id, "failed to send message"); + // notify the manager + this.close_on_error(err); return Poll::Ready(()) } } else { + // no more messages to send over the wire break } } @@ -395,6 +403,7 @@ impl Future for ActiveSession { if this.is_disconnecting() { break } else { + debug!(target: "net::session", remote_peer_id=?this.remote_peer_id, "eth stream completed"); this.emit_disconnect(); return Poll::Ready(()) } @@ -404,12 +413,14 @@ impl Future for ActiveSession { match res { Ok(msg) => { // decode and handle message - if let Some(err) = this.on_incoming(msg) { + if let Some((err, bad_protocol_msg)) = this.on_incoming(msg) { + error!(target: "net::session", ?err, msg=?bad_protocol_msg, remote_peer_id=?this.remote_peer_id, "received invalid protocol message"); this.close_on_error(err); return Poll::Ready(()) } } Err(err) => { + error!(target: "net::session", ?err, remote_peer_id=?this.remote_peer_id, "failed to receive message"); this.close_on_error(err); return Poll::Ready(()) }