diff --git a/Cargo.lock b/Cargo.lock index 5da9af019..211f8d8e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4396,6 +4396,7 @@ dependencies = [ "reth-ecies", "reth-eth-wire", "reth-interfaces", + "reth-metrics-common", "reth-metrics-derive", "reth-net-common", "reth-network", diff --git a/crates/metrics/common/src/metered_sender.rs b/crates/metrics/common/src/metered_sender.rs index d7bb63247..9fa57dcac 100644 --- a/crates/metrics/common/src/metered_sender.rs +++ b/crates/metrics/common/src/metered_sender.rs @@ -9,7 +9,7 @@ use tokio::sync::mpsc::{ }; /// Network throughput metrics -#[derive(Metrics)] +#[derive(Clone, Metrics)] #[metrics(dynamic = true)] struct MeteredSenderMetrics { /// Number of messages sent @@ -19,6 +19,7 @@ struct MeteredSenderMetrics { } /// Manages updating the network throughput metrics for a metered stream +#[derive(Debug)] pub struct MeteredSender { /// The [`Sender`] that this wraps around sender: Sender, @@ -34,7 +35,7 @@ impl MeteredSender { /// Calls the underlying [`Sender`]'s `try_send`, incrementing the appropriate /// metrics depending on the result. - pub fn try_send(&mut self, message: T) -> Result<(), TrySendError> { + pub fn try_send(&self, message: T) -> Result<(), TrySendError> { match self.sender.try_send(message) { Ok(()) => { self.metrics.messages_sent.increment(1); @@ -62,3 +63,9 @@ impl MeteredSender { } } } + +impl Clone for MeteredSender { + fn clone(&self) -> Self { + Self { sender: self.sender.clone(), metrics: self.metrics.clone() } + } +} diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 4ab788e80..b7c108562 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -30,6 +30,7 @@ reth-rlp-derive = { path = "../../rlp/rlp-derive" } reth-tasks = { path = "../../tasks" } reth-transaction-pool = { path = "../../transaction-pool" } reth-provider = { path = "../../storage/provider"} +reth-metrics-common = { path = "../../metrics/common" } # async/futures futures = "0.3" diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 9088cd062..71d91c047 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -19,6 +19,7 @@ use reth_eth_wire::{ DisconnectReason, EthMessage, EthStream, P2PStream, }; use reth_interfaces::p2p::error::RequestError; +use reth_metrics_common::metered_sender::MeteredSender; use reth_net_common::bandwidth_meter::MeteredStream; use reth_primitives::PeerId; use std::{ @@ -74,7 +75,7 @@ pub(crate) struct ActiveSession { /// Incoming commands from the manager pub(crate) commands_rx: ReceiverStream, /// Sink to send messages to the [`SessionManager`](super::SessionManager). - pub(crate) to_session: mpsc::Sender, + pub(crate) to_session: MeteredSender, /// Incoming request to send to delegate to the remote peer. pub(crate) request_tx: Fuse>, /// All requests sent to the remote peer we're waiting on a response @@ -744,7 +745,10 @@ mod tests { remote_capabilities: Arc::clone(&capabilities), session_id, commands_rx: ReceiverStream::new(commands_rx), - to_session: self.active_session_tx.clone(), + to_session: MeteredSender::new( + self.active_session_tx.clone(), + "network_active_session", + ), request_tx: ReceiverStream::new(messages_rx).fuse(), inflight_requests: Default::default(), conn, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index beaac4b35..eb179f09c 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -19,6 +19,7 @@ use reth_eth_wire::{ errors::EthStreamError, DisconnectReason, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream, }; +use reth_metrics_common::metered_sender::MeteredSender; use reth_net_common::bandwidth_meter::{BandwidthMeter, MeteredStream}; use reth_primitives::{ForkFilter, ForkId, ForkTransition, PeerId, H256, U256}; use reth_tasks::TaskExecutor; @@ -91,7 +92,7 @@ pub(crate) struct SessionManager { /// /// When active session state is reached, the corresponding [`ActiveSessionHandle`] will get a /// clone of this sender half. - active_session_tx: mpsc::Sender, + active_session_tx: MeteredSender, /// Receiver half that listens for [`ActiveSessionMessage`] produced by pending sessions. active_session_rx: ReceiverStream, /// Used to measure inbound & outbound bandwidth across all managed streams @@ -129,7 +130,7 @@ impl SessionManager { active_sessions: Default::default(), pending_sessions_tx, pending_session_rx: ReceiverStream::new(pending_sessions_rx), - active_session_tx, + active_session_tx: MeteredSender::new(active_session_tx, "network_active_session"), active_session_rx: ReceiverStream::new(active_session_rx), bandwidth_meter, }