diff --git a/Cargo.lock b/Cargo.lock index b474f2509..96ccb4205 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5374,6 +5374,7 @@ dependencies = [ name = "reth-metrics" version = "0.1.0-alpha.1" dependencies = [ + "futures", "metrics", "reth-metrics-derive", "tokio", diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 75719a7aa..adbd26cb8 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -17,6 +17,7 @@ metrics = "0.20.1" # async tokio = { workspace = true, features = ["full"], optional = true } +futures = { workspace = true, optional = true } [features] -common = ["tokio"] +common = ["tokio", "futures"] diff --git a/crates/metrics/src/common/mpsc.rs b/crates/metrics/src/common/mpsc.rs index fe52d8857..0f4688e21 100644 --- a/crates/metrics/src/common/mpsc.rs +++ b/crates/metrics/src/common/mpsc.rs @@ -1,9 +1,13 @@ //! Support for metering senders. Facilitates debugging by exposing metrics for number of messages //! sent, number of errors, etc. +use futures::Stream; use metrics::Counter; use reth_metrics_derive::Metrics; -use std::task::{ready, Context, Poll}; +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; use tokio::sync::mpsc::{ self, error::{SendError, TryRecvError, TrySendError}, @@ -115,6 +119,14 @@ impl UnboundedMeteredReceiver { } } +impl Stream for UnboundedMeteredReceiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.receiver.poll_recv(cx) + } +} + /// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send. #[derive(Debug)] pub struct MeteredSender { @@ -215,6 +227,14 @@ impl MeteredReceiver { } } +impl Stream for MeteredReceiver { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.receiver.poll_recv(cx) + } +} + /// Throughput metrics for [MeteredSender] #[derive(Clone, Metrics)] #[metrics(dynamic = true)] diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 4c5a03873..8a70a9507 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -23,7 +23,7 @@ use crate::{ import::{BlockImport, BlockImportOutcome, BlockValidation}, listener::ConnectionListener, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, - metrics::{DisconnectMetrics, NetworkMetrics}, + metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, network::{NetworkHandle, NetworkHandleMessage}, peers::{PeersHandle, PeersManager}, session::SessionManager, @@ -38,6 +38,7 @@ use reth_eth_wire::{ capability::{Capabilities, CapabilityMessage}, DisconnectReason, EthVersion, Status, }; +use reth_metrics::common::mpsc::UnboundedMeteredSender; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::ReputationChangeKind; use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256}; @@ -55,6 +56,7 @@ use std::{ use tokio::sync::mpsc::{self, error::TrySendError}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info, trace, warn}; + /// Manages the _entire_ state of the network. /// /// This is an endless [`Future`] that consistently drives the state of the entire network forward. @@ -97,7 +99,7 @@ pub struct NetworkManager { event_listeners: EventListeners, /// Sender half to send events to the /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured. - to_transactions_manager: Option>, + to_transactions_manager: Option>, /// Sender half to send events to the /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured. /// @@ -128,7 +130,8 @@ impl NetworkManager { /// Sets the dedicated channel for events indented for the /// [`TransactionsManager`](crate::transactions::TransactionsManager). pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender) { - self.to_transactions_manager = Some(tx); + self.to_transactions_manager = + Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE)); } /// Sets the dedicated channel for events indented for the diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index bea6d8cd4..d7969da80 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -4,6 +4,9 @@ use reth_metrics::{ Metrics, }; +/// Scope for monitoring transactions sent from the manager to the tx manager +pub(crate) const NETWORK_POOL_TRANSACTIONS_SCOPE: &str = "network.pool.transactions"; + /// Metrics for the entire network, handled by NetworkManager #[derive(Metrics)] #[metrics(scope = "network")] diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index b15d8a23a..c5d88c3e6 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -4,7 +4,7 @@ use crate::{ cache::LruCache, manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, - metrics::TransactionsManagerMetrics, + metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE}, NetworkHandle, }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; @@ -13,6 +13,7 @@ use reth_eth_wire::{ NewPooledTransactionHashes68, PooledTransactions, Transactions, }; use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider}; +use reth_metrics::common::mpsc::UnboundedMeteredReceiver; use reth_network_api::{Peers, ReputationChangeKind}; use reth_primitives::{ FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, @@ -109,7 +110,7 @@ pub struct TransactionsManager { /// Incoming commands from [`TransactionsHandle`]. pending_transactions: ReceiverStream, /// Incoming events from the [`NetworkManager`](crate::NetworkManager). - transaction_events: UnboundedReceiverStream, + transaction_events: UnboundedMeteredReceiver, /// TransactionsManager metrics metrics: TransactionsManagerMetrics, } @@ -140,7 +141,10 @@ impl TransactionsManager { command_tx, command_rx: UnboundedReceiverStream::new(command_rx), pending_transactions: ReceiverStream::new(pending), - transaction_events: UnboundedReceiverStream::new(from_network), + transaction_events: UnboundedMeteredReceiver::new( + from_network, + NETWORK_POOL_TRANSACTIONS_SCOPE, + ), metrics: Default::default(), } }