feat(net): add NetworkMetrics (#708)

* Added NetworkMetrics

* update docs

* add more metrics and make naming consistent

* add active inbound and outbound connection metrics
This commit is contained in:
Sanket Shanbhag
2023-01-05 14:47:57 +05:30
committed by GitHub
parent 528c19f079
commit e4bd5b4fe9
7 changed files with 103 additions and 6 deletions

2
Cargo.lock generated
View File

@ -3816,6 +3816,7 @@ dependencies = [
"futures",
"hex",
"linked_hash_set",
"metrics",
"parking_lot 0.12.1",
"pin-project",
"rand 0.8.5",
@ -3823,6 +3824,7 @@ dependencies = [
"reth-ecies",
"reth-eth-wire",
"reth-interfaces",
"reth-metrics-derive",
"reth-net-common",
"reth-primitives",
"reth-provider",

View File

@ -38,6 +38,10 @@ tokio-stream = "0.1"
# io
serde = { version = "1.0", optional = true }
# metrics
metrics = "0.20.1"
reth-metrics-derive = { path = "../../metrics/metrics-derive" }
# misc
auto_impl = "1"
aquamarine = "0.1" # docs

View File

@ -124,6 +124,7 @@ mod import;
mod listener;
mod manager;
mod message;
mod metrics;
mod network;
pub mod peers;
mod session;

View File

@ -23,6 +23,7 @@ use crate::{
import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
metrics::NetworkMetrics,
network::{NetworkHandle, NetworkHandleMessage},
peers::{PeersHandle, PeersManager, ReputationChangeKind},
session::SessionManager,
@ -102,6 +103,8 @@ pub struct NetworkManager<C> {
/// This is updated via internal events and shared via `Arc` with the [`NetworkHandle`]
/// Updated by the `NetworkWorker` and loaded by the `NetworkService`.
num_active_peers: Arc<AtomicUsize>,
/// Metrics for the Network
metrics: NetworkMetrics,
}
// === impl NetworkManager ===
@ -204,6 +207,7 @@ where
to_transactions_manager: None,
to_eth_request_handler: None,
num_active_peers,
metrics: Default::default(),
})
}
@ -527,7 +531,8 @@ where
this.on_peer_message(peer_id, message)
}
SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
this.on_invalid_message(peer_id, capabilities, message)
this.on_invalid_message(peer_id, capabilities, message);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(target : "net", ?remote_addr, "TCP listener closed.");
@ -537,9 +542,17 @@ where
}
SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
trace!(target : "net", ?session_id, ?remote_addr, "Incoming connection");
this.metrics.total_incoming_connections.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target : "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
this.metrics.total_outgoing_connections.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::SessionEstablished {
peer_id,
@ -550,6 +563,7 @@ where
direction,
} => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
this.metrics.connected_peers.set(total_active as f64);
info!(
target : "net",
?remote_addr,
@ -564,7 +578,6 @@ where
.peers_mut()
.on_active_inbound_session(peer_id, remote_addr);
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
@ -573,15 +586,18 @@ where
});
}
SwarmEvent::PeerAdded(peer_id) => {
info!(target: "net", ?peer_id, "Peer added");
trace!(target: "net", ?peer_id, "Peer added");
this.event_listeners.send(NetworkEvent::PeerAdded(peer_id));
this.metrics.tracked_peers.increment(1f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
info!(target: "net", ?peer_id, "Peer dropped");
trace!(target: "net", ?peer_id, "Peer dropped");
this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id));
this.metrics.tracked_peers.decrement(1f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
this.metrics.connected_peers.set(total_active as f64);
trace!(
target : "net",
?remote_addr,
@ -607,7 +623,15 @@ where
.peers_mut()
.on_active_session_gracefully_closed(peer_id);
}
this.metrics.closed_sessions.increment(1);
// This can either be an incoming or outgoing connection which was closed.
// So we update both metrics
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
@ -623,12 +647,17 @@ where
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
this.metrics.closed_sessions.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
warn!(
@ -645,12 +674,17 @@ where
&peer_id,
err,
);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
}
this.metrics.closed_sessions.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
warn!(
@ -670,6 +704,7 @@ where
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::FailedToConnect);
this.metrics.invalid_messages_received.increment(1);
}
}
}

View File

@ -0,0 +1,34 @@
use metrics::{Counter, Gauge};
use reth_metrics_derive::Metrics;
/// Metrics for the entire network, handled by NetworkManager
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct NetworkMetrics {
/// Number of currently connected peers
pub(crate) connected_peers: Gauge,
/// Number of peers known to the node
pub(crate) tracked_peers: Gauge,
/// Cumulative number of failures of pending sessions
pub(crate) pending_session_failures: Counter,
/// Total number of sessions closed
pub(crate) closed_sessions: Counter,
/// Number of active incoming connections
pub(crate) incoming_connections: Gauge,
/// Number of active outgoing connections
pub(crate) outgoing_connections: Gauge,
/// Total Number of incoming connections handled
pub(crate) total_incoming_connections: Counter,
/// Total Number of outgoing connections established
pub(crate) total_outgoing_connections: Counter,
/// Number of invalid/malformed messages received from peers
pub(crate) invalid_messages_received: Counter,
}

View File

@ -178,6 +178,16 @@ impl PeersManager {
self.connection_info.decr_in()
}
/// Returns the number of currently active inbound connections.
pub(crate) fn num_inbound_connections(&self) -> usize {
self.connection_info.num_inbound
}
/// Returns the number of currently active outbound connections.
pub(crate) fn num_outbound_connections(&self) -> usize {
self.connection_info.num_outbound
}
/// Invoked when a pending session was closed.
pub(crate) fn on_incoming_pending_session_dropped(
&mut self,

View File

@ -17,7 +17,7 @@ The main difference between metrics and traces is therefore that metrics are sys
To add metrics use the [`metrics`][metrics] crate.
1. Add the code emitting the metric.
2. Add the metrics description in the crate's metrics describer module, e.g.: [stages metrics describer](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/stages_metrics_describer.rs).
2. Add the metrics description in the crate's metrics describer module, e.g.: [stages metrics describer](https://github.com/paradigmxyz/reth/blob/main/crates/stages/src/metrics.rs).
3. Document the metric in this file.
#### Metric anatomy
@ -69,6 +69,17 @@ How the metrics are exposed to the end-user is determined by the CLI.
- `transaction_pool.invalid_transactions`: Number of invalid transactions
- `transaction_pool.removed_transactions`: Number of removed transactions from the pool
#### Network
- `network.connected_peers`: Number of currently connected peers
- `network.tracked_peers`: Number of peers known to the node
- `network.pending_session_failures`: Cumulative number of failures of pending sessions
- `network.closed_sessions`: Total number of sessions closed
- `network.incoming_connections`: Number of active incoming connections
- `network.outgoing_connections`: Number of active outgoing connections
- `network.total_incoming_connections`: Total number of incoming connections handled
- `network.total_outgoing_connections`: Total number of outgoing connections established
- `network.invalid_messages_received`: Number of invalid/malformed messages received from peers
[metrics]: https://docs.rs/metrics
[metrics.Key]: https://docs.rs/metrics/latest/metrics/struct.Key.html
[metrics.KeyName]: https://docs.rs/metrics/latest/metrics/struct.KeyName.html