From e7851492b11340969a0447b985e16a1c5fe92f3b Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 20 Oct 2022 02:01:49 +0200 Subject: [PATCH] feat(txpool): enforce size limits (#98) * feat(txpool): enforce size limits * chore: rustfmt --- crates/transaction-pool/src/config.rs | 36 +++++-- crates/transaction-pool/src/error.rs | 4 + crates/transaction-pool/src/pool/mod.rs | 39 +++++++- crates/transaction-pool/src/pool/parked.rs | 8 +- crates/transaction-pool/src/pool/pending.rs | 35 ++++--- crates/transaction-pool/src/pool/txpool.rs | 104 +++++++++++++++++--- 6 files changed, 191 insertions(+), 35 deletions(-) diff --git a/crates/transaction-pool/src/config.rs b/crates/transaction-pool/src/config.rs index 5472647e0..0b5ef1488 100644 --- a/crates/transaction-pool/src/config.rs +++ b/crates/transaction-pool/src/config.rs @@ -5,11 +5,11 @@ pub(crate) const MAX_ACCOUNT_SLOTS_PER_SENDER: usize = 16; #[derive(Debug, Clone)] pub struct PoolConfig { /// Max number of transaction in the pending sub-pool - pub pending_limit: usize, + pub pending_limit: SubPoolLimit, /// Max number of transaction in the basefee sub-pool - pub basefee_limit: usize, + pub basefee_limit: SubPoolLimit, /// Max number of transaction in the queued sub-pool - pub queued_limit: usize, + pub queued_limit: SubPoolLimit, /// Max number of executable transaction slots guaranteed per account pub max_account_slots: usize, } @@ -17,10 +17,34 @@ pub struct PoolConfig { impl Default for PoolConfig { fn default() -> Self { Self { - pending_limit: 10_000, - basefee_limit: 10_000, - queued_limit: 10_000, + pending_limit: Default::default(), + basefee_limit: Default::default(), + queued_limit: Default::default(), max_account_slots: MAX_ACCOUNT_SLOTS_PER_SENDER, } } } + +/// Size limits for a sub-pool. +#[derive(Debug, Clone)] +pub struct SubPoolLimit { + /// Max. amount of transaction in the pool. + pub max_txs: usize, + /// Max. combined size of transactions in the pool. + pub max_size: usize, +} + +impl SubPoolLimit { + /// Returns whether the size or amount constraint is violated. + #[inline] + pub fn is_exceeded(&self, txs: usize, size: usize) -> bool { + self.max_txs < txs || self.max_size < size + } +} + +impl Default for SubPoolLimit { + fn default() -> Self { + // either 10k transactions or 20MB + Self { max_txs: 10_000, max_size: 20 * 1024 * 1024 } + } +} diff --git a/crates/transaction-pool/src/error.rs b/crates/transaction-pool/src/error.rs index 342575781..12c1b6282 100644 --- a/crates/transaction-pool/src/error.rs +++ b/crates/transaction-pool/src/error.rs @@ -17,4 +17,8 @@ pub enum PoolError { /// Thrown when the number of unique transactions of a sender exceeded the slot capacity. #[error("{0:?} identified as spammer. Transaction {1:?} rejected.")] SpammerExceededCapacity(Address, TxHash), + /// Thrown when a new transaction is added to the pool, but then immediately discarded to + /// respect the size limits of the pool. + #[error("[{0:?}] Transaction discarded outright due to pool size constraints.")] + DiscardedOnInsert(TxHash), } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 474aa936b..130c99816 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -64,7 +64,7 @@ //! category (2.) and become pending. use crate::{ - error::PoolResult, + error::{PoolError, PoolResult}, identifier::{SenderId, SenderIdentifiers, TransactionId}, pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, traits::{NewTransactionEvent, PoolStatus, PoolTransaction, TransactionOrigin}, @@ -76,7 +76,11 @@ pub use events::TransactionEvent; use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash}; -use std::{collections::HashMap, sync::Arc, time::Instant}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Instant, +}; use tracing::warn; mod best; @@ -170,6 +174,9 @@ where } /// Add a single validated transaction into the pool. + /// + /// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s) + /// come in through that function, either as a batch or `std::iter::once`. fn add_transaction( &self, origin: TransactionOrigin, @@ -218,9 +225,28 @@ where origin: TransactionOrigin, transactions: impl IntoIterator>, ) -> Vec> { - // TODO check pool limits + let added = + transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::>(); - transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::>() + // If at least one transaction was added successfully, then we enforce the pool size limits. + let discarded = + if added.iter().any(Result::is_ok) { self.discard_worst() } else { Default::default() }; + + if discarded.is_empty() { + return added + } + + // It may happen that a newly added transaction is immediately discarded, so we need to + // adjust the result here + added + .into_iter() + .map(|res| match res { + Ok(ref hash) if discarded.contains(hash) => { + Err(PoolError::DiscardedOnInsert(*hash)) + } + other => other, + }) + .collect() } /// Notify all listeners about a new pending transaction. @@ -310,6 +336,11 @@ where pub(crate) fn is_empty(&self) -> bool { self.pool.read().is_empty() } + + /// Enforces the size limits of pool and returns the discarded transactions if violated. + pub(crate) fn discard_worst(&self) -> HashSet { + self.pool.write().discard_worst().into_iter().map(|tx| *tx.hash()).collect() + } } /// Tracks an added transaction and all graph changes caused by adding it. diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index c572fc7ef..038bb44ac 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -15,7 +15,7 @@ pub(crate) struct ParkedPool { submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. by_id: FnvHashMap>, - /// All transactions sorted by their priority function. + /// All transactions sorted by their order function. best: BTreeSet>, /// Keeps track of the size of this pool. /// @@ -60,6 +60,12 @@ impl ParkedPool { Some(tx.transaction.into()) } + /// Removes the worst transaction from this pool. + pub(crate) fn pop_worst(&mut self) -> Option>> { + let worst = self.best.iter().next_back().map(|tx| *tx.transaction.id())?; + self.remove_transaction(&worst) + } + fn next_id(&mut self) -> u64 { let id = self.submission_id; self.submission_id = self.submission_id.wrapping_add(1); diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 2c582d677..35694c166 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -29,6 +29,8 @@ pub(crate) struct PendingPool { submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. by_id: BTreeMap>>, + /// _All_ transactions sorted by priority + all: BTreeSet>, /// Independent transactions that can be included directly and don't require other /// transactions. /// @@ -49,6 +51,7 @@ impl PendingPool { ordering, submission_id: 0, by_id: Default::default(), + all: Default::default(), independent_transactions: Default::default(), size_of: Default::default(), } @@ -111,6 +114,7 @@ impl PendingPool { if self.ancestor(&tx_id).is_none() { self.independent_transactions.insert(transaction.clone()); } + self.all.insert(transaction.clone()); let transaction = Arc::new(PendingTransaction { transaction }); @@ -120,25 +124,28 @@ impl PendingPool { /// Removes a _mined_ transaction from the pool. /// /// If the transactions has a descendant transaction it will advance it to the best queue. - pub(crate) fn remove_mined(&mut self, id: &TransactionId) { - if let Some(tx) = self.by_id.remove(id) { - // keep track of size - self.size_of -= tx.transaction.transaction.size(); - self.independent_transactions.remove(&tx.transaction); - - // mark the next as independent if it exists - if let Some(unlocked) = self.by_id.get(&id.descendant()) { - self.independent_transactions.insert(unlocked.transaction.clone()); - } - } + pub(crate) fn remove_mined( + &mut self, + id: &TransactionId, + ) -> Option>> { + // mark the next as independent if it exists + if let Some(unlocked) = self.by_id.get(&id.descendant()) { + self.independent_transactions.insert(unlocked.transaction.clone()); + }; + self.remove_transaction(id) } /// Removes the transaction from the pool. + /// + /// Note: this only removes the given transaction. pub(crate) fn remove_transaction( &mut self, id: &TransactionId, ) -> Option>> { let tx = self.by_id.remove(id)?; + // keep track of size + self.size_of -= tx.transaction.transaction.size(); + self.all.remove(&tx.transaction); self.independent_transactions.remove(&tx.transaction); Some(tx.transaction.transaction.clone()) } @@ -154,6 +161,12 @@ impl PendingPool { self.by_id.get(id).cloned() } + /// Removes the worst transaction from this pool. + pub(crate) fn pop_worst(&mut self) -> Option>> { + let worst = self.all.iter().next_back().map(|tx| *tx.transaction.id())?; + self.remove_transaction(&worst) + } + /// The reported size of all transactions in this pool. pub(crate) fn size(&self) -> usize { self.size_of.into() diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index e53421f14..815056a7c 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -16,7 +16,7 @@ use crate::{ use fnv::FnvHashMap; use reth_primitives::TxHash; use std::{ - collections::{btree_map::Entry, hash_map, BTreeMap, HashMap}, + collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}, fmt, ops::Bound::{Excluded, Unbounded}, sync::Arc, @@ -168,7 +168,7 @@ impl TxPool { on_chain_balance: U256, on_chain_nonce: u64, ) -> PoolResult> { - // Update sender info + // Update sender info with balance and nonce self.sender_info .entry(tx.sender_id()) .or_default() @@ -235,13 +235,26 @@ impl TxPool { /// This will remove the given transaction from one sub-pool and insert it in the other /// sub-pool. fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) { - if let Some(tx) = self.remove_transaction(from, id) { + if let Some(tx) = self.remove_from_subpool(from, id) { self.add_transaction_to_pool(to, tx); } } - /// Removes the transaction from the given pool + /// Remove the transaction from the entire pool. + /// + /// This includes the total set of transaction and the subpool it currently resides in. fn remove_transaction( + &mut self, + id: &TransactionId, + ) -> Option>> { + let (tx, pool) = self.all_transactions.remove_transaction(id)?; + self.remove_from_subpool(pool, tx.id()) + } + + /// Removes the transaction from the given pool. + /// + /// Caution: this only removes the tx from the sub-pool and not from the pool itself + fn remove_from_subpool( &mut self, pool: SubPool, tx: &TransactionId, @@ -253,6 +266,31 @@ impl TxPool { } } + /// Removes _only_ the descendants of the given transaction from the entire pool. + /// + /// All removed transactions are added to the `removed` vec. + fn remove_descendants( + &mut self, + tx: &TransactionId, + removed: &mut Vec>>, + ) { + let mut id = *tx; + + // this will essentially pop _all_ descendant transactions one by one + loop { + let descendant = + self.all_transactions.descendant_txs_exclusive(&id).map(|(id, _)| *id).next(); + if let Some(descendant) = descendant { + if let Some(tx) = self.remove_transaction(&descendant) { + removed.push(tx) + } + id = descendant; + } else { + return + } + } + } + /// Removes the transaction from the given pool fn add_transaction_to_pool( &mut self, @@ -281,7 +319,7 @@ impl TxPool { ) { if let Some((replaced, replaced_pool)) = replaced { // Remove the replaced transaction - self.remove_transaction(replaced_pool, replaced.id()); + self.remove_from_subpool(replaced_pool, replaced.id()); } self.add_transaction_to_pool(pool, transaction) @@ -291,8 +329,38 @@ impl TxPool { /// /// If the current size exceeds the given bounds, the worst transactions are evicted from the /// pool and returned. - pub fn enforce_size_limits(&mut self) { - unimplemented!() + pub fn discard_worst(&mut self) -> Vec>> { + let mut removed = Vec::new(); + + // Helper macro that discards the worst transactions for the pools + macro_rules! discard_worst { + ($this:ident, $removed:ident, [$($limit:ident => $pool:ident),*] ) => { + $ ( + while $this + .config + .$limit + .is_exceeded($this.$pool.len(), $this.$pool.size()) + { + if let Some(tx) = $this.$pool.pop_worst() { + let id = tx.transaction_id; + removed.push(tx); + $this.remove_descendants(&id, &mut $removed); + } + } + + )* + }; + } + + discard_worst!( + self, removed, [ + pending_limit => pending_pool, + basefee_limit => basefee_pool, + queued_limit => queued_pool + ] + ); + + removed } /// Number of transactions in the entire pool @@ -408,6 +476,16 @@ impl AllTransactions { .take_while(move |(other, _)| sender == other.sender) } + /// Returns all transactions that _follow_ after the given id but have the same sender. + /// + /// NOTE: The range is _exclusive_ + pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>( + &'a self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender) + } + /// Returns all transactions that _follow_ after the given id but have the same sender. /// /// NOTE: The range is _exclusive_ @@ -442,21 +520,21 @@ impl AllTransactions { self.txs.range_mut(id..).take_while(|(other, _)| id.sender == other.sender) } - /// Removes a transaction from the pool after it was mined. + /// Removes a transaction from the set. /// /// This will _not_ trigger additional updates, because descendants without nonce gaps are /// already in the pending pool, and this transaction will be the first transaction of the /// sender in this pool. - pub(crate) fn remove_mined_tx( + pub(crate) fn remove_transaction( &mut self, id: &TransactionId, - ) -> Option>> { - let tx = self.txs.remove(id)?; + ) -> Option<(Arc>, SubPool)> { + let internal = self.txs.remove(id)?; // decrement the counter for the sender. - self.tx_decr(tx.transaction.sender_id()); + self.tx_decr(internal.transaction.sender_id()); - self.by_hash.remove(tx.transaction.hash()) + self.by_hash.remove(internal.transaction.hash()).map(|tx| (tx, internal.subpool)) } /// Additional checks for a new transaction.