perf(tx-pool): reuse write lock to insert txs batch (#12806)

This commit is contained in:
Hai | RISE
2024-12-06 20:28:40 +07:00
committed by GitHub
parent 2f46fe6d48
commit 634db30b6b

View File

@ -83,7 +83,7 @@ use crate::{
};
use alloy_primitives::{Address, TxHash, B256};
use best::BestTransactions;
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount;
@ -425,6 +425,7 @@ where
/// come in through that function, either as a batch or `std::iter::once`.
fn add_transaction(
&self,
pool: &mut RwLockWriteGuard<'_, TxPool<T>>,
origin: TransactionOrigin,
tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> {
@ -458,7 +459,7 @@ where
origin,
};
let added = self.pool.write().add_transaction(tx, balance, state_nonce)?;
let added = pool.add_transaction(tx, balance, state_nonce)?;
let hash = *added.hash();
// transaction was successfully inserted into the pool
@ -521,33 +522,52 @@ where
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
///
/// Note: A large batch may lock the pool for a long time that blocks important operations
/// like updating the pool on canonical state changes. The caller should consider having
/// a max batch size to balance transaction insertions with other updates.
pub fn add_transactions(
&self,
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<TxHash>> {
let mut added =
transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::<Vec<_>>();
// Add the transactions and enforce the pool size limits in one write lock
let (mut added, discarded) = {
let mut pool = self.pool.write();
let added = transactions
.into_iter()
.map(|tx| self.add_transaction(&mut pool, origin, tx))
.collect::<Vec<_>>();
// If at least one transaction was added successfully, then we enforce the pool size limits.
let discarded =
if added.iter().any(Result::is_ok) { self.discard_worst() } else { Default::default() };
// Enforce the pool size limits if at least one transaction was added successfully
let discarded = if added.iter().any(Result::is_ok) {
pool.discard_worst()
} else {
Default::default()
};
if discarded.is_empty() {
return added
}
(added, discarded)
};
{
let mut listener = self.event_listener.write();
discarded.iter().for_each(|tx| listener.discarded(tx));
}
if !discarded.is_empty() {
// Delete any blobs associated with discarded blob transactions
self.delete_discarded_blobs(discarded.iter());
// It may happen that a newly added transaction is immediately discarded, so we need to
// adjust the result here
for res in &mut added {
if let Ok(hash) = res {
if discarded.contains(hash) {
*res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
let discarded_hashes =
discarded.into_iter().map(|tx| *tx.hash()).collect::<HashSet<_>>();
{
let mut listener = self.event_listener.write();
discarded_hashes.iter().for_each(|hash| listener.discarded(hash));
}
// A newly added transaction may be immediately discarded, so we need to
// adjust the result here
for res in &mut added {
if let Ok(hash) = res {
if discarded_hashes.contains(hash) {
*res = Err(PoolError::new(*hash, PoolErrorKind::DiscardedOnInsert))
}
}
}
}
@ -883,20 +903,6 @@ where
self.pool.read().is_exceeded()
}
/// Enforces the size limits of pool and returns the discarded transactions if violated.
///
/// If some of the transactions are blob transactions, they are also removed from the blob
/// store.
pub fn discard_worst(&self) -> HashSet<TxHash> {
let discarded = self.pool.write().discard_worst();
// delete any blobs associated with discarded blob transactions
self.delete_discarded_blobs(discarded.iter());
// then collect into tx hashes
discarded.into_iter().map(|tx| *tx.hash()).collect()
}
/// Inserts a blob transaction into the blob store
fn insert_blob(&self, hash: TxHash, blob: BlobTransactionSidecar) {
debug!(target: "txpool", "[{:?}] storing blob sidecar", hash);
@ -1305,23 +1311,18 @@ mod tests {
}
// Add the transaction to the pool with external origin and valid outcome.
test_pool
.add_transaction(
TransactionOrigin::External,
TransactionValidationOutcome::Valid {
balance: U256::from(1_000),
state_nonce: 0,
transaction: ValidTransaction::ValidWithSidecar {
transaction: tx,
sidecar: sidecar.clone(),
},
propagate: true,
test_pool.add_transactions(
TransactionOrigin::External,
[TransactionValidationOutcome::Valid {
balance: U256::from(1_000),
state_nonce: 0,
transaction: ValidTransaction::ValidWithSidecar {
transaction: tx,
sidecar: sidecar.clone(),
},
)
.unwrap();
// Evict the worst transactions from the pool.
test_pool.discard_worst();
propagate: true,
}],
);
}
// Assert that the size of the pool's blob component is equal to the maximum blob limit.