diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index 6a85d47c6..6bd4efad9 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -59,10 +59,10 @@ pub struct NetworkMetrics { /* -- Poll duration of items nested in `NetworkManager` future -- */ /// Time spent streaming messages sent over the [`NetworkHandle`](crate::NetworkHandle), which /// can be cloned and shared via [`NetworkManager::handle`](crate::NetworkManager::handle), in - /// one call to poll the [`NetworkManager`](crate::NetworkManager) future. + /// one call to poll the [`NetworkManager`](crate::NetworkManager) future. At least + /// [`TransactionsManager`](crate::transactions::TransactionsManager) holds this handle. /// /// Duration in seconds. - // todo: find out how many components hold the network handle. pub(crate) duration_poll_network_handle: Gauge, /// Time spent polling [`Swarm`](crate::swarm::Swarm), in one call to poll the /// [`NetworkManager`](crate::NetworkManager) future. @@ -119,22 +119,6 @@ pub struct TransactionsManagerMetrics { /// capacity. Note, this is not a limit to the number of inflight requests, but a health /// measure. pub(crate) capacity_pending_pool_imports: Counter, - /// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) - /// requests. - /* ================ TX FETCHER ================ */ - pub(crate) inflight_transaction_requests: Gauge, - /// Number of inflight requests at which the - /// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at - /// capacity. Note, this is not a limit to the number of inflight requests, but a health - /// measure. - pub(crate) capacity_inflight_requests: Counter, - /// Hashes in currently active outgoing - /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests. - pub(crate) hashes_inflight_transaction_requests: Gauge, - /// How often we failed to send a request to the peer because the channel was full. - pub(crate) egress_peer_channel_full: Counter, - /// Total number of hashes pending fetch. - pub(crate) hashes_pending_fetch: Gauge, /* ================ POLL DURATION ================ */ @@ -191,17 +175,53 @@ pub struct TransactionsManagerMetrics { pub(crate) acc_duration_poll_commands: Gauge, } +/// Metrics for the [`TransactionsManager`](crate::transactions::TransactionsManager). +#[derive(Metrics)] +#[metrics(scope = "network")] +pub struct TransactionFetcherMetrics { + /// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) + /// requests. + pub(crate) inflight_transaction_requests: Gauge, + /// Number of inflight requests at which the + /// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at + /// capacity. Note, this is not a limit to the number of inflight requests, but a health + /// measure. + pub(crate) capacity_inflight_requests: Counter, + /// Hashes in currently active outgoing + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) requests. + pub(crate) hashes_inflight_transaction_requests: Gauge, + /// How often we failed to send a request to the peer because the channel was full. + pub(crate) egress_peer_channel_full: Counter, + /// Total number of hashes pending fetch. + pub(crate) hashes_pending_fetch: Gauge, + + /* ================ SEARCH DURATION ================ */ + /// Time spent searching for an idle peer in call to + /// [`TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash`](crate::transactions::TransactionFetcher::find_any_idle_fallback_peer_for_any_pending_hash). + /// + /// Duration in seconds. + pub(crate) duration_find_idle_fallback_peer_for_any_pending_hash: Gauge, + + /// Time spent searching for hashes pending fetch, announced by a given peer in + /// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`](crate::transactions::TransactionFetcher::fill_request_from_hashes_pending_fetch). + /// + /// Duration in seconds. + pub(crate) duration_fill_request_from_hashes_pending_fetch: Gauge, +} + /// Measures the duration of executing the given code block. The duration is added to the given /// accumulator value passed as a mutable reference. #[macro_export] macro_rules! duration_metered_exec { - ($code:block, $acc:ident) => { + ($code:expr, $acc:ident) => {{ let start = Instant::now(); - $code; + let res = $code; *$acc += start.elapsed(); - }; + + res + }}; } /// Metrics for Disconnection types diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 7a42a73a4..c551eab11 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -27,7 +27,9 @@ use crate::{ cache::{LruCache, LruMap}, + duration_metered_exec, message::PeerRequest, + metrics::TransactionFetcherMetrics, transactions::{validation, PartiallyFilterMessage}, }; use derive_more::{Constructor, Deref}; @@ -48,6 +50,7 @@ use std::{ num::NonZeroUsize, pin::Pin, task::{ready, Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tracing::{debug, trace}; @@ -87,18 +90,52 @@ pub struct TransactionFetcher { pub(super) filter_valid_message: MessageFilter, /// Info on capacity of the transaction fetcher. pub info: TransactionFetcherInfo, + #[doc(hidden)] + metrics: TransactionFetcherMetrics, } // === impl TransactionFetcher === impl TransactionFetcher { + /// Updates metrics. + #[inline] + pub fn update_metrics(&self) { + let metrics = &self.metrics; + + metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64); + + let hashes_pending_fetch = self.hashes_pending_fetch.len() as f64; + let total_hashes = self.hashes_fetch_inflight_and_pending_fetch.len() as f64; + + metrics.hashes_pending_fetch.set(hashes_pending_fetch); + metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch); + } + + #[inline] + fn update_pending_fetch_cache_search_metrics(&self, durations: TxFetcherSearchDurations) { + let metrics = &self.metrics; + + let TxFetcherSearchDurations { find_idle_peer, fill_request } = durations; + metrics + .duration_find_idle_fallback_peer_for_any_pending_hash + .set(find_idle_peer.as_secs_f64()); + metrics.duration_fill_request_from_hashes_pending_fetch.set(fill_request.as_secs_f64()); + } + /// Sets up transaction fetcher with config - pub fn with_transaction_fetcher_config(mut self, config: &TransactionFetcherConfig) -> Self { - self.info.soft_limit_byte_size_pooled_transactions_response = + pub fn with_transaction_fetcher_config(config: &TransactionFetcherConfig) -> Self { + let mut tx_fetcher = TransactionFetcher::default(); + + tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response = config.soft_limit_byte_size_pooled_transactions_response; - self.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request = + tx_fetcher.info.soft_limit_byte_size_pooled_transactions_response_on_pack_request = config.soft_limit_byte_size_pooled_transactions_response_on_pack_request; - self + tx_fetcher + .metrics + .capacity_inflight_requests + .increment(tx_fetcher.info.max_inflight_requests as u64); + + tx_fetcher } /// Removes the specified hashes from inflight tracking. @@ -384,24 +421,34 @@ impl TransactionFetcher { &mut self, peers: &HashMap, has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, - metrics_increment_egress_peer_channel_full: impl FnOnce(), ) { let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info); let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id); + let mut search_durations = TxFetcherSearchDurations::default(); + // budget to look for an idle peer before giving up let budget_find_idle_fallback_peer = self .search_breadth_budget_find_idle_fallback_peer(&has_capacity_wrt_pending_pool_imports); - let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash( - &mut hashes_to_request, - is_session_active, - budget_find_idle_fallback_peer, - ) else { - // no peers are idle or budget is depleted - return - }; + let acc = &mut search_durations.fill_request; + let peer_id = duration_metered_exec!( + { + let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash( + &mut hashes_to_request, + is_session_active, + budget_find_idle_fallback_peer, + ) else { + // no peers are idle or budget is depleted + return + }; + + peer_id + }, + acc + ); + // peer should always exist since `is_session_active` already checked let Some(peer) = peers.get(&peer_id) else { return }; let conn_eth_version = peer.version; @@ -415,15 +462,23 @@ impl TransactionFetcher { &has_capacity_wrt_pending_pool_imports, ); - self.fill_request_from_hashes_pending_fetch( - &mut hashes_to_request, - &peer.seen_transactions, - budget_fill_request, + let acc = &mut search_durations.find_idle_peer; + duration_metered_exec!( + { + self.fill_request_from_hashes_pending_fetch( + &mut hashes_to_request, + &peer.seen_transactions, + budget_fill_request, + ) + }, + acc ); // free unused memory hashes_to_request.shrink_to_fit(); + self.update_pending_fetch_cache_search_metrics(search_durations); + trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), hashes=?*hashes_to_request, @@ -432,11 +487,9 @@ impl TransactionFetcher { ); // request the buffered missing transactions - if let Some(failed_to_request_hashes) = self.request_transactions_from_peer( - hashes_to_request, - peer, - metrics_increment_egress_peer_channel_full, - ) { + if let Some(failed_to_request_hashes) = + self.request_transactions_from_peer(hashes_to_request, peer) + { debug!(target: "net::tx", peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?failed_to_request_hashes, @@ -575,7 +628,6 @@ impl TransactionFetcher { &mut self, new_announced_hashes: RequestTxHashes, peer: &PeerMetadata, - metrics_increment_egress_peer_channel_full: impl FnOnce(), ) -> Option { let peer_id: PeerId = peer.request_tx.peer_id; let conn_eth_version = peer.version; @@ -642,7 +694,7 @@ impl TransactionFetcher { // peer channel is full match err { TrySendError::Full(_) | TrySendError::Closed(_) => { - metrics_increment_egress_peer_channel_full(); + self.metrics.egress_peer_channel_full.increment(1); return Some(new_announced_hashes) } } @@ -1013,6 +1065,7 @@ impl Default for TransactionFetcher { ), filter_valid_message: Default::default(), info: TransactionFetcherInfo::default(), + metrics: Default::default(), } } } @@ -1259,6 +1312,12 @@ impl Default for TransactionFetcherInfo { } } +#[derive(Debug, Default)] +struct TxFetcherSearchDurations { + find_idle_peer: Duration, + fill_request: Duration, +} + #[cfg(test)] mod test { use std::{collections::HashSet, str::FromStr}; @@ -1425,7 +1484,7 @@ mod test { // TEST - tx_fetcher.on_fetch_pending_hashes(&peers, |_| true, || ()); + tx_fetcher.on_fetch_pending_hashes(&peers, |_| true); // mock session of peer_1 receives request let req = peer_1_mock_session_rx diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index b65eb58c0..b94269214 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -255,7 +255,7 @@ impl TransactionsManager { let (command_tx, command_rx) = mpsc::unbounded_channel(); - let transaction_fetcher = TransactionFetcher::default().with_transaction_fetcher_config( + let transaction_fetcher = TransactionFetcher::with_transaction_fetcher_config( &transactions_manager_config.transaction_fetcher_config, ); @@ -263,11 +263,7 @@ impl TransactionsManager { // over the network let pending = pool.pending_transactions_listener(); let pending_pool_imports_info = PendingPoolImportsInfo::default(); - let metrics = TransactionsManagerMetrics::default(); - metrics - .capacity_inflight_requests - .increment(transaction_fetcher.info.max_inflight_requests as u64); metrics .capacity_pending_pool_imports .increment(pending_pool_imports_info.max_pending_pool_imports as u64); @@ -314,19 +310,6 @@ impl TransactionsManager where Pool: TransactionPool + 'static, { - #[inline] - fn update_fetch_metrics(&self) { - let tx_fetcher = &self.transaction_fetcher; - - self.metrics.inflight_transaction_requests.set(tx_fetcher.inflight_requests.len() as f64); - - let hashes_pending_fetch = tx_fetcher.hashes_pending_fetch.len() as f64; - let total_hashes = tx_fetcher.hashes_fetch_inflight_and_pending_fetch.len() as f64; - - self.metrics.hashes_pending_fetch.set(hashes_pending_fetch); - self.metrics.hashes_inflight_transaction_requests.set(total_hashes - hashes_pending_fetch); - } - #[inline] fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) { let metrics = &self.metrics; @@ -804,11 +787,8 @@ where // // get handle to peer's session again, at this point we know it exists let Some(peer) = self.peers.get_mut(&peer_id) else { return }; - let metrics = &self.metrics; if let Some(failed_to_request_hashes) = - self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer, || { - metrics.egress_peer_channel_full.increment(1) - }) + self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer) { let conn_eth_version = peer.version; @@ -1138,17 +1118,6 @@ where } } -#[derive(Debug, Default)] -struct TxManagerPollDurations { - acc_network_events: Duration, - acc_pending_imports: Duration, - acc_tx_events: Duration, - acc_imported_txns: Duration, - acc_fetch_events: Duration, - acc_pending_fetch: Duration, - acc_cmds: Duration, -} - /// An endless future. Preemption ensure that future is non-blocking, nonetheless. See /// [`crate::NetworkManager`] for more context on the design pattern. /// @@ -1201,14 +1170,9 @@ where let has_capacity_wrt_pending_pool_imports = |divisor| info.has_capacity(max_pending_pool_imports / divisor); - let metrics = &this.metrics; - let metrics_increment_egress_peer_channel_full = - || metrics.egress_peer_channel_full.increment(1); - this.transaction_fetcher.on_fetch_pending_hashes( &this.peers, has_capacity_wrt_pending_pool_imports, - metrics_increment_egress_peer_channel_full, ); } }, @@ -1255,7 +1219,7 @@ where let acc = &mut poll_durations.acc_fetch_events; duration_metered_exec!( { - this.update_fetch_metrics(); + this.transaction_fetcher.update_metrics(); // Advance fetching transaction events (flush transaction fetcher and queue for // import to pool). @@ -1289,7 +1253,7 @@ where some_ready = true; } - this.update_fetch_metrics(); + this.transaction_fetcher.update_metrics(); }, acc ); @@ -1621,6 +1585,17 @@ impl Default for PendingPoolImportsInfo { } } +#[derive(Debug, Default)] +struct TxManagerPollDurations { + acc_network_events: Duration, + acc_pending_imports: Duration, + acc_tx_events: Duration, + acc_imported_txns: Duration, + acc_fetch_events: Duration, + acc_pending_fetch: Duration, + acc_cmds: Duration, +} + #[cfg(test)] mod tests { use super::*; @@ -2073,7 +2048,7 @@ mod tests { assert_eq!(tx_fetcher.active_peers.len(), 0); // sends request for buffered hashes to peer_1 - tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true, || ()); + tx_fetcher.on_fetch_pending_hashes(&tx_manager.peers, |_| true); let tx_fetcher = &mut tx_manager.transaction_fetcher;