feat(net): reputation system and peer reporting (#239)

* refactor: move to module

* feat(net): reputation system and peer reporting

* be specific in test

* use saturating sub
This commit is contained in:
Matthias Seitz
2022-11-24 10:14:50 +01:00
committed by GitHub
parent 7c2ee1ba3a
commit 8966daad9d
14 changed files with 424 additions and 87 deletions

View File

@ -23,7 +23,7 @@ use crate::{
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
network::{NetworkHandle, NetworkHandleMessage},
peers::PeersManager,
peers::{PeersManager, ReputationChangeKind},
session::SessionManager,
state::NetworkState,
swarm::{Swarm, SwarmEvent},
@ -188,13 +188,16 @@ where
/// Event hook for an unexpected message from the peer.
fn on_invalid_message(
&self,
&mut self,
peer_id: PeerId,
_capabilities: Arc<Capabilities>,
_message: CapabilityMessage,
) {
trace!(target : "net", ?peer_id, "received unexpected message");
// TODO: disconnect?
self.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
}
/// Sends an event to the [`TransactionsManager`] if configured
@ -235,7 +238,10 @@ where
}
},
Err(_err) => {
// TODO report peer for bad block
self.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer, ReputationChangeKind::BadBlock);
}
}
}
@ -326,6 +332,9 @@ where
NetworkHandleMessage::DisconnectPeer(peer_id) => {
self.swarm.sessions_mut().disconnect(peer_id, None);
}
NetworkHandleMessage::ReputationChange(peer_id, kind) => {
self.swarm.state_mut().peers_mut().apply_reputation_change(&peer_id, kind);
}
}
}
}
@ -380,7 +389,13 @@ where
SwarmEvent::OutgoingTcpConnection { remote_addr } => {
trace!(target : "net", ?remote_addr,"Starting outbound connection.");
}
SwarmEvent::SessionEstablished { peer_id, remote_addr, capabilities, messages } => {
SwarmEvent::SessionEstablished {
peer_id,
remote_addr,
capabilities,
messages,
direction,
} => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
trace!(
target : "net",
@ -390,6 +405,10 @@ where
"Session established"
);
if direction.is_incoming() {
this.swarm.state_mut().peers_mut().on_active_session(peer_id, remote_addr);
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
@ -407,11 +426,35 @@ where
"Session disconnected"
);
if error.is_some() {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_connection_dropped(&peer_id);
} else {
// Gracefully disconnected
this.swarm.state_mut().peers_mut().on_disconnected(&peer_id);
}
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id });
}
SwarmEvent::IncomingPendingSessionClosed { .. } => {}
SwarmEvent::OutgoingPendingSessionClosed { .. } => {}
SwarmEvent::OutgoingConnectionError { .. } => {}
SwarmEvent::OutgoingPendingSessionClosed { peer_id, .. } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
}
SwarmEvent::OutgoingConnectionError { peer_id, .. } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
}
SwarmEvent::BadMessage { peer_id } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
}
}
}

View File

@ -10,7 +10,7 @@ use reth_eth_wire::{
NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
Transactions,
};
use reth_interfaces::p2p::error::RequestResult;
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{Header, PeerId, Receipt, TransactionSigned, H256};
use std::{
sync::Arc,
@ -126,6 +126,22 @@ pub enum PeerRequest {
// === impl PeerRequest ===
impl PeerRequest {
/// Invoked if we received a response which does not match the request
pub(crate) fn send_bad_response(self) {
self.send_err_response(RequestError::BadResponse)
}
/// Send an error back to the receiver.
pub(crate) fn send_err_response(self, err: RequestError) {
let _ = match self {
PeerRequest::GetBlockHeaders { response, .. } => response.send(Err(err)).ok(),
PeerRequest::GetBlockBodies { response, .. } => response.send(Err(err)).ok(),
PeerRequest::GetPooledTransactions { response, .. } => response.send(Err(err)).ok(),
PeerRequest::GetNodeData { response, .. } => response.send(Err(err)).ok(),
PeerRequest::GetReceipts { response, .. } => response.send(Err(err)).ok(),
};
}
/// Returns the [`EthMessage`] for this type
pub fn create_request_message(&self, request_id: u64) -> EthMessage {
match self {

View File

@ -1,4 +1,9 @@
use crate::{config::NetworkMode, manager::NetworkEvent, message::PeerRequest, peers::PeersHandle};
use crate::{
config::NetworkMode,
manager::NetworkEvent,
message::PeerRequest,
peers::{PeersHandle, ReputationChangeKind},
};
use parking_lot::Mutex;
use reth_eth_wire::{NewBlock, NewPooledTransactionHashes, Transactions};
use reth_primitives::{PeerId, H256};
@ -88,7 +93,12 @@ impl NetworkHandle {
/// Sends a message to the [`NetworkManager`] to disconnect an existing connection to the given
/// peer.
pub fn disconnect_peer(&self, peer: PeerId) {
let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::DisconnectPeer(peer));
self.send_message(NetworkHandleMessage::DisconnectPeer(peer))
}
/// Send a reputation change for the given peer.
pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
}
/// Sends a [`PeerRequest`] to the given peer's session.
@ -134,4 +144,6 @@ pub(crate) enum NetworkHandleMessage {
/// The request to send to the peer's sessions.
request: PeerRequest,
},
/// Apply a reputation change to the given peer.
ReputationChange(PeerId, ReputationChangeKind),
}

View File

@ -1,3 +1,4 @@
use crate::peers::{reputation::BANNED_REPUTATION, ReputationChangeKind, ReputationChangeWeights};
use futures::StreamExt;
use reth_eth_wire::DisconnectReason;
use reth_primitives::PeerId;
@ -13,18 +14,21 @@ use tokio::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The reputation value below which new connection from/to peers are rejected.
pub const BANNED_REPUTATION: i32 = 0;
/// The reputation change to apply to a node that dropped the connection.
const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = -100;
/// A communication channel to the [`PeersManager`] to apply changes to the peer set.
pub struct PeersHandle {
/// Sender half of command channel back to the [`PeersManager`]
manager_tx: mpsc::UnboundedSender<PeerCommand>,
}
// === impl PeersHandle ===
impl PeersHandle {
/// Send a reputation change for the given peer
pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
let _ = self.manager_tx.send(PeerCommand::ReputationChange(peer_id, kind));
}
}
/// Maintains the state of _all_ the peers known to the network.
///
/// This is supposed to be owned by the network itself, but can be reached via the [`PeersHandle`].
@ -42,6 +46,8 @@ pub(crate) struct PeersManager {
queued_actions: VecDeque<PeerAction>,
/// Interval for triggering connections if there are free slots.
refill_slots_interval: Interval,
/// How to weigh reputation changes
reputation_weights: ReputationChangeWeights,
/// Tracks current slot stats.
connection_info: ConnectionInfo,
}
@ -49,13 +55,14 @@ pub(crate) struct PeersManager {
impl PeersManager {
/// Create a new instance with the given config
pub(crate) fn new(config: PeersConfig) -> Self {
let PeersConfig { refill_slots_interval, connection_info } = config;
let PeersConfig { refill_slots_interval, connection_info, reputation_weights } = config;
let (manager_tx, handle_rx) = mpsc::unbounded_channel();
Self {
peers: Default::default(),
manager_tx,
handle_rx: UnboundedReceiverStream::new(handle_rx),
queued_actions: Default::default(),
reputation_weights,
refill_slots_interval: tokio::time::interval_at(
Instant::now() + refill_slots_interval,
refill_slots_interval,
@ -94,14 +101,44 @@ impl PeersManager {
self.connection_info.inc_in();
}
/// Called when a session to a peer was disconnected.
///
/// Accepts an additional [`ReputationChange`] value to apply to the peer.
pub(crate) fn on_disconnected(&mut self, peer: PeerId, reputation_change: ReputationChange) {
if let Some(mut peer) = self.peers.get_mut(&peer) {
/// Apply the corresponding reputation change to the given peer
pub(crate) fn apply_reputation_change(&mut self, peer_id: &PeerId, rep: ReputationChangeKind) {
let reputation_change = self.reputation_weights.change(rep);
let should_disconnect = if let Some(mut peer) = self.peers.get_mut(peer_id) {
peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32());
let should_disconnect = peer.state.is_connected() && peer.is_banned();
if should_disconnect {
peer.state.disconnect();
}
should_disconnect
} else {
false
};
if should_disconnect {
// start the disconnect process
self.queued_actions
.push_back(PeerAction::Disconnect { peer_id: *peer_id, reason: None })
}
}
/// Gracefully disconnected
pub(crate) fn on_disconnected(&mut self, peer_id: &PeerId) {
if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation -= reputation_change.0;
}
}
/// Called when a session to a peer was forcefully disconnected.
pub(crate) fn on_connection_dropped(&mut self, peer_id: &PeerId) {
let reputation_change = self.reputation_weights.change(ReputationChangeKind::Dropped);
if let Some(mut peer) = self.peers.get_mut(peer_id) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation = peer.reputation.saturating_sub(reputation_change.as_i32());
}
}
@ -190,10 +227,13 @@ impl PeersManager {
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd {
PeerCommand::Add { peer_id, addr } => {
PeerCommand::Add(peer_id, addr) => {
self.add_discovered_node(peer_id, addr);
}
PeerCommand::Remove(peer) => self.remove_discovered_node(peer),
PeerCommand::ReputationChange(peer_id, rep) => {
self.apply_reputation_change(&peer_id, rep)
}
}
}
@ -232,8 +272,8 @@ impl ConnectionInfo {
fn decr_state(&mut self, state: PeerConnectionState) {
match state {
PeerConnectionState::Idle => {}
PeerConnectionState::In => self.decr_in(),
PeerConnectionState::Out => self.decr_out(),
PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
}
}
@ -288,6 +328,10 @@ enum PeerConnectionState {
/// Not connected currently.
#[default]
Idle,
/// Disconnect of an incoming connection in progress
DisconnectingIn,
/// Disconnect of an outgoing connection in progress
DisconnectingOut,
/// Connected via incoming connection.
In,
/// Connected via outgoing connection.
@ -297,6 +341,16 @@ enum PeerConnectionState {
// === impl PeerConnectionState ===
impl PeerConnectionState {
/// Sets the disconnect state
#[inline]
fn disconnect(&mut self) {
match self {
PeerConnectionState::In => *self = PeerConnectionState::DisconnectingIn,
PeerConnectionState::Out => *self = PeerConnectionState::DisconnectingOut,
_ => {}
}
}
/// Returns whether we're currently connected with this peer
#[inline]
fn is_connected(&self) -> bool {
@ -310,37 +364,16 @@ impl PeerConnectionState {
}
}
/// Represents a change in a peer's reputation.
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct ReputationChange(i32);
// === impl ReputationChange ===
impl ReputationChange {
/// Apply no reputation change.
pub(crate) const fn none() -> Self {
Self(0)
}
/// Reputation change for a peer that dropped the connection.
pub(crate) const fn dropped() -> Self {
Self(REMOTE_DISCONNECT_REPUTATION_CHANGE)
}
}
/// Commands the [`PeersManager`] listens for.
pub(crate) enum PeerCommand {
/// Command for manually add
Add {
/// Identifier of the peer.
peer_id: PeerId,
/// The address of the peer
addr: SocketAddr,
},
Add(PeerId, SocketAddr),
/// Remove a peer from the set
///
/// If currently connected this will disconnect the sessin
/// If currently connected this will disconnect the session
Remove(PeerId),
/// Apply a reputation change to the given peer.
ReputationChange(PeerId, ReputationChangeKind),
}
/// Actions the peer manager can trigger.
@ -370,6 +403,8 @@ pub struct PeersConfig {
pub refill_slots_interval: Duration,
/// Restrictions on connections
pub connection_info: ConnectionInfo,
/// How to weigh reputation changes
pub reputation_weights: ReputationChangeWeights,
}
impl Default for PeersConfig {
@ -382,6 +417,7 @@ impl Default for PeersConfig {
max_outbound: 70,
max_inbound: 30,
},
reputation_weights: Default::default(),
}
}
}

View File

@ -0,0 +1,9 @@
//! Peer related implementations
mod manager;
mod reputation;
pub(crate) use manager::{PeerAction, PeersManager};
pub use manager::{PeersConfig, PeersHandle};
pub(crate) use reputation::ReputationChange;
pub use reputation::{ReputationChangeKind, ReputationChangeWeights};

View File

@ -0,0 +1,136 @@
//! Peer reputation management
/// The type that tracks the reputation score.
pub(crate) type Reputation = i32;
/// The minimal unit we're measuring reputation
const REPUTATION_UNIT: i32 = -1024;
/// The reputation value below which new connection from/to peers are rejected.
pub(crate) const BANNED_REPUTATION: i32 = 100 * REPUTATION_UNIT;
/// The reputation change to apply to a peer that dropped the connection.
const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = 4 * REPUTATION_UNIT;
/// The reputation change to apply to a peer that we failed to connect to.
const FAILED_TO_CONNECT_REPUTATION_CHANGE: i32 = 24 * REPUTATION_UNIT;
/// The reputation change to apply to a peer that failed to respond in time.
const TIMEOUT_REPUTATION_CHANGE: i32 = REPUTATION_UNIT;
/// The reputation change to apply to a peer that sent a bad message.
const BAD_MESSAGE_REPUTATION_CHANGE: i32 = 8 * REPUTATION_UNIT;
/// The reputation change to apply to a peer which violates protocol rules: minimal reputation
const BAD_PROTOCOL_REPUTATION_CHANGE: i32 = i32::MIN;
/// Various kinds of reputation changes.
#[derive(Debug, Copy, Clone)]
pub enum ReputationChangeKind {
/// Received an unspecific bad message from the peer
BadMessage,
/// Peer sent a bad block.
///
/// Note: this will we only used in pre-merge, pow consensus, since after no more block announcements are sent via devp2p: [EIP-3675](https://eips.ethereum.org/EIPS/eip-3675#devp2p)
BadBlock,
/// Peer sent a bad transaction messages. E.g. Transactions which weren't recoverable.
BadTransactions,
/// Peer failed to respond in time.
Timeout,
/// Peer does not adhere to network protocol rules.
BadProtocol,
/// Failed to establish a connection to the peer.
FailedToConnect,
/// Connection dropped by peer.
Dropped,
/// Apply a reputation change by value
Other(Reputation),
}
/// How the [`ReputationChangeKind`] are weighted.
#[derive(Debug, Clone)]
pub struct ReputationChangeWeights {
/// Weight for [`ReputationChangeKind::BadMessage`]
pub bad_message: Reputation,
/// Weight for [`ReputationChangeKind::BadBlock`]
pub bad_block: Reputation,
/// Weight for [`ReputationChangeKind::BadTransactions`]
pub bad_transactions: Reputation,
/// Weight for [`ReputationChangeKind::Timeout`]
pub timeout: Reputation,
/// Weight for [`ReputationChangeKind::BadProtocol`]
pub bad_protocol: Reputation,
/// Weight for [`ReputationChangeKind::FailedToConnect`]
pub failed_to_connect: Reputation,
/// Weight for [`ReputationChangeKind::Dropped`]
pub dropped: Reputation,
}
// === impl ReputationChangeWeights ===
impl ReputationChangeWeights {
/// Returns the quantifiable [`ReputationChange`] for the given [`ReputationChangeKind`] using
/// the configured weights
pub(crate) fn change(&self, kind: ReputationChangeKind) -> ReputationChange {
match kind {
ReputationChangeKind::BadMessage => self.bad_message.into(),
ReputationChangeKind::BadBlock => self.bad_block.into(),
ReputationChangeKind::BadTransactions => self.bad_transactions.into(),
ReputationChangeKind::Timeout => self.timeout.into(),
ReputationChangeKind::BadProtocol => self.bad_protocol.into(),
ReputationChangeKind::FailedToConnect => self.failed_to_connect.into(),
ReputationChangeKind::Dropped => self.dropped.into(),
ReputationChangeKind::Other(val) => val.into(),
}
}
}
impl Default for ReputationChangeWeights {
fn default() -> Self {
Self {
bad_block: BAD_MESSAGE_REPUTATION_CHANGE,
bad_transactions: BAD_MESSAGE_REPUTATION_CHANGE,
bad_message: BAD_MESSAGE_REPUTATION_CHANGE,
timeout: TIMEOUT_REPUTATION_CHANGE,
bad_protocol: BAD_PROTOCOL_REPUTATION_CHANGE,
failed_to_connect: FAILED_TO_CONNECT_REPUTATION_CHANGE,
dropped: REMOTE_DISCONNECT_REPUTATION_CHANGE,
}
}
}
/// Represents a change in a peer's reputation.
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct ReputationChange(Reputation);
// === impl ReputationChange ===
impl ReputationChange {
/// Apply no reputation change.
pub(crate) const fn none() -> Self {
Self(0)
}
/// Reputation change for a peer that dropped the connection.
pub(crate) const fn dropped() -> Self {
Self(REMOTE_DISCONNECT_REPUTATION_CHANGE)
}
/// Helper type for easier conversion
#[inline]
pub(crate) fn as_i32(self) -> Reputation {
self.0
}
}
impl From<ReputationChange> for Reputation {
fn from(value: ReputationChange) -> Self {
value.0
}
}
impl From<Reputation> for ReputationChange {
fn from(value: Reputation) -> Self {
ReputationChange(value)
}
}

View File

@ -110,17 +110,18 @@ impl ActiveSession {
/// Processes a response received from the peer
macro_rules! on_response {
($resp:ident, $item:ident) => {
($this:ident, $resp:ident, $item:ident) => {
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(resp) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = resp {
let _ = response.send(Ok(message));
} else {
// TODO handle bad response
resp.send_bad_response();
$this.on_bad_message();
}
} else {
// TODO handle unexpected response
$this.on_bad_message()
}
};
}
@ -147,31 +148,31 @@ impl ActiveSession {
on_request!(req, BlockHeaders, GetBlockHeaders);
}
EthMessage::BlockHeaders(resp) => {
on_response!(resp, GetBlockHeaders);
on_response!(self, resp, GetBlockHeaders);
}
EthMessage::GetBlockBodies(req) => {
on_request!(req, BlockBodies, GetBlockBodies);
}
EthMessage::BlockBodies(resp) => {
on_response!(resp, GetBlockBodies);
on_response!(self, resp, GetBlockBodies);
}
EthMessage::GetPooledTransactions(req) => {
on_request!(req, PooledTransactions, GetPooledTransactions);
}
EthMessage::PooledTransactions(resp) => {
on_response!(resp, GetPooledTransactions);
on_response!(self, resp, GetPooledTransactions);
}
EthMessage::GetNodeData(req) => {
on_request!(req, NodeData, GetNodeData);
}
EthMessage::NodeData(resp) => {
on_response!(resp, GetNodeData);
on_response!(self, resp, GetNodeData);
}
EthMessage::GetReceipts(req) => {
on_request!(req, Receipts, GetReceipts);
}
EthMessage::Receipts(resp) => {
on_response!(resp, GetReceipts);
on_response!(self, resp, GetReceipts);
}
};
@ -241,6 +242,13 @@ impl ActiveSession {
.try_send(ActiveSessionMessage::ValidMessage { peer_id: self.remote_peer_id, message })
}
/// Notify the manager that the peer sent a bad message
fn on_bad_message(&self) {
let _ = self
.to_session
.try_send(ActiveSessionMessage::BadMessage { peer_id: self.remote_peer_id });
}
/// Report back that this session has been closed.
fn emit_disconnect(&self) {
// NOTE: we clone here so there's enough capacity to deliver this message
@ -514,8 +522,8 @@ mod tests {
remote_addr,
peer_id,
capabilities,
status: _,
conn,
..
} => {
let (_to_session_tx, messages_rx) = mpsc::channel(10);
let (commands_to_session, commands_rx) = mpsc::channel(10);

View File

@ -72,6 +72,7 @@ pub(crate) enum PendingSessionEvent {
capabilities: Arc<Capabilities>,
status: Status,
conn: EthStream<P2PStream<ECIESStream<TcpStream>>>,
direction: Direction,
},
/// Handshake unsuccessful, session was disconnected.
Disconnected {
@ -88,7 +89,12 @@ pub(crate) enum PendingSessionEvent {
error: io::Error,
},
/// Thrown when authentication via Ecies failed.
EciesAuthError { remote_addr: SocketAddr, session_id: SessionId, error: ECIESError },
EciesAuthError {
remote_addr: SocketAddr,
session_id: SessionId,
error: ECIESError,
direction: Direction,
},
}
/// Commands that can be sent to the spawned session.
@ -132,4 +138,9 @@ pub(crate) enum ActiveSessionMessage {
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a bad message from the peer.
BadMessage {
/// Identifier of the remote peer.
peer_id: PeerId,
},
}

View File

@ -246,6 +246,9 @@ impl SessionManager {
ActiveSessionMessage::InvalidMessage { peer_id, capabilities, message } => {
Poll::Ready(SessionEvent::InvalidMessage { peer_id, message, capabilities })
}
ActiveSessionMessage::BadMessage { peer_id } => {
Poll::Ready(SessionEvent::BadMessage { peer_id })
}
}
}
}
@ -273,6 +276,7 @@ impl SessionManager {
capabilities,
conn,
status,
direction,
} => {
// move from pending to established.
let _ = self.pending_sessions.remove(&session_id);
@ -317,6 +321,7 @@ impl SessionManager {
capabilities,
status,
messages,
direction,
})
}
PendingSessionEvent::Disconnected { remote_addr, session_id, direction, error } => {
@ -358,12 +363,18 @@ impl SessionManager {
"connection refused"
);
let _ = self.pending_sessions.remove(&session_id);
return Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
return Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
remote_addr,
peer_id,
error: None,
})
}
PendingSessionEvent::EciesAuthError { remote_addr, session_id, error } => {
PendingSessionEvent::EciesAuthError {
remote_addr,
session_id,
error,
direction,
} => {
let _ = self.pending_sessions.remove(&session_id);
warn!(
target : "net::session",
@ -373,10 +384,21 @@ impl SessionManager {
"ecies auth failed"
);
let _ = self.pending_sessions.remove(&session_id);
return Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
remote_addr,
error: None,
})
return match direction {
Direction::Incoming => {
Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
remote_addr,
error: None,
})
}
Direction::Outgoing(peer_id) => {
Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
remote_addr,
peer_id,
error: None,
})
}
}
}
}
}
@ -431,6 +453,7 @@ pub(crate) enum SessionEvent {
capabilities: Arc<Capabilities>,
status: Status,
messages: PeerRequestSender,
direction: Direction,
},
/// A session received a valid message via RLPx.
ValidMessage {
@ -446,6 +469,11 @@ pub(crate) enum SessionEvent {
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a bad message from the peer.
BadMessage {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// Closed an incoming pending session during authentication.
IncomingPendingSessionClosed { remote_addr: SocketAddr, error: Option<EthStreamError> },
/// Closed an outgoing pending session during authentication.
@ -555,6 +583,13 @@ pub(crate) enum Direction {
Outgoing(PeerId),
}
impl Direction {
/// Returns `true` if this an incoming connection.
pub(crate) fn is_incoming(&self) -> bool {
matches!(self, Direction::Incoming)
}
}
async fn authenticate(
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent>,
@ -572,7 +607,12 @@ async fn authenticate(
Ok(stream) => stream,
Err(error) => {
let _ = events
.send(PendingSessionEvent::EciesAuthError { remote_addr, session_id, error })
.send(PendingSessionEvent::EciesAuthError {
remote_addr,
session_id,
error,
direction,
})
.await;
return
}
@ -586,6 +626,7 @@ async fn authenticate(
remote_addr,
session_id,
error,
direction,
})
.await;
return
@ -669,5 +710,6 @@ async fn authenticate_stream(
capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
status: their_status,
conn: eth_stream,
direction,
}
}

View File

@ -80,6 +80,11 @@ where
}
}
/// Returns mutable access to the [`PeersManager`]
pub(crate) fn peers_mut(&mut self) -> &mut PeersManager {
&mut self.peers_manager
}
/// How many peers we're currently connected to.
pub fn num_connected_peers(&self) -> usize {
self.connected_peers.len()

View File

@ -1,7 +1,7 @@
use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
session::{SessionEvent, SessionId, SessionManager},
session::{Direction, SessionEvent, SessionId, SessionManager},
state::{AddSessionError, NetworkState, StateAction},
};
use futures::Stream;
@ -29,7 +29,7 @@ use tracing::warn;
/// [`SessionsManager`]. Outgoing connections are either initiated on demand or triggered by the
/// [`NetworkState`] and also delegated to the [`NetworkState`].
#[must_use = "Swarm does nothing unless polled"]
pub struct Swarm<C> {
pub(crate) struct Swarm<C> {
/// Listens for new incoming connections.
incoming: ConnectionListener,
/// All sessions.
@ -86,6 +86,7 @@ where
capabilities,
status,
messages,
direction,
} => match self.state.on_session_activated(
peer_id,
capabilities.clone(),
@ -97,13 +98,15 @@ where
remote_addr,
capabilities,
messages,
direction,
}),
Err(err) => {
match err {
AddSessionError::AtCapacity { peer } => {
self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers))
self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers));
}
};
self.state.peers_mut().on_disconnected(&peer_id);
None
}
},
@ -125,14 +128,12 @@ where
}
SessionEvent::SessionClosedOnConnectionError { peer_id, remote_addr, error } => {
self.state.on_session_closed(peer_id);
// TODO(mattsse): reputation change on error
Some(SwarmEvent::SessionClosed { peer_id, remote_addr, error: Some(error) })
}
SessionEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
}
SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
}
}
@ -232,7 +233,7 @@ where
/// All events created or delegated by the [`Swarm`] that represents changes to the state of the
/// network.
pub enum SwarmEvent {
pub(crate) enum SwarmEvent {
/// Events related to the actual network protocol.
ValidMessage {
/// The peer that sent the message
@ -248,6 +249,11 @@ pub enum SwarmEvent {
/// Message received from the peer.
message: CapabilityMessage,
},
/// Received a bad message from the peer.
BadMessage {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// The underlying tcp listener closed.
TcpListenerClosed {
/// Address of the closed listener.
@ -275,6 +281,7 @@ pub enum SwarmEvent {
remote_addr: SocketAddr,
capabilities: Arc<Capabilities>,
messages: PeerRequestSender,
direction: Direction,
},
SessionClosed {
peer_id: PeerId,

View File

@ -5,6 +5,7 @@ use crate::{
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
network::NetworkHandleMessage,
peers::ReputationChangeKind,
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
@ -230,13 +231,14 @@ where
/// Starts the import process for the given transactions.
fn import_transactions(&mut self, peer_id: PeerId, transactions: Vec<TransactionSigned>) {
let mut has_bad_transactions = false;
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions {
// recover transaction
let tx = if let Some(tx) = tx.into_ecrecovered() {
tx
} else {
// TODO: report peer?
has_bad_transactions = true;
continue
};
@ -263,17 +265,25 @@ where
}
}
}
}
fn on_good_import(&mut self, hash: TxHash) {
if let Some(_peers) = self.transactions_by_peers.remove(&hash) {
// TODO report good peer?
if has_bad_transactions {
self.report_bad_message(peer_id);
}
}
fn report_bad_message(&self, peer_id: PeerId) {
self.network.reputation_change(peer_id, ReputationChangeKind::BadTransactions);
}
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
}
fn on_bad_import(&mut self, hash: TxHash) {
if let Some(_peers) = self.transactions_by_peers.remove(&hash) {
// TODO report bad peer?
if let Some(peers) = self.transactions_by_peers.remove(&hash) {
for peer_id in peers {
self.report_bad_message(peer_id);
}
}
}
}
@ -329,10 +339,10 @@ where
this.import_transactions(req.peer_id, txs.0);
}
Poll::Ready(Ok(Err(_))) => {
// TODO report bad peer
this.report_bad_message(req.peer_id);
}
Poll::Ready(Err(_)) => {
// TODO report bad peer
this.report_bad_message(req.peer_id);
}
}
}

View File

@ -42,7 +42,7 @@ async fn test_establish_connections() {
let net = handle.terminate().await;
net.for_each(|peer| {
assert!(peer.num_peers() >= 1);
});
assert_eq!(net.peers()[0].num_peers(), 2);
assert_eq!(net.peers()[1].num_peers(), 1);
assert_eq!(net.peers()[2].num_peers(), 1);
}