feat(txpool): keep track of pool size (#95)

* feat(txpool): add PoolStatus Api

* feat(txpool): track sizes

* feat(txpool): add size tracking

* cleanup
This commit is contained in:
Matthias Seitz
2022-10-18 23:47:20 +02:00
committed by GitHub
parent 0fc446c5a2
commit 1e7d3ae57e
8 changed files with 95 additions and 9 deletions

View File

@ -84,6 +84,7 @@ mod events;
mod listener; mod listener;
mod parked; mod parked;
mod pending; mod pending;
pub(crate) mod size;
pub(crate) mod state; pub(crate) mod state;
mod transaction; mod transaction;
pub mod txpool; pub mod txpool;

View File

@ -1,4 +1,6 @@
use crate::{identifier::TransactionId, PoolTransaction, ValidPoolTransaction}; use crate::{
identifier::TransactionId, pool::size::SizeTracker, PoolTransaction, ValidPoolTransaction,
};
use fnv::FnvHashMap; use fnv::FnvHashMap;
use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc};
@ -15,6 +17,10 @@ pub(crate) struct ParkedPool<T: ParkedOrd> {
by_id: FnvHashMap<TransactionId, ParkedPoolTransaction<T>>, by_id: FnvHashMap<TransactionId, ParkedPoolTransaction<T>>,
/// All transactions sorted by their priority function. /// All transactions sorted by their priority function.
best: BTreeSet<ParkedPoolTransaction<T>>, best: BTreeSet<ParkedPoolTransaction<T>>,
/// Keeps track of the size of this pool.
///
/// See also [`PoolTransaction::size`].
size_of: SizeTracker,
} }
// === impl QueuedPool === // === impl QueuedPool ===
@ -30,6 +36,9 @@ impl<T: ParkedOrd> ParkedPool<T> {
assert!(!self.by_id.contains_key(&id), "transaction already included"); assert!(!self.by_id.contains_key(&id), "transaction already included");
let submission_id = self.next_id(); let submission_id = self.next_id();
// keep track of size
self.size_of += tx.size();
let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() }; let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() };
self.by_id.insert(id, transaction.clone()); self.by_id.insert(id, transaction.clone());
@ -41,8 +50,13 @@ impl<T: ParkedOrd> ParkedPool<T> {
&mut self, &mut self,
id: &TransactionId, id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> { ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
// remove from queues
let tx = self.by_id.remove(id)?; let tx = self.by_id.remove(id)?;
self.best.remove(&tx); self.best.remove(&tx);
// keep track of size
self.size_of -= tx.transaction.size();
Some(tx.transaction.into()) Some(tx.transaction.into())
} }
@ -52,6 +66,11 @@ impl<T: ParkedOrd> ParkedPool<T> {
id id
} }
/// The reported size of all transactions in this pool.
pub(crate) fn size(&self) -> usize {
self.size_of.into()
}
/// Number of transactions in the entire pool /// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize { pub(crate) fn len(&self) -> usize {
self.by_id.len() self.by_id.len()
@ -65,7 +84,12 @@ impl<T: ParkedOrd> ParkedPool<T> {
impl<T: ParkedOrd> Default for ParkedPool<T> { impl<T: ParkedOrd> Default for ParkedPool<T> {
fn default() -> Self { fn default() -> Self {
Self { submission_id: 0, by_id: Default::default(), best: Default::default() } Self {
submission_id: 0,
by_id: Default::default(),
best: Default::default(),
size_of: Default::default(),
}
} }
} }

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
identifier::TransactionId, pool::best::BestTransactions, TransactionOrdering, identifier::TransactionId,
ValidPoolTransaction, pool::{best::BestTransactions, size::SizeTracker},
TransactionOrdering, ValidPoolTransaction,
}; };
use reth_primitives::rpc::TxHash; use reth_primitives::rpc::TxHash;
use std::{ use std::{
@ -33,6 +34,10 @@ pub(crate) struct PendingPool<T: TransactionOrdering> {
/// ///
/// Sorted by their scoring value. /// Sorted by their scoring value.
independent_transactions: BTreeSet<PendingTransactionRef<T>>, independent_transactions: BTreeSet<PendingTransactionRef<T>>,
/// Keeps track of the size of this pool.
///
/// See also [`PoolTransaction::size`].
size_of: SizeTracker,
} }
// === impl PendingPool === // === impl PendingPool ===
@ -45,6 +50,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
submission_id: 0, submission_id: 0,
by_id: Default::default(), by_id: Default::default(),
independent_transactions: Default::default(), independent_transactions: Default::default(),
size_of: Default::default(),
} }
} }
@ -95,6 +101,9 @@ impl<T: TransactionOrdering> PendingPool<T> {
let priority = self.ordering.priority(&tx.transaction); let priority = self.ordering.priority(&tx.transaction);
// keep track of size
self.size_of += tx.size();
let transaction = PendingTransactionRef { submission_id, transaction: tx, priority }; let transaction = PendingTransactionRef { submission_id, transaction: tx, priority };
// If there's __no__ ancestor in the pool, then this transaction is independent, this is // If there's __no__ ancestor in the pool, then this transaction is independent, this is
@ -113,6 +122,8 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// If the transactions has a descendant transaction it will advance it to the best queue. /// If the transactions has a descendant transaction it will advance it to the best queue.
pub(crate) fn remove_mined(&mut self, id: &TransactionId) { pub(crate) fn remove_mined(&mut self, id: &TransactionId) {
if let Some(tx) = self.by_id.remove(id) { if let Some(tx) = self.by_id.remove(id) {
// keep track of size
self.size_of -= tx.transaction.transaction.size();
self.independent_transactions.remove(&tx.transaction); self.independent_transactions.remove(&tx.transaction);
// mark the next as independent if it exists // mark the next as independent if it exists
@ -143,6 +154,11 @@ impl<T: TransactionOrdering> PendingPool<T> {
self.by_id.get(id).cloned() self.by_id.get(id).cloned()
} }
/// The reported size of all transactions in this pool.
pub(crate) fn size(&self) -> usize {
self.size_of.into()
}
/// Number of transactions in the entire pool /// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize { pub(crate) fn len(&self) -> usize {
self.by_id.len() self.by_id.len()

View File

@ -0,0 +1,29 @@
//! Tracks a size value.
use std::ops::{AddAssign, SubAssign};
/// Keeps track of accumulated size in bytes.
///
/// Note: We do not assume that size tracking is always exact. Depending on the bookkeeping of the
/// additions and subtractions the total size might be slightly off. Therefore, the underlying value
/// is an `isize`, so that the value does not wrap.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct SizeTracker(isize);
impl AddAssign<usize> for SizeTracker {
fn add_assign(&mut self, rhs: usize) {
self.0 += (rhs as isize)
}
}
impl SubAssign<usize> for SizeTracker {
fn sub_assign(&mut self, rhs: usize) {
self.0 -= (rhs as isize)
}
}
impl From<SizeTracker> for usize {
fn from(value: SizeTracker) -> Self {
value.0 as usize
}
}

View File

@ -103,8 +103,11 @@ impl<T: TransactionOrdering> TxPool<T> {
pub(crate) fn status(&self) -> PoolStatus { pub(crate) fn status(&self) -> PoolStatus {
PoolStatus { PoolStatus {
pending: self.pending_pool.len(), pending: self.pending_pool.len(),
pending_size: self.pending_pool.size(),
basefee: self.basefee_pool.len(), basefee: self.basefee_pool.len(),
basefee_size: self.basefee_pool.size(),
queued: self.queued_pool.len(), queued: self.queued_pool.len(),
queued_size: self.queued_pool.size(),
} }
} }
@ -284,11 +287,6 @@ impl<T: TransactionOrdering> TxPool<T> {
self.add_transaction_to_pool(pool, transaction) self.add_transaction_to_pool(pool, transaction)
} }
/// Returns the current size of the entire pool
pub fn size_of(&self) -> usize {
unimplemented!()
}
/// Ensures that the transactions in the sub-pools are within the given bounds. /// Ensures that the transactions in the sub-pools are within the given bounds.
/// ///
/// If the current size exceeds the given bounds, the worst transactions are evicted from the /// If the current size exceeds the given bounds, the worst transactions are evicted from the

View File

@ -327,6 +327,10 @@ impl PoolTransaction for MockTransaction {
} }
} }
} }
fn size(&self) -> usize {
0
}
} }
#[derive(Default)] #[derive(Default)]

View File

@ -191,6 +191,9 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static {
/// ///
/// This will return `None` for non-EIP1559 transactions /// This will return `None` for non-EIP1559 transactions
fn max_priority_fee_per_gas(&self) -> Option<U256>; fn max_priority_fee_per_gas(&self) -> Option<U256>;
/// Returns a measurement of the heap usage of this type and all its internals.
fn size(&self) -> usize;
} }
/// Represents the current status of the pool. /// Represents the current status of the pool.
@ -198,8 +201,14 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static {
pub struct PoolStatus { pub struct PoolStatus {
/// Number of transactions in the _pending_ sub-pool. /// Number of transactions in the _pending_ sub-pool.
pub pending: usize, pub pending: usize,
/// Reported size of transactions in the _pending_ sub-pool.
pub pending_size: usize,
/// Number of transactions in the _basefee_ pool. /// Number of transactions in the _basefee_ pool.
pub basefee: usize, pub basefee: usize,
/// Reported size of transactions in the _basefee_ sub-pool.
pub basefee_size: usize,
/// Number of transactions in the _queued_ sub-pool. /// Number of transactions in the _queued_ sub-pool.
pub queued: usize, pub queued: usize,
/// Reported size of transactions in the _queued_ sub-pool.
pub queued_size: usize,
} }

View File

@ -106,6 +106,11 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
pub(crate) fn is_local(&self) -> bool { pub(crate) fn is_local(&self) -> bool {
self.origin.is_local() self.origin.is_local()
} }
/// The heap allocated size of this transaction.
pub(crate) fn size(&self) -> usize {
self.transaction.size()
}
} }
#[cfg(test)] #[cfg(test)]