Bug fix, filter out already imported transactions (#6723)

This commit is contained in:
Emilia Hane
2024-02-22 12:37:03 +01:00
committed by GitHub
parent 9760d906d7
commit d7a2123181
8 changed files with 106 additions and 41 deletions

View File

@ -7,7 +7,9 @@ use alloy_rlp::{
use derive_more::{Constructor, Deref, DerefMut, IntoIterator};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, Bytes, TransactionSigned, TxHash, B256, U128};
use reth_primitives::{
Block, Bytes, PooledTransactionsElement, TransactionSigned, TxHash, B256, U128,
};
use std::{collections::HashMap, mem, sync::Arc};
@ -439,10 +441,9 @@ impl Decodable for NewPooledTransactionHashes68 {
}
}
/// Interface for handling announcement data in filters in the transaction manager and transaction
/// pool. Note: this trait may disappear when distinction between eth66 and eth68 hashes is more
/// clearly defined, see <https://github.com/paradigmxyz/reth/issues/6148>.
pub trait HandleAnnouncement {
/// Interface for handling mempool message data. Used in various filters in pipelines in
/// `TransactionsManager` and in queries to `TransactionPool`.
pub trait HandleMempoolData {
/// The announcement contains no entries.
fn is_empty(&self) -> bool;
@ -452,13 +453,16 @@ pub trait HandleAnnouncement {
/// Retain only entries for which the hash in the entry satisfies a given predicate, return
/// the rest.
fn retain_by_hash(&mut self, f: impl FnMut(&TxHash) -> bool) -> Self;
}
/// Extension of [`HandleMempoolData`] interface, for mempool messages that are versioned.
pub trait HandleVersionedMempoolData {
/// Returns the announcement version, either [`Eth66`](EthVersion::Eth66) or
/// [`Eth68`](EthVersion::Eth68).
fn msg_version(&self) -> EthVersion;
}
impl HandleAnnouncement for NewPooledTransactionHashes {
impl HandleMempoolData for NewPooledTransactionHashes {
fn is_empty(&self) -> bool {
self.is_empty()
}
@ -473,13 +477,15 @@ impl HandleAnnouncement for NewPooledTransactionHashes {
NewPooledTransactionHashes::Eth68(msg) => Self::Eth68(msg.retain_by_hash(f)),
}
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes {
fn msg_version(&self) -> EthVersion {
self.version()
}
}
impl HandleAnnouncement for NewPooledTransactionHashes68 {
impl HandleMempoolData for NewPooledTransactionHashes68 {
fn is_empty(&self) -> bool {
self.hashes.is_empty()
}
@ -511,13 +517,15 @@ impl HandleAnnouncement for NewPooledTransactionHashes68 {
Self { hashes: removed_hashes, types: removed_types, sizes: removed_sizes }
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes68 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
}
impl HandleAnnouncement for NewPooledTransactionHashes66 {
impl HandleMempoolData for NewPooledTransactionHashes66 {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
@ -543,7 +551,9 @@ impl HandleAnnouncement for NewPooledTransactionHashes66 {
Self(removed_hashes)
}
}
impl HandleVersionedMempoolData for NewPooledTransactionHashes66 {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth66
}
@ -601,7 +611,7 @@ impl ValidAnnouncementData {
}
}
impl HandleAnnouncement for ValidAnnouncementData {
impl HandleMempoolData for ValidAnnouncementData {
fn is_empty(&self) -> bool {
self.data.is_empty()
}
@ -619,7 +629,9 @@ impl HandleAnnouncement for ValidAnnouncementData {
ValidAnnouncementData::new(rest, self.version)
}
}
impl HandleVersionedMempoolData for ValidAnnouncementData {
fn msg_version(&self) -> EthVersion {
self.version
}
@ -658,6 +670,34 @@ impl FromIterator<(TxHash, Option<(u8, usize)>)> for RequestTxHashes {
}
}
impl HandleMempoolData for Vec<PooledTransactionsElement> {
fn is_empty(&self) -> bool {
self.is_empty()
}
fn len(&self) -> usize {
self.len()
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, tx) in self.iter().enumerate() {
if !f(tx.hash()) {
indices_to_remove.push(i);
}
}
let mut removed_txns = Vec::with_capacity(indices_to_remove.len());
for index in indices_to_remove.into_iter().rev() {
let hash = self.remove(index);
removed_txns.push(hash);
}
removed_txns
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -77,6 +77,12 @@ pub struct TransactionsManagerMetrics {
/// seen by that peer.
pub(crate) occurrences_of_transaction_already_seen_by_peer: Counter,
/* -- Freq txns already in pool -- */
/// Total number of times a hash is announced that is already in the local pool.
pub(crate) occurrences_hashes_already_in_pool: Counter,
/// Total number of times a transaction is sent that is already in the local pool.
pub(crate) occurrences_transactions_already_in_pool: Counter,
/* ================ POOL IMPORTS ================ */
/// Number of transactions about to be imported into the pool.
pub(crate) pending_pool_imports: Gauge,

View File

@ -7,7 +7,8 @@ use derive_more::Constructor;
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use reth_eth_wire::{
EthVersion, GetPooledTransactions, HandleAnnouncement, RequestTxHashes, ValidAnnouncementData,
EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
RequestTxHashes, ValidAnnouncementData,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
@ -192,7 +193,7 @@ impl TransactionFetcher {
pub(super) fn pack_request_eth68(
&mut self,
hashes_to_request: &mut RequestTxHashes,
hashes_from_announcement: impl HandleAnnouncement
hashes_from_announcement: impl HandleMempoolData
+ IntoIterator<Item = (TxHash, Option<(u8, usize)>)>,
) -> RequestTxHashes {
let mut acc_size_response = 0;
@ -1048,7 +1049,7 @@ mod test {
#[derive(IntoIterator)]
struct TestValidAnnouncementData(Vec<(TxHash, Option<(u8, usize)>)>);
impl HandleAnnouncement for TestValidAnnouncementData {
impl HandleMempoolData for TestValidAnnouncementData {
fn is_empty(&self) -> bool {
self.0.is_empty()
}
@ -1057,10 +1058,6 @@ mod test {
self.0.len()
}
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
fn retain_by_hash(&mut self, mut f: impl FnMut(&TxHash) -> bool) -> Self {
let mut indices_to_remove = vec![];
for (i, (hash, _)) in self.0.iter().enumerate() {
@ -1080,6 +1077,12 @@ mod test {
}
}
impl HandleVersionedMempoolData for TestValidAnnouncementData {
fn msg_version(&self) -> EthVersion {
EthVersion::Eth68
}
}
#[test]
fn pack_eth68_request() {
reth_tracing::init_test_tracing();

View File

@ -36,9 +36,9 @@ use crate::{
};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
EthVersion, GetPooledTransactions, HandleAnnouncement, NewPooledTransactionHashes,
NewPooledTransactionHashes66, NewPooledTransactionHashes68, PooledTransactions,
RequestTxHashes, Transactions,
EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68,
PooledTransactions, RequestTxHashes, Transactions,
};
use reth_interfaces::{
p2p::error::{RequestError, RequestResult},
@ -626,7 +626,10 @@ where
// most hashes will be filtered out here since this the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let _already_known_by_pool = self.pool.retain_unknown(&mut msg);
let already_known_by_pool = self.pool.retain_unknown(&mut msg);
if let Some(intersection) = already_known_by_pool {
self.metrics.occurrences_hashes_already_in_pool.increment(intersection.len() as u64);
}
if msg.is_empty() {
// nothing to request
@ -893,7 +896,7 @@ where
fn import_transactions(
&mut self,
peer_id: PeerId,
transactions: Vec<PooledTransactionsElement>,
mut transactions: Vec<PooledTransactionsElement>,
source: TransactionSource,
) {
// If the node is pipeline syncing, ignore transactions
@ -904,10 +907,31 @@ where
return
}
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
// track that the peer knows these transaction, but only if this is a new broadcast.
// If we received the transactions as the response to our `GetPooledTransactions``
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes as seen by this peer in `Self::on_new_pooled_transaction_hashes`.
let mut num_already_seen_by_peer = 0;
for tx in transactions.iter() {
if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) {
num_already_seen_by_peer += 1;
}
}
// 1. filter out already imported txns
let already_known_by_pool = self.pool.retain_unknown(&mut transactions);
if let Some(intersection) = already_known_by_pool {
self.metrics
.occurrences_transactions_already_in_pool
.increment(intersection.len() as u64);
}
// tracks the quality of the given transactions
let mut has_bad_transactions = false;
let mut num_already_seen_by_peer = 0;
// 2. filter out transactions that are invalid or already pending import
if let Some(peer) = self.peers.get_mut(&peer_id) {
// pre-size to avoid reallocations
let mut new_txs = Vec::with_capacity(transactions.len());
@ -927,15 +951,6 @@ where
}
};
// track that the peer knows this transaction, but only if this is a new broadcast.
// If we received the transactions as the response to our GetPooledTransactions
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`].
if source.is_broadcast() && !peer.seen_transactions.insert(*tx.hash()) {
num_already_seen_by_peer += 1;
}
match self.transactions_by_peers.entry(*tx.hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
@ -962,7 +977,8 @@ where
}
new_txs.shrink_to_fit();
// import new transactions as a batch to minimize lock contention on the underlying pool
// 3. import new transactions as a batch to minimize lock contention on the underlying
// pool
if !new_txs.is_empty() {
let pool = self.pool.clone();
// update metrics

View File

@ -151,7 +151,7 @@
use crate::{identifier::TransactionId, pool::PoolInner};
use aquamarine as _;
use reth_eth_wire::HandleAnnouncement;
use reth_eth_wire::HandleMempoolData;
use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, U256};
use reth_provider::StateProviderFactory;
use std::{collections::HashSet, sync::Arc};
@ -457,7 +457,7 @@ where
fn retain_unknown<A>(&self, announcement: &mut A) -> Option<A>
where
A: HandleAnnouncement,
A: HandleMempoolData,
{
self.pool.retain_unknown(announcement)
}

View File

@ -16,7 +16,7 @@ use crate::{
PropagatedTransactions, TransactionEvents, TransactionOrigin, TransactionPool,
TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction,
};
use reth_eth_wire::HandleAnnouncement;
use reth_eth_wire::HandleMempoolData;
use reth_primitives::{Address, BlobTransactionSidecar, TxHash, U256};
use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::sync::{mpsc, mpsc::Receiver};
@ -176,7 +176,7 @@ impl TransactionPool for NoopTransactionPool {
fn retain_unknown<A>(&self, _announcement: &mut A) -> Option<A>
where
A: HandleAnnouncement,
A: HandleMempoolData,
{
None
}

View File

@ -82,7 +82,7 @@ use crate::{
};
use best::BestTransactions;
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use reth_eth_wire::HandleAnnouncement;
use reth_eth_wire::HandleMempoolData;
use reth_primitives::{
Address, BlobTransaction, BlobTransactionSidecar, IntoRecoveredTransaction,
PooledTransactionsElement, TransactionSigned, TxHash, B256,
@ -669,9 +669,9 @@ where
}
/// Removes and returns all transactions that are present in the pool.
pub(crate) fn retain_unknown<A: HandleAnnouncement>(&self, announcement: &mut A) -> Option<A>
pub(crate) fn retain_unknown<A: HandleMempoolData>(&self, announcement: &mut A) -> Option<A>
where
A: HandleAnnouncement,
A: HandleMempoolData,
{
if announcement.is_empty() {
return None

View File

@ -8,7 +8,7 @@ use crate::{
AllTransactionsEvents,
};
use futures_util::{ready, Stream};
use reth_eth_wire::HandleAnnouncement;
use reth_eth_wire::HandleMempoolData;
use reth_primitives::{
kzg::KzgSettings, AccessList, Address, BlobTransactionSidecar, BlobTransactionValidationError,
FromRecoveredPooledTransaction, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId,
@ -290,7 +290,7 @@ pub trait TransactionPool: Send + Sync + Clone {
/// Consumer: P2P
fn retain_unknown<A>(&self, announcement: &mut A) -> Option<A>
where
A: HandleAnnouncement;
A: HandleMempoolData;
/// Returns if the transaction for the given hash is already included in this pool.
fn contains(&self, tx_hash: &TxHash) -> bool {