RPC Admin namespace implementation (#685)

* AdminApi first draft

* Implement add_trusted_peer and remove_trusted_peer

* Fix tests

* Refactoring NetworkHandle peer related methods

* Refactoring add_discovered_node in PeersManager

* Change record type from String to NodeRecord

* Adding NetworkHandleMessage::RemovePeer command
This commit is contained in:
Aurélien
2023-01-03 14:02:05 +01:00
committed by GitHub
parent debf21d34e
commit 65bc3cb7b8
11 changed files with 141 additions and 38 deletions

View File

@ -138,3 +138,5 @@ pub use manager::{NetworkEvent, NetworkManager};
pub use message::PeerRequest;
pub use network::NetworkHandle;
pub use peers::PeersConfig;
pub use reth_eth_wire::DisconnectReason;

View File

@ -459,8 +459,11 @@ where
.swarm
.sessions_mut()
.send_message(&peer_id, PeerMessage::PooledTransactions(msg)),
NetworkHandleMessage::AddPeerAddress(peer, addr) => {
self.swarm.state_mut().add_peer_address(peer, addr);
NetworkHandleMessage::AddPeerAddress(peer, kind, addr) => {
self.swarm.state_mut().add_peer_kind(peer, kind, addr);
}
NetworkHandleMessage::RemovePeer(peer_id, kind) => {
self.swarm.state_mut().remove_peer(peer_id, kind);
}
NetworkHandleMessage::DisconnectPeer(peer_id, reason) => {
self.swarm.sessions_mut().disconnect(peer_id, reason);

View File

@ -2,7 +2,7 @@ use crate::{
config::NetworkMode,
manager::NetworkEvent,
message::PeerRequest,
peers::{PeersHandle, ReputationChangeKind},
peers::{PeerKind, PeersHandle, ReputationChangeKind},
session::PeerInfo,
FetchClient,
};
@ -142,7 +142,25 @@ impl NetworkHandle {
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set
pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) {
let _ = self.inner.to_manager_tx.send(NetworkHandleMessage::AddPeerAddress(peer, addr));
self.add_peer_kind(peer, PeerKind::Basic, addr);
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a trusted peer
/// to the known set
pub fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, addr);
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set, with the given kind.
pub fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) {
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the
/// set corresponding to given kind.
pub fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing
@ -226,7 +244,9 @@ struct NetworkInner {
#[allow(missing_docs)]
pub(crate) enum NetworkHandleMessage {
/// Adds an address for a peer.
AddPeerAddress(PeerId, SocketAddr),
AddPeerAddress(PeerId, PeerKind, SocketAddr),
/// Removes a peer from the peerset correponding to the given kind.
RemovePeer(PeerId, PeerKind),
/// Disconnect a connection to a peer if it exists.
DisconnectPeer(PeerId, Option<DisconnectReason>),
/// Add a new listener for [`NetworkEvent`].

View File

@ -403,9 +403,23 @@ impl PeersManager {
/// Called for a newly discovered peer.
///
/// If the peer already exists, then the address will be updated. If the addresses differ, the
/// old address is returned
pub(crate) fn add_discovered_node(&mut self, peer_id: PeerId, addr: SocketAddr) {
/// If the peer already exists, then the address and kind will be updated.
pub(crate) fn add_peer(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer_id, PeerKind::Basic, addr)
}
/// Called for a newly discovered trusted peer.
///
/// If the peer already exists, then the address and kind will be updated.
#[allow(dead_code)]
pub(crate) fn add_trusted_peer(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer_id, PeerKind::Trusted, addr)
}
/// Called for a newly discovered peer.
///
/// If the peer already exists, then the address and kind will be updated.
pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) {
if self.ban_list.is_banned(&peer_id, &addr.ip()) {
return
}
@ -413,12 +427,13 @@ impl PeersManager {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let node = entry.get_mut();
node.kind = kind;
node.addr = addr;
return
}
Entry::Vacant(entry) => {
trace!(target : "net::peers", ?peer_id, ?addr, "discovered new node");
entry.insert(Peer::new(addr));
entry.insert(Peer::with_kind(addr, kind));
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
@ -427,7 +442,7 @@ impl PeersManager {
}
/// Removes the tracked node from the set.
pub(crate) fn remove_discovered_node(&mut self, peer_id: PeerId) {
pub(crate) fn remove_peer(&mut self, peer_id: PeerId) {
let Entry::Occupied(entry) = self.peers.entry(peer_id) else { return };
if entry.get().is_trusted() {
return
@ -453,6 +468,18 @@ impl PeersManager {
}
}
/// Removes the tracked node from the trusted set.
pub(crate) fn remove_peer_from_trusted_set(&mut self, peer_id: PeerId) {
let Entry::Occupied(mut entry) = self.peers.entry(peer_id) else { return };
if !entry.get().is_trusted() {
return
}
let peer = entry.get_mut();
peer.kind = PeerKind::Basic;
}
/// Returns the idle peer with the highest reputation.
///
/// Peers that are `trusted`, see [PeerKind], are prioritized as long as they're not currently
@ -539,9 +566,9 @@ impl PeersManager {
while let Poll::Ready(Some(cmd)) = self.handle_rx.poll_next_unpin(cx) {
match cmd {
PeerCommand::Add(peer_id, addr) => {
self.add_discovered_node(peer_id, addr);
self.add_peer(peer_id, addr);
}
PeerCommand::Remove(peer) => self.remove_discovered_node(peer),
PeerCommand::Remove(peer) => self.remove_peer(peer),
PeerCommand::ReputationChange(peer_id, rep) => {
self.apply_reputation_change(&peer_id, rep)
}
@ -679,6 +706,10 @@ impl Peer {
}
}
fn with_kind(addr: SocketAddr, kind: PeerKind) -> Self {
Self { kind, ..Self::new(addr) }
}
/// Applies a reputation change to the peer and returns what action should be taken.
fn apply_reputation(&mut self, reputation: i32) -> ReputationChangeOutcome {
let previous = self.reputation;
@ -778,7 +809,7 @@ impl PeerConnectionState {
/// Represents the kind of peer
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
enum PeerKind {
pub enum PeerKind {
/// Basic peer kind.
#[default]
Basic,
@ -979,7 +1010,7 @@ mod test {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1002,7 +1033,7 @@ mod test {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.ban_peer(peer);
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::BanPeer { peer_id } => {
@ -1026,7 +1057,7 @@ mod test {
let backoff_duration = Duration::from_secs(3);
let config = PeersConfig { backoff_duration, ..Default::default() };
let mut peers = PeersManager::new(config);
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1089,7 +1120,7 @@ mod test {
let backoff_duration = Duration::from_secs(3);
let config = PeersConfig { backoff_duration, ..Default::default() };
let mut peers = PeersManager::new(config);
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1149,7 +1180,7 @@ mod test {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1205,7 +1236,7 @@ mod test {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1317,7 +1348,7 @@ mod test {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1361,7 +1392,7 @@ mod test {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1380,7 +1411,7 @@ mod test {
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::Out);
peers.remove_discovered_node(peer);
peers.remove_peer(peer);
match event!(peers) {
PeerAction::PeerRemoved(peer_id) => {
@ -1398,7 +1429,7 @@ mod test {
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
peers.add_discovered_node(peer, socket_addr);
peers.add_peer(peer, socket_addr);
let p = peers.peers.get(&peer).unwrap();
assert_eq!(p.state, PeerConnectionState::DisconnectingOut);
@ -1413,7 +1444,7 @@ mod test {
let ban_list = BanList::new(HashSet::new(), vec![ip]);
let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
peer_manager.add_discovered_node(H512::default(), socket_addr);
peer_manager.add_peer(H512::default(), socket_addr);
assert!(peer_manager.peers.is_empty());
}
@ -1505,7 +1536,7 @@ mod test {
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_discovered_node(basic_peer, basic_sock);
peers.add_peer(basic_peer, basic_sock);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
@ -1545,7 +1576,7 @@ mod test {
let basic_peer = PeerId::random();
let basic_sock = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
peers.add_discovered_node(basic_peer, basic_sock);
peers.add_peer(basic_peer, basic_sock);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {

View File

@ -4,5 +4,5 @@ mod manager;
mod reputation;
pub(crate) use manager::{InboundConnectionError, PeerAction, PeersManager};
pub use manager::{PeersConfig, PeersHandle};
pub use manager::{PeerKind, PeersConfig, PeersHandle};
pub use reputation::{ReputationChangeKind, ReputationChangeWeights};

View File

@ -8,7 +8,7 @@ use crate::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
peers::{PeerAction, PeersManager},
peers::{PeerAction, PeerKind, PeersManager},
FetchClient,
};
use reth_eth_wire::{
@ -253,16 +253,23 @@ where
self.discovery.ban(peer_id, ip)
}
/// Adds a peer and its address to the peerset.
pub(crate) fn add_peer_address(&mut self, peer_id: PeerId, addr: SocketAddr) {
self.peers_manager.add_discovered_node(peer_id, addr)
/// Adds a peer and its address with the given kind to the peerset.
pub(crate) fn add_peer_kind(&mut self, peer_id: PeerId, kind: PeerKind, addr: SocketAddr) {
self.peers_manager.add_peer_kind(peer_id, kind, addr)
}
pub(crate) fn remove_peer(&mut self, peer_id: PeerId, kind: PeerKind) {
match kind {
PeerKind::Basic => self.peers_manager.remove_peer(peer_id),
PeerKind::Trusted => self.peers_manager.remove_peer_from_trusted_set(peer_id),
}
}
/// Event hook for events received from the discovery service.
fn on_discovery_event(&mut self, event: DiscoveryEvent) {
match event {
DiscoveryEvent::Discovered(peer, addr) => {
self.peers_manager.add_discovered_node(peer, addr);
self.peers_manager.add_peer(peer, addr);
}
DiscoveryEvent::EnrForkId(peer_id, fork_id) => {
self.queued_messages

View File

@ -237,7 +237,7 @@ where
if self.sessions.is_valid_fork_id(fork_id) {
self.state_mut().peers_mut().set_discovered_fork_id(peer_id, fork_id);
} else {
self.state_mut().peers_mut().remove_discovered_node(peer_id);
self.state_mut().peers_mut().remove_peer(peer_id);
}
}
}

View File

@ -1,4 +1,5 @@
use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc};
use reth_primitives::NodeRecord;
/// Admin namespace rpc interface that gives access to several non-standard RPC methods.
#[rpc(server)]
@ -6,24 +7,24 @@ use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc};
pub trait AdminApi {
/// Adds the given node record to the peerset.
#[method(name = "admin_addPeer")]
async fn add_peer(&self, record: String) -> Result<bool>;
fn add_peer(&self, record: NodeRecord) -> Result<bool>;
/// Disconnects from a remote node if the connection exists.
///
/// Returns true if the peer was successfully removed.
#[method(name = "admin_removePeer")]
async fn remove_peer(&self, record: String) -> Result<bool>;
fn remove_peer(&self, record: NodeRecord) -> Result<bool>;
/// Adds the given node record to the trusted peerset.
#[method(name = "admin_addTrustedPeer")]
async fn add_trusted_peer(&self, record: String) -> Result<bool>;
fn add_trusted_peer(&self, record: NodeRecord) -> Result<bool>;
/// Removes a remote node from the trusted peer set, but it does not disconnect it
/// automatically.
///
/// Returns true if the peer was successfully removed.
#[method(name = "admin_removeTrustedPeer")]
async fn remove_trusted_peer(&self, record: String) -> Result<bool>;
fn remove_trusted_peer(&self, record: NodeRecord) -> Result<bool>;
/// Creates an RPC subscription which serves events received from the network.
#[subscription(

View File

@ -20,7 +20,7 @@ mod trace;
mod web3;
pub use self::{
debug::DebugApiServer, engine::EngineApiServer, eth::EthApiServer,
admin::AdminApiServer, debug::DebugApiServer, engine::EngineApiServer, eth::EthApiServer,
eth_filter::EthFilterApiServer, eth_pubsub::EthPubSubApiServer, net::NetApiServer,
web3::Web3ApiServer,
};

View File

@ -0,0 +1,38 @@
use jsonrpsee::core::RpcResult;
use reth_network::{peers::PeerKind, NetworkHandle};
use reth_primitives::NodeRecord;
use reth_rpc_api::AdminApiServer;
struct AdminApi {
/// An interface to interact with the network
network: NetworkHandle,
}
impl AdminApiServer for AdminApi {
fn add_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.add_peer(record.id, record.tcp_addr());
Ok(true)
}
fn remove_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.remove_peer(record.id, PeerKind::Basic);
Ok(true)
}
fn add_trusted_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.add_trusted_peer(record.id, record.tcp_addr());
Ok(true)
}
fn remove_trusted_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.remove_peer(record.id, PeerKind::Trusted);
Ok(true)
}
fn subscribe(
&self,
_subscription_sink: jsonrpsee::SubscriptionSink,
) -> jsonrpsee::types::SubscriptionResult {
todo!()
}
}

View File

@ -11,6 +11,7 @@
//!
//! Provides the implementation of all RPC interfaces.
mod admin;
mod engine;
mod eth;
mod net;