Tx fetcher metrics (#6951)

This commit is contained in:
Emilia Hane
2024-03-04 18:39:54 +01:00
committed by GitHub
parent 0d3b77f3c1
commit ebe72f7ae8
3 changed files with 141 additions and 87 deletions

View File

@ -59,10 +59,10 @@ pub struct NetworkMetrics {
/* -- Poll duration of items nested in `NetworkManager` future -- */
/// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which
/// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in
/// one call to poll the [`NetworkManager`](crate::NetworkManager) future.
/// one call to poll the [`NetworkManager`](crate::NetworkManager) future. At least
/// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle.
///
/// Duration in seconds.
// todo: find out how many components hold the network handle.
pub(crate) duration_poll_network_handle: Gauge,
/// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the
/// [`NetworkManager`](crate::NetworkManager) future.
@ -119,22 +119,6 @@ pub struct TransactionsManagerMetrics {
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_pending_pool_imports: Counter,
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
/* ================ TX FETCHER ================ */
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_inflight_requests: Counter,
/// Hashes in currently active outgoing
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
pub(crate) hashes_inflight_transaction_requests: Gauge,
/// How often we failed to send a request to the peer because the channel was full.
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,
/* ================ POLL DURATION ================ */
@ -191,17 +175,53 @@ pub struct TransactionsManagerMetrics {
pub(crate) acc_duration_poll_commands: Gauge,
}
/// Metrics for the [`TransactionFetcher`](crate::transactions::TransactionFetcher).
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct TransactionFetcherMetrics {
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at
/// capacity. Note, this is not a limit to the number of inflight requests, but a health
/// measure.
pub(crate) capacity_inflight_requests: Counter,
/// Hashes in currently active outgoing
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests.
pub(crate) hashes_inflight_transaction_requests: Gauge,
/// How often we failed to send a request to the peer because the channel was full.
pub(crate) egress_peer_channel_full: Counter,
/// Total number of hashes pending fetch.
pub(crate) hashes_pending_fetch: Gauge,
/* ================ SEARCH DURATION ================ */
/// Time spent searching for an idle peer in call to
/// [`TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash`](crate::transactions::TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash).
///
/// Duration in seconds.
pub(crate) duration_find_idle_fallback_peer_for_any_pending_hash: Gauge,
/// Time spent searching for hashes pending fetch, announced by a given peer in
/// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`](crate::transactions::TransactionFetcher::fill_request_from_hashes_pending_fetch).
///
/// Duration in seconds.
pub(crate) duration_fill_request_from_hashes_pending_fetch: Gauge,
}
/// Measures the duration of executing the given code and returns the value the code evaluates
/// to. The elapsed time is added to the given accumulator.
///
/// * `$code` - any expression, typically a `{ ... }` block.
/// * `$acc` - an identifier bound to a mutable reference to the accumulated
///   [`Duration`](std::time::Duration); it is dereferenced and incremented in place.
///
/// If `$code` leaves the enclosing function early (e.g. via `return`), the accumulator is left
/// untouched, since the elapsed time is only added after `$code` has run to completion.
#[macro_export]
macro_rules! duration_metered_exec {
    ($code:expr, $acc:ident) => {{
        // Fully qualified path: the macro is exported, so it must not rely on the call site
        // having `Instant` imported.
        let start = ::std::time::Instant::now();
        let res = $code;
        *$acc += start.elapsed();
        res
    }};
}
/// Metrics for Disconnection types

View File

@ -27,7 +27,9 @@
use crate::{
cache::{LruCache, LruMap},
duration_metered_exec,
message::PeerRequest,
metrics::TransactionFetcherMetrics,
transactions::{validation, PartiallyFilterMessage},
};
use derive_more::{Constructor, Deref};
@ -48,6 +50,7 @@ use std::{
num::NonZeroUsize,
pin::Pin,
task::{ready, Context, Poll},
time::{Duration, Instant},
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
@ -87,18 +90,52 @@ pub struct TransactionFetcher {
pub(super) filter_valid_message: MessageFilter,
/// Info on capacity of the transaction fetcher.
pub info: TransactionFetcherInfo,
#[doc(hidden)]
metrics: TransactionFetcherMetrics,
}
// === impl TransactionFetcher ===
impl TransactionFetcher {
/// Refreshes the fetcher's gauges from its current state: the number of inflight requests, the
/// number of hashes pending fetch, and the number of hashes in inflight requests (all tracked
/// hashes minus those pending fetch).
#[inline]
pub fn update_metrics(&self) {
    let inflight_requests = self.inflight_requests.len() as f64;
    let pending = self.hashes_pending_fetch.len() as f64;
    let inflight_and_pending = self.hashes_fetch_inflight_and_pending_fetch.len() as f64;

    self.metrics.inflight_transaction_requests.set(inflight_requests);
    self.metrics.hashes_pending_fetch.set(pending);
    self.metrics.hashes_inflight_transaction_requests.set(inflight_and_pending - pending);
}
/// Publishes the given search durations, in seconds, to the two search-duration gauges.
#[inline]
fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) {
    // exhaustive destructuring, so a new field cannot be forgotten here
    let TxFetcherSearchDurations { find_idle_peer: idle_peer_search, fill_request: request_fill } =
        durations;

    let idle_peer_search_secs = idle_peer_search.as_secs_f64();
    let request_fill_secs = request_fill.as_secs_f64();

    self.metrics.duration_find_idle_fallback_peer_for_any_pending_hash.set(idle_peer_search_secs);
    self.metrics.duration_fill_request_from_hashes_pending_fetch.set(request_fill_secs);
}
/// Sets up transaction fetcher with config.
///
/// Copies `soft_limit_byte_size_pooled_transactions_response` and
/// `soft_limit_byte_size_pooled_transactions_response_on_pack_request` from `config` into the
/// fetcher's `info`.
pub fn with_transaction_fetcher_config(mut self, config: &TransactionFetcherConfig) -> Self {
self.info.soft_limit_byte_size_pooled_transactions_response =
pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self {
let mut tx_fetcher = TransactionFetcher::default();
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response =
config.soft_limit_byte_size_pooled_transactions_response;
self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request =
config.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
self
// record the configured `max_inflight_requests` in the `capacity_inflight_requests` counter
tx_fetcher
.metrics
.capacity_inflight_requests
.increment(tx_fetcher.info.max_inflight_requests as u64);
tx_fetcher
}
/// Removes the specified hashes from inflight tracking.
@ -384,24 +421,34 @@ impl TransactionFetcher {
&mut self,
peers: &HashMap<PeerId, PeerMetadata>,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) {
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
let mut search_durations = TxFetcherSearchDurations::default();
// budget to look for an idle peer before giving up
let budget_find_idle_fallback_peer = self
.search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports);
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
&mut hashes_to_request,
is_session_active,
budget_find_idle_fallback_peer,
) else {
// no peers are idle or budget is depleted
return
};
// each search is timed into the accumulator that is published under the matching gauge: the
// idle peer search into `find_idle_peer`, filling the request into `fill_request`
let acc = &mut search_durations.find_idle_peer;
let peer_id = duration_metered_exec!(
{
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
&mut hashes_to_request,
is_session_active,
budget_find_idle_fallback_peer,
) else {
// no peers are idle or budget is depleted
return
};
peer_id
},
acc
);
// peer should always exist since `is_session_active` already checked
let Some(peer) = peers.get(&peer_id) else { return };
let conn_eth_version = peer.version;
@ -415,15 +462,23 @@ impl TransactionFetcher {
&has_capacity_wrt_pending_pool_imports,
);
self.fill_request_from_hashes_pending_fetch(
&mut hashes_to_request,
&peer.seen_transactions,
budget_fill_request,
let acc = &mut search_durations.fill_request;
duration_metered_exec!(
{
self.fill_request_from_hashes_pending_fetch(
&mut hashes_to_request,
&peer.seen_transactions,
budget_fill_request,
)
},
acc
);
// free unused memory
hashes_to_request.shrink_to_fit();
self.update_pending_fetch_cache_search_metrics(search_durations);
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes_to_request,
@ -432,11 +487,9 @@ impl TransactionFetcher {
);
// request the buffered missing transactions
if let Some(failed_to_request_hashes) = self.request_transactions_from_peer(
hashes_to_request,
peer,
metrics_increment_egress_peer_channel_full,
) {
if let Some(failed_to_request_hashes) =
self.request_transactions_from_peer(hashes_to_request, peer)
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?failed_to_request_hashes,
@ -575,7 +628,6 @@ impl TransactionFetcher {
&mut self,
new_announced_hashes: RequestTxHashes,
peer: &PeerMetadata,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<RequestTxHashes> {
let peer_id: PeerId = peer.request_tx.peer_id;
let conn_eth_version = peer.version;
@ -642,7 +694,7 @@ impl TransactionFetcher {
// peer channel is full
match err {
TrySendError::Full(_) | TrySendError::Closed(_) => {
metrics_increment_egress_peer_channel_full();
self.metrics.egress_peer_channel_full.increment(1);
return Some(new_announced_hashes)
}
}
@ -1013,6 +1065,7 @@ impl Default for TransactionFetcher {
),
filter_valid_message: Default::default(),
info: TransactionFetcherInfo::default(),
metrics: Default::default(),
}
}
}
@ -1259,6 +1312,12 @@ impl Default for TransactionFetcherInfo {
}
}
/// Durations of the searches made while fetching pending hashes, handed to
/// `TransactionFetcher::update_pending_fetch_cache_search_metrics` to be published as gauges.
#[derive(Debug, Default)]
struct TxFetcherSearchDurations {
// published as `duration_find_idle_fallback_peer_for_any_pending_hash`
find_idle_peer: Duration,
// published as `duration_fill_request_from_hashes_pending_fetch`
fill_request: Duration,
}
#[cfg(test)]
mod test {
use std::{collections::HashSet, str::FromStr};
@ -1425,7 +1484,7 @@ mod test {
// TEST
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true, || ());
tx_fetcher.on_fetch_pending_hashes(&peers, |_| true);
// mock session of peer_1 receives request
let req = peer_1_mock_session_rx

View File

@ -255,7 +255,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
let (command_tx, command_rx) = mpsc::unbounded_channel();
let transaction_fetcher = TransactionFetcher::default().with_transaction_fetcher_config(
let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config(
&transactions_manager_config.transaction_fetcher_config,
);
@ -263,11 +263,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
// over the network
let pending = pool.pending_transactions_listener();
let pending_pool_imports_info = PendingPoolImportsInfo::default();
let metrics = TransactionsManagerMetrics::default();
metrics
.capacity_inflight_requests
.increment(transaction_fetcher.info.max_inflight_requests as u64);
metrics
.capacity_pending_pool_imports
.increment(pending_pool_imports_info.max_pending_pool_imports as u64);
@ -314,19 +310,6 @@ impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
{
#[inline]
fn update_fetch_metrics(&self) {
let tx_fetcher = &self.transaction_fetcher;
self.metrics.inflight_transaction_requests.set(tx_fetcher.inflight_requests.len() as f64);
let hashes_pending_fetch = tx_fetcher.hashes_pending_fetch.len() as f64;
let total_hashes = tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len() as f64;
self.metrics.hashes_pending_fetch.set(hashes_pending_fetch);
self.metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch);
}
#[inline]
fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
let metrics = &self.metrics;
@ -804,11 +787,8 @@ where
//
// get handle to peer's session again, at this point we know it exists
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
let metrics = &self.metrics;
if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer, || {
metrics.egress_peer_channel_full.increment(1)
})
self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
{
let conn_eth_version = peer.version;
@ -1138,17 +1118,6 @@ where
}
}
#[derive(Debug, Default)]
struct TxManagerPollDurations {
acc_network_events: Duration,
acc_pending_imports: Duration,
acc_tx_events: Duration,
acc_imported_txns: Duration,
acc_fetch_events: Duration,
acc_pending_fetch: Duration,
acc_cmds: Duration,
}
/// An endless future. Preemption ensure that future is non-blocking, nonetheless. See
/// [`crate::NetworkManager`] for more context on the design pattern.
///
@ -1201,14 +1170,9 @@ where
let has_capacity_wrt_pending_pool_imports =
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
let metrics = &this.metrics;
let metrics_increment_egress_peer_channel_full =
|| metrics.egress_peer_channel_full.increment(1);
this.transaction_fetcher.on_fetch_pending_hashes(
&this.peers,
has_capacity_wrt_pending_pool_imports,
metrics_increment_egress_peer_channel_full,
);
}
},
@ -1255,7 +1219,7 @@ where
let acc = &mut poll_durations.acc_fetch_events;
duration_metered_exec!(
{
this.update_fetch_metrics();
this.transaction_fetcher.update_metrics();
// Advance fetching transaction events (flush transaction fetcher and queue for
// import to pool).
@ -1289,7 +1253,7 @@ where
some_ready = true;
}
this.update_fetch_metrics();
this.transaction_fetcher.update_metrics();
},
acc
);
@ -1621,6 +1585,17 @@ impl Default for PendingPoolImportsInfo {
}
}
/// Per-section duration accumulators for the `TransactionsManager` poll, filled through
/// `duration_metered_exec!` and passed to `update_poll_metrics`.
#[derive(Debug, Default)]
struct TxManagerPollDurations {
acc_network_events: Duration,
acc_pending_imports: Duration,
acc_tx_events: Duration,
acc_imported_txns: Duration,
// time spent in the section that advances transaction fetch events
acc_fetch_events: Duration,
acc_pending_fetch: Duration,
acc_cmds: Duration,
}
#[cfg(test)]
mod tests {
use super::*;
@ -2073,7 +2048,7 @@ mod tests {
assert_eq!(tx_fetcher.active_peers.len(), 0);
// sends request for buffered hashes to peer_1
tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true, || ());
tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true);
let tx_fetcher = &mut tx_manager.transaction_fetcher;