feat: track buffered outgoing messages (#12220)

This commit is contained in:
caglarkaya
2024-11-01 14:32:12 +03:00
committed by GitHub
parent f52186cc4d
commit 927be855ff
3 changed files with 35 additions and 3 deletions

View File

@ -85,6 +85,8 @@ pub struct SessionManagerMetrics {
pub(crate) total_dial_successes: Counter,
/// Number of dropped outgoing peer messages.
pub(crate) total_outgoing_peer_messages_dropped: Counter,
/// Number of queued outgoing messages
pub(crate) queued_outgoing_messages: Gauge,
}
/// Metrics for the [`TransactionsManager`](crate::transactions::TransactionsManager).

View File

@ -12,6 +12,7 @@ use std::{
};
use futures::{stream::Fuse, SinkExt, StreamExt};
use metrics::Gauge;
use reth_eth_wire::{
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
message::{EthBroadcastMessage, RequestPair},
@ -87,7 +88,7 @@ pub(crate) struct ActiveSession {
/// All requests that were sent by the remote peer and we're waiting on an internal response
pub(crate) received_requests_from_remote: Vec<ReceivedRequest>,
/// Buffered messages that should be handled and sent to the peer.
pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
pub(crate) queued_outgoing: QueuedOutgoingMessages,
/// The maximum time we wait for a response from a peer.
pub(crate) internal_request_timeout: Arc<AtomicU64>,
/// Interval when to check for timed out requests.
@ -757,6 +758,32 @@ fn calculate_new_timeout(current_timeout: Duration, estimated_rtt: Duration) ->
smoothened_timeout.clamp(MINIMUM_TIMEOUT, MAXIMUM_TIMEOUT)
}
/// A helper struct that wraps the queue of outgoing messages and a metric to track their count
pub(crate) struct QueuedOutgoingMessages {
messages: VecDeque<OutgoingMessage>,
count: Gauge,
}
impl QueuedOutgoingMessages {
pub(crate) const fn new(metric: Gauge) -> Self {
Self { messages: VecDeque::new(), count: metric }
}
pub(crate) fn push_back(&mut self, message: OutgoingMessage) {
self.messages.push_back(message);
self.count.increment(1);
}
pub(crate) fn pop_front(&mut self) -> Option<OutgoingMessage> {
self.messages.pop_front().inspect(|_| self.count.decrement(1))
}
pub(crate) fn shrink_to_fit(&mut self) {
self.messages.shrink_to_fit();
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -882,7 +909,7 @@ mod tests {
internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
queued_outgoing: Default::default(),
queued_outgoing: QueuedOutgoingMessages::new(Gauge::noop()),
received_requests_from_remote: Default::default(),
internal_request_timeout_interval: tokio::time::interval(
INITIAL_REQUEST_TIMEOUT,

View File

@ -5,6 +5,7 @@ mod conn;
mod counter;
mod handle;
use active::QueuedOutgoingMessages;
pub use conn::EthRlpxConnection;
pub use handle::{
ActiveSessionHandle, ActiveSessionMessage, PendingSessionEvent, PendingSessionHandle,
@ -495,7 +496,9 @@ impl SessionManager {
internal_request_tx: ReceiverStream::new(messages_rx).fuse(),
inflight_requests: Default::default(),
conn,
queued_outgoing: Default::default(),
queued_outgoing: QueuedOutgoingMessages::new(
self.metrics.queued_outgoing_messages.clone(),
),
received_requests_from_remote: Default::default(),
internal_request_timeout_interval: tokio::time::interval(
self.initial_internal_request_timeout,