feat: discard txs by tx_count of sender (#4520)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
chirag-bgh
2023-11-30 00:14:50 +05:30
committed by GitHub
parent 5a7644709b
commit 15992d9bdf
7 changed files with 842 additions and 55 deletions

View File

@ -74,6 +74,11 @@ optimism = [
"revm/optimism",
]
[[bench]]
name = "truncate"
required-features = ["test-utils", "arbitrary"]
harness = false
[[bench]]
name = "reorder"
required-features = ["test-utils", "arbitrary"]

View File

@ -0,0 +1,198 @@
use criterion::{
criterion_group, criterion_main, measurement::WallTime, BenchmarkGroup, Criterion,
};
use proptest::{
prelude::*,
strategy::{Strategy, ValueTree},
test_runner::{RngAlgorithm, TestRng, TestRunner},
};
use reth_primitives::{hex_literal::hex, Address};
use reth_transaction_pool::{
pool::{BasefeeOrd, ParkedPool, PendingPool},
test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
SubPoolLimit,
};
// constant seed to use for the rng
const SEED: [u8; 32] = hex!("1337133713371337133713371337133713371337133713371337133713371337");
/// Generates a set of `depth` dependent transactions, with the specified sender. Its values are
/// generated using [Arbitrary].
fn create_transactions_for_sender(
mut runner: TestRunner,
sender: Address,
depth: usize,
) -> Vec<MockTransaction> {
// TODO: for blob truncate, this would need a flag for _only_ generating 4844 mock transactions
// assert that depth is always greater than zero, since empty vecs do not really make sense in
// this context
assert!(depth > 0);
// make sure these are all post-eip-1559 transactions
let mut txs = prop::collection::vec(any::<MockTransaction>(), depth)
.new_tree(&mut runner)
.unwrap()
.current();
let mut nonce = 0;
for tx in txs.iter_mut() {
// reject pre-eip1559 tx types, if there is a legacy tx, replace it with an eip1559 tx
if tx.is_legacy() || tx.is_eip2930() {
*tx = MockTransaction::eip1559();
// set fee values using arbitrary
tx.set_priority_fee(any::<u128>().new_tree(&mut runner).unwrap().current());
tx.set_max_fee(any::<u128>().new_tree(&mut runner).unwrap().current());
}
tx.set_sender(sender);
tx.set_nonce(nonce);
nonce += 1;
}
txs
}
/// Generates many transactions, each with a different sender. The number of transactions per
/// sender is generated using [Arbitrary]. The number of senders is specified by `senders`.
///
/// Because this uses [Arbitrary], the number of transactions per sender needs to be bounded. This
/// is done by using the `max_depth` parameter.
///
/// This uses [create_transactions_for_sender] to generate the transactions.
fn generate_many_transactions(senders: usize, max_depth: usize) -> Vec<MockTransaction> {
let config = ProptestConfig::default();
let rng = TestRng::from_seed(RngAlgorithm::ChaCha, &SEED);
let mut runner = TestRunner::new_with_rng(config, rng);
let mut txs = Vec::new();
for idx in 0..senders {
// modulo max_depth so we know it is bounded, plus one so the minimum is always 1
let depth = any::<usize>().new_tree(&mut runner).unwrap().current() % max_depth + 1;
// set sender to an Address determined by the sender index. This should make any necessary
// debugging easier.
let idx_slice = idx.to_be_bytes();
// pad with 12 bytes of zeros before rest
let addr_slice = [0u8; 12].into_iter().chain(idx_slice.into_iter()).collect::<Vec<_>>();
let sender = Address::from_slice(&addr_slice);
txs.extend(create_transactions_for_sender(runner.clone(), sender, depth));
}
txs
}
fn txpool_truncate(c: &mut Criterion) {
let mut group = c.benchmark_group("Transaction Pool Truncate");
// the first few benchmarks (5, 10, 20, 100) should cause the txpool to hit the max tx limit,
// so they are there to make sure we do not regress on best-case performance.
//
// the last few benchmarks (1000, 2000) should hit the max tx limit, at least for large enough
// depth, so these should benchmark closer to real-world performance
for senders in [5, 10, 20, 100, 1000, 2000] {
// the max we'll be benching is 20, because MAX_ACCOUNT_SLOTS so far is 16. So 20 should be
// a reasonable worst-case benchmark
for max_depth in [5, 10, 20] {
println!("Generating transactions for benchmark with {senders} unique senders and a max depth of {max_depth}...");
let txs = generate_many_transactions(senders, max_depth);
// benchmark parked pool
truncate_parked(&mut group, "ParkedPool", txs.clone(), senders, max_depth);
// benchmark pending pool
truncate_pending(&mut group, "PendingPool", txs, senders, max_depth);
// TODO: benchmark blob truncate
}
}
let large_senders = 5000;
let max_depth = 16;
// let's run a benchmark that includes a large number of senders and max_depth of 16 to ensure
// we hit the TXPOOL_SUBPOOL_MAX_TXS_DEFAULT limit, which is currently 10k
println!("Generating transactions for large benchmark with {large_senders} unique senders and a max depth of {max_depth}...");
let txs = generate_many_transactions(large_senders, max_depth);
// benchmark parked
truncate_parked(&mut group, "ParkedPool", txs.clone(), large_senders, max_depth);
// benchmark pending
truncate_pending(&mut group, "PendingPool", txs, large_senders, max_depth);
}
fn truncate_pending(
group: &mut BenchmarkGroup<WallTime>,
description: &str,
seed: Vec<MockTransaction>,
senders: usize,
max_depth: usize,
) {
let setup = || {
let mut txpool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
for tx in seed.iter() {
// add transactions with a basefee of zero, so they are not immediately removed
txpool.add_transaction(f.validated_arc(tx.clone()), 0);
}
txpool
};
let group_id = format!(
"txpool | total txs: {} | total senders: {} | max depth: {} | {}",
seed.len(),
senders,
max_depth,
description,
);
// for now we just use the default SubPoolLimit
group.bench_function(group_id, |b| {
b.iter_with_setup(setup, |mut txpool| {
txpool.truncate_pool(SubPoolLimit::default());
std::hint::black_box(());
});
});
}
fn truncate_parked(
group: &mut BenchmarkGroup<WallTime>,
description: &str,
seed: Vec<MockTransaction>,
senders: usize,
max_depth: usize,
) {
let setup = || {
let mut txpool = ParkedPool::<BasefeeOrd<_>>::default();
let mut f = MockTransactionFactory::default();
for tx in seed.iter() {
txpool.add_transaction(f.validated_arc(tx.clone()));
}
txpool
};
let group_id = format!(
"txpool | total txs: {} | total senders: {} | max depth: {} | {}",
seed.len(),
senders,
max_depth,
description,
);
// for now we just use the default SubPoolLimit
group.bench_function(group_id, |b| {
b.iter_with_setup(setup, |mut txpool| {
txpool.truncate_pool(SubPoolLimit::default());
std::hint::black_box(());
});
});
}
criterion_group!(truncate, txpool_truncate);
criterion_main!(truncate);

View File

@ -107,6 +107,8 @@ use crate::{
};
use alloy_rlp::Encodable;
pub use listener::{AllTransactionsEvents, TransactionEvents};
pub use parked::{BasefeeOrd, ParkedOrd, ParkedPool};
pub use pending::PendingPool;
mod best;
mod blob;

View File

@ -1,8 +1,14 @@
use crate::{
identifier::TransactionId, pool::size::SizeTracker, PoolTransaction, ValidPoolTransaction,
identifier::{SenderId, TransactionId},
pool::size::SizeTracker,
PoolTransaction, SubPoolLimit, ValidPoolTransaction,
};
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet, BinaryHeap},
ops::{Bound::Unbounded, Deref},
sync::Arc,
};
use fnv::FnvHashMap;
use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc};
/// A pool of transactions that are currently parked and are waiting for external changes (e.g.
/// basefee, ancestor transactions, balance) that eventually move the transaction into the pending
@ -13,14 +19,15 @@ use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc};
///
/// Note: This type is generic over [ParkedPool] which enforces that the underlying transaction type
/// is [ValidPoolTransaction] wrapped in an [Arc].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub(crate) struct ParkedPool<T: ParkedOrd> {
pub struct ParkedPool<T: ParkedOrd> {
/// Keeps track of transactions inserted in the pool.
///
/// This way we can determine when transactions were submitted to the pool.
submission_id: u64,
/// _All_ Transactions that are currently inside the pool grouped by their identifier.
by_id: FnvHashMap<TransactionId, ParkedPoolTransaction<T>>,
by_id: BTreeMap<TransactionId, ParkedPoolTransaction<T>>,
/// All transactions sorted by their order function.
///
/// The higher, the better.
@ -39,7 +46,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
/// # Panics
///
/// If the transaction is already included.
pub(crate) fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
pub fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
let id = *tx.id();
assert!(
!self.by_id.contains_key(&id),
@ -51,6 +58,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
// keep track of size
self.size_of += tx.size();
// update or create sender entry
let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() };
self.by_id.insert(id, transaction.clone());
@ -79,10 +87,106 @@ impl<T: ParkedOrd> ParkedPool<T> {
Some(tx.transaction.into())
}
/// Removes the worst transaction from this pool.
pub(crate) fn pop_worst(&mut self) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let worst = self.best.iter().next().map(|tx| *tx.transaction.id())?;
self.remove_transaction(&worst)
/// Get transactions by sender
pub(crate) fn get_txs_by_sender(&self, sender: SenderId) -> Vec<TransactionId> {
self.by_id
.range((sender.start_bound(), Unbounded))
.take_while(move |(other, _)| sender == other.sender)
.map(|(_, tx)| *tx.transaction.id())
.collect()
}
/// Returns sender ids sorted by each sender's last submission id. Senders with older last
/// submission ids are first. Note that _last_ submission ids are the newest submission id for
/// that sender, so this sorts senders by the last time they submitted a transaction in
/// descending order. Senders that have least recently submitted a transaction are first.
///
/// Similar to `Heartbeat` in Geth
pub fn get_senders_by_submission_id(&self) -> Vec<SubmissionSenderId> {
// iterate through by_id, and get the last submission id for each sender
let senders = self
.by_id
.iter()
.fold(Vec::new(), |mut set: Vec<SubmissionSenderId>, (_, tx)| {
if let Some(last) = set.last_mut() {
// sort by last
if last.sender_id == tx.transaction.sender_id() {
if last.submission_id < tx.submission_id {
// update last submission id
last.submission_id = tx.submission_id;
}
} else {
// new entry
set.push(SubmissionSenderId::new(
tx.transaction.sender_id(),
tx.submission_id,
));
}
} else {
// first entry
set.push(SubmissionSenderId::new(tx.transaction.sender_id(), tx.submission_id));
}
set
})
.into_iter()
// sort by submission id
.collect::<BinaryHeap<_>>();
// sort s.t. senders with older submission ids are first
senders.into_sorted_vec()
}
/// Truncates the pool by removing transactions, until the given [SubPoolLimit] has been met.
///
/// This is done by first ordering senders by the last time they have submitted a transaction,
/// using [get_senders_by_submission_id](ParkedPool::get_senders_by_submission_id) to determine
/// this ordering.
///
/// Then, for each sender, all transactions for that sender are removed, until the pool limits
/// have been met.
///
/// Any removed transactions are returned.
pub fn truncate_pool(
&mut self,
limit: SubPoolLimit,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
if self.len() <= limit.max_txs {
// if we are below the limits, we don't need to drop anything
return Vec::new()
}
let mut removed = Vec::new();
let mut sender_ids = self.get_senders_by_submission_id();
let queued = self.len();
let mut drop = queued - limit.max_txs;
while drop > 0 && !sender_ids.is_empty() {
// SAFETY: This will not panic due to `!addresses.is_empty()`
let sender_id = sender_ids.pop().unwrap().sender_id;
let mut list = self.get_txs_by_sender(sender_id);
// Drop all transactions if they are less than the overflow
if list.len() <= drop {
for txid in &list {
if let Some(tx) = self.remove_transaction(txid) {
removed.push(tx);
}
}
drop -= list.len();
continue
}
// Otherwise drop only last few transactions
// SAFETY: This will not panic because `list.len() > drop`
for txid in list.split_off(drop) {
if let Some(tx) = self.remove_transaction(&txid) {
removed.push(tx);
}
drop -= 1;
}
}
removed
}
fn next_id(&mut self) -> u64 {
@ -226,10 +330,40 @@ impl<T: ParkedOrd> Ord for ParkedPoolTransaction<T> {
}
}
/// Includes a [SenderId] and `submission_id`. This is used to sort senders by their last
/// submission id.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SubmissionSenderId {
/// The sender id
pub(crate) sender_id: SenderId,
/// The submission id
pub(crate) submission_id: u64,
}
impl SubmissionSenderId {
/// Creates a new [SubmissionSenderId] based on the [SenderId] and `submission_id`.
fn new(sender_id: SenderId, submission_id: u64) -> Self {
Self { sender_id, submission_id }
}
}
impl Ord for SubmissionSenderId {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse ordering for `submission_id`
other.submission_id.cmp(&self.submission_id)
}
}
impl PartialOrd for SubmissionSenderId {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Helper trait used for custom `Ord` wrappers around a transaction.
///
/// This is effectively a wrapper for `Arc<ValidPoolTransaction>` with custom `Ord` implementation.
pub(crate) trait ParkedOrd:
pub trait ParkedOrd:
Ord
+ Clone
+ From<Arc<ValidPoolTransaction<Self::Transaction>>>
@ -294,7 +428,7 @@ macro_rules! impl_ord_wrapper {
///
/// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value.
#[derive(Debug)]
pub(crate) struct BasefeeOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
pub struct BasefeeOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
impl_ord_wrapper!(BasefeeOrd);
@ -332,6 +466,8 @@ impl<T: PoolTransaction> Ord for QueuedOrd<T> {
mod tests {
use super::*;
use crate::test_utils::{MockTransaction, MockTransactionFactory};
use reth_primitives::address;
use std::collections::HashSet;
#[test]
fn test_enforce_parked_basefee() {
@ -359,7 +495,7 @@ mod tests {
let root_tx = f.validated_arc(t.clone());
pool.add_transaction(root_tx.clone());
let descendant_tx = f.validated_arc(t.inc_nonce().inc_price());
let descendant_tx = f.validated_arc(t.inc_nonce().decr_price());
pool.add_transaction(descendant_tx.clone());
assert!(pool.by_id.contains_key(root_tx.id()));
@ -368,22 +504,178 @@ mod tests {
let removed = pool.enforce_basefee(u64::MAX);
assert!(removed.is_empty());
assert_eq!(pool.len(), 2);
// two dependent tx in the pool with decreasing fee
{
// TODO: test change might not be intended, re review
let mut pool2 = pool.clone();
let removed = pool2.enforce_basefee(descendant_tx.max_fee_per_gas() as u64);
let removed = pool2.enforce_basefee(root_tx.max_fee_per_gas() as u64);
assert_eq!(removed.len(), 1);
assert_eq!(pool2.len(), 1);
// descendant got popped
assert!(pool2.by_id.contains_key(root_tx.id()));
assert!(!pool2.by_id.contains_key(descendant_tx.id()));
// root got popped - descendant should be skipped
assert!(!pool2.by_id.contains_key(root_tx.id()));
assert!(pool2.by_id.contains_key(descendant_tx.id()));
}
// remove root transaction via root tx fee
let removed = pool.enforce_basefee(root_tx.max_fee_per_gas() as u64);
// remove root transaction via descendant tx fee
let removed = pool.enforce_basefee(descendant_tx.max_fee_per_gas() as u64);
assert_eq!(removed.len(), 2);
assert!(pool.is_empty());
}
#[test]
fn truncate_parked_by_submission_id() {
// this test ensures that we evict from the pending pool by sender
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let a = address!("000000000000000000000000000000000000000a");
let b = address!("000000000000000000000000000000000000000b");
let c = address!("000000000000000000000000000000000000000c");
let d = address!("000000000000000000000000000000000000000d");
// TODO: make creating these mock tx chains easier
// create a chain of transactions by sender A, B, C
let a1 = MockTransaction::eip1559().with_sender(a);
let a2 = a1.next();
let a3 = a2.next();
let a4 = a3.next();
let b1 = MockTransaction::eip1559().with_sender(b);
let b2 = b1.next();
let b3 = b2.next();
// C has the same number of txs as B
let c1 = MockTransaction::eip1559().with_sender(c);
let c2 = c1.next();
let c3 = c2.next();
let d1 = MockTransaction::eip1559().with_sender(d);
// just construct a list of all txs to add
let expected_parked = vec![c1.clone(), c2.clone(), c3.clone(), d1.clone()]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
// we expect the truncate operation to go through the senders with the most txs, removing
// txs based on when they were submitted, removing the oldest txs first, until the pool is
// not over the limit
let expected_removed = vec![
a1.clone(),
a2.clone(),
a3.clone(),
a4.clone(),
b1.clone(),
b2.clone(),
b3.clone(),
]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
let all_txs = vec![a1, a2, a3, a4, b1, b2, b3, c1, c2, c3, d1];
// add all the transactions to the pool
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
// we should end up with the most recently submitted transactions
let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
// truncate the pool
let removed = pool.truncate_pool(pool_limit);
assert_eq!(removed.len(), expected_removed.len());
// get the inner txs from the removed txs
let removed =
removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(removed, expected_removed);
// get the parked pool
let parked = pool.all().collect::<Vec<_>>();
assert_eq!(parked.len(), expected_parked.len());
// get the inner txs from the parked txs
let parked = parked.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(parked, expected_parked);
}
#[test]
fn test_senders_by_submission_id() {
// this test ensures that we evict from the pending pool by sender
let mut f = MockTransactionFactory::default();
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let a = address!("000000000000000000000000000000000000000a");
let b = address!("000000000000000000000000000000000000000b");
let c = address!("000000000000000000000000000000000000000c");
let d = address!("000000000000000000000000000000000000000d");
// create a chain of transactions by sender A, B, C
let a1 = MockTransaction::eip1559().with_sender(a);
let a2 = a1.next();
let a3 = a2.next();
let a4 = a3.next();
let b1 = MockTransaction::eip1559().with_sender(b);
let b2 = b1.next();
let b3 = b2.next();
// C has the same number of txs as B
let c1 = MockTransaction::eip1559().with_sender(c);
let c2 = c1.next();
let c3 = c2.next();
let d1 = MockTransaction::eip1559().with_sender(d);
let all_txs = vec![
a1.clone(),
a2.clone(),
a3.clone(),
a4.clone(),
b1.clone(),
b2.clone(),
b3.clone(),
c1.clone(),
c2.clone(),
c3.clone(),
d1.clone(),
];
// add all the transactions to the pool
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
// get senders by submission id - a4, b3, c3, d1, reversed
let senders = pool
.get_senders_by_submission_id()
.into_iter()
.map(|s| s.sender_id)
.collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
let expected_senders =
vec![d, c, b, a].into_iter().map(|s| f.ids.sender_id(&s).unwrap()).collect::<Vec<_>>();
assert_eq!(senders, expected_senders);
let mut pool = ParkedPool::<BasefeeOrd<_>>::default();
let all_txs = vec![a1, b1, c1, d1, a2, b2, c2, a3, b3, c3, a4];
// add all the transactions to the pool
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx));
}
let senders = pool
.get_senders_by_submission_id()
.into_iter()
.map(|s| s.sender_id)
.collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
let expected_senders =
vec![a, c, b, d].into_iter().map(|s| f.ids.sender_id(&s).unwrap()).collect::<Vec<_>>();
assert_eq!(senders, expected_senders);
}
}

View File

@ -1,7 +1,7 @@
use crate::{
identifier::TransactionId,
pool::{best::BestTransactions, size::SizeTracker},
Priority, TransactionOrdering, ValidPoolTransaction,
Priority, SubPoolLimit, TransactionOrdering, ValidPoolTransaction,
};
use crate::pool::best::BestTransactionsWithBasefee;
@ -22,8 +22,9 @@ use tokio::sync::broadcast;
///
/// Once an `independent` transaction was executed it *unlocks* the next nonce, if this transaction
/// is also pending, then this will be moved to the `independent` queue.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub(crate) struct PendingPool<T: TransactionOrdering> {
pub struct PendingPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: T,
/// Keeps track of transactions inserted in the pool.
@ -34,6 +35,11 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
by_id: BTreeMap<TransactionId, PendingTransaction<T>>,
/// _All_ transactions sorted by priority
all: BTreeSet<PendingTransaction<T>>,
/// The highest nonce transactions for each sender - like the `independent` set, but the
/// highest instead of lowest nonce.
///
/// Sorted by their scoring value.
highest_nonces: BTreeSet<PendingTransaction<T>>,
/// Independent transactions that can be included directly and don't require other
/// transactions.
///
@ -52,7 +58,7 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance.
pub(crate) fn new(ordering: T) -> Self {
pub fn new(ordering: T) -> Self {
let (new_transaction_notifier, _) = broadcast::channel(200);
Self {
ordering,
@ -60,6 +66,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
by_id: Default::default(),
all: Default::default(),
independent_transactions: Default::default(),
highest_nonces: Default::default(),
size_of: Default::default(),
new_transaction_notifier,
}
@ -73,6 +80,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Returns all transactions by id.
fn clear_transactions(&mut self) -> BTreeMap<TransactionId, PendingTransaction<T>> {
self.independent_transactions.clear();
self.highest_nonces.clear();
self.all.clear();
self.size_of.reset();
std::mem::take(&mut self.by_id)
@ -184,9 +192,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
}
} else {
self.size_of += tx.transaction.size();
if self.ancestor(&id).is_none() {
self.independent_transactions.insert(tx.clone());
}
self.update_independents_and_highest_nonces(&tx, &id);
self.all.insert(tx.clone());
self.by_id.insert(id, tx);
}
@ -232,9 +238,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
tx.priority = self.ordering.priority(&tx.transaction.transaction, base_fee);
self.size_of += tx.transaction.size();
if self.ancestor(&id).is_none() {
self.independent_transactions.insert(tx.clone());
}
self.update_independents_and_highest_nonces(&tx, &id);
self.all.insert(tx.clone());
self.by_id.insert(id, tx);
}
@ -243,6 +247,27 @@ impl<T: TransactionOrdering> PendingPool<T> {
removed
}
/// Updates the independent transaction and highest nonces set, assuming the given transaction
/// is being _added_ to the pool.
fn update_independents_and_highest_nonces(
&mut self,
tx: &PendingTransaction<T>,
tx_id: &TransactionId,
) {
let ancestor_id = tx_id.unchecked_ancestor();
if let Some(ancestor) = ancestor_id.and_then(|id| self.by_id.get(&id)) {
// the transaction already has an ancestor, so we only need to ensure that the
// highest nonces set actually contains the highest nonce for that sender
self.highest_nonces.remove(ancestor);
self.highest_nonces.insert(tx.clone());
} else {
// If there's __no__ ancestor in the pool, then this transaction is independent, this is
// guaranteed because this pool is gapless.
self.independent_transactions.insert(tx.clone());
self.highest_nonces.insert(tx.clone());
}
}
/// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
///
/// Note: for a transaction with nonce higher than the current on chain nonce this will always
@ -256,7 +281,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// # Panics
///
/// if the transaction is already included
pub(crate) fn add_transaction(
pub fn add_transaction(
&mut self,
tx: Arc<ValidPoolTransaction<T::Transaction>>,
base_fee: u64,
@ -276,11 +301,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
let priority = self.ordering.priority(&tx.transaction, base_fee);
let tx = PendingTransaction { submission_id, transaction: tx, priority };
// If there's __no__ ancestor in the pool, then this transaction is independent, this is
// guaranteed because this pool is gapless.
if self.ancestor(&tx_id).is_none() {
self.independent_transactions.insert(tx.clone());
}
self.update_independents_and_highest_nonces(&tx, &tx_id);
self.all.insert(tx.clone());
// send the new transaction to any existing pendingpool snapshot iterators
@ -316,6 +337,12 @@ impl<T: TransactionOrdering> PendingPool<T> {
self.size_of -= tx.transaction.size();
self.all.remove(&tx);
self.independent_transactions.remove(&tx);
// switch out for the next ancestor if there is one
self.highest_nonces.remove(&tx);
if let Some(ancestor) = self.ancestor(id) {
self.highest_nonces.insert(ancestor.clone());
}
Some(tx.transaction)
}
@ -325,10 +352,121 @@ impl<T: TransactionOrdering> PendingPool<T> {
id
}
/// Removes the worst transaction from this pool.
pub(crate) fn pop_worst(&mut self) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let worst = self.all.iter().next().map(|tx| *tx.transaction.id())?;
self.remove_transaction(&worst)
/// Traverses the pool, starting at the highest nonce set, removing the transactions which
/// would put the pool under the specified limits.
///
/// This attempts to remove transactions by roughly the same amount for each sender. This is
/// done by removing the highest-nonce transactions for each sender.
///
/// If the `remove_locals` flag is unset, transactions will be removed per-sender until a
/// local transaction is the highest nonce transaction for that sender. If all senders have a
/// local highest-nonce transaction, the pool will not be truncated further.
///
/// Otherwise, if the `remove_locals` flag is set, transactions will be removed per-sender
/// until the pool is under the given limits.
///
/// Any removed transactions will be added to the `end_removed` vector.
pub fn remove_to_limit(
&mut self,
limit: &SubPoolLimit,
remove_locals: bool,
end_removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
) {
// This serves as a termination condition for the loop - it represents the number of
// _valid_ unique senders that might have descendants in the pool.
//
// If `remove_locals` is false, a value of zero means that there are no non-local txs in the
// pool that can be removed.
//
// If `remove_locals` is true, a value of zero means that there are no txs in the pool that
// can be removed.
let mut non_local_senders = self.highest_nonces.len();
// keep track of unique senders from previous iterations, to understand how many unique
// senders were removed in the last iteration
let mut unique_senders = self.highest_nonces.len();
// keep track of transactions to remove and how many have been removed so far
let original_length = self.len();
let mut removed = Vec::new();
let mut total_removed = 0;
// track total `size` of transactions to remove
let original_size = self.size();
let mut total_size = 0;
loop {
// check how many unique senders were removed last iteration
let unique_removed = unique_senders - self.highest_nonces.len();
// the new number of unique senders
unique_senders = self.highest_nonces.len();
non_local_senders -= unique_removed;
// we can re-use the temp array
removed.clear();
// loop through the highest nonces set, removing transactions until we reach the limit
for tx in self.highest_nonces.iter() {
// return early if the pool is under limits
if original_size - total_size <= limit.max_size &&
original_length - total_removed <= limit.max_txs ||
non_local_senders == 0
{
// need to remove remaining transactions before exiting
for id in &removed {
if let Some(tx) = self.remove_transaction(id) {
end_removed.push(tx);
}
}
return
}
if !remove_locals && tx.transaction.is_local() {
non_local_senders -= 1;
continue
}
total_size += tx.transaction.size();
total_removed += 1;
removed.push(*tx.transaction.id());
}
// remove the transactions from this iteration
for id in &removed {
if let Some(tx) = self.remove_transaction(id) {
end_removed.push(tx);
}
}
}
}
/// Truncates the pool to the given [SubPoolLimit], removing transactions until the subpool
/// limits are met.
///
/// This attempts to remove transactions by rougly the same amount for each sender. For more
/// information on this exact process see docs for
/// [remove_to_limit](PendingPool::remove_to_limit).
///
/// This first truncates all of the non-local transactions in the pool. If the subpool is still
/// not under the limit, this truncates the entire pool, including non-local transactions. The
/// removed transactions are returned.
pub fn truncate_pool(
&mut self,
limit: SubPoolLimit,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = Vec::new();
self.remove_to_limit(&limit, false, &mut removed);
if self.size() <= limit.max_size && self.len() <= limit.max_txs {
return removed
}
// now repeat for local transactions
self.remove_to_limit(&limit, true, &mut removed);
removed
}
/// The reported size of all transactions in this pool.
@ -361,6 +499,14 @@ impl<T: TransactionOrdering> PendingPool<T> {
self.independent_transactions.len() <= self.all.len(),
"independent.len() > all.len()"
);
assert!(
self.highest_nonces.len() <= self.all.len(),
"independent_descendants.len() > all.len()"
);
assert!(
self.highest_nonces.len() == self.independent_transactions.len(),
"independent.len() = independent_descendants.len()"
);
}
}
@ -418,6 +564,10 @@ impl<T: TransactionOrdering> Ord for PendingTransaction<T> {
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use reth_primitives::address;
use super::*;
use crate::{
test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
@ -458,6 +608,7 @@ mod tests {
assert_eq!(pool.len(), 2);
assert_eq!(pool.independent_transactions.len(), 1);
assert_eq!(pool.highest_nonces.len(), 1);
let removed = pool.update_base_fee(0);
assert!(removed.is_empty());
@ -478,6 +629,7 @@ mod tests {
let removed = pool.update_base_fee((root_tx.max_fee_per_gas() + 1) as u64);
assert_eq!(removed.len(), 2);
assert!(pool.is_empty());
pool.assert_invariants();
}
#[test]
@ -492,6 +644,151 @@ mod tests {
pool.add_transaction(f.validated_arc(t2), 0);
// First transaction should be evicted.
assert_eq!(pool.pop_worst().map(|tx| *tx.hash()), Some(*t.hash()));
assert_eq!(
pool.highest_nonces.iter().next().map(|tx| *tx.transaction.hash()),
Some(*t.hash())
);
// truncate pool with max size = 1, ensure it's the same transaction
let removed = pool.truncate_pool(SubPoolLimit { max_txs: 1, max_size: usize::MAX });
assert_eq!(removed.len(), 1);
assert_eq!(removed[0].hash(), t.hash());
}
#[test]
fn correct_independent_descendants() {
// this test ensures that we set the right highest nonces set for each sender
let mut f = MockTransactionFactory::default();
let mut pool = PendingPool::new(MockOrdering::default());
let a = address!("000000000000000000000000000000000000000a");
let b = address!("000000000000000000000000000000000000000b");
let c = address!("000000000000000000000000000000000000000c");
let d = address!("000000000000000000000000000000000000000d");
// create a chain of transactions by sender A, B, C
let a1 = MockTransaction::eip1559().with_sender(a);
let a2 = a1.next();
let a3 = a2.next();
let a4 = a3.next();
let b1 = MockTransaction::eip1559().with_sender(b);
let b2 = b1.next();
let b3 = b2.next();
// C has the same number of txs as B
let c1 = MockTransaction::eip1559().with_sender(c);
let c2 = c1.next();
let c3 = c2.next();
let d1 = MockTransaction::eip1559().with_sender(d);
// add all the transactions to the pool
let all_txs =
vec![a1, a2, a3, a4.clone(), b1, b2, b3.clone(), c1, c2, c3.clone(), d1.clone()];
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx), 0);
}
pool.assert_invariants();
// the independent set is the roots of each of these tx chains, these are the highest
// nonces for each sender
let expected_highest_nonces =
vec![d1, c3, b3, a4].iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
let actual_highest_nonces = pool
.highest_nonces
.iter()
.map(|tx| (tx.transaction.sender(), tx.transaction.nonce()))
.collect::<HashSet<_>>();
assert_eq!(expected_highest_nonces, actual_highest_nonces);
pool.assert_invariants();
}
#[test]
fn truncate_by_sender() {
// this test ensures that we evict from the pending pool by sender
let mut f = MockTransactionFactory::default();
let mut pool = PendingPool::new(MockOrdering::default());
let a = address!("000000000000000000000000000000000000000a");
let b = address!("000000000000000000000000000000000000000b");
let c = address!("000000000000000000000000000000000000000c");
let d = address!("000000000000000000000000000000000000000d");
// TODO: make creating these mock tx chains easier
// create a chain of transactions by sender A, B, C
let a1 = MockTransaction::eip1559().with_sender(a);
let a2 = a1.next();
let a3 = a2.next();
let a4 = a3.next();
let b1 = MockTransaction::eip1559().with_sender(b);
let b2 = b1.next();
let b3 = b2.next();
// C has the same number of txs as B
let c1 = MockTransaction::eip1559().with_sender(c);
let c2 = c1.next();
let c3 = c2.next();
let d1 = MockTransaction::eip1559().with_sender(d);
// just construct a list of all txs to add
let expected_pending = vec![a1.clone(), b1.clone(), c1.clone(), a2.clone()]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
let expected_removed = vec![
d1.clone(),
c3.clone(),
b3.clone(),
a4.clone(),
c2.clone(),
b2.clone(),
a3.clone(),
]
.into_iter()
.map(|tx| (tx.sender(), tx.nonce()))
.collect::<HashSet<_>>();
let all_txs = vec![a1, a2, a3, a4.clone(), b1, b2, b3, c1, c2, c3, d1];
// add all the transactions to the pool
for tx in all_txs {
pool.add_transaction(f.validated_arc(tx), 0);
}
// sanity check, make sure everything checks out
pool.assert_invariants();
// let's set the max total txs to 4, since we remove txs for each sender first, we remove
// in this order:
// * d1, c3, b3, a4
// * c2, b2, a3
//
// and we are left with:
// * a1, a2
// * b1
// * c1
let pool_limit = SubPoolLimit { max_txs: 4, max_size: usize::MAX };
// truncate the pool
let removed = pool.truncate_pool(pool_limit);
pool.assert_invariants();
assert_eq!(removed.len(), expected_removed.len());
// get the inner txs from the removed txs
let removed =
removed.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(removed, expected_removed);
// get the pending pool
let pending = pool.all().collect::<Vec<_>>();
assert_eq!(pending.len(), expected_pending.len());
// get the inner txs from the pending txs
let pending =
pending.into_iter().map(|tx| (tx.sender(), tx.nonce())).collect::<HashSet<_>>();
assert_eq!(pending, expected_pending);
}
}

View File

@ -793,18 +793,9 @@ impl<T: TransactionOrdering> TxPool<T> {
.$limit
.is_exceeded($this.$pool.len(), $this.$pool.size())
{
// pops the worst transaction from the sub-pool
if let Some(tx) = $this.$pool.pop_worst() {
let id = tx.transaction_id;
// now that the tx is removed from the sub-pool, we need to remove it also from the total set
$this.all_transactions.remove_transaction(&id);
// record the removed transaction
removed.push(tx);
// this might have introduced a nonce gap, so we also discard any descendants
$this.remove_descendants(&id, &mut $removed);
removed = $this.$pool.truncate_pool($this.config.$limit.clone());
for tx in removed.clone().iter() {
$this.remove_descendants(tx.id(), &mut $removed);
}
}

View File

@ -898,8 +898,8 @@ impl proptest::arbitrary::Arbitrary for MockTransaction {
fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
use proptest::prelude::{any, Strategy};
any::<(Transaction, Address, B256, BlobTransactionSidecar)>()
.prop_map(|(tx, sender, tx_hash, sidecar)| match &tx {
any::<(Transaction, Address, B256)>()
.prop_map(|(tx, sender, tx_hash)| match &tx {
Transaction::Legacy(TxLegacy {
nonce,
gas_price,
@ -972,7 +972,9 @@ impl proptest::arbitrary::Arbitrary for MockTransaction {
value: (*value).into(),
input: (*input).clone(),
accesslist: (*access_list).clone(),
sidecar,
// only generate a sidecar if it is a 4844 tx - also for the sake of
// performance just use a default sidecar
sidecar: BlobTransactionSidecar::default(),
},
_ => unimplemented!(),
})