From f475adf2434252b2550545370c51dc1b72905aed Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Wed, 21 Feb 2024 20:12:14 +0100 Subject: [PATCH] Clean up tx manager (#6681) --- crates/net/eth-wire/src/types/transactions.rs | 3 +- crates/net/network/src/metrics.rs | 22 +- .../net/network/src/transactions/constants.rs | 55 +++-- .../net/network/src/transactions/fetcher.rs | 48 ++-- crates/net/network/src/transactions/mod.rs | 206 ++++++------------ .../network/src/transactions/validation.rs | 14 +- 6 files changed, 174 insertions(+), 174 deletions(-) diff --git a/crates/net/eth-wire/src/types/transactions.rs b/crates/net/eth-wire/src/types/transactions.rs index a561e0ce1..fb6573dac 100644 --- a/crates/net/eth-wire/src/types/transactions.rs +++ b/crates/net/eth-wire/src/types/transactions.rs @@ -1,6 +1,7 @@ //! Implements the `GetPooledTransactions` and `PooledTransactions` message types. use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper}; +use derive_more::Deref; use reth_codecs::derive_arbitrary; use reth_primitives::{PooledTransactionsElement, TransactionSigned, B256}; @@ -33,7 +34,7 @@ where /// corresponds to a requested hash. Hashes may need to be re-requested if the bodies are not /// included in the response. // #[derive_arbitrary(rlp, 10)] -#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)] +#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default, Deref)] #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))] pub struct PooledTransactions( /// The transaction bodies, each of which should correspond to a requested hash. diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index 3217a935d..cbb4d819f 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -57,14 +57,27 @@ pub struct SessionManagerMetrics { #[derive(Metrics)] #[metrics(scope = "network")] pub struct TransactionsManagerMetrics { + /* ================ BROADCAST ================ */ /// Total number of propagated transactions pub(crate) propagated_transactions: Counter, /// Total number of reported bad transactions pub(crate) reported_bad_transactions: Counter, - /// Total number of messages with already seen hashes - pub(crate) messages_with_already_seen_hashes: Counter, - /// Total number of messages with already seen full transactions - pub(crate) messages_with_already_seen_transactions: Counter, + + /* -- Freq txns already marked as seen by peer -- */ + /// Total number of messages from a peer, announcing transactions that have already been + /// marked as seen by that peer. + pub(crate) messages_with_hashes_already_seen_by_peer: Counter, + /// Total number of messages from a peer, with transaction that have already been marked as + /// seen by that peer. + pub(crate) messages_with_transactions_already_seen_by_peer: Counter, + /// Total number of occurrences, of a peer announcing a transaction that has already been + /// marked as seen by that peer. + pub(crate) occurrences_hash_already_seen_by_peer: Counter, + /// Total number of times a transaction is seen from a peer, that has already been marked as + /// seen by that peer. + pub(crate) occurrences_of_transaction_already_seen_by_peer: Counter, + + /* ================ POOL IMPORTS ================ */ /// Number of transactions about to be imported into the pool. pub(crate) pending_pool_imports: Gauge, /// Total number of bad imports. @@ -76,6 +89,7 @@ pub struct TransactionsManagerMetrics { pub(crate) capacity_pending_pool_imports: Counter, /// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) /// requests. + /* ================ TX FETCHER ================ */ pub(crate) inflight_transaction_requests: Gauge, /// Number of inflight requests at which the /// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at diff --git a/crates/net/network/src/transactions/constants.rs b/crates/net/network/src/transactions/constants.rs index 9191a7182..47895ad8f 100644 --- a/crates/net/network/src/transactions/constants.rs +++ b/crates/net/network/src/transactions/constants.rs @@ -36,17 +36,10 @@ pub const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE: usize = 2 * 1024 * pub mod tx_manager { use super::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE; - /// Default limit for number of transactions to keep track of for a single peer, for - /// transactions that the peer's pool and local pool have in common. + /// Default limit for number of transactions to keep track of for a single peer. /// /// Default is 10 KiB. - pub const DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL: usize = 10 * 1024; - - /// Default limit for the number of transactions to keep track of for a single peer, for - /// transactions that are in the peer's pool but maybe not in the local pool yet. - /// - /// Default is 10 KiB. - pub const DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL: usize = 10 * 1024; + pub const DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER: usize = 10 * 1024; /// Default maximum pending pool imports to tolerate. /// @@ -62,7 +55,10 @@ pub mod tx_manager { } pub mod tx_fetcher { - use crate::peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}; + use crate::{ + peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND}, + transactions::fetcher::TransactionFetcherInfo, + }; use super::{ SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE, @@ -193,16 +189,14 @@ pub mod tx_fetcher { /// Default divisor of the max inflight request when calculating search breadth of the search /// for any idle peer to which to send a request filled with hashes pending fetch. The max - /// inflight requests is configured in - /// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo). + /// inflight requests is configured in [`TransactionFetcherInfo`]. /// /// Default is 3 requests. pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER: usize = 3; /// Default divisor of the max inflight request when calculating search breadth of the search /// for the intersection of hashes announced by a peer and hashes pending fetch. The max - /// inflight requests is configured in - /// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo). + /// inflight requests is configured in [`TransactionFetcherInfo`]. /// /// Default is 2 requests. pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION: usize = 2; @@ -238,4 +232,37 @@ pub mod tx_fetcher { /// /// Default is 120 bytes. pub const MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED: usize = 120; + + /// Marginal on the number of hashes to preallocate memory for in a + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request, when packed + /// according to the [`Eth68`](reth_eth_wire::EthVersion::Eth68) protocol version. To make + /// sure enough memory is preallocated in most cases, it's sensible to use a margin. This, + /// since the capacity is calculated based on median value + /// [`MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`]. There may otherwise be a noteworthy number of + /// cases where just 1 or 2 bytes too little memory is preallocated. + /// + /// Default is 8 hashes. + pub const DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST: usize = 8; + + /// Returns the approx number of transaction hashes that a + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request will have capacity + /// for w.r.t. the [`Eth68`](reth_eth_wire::EthVersion::Eth68) protocol version. This is useful + /// for preallocating memory. + pub const fn approx_capacity_get_pooled_transactions_req_eth68( + info: &TransactionFetcherInfo, + ) -> usize { + let max_size_expected_response = + info.soft_limit_byte_size_pooled_transactions_response_on_pack_request; + + max_size_expected_response / MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED + + DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST + } + + /// Returns the approx number of transactions that a + /// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request will + /// have capacity for w.r.t. the [`Eth66`](reth_eth_wire::EthVersion::Eth66) protocol version. + /// This is useful for preallocating memory. + pub const fn approx_capacity_get_pooled_transactions_req_eth66() -> usize { + SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST + } } diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 71b1c952f..24fa4e0ca 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -2,11 +2,12 @@ use crate::{ cache::{LruCache, LruMap}, message::PeerRequest, }; + use derive_more::Constructor; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use pin_project::pin_project; use reth_eth_wire::{ - GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData, + EthVersion, GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; @@ -245,6 +246,7 @@ impl TransactionFetcher { surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash)); surplus_hashes.shrink_to_fit(); + hashes_to_request.shrink_to_fit(); surplus_hashes } @@ -260,16 +262,17 @@ impl TransactionFetcher { hashes_to_request: &mut RequestTxHashes, hashes_from_announcement: ValidAnnouncementData, ) -> RequestTxHashes { - let (mut request_hashes, _version) = hashes_from_announcement.into_request_hashes(); - if request_hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST { - *hashes_to_request = request_hashes; + let (mut hashes, _version) = hashes_from_announcement.into_request_hashes(); + if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST { + *hashes_to_request = hashes; + hashes_to_request.shrink_to_fit(); RequestTxHashes::default() } else { - let surplus_hashes = request_hashes - .split_off(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST - 1); - - *hashes_to_request = request_hashes; + let surplus_hashes = + hashes.split_off(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST - 1); + *hashes_to_request = hashes; + hashes_to_request.shrink_to_fit(); RequestTxHashes::new(surplus_hashes) } @@ -351,7 +354,9 @@ impl TransactionFetcher { has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool, metrics_increment_egress_peer_channel_full: impl FnOnce(), ) { - let mut hashes_to_request = RequestTxHashes::with_capacity(32); + let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info); + let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); + let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id); // budget to look for an idle peer before giving up @@ -381,7 +386,7 @@ impl TransactionFetcher { self.fill_request_from_hashes_pending_fetch( &mut hashes_to_request, - peer.seen_transactions.maybe_pending_transaction_hashes(), + &peer.seen_transactions, budget_fill_request, ); @@ -797,6 +802,19 @@ impl TransactionFetcher { Some(limit) } } + + /// Returns the approx number of transactions that a [`GetPooledTransactions`] request will + /// have capacity for w.r.t. the given version of the protocol. + pub fn approx_capacity_get_pooled_transactions_req( + &self, + announcement_version: EthVersion, + ) -> usize { + if announcement_version.is_eth68() { + approx_capacity_get_pooled_transactions_req_eth68(&self.info) + } else { + approx_capacity_get_pooled_transactions_req_eth66() + } + } } impl Stream for TransactionFetcher { @@ -825,7 +843,7 @@ impl Stream for TransactionFetcher { return match result { Ok(Ok(transactions)) => { // clear received hashes - let mut fetched = Vec::with_capacity(transactions.hashes().count()); + let mut fetched = Vec::with_capacity(transactions.len()); requested_hashes.retain(|requested_hash| { if transactions.hashes().any(|hash| hash == requested_hash) { // hash is now known, stop tracking @@ -834,6 +852,7 @@ impl Stream for TransactionFetcher { } true }); + fetched.shrink_to_fit(); self.remove_hashes_from_transaction_fetcher(fetched); @@ -984,7 +1003,7 @@ pub struct TransactionFetcherInfo { /// Soft limit for the byte size of the expected /// [`PooledTransactions`] response on packing a /// [`GetPooledTransactions`] request with hashes. - soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize, + pub(super) soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize, /// Soft limit for the byte size of a [`PooledTransactions`] /// response on assembling a [`GetPooledTransactions`] /// request. Spec'd at 2 MiB. @@ -1020,7 +1039,6 @@ mod test { use std::collections::HashSet; use derive_more::IntoIterator; - use reth_eth_wire::EthVersion; use reth_primitives::B256; use crate::transactions::tests::{default_cache, new_mock_session}; @@ -1142,11 +1160,11 @@ mod test { let (mut peer_1_data, mut peer_1_mock_session_rx) = new_mock_session(peer_1, EthVersion::Eth66); for hash in &seen_hashes { - peer_1_data.seen_transactions.seen_in_announcement(*hash); + peer_1_data.seen_transactions.insert(*hash); } let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66); for hash in &seen_hashes { - peer_2_data.seen_transactions.seen_in_announcement(*hash); + peer_2_data.seen_transactions.insert(*hash); } let mut peers = HashMap::new(); peers.insert(peer_1, peer_1_data); diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 708daa9af..4ab56b7fe 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -55,7 +55,6 @@ use reth_transaction_pool::{ PropagatedTransactions, TransactionPool, ValidPoolTransaction, }; use std::{ - cmp::max, collections::{hash_map::Entry, HashMap, HashSet}, num::NonZeroUsize, pin::Pin, @@ -338,8 +337,7 @@ where // we sent a response at which point we assume that the peer is aware of the // transactions - peer.seen_transactions - .extend_seen_by_peer_and_in_pool(transactions.iter().map(|tx| *tx.hash())); + peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash())); let resp = PooledTransactions(transactions); let _ = response.send(Ok(resp)); @@ -407,9 +405,7 @@ where // transaction lists, before deciding whether or not to send full transactions to the // peer. for tx in to_propagate.iter() { - if !peer.seen_transactions.has_seen_transaction(&tx.hash()) { - peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash()); - + if peer.seen_transactions.insert(tx.hash()) { hashes.push(tx); // Do not send full 4844 transaction hashes to peers. @@ -495,9 +491,7 @@ where // Iterate through the transactions to propagate and fill the hashes and full transaction for tx in to_propagate { - if !peer.seen_transactions.has_seen_transaction(&tx.hash()) { - peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash()); - + if peer.seen_transactions.insert(tx.hash()) { full_transactions.push(&tx); } } @@ -543,8 +537,7 @@ where let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); for tx in to_propagate { - if peer.seen_transactions.has_seen_transaction(&tx.hash()) { - peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash()); + if !peer.seen_transactions.insert(tx.hash()) { hashes.push(&tx); } } @@ -597,7 +590,34 @@ where return }; - let client_version = peer.client_version.clone(); + let client = peer.client_version.clone(); + + // keep track of the transactions the peer knows + let mut count_txns_already_seen_by_peer = 0; + for tx in msg.iter_hashes().copied() { + if !peer.seen_transactions.insert(tx) { + count_txns_already_seen_by_peer += 1; + } + } + if count_txns_already_seen_by_peer > 0 { + // this may occur if transactions are sent or announced to a peer, at the same time as + // the peer sends/announces those hashes to us. this is because, marking + // txns as seen by a peer is done optimistically upon sending them to the + // peer. + self.metrics.messages_with_hashes_already_seen_by_peer.increment(1); + self.metrics + .occurrences_hash_already_seen_by_peer + .increment(count_txns_already_seen_by_peer); + + trace!(target: "net::tx", + count_txns_already_seen_by_peer=%count_txns_already_seen_by_peer, + peer_id=format!("{peer_id:#}"), + client=?client, + "Peer sent hashes that have already been marked as seen by peer" + ); + + self.report_already_seen(peer_id); + } // 1. filter out known hashes // @@ -606,24 +626,7 @@ where // most hashes will be filtered out here since this the mempool protocol is a gossip // protocol, healthy peers will send many of the same hashes. // - let already_known_by_pool = self.pool.retain_unknown(&mut msg); - - // keep track of the transactions the peer knows - let mut num_already_seen = 0; - if let Some(pools_intersection) = already_known_by_pool { - for tx in pools_intersection.into_hashes() { - if peer.seen_transactions.has_seen_transaction(&tx) { - num_already_seen += 1; - } - peer.seen_transactions.seen_by_peer_and_in_pool(tx); - } - } - for tx in msg.iter_hashes().copied() { - if peer.seen_transactions.has_seen_transaction(&tx) { - num_already_seen += 1; - } - peer.seen_transactions.seen_in_announcement(tx); - } + let _already_known_by_pool = self.pool.retain_unknown(&mut msg); if msg.is_empty() { // nothing to request @@ -676,7 +679,7 @@ where |hash| bad_imports.contains(hash), &peer_id, |peer_id| self.peers.contains_key(&peer_id), - &client_version, + &client, ); if valid_announcement_data.is_empty() { @@ -689,7 +692,7 @@ where hashes_len=valid_announcement_data.iter().count(), hashes=?valid_announcement_data.keys().collect::>(), msg_version=%valid_announcement_data.msg_version(), - client_version=%client_version, + client_version=%client, "received previously unseen and pending hashes in announcement from peer" ); @@ -704,7 +707,7 @@ where peer_id=format!("{peer_id:#}"), hashes=?*hashes, msg_version=%msg_version, - client_version=%client_version, + client_version=%client, "buffering hashes announced by busy peer" ); @@ -713,21 +716,25 @@ where return } - // load message version before announcement data is destructed in packing + // load message version before announcement data type is destructed in packing let msg_version = valid_announcement_data.msg_version(); + // // demand recommended soft limit on response, however the peer may enforce an arbitrary // limit on the response (2MB) - let mut hashes_to_request = RequestTxHashes::with_capacity(valid_announcement_data.len()); + // + // request buffer is shrunk via call to pack request! + let init_capacity_req = + self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version); + let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); let surplus_hashes = self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data); - hashes_to_request.shrink_to_fit(); if !surplus_hashes.is_empty() { trace!(target: "net::tx", peer_id=format!("{peer_id:#}"), surplus_hashes=?*surplus_hashes, msg_version=%msg_version, - client_version=%client_version, + client_version=%client, "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" ); @@ -738,7 +745,7 @@ where peer_id=format!("{peer_id:#}"), hashes=?*hashes_to_request, msg_version=%msg_version, - client_version=%client_version, + client_version=%client, "sending hashes in `GetPooledTransactions` request to peer's session" ); @@ -758,17 +765,10 @@ where peer_id=format!("{peer_id:#}"), failed_to_request_hashes=?*failed_to_request_hashes, conn_eth_version=%conn_eth_version, - client_version=%client_version, + client_version=%client, "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" ); self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); - return - } - - if num_already_seen > 0 { - self.metrics.messages_with_already_seen_hashes.increment(1); - trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?client_version, "Peer sent already seen hashes"); - self.report_already_seen(peer_id); } } @@ -831,12 +831,7 @@ where let hashes = self .peers .get(&peer_id) - .map(|peer| { - peer.seen_transactions - .iter_transaction_hashes() - .copied() - .collect::>() - }) + .map(|peer| peer.seen_transactions.iter().copied().collect::>()) .unwrap_or_default(); res.insert(peer_id, hashes); } @@ -882,7 +877,7 @@ where } for pooled_tx in pooled_txs.into_iter() { - peer.seen_transactions.seen_by_peer_and_in_pool(*pooled_tx.hash()); + peer.seen_transactions.insert(*pooled_tx.hash()); msg_builder.push_pooled(pooled_tx); } @@ -911,12 +906,11 @@ where // tracks the quality of the given transactions let mut has_bad_transactions = false; - let mut num_already_seen = 0; + let mut num_already_seen_by_peer = 0; if let Some(peer) = self.peers.get_mut(&peer_id) { - // pre-size to avoid reallocations, assuming ~50% of the transactions are new - let mut new_txs = Vec::with_capacity(max(1, transactions.len() / 2)); - + // pre-size to avoid reallocations + let mut new_txs = Vec::with_capacity(transactions.len()); for tx in transactions { // recover transaction let tx = match tx.try_into_ecrecovered() { @@ -936,15 +930,10 @@ where // track that the peer knows this transaction, but only if this is a new broadcast. // If we received the transactions as the response to our GetPooledTransactions // requests (based on received `NewPooledTransactionHashes`) then we already - // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] as `peer. - // seen_transactions.transactions_received_as_hash`. In that case, we don't move - // hashes from `peer.seen_transactions.transactions_received_as_hash` to - // `peer.seen_transactions.transactions_received_in_full_or_sent` here. The - // division of the `seen_transactions` list, just serves as a hint for tx fetcher - // of which hashes are missing. It's good enough without reallocating hashes. + // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`]. - if source.is_broadcast() && peer.seen_transactions.has_seen_transaction(tx.hash()) { - num_already_seen += 1; + if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) { + num_already_seen_by_peer += 1; } match self.transactions_by_peers.entry(*tx.hash()) { @@ -971,6 +960,7 @@ where } } } + new_txs.shrink_to_fit(); // import new transactions as a batch to minimize lock contention on the underlying pool if !new_txs.is_empty() { @@ -1001,9 +991,12 @@ where self.pool_imports.push(import); } - if num_already_seen > 0 { - self.metrics.messages_with_already_seen_transactions.increment(1); - trace!(target: "net::tx", num_txs=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions"); + if num_already_seen_by_peer > 0 { + self.metrics.messages_with_transactions_already_seen_by_peer.increment(1); + self.metrics + .occurrences_of_transaction_already_seen_by_peer + .increment(num_already_seen_by_peer); + trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions"); } } @@ -1012,7 +1005,7 @@ where self.report_peer_bad_transactions(peer_id) } - if num_already_seen > 0 { + if num_already_seen_by_peer > 0 { self.report_already_seen(peer_id); } } @@ -1329,72 +1322,13 @@ impl TransactionSource { } } -/// Tracks transactions a peer has seen. -#[derive(Debug)] -struct TransactionsSeenByPeer { - /// Keeps track of transactions that we know the peer has seen because they were announced by - /// the peer. It's possible that these transactions are pending fetch. - transactions_received_as_hash: LruCache, - /// Keeps track of transactions that we know the peer has seen because they were received in - /// full from the peer or sent to the peer. - transactions_received_in_full_or_sent: LruCache, -} - -impl TransactionsSeenByPeer { - /// Returns `true` if peer has seen transaction. - fn has_seen_transaction(&self, hash: &TxHash) -> bool { - self.transactions_received_in_full_or_sent.contains(hash) || - self.transactions_received_as_hash.contains(hash) - } - - /// Inserts a transaction hash that has been seen in an announcement. - fn seen_in_announcement(&mut self, hash: TxHash) { - _ = self.transactions_received_as_hash.insert(hash); - } - - /// Inserts a hash of a transaction that has either been sent to the peer, or has been - /// received in full from the peer over broadcast. - fn seen_by_peer_and_in_pool(&mut self, hash: TxHash) { - _ = self.transactions_received_in_full_or_sent.insert(hash); - } - - /// Inserts a list of transactions that have either been sent to the peer, or have been - /// received in full from the peer over broadcast. - fn extend_seen_by_peer_and_in_pool(&mut self, hashes: impl IntoIterator) { - self.transactions_received_in_full_or_sent.extend(hashes) - } - - /// Returns an iterator over all transactions that the peer has seen. - fn iter_transaction_hashes(&self) -> impl Iterator { - self.transactions_received_as_hash - .iter() - .chain(self.transactions_received_in_full_or_sent.iter()) - } - - /// Returns an iterator over all transaction hashes that the peer has sent in an announcement. - fn maybe_pending_transaction_hashes(&self) -> &LruCache { - &self.transactions_received_as_hash - } -} - -impl Default for TransactionsSeenByPeer { - fn default() -> Self { - Self { - transactions_received_as_hash: LruCache::new( - NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL).unwrap(), - ), - transactions_received_in_full_or_sent: LruCache::new( - NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL).unwrap(), - ), - } - } -} - /// Tracks a single peer #[derive(Debug)] struct Peer { - /// Keeps track of transactions that we know the peer has seen. - seen_transactions: TransactionsSeenByPeer, + /// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in + /// the sense that transactions are preemptively marked as seen by peer when they are sent to + /// the peer. + seen_transactions: LruCache, /// A communication channel directly to the peer's session task. request_tx: PeerRequestSender, /// negotiated version of the session. @@ -1406,7 +1340,9 @@ struct Peer { impl Peer { fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc) -> Self { Self { - seen_transactions: TransactionsSeenByPeer::default(), + seen_transactions: LruCache::new( + NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).expect("infallible"), + ), request_tx, version, client_version, @@ -1917,8 +1853,8 @@ mod tests { let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version); // mark hashes as seen by peer so it can fish them out from the cache for hashes pending // fetch - peer_1.seen_transactions.seen_in_announcement(seen_hashes[0]); - peer_1.seen_transactions.seen_in_announcement(seen_hashes[1]); + peer_1.seen_transactions.insert(seen_hashes[0]); + peer_1.seen_transactions.insert(seen_hashes[1]); tx_manager.peers.insert(peer_id_1, peer_1); // hashes are seen and currently not inflight, with one fallback peer, and are buffered diff --git a/crates/net/network/src/transactions/validation.rs b/crates/net/network/src/transactions/validation.rs index 630abd8cc..807f74731 100644 --- a/crates/net/network/src/transactions/validation.rs +++ b/crates/net/network/src/transactions/validation.rs @@ -269,19 +269,19 @@ impl FilterAnnouncement for EthAnnouncementFilter { // // 3. checks if announcement is spam packed with duplicate hashes // - let mut deduped_data = HashMap::with_capacity(hashes.len()); - let original_len = hashes.len(); + let mut deduped_data = HashMap::with_capacity(hashes.len()); for hash in hashes.into_iter().rev() { if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) { deduped_data.insert(hash, Some((ty, size))); } } + deduped_data.shrink_to_fit(); + if deduped_data.len() != original_len { should_report_peer = true } - ( if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok }, ValidAnnouncementData::new_eth68(deduped_data), @@ -300,7 +300,9 @@ impl FilterAnnouncement for EthAnnouncementFilter { let NewPooledTransactionHashes66(hashes) = msg; + // // 1. checks if the announcement is empty + // if hashes.is_empty() { debug!(target: "net::tx", network=%Self, @@ -309,14 +311,16 @@ impl FilterAnnouncement for EthAnnouncementFilter { return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66()) } + // // 2. checks if announcement is spam packed with duplicate hashes - let mut deduped_data = HashMap::with_capacity(hashes.len()); - + // let original_len = hashes.len(); + let mut deduped_data = HashMap::with_capacity(hashes.len()); for hash in hashes.into_iter().rev() { deduped_data.insert(hash, None); } + deduped_data.shrink_to_fit(); ( if deduped_data.len() != original_len {