feat(txpool): enforce size limits (#98)

* feat(txpool): enforce size limits

* chore: rustfmt
This commit is contained in:
Matthias Seitz
2022-10-20 02:01:49 +02:00
committed by GitHub
parent 0ff0ffa7d5
commit e7851492b1
6 changed files with 191 additions and 35 deletions

View File

@ -5,11 +5,11 @@ pub(crate) const MAX_ACCOUNT_SLOTS_PER_SENDER: usize = 16;
#[derive(Debug, Clone)]
pub struct PoolConfig {
/// Max number of transaction in the pending sub-pool
pub pending_limit: usize,
pub pending_limit: SubPoolLimit,
/// Max number of transaction in the basefee sub-pool
pub basefee_limit: usize,
pub basefee_limit: SubPoolLimit,
/// Max number of transaction in the queued sub-pool
pub queued_limit: usize,
pub queued_limit: SubPoolLimit,
/// Max number of executable transaction slots guaranteed per account
pub max_account_slots: usize,
}
@ -17,10 +17,34 @@ pub struct PoolConfig {
impl Default for PoolConfig {
fn default() -> Self {
Self {
pending_limit: 10_000,
basefee_limit: 10_000,
queued_limit: 10_000,
pending_limit: Default::default(),
basefee_limit: Default::default(),
queued_limit: Default::default(),
max_account_slots: MAX_ACCOUNT_SLOTS_PER_SENDER,
}
}
}
/// Size limits for a sub-pool.
#[derive(Debug, Clone)]
pub struct SubPoolLimit {
/// Max. amount of transaction in the pool.
pub max_txs: usize,
/// Max. combined size of transactions in the pool.
pub max_size: usize,
}
impl SubPoolLimit {
/// Returns whether the size or amount constraint is violated.
#[inline]
pub fn is_exceeded(&self, txs: usize, size: usize) -> bool {
self.max_txs < txs || self.max_size < size
}
}
impl Default for SubPoolLimit {
fn default() -> Self {
// either 10k transactions or 20MB
Self { max_txs: 10_000, max_size: 20 * 1024 * 1024 }
}
}

View File

@ -17,4 +17,8 @@ pub enum PoolError {
/// Thrown when the number of unique transactions of a sender exceeded the slot capacity.
#[error("{0:?} identified as spammer. Transaction {1:?} rejected.")]
SpammerExceededCapacity(Address, TxHash),
/// Thrown when a new transaction is added to the pool, but then immediately discarded to
/// respect the size limits of the pool.
#[error("[{0:?}] Transaction discarded outright due to pool size constraints.")]
DiscardedOnInsert(TxHash),
}

View File

@ -64,7 +64,7 @@
//! category (2.) and become pending.
use crate::{
error::PoolResult,
error::{PoolError, PoolResult},
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool},
traits::{NewTransactionEvent, PoolStatus, PoolTransaction, TransactionOrigin},
@ -76,7 +76,11 @@ pub use events::TransactionEvent;
use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash};
use std::{collections::HashMap, sync::Arc, time::Instant};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use tracing::warn;
mod best;
@ -170,6 +174,9 @@ where
}
/// Add a single validated transaction into the pool.
///
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
/// come in through that function, either as a batch or `std::iter::once`.
fn add_transaction(
&self,
origin: TransactionOrigin,
@ -218,9 +225,28 @@ where
origin: TransactionOrigin,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<TxHash>> {
// TODO check pool limits
let added =
transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::<Vec<_>>();
transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::<Vec<_>>()
// If at least one transaction was added successfully, then we enforce the pool size limits.
let discarded =
if added.iter().any(Result::is_ok) { self.discard_worst() } else { Default::default() };
if discarded.is_empty() {
return added
}
// It may happen that a newly added transaction is immediately discarded, so we need to
// adjust the result here
added
.into_iter()
.map(|res| match res {
Ok(ref hash) if discarded.contains(hash) => {
Err(PoolError::DiscardedOnInsert(*hash))
}
other => other,
})
.collect()
}
/// Notify all listeners about a new pending transaction.
@ -310,6 +336,11 @@ where
pub(crate) fn is_empty(&self) -> bool {
self.pool.read().is_empty()
}
/// Enforces the size limits of pool and returns the discarded transactions if violated.
pub(crate) fn discard_worst(&self) -> HashSet<TxHash> {
self.pool.write().discard_worst().into_iter().map(|tx| *tx.hash()).collect()
}
}
/// Tracks an added transaction and all graph changes caused by adding it.

View File

@ -15,7 +15,7 @@ pub(crate) struct ParkedPool<T: ParkedOrd> {
submission_id: u64,
/// _All_ Transactions that are currently inside the pool grouped by their identifier.
by_id: FnvHashMap<TransactionId, ParkedPoolTransaction<T>>,
/// All transactions sorted by their priority function.
/// All transactions sorted by their order function.
best: BTreeSet<ParkedPoolTransaction<T>>,
/// Keeps track of the size of this pool.
///
@ -60,6 +60,12 @@ impl<T: ParkedOrd> ParkedPool<T> {
Some(tx.transaction.into())
}
/// Removes the worst transaction from this pool.
pub(crate) fn pop_worst(&mut self) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let worst = self.best.iter().next_back().map(|tx| *tx.transaction.id())?;
self.remove_transaction(&worst)
}
fn next_id(&mut self) -> u64 {
let id = self.submission_id;
self.submission_id = self.submission_id.wrapping_add(1);

View File

@ -29,6 +29,8 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
submission_id: u64,
/// _All_ Transactions that are currently inside the pool grouped by their identifier.
by_id: BTreeMap<TransactionId, Arc<PendingTransaction<T>>>,
/// _All_ transactions sorted by priority
all: BTreeSet<PendingTransactionRef<T>>,
/// Independent transactions that can be included directly and don't require other
/// transactions.
///
@ -49,6 +51,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
ordering,
submission_id: 0,
by_id: Default::default(),
all: Default::default(),
independent_transactions: Default::default(),
size_of: Default::default(),
}
@ -111,6 +114,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
if self.ancestor(&tx_id).is_none() {
self.independent_transactions.insert(transaction.clone());
}
self.all.insert(transaction.clone());
let transaction = Arc::new(PendingTransaction { transaction });
@ -120,25 +124,28 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// Removes a _mined_ transaction from the pool.
///
/// If the transactions has a descendant transaction it will advance it to the best queue.
pub(crate) fn remove_mined(&mut self, id: &TransactionId) {
if let Some(tx) = self.by_id.remove(id) {
// keep track of size
self.size_of -= tx.transaction.transaction.size();
self.independent_transactions.remove(&tx.transaction);
pub(crate) fn remove_mined(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
// mark the next as independent if it exists
if let Some(unlocked) = self.by_id.get(&id.descendant()) {
self.independent_transactions.insert(unlocked.transaction.clone());
}
}
};
self.remove_transaction(id)
}
/// Removes the transaction from the pool.
///
/// Note: this only removes the given transaction.
pub(crate) fn remove_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let tx = self.by_id.remove(id)?;
// keep track of size
self.size_of -= tx.transaction.transaction.size();
self.all.remove(&tx.transaction);
self.independent_transactions.remove(&tx.transaction);
Some(tx.transaction.transaction.clone())
}
@ -154,6 +161,12 @@ impl<T: TransactionOrdering> PendingPool<T> {
self.by_id.get(id).cloned()
}
/// Removes the worst transaction from this pool.
pub(crate) fn pop_worst(&mut self) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let worst = self.all.iter().next_back().map(|tx| *tx.transaction.id())?;
self.remove_transaction(&worst)
}
/// The reported size of all transactions in this pool.
pub(crate) fn size(&self) -> usize {
self.size_of.into()

View File

@ -16,7 +16,7 @@ use crate::{
use fnv::FnvHashMap;
use reth_primitives::TxHash;
use std::{
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap},
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
fmt,
ops::Bound::{Excluded, Unbounded},
sync::Arc,
@ -168,7 +168,7 @@ impl<T: TransactionOrdering> TxPool<T> {
on_chain_balance: U256,
on_chain_nonce: u64,
) -> PoolResult<AddedTransaction<T::Transaction>> {
// Update sender info
// Update sender info with balance and nonce
self.sender_info
.entry(tx.sender_id())
.or_default()
@ -235,13 +235,26 @@ impl<T: TransactionOrdering> TxPool<T> {
/// This will remove the given transaction from one sub-pool and insert it in the other
/// sub-pool.
fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) {
if let Some(tx) = self.remove_transaction(from, id) {
if let Some(tx) = self.remove_from_subpool(from, id) {
self.add_transaction_to_pool(to, tx);
}
}
/// Removes the transaction from the given pool
/// Remove the transaction from the entire pool.
///
/// This includes the total set of transaction and the subpool it currently resides in.
fn remove_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let (tx, pool) = self.all_transactions.remove_transaction(id)?;
self.remove_from_subpool(pool, tx.id())
}
/// Removes the transaction from the given pool.
///
/// Caution: this only removes the tx from the sub-pool and not from the pool itself
fn remove_from_subpool(
&mut self,
pool: SubPool,
tx: &TransactionId,
@ -253,6 +266,31 @@ impl<T: TransactionOrdering> TxPool<T> {
}
}
/// Removes _only_ the descendants of the given transaction from the entire pool.
///
/// All removed transactions are added to the `removed` vec.
fn remove_descendants(
&mut self,
tx: &TransactionId,
removed: &mut Vec<Arc<ValidPoolTransaction<T::Transaction>>>,
) {
let mut id = *tx;
// this will essentially pop _all_ descendant transactions one by one
loop {
let descendant =
self.all_transactions.descendant_txs_exclusive(&id).map(|(id, _)| *id).next();
if let Some(descendant) = descendant {
if let Some(tx) = self.remove_transaction(&descendant) {
removed.push(tx)
}
id = descendant;
} else {
return
}
}
}
/// Removes the transaction from the given pool
fn add_transaction_to_pool(
&mut self,
@ -281,7 +319,7 @@ impl<T: TransactionOrdering> TxPool<T> {
) {
if let Some((replaced, replaced_pool)) = replaced {
// Remove the replaced transaction
self.remove_transaction(replaced_pool, replaced.id());
self.remove_from_subpool(replaced_pool, replaced.id());
}
self.add_transaction_to_pool(pool, transaction)
@ -291,8 +329,38 @@ impl<T: TransactionOrdering> TxPool<T> {
///
/// If the current size exceeds the given bounds, the worst transactions are evicted from the
/// pool and returned.
pub fn enforce_size_limits(&mut self) {
unimplemented!()
pub fn discard_worst(&mut self) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = Vec::new();
// Helper macro that discards the worst transactions for the pools
macro_rules! discard_worst {
($this:ident, $removed:ident, [$($limit:ident => $pool:ident),*] ) => {
$ (
while $this
.config
.$limit
.is_exceeded($this.$pool.len(), $this.$pool.size())
{
if let Some(tx) = $this.$pool.pop_worst() {
let id = tx.transaction_id;
removed.push(tx);
$this.remove_descendants(&id, &mut $removed);
}
}
)*
};
}
discard_worst!(
self, removed, [
pending_limit => pending_pool,
basefee_limit => basefee_pool,
queued_limit => queued_pool
]
);
removed
}
/// Number of transactions in the entire pool
@ -408,6 +476,16 @@ impl<T: PoolTransaction> AllTransactions<T> {
.take_while(move |(other, _)| sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
@ -442,21 +520,21 @@ impl<T: PoolTransaction> AllTransactions<T> {
self.txs.range_mut(id..).take_while(|(other, _)| id.sender == other.sender)
}
/// Removes a transaction from the pool after it was mined.
/// Removes a transaction from the set.
///
/// This will _not_ trigger additional updates, because descendants without nonce gaps are
/// already in the pending pool, and this transaction will be the first transaction of the
/// sender in this pool.
pub(crate) fn remove_mined_tx(
pub(crate) fn remove_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T>>> {
let tx = self.txs.remove(id)?;
) -> Option<(Arc<ValidPoolTransaction<T>>, SubPool)> {
let internal = self.txs.remove(id)?;
// decrement the counter for the sender.
self.tx_decr(tx.transaction.sender_id());
self.tx_decr(internal.transaction.sender_id());
self.by_hash.remove(tx.transaction.hash())
self.by_hash.remove(internal.transaction.hash()).map(|tx| (tx, internal.subpool))
}
/// Additional checks for a new transaction.