feat: add metrics for tx channel (#3345)

This commit is contained in:
Matthias Seitz
2023-06-22 21:34:02 +02:00
committed by GitHub
parent 3390671cb6
commit a4c2f5f69c
6 changed files with 40 additions and 8 deletions

1
Cargo.lock generated
View File

@ -5374,6 +5374,7 @@ dependencies = [
name = "reth-metrics" name = "reth-metrics"
version = "0.1.0-alpha.1" version = "0.1.0-alpha.1"
dependencies = [ dependencies = [
"futures",
"metrics", "metrics",
"reth-metrics-derive", "reth-metrics-derive",
"tokio", "tokio",

View File

@ -17,6 +17,7 @@ metrics = "0.20.1"
# async # async
tokio = { workspace = true, features = ["full"], optional = true } tokio = { workspace = true, features = ["full"], optional = true }
futures = { workspace = true, optional = true }
[features] [features]
common = ["tokio"] common = ["tokio", "futures"]

View File

@ -1,9 +1,13 @@
//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages //! Support for metering senders. Facilitates debugging by exposing metrics for number of messages
//! sent, number of errors, etc. //! sent, number of errors, etc.
use futures::Stream;
use metrics::Counter; use metrics::Counter;
use reth_metrics_derive::Metrics; use reth_metrics_derive::Metrics;
use std::task::{ready, Context, Poll}; use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::mpsc::{ use tokio::sync::mpsc::{
self, self,
error::{SendError, TryRecvError, TrySendError}, error::{SendError, TryRecvError, TrySendError},
@ -115,6 +119,14 @@ impl<T> UnboundedMeteredReceiver<T> {
} }
} }
impl<T> Stream for UnboundedMeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
/// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send. /// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send.
#[derive(Debug)] #[derive(Debug)]
pub struct MeteredSender<T> { pub struct MeteredSender<T> {
@ -215,6 +227,14 @@ impl<T> MeteredReceiver<T> {
} }
} }
impl<T> Stream for MeteredReceiver<T> {
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}
/// Throughput metrics for [MeteredSender] /// Throughput metrics for [MeteredSender]
#[derive(Clone, Metrics)] #[derive(Clone, Metrics)]
#[metrics(dynamic = true)] #[metrics(dynamic = true)]

View File

@ -23,7 +23,7 @@ use crate::{
import::{BlockImport, BlockImportOutcome, BlockValidation}, import::{BlockImport, BlockImportOutcome, BlockValidation},
listener::ConnectionListener, listener::ConnectionListener,
message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender}, message::{NewBlockMessage, PeerMessage, PeerRequest, PeerRequestSender},
metrics::{DisconnectMetrics, NetworkMetrics}, metrics::{DisconnectMetrics, NetworkMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
network::{NetworkHandle, NetworkHandleMessage}, network::{NetworkHandle, NetworkHandleMessage},
peers::{PeersHandle, PeersManager}, peers::{PeersHandle, PeersManager},
session::SessionManager, session::SessionManager,
@ -38,6 +38,7 @@ use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage}, capability::{Capabilities, CapabilityMessage},
DisconnectReason, EthVersion, Status, DisconnectReason, EthVersion, Status,
}; };
use reth_metrics::common::mpsc::UnboundedMeteredSender;
use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::ReputationChangeKind; use reth_network_api::ReputationChangeKind;
use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256}; use reth_primitives::{listener::EventListeners, NodeRecord, PeerId, H256};
@ -55,6 +56,7 @@ use std::{
use tokio::sync::mpsc::{self, error::TrySendError}; use tokio::sync::mpsc::{self, error::TrySendError};
use tokio_stream::wrappers::UnboundedReceiverStream; use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
/// Manages the _entire_ state of the network. /// Manages the _entire_ state of the network.
/// ///
/// This is an endless [`Future`] that consistently drives the state of the entire network forward. /// This is an endless [`Future`] that consistently drives the state of the entire network forward.
@ -97,7 +99,7 @@ pub struct NetworkManager<C> {
event_listeners: EventListeners<NetworkEvent>, event_listeners: EventListeners<NetworkEvent>,
/// Sender half to send events to the /// Sender half to send events to the
/// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured. /// [`TransactionsManager`](crate::transactions::TransactionsManager) task, if configured.
to_transactions_manager: Option<mpsc::UnboundedSender<NetworkTransactionEvent>>, to_transactions_manager: Option<UnboundedMeteredSender<NetworkTransactionEvent>>,
/// Sender half to send events to the /// Sender half to send events to the
/// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured. /// [`EthRequestHandler`](crate::eth_requests::EthRequestHandler) task, if configured.
/// ///
@ -128,7 +130,8 @@ impl<C> NetworkManager<C> {
/// Sets the dedicated channel for events indented for the /// Sets the dedicated channel for events indented for the
/// [`TransactionsManager`](crate::transactions::TransactionsManager). /// [`TransactionsManager`](crate::transactions::TransactionsManager).
pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) { pub fn set_transactions(&mut self, tx: mpsc::UnboundedSender<NetworkTransactionEvent>) {
self.to_transactions_manager = Some(tx); self.to_transactions_manager =
Some(UnboundedMeteredSender::new(tx, NETWORK_POOL_TRANSACTIONS_SCOPE));
} }
/// Sets the dedicated channel for events indented for the /// Sets the dedicated channel for events indented for the

View File

@ -4,6 +4,9 @@ use reth_metrics::{
Metrics, Metrics,
}; };
/// Scope for monitoring transactions sent from the manager to the tx manager
pub(crate) const NETWORK_POOL_TRANSACTIONS_SCOPE: &str = "network.pool.transactions";
/// Metrics for the entire network, handled by NetworkManager /// Metrics for the entire network, handled by NetworkManager
#[derive(Metrics)] #[derive(Metrics)]
#[metrics(scope = "network")] #[metrics(scope = "network")]

View File

@ -4,7 +4,7 @@ use crate::{
cache::LruCache, cache::LruCache,
manager::NetworkEvent, manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender}, message::{PeerRequest, PeerRequestSender},
metrics::TransactionsManagerMetrics, metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkHandle, NetworkHandle,
}; };
use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
@ -13,6 +13,7 @@ use reth_eth_wire::{
NewPooledTransactionHashes68, PooledTransactions, Transactions, NewPooledTransactionHashes68, PooledTransactions, Transactions,
}; };
use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider}; use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider};
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{Peers, ReputationChangeKind}; use reth_network_api::{Peers, ReputationChangeKind};
use reth_primitives::{ use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
@ -109,7 +110,7 @@ pub struct TransactionsManager<Pool> {
/// Incoming commands from [`TransactionsHandle`]. /// Incoming commands from [`TransactionsHandle`].
pending_transactions: ReceiverStream<TxHash>, pending_transactions: ReceiverStream<TxHash>,
/// Incoming events from the [`NetworkManager`](crate::NetworkManager). /// Incoming events from the [`NetworkManager`](crate::NetworkManager).
transaction_events: UnboundedReceiverStream<NetworkTransactionEvent>, transaction_events: UnboundedMeteredReceiver<NetworkTransactionEvent>,
/// TransactionsManager metrics /// TransactionsManager metrics
metrics: TransactionsManagerMetrics, metrics: TransactionsManagerMetrics,
} }
@ -140,7 +141,10 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
command_tx, command_tx,
command_rx: UnboundedReceiverStream::new(command_rx), command_rx: UnboundedReceiverStream::new(command_rx),
pending_transactions: ReceiverStream::new(pending), pending_transactions: ReceiverStream::new(pending),
transaction_events: UnboundedReceiverStream::new(from_network), transaction_events: UnboundedMeteredReceiver::new(
from_network,
NETWORK_POOL_TRANSACTIONS_SCOPE,
),
metrics: Default::default(), metrics: Default::default(),
} }
} }