diff --git a/crates/net/eth-wire/src/lib.rs b/crates/net/eth-wire/src/lib.rs index b770044fa..11ebc2d9e 100644 --- a/crates/net/eth-wire/src/lib.rs +++ b/crates/net/eth-wire/src/lib.rs @@ -23,5 +23,5 @@ pub use types::*; pub use crate::{ ethstream::{EthStream, UnauthedEthStream}, - p2pstream::{HelloMessage, P2PStream, UnauthedP2PStream}, + p2pstream::{DisconnectReason, HelloMessage, P2PStream, UnauthedP2PStream}, }; diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 9592f215e..607686fbe 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -1,4 +1,5 @@ use futures::StreamExt; +use reth_eth_wire::DisconnectReason; use reth_primitives::PeerId; use std::{ collections::{hash_map::Entry, HashMap, VecDeque}, @@ -124,8 +125,9 @@ impl PeersManager { pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) { if let Some(entry) = self.peers.remove(&peer_id) { if entry.state.is_connected() { + // TODO(mattsse): is this right to disconnect peers? self.connection_info.decr_state(entry.state); - self.queued_actions.push_back(PeerAction::Disconnect { peer_id }) + self.queued_actions.push_back(PeerAction::Disconnect { peer_id, reason: None }) } } } @@ -352,7 +354,7 @@ pub enum PeerAction { remote_addr: SocketAddr, }, /// Disconnect an existing connection. - Disconnect { peer_id: PeerId }, + Disconnect { peer_id: PeerId, reason: Option }, /// Disconnect an existing incoming connection, because the peers reputation is below the /// banned threshold. DisconnectBannedIncoming { diff --git a/crates/net/network/src/session/handle.rs b/crates/net/network/src/session/handle.rs index 6a9248e3d..177865dee 100644 --- a/crates/net/network/src/session/handle.rs +++ b/crates/net/network/src/session/handle.rs @@ -7,7 +7,7 @@ use reth_ecies::{stream::ECIESStream, ECIESError}; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, error::EthStreamError, - EthStream, P2PStream, Status, + DisconnectReason, EthStream, P2PStream, Status, }; use reth_primitives::PeerId; use std::{io, net::SocketAddr, sync::Arc, time::Instant}; @@ -48,9 +48,9 @@ pub(crate) struct ActiveSessionHandle { impl ActiveSessionHandle { /// Sends a disconnect command to the session. - pub(crate) fn disconnect(&self) { + pub(crate) fn disconnect(&self, reason: Option) { // Note: we clone the sender which ensures the channel has capacity to send the message - let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect); + let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason }); } } @@ -95,7 +95,10 @@ pub(crate) enum PendingSessionEvent { #[derive(Debug)] pub(crate) enum SessionCommand { /// Disconnect the connection - Disconnect, + Disconnect { + /// Why the disconnect was initiated + reason: Option, + }, /// Sends a message to the peer Message(PeerMessage), } diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index 5c6657422..69e6b285b 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -16,7 +16,8 @@ use reth_ecies::stream::ECIESStream; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, error::EthStreamError, - HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream, + DisconnectReason, HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream, + UnauthedP2PStream, }; use reth_primitives::{ForkFilter, Hardfork, PeerId}; use secp256k1::{SecretKey, SECP256K1}; @@ -192,9 +193,9 @@ impl SessionManager { /// /// This will trigger the disconnect on the session task to gracefully terminate. The result /// will be picked up by the receiver. - pub(crate) fn disconnect(&self, node: PeerId) { + pub(crate) fn disconnect(&self, node: PeerId, reason: Option) { if let Some(session) = self.active_sessions.get(&node) { - session.disconnect(); + session.disconnect(reason); } } diff --git a/crates/net/network/src/state.rs b/crates/net/network/src/state.rs index a27ac007c..baf950341 100644 --- a/crates/net/network/src/state.rs +++ b/crates/net/network/src/state.rs @@ -10,7 +10,9 @@ use crate::{ }, peers::{PeerAction, PeersManager}, }; -use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status}; +use reth_eth_wire::{ + capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status, +}; use reth_interfaces::provider::BlockProvider; use reth_primitives::{PeerId, H256}; use std::{ @@ -224,14 +226,14 @@ where self.queued_messages .push_back(StateAction::Connect { node_id: peer_id, remote_addr }); } - PeerAction::Disconnect { peer_id } => { + PeerAction::Disconnect { peer_id, reason } => { self.state_fetcher.on_pending_disconnect(&peer_id); - self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id }); + self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason }); } PeerAction::DisconnectBannedIncoming { peer_id } => { // TODO: can IP ban self.state_fetcher.on_pending_disconnect(&peer_id); - self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id }); + self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None }); } } } @@ -388,7 +390,11 @@ pub enum StateAction { /// Create a new connection to the given node. Connect { remote_addr: SocketAddr, node_id: PeerId }, /// Disconnect an existing connection - Disconnect { node_id: PeerId }, + Disconnect { + peer_id: PeerId, + /// Why the disconnect was initiated + reason: Option, + }, } #[derive(Debug, thiserror::Error)] diff --git a/crates/net/network/src/swarm.rs b/crates/net/network/src/swarm.rs index ceb40fa86..81f26036a 100644 --- a/crates/net/network/src/swarm.rs +++ b/crates/net/network/src/swarm.rs @@ -8,6 +8,7 @@ use futures::Stream; use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, error::EthStreamError, + DisconnectReason, }; use reth_interfaces::provider::BlockProvider; use reth_primitives::PeerId; @@ -90,7 +91,9 @@ where }), Err(err) => { match err { - AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer), + AddSessionError::AtCapacity { peer } => { + self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers)) + } }; None } @@ -146,8 +149,8 @@ where StateAction::Connect { remote_addr, node_id } => { self.sessions.dial_outbound(remote_addr, node_id); } - StateAction::Disconnect { node_id } => { - self.sessions.disconnect(node_id); + StateAction::Disconnect { peer_id: node_id, reason } => { + self.sessions.disconnect(node_id, reason); } StateAction::NewBlock { peer_id, block: msg } => { let msg = PeerMessage::NewBlock(msg);