refactor: add Peers trait und decouple rpc from NetworkHandle (#1100)

This commit is contained in:
Matthias Seitz
2023-01-31 19:32:45 +01:00
committed by GitHub
parent 5da6b07d9e
commit 5c32ad01c9
16 changed files with 110 additions and 90 deletions

3
Cargo.lock generated
View File

@ -4428,6 +4428,7 @@ name = "reth-network-api"
version = "0.1.0"
dependencies = [
"async-trait",
"reth-eth-wire",
"reth-primitives",
"serde",
"thiserror",
@ -4540,7 +4541,6 @@ dependencies = [
"hex",
"jsonrpsee",
"reth-interfaces",
"reth-network",
"reth-network-api",
"reth-primitives",
"reth-provider",
@ -4629,6 +4629,7 @@ dependencies = [
"reth-interfaces",
"reth-net-nat",
"reth-network",
"reth-network-api",
"reth-primitives",
"reth-provider",
"reth-staged-sync",

View File

@ -10,7 +10,7 @@ description = "Network interfaces"
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-eth-wire = { path = "../eth-wire" }
# io
serde = { version = "1.0", features = ["derive"] }

View File

@ -10,7 +10,7 @@
//! Provides abstractions for the reth-network crate.
use async_trait::async_trait;
use reth_primitives::{NodeRecord, H256, U256};
use reth_primitives::{NodeRecord, PeerId, H256, U256};
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
@ -21,6 +21,7 @@ pub mod reputation;
pub use error::NetworkError;
pub use reputation::{Reputation, ReputationChangeKind};
use reth_eth_wire::DisconnectReason;
/// Provides general purpose information about the network.
#[async_trait]
@ -46,6 +47,44 @@ pub trait PeersInfo: Send + Sync {
fn local_node_record(&self) -> NodeRecord;
}
/// Provides an API for managing the peers of the network.
pub trait Peers: PeersInfo {
/// Adds a peer to the peer set.
fn add_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, addr);
}
/// Adds a trusted peer to the peer set.
fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, addr);
}
/// Adds a peer to the known peer set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr);
/// Removes a peer from the peer set that corresponds to given kind.
fn remove_peer(&self, peer: PeerId, kind: PeerKind);
/// Disconnect an existing connection to the given peer.
fn disconnect_peer(&self, peer: PeerId);
/// Disconnect an existing connection to the given peer using the provided reason
fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason);
/// Send a reputation change for the given peer.
fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind);
}
/// Represents the kind of peer
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum PeerKind {
/// Basic peer kind.
#[default]
Basic,
/// Trusted peer.
Trusted,
}
/// The status of the network being ran by the local node.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct NetworkStatus {

View File

@ -17,7 +17,6 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

View File

@ -1,10 +1,6 @@
use crate::{
config::NetworkMode,
manager::NetworkEvent,
message::PeerRequest,
peers::{PeerKind, PeersHandle},
session::PeerInfo,
FetchClient,
config::NetworkMode, manager::NetworkEvent, message::PeerRequest, peers::PeersHandle,
session::PeerInfo, FetchClient,
};
use async_trait::async_trait;
use parking_lot::Mutex;
@ -14,7 +10,9 @@ use reth_interfaces::{
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{NetworkError, NetworkInfo, NetworkStatus, PeersInfo, ReputationChangeKind};
use reth_network_api::{
NetworkError, NetworkInfo, NetworkStatus, PeerKind, Peers, PeersInfo, ReputationChangeKind,
};
use reth_primitives::{NodeRecord, PeerId, TransactionSigned, TxHash, H256, U256};
use std::{
net::SocketAddr,
@ -137,47 +135,6 @@ impl NetworkHandle {
self.send_message(NetworkHandleMessage::AnnounceBlock(block, hash))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set
pub fn add_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Basic, addr);
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a trusted peer
/// to the known set
pub fn add_trusted_peer(&self, peer: PeerId, addr: SocketAddr) {
self.add_peer_kind(peer, PeerKind::Trusted, addr);
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set, with the given kind.
pub fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) {
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the
/// set corresponding to given kind.
pub fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing
/// connection to the given peer.
pub fn disconnect_peer(&self, peer: PeerId) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing
/// connection to the given peer using the provided reason
pub fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
}
/// Send a reputation change for the given peer.
pub fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
}
/// Sends a [`PeerRequest`] to the given peer's session.
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
@ -219,6 +176,37 @@ impl PeersInfo for NetworkHandle {
}
}
impl Peers for NetworkHandle {
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to add a peer to the known
/// set, with the given kind.
fn add_peer_kind(&self, peer: PeerId, kind: PeerKind, addr: SocketAddr) {
self.send_message(NetworkHandleMessage::AddPeerAddress(peer, kind, addr));
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to remove a peer from the
/// set corresponding to given kind.
fn remove_peer(&self, peer: PeerId, kind: PeerKind) {
self.send_message(NetworkHandleMessage::RemovePeer(peer, kind))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing
/// connection to the given peer.
fn disconnect_peer(&self, peer: PeerId) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, None))
}
/// Sends a message to the [`NetworkManager`](crate::NetworkManager) to disconnect an existing
/// connection to the given peer using the provided reason
fn disconnect_peer_with_reason(&self, peer: PeerId, reason: DisconnectReason) {
self.send_message(NetworkHandleMessage::DisconnectPeer(peer, Some(reason)))
}
/// Send a reputation change for the given peer.
fn reputation_change(&self, peer_id: PeerId, kind: ReputationChangeKind) {
self.send_message(NetworkHandleMessage::ReputationChange(peer_id, kind));
}
}
#[async_trait]
impl NetworkInfo for NetworkHandle {
fn local_addr(&self) -> SocketAddr {

View File

@ -9,7 +9,7 @@ use crate::{
use futures::StreamExt;
use reth_eth_wire::{errors::EthStreamError, DisconnectReason};
use reth_net_common::ban_list::BanList;
use reth_network_api::ReputationChangeKind;
use reth_network_api::{PeerKind, ReputationChangeKind};
use reth_primitives::{ForkId, NodeRecord, PeerId};
use std::{
collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
@ -860,16 +860,6 @@ impl PeerConnectionState {
}
}
/// Represents the kind of peer
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq)]
pub enum PeerKind {
/// Basic peer kind.
#[default]
Basic,
/// Trusted peer.
Trusted,
}
/// Commands the [`PeersManager`] listens for.
pub(crate) enum PeerCommand {
/// Command for manually add

View File

@ -4,5 +4,6 @@ mod manager;
mod reputation;
pub(crate) use manager::{InboundConnectionError, PeerAction, PeersManager};
pub use manager::{PeerKind, PeersConfig, PeersHandle};
pub use manager::{PeersConfig, PeersHandle};
pub use reputation::ReputationChangeWeights;
pub use reth_network_api::PeerKind;

View File

@ -8,12 +8,13 @@ use crate::{
BlockRequest, NewBlockMessage, PeerRequest, PeerRequestSender, PeerResponse,
PeerResponseResult,
},
peers::{PeerAction, PeerKind, PeersManager},
peers::{PeerAction, PeersManager},
FetchClient,
};
use reth_eth_wire::{
capability::Capabilities, BlockHashNumber, DisconnectReason, NewBlockHashes, Status,
};
use reth_network_api::PeerKind;
use reth_primitives::{ForkId, PeerId, H256};
use reth_provider::BlockProvider;
use std::{

View File

@ -13,7 +13,7 @@ use reth_eth_wire::{
GetPooledTransactions, NewPooledTransactionHashes, PooledTransactions, Transactions,
};
use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider};
use reth_network_api::ReputationChangeKind;
use reth_network_api::{Peers, ReputationChangeKind};
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
};

View File

@ -1,9 +1,7 @@
//! Connection tests
use ethers_core::utils::Geth;
use ethers_providers::{Http, Middleware, Provider};
use futures::StreamExt;
use reth_discv4::{bootnodes::mainnet_nodes, Discv4Config};
use reth_eth_wire::DisconnectReason;
@ -18,7 +16,7 @@ use reth_network::{
},
NetworkConfigBuilder, NetworkEvent, NetworkManager, PeersConfig,
};
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_network_api::{NetworkInfo, Peers, PeersInfo};
use reth_primitives::{HeadersDirection, NodeRecord, PeerId};
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::test_utils::testing_pool;

View File

@ -7,7 +7,7 @@ use reth_interfaces::p2p::{
headers::client::{HeadersClient, HeadersRequest},
};
use reth_network::test_utils::{NetworkEventStream, Testnet};
use reth_network_api::NetworkInfo;
use reth_network_api::{NetworkInfo, Peers};
use reth_primitives::{
Block, Bytes, Header, HeadersDirection, Signature, Transaction, TransactionKind,
TransactionSigned, TxEip2930, H256, U256,

View File

@ -16,7 +16,6 @@ reth-rpc-api = { path = "../rpc-api" }
reth-rpc-types = { path = "../rpc-types" }
reth-provider = { path = "../../storage/provider" }
reth-transaction-pool = { path = "../../transaction-pool" }
reth-network = { path = "../network" }
reth-network-api = { path = "../network-api" }
reth-rpc-engine-api = { path = "../rpc-engine-api" }

View File

@ -1,31 +1,31 @@
use crate::result::ToRpcResult;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_network::{peers::PeerKind, NetworkHandle};
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_network_api::{NetworkInfo, PeerKind, Peers};
use reth_primitives::NodeRecord;
use reth_rpc_api::AdminApiServer;
use reth_rpc_types::NodeInfo;
use crate::result::ToRpcResult;
/// `admin` API implementation.
///
/// This type provides the functionality for handling `admin` related requests.
pub struct AdminApi {
pub struct AdminApi<N> {
/// An interface to interact with the network
network: NetworkHandle,
network: N,
}
impl AdminApi {
impl<N> AdminApi<N> {
/// Creates a new instance of `AdminApi`.
pub fn new(network: NetworkHandle) -> AdminApi {
pub fn new(network: N) -> Self {
AdminApi { network }
}
}
#[async_trait]
impl AdminApiServer for AdminApi {
impl<N> AdminApiServer for AdminApi<N>
where
N: NetworkInfo + Peers + 'static,
{
fn add_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.add_peer(record.id, record.tcp_addr());
Ok(true)
@ -61,7 +61,7 @@ impl AdminApiServer for AdminApi {
}
}
impl std::fmt::Debug for AdminApi {
impl<N> std::fmt::Debug for AdminApi<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AdminApi").finish_non_exhaustive()
}

View File

@ -1,7 +1,6 @@
use crate::result::ToRpcResult;
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_network::NetworkHandle;
use reth_network_api::NetworkInfo;
use reth_primitives::{keccak256, Bytes, H256};
use reth_rpc_api::Web3ApiServer;
@ -9,20 +8,23 @@ use reth_rpc_api::Web3ApiServer;
/// `web3` API implementation.
///
/// This type provides the functionality for handling `web3` related requests.
pub struct Web3Api {
pub struct Web3Api<N> {
/// An interface to interact with the network
network: NetworkHandle,
network: N,
}
impl Web3Api {
impl<N> Web3Api<N> {
/// Creates a new instance of `Web3Api`.
pub fn new(network: NetworkHandle) -> Web3Api {
pub fn new(network: N) -> Self {
Web3Api { network }
}
}
#[async_trait]
impl Web3ApiServer for Web3Api {
impl<N> Web3ApiServer for Web3Api<N>
where
N: NetworkInfo + 'static,
{
async fn client_version(&self) -> RpcResult<String> {
let status = self.network.network_status().await.to_rpc_result()?;
Ok(status.client_version)
@ -33,7 +35,7 @@ impl Web3ApiServer for Web3Api {
}
}
impl std::fmt::Debug for Web3Api {
impl<N> std::fmt::Debug for Web3Api<N> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Web3Api").finish_non_exhaustive()
}

View File

@ -17,7 +17,8 @@ normal = [
# reth
reth-db = {path = "../../crates/storage/db", features = ["mdbx", "test-utils"] }
reth-discv4 = { path = "../../crates/net/discv4" }
reth-network = {path = "../../crates/net/network", features = ["serde"] }
reth-network-api = { path = "../../crates/net/network-api" }
reth-network = { path = "../../crates/net/network", features = ["serde"] }
reth-primitives = { path = "../../crates/primitives" }
reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
reth-net-nat = { path = "../../crates/net/nat" }

View File

@ -7,6 +7,7 @@ use reth_network::{
test_utils::{unused_port, unused_tcp_udp, NetworkEventStream},
NetworkConfig, NetworkManager,
};
use reth_network_api::Peers;
use reth_primitives::{
constants::EIP1559_INITIAL_BASE_FEE, ChainSpec, Hardfork, Header, PeerId, SealedHeader,
};