Network: Extend NetworkInfo trait with NetworkStatus (#923)

This commit is contained in:
Enrique Ortiz
2023-01-19 15:43:51 -04:00
committed by GitHub
parent 2a77e08080
commit cfef666886
11 changed files with 113 additions and 43 deletions

3
Cargo.lock generated
View File

@ -4408,7 +4408,9 @@ dependencies = [
name = "reth-network-api"
version = "0.1.0"
dependencies = [
"async-trait",
"reth-primitives",
"serde",
]
[[package]]
@ -4560,6 +4562,7 @@ name = "reth-rpc-types"
version = "0.1.0"
dependencies = [
"bytes",
"reth-network-api",
"reth-primitives",
"reth-rlp",
"serde",

View File

@ -8,4 +8,11 @@ readme = "README.md"
description = "Network interfaces"
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
# io
serde = { version = "1.0", features = ["derive"] }
# misc
async-trait = "0.1"

View File

@ -9,13 +9,22 @@
//!
//! Provides abstractions for the reth-network crate.
use reth_primitives::NodeRecord;
use std::net::SocketAddr;
use async_trait::async_trait;
use reth_primitives::{NodeRecord, H256, U256};
use serde::{Deserialize, Serialize};
use std::{error::Error, net::SocketAddr};
/// Provides general purpose information about the network.
#[async_trait]
pub trait NetworkInfo: Send + Sync {
/// Associated error type for the network implementation.
type Error: Send + Sync + Error;
/// Returns the [`SocketAddr`] that listens for incoming connections.
fn local_addr(&self) -> SocketAddr;
/// Returns the current status of the network being ran by the local node.
async fn network_status(&self) -> Result<NetworkStatus, Self::Error>;
}
/// Provides general purpose information about Peers in the network.
@ -28,3 +37,25 @@ pub trait PeersInfo: Send + Sync {
/// Returns the Ethereum Node Record of the node.
fn local_node_record(&self) -> NodeRecord;
}
/// The status of the network being ran by the local node.
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct NetworkStatus {
/// The local node client name.
pub client_name: String,
/// Information about the Ethereum Wire Protocol.
pub eth_protocol_info: EthProtocolInfo,
}
/// Information about the Ethereum Wire Protocol (ETH)
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct EthProtocolInfo {
/// The current difficulty at the head of the chain.
pub difficulty: U256,
/// The block hash of the head of the chain.
pub head: H256,
/// Network ID in base 10.
pub network: u64,
/// Genesis block of the current chain.
pub genesis: H256,
}

View File

@ -307,6 +307,11 @@ where
self.swarm.state().fetch_client()
}
/// Returns the current [`Status`] for the local node.
pub fn status(&self) -> Status {
self.swarm.sessions().status()
}
/// Event hook for an unexpected message from the peer.
fn on_invalid_message(
&mut self,
@ -499,6 +504,9 @@ where
NetworkHandleMessage::FetchClient(tx) => {
let _ = tx.send(self.fetch_client());
}
NetworkHandleMessage::GetStatus(tx) => {
let _ = tx.send(self.status());
}
NetworkHandleMessage::StatusUpdate { height, hash, total_difficulty } => {
if let Some(transition) =
self.swarm.sessions_mut().on_status_update(height, hash, total_difficulty)

View File

@ -6,14 +6,17 @@ use crate::{
session::PeerInfo,
FetchClient,
};
use async_trait::async_trait;
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_eth_wire::{
DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions, Status,
};
use reth_interfaces::{
p2p::headers::client::StatusUpdater,
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_network_api::{EthProtocolInfo, NetworkInfo, NetworkStatus, PeersInfo};
use reth_primitives::{NodeRecord, PeerId, TransactionSigned, TxHash, H256, U256};
use std::{
net::SocketAddr,
@ -126,6 +129,13 @@ impl NetworkHandle {
self.send_message(NetworkHandleMessage::StatusUpdate { height, hash, total_difficulty });
}
/// Get the current status of the node.
pub async fn get_status(&self) -> Result<Status, oneshot::error::RecvError> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetStatus(tx));
rx.await
}
/// Announce a block over devp2p
///
/// Caution: in PoS this is a noop, since new block propagation will happen over devp2p
@ -215,10 +225,27 @@ impl PeersInfo for NetworkHandle {
}
}
#[async_trait]
impl NetworkInfo for NetworkHandle {
type Error = oneshot::error::RecvError;
fn local_addr(&self) -> SocketAddr {
*self.inner.listener_address.lock()
}
async fn network_status(&self) -> Result<NetworkStatus, Self::Error> {
let status = self.get_status().await?;
Ok(NetworkStatus {
client_name: "Reth".to_string(),
eth_protocol_info: EthProtocolInfo {
difficulty: status.total_difficulty,
head: status.blockhash,
network: status.chain.id(),
genesis: status.genesis,
},
})
}
}
impl StatusUpdater for NetworkHandle {
@ -251,7 +278,7 @@ struct NetworkInner {
listener_address: Arc<Mutex<SocketAddr>>,
/// The identifier used by this node.
local_peer_id: PeerId,
/// Access to the all the nodes
/// Access to the all the nodes.
peers: PeersHandle,
/// The mode of the network
network_mode: NetworkMode,
@ -291,6 +318,8 @@ pub(crate) enum NetworkHandleMessage {
FetchClient(oneshot::Sender<FetchClient>),
/// Apply a status update.
StatusUpdate { height: u64, hash: H256, total_difficulty: U256 },
/// Get the currenet status
GetStatus(oneshot::Sender<Status>),
/// Get PeerInfo from all the peers
GetPeerInfo(oneshot::Sender<Vec<PeerInfo>>),
/// Get PeerInfo for a specific peer

View File

@ -49,6 +49,7 @@ pub struct SessionId(usize);
/// Manages a set of sessions.
#[must_use = "Session Manager must be polled to process session events."]
#[derive(Debug)]
pub(crate) struct SessionManager {
/// Tracks the identifier for the next session.
next_id: usize,
@ -142,6 +143,11 @@ impl SessionManager {
SessionId(id)
}
/// Returns the current status of the session.
pub(crate) fn status(&self) -> Status {
self.status
}
/// Spawns the given future onto a new task that is tracked in the `spawned_tasks` [`JoinSet`].
fn spawn<F>(&self, f: F)
where

View File

@ -101,6 +101,11 @@ where
&self.incoming
}
/// Access to the [`SessionManager`].
pub(crate) fn sessions(&self) -> &SessionManager {
&self.sessions
}
/// Mutable access to the [`SessionManager`].
pub(crate) fn sessions_mut(&mut self) -> &mut SessionManager {
&mut self.sessions

View File

@ -37,5 +37,5 @@ pub trait AdminApi {
/// Returns the ENR of the node.
#[method(name = "admin_nodeInfo")]
fn node_info(&self) -> Result<NodeInfo>;
async fn node_info(&self) -> Result<NodeInfo>;
}

View File

@ -12,6 +12,7 @@ Reth RPC types
# reth
reth-primitives = { path = "../../primitives" }
reth-rlp = { path = "../../common/rlp" }
reth-network-api = { path = "../network-api"}
# misc
serde = { version = "1.0", features = ["derive"] }

View File

@ -1,9 +1,7 @@
use reth_primitives::{NodeRecord, PeerId, H256, U256};
use reth_network_api::NetworkStatus;
use reth_primitives::{NodeRecord, PeerId};
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
net::{IpAddr, SocketAddr},
};
use std::net::{IpAddr, SocketAddr};
/// Represents the `admin_nodeInfo` response, which can be queried for all the information
/// known about the running node at the networking granularity.
@ -21,28 +19,23 @@ pub struct NodeInfo {
/// Address exposed for listening for the local node.
#[serde(rename = "listenAddr")]
pub listen_addr: SocketAddr,
/// Local node client name.
pub name: String,
/// Ports exposed by the node for discovery and listening.
pub ports: Ports,
/// Networking protocols being run by the local node.
pub protocols: BTreeMap<String, ProtocolInfo>,
#[serde(flatten)]
pub status: NetworkStatus,
}
impl NodeInfo {
/// Creates a new instance of `NodeInfo`.
pub fn new(enr: NodeRecord) -> NodeInfo {
let protocol_info =
BTreeMap::from([("eth".into(), ProtocolInfo::Eth(EthProtocolInfo::default()))]);
pub fn new(enr: NodeRecord, status: NetworkStatus) -> NodeInfo {
NodeInfo {
enode: enr,
id: enr.id,
ip: enr.address,
listen_addr: enr.tcp_addr(),
name: "Reth".to_owned(),
ports: Ports { discovery: enr.udp_port, listener: enr.tcp_port },
protocols: protocol_info,
status,
}
}
}
@ -55,22 +48,3 @@ pub struct Ports {
/// Port exposed for listening.
pub listener: u16,
}
/// Information about the different protocols that can be run by the node (ETH, )
#[derive(Serialize, Deserialize, Debug)]
pub enum ProtocolInfo {
/// Information about the Ethereum Wire Protocol.
Eth(EthProtocolInfo),
}
/// Information about the Ethereum Wire Protocol (ETH)
#[derive(Serialize, Deserialize, Debug, Default)]
pub struct EthProtocolInfo {
/// The current difficulty at the head of the chain.
pub difficulty: U256,
/// The block hash of the head of the chain.
pub head: H256,
/// Network ID in base 10.
pub network: u64,
/// Genesis block of the current chain.
pub genesis: H256,
}

View File

@ -1,6 +1,7 @@
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
use reth_network::{peers::PeerKind, NetworkHandle};
use reth_network_api::PeersInfo;
use reth_network_api::{NetworkInfo, PeersInfo};
use reth_primitives::NodeRecord;
use reth_rpc_api::AdminApiServer;
use reth_rpc_types::NodeInfo;
@ -20,6 +21,7 @@ impl AdminApi {
}
}
#[async_trait]
impl AdminApiServer for AdminApi {
fn add_peer(&self, record: NodeRecord) -> RpcResult<bool> {
self.network.add_peer(record.id, record.tcp_addr());
@ -48,10 +50,14 @@ impl AdminApiServer for AdminApi {
todo!()
}
fn node_info(&self) -> RpcResult<NodeInfo> {
async fn node_info(&self) -> RpcResult<NodeInfo> {
let enr = self.network.local_node_record();
let status = match self.network.network_status().await {
Ok(status) => status,
Err(e) => return RpcResult::Err(jsonrpsee::core::Error::Custom(e.to_string())),
};
Ok(NodeInfo::new(enr))
Ok(NodeInfo::new(enr, status))
}
}