chore(net): improve session error traces (#435)

This commit is contained in:
Matthias Seitz
2022-12-14 12:48:28 +01:00
committed by GitHub
parent 601bdc5022
commit 9946f232a7

View File

@ -33,7 +33,7 @@ use tokio::{
time::Interval,
};
use tokio_stream::wrappers::ReceiverStream;
use tracing::{error, warn};
use tracing::{debug, error, warn};
/// The type that advances an established session by listening for incoming messages (from local
/// node or read from connection) and emitting events back to the [`SessionsManager`].
@ -89,8 +89,8 @@ impl ActiveSession {
/// Handle a message read from the connection.
///
/// Returns an error if the message is considered to be in violation of the protocol
fn on_incoming(&mut self, msg: EthMessage) -> Option<EthStreamError> {
/// Returns an error if the message is considered to be in violation of the protocol.
fn on_incoming(&mut self, msg: EthMessage) -> Option<(EthStreamError, EthMessage)> {
/// A macro that handles an incoming request
/// This creates a new channel and tries to send the sender half to the session while
/// storing to receiver half internally so the pending response can be polled.
@ -134,8 +134,11 @@ impl ActiveSession {
}
match msg {
EthMessage::Status(_) => {
return Some(EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake))
msg @ EthMessage::Status(_) => {
return Some((
EthStreamError::HandshakeError(HandshakeError::StatusNotInHandshake),
msg,
))
}
EthMessage::NewBlockHashes(msg) => {
self.emit_message(PeerMessage::NewBlockHashes(msg));
@ -300,6 +303,7 @@ impl ActiveSession {
}
}
for id in timedout {
warn!(target: "net::session", remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
let req = self.inflight_requests.remove(&id).expect("exists; qed");
req.request.send_err_response(RequestError::Timeout);
}
@ -380,10 +384,14 @@ impl Future for ActiveSession {
OutgoingMessage::Eth(msg) => this.conn.start_send_unpin(msg),
OutgoingMessage::Broadcast(msg) => this.conn.start_send_broadcast(msg),
};
if let Err(_err) = res {
if let Err(err) = res {
error!(target: "net::session", ?err, remote_peer_id=?this.remote_peer_id, "failed to send message");
// notify the manager
this.close_on_error(err);
return Poll::Ready(())
}
} else {
// no more messages to send over the wire
break
}
}
@ -395,6 +403,7 @@ impl Future for ActiveSession {
if this.is_disconnecting() {
break
} else {
debug!(target: "net::session", remote_peer_id=?this.remote_peer_id, "eth stream completed");
this.emit_disconnect();
return Poll::Ready(())
}
@ -404,12 +413,14 @@ impl Future for ActiveSession {
match res {
Ok(msg) => {
// decode and handle message
if let Some(err) = this.on_incoming(msg) {
if let Some((err, bad_protocol_msg)) = this.on_incoming(msg) {
error!(target: "net::session", ?err, msg=?bad_protocol_msg, remote_peer_id=?this.remote_peer_id, "received invalid protocol message");
this.close_on_error(err);
return Poll::Ready(())
}
}
Err(err) => {
error!(target: "net::session", ?err, remote_peer_id=?this.remote_peer_id, "failed to receive message");
this.close_on_error(err);
return Poll::Ready(())
}