Clean up tx manager (#6681)

This commit is contained in:
Emilia Hane
2024-02-21 20:12:14 +01:00
committed by GitHub
parent 4a386d5782
commit f475adf243
6 changed files with 174 additions and 174 deletions

View File

@ -1,6 +1,7 @@
//! Implements the `GetPooledTransactions` and `PooledTransactions` message types.
use alloy_rlp::{RlpDecodableWrapper, RlpEncodableWrapper};
use derive_more::Deref;
use reth_codecs::derive_arbitrary;
use reth_primitives::{PooledTransactionsElement, TransactionSigned, B256};
@ -33,7 +34,7 @@ where
/// corresponds to a requested hash. Hashes may need to be re-requested if the bodies are not
/// included in the response.
// #[derive_arbitrary(rlp, 10)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default)]
#[derive(Clone, Debug, PartialEq, Eq, RlpEncodableWrapper, RlpDecodableWrapper, Default, Deref)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PooledTransactions(
/// The transaction bodies, each of which should correspond to a requested hash.

View File

@ -57,14 +57,27 @@ pub struct SessionManagerMetrics {
#[derive(Metrics)]
#[metrics(scope = "network")]
pub struct TransactionsManagerMetrics {
/* ================ BROADCAST ================ */
/// Total number of propagated transactions
pub(crate) propagated_transactions: Counter,
/// Total number of reported bad transactions
pub(crate) reported_bad_transactions: Counter,
/// Total number of messages with already seen hashes
pub(crate) messages_with_already_seen_hashes: Counter,
/// Total number of messages with already seen full transactions
pub(crate) messages_with_already_seen_transactions: Counter,
/* -- Freq txns already marked as seen by peer -- */
/// Total number of messages from a peer, announcing transactions that have already been
/// marked as seen by that peer.
pub(crate) messages_with_hashes_already_seen_by_peer: Counter,
/// Total number of messages from a peer, with transaction that have already been marked as
/// seen by that peer.
pub(crate) messages_with_transactions_already_seen_by_peer: Counter,
/// Total number of occurrences, of a peer announcing a transaction that has already been
/// marked as seen by that peer.
pub(crate) occurrences_hash_already_seen_by_peer: Counter,
/// Total number of times a transaction is seen from a peer, that has already been marked as
/// seen by that peer.
pub(crate) occurrences_of_transaction_already_seen_by_peer: Counter,
/* ================ POOL IMPORTS ================ */
/// Number of transactions about to be imported into the pool.
pub(crate) pending_pool_imports: Gauge,
/// Total number of bad imports.
@ -76,6 +89,7 @@ pub struct TransactionsManagerMetrics {
pub(crate) capacity_pending_pool_imports: Counter,
/// Currently active outgoing [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions)
/// requests.
/* ================ TX FETCHER ================ */
pub(crate) inflight_transaction_requests: Gauge,
/// Number of inflight requests at which the
/// [`TransactionFetcher`](crate::transactions::TransactionFetcher) is considered to be at

View File

@ -36,17 +36,10 @@ pub const SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE: usize = 2 * 1024 *
pub mod tx_manager {
use super::SOFT_LIMIT_COUNT_HASHES_IN_NEW_POOLED_TRANSACTIONS_BROADCAST_MESSAGE;
/// Default limit for number of transactions to keep track of for a single peer, for
/// transactions that the peer's pool and local pool have in common.
/// Default limit for number of transactions to keep track of for a single peer.
///
/// Default is 10 KiB.
pub const DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL: usize = 10 * 1024;
/// Default limit for the number of transactions to keep track of for a single peer, for
/// transactions that are in the peer's pool but maybe not in the local pool yet.
///
/// Default is 10 KiB.
pub const DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL: usize = 10 * 1024;
pub const DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER: usize = 10 * 1024;
/// Default maximum pending pool imports to tolerate.
///
@ -62,7 +55,10 @@ pub mod tx_manager {
}
pub mod tx_fetcher {
use crate::peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND};
use crate::{
peers::{DEFAULT_MAX_COUNT_PEERS_INBOUND, DEFAULT_MAX_COUNT_PEERS_OUTBOUND},
transactions::fetcher::TransactionFetcherInfo,
};
use super::{
SOFT_LIMIT_BYTE_SIZE_POOLED_TRANSACTIONS_RESPONSE,
@ -193,16 +189,14 @@ pub mod tx_fetcher {
/// Default divisor of the max inflight request when calculating search breadth of the search
/// for any idle peer to which to send a request filled with hashes pending fetch. The max
/// inflight requests is configured in
/// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo).
/// inflight requests is configured in [`TransactionFetcherInfo`].
///
/// Default is 3 requests.
pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_IDLE_PEER: usize = 3;
/// Default divisor of the max inflight request when calculating search breadth of the search
/// for the intersection of hashes announced by a peer and hashes pending fetch. The max
/// inflight requests is configured in
/// [`TransactionFetcherInfo`](crate::transactions::fetcher::TransactionFetcherInfo).
/// inflight requests is configured in [`TransactionFetcherInfo`].
///
/// Default is 2 requests.
pub const DEFAULT_DIVISOR_MAX_COUNT_INFLIGHT_REQUESTS_ON_FIND_INTERSECTION: usize = 2;
@ -238,4 +232,37 @@ pub mod tx_fetcher {
///
/// Default is 120 bytes.
pub const MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED: usize = 120;
/// Marginal on the number of hashes to preallocate memory for in a
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request, when packed
/// according to the [`Eth68`](reth_eth_wire::EthVersion::Eth68) protocol version. To make
/// sure enough memory is preallocated in most cases, it's sensible to use a margin. This,
/// since the capacity is calculated based on median value
/// [`MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED`]. There may otherwise be a noteworthy number of
/// cases where just 1 or 2 bytes too little memory is preallocated.
///
/// Default is 8 hashes.
pub const DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST: usize = 8;
/// Returns the approx number of transaction hashes that a
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request will have capacity
/// for w.r.t. the [`Eth68`](reth_eth_wire::EthVersion::Eth68) protocol version. This is useful
/// for preallocating memory.
pub const fn approx_capacity_get_pooled_transactions_req_eth68(
info: &TransactionFetcherInfo,
) -> usize {
let max_size_expected_response =
info.soft_limit_byte_size_pooled_transactions_response_on_pack_request;
max_size_expected_response / MEDIAN_BYTE_SIZE_SMALL_LEGACY_TX_ENCODED +
DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST
}
/// Returns the approx number of transactions that a
/// [`GetPooledTransactions`](reth_eth_wire::GetPooledTransactions) request will
/// have capacity for w.r.t. the [`Eth66`](reth_eth_wire::EthVersion::Eth66) protocol version.
/// This is useful for preallocating memory.
pub const fn approx_capacity_get_pooled_transactions_req_eth66() -> usize {
SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST
}
}

View File

@ -2,11 +2,12 @@ use crate::{
cache::{LruCache, LruMap},
message::PeerRequest,
};
use derive_more::Constructor;
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{
GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData,
EthVersion, GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
@ -245,6 +246,7 @@ impl TransactionFetcher {
surplus_hashes.extend(hashes_from_announcement_iter.map(|(hash, _metadata)| hash));
surplus_hashes.shrink_to_fit();
hashes_to_request.shrink_to_fit();
surplus_hashes
}
@ -260,16 +262,17 @@ impl TransactionFetcher {
hashes_to_request: &mut RequestTxHashes,
hashes_from_announcement: ValidAnnouncementData,
) -> RequestTxHashes {
let (mut request_hashes, _version) = hashes_from_announcement.into_request_hashes();
if request_hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
*hashes_to_request = request_hashes;
let (mut hashes, _version) = hashes_from_announcement.into_request_hashes();
if hashes.len() <= SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST {
*hashes_to_request = hashes;
hashes_to_request.shrink_to_fit();
RequestTxHashes::default()
} else {
let surplus_hashes = request_hashes
.split_off(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST - 1);
*hashes_to_request = request_hashes;
let surplus_hashes =
hashes.split_off(SOFT_LIMIT_COUNT_HASHES_IN_GET_POOLED_TRANSACTIONS_REQUEST - 1);
*hashes_to_request = hashes;
hashes_to_request.shrink_to_fit();
RequestTxHashes::new(surplus_hashes)
}
@ -351,7 +354,9 @@ impl TransactionFetcher {
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) {
let mut hashes_to_request = RequestTxHashes::with_capacity(32);
let init_capacity_req = approx_capacity_get_pooled_transactions_req_eth68(&self.info);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
// budget to look for an idle peer before giving up
@ -381,7 +386,7 @@ impl TransactionFetcher {
self.fill_request_from_hashes_pending_fetch(
&mut hashes_to_request,
peer.seen_transactions.maybe_pending_transaction_hashes(),
&peer.seen_transactions,
budget_fill_request,
);
@ -797,6 +802,19 @@ impl TransactionFetcher {
Some(limit)
}
}
/// Returns the approx number of transactions that a [`GetPooledTransactions`] request will
/// have capacity for w.r.t. the given version of the protocol.
pub fn approx_capacity_get_pooled_transactions_req(
&self,
announcement_version: EthVersion,
) -> usize {
if announcement_version.is_eth68() {
approx_capacity_get_pooled_transactions_req_eth68(&self.info)
} else {
approx_capacity_get_pooled_transactions_req_eth66()
}
}
}
impl Stream for TransactionFetcher {
@ -825,7 +843,7 @@ impl Stream for TransactionFetcher {
return match result {
Ok(Ok(transactions)) => {
// clear received hashes
let mut fetched = Vec::with_capacity(transactions.hashes().count());
let mut fetched = Vec::with_capacity(transactions.len());
requested_hashes.retain(|requested_hash| {
if transactions.hashes().any(|hash| hash == requested_hash) {
// hash is now known, stop tracking
@ -834,6 +852,7 @@ impl Stream for TransactionFetcher {
}
true
});
fetched.shrink_to_fit();
self.remove_hashes_from_transaction_fetcher(fetched);
@ -984,7 +1003,7 @@ pub struct TransactionFetcherInfo {
/// Soft limit for the byte size of the expected
/// [`PooledTransactions`] response on packing a
/// [`GetPooledTransactions`] request with hashes.
soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
pub(super) soft_limit_byte_size_pooled_transactions_response_on_pack_request: usize,
/// Soft limit for the byte size of a [`PooledTransactions`]
/// response on assembling a [`GetPooledTransactions`]
/// request. Spec'd at 2 MiB.
@ -1020,7 +1039,6 @@ mod test {
use std::collections::HashSet;
use derive_more::IntoIterator;
use reth_eth_wire::EthVersion;
use reth_primitives::B256;
use crate::transactions::tests::{default_cache, new_mock_session};
@ -1142,11 +1160,11 @@ mod test {
let (mut peer_1_data, mut peer_1_mock_session_rx) =
new_mock_session(peer_1, EthVersion::Eth66);
for hash in &seen_hashes {
peer_1_data.seen_transactions.seen_in_announcement(*hash);
peer_1_data.seen_transactions.insert(*hash);
}
let (mut peer_2_data, _) = new_mock_session(peer_2, EthVersion::Eth66);
for hash in &seen_hashes {
peer_2_data.seen_transactions.seen_in_announcement(*hash);
peer_2_data.seen_transactions.insert(*hash);
}
let mut peers = HashMap::new();
peers.insert(peer_1, peer_1_data);

View File

@ -55,7 +55,6 @@ use reth_transaction_pool::{
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
};
use std::{
cmp::max,
collections::{hash_map::Entry, HashMap, HashSet},
num::NonZeroUsize,
pin::Pin,
@ -338,8 +337,7 @@ where
// we sent a response at which point we assume that the peer is aware of the
// transactions
peer.seen_transactions
.extend_seen_by_peer_and_in_pool(transactions.iter().map(|tx| *tx.hash()));
peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash()));
let resp = PooledTransactions(transactions);
let _ = response.send(Ok(resp));
@ -407,9 +405,7 @@ where
// transaction lists, before deciding whether or not to send full transactions to the
// peer.
for tx in to_propagate.iter() {
if !peer.seen_transactions.has_seen_transaction(&tx.hash()) {
peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash());
if peer.seen_transactions.insert(tx.hash()) {
hashes.push(tx);
// Do not send full 4844 transaction hashes to peers.
@ -495,9 +491,7 @@ where
// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if !peer.seen_transactions.has_seen_transaction(&tx.hash()) {
peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash());
if peer.seen_transactions.insert(tx.hash()) {
full_transactions.push(&tx);
}
}
@ -543,8 +537,7 @@ where
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
for tx in to_propagate {
if peer.seen_transactions.has_seen_transaction(&tx.hash()) {
peer.seen_transactions.seen_by_peer_and_in_pool(tx.hash());
if !peer.seen_transactions.insert(tx.hash()) {
hashes.push(&tx);
}
}
@ -597,7 +590,34 @@ where
return
};
let client_version = peer.client_version.clone();
let client = peer.client_version.clone();
// keep track of the transactions the peer knows
let mut count_txns_already_seen_by_peer = 0;
for tx in msg.iter_hashes().copied() {
if !peer.seen_transactions.insert(tx) {
count_txns_already_seen_by_peer += 1;
}
}
if count_txns_already_seen_by_peer > 0 {
// this may occur if transactions are sent or announced to a peer, at the same time as
// the peer sends/announces those hashes to us. this is because, marking
// txns as seen by a peer is done optimistically upon sending them to the
// peer.
self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
self.metrics
.occurrences_hash_already_seen_by_peer
.increment(count_txns_already_seen_by_peer);
trace!(target: "net::tx",
count_txns_already_seen_by_peer=%count_txns_already_seen_by_peer,
peer_id=format!("{peer_id:#}"),
client=?client,
"Peer sent hashes that have already been marked as seen by peer"
);
self.report_already_seen(peer_id);
}
// 1. filter out known hashes
//
@ -606,24 +626,7 @@ where
// most hashes will be filtered out here since this the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let already_known_by_pool = self.pool.retain_unknown(&mut msg);
// keep track of the transactions the peer knows
let mut num_already_seen = 0;
if let Some(pools_intersection) = already_known_by_pool {
for tx in pools_intersection.into_hashes() {
if peer.seen_transactions.has_seen_transaction(&tx) {
num_already_seen += 1;
}
peer.seen_transactions.seen_by_peer_and_in_pool(tx);
}
}
for tx in msg.iter_hashes().copied() {
if peer.seen_transactions.has_seen_transaction(&tx) {
num_already_seen += 1;
}
peer.seen_transactions.seen_in_announcement(tx);
}
let _already_known_by_pool = self.pool.retain_unknown(&mut msg);
if msg.is_empty() {
// nothing to request
@ -676,7 +679,7 @@ where
|hash| bad_imports.contains(hash),
&peer_id,
|peer_id| self.peers.contains_key(&peer_id),
&client_version,
&client,
);
if valid_announcement_data.is_empty() {
@ -689,7 +692,7 @@ where
hashes_len=valid_announcement_data.iter().count(),
hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
msg_version=%valid_announcement_data.msg_version(),
client_version=%client_version,
client_version=%client,
"received previously unseen and pending hashes in announcement from peer"
);
@ -704,7 +707,7 @@ where
peer_id=format!("{peer_id:#}"),
hashes=?*hashes,
msg_version=%msg_version,
client_version=%client_version,
client_version=%client,
"buffering hashes announced by busy peer"
);
@ -713,21 +716,25 @@ where
return
}
// load message version before announcement data is destructed in packing
// load message version before announcement data type is destructed in packing
let msg_version = valid_announcement_data.msg_version();
//
// demand recommended soft limit on response, however the peer may enforce an arbitrary
// limit on the response (2MB)
let mut hashes_to_request = RequestTxHashes::with_capacity(valid_announcement_data.len());
//
// request buffer is shrunk via call to pack request!
let init_capacity_req =
self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let surplus_hashes =
self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
hashes_to_request.shrink_to_fit();
if !surplus_hashes.is_empty() {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=?*surplus_hashes,
msg_version=%msg_version,
client_version=%client_version,
client_version=%client,
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
@ -738,7 +745,7 @@ where
peer_id=format!("{peer_id:#}"),
hashes=?*hashes_to_request,
msg_version=%msg_version,
client_version=%client_version,
client_version=%client,
"sending hashes in `GetPooledTransactions` request to peer's session"
);
@ -758,17 +765,10 @@ where
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?*failed_to_request_hashes,
conn_eth_version=%conn_eth_version,
client_version=%client_version,
client_version=%client,
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
return
}
if num_already_seen > 0 {
self.metrics.messages_with_already_seen_hashes.increment(1);
trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?client_version, "Peer sent already seen hashes");
self.report_already_seen(peer_id);
}
}
@ -831,12 +831,7 @@ where
let hashes = self
.peers
.get(&peer_id)
.map(|peer| {
peer.seen_transactions
.iter_transaction_hashes()
.copied()
.collect::<HashSet<_>>()
})
.map(|peer| peer.seen_transactions.iter().copied().collect::<HashSet<_>>())
.unwrap_or_default();
res.insert(peer_id, hashes);
}
@ -882,7 +877,7 @@ where
}
for pooled_tx in pooled_txs.into_iter() {
peer.seen_transactions.seen_by_peer_and_in_pool(*pooled_tx.hash());
peer.seen_transactions.insert(*pooled_tx.hash());
msg_builder.push_pooled(pooled_tx);
}
@ -911,12 +906,11 @@ where
// tracks the quality of the given transactions
let mut has_bad_transactions = false;
let mut num_already_seen = 0;
let mut num_already_seen_by_peer = 0;
if let Some(peer) = self.peers.get_mut(&peer_id) {
// pre-size to avoid reallocations, assuming ~50% of the transactions are new
let mut new_txs = Vec::with_capacity(max(1, transactions.len() / 2));
// pre-size to avoid reallocations
let mut new_txs = Vec::with_capacity(transactions.len());
for tx in transactions {
// recover transaction
let tx = match tx.try_into_ecrecovered() {
@ -936,15 +930,10 @@ where
// track that the peer knows this transaction, but only if this is a new broadcast.
// If we received the transactions as the response to our GetPooledTransactions
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`] as `peer.
// seen_transactions.transactions_received_as_hash`. In that case, we don't move
// hashes from `peer.seen_transactions.transactions_received_as_hash` to
// `peer.seen_transactions.transactions_received_in_full_or_sent` here. The
// division of the `seen_transactions` list, just serves as a hint for tx fetcher
// of which hashes are missing. It's good enough without reallocating hashes.
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`].
if source.is_broadcast() && peer.seen_transactions.has_seen_transaction(tx.hash()) {
num_already_seen += 1;
if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) {
num_already_seen_by_peer += 1;
}
match self.transactions_by_peers.entry(*tx.hash()) {
@ -971,6 +960,7 @@ where
}
}
}
new_txs.shrink_to_fit();
// import new transactions as a batch to minimize lock contention on the underlying pool
if !new_txs.is_empty() {
@ -1001,9 +991,12 @@ where
self.pool_imports.push(import);
}
if num_already_seen > 0 {
self.metrics.messages_with_already_seen_transactions.increment(1);
trace!(target: "net::tx", num_txs=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
if num_already_seen_by_peer > 0 {
self.metrics.messages_with_transactions_already_seen_by_peer.increment(1);
self.metrics
.occurrences_of_transaction_already_seen_by_peer
.increment(num_already_seen_by_peer);
trace!(target: "net::tx", num_txs=%num_already_seen_by_peer, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
}
}
@ -1012,7 +1005,7 @@ where
self.report_peer_bad_transactions(peer_id)
}
if num_already_seen > 0 {
if num_already_seen_by_peer > 0 {
self.report_already_seen(peer_id);
}
}
@ -1329,72 +1322,13 @@ impl TransactionSource {
}
}
/// Tracks transactions a peer has seen.
#[derive(Debug)]
struct TransactionsSeenByPeer {
/// Keeps track of transactions that we know the peer has seen because they were announced by
/// the peer. It's possible that these transactions are pending fetch.
transactions_received_as_hash: LruCache<B256>,
/// Keeps track of transactions that we know the peer has seen because they were received in
/// full from the peer or sent to the peer.
transactions_received_in_full_or_sent: LruCache<B256>,
}
impl TransactionsSeenByPeer {
/// Returns `true` if peer has seen transaction.
fn has_seen_transaction(&self, hash: &TxHash) -> bool {
self.transactions_received_in_full_or_sent.contains(hash) ||
self.transactions_received_as_hash.contains(hash)
}
/// Inserts a transaction hash that has been seen in an announcement.
fn seen_in_announcement(&mut self, hash: TxHash) {
_ = self.transactions_received_as_hash.insert(hash);
}
/// Inserts a hash of a transaction that has either been sent to the peer, or has been
/// received in full from the peer over broadcast.
fn seen_by_peer_and_in_pool(&mut self, hash: TxHash) {
_ = self.transactions_received_in_full_or_sent.insert(hash);
}
/// Inserts a list of transactions that have either been sent to the peer, or have been
/// received in full from the peer over broadcast.
fn extend_seen_by_peer_and_in_pool(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
self.transactions_received_in_full_or_sent.extend(hashes)
}
/// Returns an iterator over all transactions that the peer has seen.
fn iter_transaction_hashes(&self) -> impl Iterator<Item = &TxHash> {
self.transactions_received_as_hash
.iter()
.chain(self.transactions_received_in_full_or_sent.iter())
}
/// Returns an iterator over all transaction hashes that the peer has sent in an announcement.
fn maybe_pending_transaction_hashes(&self) -> &LruCache<TxHash> {
&self.transactions_received_as_hash
}
}
impl Default for TransactionsSeenByPeer {
fn default() -> Self {
Self {
transactions_received_as_hash: LruCache::new(
NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SENT_BY_PEER_AND_MAYBE_IN_POOL).unwrap(),
),
transactions_received_in_full_or_sent: LruCache::new(
NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER_AND_IN_POOL).unwrap(),
),
}
}
}
/// Tracks a single peer
#[derive(Debug)]
struct Peer {
/// Keeps track of transactions that we know the peer has seen.
seen_transactions: TransactionsSeenByPeer,
/// Optimistically keeps track of transactions that we know the peer has seen. Optimistic, in
/// the sense that transactions are preemptively marked as seen by peer when they are sent to
/// the peer.
seen_transactions: LruCache<B256>,
/// A communication channel directly to the peer's session task.
request_tx: PeerRequestSender,
/// negotiated version of the session.
@ -1406,7 +1340,9 @@ struct Peer {
impl Peer {
fn new(request_tx: PeerRequestSender, version: EthVersion, client_version: Arc<str>) -> Self {
Self {
seen_transactions: TransactionsSeenByPeer::default(),
seen_transactions: LruCache::new(
NonZeroUsize::new(DEFAULT_CAPACITY_CACHE_SEEN_BY_PEER).expect("infallible"),
),
request_tx,
version,
client_version,
@ -1917,8 +1853,8 @@ mod tests {
let (mut peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
// mark hashes as seen by peer so it can fish them out from the cache for hashes pending
// fetch
peer_1.seen_transactions.seen_in_announcement(seen_hashes[0]);
peer_1.seen_transactions.seen_in_announcement(seen_hashes[1]);
peer_1.seen_transactions.insert(seen_hashes[0]);
peer_1.seen_transactions.insert(seen_hashes[1]);
tx_manager.peers.insert(peer_id_1, peer_1);
// hashes are seen and currently not inflight, with one fallback peer, and are buffered

View File

@ -269,19 +269,19 @@ impl FilterAnnouncement for EthAnnouncementFilter {
//
// 3. checks if announcement is spam packed with duplicate hashes
//
let mut deduped_data = HashMap::with_capacity(hashes.len());
let original_len = hashes.len();
let mut deduped_data = HashMap::with_capacity(hashes.len());
for hash in hashes.into_iter().rev() {
if let (Some(ty), Some(size)) = (types.pop(), sizes.pop()) {
deduped_data.insert(hash, Some((ty, size)));
}
}
deduped_data.shrink_to_fit();
if deduped_data.len() != original_len {
should_report_peer = true
}
(
if should_report_peer { FilterOutcome::ReportPeer } else { FilterOutcome::Ok },
ValidAnnouncementData::new_eth68(deduped_data),
@ -300,7 +300,9 @@ impl FilterAnnouncement for EthAnnouncementFilter {
let NewPooledTransactionHashes66(hashes) = msg;
//
// 1. checks if the announcement is empty
//
if hashes.is_empty() {
debug!(target: "net::tx",
network=%Self,
@ -309,14 +311,16 @@ impl FilterAnnouncement for EthAnnouncementFilter {
return (FilterOutcome::ReportPeer, ValidAnnouncementData::empty_eth66())
}
//
// 2. checks if announcement is spam packed with duplicate hashes
let mut deduped_data = HashMap::with_capacity(hashes.len());
//
let original_len = hashes.len();
let mut deduped_data = HashMap::with_capacity(hashes.len());
for hash in hashes.into_iter().rev() {
deduped_data.insert(hash, None);
}
deduped_data.shrink_to_fit();
(
if deduped_data.len() != original_len {