feat(txpool) feed new pending transactions to BestTxns iterator (#4053)

This commit is contained in:
prames
2023-08-04 17:13:17 -04:00
committed by GitHub
parent df94dba14b
commit 544c51cc9f
5 changed files with 81 additions and 0 deletions

View File

@ -7,6 +7,7 @@ use std::{
collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc,
};
use tokio::sync::broadcast::Receiver;
use tracing::debug;
/// An iterator that returns transactions that can be executed on the current state (*best*
@ -61,6 +62,12 @@ pub(crate) struct BestTransactions<T: TransactionOrdering> {
pub(crate) independent: BTreeSet<PendingTransaction<T>>,
/// There might be the case where a yielded transactions is invalid, this will track it.
pub(crate) invalid: HashSet<TxHash>,
/// Used to recieve any new pending transactions that have been added to the pool after this
/// iterator was snapshotted
///
/// These new pending transactions are inserted into this iterator's pool before yielding the
/// next value
pub(crate) new_transaction_reciever: Receiver<PendingTransaction<T>>,
}
impl<T: TransactionOrdering> BestTransactions<T> {
@ -76,6 +83,36 @@ impl<T: TransactionOrdering> BestTransactions<T> {
pub(crate) fn ancestor(&self, id: &TransactionId) -> Option<&PendingTransaction<T>> {
self.all.get(&id.unchecked_ancestor()?)
}
/// Non-blocking read on the new pending transactions subscription channel
fn try_recv(&mut self) -> Option<PendingTransaction<T>> {
match self.new_transaction_reciever.try_recv() {
Ok(tx) => Some(tx),
// note TryRecvError::Lagged can be returned here, which is an error that attempts to
// correct itself on consecutive try_recv() attempts
// the cost of ignoring this error is allowing old transactions to get
// overwritten after the chan buffer size is met
// this case is still better than the existing iterator behavior where no new
// pending txs are surfaced to consumers
Err(_) => None,
}
}
/// Checks for new transactions that have come into the PendingPool after this iterator was
/// created and inserts them
fn add_new_transactions(&mut self) {
while let Some(pending_tx) = self.try_recv() {
let tx = pending_tx.transaction.clone();
// same logic as PendingPool::add_transaction/PendingPool::best_with_unlocked
let tx_id = *tx.id();
if self.ancestor(&tx_id).is_none() {
self.independent.insert(pending_tx.clone());
}
self.all.insert(tx_id, pending_tx);
}
}
}
impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
@ -89,6 +126,7 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
fn next(&mut self) -> Option<Self::Item> {
loop {
self.add_new_transactions();
// Remove the next independent tx with the highest priority
let best = self.independent.pop_last()?;
let hash = best.transaction.hash();

View File

@ -10,6 +10,7 @@ use std::{
collections::{BTreeMap, BTreeSet},
sync::Arc,
};
use tokio::sync::broadcast;
/// A pool of validated and gapless transactions that are ready to be executed on the current state
/// and are waiting to be included in a block.
@ -42,6 +43,9 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
///
/// See also [`PoolTransaction::size`](crate::traits::PoolTransaction::size).
size_of: SizeTracker,
/// Used to broadcast new transactions that have been added to the PendingPool to existing
/// snapshots of this pool.
new_transaction_notifier: broadcast::Sender<PendingTransaction<T>>,
}
// === impl PendingPool ===
@ -49,6 +53,7 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance.
pub(crate) fn new(ordering: T) -> Self {
let (new_transaction_notifier, _) = broadcast::channel(200);
Self {
ordering,
submission_id: 0,
@ -56,6 +61,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
all: Default::default(),
independent_transactions: Default::default(),
size_of: Default::default(),
new_transaction_notifier,
}
}
@ -82,6 +88,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
all: self.by_id.clone(),
independent: self.independent_transactions.clone(),
invalid: Default::default(),
new_transaction_reciever: self.new_transaction_notifier.subscribe(),
}
}
@ -223,6 +230,11 @@ impl<T: TransactionOrdering> PendingPool<T> {
}
self.all.insert(tx.clone());
// send the new transaction to any existing pendingpool snapshot iterators
if self.new_transaction_notifier.receiver_count() > 0 {
let _ = self.new_transaction_notifier.send(tx.clone());
}
self.by_id.insert(tx_id, tx);
}

View File

@ -477,6 +477,10 @@ impl ChangedAccount {
///
/// This makes no assumptions about the order of the transactions, but expects that _all_
/// transactions are valid (no nonce gaps.) for the tracked state of the pool.
///
/// Note: this iterator will always return the best transaction that it currently knows.
/// There is no guarantee transactions will be returned sequentially in decreasing
/// priority order.
pub trait BestTransactions: Iterator + Send {
/// Mark the transaction as invalid.
///

View File

@ -2,5 +2,7 @@
#[cfg(feature = "test-utils")]
mod listeners;
#[cfg(feature = "test-utils")]
mod pending;
fn main() {}

View File

@ -0,0 +1,25 @@
use assert_matches::assert_matches;
use reth_transaction_pool::{
test_utils::{testing_pool, MockTransactionFactory},
TransactionOrigin, TransactionPool,
};
#[tokio::test(flavor = "multi_thread")]
async fn txpool_new_pending_txs() {
let txpool = testing_pool();
let mut mock_tx_factory = MockTransactionFactory::default();
let transaction = mock_tx_factory.create_eip1559();
let added_result =
txpool.add_transaction(TransactionOrigin::External, transaction.transaction.clone()).await;
assert_matches!(added_result, Ok(hash) if hash == transaction.transaction.get_hash());
let mut best_txns = txpool.best_transactions();
assert_matches!(best_txns.next(), Some(tx) if tx.transaction.get_hash() == transaction.transaction.get_hash());
assert_matches!(best_txns.next(), None);
let transaction = mock_tx_factory.create_eip1559();
let added_result =
txpool.add_transaction(TransactionOrigin::External, transaction.transaction.clone()).await;
assert_matches!(added_result, Ok(hash) if hash == transaction.transaction.get_hash());
assert_matches!(best_txns.next(), Some(tx) if tx.transaction.get_hash() == transaction.transaction.get_hash());
}