mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(net): Metered senders (#726)
Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
9
Cargo.lock
generated
9
Cargo.lock
generated
@ -4260,6 +4260,15 @@ dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-metrics-common"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"metrics",
|
||||
"reth-metrics-derive",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-metrics-derive"
|
||||
version = "0.1.0"
|
||||
|
||||
@ -32,6 +32,7 @@ members = [
|
||||
"crates/tasks",
|
||||
"crates/transaction-pool",
|
||||
"crates/metrics/metrics-derive",
|
||||
"crates/metrics/common",
|
||||
"crates/cli/utils",
|
||||
]
|
||||
default-members = ["bin/reth"]
|
||||
|
||||
19
crates/metrics/common/Cargo.toml
Normal file
19
crates/metrics/common/Cargo.toml
Normal file
@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "reth-metrics-common"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "Apache-2.0"
|
||||
repository = "https://github.com/paradigmxyz/reth"
|
||||
description = """
|
||||
Common metric types across the Reth codebase
|
||||
"""
|
||||
|
||||
[dependencies]
|
||||
# reth
|
||||
reth-metrics-derive = { path = "../../metrics/metrics-derive" }
|
||||
|
||||
# async
|
||||
tokio = { version = "1.21.2", features = ["full"] }
|
||||
|
||||
# metrics
|
||||
metrics = "0.20.1"
|
||||
10
crates/metrics/common/src/lib.rs
Normal file
10
crates/metrics/common/src/lib.rs
Normal file
@ -0,0 +1,10 @@
|
||||
#![warn(missing_docs, unreachable_pub)]
|
||||
#![deny(unused_must_use, rust_2018_idioms)]
|
||||
#![doc(test(
|
||||
no_crate_inject,
|
||||
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
|
||||
))]
|
||||
|
||||
//! Common metric types that can be used across the Reth codebase
|
||||
|
||||
pub mod metered_sender;
|
||||
64
crates/metrics/common/src/metered_sender.rs
Normal file
64
crates/metrics/common/src/metered_sender.rs
Normal file
@ -0,0 +1,64 @@
|
||||
//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages
|
||||
//! sent, number of errors, etc.
|
||||
|
||||
use metrics::Counter;
|
||||
use reth_metrics_derive::Metrics;
|
||||
use tokio::sync::mpsc::{
|
||||
error::{SendError, TrySendError},
|
||||
Sender,
|
||||
};
|
||||
|
||||
/// Network throughput metrics
|
||||
#[derive(Metrics)]
|
||||
#[metrics(dynamic = true)]
|
||||
struct MeteredSenderMetrics {
|
||||
/// Number of messages sent
|
||||
messages_sent: Counter,
|
||||
/// Number of failed message deliveries
|
||||
send_errors: Counter,
|
||||
}
|
||||
|
||||
/// Manages updating the network throughput metrics for a metered stream
|
||||
pub struct MeteredSender<T> {
|
||||
/// The [`Sender`] that this wraps around
|
||||
sender: Sender<T>,
|
||||
/// Holds the gauges for inbound and outbound throughput
|
||||
metrics: MeteredSenderMetrics,
|
||||
}
|
||||
|
||||
impl<T> MeteredSender<T> {
|
||||
/// Creates a new [`MeteredSender`] wrapping around the provided [`Sender`]
|
||||
pub fn new(sender: Sender<T>, scope: &'static str) -> Self {
|
||||
Self { sender, metrics: MeteredSenderMetrics::new(scope) }
|
||||
}
|
||||
|
||||
/// Calls the underlying [`Sender`]'s `try_send`, incrementing the appropriate
|
||||
/// metrics depending on the result.
|
||||
pub fn try_send(&mut self, message: T) -> Result<(), TrySendError<T>> {
|
||||
match self.sender.try_send(message) {
|
||||
Ok(()) => {
|
||||
self.metrics.messages_sent.increment(1);
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
self.metrics.send_errors.increment(1);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Calls the underlying [`Sender`]'s `send`, incrementing the appropriate
|
||||
/// metrics depending on the result.
|
||||
pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
|
||||
match self.sender.send(value).await {
|
||||
Ok(()) => {
|
||||
self.metrics.messages_sent.increment(1);
|
||||
Ok(())
|
||||
}
|
||||
Err(error) => {
|
||||
self.metrics.send_errors.increment(1);
|
||||
Err(error)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -14,4 +14,4 @@ reth-primitives = { path = "../../primitives" }
|
||||
|
||||
# async
|
||||
pin-project = "1.0"
|
||||
tokio = { version = "1.21.2", features = ["full"] }
|
||||
tokio = { version = "1.21.2", features = ["full"] }
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
//! Support for monitoring bandwidth. Takes heavy inspiration from https://github.com/libp2p/rust-libp2p/blob/master/src/bandwidth.rs
|
||||
//! Support for metering bandwidth. Takes heavy inspiration from https://github.com/libp2p/rust-libp2p/blob/master/src/bandwidth.rs
|
||||
|
||||
// Copyright 2019 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
@ -90,7 +90,7 @@ pub struct MeteredStream<S> {
|
||||
/// The stream this instruments
|
||||
#[pin]
|
||||
inner: S,
|
||||
/// The [`BandwidthMeter`] struct this uses to monitor bandwidth
|
||||
/// The [`BandwidthMeter`] struct this uses to meter bandwidth
|
||||
meter: BandwidthMeter,
|
||||
}
|
||||
|
||||
@ -209,13 +209,13 @@ mod tests {
|
||||
// Taken in large part from https://docs.rs/tokio/latest/tokio/io/struct.DuplexStream.html#example
|
||||
|
||||
let (client, server) = duplex(64);
|
||||
let mut monitored_client = MeteredStream::new(client);
|
||||
let mut monitored_server = MeteredStream::new(server);
|
||||
let mut metered_client = MeteredStream::new(client);
|
||||
let mut metered_server = MeteredStream::new(server);
|
||||
|
||||
duplex_stream_ping_pong(&mut monitored_client, &mut monitored_server).await;
|
||||
duplex_stream_ping_pong(&mut metered_client, &mut metered_server).await;
|
||||
|
||||
assert_bandwidth_counts(monitored_client.get_bandwidth_meter(), 4, 4);
|
||||
assert_bandwidth_counts(monitored_server.get_bandwidth_meter(), 4, 4);
|
||||
assert_bandwidth_counts(metered_client.get_bandwidth_meter(), 4, 4);
|
||||
assert_bandwidth_counts(metered_server.get_bandwidth_meter(), 4, 4);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@ -252,18 +252,18 @@ mod tests {
|
||||
let shared_client_bandwidth_meter = BandwidthMeter::default();
|
||||
let shared_server_bandwidth_meter = BandwidthMeter::default();
|
||||
|
||||
let mut monitored_client_1 =
|
||||
let mut metered_client_1 =
|
||||
MeteredStream::new_with_meter(client_1, shared_client_bandwidth_meter.clone());
|
||||
let mut monitored_server_1 =
|
||||
let mut metered_server_1 =
|
||||
MeteredStream::new_with_meter(server_1, shared_server_bandwidth_meter.clone());
|
||||
|
||||
let mut monitored_client_2 =
|
||||
let mut metered_client_2 =
|
||||
MeteredStream::new_with_meter(client_2, shared_client_bandwidth_meter.clone());
|
||||
let mut monitored_server_2 =
|
||||
let mut metered_server_2 =
|
||||
MeteredStream::new_with_meter(server_2, shared_server_bandwidth_meter.clone());
|
||||
|
||||
duplex_stream_ping_pong(&mut monitored_client_1, &mut monitored_server_1).await;
|
||||
duplex_stream_ping_pong(&mut monitored_client_2, &mut monitored_server_2).await;
|
||||
duplex_stream_ping_pong(&mut metered_client_1, &mut metered_server_1).await;
|
||||
duplex_stream_ping_pong(&mut metered_client_2, &mut metered_server_2).await;
|
||||
|
||||
assert_bandwidth_counts(&shared_client_bandwidth_meter, 8, 8);
|
||||
assert_bandwidth_counts(&shared_server_bandwidth_meter, 8, 8);
|
||||
|
||||
Reference in New Issue
Block a user