feat: support admin_peers (#4435)

This commit is contained in:
Matthias Seitz
2023-08-31 13:36:50 -07:00
committed by GitHub
parent d8a7ee2eb4
commit a76da98316
11 changed files with 177 additions and 90 deletions

View File

@ -113,6 +113,11 @@ impl<S> MeteredStream<S> {
pub fn get_bandwidth_meter(&self) -> &BandwidthMeter {
&self.meter
}
/// Returns the wrapped stream
pub fn inner(&self) -> &S {
&self.inner
}
}
impl<Stream: AsyncRead> AsyncRead for MeteredStream<Stream> {

View File

@ -5,6 +5,7 @@ use reth_codecs::add_arbitrary_tests;
use reth_primitives::bytes::{BufMut, Bytes};
use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable};
use smol_str::SmolStr;
use std::fmt;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
@ -72,6 +73,12 @@ impl Capability {
}
}
impl fmt::Display for Capability {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}/{}", self.name, self.version)
}
}
#[cfg(any(test, feature = "arbitrary"))]
impl<'a> arbitrary::Arbitrary<'a> for Capability {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {

View File

@ -75,6 +75,11 @@ impl<S> UnauthedP2PStream<S> {
pub fn new(inner: S) -> Self {
Self { inner }
}
/// Returns a reference to the inner stream.
pub fn inner(&self) -> &S {
&self.inner
}
}
impl<S> UnauthedP2PStream<S>
@ -242,6 +247,11 @@ impl<S> P2PStream<S> {
}
}
/// Returns a reference to the inner stream.
pub fn inner(&self) -> &S {
&self.inner
}
/// Sets a custom outgoing message buffer capacity.
///
/// # Panics

View File

@ -19,13 +19,14 @@
//!
//! - `serde` (default): Enable serde support
use async_trait::async_trait;
use reth_eth_wire::DisconnectReason;
use reth_eth_wire::{DisconnectReason, EthVersion, Status};
use reth_primitives::{NodeRecord, PeerId};
use reth_rpc_types::NetworkStatus;
use std::net::SocketAddr;
use std::{net::SocketAddr, sync::Arc};
pub use error::NetworkError;
pub use reputation::{Reputation, ReputationChangeKind};
use reth_eth_wire::capability::Capabilities;
/// Network Error
pub mod error;
@ -81,6 +82,9 @@ pub trait Peers: PeersInfo {
/// Adds a peer to the known peer set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr);
/// Returns the rpc [PeerInfo] for all connected peers.
async fn get_peers(&self) -> Result<Vec<PeerInfo>, NetworkError>;
/// Removes a peer from the peer set that corresponds to given kind.
fn remove_peer(&self, peer: PeerId, kind: PeerKind);
@ -106,3 +110,54 @@ pub enum PeerKind {
/// Trusted peer.
Trusted,
}
/// Info about an active peer session.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Announced capabilities of the peer
pub capabilities: Arc<Capabilities>,
/// The identifier of the remote peer
pub remote_id: PeerId,
/// The client's name and version
pub client_version: Arc<String>,
/// The peer's address we're connected to
pub remote_addr: SocketAddr,
/// The local address of the connection
pub local_addr: Option<SocketAddr>,
/// The direction of the session
pub direction: Direction,
/// The negotiated eth version.
pub eth_version: EthVersion,
/// The Status message the peer sent for the `eth` handshake
pub status: Status,
}
/// The direction of the connection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
/// Incoming connection.
Incoming,
/// Outgoing connection to a specific node.
Outgoing(PeerId),
}
impl Direction {
/// Returns `true` if this an incoming connection.
pub fn is_incoming(&self) -> bool {
matches!(self, Direction::Incoming)
}
/// Returns `true` if this an outgoing connection.
pub fn is_outgoing(&self) -> bool {
matches!(self, Direction::Outgoing(_))
}
}
impl std::fmt::Display for Direction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Direction::Incoming => write!(f, "incoming"),
Direction::Outgoing(_) => write!(f, "outgoing"),
}
}
}

View File

@ -4,7 +4,8 @@
//! generic over it.
use crate::{
NetworkError, NetworkInfo, PeerKind, Peers, PeersInfo, Reputation, ReputationChangeKind,
NetworkError, NetworkInfo, PeerInfo, PeerKind, Peers, PeersInfo, Reputation,
ReputationChangeKind,
};
use async_trait::async_trait;
use reth_discv4::DEFAULT_DISCOVERY_PORT;
@ -66,6 +67,10 @@ impl PeersInfo for NoopNetwork {
impl Peers for NoopNetwork {
fn add_peer_kind(&self, _peer: PeerId, _kind: PeerKind, _addr: SocketAddr) {}
async fn get_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
Ok(vec![])
}
fn remove_peer(&self, _peer: PeerId, _kind: PeerKind) {}
fn disconnect_peer(&self, _peer: PeerId) {}

View File

@ -1,6 +1,6 @@
use crate::{
config::NetworkMode, discovery::DiscoveryEvent, manager::NetworkEvent, message::PeerRequest,
peers::PeersHandle, session::PeerInfo, FetchClient,
peers::PeersHandle, FetchClient,
};
use async_trait::async_trait;
use parking_lot::Mutex;
@ -8,7 +8,8 @@ use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, Shar
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState, SyncStateProvider};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{
NetworkError, NetworkInfo, PeerKind, Peers, PeersInfo, Reputation, ReputationChangeKind,
NetworkError, NetworkInfo, PeerInfo, PeerKind, Peers, PeersInfo, Reputation,
ReputationChangeKind,
};
use reth_primitives::{Head, NodeRecord, PeerId, TransactionSigned, H256};
use reth_rpc_types::NetworkStatus;
@ -102,13 +103,6 @@ impl NetworkHandle {
rx.await
}
/// Returns [`PeerInfo`] for all connected peers
pub async fn get_peers(&self) -> Result<Vec<PeerInfo>, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfo(tx));
rx.await
}
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
@ -209,6 +203,12 @@ impl Peers for NetworkHandle {
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}
async fn get_peers(&self) -> Result<Vec<PeerInfo>, NetworkError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetPeerInfo(tx));
Ok(rx.await?)
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the
/// set corresponding to given kind.
fn remove_peer(&self, peer: PeerId, kind: PeerKind) {

View File

@ -10,6 +10,7 @@ use reth_eth_wire::{
DisconnectReason, EthStream, EthVersion, P2PStream, Status,
};
use reth_net_common::bandwidth_meter::MeteredStream;
use reth_network_api::PeerInfo;
use reth_primitives::PeerId;
use std::{io, net::SocketAddr, sync::Arc, time::Instant};
use tokio::{
@ -53,7 +54,6 @@ impl PendingSessionHandle {
/// Within an active session that supports the `Ethereum Wire Protocol `, three high-level tasks can
/// be performed: chain synchronization, block propagation and transaction exchange.
#[derive(Debug)]
#[allow(unused)]
pub struct ActiveSessionHandle {
/// The direction of the session
pub(crate) direction: Direction,
@ -73,6 +73,10 @@ pub struct ActiveSessionHandle {
pub(crate) client_version: Arc<String>,
/// The address we're connected to
pub(crate) remote_addr: SocketAddr,
/// The local address of the connection.
pub(crate) local_addr: Option<SocketAddr>,
/// The Status message the peer sent for the `eth` handshake
pub(crate) status: Status,
}
// === impl ActiveSessionHandle ===
@ -132,21 +136,20 @@ impl ActiveSessionHandle {
pub fn remote_addr(&self) -> SocketAddr {
self.remote_addr
}
}
/// Info about an active peer session.
#[derive(Debug, Clone)]
pub struct PeerInfo {
/// Announced capabilities of the peer
pub capabilities: Arc<Capabilities>,
/// The identifier of the remote peer
pub remote_id: PeerId,
/// The client's name and version
pub client_version: Arc<String>,
/// The address we're connected to
pub remote_addr: SocketAddr,
/// The direction of the session
pub direction: Direction,
/// Extracts the [PeerInfo] from the session handle.
pub(crate) fn peer_info(&self) -> PeerInfo {
PeerInfo {
remote_id: self.remote_id,
direction: self.direction,
remote_addr: self.remote_addr,
local_addr: self.local_addr,
capabilities: self.capabilities.clone(),
client_version: self.client_version.clone(),
eth_version: self.version,
status: self.status,
}
}
}
/// Events a pending session can produce.
@ -162,6 +165,8 @@ pub enum PendingSessionEvent {
session_id: SessionId,
/// The remote node's socket address
remote_addr: SocketAddr,
/// The local address of the connection
local_addr: Option<SocketAddr>,
/// The remote node's public key
peer_id: PeerId,
/// All capabilities the peer announced

View File

@ -4,7 +4,6 @@ use crate::{
metrics::SessionManagerMetrics,
session::{active::ActiveSession, config::SessionCounter},
};
pub use crate::{message::PeerRequestSender, session::handle::PeerInfo};
use fnv::FnvHashMap;
use futures::{future::Either, io, FutureExt, StreamExt};
use reth_ecies::{stream::ECIESStream, ECIESError};
@ -40,11 +39,13 @@ use tracing::{instrument, trace};
mod active;
mod config;
mod handle;
pub use crate::message::PeerRequestSender;
pub use config::{SessionLimits, SessionsConfig};
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
SessionCommand,
};
pub use reth_network_api::{Direction, PeerInfo};
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@ -388,6 +389,7 @@ impl SessionManager {
PendingSessionEvent::Established {
session_id,
remote_addr,
local_addr,
peer_id,
capabilities,
conn,
@ -460,6 +462,7 @@ impl SessionManager {
let client_version = Arc::new(client_id);
let handle = ActiveSessionHandle {
status,
direction,
session_id,
remote_id: peer_id,
@ -469,6 +472,7 @@ impl SessionManager {
commands_to_session,
client_version: Arc::clone(&client_version),
remote_addr,
local_addr,
};
self.active_sessions.insert(peer_id, handle);
@ -563,29 +567,14 @@ impl SessionManager {
/// Returns [`PeerInfo`] for all connected peers
pub fn get_peer_info(&self) -> Vec<PeerInfo> {
self.active_sessions
.values()
.map(|session| PeerInfo {
remote_id: session.remote_id,
direction: session.direction,
remote_addr: session.remote_addr,
capabilities: session.capabilities.clone(),
client_version: session.client_version.clone(),
})
.collect()
self.active_sessions.values().map(ActiveSessionHandle::peer_info).collect()
}
/// Returns [`PeerInfo`] for a given peer.
///
/// Returns `None` if there's no active session to the peer.
pub fn get_peer_info_by_id(&self, peer_id: PeerId) -> Option<PeerInfo> {
self.active_sessions.get(&peer_id).map(|session| PeerInfo {
remote_id: session.remote_id,
direction: session.direction,
remote_addr: session.remote_addr,
capabilities: session.capabilities.clone(),
client_version: session.client_version.clone(),
})
self.active_sessions.get(&peer_id).map(ActiveSessionHandle::peer_info)
}
}
@ -713,36 +702,6 @@ impl PendingSessionHandshakeError {
}
}
/// The direction of the connection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Direction {
/// Incoming connection.
Incoming,
/// Outgoing connection to a specific node.
Outgoing(PeerId),
}
impl Direction {
/// Returns `true` if this an incoming connection.
pub fn is_incoming(&self) -> bool {
matches!(self, Direction::Incoming)
}
/// Returns `true` if this an outgoing connection.
pub(crate) fn is_outgoing(&self) -> bool {
matches!(self, Direction::Outgoing(_))
}
}
impl std::fmt::Display for Direction {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Direction::Incoming => write!(f, "incoming"),
Direction::Outgoing(_) => write!(f, "outgoing"),
}
}
}
/// The error thrown when the max configured limit has been reached and no more connections are
/// accepted.
#[derive(Debug, Clone, thiserror::Error)]
@ -837,6 +796,7 @@ async fn authenticate(
status: Status,
fork_filter: ForkFilter,
) {
let local_addr = stream.inner().local_addr().ok();
let stream = match get_eciess_stream(stream, secret_key, direction).await {
Ok(stream) => stream,
Err(error) => {
@ -858,6 +818,7 @@ async fn authenticate(
unauthed,
session_id,
remote_addr,
local_addr,
direction,
hello,
status,
@ -905,6 +866,7 @@ async fn authenticate_stream(
stream: UnauthedP2PStream<ECIESStream<MeteredStream<TcpStream>>>,
session_id: SessionId,
remote_addr: SocketAddr,
local_addr: Option<SocketAddr>,
direction: Direction,
hello: HelloMessage,
status: Status,
@ -942,6 +904,7 @@ async fn authenticate_stream(
PendingSessionEvent::Established {
session_id,
remote_addr,
local_addr,
peer_id: their_hello.id,
capabilities: Arc::new(Capabilities::from(their_hello.capabilities)),
status: their_status,