feat(net): impl peer management (#194)

This commit is contained in:
Matthias Seitz
2022-11-12 08:41:15 +01:00
committed by GitHub
parent 9575eb89fb
commit 139efee599
2 changed files with 251 additions and 35 deletions

View File

@ -1,6 +1,5 @@
use reth_discv4::NodeId;
use futures::StreamExt;
use reth_discv4::NodeId;
use std::{
collections::{hash_map::Entry, HashMap, VecDeque},
net::SocketAddr,
@ -13,8 +12,15 @@ use tokio::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The reputation value below which new connection from/to peers are rejected.
pub const BANNED_REPUTATION: i32 = 0;
/// The reputation change to apply to a node that dropped the connection.
const REMOTE_DISCONNECT_REPUTATION_CHANGE: i32 = -100;
/// A communication channel to the [`PeersManager`] to apply changes to the peer set.
pub struct PeersHandle {
/// Sender half of command channel back to the [`PeersManager`]
manager_tx: mpsc::UnboundedSender<PeerCommand>,
}
@ -26,13 +32,13 @@ pub struct PeersHandle {
/// The [`PeersManager`] will be notified on peer related changes
pub(crate) struct PeersManager {
/// All peers known to the network
peers: HashMap<NodeId, Node>,
peers: HashMap<NodeId, Peer>,
/// Copy of the receiver half, so new [`PeersHandle`] can be created on demand.
manager_tx: mpsc::UnboundedSender<PeerCommand>,
/// Receiver half of the command channel.
handle_rx: UnboundedReceiverStream<PeerCommand>,
/// Buffered actions until the manager is polled.
actions: VecDeque<PeerAction>,
queued_actions: VecDeque<PeerAction>,
/// Interval for triggering connections if there are free slots.
refill_slots_interval: Interval,
/// Tracks current slot stats.
@ -48,7 +54,7 @@ impl PeersManager {
peers: Default::default(),
manager_tx,
handle_rx: UnboundedReceiverStream::new(handle_rx),
actions: Default::default(),
queued_actions: Default::default(),
refill_slots_interval: tokio::time::interval_at(
Instant::now() + refill_slots_interval,
refill_slots_interval,
@ -57,27 +63,116 @@ impl PeersManager {
}
}
/// Returns a new [`PeersHandle`] that can send commands to this type
/// Returns a new [`PeersHandle`] that can send commands to this type.
pub(crate) fn handle(&self) -> PeersHandle {
PeersHandle { manager_tx: self.manager_tx.clone() }
}
pub(crate) fn add_discovered_node(&mut self, node: NodeId, addr: SocketAddr) {
match self.peers.entry(node) {
Entry::Occupied(_) => {}
/// Called when a new _incoming_ active session was established to the given peer.
///
/// This will update the state of the peer if not yet tracked.
///
/// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
/// be scheduled.
pub(crate) fn on_active_session(&mut self, peer_id: NodeId, addr: SocketAddr) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
if value.is_banned() {
self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
return
}
value.state = PeerConnectionState::In;
}
Entry::Vacant(entry) => {
entry.insert(Node::new(addr));
entry.insert(Peer::with_state(addr, PeerConnectionState::In));
}
}
// keep track of new connection
self.connection_info.inc_in();
}
/// Called when a session to a peer was disconnected.
///
/// Accepts an additional [`ReputationChange`] value to apply to the peer.
pub(crate) fn on_disconnected(&mut self, peer: NodeId, reputation_change: ReputationChange) {
if let Some(mut peer) = self.peers.get_mut(&peer) {
self.connection_info.decr_state(peer.state);
peer.state = PeerConnectionState::Idle;
peer.reputation -= reputation_change.0;
}
}
/// Called for a newly discovered peer.
///
/// If the peer already exists, then the address will e updated. If the addresses differ, the
/// old address is returned
pub(crate) fn add_discovered_node(&mut self, peer_id: NodeId, addr: SocketAddr) {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let node = entry.get_mut();
node.addr = addr;
}
Entry::Vacant(entry) => {
entry.insert(Peer::new(addr));
}
}
}
pub(crate) fn remove_discovered_node(&mut self, _node: NodeId) {}
/// Removes the tracked node from the set.
pub(crate) fn remove_discovered_node(&mut self, peer_id: NodeId) {
if let Some(entry) = self.peers.remove(&peer_id) {
if entry.state.is_connected() {
self.connection_info.decr_state(entry.state);
self.queued_actions.push_back(PeerAction::Disconnect { peer_id })
}
}
}
/// Returns the idle peer with the highest reputation.
///
/// Returns `None` if no peer is available.
fn best_unconnected(&mut self) -> Option<(NodeId, &mut Peer)> {
self.peers
.iter_mut()
.filter(|(_, peer)| peer.state.is_unconnected())
.fold(None::<(&NodeId, &mut Peer)>, |mut best_peer, candidate| {
if let Some(best_peer) = best_peer.take() {
if best_peer.1.reputation >= candidate.1.reputation {
return Some(best_peer)
}
}
Some(candidate)
})
.map(|(id, peer)| (*id, peer))
}
/// If there's capacity for new outbound connections, this will queue new
/// [`PeerAction::Connect`] actions.
///
/// New connections are only initiated, if slots are available and appropriate peers are
/// available.
fn fill_outbound_slots(&mut self) {
// This checks if there are free slots for new outbound connections available that can be
// filled
// as long as there a slots available try to fill them with the best peers
while self.connection_info.has_out_capacity() {
let action = {
let (peer_id, peer) = match self.best_unconnected() {
Some(peer) => peer,
_ => break,
};
// If best peer does not meet reputation threshold exit immediately.
if peer.is_banned() {
break
}
peer.state = PeerConnectionState::Out;
PeerAction::Connect { peer_id, remote_addr: peer.addr }
};
self.connection_info.inc_out();
self.queued_actions.push_back(action);
}
}
/// Advances the state.
@ -87,15 +182,25 @@ impl PeersManager {
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PeerAction> {
loop {
// drain buffered actions
if let Some(action) = self.actions.pop_front() {
if let Some(action) = self.queued_actions.pop_front() {
return Poll::Ready(action)
}
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd {
PeerCommand::Add { peer_id, addr } => {
self.add_discovered_node(peer_id, addr);
}
PeerCommand::Remove(peer) => self.remove_discovered_node(peer),
}
}
if self.refill_slots_interval.poll_tick(cx).is_ready() {
self.fill_outbound_slots();
}
while let Poll::Ready(Some(_cmd)) = self.handle_rx.poll_next_unpin(cx) {
// TODO handle incoming command
if self.queued_actions.is_empty() {
return Poll::Pending
}
}
}
@ -114,27 +219,126 @@ pub struct ConnectionInfo {
max_inbound: usize,
}
/// Tracks info about a single node.
struct Node {
/// Where to reach the node
addr: SocketAddr,
/// Reputation of the node.
reputation: i32,
// === impl ConnectionInfo ===
impl ConnectionInfo {
/// Returns `true` if there's still capacity for a new outgoing connection.
fn has_out_capacity(&self) -> bool {
self.num_outbound < self.max_outbound
}
fn decr_state(&mut self, state: PeerConnectionState) {
match state {
PeerConnectionState::Idle => {}
PeerConnectionState::In => self.decr_in(),
PeerConnectionState::Out => self.decr_out(),
}
}
fn decr_out(&mut self) {
self.num_outbound -= 1;
}
fn inc_out(&mut self) {
self.num_outbound += 1;
}
fn inc_in(&mut self) {
self.num_inbound += 1;
}
fn decr_in(&mut self) {
self.num_inbound -= 1;
}
}
// === impl Node ===
/// Tracks info about a single peer.
struct Peer {
/// Where to reach the peer
addr: SocketAddr,
/// Reputation of the peer.
reputation: i32,
/// The state of the connection, if any.
state: PeerConnectionState,
}
impl Node {
// === impl Peer ===
impl Peer {
fn new(addr: SocketAddr) -> Self {
Self { addr, reputation: 0 }
Self::with_state(addr, Default::default())
}
fn with_state(addr: SocketAddr, state: PeerConnectionState) -> Self {
Self { addr, state, reputation: 0 }
}
/// Returns true if the peer's reputation is below the banned threshold.
#[inline]
fn is_banned(&self) -> bool {
self.reputation < BANNED_REPUTATION
}
}
/// Represents the kind of connection established to the peer, if any
#[derive(Debug, Clone, Copy, Default)]
enum PeerConnectionState {
/// Not connected currently.
#[default]
Idle,
/// Connected via incoming connection.
In,
/// Connected via outgoing connection.
Out,
}
// === impl PeerConnectionState ===
impl PeerConnectionState {
/// Returns whether we're currently connected with this peer
#[inline]
fn is_connected(&self) -> bool {
matches!(self, PeerConnectionState::In | PeerConnectionState::Out)
}
/// Returns if there's currently no connection to that peer.
#[inline]
fn is_unconnected(&self) -> bool {
matches!(self, PeerConnectionState::Idle)
}
}
/// Represents a change in a peer's reputation.
#[derive(Debug, Copy, Clone, Default)]
pub(crate) struct ReputationChange(i32);
// === impl ReputationChange ===
impl ReputationChange {
/// Apply no reputation change.
pub(crate) const fn none() -> Self {
Self(0)
}
/// Reputation change for a peer that dropped the connection.
pub(crate) const fn dropped() -> Self {
Self(REMOTE_DISCONNECT_REPUTATION_CHANGE)
}
}
/// Commands the [`PeersManager`] listens for.
pub enum PeerCommand {
Add(NodeId),
pub(crate) enum PeerCommand {
/// Command for manually add
Add {
/// Identifier of the peer.
peer_id: NodeId,
/// The address of the peer
addr: SocketAddr,
},
/// Remove a peer from the set
///
/// If currently connected this will disconnect the sessin
Remove(NodeId),
// TODO reputation change
}
/// Actions the peer manager can trigger.
@ -143,12 +347,18 @@ pub enum PeerAction {
/// Start a new connection to a peer.
Connect {
/// The peer to connect to.
node_id: NodeId,
peer_id: NodeId,
/// Where to reach the node
remote_addr: SocketAddr,
},
/// Disconnect an existing connection.
Disconnect { node_id: NodeId },
Disconnect { peer_id: NodeId },
/// Disconnect an existing incoming connection, because the peers reputation is below the
/// banned threshold.
DisconnectBannedIncoming {
/// Peer id of the established connection.
peer_id: NodeId,
},
}
/// Config type for initiating a [`PeersManager`] instance

View File

@ -132,12 +132,18 @@ where
/// Event hook for new actions derived from the peer management set.
fn on_peer_action(&mut self, action: PeerAction) {
match action {
PeerAction::Connect { node_id, remote_addr } => {
self.queued_messages.push_back(StateAction::Connect { node_id, remote_addr });
PeerAction::Connect { peer_id, remote_addr } => {
self.queued_messages
.push_back(StateAction::Connect { node_id: peer_id, remote_addr });
}
PeerAction::Disconnect { node_id } => {
self.state_fetcher.on_pending_disconnect(&node_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id });
PeerAction::Disconnect { peer_id } => {
self.state_fetcher.on_pending_disconnect(&peer_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id });
}
PeerAction::DisconnectBannedIncoming { peer_id } => {
// TODO: can IP ban
self.state_fetcher.on_pending_disconnect(&peer_id);
self.queued_messages.push_back(StateAction::Disconnect { node_id: peer_id });
}
}
}