feat(net): add disconnect reason (#218)

This commit is contained in:
Matthias Seitz
2022-11-16 20:11:34 +01:00
committed by GitHub
parent 09c5c3449e
commit 3ffc0da7c8
6 changed files with 33 additions and 18 deletions

View File

@ -23,5 +23,5 @@ pub use types::*;
pub use crate::{
ethstream::{EthStream, UnauthedEthStream},
p2pstream::{HelloMessage, P2PStream, UnauthedP2PStream},
p2pstream::{DisconnectReason, HelloMessage, P2PStream, UnauthedP2PStream},
};

View File

@ -1,4 +1,5 @@
use futures::StreamExt;
use reth_eth_wire::DisconnectReason;
use reth_primitives::PeerId;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
@ -124,8 +125,9 @@ impl PeersManager {
pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) {
if let Some(entry) = self.peers.remove(&peer_id) {
if entry.state.is_connected() {
// TODO(mattsse): is this right to disconnect peers?
self.connection_info.decr_state(entry.state);
self.queued_actions.push_back(PeerAction::Disconnect { peer_id })
self.queued_actions.push_back(PeerAction::Disconnect { peer_id, reason: None })
}
}
}
@ -352,7 +354,7 @@ pub enum PeerAction {
remote_addr: SocketAddr,
},
/// Disconnect an existing connection.
Disconnect { peer_id: PeerId },
Disconnect { peer_id: PeerId, reason: Option<DisconnectReason> },
/// Disconnect an existing incoming connection, because the peers reputation is below the
/// banned threshold.
DisconnectBannedIncoming {

View File

@ -7,7 +7,7 @@ use reth_ecies::{stream::ECIESStream, ECIESError};
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
EthStream, P2PStream, Status,
DisconnectReason, EthStream, P2PStream, Status,
};
use reth_primitives::PeerId;
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
@ -48,9 +48,9 @@ pub(crate) struct ActiveSessionHandle {
impl ActiveSessionHandle {
/// Sends a disconnect command to the session.
pub(crate) fn disconnect(&self) {
pub(crate) fn disconnect(&self, reason: Option<DisconnectReason>) {
// Note: we clone the sender which ensures the channel has capacity to send the message
let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect);
let _ = self.commands_to_session.clone().try_send(SessionCommand::Disconnect { reason });
}
}
@ -95,7 +95,10 @@ pub(crate) enum PendingSessionEvent {
#[derive(Debug)]
pub(crate) enum SessionCommand {
/// Disconnect the connection
Disconnect,
Disconnect {
/// Why the disconnect was initiated
reason: Option<DisconnectReason>,
},
/// Sends a message to the peer
Message(PeerMessage),
}

View File

@ -16,7 +16,8 @@ use reth_ecies::stream::ECIESStream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
DisconnectReason, HelloBuilder, HelloMessage, Status, StatusBuilder, UnauthedEthStream,
UnauthedP2PStream,
};
use reth_primitives::{ForkFilter, Hardfork, PeerId};
use secp256k1::{SecretKey, SECP256K1};
@ -192,9 +193,9 @@ impl SessionManager {
///
/// This will trigger the disconnect on the session task to gracefully terminate. The result
/// will be picked up by the receiver.
pub(crate) fn disconnect(&self, node: PeerId) {
pub(crate) fn disconnect(&self, node: PeerId, reason: Option<DisconnectReason>) {
if let Some(session) = self.active_sessions.get(&node) {
session.disconnect();
session.disconnect(reason);
}
}

View File

@ -10,7 +10,9 @@ use crate::{
},
peers::{PeerAction, PeersManager},
};
use reth_eth_wire::{capability::Capabilities, BlockHashNumber, NewBlockHashes, Status};
use reth_eth_wire::{
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::{PeerId, H256};
use std::{
@ -224,14 +226,14 @@ where
self.queued_messages
.push_back(StateAction::Connect { node_id: peer_id, remote_addr });
}
PeerAction::Disconnect { peer_id } => {
PeerAction::Disconnect { peer_id, reason } => {
self.state_fetcher.on_pending_disconnect(&peer_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id });
self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason });
}
PeerAction::DisconnectBannedIncoming { peer_id } => {
// TODO: can IP ban
self.state_fetcher.on_pending_disconnect(&peer_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id });
self.queued_messages.push_back(StateAction::Disconnect { peer_id, reason: None });
}
}
}
@ -388,7 +390,11 @@ pub enum StateAction {
/// Create a new connection to the given node.
Connect { remote_addr: SocketAddr, node_id: PeerId },
/// Disconnect an existing connection
Disconnect { node_id: PeerId },
Disconnect {
peer_id: PeerId,
/// Why the disconnect was initiated
reason: Option<DisconnectReason>,
},
}
#[derive(Debug, thiserror::Error)]

View File

@ -8,6 +8,7 @@ use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
DisconnectReason,
};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::PeerId;
@ -90,7 +91,9 @@ where
}),
Err(err) => {
match err {
AddSessionError::AtCapacity { peer } => self.sessions.disconnect(peer),
AddSessionError::AtCapacity { peer } => {
self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers))
}
};
None
}
@ -146,8 +149,8 @@ where
StateAction::Connect { remote_addr, node_id } => {
self.sessions.dial_outbound(remote_addr, node_id);
}
StateAction::Disconnect { node_id } => {
self.sessions.disconnect(node_id);
StateAction::Disconnect { peer_id: node_id, reason } => {
self.sessions.disconnect(node_id, reason);
}
StateAction::NewBlock { peer_id, block: msg } => {
let msg = PeerMessage::NewBlock(msg);