feat(net): batch-import received txs (#6299)

ArtificialPB
2024-01-31 19:11:42 +01:00
committed by GitHub
parent 1223895466
commit dd40ea54dc
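The diff below changes how transactions received from peers are imported: instead of pushing one `PoolImportFuture` per transaction, all transactions that are new to the node are collected into a batch and submitted with a single `add_external_transactions` call, reducing lock contention on the underlying pool. The import future's output becomes `PoolResult<Vec<PoolResult<TxHash>>>`, the `pending_pool_imports` gauge is incremented/decremented around each batched future instead of being recomputed by `update_import_metrics`, and the poll loop unpacks the per-transaction results from each completed batch.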


@@ -54,6 +54,7 @@ use reth_transaction_pool::{
PropagatedTransactions, TransactionPool, ValidPoolTransaction,
};
use std::{
cmp::max,
collections::{hash_map::Entry, HashMap, HashSet},
num::NonZeroUsize,
pin::Pin,
@@ -83,7 +84,8 @@ const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
const POOLED_TRANSACTIONS_RESPONSE_SOFT_LIMIT_BYTE_SIZE: usize = 2 * 1024 * 1024;
/// The future for importing transactions into the pool.
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
pub type PoolImportFuture =
Pin<Box<dyn Future<Output = PoolResult<Vec<PoolResult<TxHash>>>> + Send + 'static>>;
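The output type changes because a single future now represents a whole batch: the outer `PoolResult` is the batch-level outcome of `add_external_transactions`, and the inner `Vec<PoolResult<TxHash>>` holds one result per submitted transaction.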
/// Api to interact with [`TransactionsManager`] task.
#[derive(Debug, Clone)]
@@ -277,11 +279,6 @@ impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
{
#[inline]
fn update_import_metrics(&self) {
self.metrics.pending_pool_imports.set(self.pool_imports.len() as f64);
}
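With batching, each `pool_imports` entry covers many transactions, so the gauge can no longer be derived from `self.pool_imports.len()`; the helper is removed and the gauge is instead incremented by the batch size when the future is created and decremented when it completes (see the import hunk below).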
#[inline]
fn update_request_metrics(&self) {
self.metrics
@@ -965,6 +962,9 @@ where
let mut num_already_seen = 0;
if let Some(peer) = self.peers.get_mut(&peer_id) {
// pre-size to avoid reallocations, assuming ~50% of the transactions are new
let mut new_txs = Vec::with_capacity(max(1, transactions.len() / 2));
for tx in transactions {
// recover transaction
let tx = if let Ok(tx) = tx.try_into_ecrecovered() {
@@ -990,19 +990,32 @@ where
Entry::Vacant(entry) => {
// this is a new transaction that should be imported into the pool
let pool_transaction = <Pool::Transaction as FromRecoveredPooledTransaction>::from_recovered_pooled_transaction(tx);
new_txs.push(pool_transaction);
let pool = self.pool.clone();
let import = Box::pin(async move {
pool.add_external_transaction(pool_transaction).await
});
self.pool_imports.push(import);
entry.insert(vec![peer_id]);
}
}
}
// import new transactions as a batch to minimize lock contention on the underlying pool
if !new_txs.is_empty() {
let pool = self.pool.clone();
let metric_pending_pool_imports = self.metrics.pending_pool_imports.clone();
metric_pending_pool_imports.increment(new_txs.len() as f64);
let import = Box::pin(async move {
let added = new_txs.len();
let res = pool.add_external_transactions(new_txs).await;
metric_pending_pool_imports.decrement(added as f64);
res
});
self.pool_imports.push(import);
}
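A minimal, self-contained sketch of the batching pattern used above, with hypothetical stand-in types (`MockPool`, `PooledTx`) rather than the real `reth_transaction_pool` API; in the actual code the batched future is pushed onto `self.pool_imports` rather than awaited inline:

// Sketch (not part of this commit): the batching pattern above, using stand-in
// types instead of the real reth_transaction_pool API.
use std::cmp::max;

type TxHash = u64;
type PoolResult<T> = Result<T, String>;

struct PooledTx {
    hash: TxHash,
}

#[derive(Clone)]
struct MockPool;

impl MockPool {
    // Stand-in for `add_external_transactions`: the whole batch is imported in
    // one call, so pool-internal locks are taken once per message rather than
    // once per transaction.
    async fn add_external_transactions(
        &self,
        txs: Vec<PooledTx>,
    ) -> PoolResult<Vec<PoolResult<TxHash>>> {
        Ok(txs.into_iter().map(|tx| Ok(tx.hash)).collect())
    }
}

async fn import_received(
    pool: MockPool,
    received: Vec<PooledTx>,
    already_seen: impl Fn(&PooledTx) -> bool,
) {
    // Pre-size assuming roughly half of the received transactions are new.
    let mut new_txs = Vec::with_capacity(max(1, received.len() / 2));
    for tx in received {
        if !already_seen(&tx) {
            new_txs.push(tx);
        }
    }
    // One batched future per message instead of one future per transaction.
    if !new_txs.is_empty() {
        match pool.add_external_transactions(new_txs).await {
            // Batch accepted: each transaction still carries its own result.
            Ok(results) => {
                for res in results {
                    match res {
                        Ok(hash) => println!("imported {hash}"),
                        Err(err) => eprintln!("rejected: {err}"),
                    }
                }
            }
            // Batch-level failure, e.g. the pool rejected the whole request.
            Err(err) => eprintln!("batch import failed: {err}"),
        }
    }
}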
if num_already_seen > 0 {
self.metrics.messages_with_already_seen_transactions.increment(1);
trace!(target: "net::tx", num_txs=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen transactions");
@@ -1103,32 +1116,39 @@ where
this.request_buffered_hashes();
this.update_request_metrics();
this.update_import_metrics();
// Advance all imports
while let Poll::Ready(Some(import_res)) = this.pool_imports.poll_next_unpin(cx) {
match import_res {
Ok(hash) => {
this.on_good_import(hash);
}
let import_res = match import_res {
Ok(res) => res,
Err(err) => {
// if we're _currently_ syncing and the transaction is bad we ignore it,
// otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
debug!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
debug!(target: "net::tx", ?err, "bad pool transaction batch import");
continue
}
};
for res in import_res {
match res {
Ok(hash) => {
this.on_good_import(hash);
}
Err(err) => {
// if we're _currently_ syncing and the transaction is bad we ignore it,
// otherwise we penalize the peer that sent the bad
// transaction with the assumption that the peer should have
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
debug!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
}
this.on_good_import(err.hash);
}
this.on_good_import(err.hash);
}
}
}
this.update_import_metrics();
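A batch-level error (the outer `Err`) is logged and the whole result is skipped; per-transaction errors keep the previous behavior: a bad transaction from a peer while not syncing is penalized via `on_bad_import`, and everything else is marked as seen via `on_good_import`. The `this.update_import_metrics()` calls go away because the removed helper is replaced by the direct gauge increment/decrement around each batched future.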
// handle and propagate new transactions
let mut new_txs = Vec::new();
while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {