diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 71942029e..a43cd900b 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -7,7 +7,9 @@ use alloy_rlp::{ use derive_more::{Constructor, Deref, DerefMut, IntoIterator}; use reth_codecs::derive_arbitrary; -use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128}; +use reth_primitives::{ + Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128, +}; use std::{collections::HashMap, mem, sync::Arc}; @@ -439,10 +441,9 @@ impl Decodable for NewPooledTransactionHashes68 { } } -/// Interface for handling announcement data in filters in the transaction manager and transaction -/// pool. Note: this trait may disappear when distinction between eth66 and eth68 hashes is more -/// clearly defined, see . -pub trait HandleAnnouncement { +/// Interface for handling mempool message data. Used in various filters in pipelines in +/// `TransactionsManager` and in queries to `TransactionPool`. +pub trait HandleMempoolData { /// The announcement contains no entries. fn is_empty(&self) -> bool; @@ -452,13 +453,16 @@ pub trait HandleAnnouncement { /// Retain only entries for which the hash in the entry satisfies a given predicate, return /// the rest. fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self; +} +/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned. +pub trait HandleVersionedMempoolData { /// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or /// [`Eth68`](EthVersion::Eth68). fn msg_version(&self) -> EthVersion; } -impl HandleAnnouncement for NewPooledTransactionHashes { +impl HandleMempoolData for NewPooledTransactionHashes { fn is_empty(&self) -> bool { self.is_empty() } @@ -473,13 +477,15 @@ impl HandleAnnouncement for NewPooledTransactionHashes { NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)), } } +} +impl HandleVersionedMempoolData for NewPooledTransactionHashes { fn msg_version(&self) -> EthVersion { self.version() } } -impl HandleAnnouncement for NewPooledTransactionHashes68 { +impl HandleMempoolData for NewPooledTransactionHashes68 { fn is_empty(&self) -> bool { self.hashes.is_empty() } @@ -511,13 +517,15 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 { Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes } } +} +impl HandleVersionedMempoolData for NewPooledTransactionHashes68 { fn msg_version(&self) -> EthVersion { EthVersion::Eth68 } } -impl HandleAnnouncement for NewPooledTransactionHashes66 { +impl HandleMempoolData for NewPooledTransactionHashes66 { fn is_empty(&self) -> bool { self.0.is_empty() } @@ -543,7 +551,9 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 { Self(removed_hashes) } +} +impl HandleVersionedMempoolData for NewPooledTransactionHashes66 { fn msg_version(&self) -> EthVersion { EthVersion::Eth66 } @@ -601,7 +611,7 @@ impl ValidAnnouncementData { } } -impl HandleAnnouncement for ValidAnnouncementData { +impl HandleMempoolData for ValidAnnouncementData { fn is_empty(&self) -> bool { self.data.is_empty() } @@ -619,7 +629,9 @@ impl HandleAnnouncement for ValidAnnouncementData { ValidAnnouncementData::new(rest, self.version) } +} +impl HandleVersionedMempoolData for ValidAnnouncementData { fn msg_version(&self) -> EthVersion { self.version } @@ -658,6 +670,34 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes { } } +impl HandleMempoolData for Vec { + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn len(&self) -> usize { + self.len() + } + + fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { + let mut indices_to_remove = vec![]; + for (i, tx) in self.iter().enumerate() { + if !f(tx.hash()) { + indices_to_remove.push(i); + } + } + + let mut removed_txns = Vec::with_capacity(indices_to_remove.len()); + + for index in indices_to_remove.into_iter().rev() { + let hash = self.remove(index); + removed_txns.push(hash); + } + + removed_txns + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/net/network/src/metrics.rs b/crates/net/network/src/metrics.rs index cbb4d819f..880a302de 100644 --- a/crates/net/network/src/metrics.rs +++ b/crates/net/network/src/metrics.rs @@ -77,6 +77,12 @@ pub struct TransactionsManagerMetrics { /// seen by that peer. pub(crate) occurrences_of_transaction_already_seen_by_peer: Counter, + /* -- Freq txns already in pool -- */ + /// Total number of times a hash is announced that is already in the local pool. + pub(crate) occurrences_hashes_already_in_pool: Counter, + /// Total number of times a transaction is sent that is already in the local pool. + pub(crate) occurrences_transactions_already_in_pool: Counter, + /* ================ POOL IMPORTS ================ */ /// Number of transactions about to be imported into the pool. pub(crate) pending_pool_imports: Gauge, diff --git a/crates/net/network/src/transactions/fetcher.rs b/crates/net/network/src/transactions/fetcher.rs index 24fa4e0ca..18038405c 100644 --- a/crates/net/network/src/transactions/fetcher.rs +++ b/crates/net/network/src/transactions/fetcher.rs @@ -7,7 +7,8 @@ use derive_more::Constructor; use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt}; use pin_project::pin_project; use reth_eth_wire::{ - EthVersion, GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData, + EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, + RequestTxHashes, ValidAnnouncementData, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; use reth_primitives::{PeerId, PooledTransactionsElement, TxHash}; @@ -192,7 +193,7 @@ impl TransactionFetcher { pub(super) fn pack_request_eth68( &mut self, hashes_to_request: &mut RequestTxHashes, - hashes_from_announcement: impl HandleAnnouncement + hashes_from_announcement: impl HandleMempoolData + IntoIterator)>, ) -> RequestTxHashes { let mut acc_size_response = 0; @@ -1048,7 +1049,7 @@ mod test { #[derive(IntoIterator)] struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>); - impl HandleAnnouncement for TestValidAnnouncementData { + impl HandleMempoolData for TestValidAnnouncementData { fn is_empty(&self) -> bool { self.0.is_empty() } @@ -1057,10 +1058,6 @@ mod test { self.0.len() } - fn msg_version(&self) -> EthVersion { - EthVersion::Eth68 - } - fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self { let mut indices_to_remove = vec![]; for (i, (hash, _)) in self.0.iter().enumerate() { @@ -1080,6 +1077,12 @@ mod test { } } + impl HandleVersionedMempoolData for TestValidAnnouncementData { + fn msg_version(&self) -> EthVersion { + EthVersion::Eth68 + } + } + #[test] fn pack_eth68_request() { reth_tracing::init_test_tracing(); diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 4ab56b7fe..a56124257 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -36,9 +36,9 @@ use crate::{ }; use futures::{stream::FuturesUnordered, Future, StreamExt}; use reth_eth_wire::{ - EthVersion, GetPooledTransactions, HandleAnnouncement, NewPooledTransactionHashes, - NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions, - RequestTxHashes, Transactions, + EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData, + NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68, + PooledTransactions, RequestTxHashes, Transactions, }; use reth_interfaces::{ p2p::error::{RequestError, RequestResult}, @@ -626,7 +626,10 @@ where // most hashes will be filtered out here since this the mempool protocol is a gossip // protocol, healthy peers will send many of the same hashes. // - let _already_known_by_pool = self.pool.retain_unknown(&mut msg); + let already_known_by_pool = self.pool.retain_unknown(&mut msg); + if let Some(intersection) = already_known_by_pool { + self.metrics.occurrences_hashes_already_in_pool.increment(intersection.len() as u64); + } if msg.is_empty() { // nothing to request @@ -893,7 +896,7 @@ where fn import_transactions( &mut self, peer_id: PeerId, - transactions: Vec, + mut transactions: Vec, source: TransactionSource, ) { // If the node is pipeline syncing, ignore transactions @@ -904,10 +907,31 @@ where return } + let Some(peer) = self.peers.get_mut(&peer_id) else { return }; + + // track that the peer knows these transaction, but only if this is a new broadcast. + // If we received the transactions as the response to our `GetPooledTransactions`` + // requests (based on received `NewPooledTransactionHashes`) then we already + // recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`. + let mut num_already_seen_by_peer = 0; + for tx in transactions.iter() { + if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) { + num_already_seen_by_peer += 1; + } + } + + // 1. filter out already imported txns + let already_known_by_pool = self.pool.retain_unknown(&mut transactions); + if let Some(intersection) = already_known_by_pool { + self.metrics + .occurrences_transactions_already_in_pool + .increment(intersection.len() as u64); + } + // tracks the quality of the given transactions let mut has_bad_transactions = false; - let mut num_already_seen_by_peer = 0; + // 2. filter out transactions that are invalid or already pending import if let Some(peer) = self.peers.get_mut(&peer_id) { // pre-size to avoid reallocations let mut new_txs = Vec::with_capacity(transactions.len()); @@ -927,15 +951,6 @@ where } }; - // track that the peer knows this transaction, but only if this is a new broadcast. - // If we received the transactions as the response to our GetPooledTransactions - // requests (based on received `NewPooledTransactionHashes`) then we already - // recorded the hashes in [`Self::on_new_pooled_transaction_hashes`]. - - if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) { - num_already_seen_by_peer += 1; - } - match self.transactions_by_peers.entry(*tx.hash()) { Entry::Occupied(mut entry) => { // transaction was already inserted @@ -962,7 +977,8 @@ where } new_txs.shrink_to_fit(); - // import new transactions as a batch to minimize lock contention on the underlying pool + // 3. import new transactions as a batch to minimize lock contention on the underlying + // pool if !new_txs.is_empty() { let pool = self.pool.clone(); // update metrics diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index ed007e5a9..17d18d9eb 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -151,7 +151,7 @@ use crate::{identifier::TransactionId, pool::PoolInner}; use aquamarine as _; -use reth_eth_wire::HandleAnnouncement; +use reth_eth_wire::HandleMempoolData; use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, U256}; use reth_provider::StateProviderFactory; use std::{collections::HashSet, sync::Arc}; @@ -457,7 +457,7 @@ where fn retain_unknown(&self, announcement: &mut A) -> Option where - A: HandleAnnouncement, + A: HandleMempoolData, { self.pool.retain_unknown(announcement) } diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index 98dfd2492..c55a8bf05 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -16,7 +16,7 @@ use crate::{ PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool, TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction, }; -use reth_eth_wire::HandleAnnouncement; +use reth_eth_wire::HandleMempoolData; use reth_primitives::{Address, BlobTransactionSidecar, TxHash, U256}; use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use tokio::sync::{mpsc, mpsc::Receiver}; @@ -176,7 +176,7 @@ impl TransactionPool for NoopTransactionPool { fn retain_unknown(&self, _announcement: &mut A) -> Option where - A: HandleAnnouncement, + A: HandleMempoolData, { None } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 4e4aa5664..b547fc0e4 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -82,7 +82,7 @@ use crate::{ }; use best::BestTransactions; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; -use reth_eth_wire::HandleAnnouncement; +use reth_eth_wire::HandleMempoolData; use reth_primitives::{ Address, BlobTransaction, BlobTransactionSidecar, IntoRecoveredTransaction, PooledTransactionsElement, TransactionSigned, TxHash, B256, @@ -669,9 +669,9 @@ where } /// Removes and returns all transactions that are present in the pool. - pub(crate) fn retain_unknown(&self, announcement: &mut A) -> Option + pub(crate) fn retain_unknown(&self, announcement: &mut A) -> Option where - A: HandleAnnouncement, + A: HandleMempoolData, { if announcement.is_empty() { return None diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index fa1a5d5f6..178f88264 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -8,7 +8,7 @@ use crate::{ AllTransactionsEvents, }; use futures_util::{ready, Stream}; -use reth_eth_wire::HandleAnnouncement; +use reth_eth_wire::HandleMempoolData; use reth_primitives::{ kzg::KzgSettings, AccessList, Address, BlobTransactionSidecar, BlobTransactionValidationError, FromRecoveredPooledTransaction, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, @@ -290,7 +290,7 @@ pub trait TransactionPool: Send + Sync + Clone { /// Consumer: P2P fn retain_unknown(&self, announcement: &mut A) -> Option where - A: HandleAnnouncement; + A: HandleMempoolData; /// Returns if the transaction for the given hash is already included in this pool. fn contains(&self, tx_hash: &TxHash) -> bool {