feat(txpool): new txpool design (#22)

This commit is contained in:
Matthias Seitz
2022-10-11 17:10:02 +02:00
committed by GitHub
parent 8eb2ea4152
commit 3fed7cfe21
15 changed files with 1847 additions and 1059 deletions

8
Cargo.lock generated
View File

@ -1488,6 +1488,12 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "paste"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1"
[[package]] [[package]]
name = "peeking_take_while" name = "peeking_take_while"
version = "0.1.2" version = "0.1.2"
@ -1811,10 +1817,12 @@ name = "reth-transaction-pool"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bitflags",
"fnv", "fnv",
"futures", "futures",
"linked-hash-map", "linked-hash-map",
"parking_lot", "parking_lot",
"paste",
"reth-primitives", "reth-primitives",
"serde", "serde",
"thiserror", "thiserror",

View File

@ -25,3 +25,7 @@ tracing = "0.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
linked-hash-map = "0.5" linked-hash-map = "0.5"
fnv = "1.0.7" fnv = "1.0.7"
bitflags = "1.3"
[dev-dependencies]
paste = "1.0"

View File

@ -1,12 +1,11 @@
use crate::U256;
use fnv::FnvHashMap; use fnv::FnvHashMap;
use reth_primitives::Address; use reth_primitives::Address;
use std::collections::HashMap; use std::{collections::HashMap, ops::Bound};
/// An internal mapping of addresses. /// An internal mapping of addresses.
/// ///
/// This assigns a _unique_ `SenderId` for a new `Address`. /// This assigns a _unique_ `SenderId` for a new `Address`.
#[derive(Debug)] #[derive(Debug, Default)]
pub struct SenderIdentifiers { pub struct SenderIdentifiers {
/// The identifier to use next. /// The identifier to use next.
id: u64, id: u64,
@ -53,6 +52,21 @@ impl SenderIdentifiers {
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SenderId(u64); pub struct SenderId(u64);
// === impl SenderId ===
impl SenderId {
/// Returns a `Bound` for `TransactionId` starting with nonce `0`
pub(crate) fn start_bound(self) -> Bound<TransactionId> {
Bound::Included(TransactionId::new(self, 0))
}
}
impl From<u64> for SenderId {
fn from(value: u64) -> Self {
SenderId(value)
}
}
/// A unique identifier of a transaction of a Sender. /// A unique identifier of a transaction of a Sender.
/// ///
/// This serves as an identifier for dependencies of a transaction: /// This serves as an identifier for dependencies of a transaction:
@ -73,11 +87,11 @@ impl TransactionId {
Self { sender, nonce } Self { sender, nonce }
} }
/// Returns the id a transactions depends on /// Returns the `TransactionId` this transaction depends on.
/// ///
/// This returns `transaction_nonce - 1` if `transaction_nonce` is higher than the /// This returns `transaction_nonce - 1` if `transaction_nonce` is higher than the
/// `on_chain_none` /// `on_chain_none`
pub fn dependency( pub fn ancestor(
transaction_nonce: u64, transaction_nonce: u64,
on_chain_nonce: u64, on_chain_nonce: u64,
sender: SenderId, sender: SenderId,
@ -92,4 +106,48 @@ impl TransactionId {
None None
} }
} }
/// Returns the `TransactionId` that would come before this transaction.
pub(crate) fn unchecked_ancestor(&self) -> Option<TransactionId> {
if self.nonce == 0 {
None
} else {
Some(TransactionId::new(self.sender, self.nonce - 1))
}
}
/// Returns the `TransactionId` that directly follows this transaction: `self.nonce + 1`
pub fn descendant(&self) -> TransactionId {
TransactionId::new(self.sender, self.nonce + 1)
}
/// Returns the nonce the follows directly after this.
#[inline]
pub(crate) fn next_nonce(&self) -> u64 {
self.nonce + 1
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeSet;
#[test]
fn test_transaction_id_ord_eq_sender() {
let tx1 = TransactionId::new(100u64.into(), 0u64);
let tx2 = TransactionId::new(100u64.into(), 1u64);
assert!(tx2 > tx1);
let set = BTreeSet::from([tx1, tx2]);
assert_eq!(set.into_iter().collect::<Vec<_>>(), vec![tx1, tx2]);
}
#[test]
fn test_transaction_id_ord() {
let tx1 = TransactionId::new(99u64.into(), 0u64);
let tx2 = TransactionId::new(100u64.into(), 1u64);
assert!(tx2 > tx1);
let set = BTreeSet::from([tx1, tx2]);
assert_eq!(set.into_iter().collect::<Vec<_>>(), vec![tx1, tx2]);
}
} }

View File

@ -22,6 +22,9 @@ pub mod pool;
mod traits; mod traits;
mod validate; mod validate;
#[cfg(test)]
mod test_util;
pub use crate::{ pub use crate::{
client::PoolClient, client::PoolClient,
config::PoolConfig, config::PoolConfig,

View File

@ -1,13 +1,12 @@
use crate::traits::PoolTransaction; use crate::traits::PoolTransaction;
use std::fmt; use std::fmt;
/// Transaction ordering trait to determine the order of transactions.
/// Transaction ordering.
/// ///
/// Decides how transactions should be ordered within the pool. /// Decides how transactions should be ordered within the pool.
/// ///
/// The returned priority must reflect natural `Ordering`. /// The returned priority must reflect natural `Ordering`
// TODO: for custom, more advanced scoring it would be ideal to determine the priority in the // TODO(mattsse) this should be extended so it provides a way to rank transaction in relation to
// context of the entire pool instead of standalone by alone looking at a single transaction // each other.
pub trait TransactionOrdering: Send + Sync + 'static { pub trait TransactionOrdering: Send + Sync + 'static {
/// Priority of a transaction. /// Priority of a transaction.
type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync; type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync;

View File

@ -0,0 +1,60 @@
use crate::{
identifier::TransactionId,
pool::pending::{PendingTransaction, PendingTransactionRef},
TransactionOrdering, ValidPoolTransaction,
};
use reth_primitives::H256 as TxHash;
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc,
};
use tracing::debug;
/// An iterator that returns transactions that can be executed on the current state.
pub struct BestTransactions<T: TransactionOrdering> {
pub(crate) all: BTreeMap<TransactionId, Arc<PendingTransaction<T>>>,
pub(crate) independent: BTreeSet<PendingTransactionRef<T>>,
pub(crate) invalid: HashSet<TxHash>,
}
impl<T: TransactionOrdering> BestTransactions<T> {
/// Mark the transaction and it's descendants as invalid.
pub(crate) fn mark_invalid(&mut self, tx: &Arc<ValidPoolTransaction<T::Transaction>>) {
self.invalid.insert(*tx.hash());
}
}
impl<T: TransactionOrdering> crate::traits::BestTransactions for BestTransactions<T> {
fn mark_invalid(&mut self, tx: &Self::Item) {
BestTransactions::mark_invalid(self, tx)
}
}
impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
type Item = Arc<ValidPoolTransaction<T::Transaction>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.independent.iter().next_back()?.clone();
let best = self.independent.take(&best)?;
let hash = best.transaction.hash();
// skip transactions that were marked as invalid
if self.invalid.contains(hash) {
debug!(
target: "txpool",
"[{:?}] skipping invalid transaction",
hash
);
continue
}
// Insert transactions that just got unlocked.
if let Some(unlocked) = self.all.get(&best.unlocks()) {
self.independent.insert(unlocked.transaction.clone());
}
return Some(best.transaction)
}
}
}

View File

@ -1,55 +1,51 @@
//! Transaction Pool internals. //! Transaction Pool internals.
//! //!
//! Incoming transactions are validated first. The validation outcome can have 3 states: //! Incoming transactions are before they enter the pool first. The validation outcome can have 3
//! states:
//! //!
//! 1. Transaction can _never_ be valid //! 1. Transaction can _never_ be valid
//! 2. Transaction is _currently_ valid //! 2. Transaction is _currently_ valid
//! 3. Transaction is _currently_ invalid, but could potentially become valid in the future //! 3. Transaction is _currently_ invalid, but could potentially become valid in the future
//! //!
//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current //! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) need to //! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the
//! be reevaluated again. //! state of a transaction needs to be reevaluated again.
//! //!
//! The transaction pool is responsible for storing new, valid transactions and providing the next //! The transaction pool is responsible for storing new, valid transactions and providing the next
//! best transactions sorted by their priority. Where priority is determined by the transaction's //! best transactions sorted by their priority. Where priority is determined by the transaction's
//! score. //! score ([`TransactionOrdering`]).
//!
//! However, the score is also only valid for the current state.
//! //!
//! Furthermore, the following characteristics fall under (3.): //! Furthermore, the following characteristics fall under (3.):
//! //!
//! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its //! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
//! sender. A distinction is made here whether multiple transactions from the same sender have //! sender. A distinction is made here whether multiple transactions from the same sender have
//! gapless nonce increments. a)(1) If _no_ transaction is missing in a chain of multiple //! gapless nonce increments.
//!
//! a)(1) If _no_ transaction is missing in a chain of multiple
//! transactions from the same sender (all nonce in row), all of them can in principle be executed //! transactions from the same sender (all nonce in row), all of them can in principle be executed
//! on the current state one after the other. a)(2) If there's a nonce gap, then all //! on the current state one after the other.
//!
//! a)(2) If there's a nonce gap, then all
//! transactions after the missing transaction are blocked until the missing transaction arrives. //! transactions after the missing transaction are blocked until the missing transaction arrives.
//! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The fee //!
//! cap of the transaction needs to be no less than the base fee of block. //! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The
//! fee cap of the transaction needs to be no less than the base fee of block.
//! //!
//! //!
//! In essence the transaction pool is made of two separate sub-pools: //! In essence the transaction pool is made of three separate sub-pools:
//! //!
//! _Pending Pool_: Contains all transactions that are valid on the current state and satisfy //! - Pending Pool: Contains all transactions that are valid on the current state and satisfy
//! (3. a)(1): _No_ nonce gaps _Queued Pool_: Contains all transactions that are currently //! (3. a)(1): _No_ nonce gaps
//! blocked by missing transactions: (3. a)(2): _With_ nonce gaps
//! //!
//! To account for the dynamic base fee requirement (3. b) which could render an EIP-1559 and all //! - Queued Pool: Contains all transactions that are currently blocked by missing
//! subsequent transactions of the sender currently invalid, the pending pool itself consists of two //! transactions: (3. a)(2): _With_ nonce gaps or due to lack of funds.
//! queues:
//! //!
//! _Ready Queue_: Contains all transactions that can be executed on the current state //! - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render
//! _Parked Queue_: Contains all transactions that either do not currently meet the dynamic //! an EIP-1559 and all subsequent transactions of the sender currently invalid.
//! base fee requirement or are blocked by a previous transaction that violates it.
//! //!
//! The classification of transaction in which queue it belongs depends on the current base fee and //! The classification of transactions is always dependent on the current state that is changed as
//! must be updated after changes: //! soon as a new block is mined. Once a new block is mined, the account changeset must be applied
//! //! to the transaction pool.
//! - Base Fee increases: recheck the _Ready Queue_ and evict transactions that don't satisfy
//! the new base fee, or depend on a transaction that no longer satisfies it, and move them
//! to the _Parked Queue_.
//! - Base Fee decreases: recheck the _Parked Queue_ and move all transactions that now satisfy
//! the new base fee to the _Ready Queue_.
//! //!
//! //!
//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool) //! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
@ -61,45 +57,36 @@
//! //!
//! ## Terminology //! ## Terminology
//! //!
//! - _Pending_: pending transactions are transactions that fall under (2.). Those transactions //! - _Pending_: pending transactions are transactions that fall under (2.). Those transactions are
//! are _currently_ ready to be executed and are stored in the `pending` sub-pool //! _currently_ ready to be executed and are stored in the pending sub-pool
//! - _Queued_: queued transactions are transactions that fall under category (3.). Those //! - _Queued_: queued transactions are transactions that fall under category (3.). Those
//! transactions are _currently_ waiting for state changes that eventually move them into //! transactions are _currently_ waiting for state changes that eventually move them into
//! category (2.) and become pending. //! category (2.) and become pending.
use crate::{ use crate::{
error::{PoolError, PoolResult}, error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction,
pool::{ validate::ValidPoolTransaction, BlockID, PoolClient, PoolConfig, TransactionOrdering,
listener::PoolEventListener, TransactionValidator, U256,
pending::PendingTransactions,
queued::{QueuedPoolTransaction, QueuedTransactions},
},
traits::PoolTransaction,
validate::ValidPoolTransaction,
BlockID, PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256,
}; };
use fnv::FnvHashMap;
use best::BestTransactions;
use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use reth_primitives::{TxHash, U64}; use reth_primitives::{TxHash, U64};
use std::{ use std::{collections::HashMap, sync::Arc};
collections::{HashMap, VecDeque}, use tracing::warn;
fmt,
sync::Arc,
};
use tracing::{debug, trace, warn};
mod best;
mod events; mod events;
mod listener; mod listener;
mod parked;
mod pending; mod pending;
mod queued; pub(crate) mod state;
mod transaction; mod transaction;
pub mod txpool;
use crate::{ use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome};
identifier::{SenderId, TransactionId},
validate::TransactionValidationOutcome,
};
pub use events::TransactionEvent; pub use events::TransactionEvent;
pub use pending::TransactionsIterator;
/// Shareable Transaction pool. /// Shareable Transaction pool.
pub struct BasicPool<P: PoolClient, T: TransactionOrdering> { pub struct BasicPool<P: PoolClient, T: TransactionOrdering> {
@ -205,7 +192,7 @@ pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Chain/Storage access. /// Chain/Storage access.
client: Arc<P>, client: Arc<P>,
/// The internal pool that manages /// The internal pool that manages
pool: RwLock<GraphPool<T>>, pool: RwLock<TxPool<T>>,
/// Pool settings. /// Pool settings.
config: PoolConfig, config: PoolConfig,
/// Manages listeners for transaction state change events. /// Manages listeners for transaction state change events.
@ -227,7 +214,7 @@ where
client, client,
config, config,
event_listener: Default::default(), event_listener: Default::default(),
pool: RwLock::new(GraphPool::new(ordering)), pool: RwLock::new(TxPool::new(ordering)),
ready_transaction_listener: Default::default(), ready_transaction_listener: Default::default(),
} }
} }
@ -262,7 +249,7 @@ where
tx: TransactionValidationOutcome<T::Transaction>, tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> { ) -> PoolResult<TxHash> {
match tx { match tx {
TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => { TransactionValidationOutcome::Valid { balance: _, state_nonce: _, transaction: _ } => {
// TODO create `ValidPoolTransaction` // TODO create `ValidPoolTransaction`
// let added = self.pool.write().add_transaction(tx)?; // let added = self.pool.write().add_transaction(tx)?;
@ -330,250 +317,8 @@ where
} }
/// Returns an iterator that yields transactions that are ready to be included in the block. /// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn ready_transactions(&self) -> TransactionsIterator<T> { pub(crate) fn ready_transactions(&self) -> BestTransactions<T> {
self.pool.read().ready_transactions() self.pool.read().best_transactions()
}
}
/// A pool that only manages transactions.
///
/// This pool maintains a dependency graph of transactions and provides the currently ready
/// transactions.
pub struct GraphPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: Arc<T>,
/// Contains the currently known info
sender_info: FnvHashMap<SenderId, SenderInfo>,
/// Sub-Pool of transactions that are ready and waiting to be executed
pending: PendingTransactions<T>,
/// Sub-Pool of transactions that are waiting for state changes that eventually turn them
/// valid, so they can be moved in the `pending` pool.
queued: QueuedTransactions<T>,
}
// === impl PoolInner ===
impl<T: TransactionOrdering> GraphPool<T> {
/// Create a new graph pool instance.
pub fn new(ordering: Arc<T>) -> Self {
let pending = PendingTransactions::new(Arc::clone(&ordering));
Self { ordering, sender_info: Default::default(), pending, queued: Default::default() }
}
/// Updates the pool based on the changed base fee.
///
/// This enforces the dynamic fee requirement.
/// If the `new_base_fee` is _higher_ than previous base fee, all EIP-1559 transactions in the
/// ready queue that now violate the dynamic fee requirement need to parked.
/// If the `new_base_fee` is _lower_ than the previous base fee, all parked transactions that
/// now satisfy the dynamic fee requirement need to moved to the ready queue.
pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) {
let _old_base_fee = self.pending.set_next_base_fee(new_base_fee);
// TODO update according to the changed base_fee
todo!()
}
/// Returns if the transaction for the given hash is already included in this pool
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
self.queued.contains(tx_hash) || self.pending.contains(tx_hash)
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn ready_transactions(&self) -> TransactionsIterator<T> {
self.pending.get_transactions()
}
/// Adds the transaction into the pool
///
/// This pool consists of two sub-pools: `Queued` and `Pending`.
///
/// The `Queued` pool contains transaction with gaps in its dependency tree: It requires
/// additional transaction that are note yet present in the pool.
///
/// The `Pending` pool contains all transactions that have all their dependencies satisfied (no
/// nonce gaps). It consists of two parts: `Parked` and `Ready`.
///
/// The `Ready` queue contains transactions that are ready to be included in the pending block.
/// With EIP-1559, transactions can become executable or not without any changes to the
/// sender's balance or nonce and instead their feeCap determines whether the transaction is
/// _currently_ (on the current state) ready or needs to be parked until the feeCap satisfies
/// the block's baseFee.
fn add_transaction(
&mut self,
tx: ValidPoolTransaction<T::Transaction>,
) -> PoolResult<AddedTransaction<T::Transaction>> {
if self.contains(tx.hash()) {
warn!(target: "txpool", "[{:?}] Already added", tx.hash());
return Err(PoolError::AlreadyAdded(Box::new(*tx.hash())))
}
let tx = QueuedPoolTransaction::new(tx, self.pending.provided_dependencies());
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
// If all ids are not satisfied import to queued
if !tx.is_satisfied() {
let hash = *tx.transaction.hash();
self.queued.add_transaction(tx)?;
return Ok(AddedTransaction::Queued { hash })
}
self.add_pending_transaction(tx)
}
/// Adds the transaction to the pending pool.
///
/// This will also move all transaction that get unlocked by the dependency id this transaction
/// provides from the queued pool into the pending pool.
///
/// CAUTION: this expects that transaction's dependencies are fully satisfied
fn add_pending_transaction(
&mut self,
tx: QueuedPoolTransaction<T>,
) -> PoolResult<AddedTransaction<T::Transaction>> {
let hash = *tx.transaction.hash();
trace!(target: "txpool", "adding pending transaction [{:?}]", hash);
let mut pending = AddedPendingTransaction::new(hash);
// tracks all transaction that can be moved to the pending pool, starting the given
// transaction
let mut pending_transactions = VecDeque::from([tx]);
// tracks whether we're processing the given `tx`
let mut is_new_tx = true;
// take first transaction from the list
while let Some(current_tx) = pending_transactions.pop_front() {
// also add the transaction that the current transaction unlocks
pending_transactions
.extend(self.queued.satisfy_and_unlock(&current_tx.transaction.transaction_id));
let current_hash = *current_tx.transaction.hash();
// try to add the transaction to the ready pool
match self.pending.add_transaction(current_tx) {
Ok(replaced_transactions) => {
if !is_new_tx {
pending.promoted.push(current_hash);
}
// tx removed from ready pool
pending.removed.extend(replaced_transactions);
}
Err(err) => {
// failed to add transaction
if is_new_tx {
debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
err);
return Err(err)
} else {
pending.discarded.push(current_hash);
}
}
}
is_new_tx = false;
}
// check for a cycle where importing a transaction resulted in pending transactions to be
// added while removing current transaction. in which case we move this transaction back to
// the pending queue
if pending.removed.iter().any(|tx| *tx.hash() == hash) {
self.pending.clear_transactions(&pending.promoted);
return Err(PoolError::CyclicTransaction)
}
Ok(AddedTransaction::Pending(pending))
}
/// Prunes the transactions that provide the given dependencies.
///
/// This will effectively remove those transactions that satisfy the dependencies.
/// And queued transactions might get promoted if the pruned dependencies unlock them.
pub fn prune_transactions(
&mut self,
dependencies: impl IntoIterator<Item = TransactionId>,
) -> PruneResult<T::Transaction> {
let mut imports = vec![];
let mut pruned = vec![];
for dependency in dependencies {
// mark as satisfied and store the transactions that got unlocked
imports.extend(self.queued.satisfy_and_unlock(&dependency));
// prune transactions
pruned.extend(self.pending.remove_mined(dependency));
}
let mut promoted = vec![];
let mut failed = vec![];
for tx in imports {
let hash = *tx.transaction.hash();
match self.add_pending_transaction(tx) {
Ok(res) => promoted.push(res),
Err(e) => {
warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
failed.push(hash)
}
}
}
PruneResult { pruned, failed, promoted }
}
/// Remove the given transactions from the pool.
pub fn remove_invalid(
&mut self,
tx_hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
// early exit in case there is no invalid transactions.
if tx_hashes.is_empty() {
return vec![]
}
trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
let mut removed = self.pending.remove_with_dependencies(tx_hashes.clone(), None);
removed.extend(self.queued.remove(tx_hashes));
trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
removed
}
/// Returns the current size of the entire pool
pub fn size_of(&self) -> usize {
unimplemented!()
}
/// Ensures that the transactions in the sub-pools are within the given bounds.
///
/// If the current size exceeds the given bounds, the worst transactions are evicted from the
/// pool and returned.
pub fn enforce_size_limits(&mut self) {
unimplemented!()
}
}
/// Represents the outcome of a prune
pub struct PruneResult<T: PoolTransaction> {
/// a list of added transactions that a pruned marker satisfied
pub promoted: Vec<AddedTransaction<T>>,
/// all transactions that failed to be promoted and now are discarded
pub failed: Vec<TxHash>,
/// all transactions that were pruned from the ready pool
pub pruned: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> fmt::Debug for PruneResult<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "PruneResult {{ ")?;
write!(
fmt,
"promoted: {:?}, ",
self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "failed: {:?}, ", self.failed)?;
write!(
fmt,
"pruned: {:?}, ",
self.pruned.iter().map(|tx| *tx.transaction.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "}}")?;
Ok(())
} }
} }
@ -602,26 +347,6 @@ impl<T: PoolTransaction> AddedPendingTransaction<T> {
} }
} }
/// Stores relevant context about a sender.
#[derive(Debug, Clone)]
struct SenderInfo {
/// current nonce of the sender
state_nonce: u64,
/// Balance of the sender at the current point.
balance: U256,
/// How many transactions of this sender are currently in the pool.
num_transactions: u64,
}
// === impl SenderInfo ===
impl SenderInfo {
/// Creates a new entry for an incoming, not yet tracked sender.
fn new_incoming(state_nonce: u64, balance: U256) -> Self {
Self { state_nonce, balance, num_transactions: 1 }
}
}
/// Represents a transaction that was added into the pool and its state /// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> { pub enum AddedTransaction<T: PoolTransaction> {

View File

@ -0,0 +1,194 @@
use crate::{identifier::TransactionId, PoolTransaction, ValidPoolTransaction};
use fnv::FnvHashMap;
use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc};
/// A pool of transaction that are currently parked and wait for external changes that eventually
/// move the transaction into the pending pool.
///
/// This pool is a bijection: at all times each set contains the same transactions.
pub(crate) struct ParkedPool<T: ParkedOrd> {
/// Keeps track of transactions inserted in the pool.
///
/// This way we can determine when transactions where submitted to the pool.
submission_id: u64,
/// _All_ Transactions that are currently inside the pool grouped by their identifier.
by_id: FnvHashMap<TransactionId, ParkedPoolTransaction<T>>,
/// All transactions sorted by their priority function.
best: BTreeSet<ParkedPoolTransaction<T>>,
}
// === impl QueuedPool ===
impl<T: ParkedOrd> ParkedPool<T> {
/// Adds a new transactions to the pending queue.
///
/// # Panics
///
/// If the transaction is already included.
pub(crate) fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
let id = *tx.id();
assert!(!self.by_id.contains_key(&id), "transaction already included");
let submission_id = self.next_id();
let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() };
self.by_id.insert(id, transaction.clone());
self.best.insert(transaction);
}
/// Removes the transaction from the pool
pub(crate) fn remove_transaction(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let tx = self.by_id.remove(id)?;
self.best.remove(&tx);
Some(tx.transaction.into())
}
fn next_id(&mut self) -> u64 {
let id = self.submission_id;
self.submission_id = self.submission_id.wrapping_add(1);
id
}
}
impl<T: ParkedOrd> Default for ParkedPool<T> {
fn default() -> Self {
Self { submission_id: 0, by_id: Default::default(), best: Default::default() }
}
}
/// Represents a transaction in this pool.
struct ParkedPoolTransaction<T: ParkedOrd> {
/// Identifier that tags when transaction was submitted in the pool.
submission_id: u64,
/// Actual transaction.
transaction: T,
}
impl<T: ParkedOrd> Clone for ParkedPoolTransaction<T> {
fn clone(&self) -> Self {
Self { submission_id: self.submission_id, transaction: self.transaction.clone() }
}
}
impl<T: ParkedOrd> Eq for ParkedPoolTransaction<T> {}
impl<T: ParkedOrd> PartialEq<Self> for ParkedPoolTransaction<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: ParkedOrd> PartialOrd<Self> for ParkedPoolTransaction<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: ParkedOrd> Ord for ParkedPoolTransaction<T> {
fn cmp(&self, other: &Self) -> Ordering {
// This compares by the transactions first, and only if two tx are equal this compares
// the unique `transaction_id`.
self.transaction
.cmp(&other.transaction)
.then_with(|| self.transaction.id().cmp(other.transaction.id()))
}
}
/// Helper trait used for custom `Ord` wrappers around a transaction.
///
/// This is effectively a wrapper for `Arc<ValidPoolTransaction>` with custom `Ord` implementation.
pub(crate) trait ParkedOrd:
Ord
+ Clone
+ From<Arc<ValidPoolTransaction<Self::Transaction>>>
+ Into<Arc<ValidPoolTransaction<Self::Transaction>>>
+ Deref<Target = Arc<ValidPoolTransaction<Self::Transaction>>>
{
/// The wrapper transaction type.
type Transaction: PoolTransaction;
}
/// Helper macro to implement necessary conversions for `ParkedOrd` trait
macro_rules! impl_ord_wrapper {
($name:ident) => {
impl<T: PoolTransaction> Clone for $name<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: PoolTransaction> Eq for $name<T> {}
impl<T: PoolTransaction> PartialEq<Self> for $name<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: PoolTransaction> PartialOrd<Self> for $name<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: PoolTransaction> Deref for $name<T> {
type Target = Arc<ValidPoolTransaction<T>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<T: PoolTransaction> ParkedOrd for $name<T> {
type Transaction = T;
}
impl<T: PoolTransaction> From<Arc<ValidPoolTransaction<T>>> for $name<T> {
fn from(value: Arc<ValidPoolTransaction<T>>) -> Self {
Self(value)
}
}
impl<T: PoolTransaction> From<$name<T>> for Arc<ValidPoolTransaction<T>> {
fn from(value: $name<T>) -> Arc<ValidPoolTransaction<T>> {
value.0
}
}
};
}
/// A new type wrapper for [`ValidPoolTransaction`]
///
/// This sorts transactions by their base fee.
///
/// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value.
pub struct BasefeeOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
impl_ord_wrapper!(BasefeeOrd);
impl<T: PoolTransaction> Ord for BasefeeOrd<T> {
fn cmp(&self, other: &Self) -> Ordering {
match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) {
(Some(fee), Some(other)) => fee.cmp(&other),
(None, Some(other)) => Ordering::Less,
(Some(_), None) => Ordering::Greater,
_ => Ordering::Equal,
}
}
}
/// A new type wrapper for [`ValidPoolTransaction`]
///
/// This sorts transactions by their distance.
pub struct QueuedOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
impl_ord_wrapper!(QueuedOrd);
impl<T: PoolTransaction> Ord for QueuedOrd<T> {
fn cmp(&self, other: &Self) -> Ordering {
// TODO ideally compare by distance here.
Ordering::Equal
}
}

View File

@ -1,68 +1,46 @@
use crate::{ use crate::{
error::PoolResult, identifier::TransactionId, pool::queued::QueuedPoolTransaction, identifier::TransactionId, pool::best::BestTransactions, TransactionOrdering,
traits::BestTransactions, validate::ValidPoolTransaction, TransactionOrdering, ValidPoolTransaction,
}; };
use parking_lot::RwLock; use reth_primitives::rpc::TxHash;
use reth_primitives::{TxHash, U256};
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{BTreeSet, HashMap, HashSet}, collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc, sync::Arc,
}; };
use tracing::debug; use tracing::debug;
/// Type alias for replaced transactions /// A pool of validated and gapless transactions that are ready on the current state and are waiting
pub(crate) type ReplacedTransactions<T> = /// to be included in a block.
(Vec<Arc<ValidPoolTransaction<<T as TransactionOrdering>::Transaction>>>, Vec<TxHash>); pub(crate) struct PendingPool<T: TransactionOrdering> {
/// How to order transactions.
/// A pool of validated transactions that are ready on the current state and are waiting to be ordering: Arc<T>,
/// included in a block.
///
/// Each transaction in this pool is valid on its own, i.e. they are not dependent on transaction
/// that must be executed first. Each of these transaction can be executed independently on the
/// current state
pub(crate) struct PendingTransactions<T: TransactionOrdering> {
/// Keeps track of transactions inserted in the pool. /// Keeps track of transactions inserted in the pool.
/// ///
/// This way we can determine when transactions where submitted to the pool. /// This way we can determine when transactions where submitted to the pool.
id: u64, submission_id: u64,
/// How to order transactions. /// _All_ Transactions that are currently inside the pool grouped by their identifier.
ordering: Arc<T>, by_id: BTreeMap<TransactionId, Arc<PendingTransaction<T>>>,
/// Base fee of the next block.
pending_base_fee: U256,
/// Dependencies that are provided by `PendingTransaction`s
provided_dependencies: HashMap<TransactionId, TxHash>,
/// Pending transactions that are currently on hold until the `baseFee` of the pending block
/// changes in favor of the parked transactions: the `pendingBlock.baseFee` must decrease
/// before they can be moved to the ready pool and are ready to be executed.
parked: ParkedTransactions<T>,
/// All Transactions that are currently ready.
///
/// Meaning, there are no nonce gaps in these transactions and all of them satisfy the
/// `baseFee` condition: transaction `maxFeePerGas >= pendingBlock.baseFee`
ready_transactions: Arc<RwLock<HashMap<TxHash, PendingTransaction<T>>>>,
/// Independent transactions that can be included directly and don't require other /// Independent transactions that can be included directly and don't require other
/// transactions. /// transactions.
/// ///
/// Sorted by their scoring value. /// Sorted by their scoring value.
independent_transactions: BTreeSet<PoolTransactionRef<T>>, independent_transactions: BTreeSet<PendingTransactionRef<T>>,
} }
// === impl PendingTransactions === // === impl PendingPool ===
impl<T: TransactionOrdering> PendingTransactions<T> { impl<T: TransactionOrdering> PendingPool<T> {
/// Create a new pool instance /// Create a new pool instance.
pub(crate) fn new(ordering: Arc<T>) -> Self { pub(crate) fn new(ordering: Arc<T>) -> Self {
Self { Self {
id: 0,
provided_dependencies: Default::default(),
parked: Default::default(),
ready_transactions: Arc::new(Default::default()),
ordering, ordering,
submission_id: 0,
by_id: Default::default(),
independent_transactions: Default::default(), independent_transactions: Default::default(),
pending_base_fee: Default::default(),
} }
} }
/// Returns an iterator over all transactions that are _currently_ ready. /// Returns an iterator over all transactions that are _currently_ ready.
/// ///
/// 1. The iterator _always_ returns transaction in order: It never returns a transaction with /// 1. The iterator _always_ returns transaction in order: It never returns a transaction with
@ -81,303 +59,88 @@ impl<T: TransactionOrdering> PendingTransactions<T> {
/// provides a way to mark transactions that the consumer of this iterator considers invalid. In /// provides a way to mark transactions that the consumer of this iterator considers invalid. In
/// which case the transaction's subgraph is also automatically marked invalid, See (1.). /// which case the transaction's subgraph is also automatically marked invalid, See (1.).
/// Invalid transactions are skipped. /// Invalid transactions are skipped.
pub(crate) fn get_transactions(&self) -> TransactionsIterator<T> { pub(crate) fn best(&self) -> BestTransactions<T> {
TransactionsIterator { BestTransactions {
all: self.ready_transactions.read().clone(), all: self.by_id.clone(),
independent: self.independent_transactions.clone(), independent: self.independent_transactions.clone(),
awaiting: Default::default(),
invalid: Default::default(), invalid: Default::default(),
} }
} }
/// Sets the given base fee and returns the old one. /// Returns the ancestor the given transaction, the transaction with `nonce - 1`.
pub(crate) fn set_next_base_fee(&mut self, base_fee: U256) -> U256 { ///
std::mem::replace(&mut self.pending_base_fee, base_fee) /// Note: for a transaction with nonce higher than the current on chain nonce this will always
} /// return an ancestor since all transaction in this pool are gapless.
fn ancestor(&self, id: &TransactionId) -> Option<&Arc<PendingTransaction<T>>> {
/// Returns true if the transaction is part of the queue. self.by_id.get(&id.unchecked_ancestor()?)
pub(crate) fn contains(&self, hash: &TxHash) -> bool {
self.ready_transactions.read().contains_key(hash)
}
/// Returns the transaction for the hash if it's in the ready pool but not yet mined
pub(crate) fn get(&self, hash: &TxHash) -> Option<PendingTransaction<T>> {
self.ready_transactions.read().get(hash).cloned()
}
pub(crate) fn provided_dependencies(&self) -> &HashMap<TransactionId, TxHash> {
&self.provided_dependencies
}
fn next_id(&mut self) -> u64 {
let id = self.id;
self.id = self.id.wrapping_add(1);
id
} }
/// Adds a new transactions to the pending queue. /// Adds a new transactions to the pending queue.
/// ///
/// Depending on the transaction's feeCap, this will either move it into the ready queue or park
/// it until a future baseFee unlocks it.
///
/// # Panics /// # Panics
/// ///
/// if the pending transaction is not ready /// if the transaction is already included
/// or the transaction is already included pub(crate) fn add_transaction(&mut self, tx: Arc<ValidPoolTransaction<T::Transaction>>) {
pub(crate) fn add_transaction( assert!(!self.by_id.contains_key(tx.id()), "transaction already included");
&mut self,
tx: QueuedPoolTransaction<T>,
) -> PoolResult<Vec<Arc<ValidPoolTransaction<T::Transaction>>>> {
assert!(tx.is_satisfied(), "transaction must be ready",);
assert!(
!self.ready_transactions.read().contains_key(tx.transaction.hash()),
"transaction already included"
);
let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
let tx_id = *tx.id();
let submission_id = self.next_id(); let submission_id = self.next_id();
let hash = *tx.transaction.hash();
let mut independent = true; let priority = self.ordering.priority(&tx.transaction);
let mut requires_offset = 0;
let mut ready = self.ready_transactions.write();
// Add links to transactions that unlock the current one let transaction = PendingTransactionRef { submission_id, transaction: tx, priority };
for dependency in &tx.transaction.depends_on {
// Check if the transaction that satisfies the mark is still in the queue.
if let Some(other) = self.provided_dependencies.get(dependency) {
let tx = ready.get_mut(other).expect("hash included;");
tx.unlocks.push(hash);
// tx still depends on other tx
independent = false;
} else {
requires_offset += 1;
}
}
// update dependencies // If there's __no__ ancestor in the pool, then this transaction is independent, this is
self.provided_dependencies.insert(tx.transaction.transaction_id, hash); // guaranteed because this pool is gapless.
if self.ancestor(&tx_id).is_none() {
let priority = self.ordering.priority(&tx.transaction.transaction);
let transaction =
PoolTransactionRef { submission_id, transaction: tx.transaction, priority };
// TODO check basefee requirement
// add to the independent set
if independent {
self.independent_transactions.insert(transaction.clone()); self.independent_transactions.insert(transaction.clone());
} }
// insert to ready queue let transaction = Arc::new(PendingTransaction { transaction });
ready.insert(hash, PendingTransaction { transaction, unlocks, requires_offset });
Ok(replaced_tx) self.by_id.insert(tx_id, transaction);
} }
/// Removes and returns those transactions that got replaced by the `tx` /// Removes a _mined_ transaction from the pool.
fn replaced_transactions(
&mut self,
tx: &ValidPoolTransaction<T::Transaction>,
) -> PoolResult<ReplacedTransactions<T>> {
// check if we are replacing transactions
let remove_hashes: HashSet<_> =
if let Some(hash) = self.provided_dependencies.get(&tx.transaction_id) {
HashSet::from([hash])
} else {
return Ok((Vec::new(), Vec::new()))
};
// early exit if we are not replacing anything.
if remove_hashes.is_empty() {
return Ok((Vec::new(), Vec::new()))
}
// check if we're replacing the same transaction and if it can be replaced
let mut unlocked_tx = Vec::new();
{
// construct a list of unlocked transactions
// also check for transactions that shouldn't be replaced because underpriced
let ready = self.ready_transactions.read();
for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(hash)) {
// if we're attempting to replace a transaction that provides the exact same
// dependencies (addr + nonce) then we check for gas price
if to_remove.id().eq(&tx.transaction_id) {
// check if underpriced
// TODO check if underpriced
// if tx.pending_transaction.transaction.gas_price() <= to_remove.gas_price() {
// warn!(target: "txpool", "ready replacement transaction underpriced
// [{:?}]", tx.hash()); return
// Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone())))
// } else {
// trace!(target: "txpool", "replacing ready transaction [{:?}] with higher
// gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
// }
}
unlocked_tx.extend(to_remove.unlocks.iter().cloned())
}
}
let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
let new_provides = HashSet::from([tx.transaction_id]);
let removed_tx = self.remove_with_dependencies(remove_hashes, Some(new_provides));
Ok((removed_tx, unlocked_tx))
}
/// Removes the transactions from the ready queue and returns the removed transactions.
/// This will also remove all transactions that depend on those.
pub(crate) fn clear_transactions(
&mut self,
tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
self.remove_with_dependencies(tx_hashes.to_vec(), None)
}
/// Removes the transactions that was mined.
/// ///
/// This will also remove all transactions that lead to the transaction that provides the /// If the transactions has a descendant transaction it will advance it to the best queue.
/// id. pub(crate) fn remove_mined(&mut self, id: &TransactionId) {
pub(crate) fn remove_mined( if let Some(tx) = self.by_id.remove(id) {
&mut self, self.independent_transactions.remove(&tx.transaction);
id: TransactionId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed_tx = vec![];
// the dependencies to remove // mark the next as independent if it exists
let mut remove = vec![id]; if let Some(unlocked) = self.by_id.get(&id.descendant()) {
self.independent_transactions.insert(unlocked.transaction.clone());
while let Some(dependency) = remove.pop() {
let res = self
.provided_dependencies
.remove(&dependency)
.and_then(|hash| self.ready_transactions.write().remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
self.independent_transactions.remove(&tx.transaction);
let tx = tx.transaction.transaction;
// also remove previous transactions
{
let hash = tx.hash();
let mut ready = self.ready_transactions.write();
let mut previous_dependency = |dependency| -> Option<Vec<TransactionId>> {
let prev_hash = self.provided_dependencies.get(dependency)?;
let tx2 = ready.get_mut(prev_hash)?;
// remove hash
if let Some(idx) = tx2.unlocks.iter().position(|i| i == hash) {
tx2.unlocks.swap_remove(idx);
}
if tx2.unlocks.is_empty() {
Some(vec![tx2.transaction.transaction.transaction_id])
} else {
None
}
};
// find previous transactions
for dep in &tx.depends_on {
if let Some(mut dependency_to_remove) = previous_dependency(dep) {
remove.append(&mut dependency_to_remove);
}
}
}
// add the transactions that just got unlocked to independent set
for hash in unlocks {
if let Some(tx) = self.ready_transactions.write().get_mut(&hash) {
tx.requires_offset += 1;
if tx.requires_offset == tx.transaction.transaction.depends_on.len() {
self.independent_transactions.insert(tx.transaction.clone());
}
}
}
// finally, remove the dependencies that this transaction provides
let current_dependency = &dependency;
let removed = self.provided_dependencies.remove(&tx.transaction_id);
assert_eq!(
removed.as_ref(),
if current_dependency.eq(&tx.transaction_id) { None } else { Some(tx.hash()) },
"The pool contains exactly one transaction providing given tag; the removed transaction
claims to provide that tag, so it has to be mapped to it's hash; qed"
);
removed_tx.push(tx);
} }
} }
removed_tx
} }
/// Removes transactions and those that depend on them and satisfy at least one dependency in /// Removes the transaction from the pool.
/// the given filter set. pub(crate) fn remove_transaction(
pub(crate) fn remove_with_dependencies(
&mut self, &mut self,
mut tx_hashes: Vec<TxHash>, id: &TransactionId,
dependency_filter: Option<HashSet<TransactionId>>, ) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> { let tx = self.by_id.remove(id)?;
let mut removed = Vec::new(); self.independent_transactions.remove(&tx.transaction);
let mut ready = self.ready_transactions.write(); Some(tx.transaction.transaction.clone())
}
while let Some(hash) = tx_hashes.pop() { fn next_id(&mut self) -> u64 {
if let Some(mut tx) = ready.remove(&hash) { let id = self.submission_id;
let id = &tx.transaction.transaction.transaction_id; self.submission_id = self.submission_id.wrapping_add(1);
id
}
// remove the transactions /// Returns the transaction for the id if it's in the pool but not yet mined.
let removed_transaction = if dependency_filter pub(crate) fn get(&self, id: &TransactionId) -> Option<Arc<PendingTransaction<T>>> {
.as_ref() self.by_id.get(id).cloned()
.map(|filter| !filter.contains(id))
.unwrap_or(true)
{
self.provided_dependencies.remove(id);
true
} else {
false
};
// remove from unlocks
for dependency in &tx.transaction.transaction.depends_on {
if let Some(hash) = self.provided_dependencies.get(dependency) {
if let Some(tx) = ready.get_mut(hash) {
if let Some(idx) = tx.unlocks.iter().position(|i| i == hash) {
tx.unlocks.swap_remove(idx);
}
}
}
}
// remove from the independent set
self.independent_transactions.remove(&tx.transaction);
if removed_transaction {
// remove all transactions that the current one unlocks
tx_hashes.append(&mut tx.unlocks);
}
// remove transaction
removed.push(tx.transaction.transaction);
}
}
removed
} }
} }
/// A transaction that is ready to be included in a block. /// A transaction that is ready to be included in a block.
#[derive(Debug)]
pub(crate) struct PendingTransaction<T: TransactionOrdering> { pub(crate) struct PendingTransaction<T: TransactionOrdering> {
/// Reference to the actual transaction. /// Reference to the actual transaction.
transaction: PoolTransactionRef<T>, pub(crate) transaction: PendingTransactionRef<T>,
/// Tracks the transactions that get unlocked by this transaction.
unlocks: Vec<TxHash>,
/// Amount of required dependencies that are inherently provided.
requires_offset: usize,
} }
// == impl PendingTransaction === // == impl PendingTransaction ===
@ -391,50 +154,57 @@ impl<T: TransactionOrdering> PendingTransaction<T> {
impl<T: TransactionOrdering> Clone for PendingTransaction<T> { impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self { transaction: self.transaction.clone() }
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
} }
} }
/// A reference to a transaction in the _pending_ pool /// A transaction that is ready to be included in a block.
#[derive(Debug)] pub(crate) struct PendingTransactionRef<T: TransactionOrdering> {
pub(crate) struct PoolTransactionRef<T: TransactionOrdering> {
/// Actual transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Identifier that tags when transaction was submitted in the pool. /// Identifier that tags when transaction was submitted in the pool.
pub(crate) submission_id: u64, pub(crate) submission_id: u64,
/// Actual transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// The priority value assigned by the used `Ordering` function. /// The priority value assigned by the used `Ordering` function.
pub(crate) priority: T::Priority, pub(crate) priority: T::Priority,
} }
impl<T: TransactionOrdering> Clone for PoolTransactionRef<T> { impl<T: TransactionOrdering> PendingTransactionRef<T> {
/// The next transaction of the sender: `nonce + 1`
pub(crate) fn unlocks(&self) -> TransactionId {
self.transaction.transaction_id.descendant()
}
/// The hash for this transaction
pub(crate) fn hash(&self) -> &TxHash {
self.transaction.hash()
}
}
impl<T: TransactionOrdering> Clone for PendingTransactionRef<T> {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
transaction: Arc::clone(&self.transaction),
submission_id: self.submission_id, submission_id: self.submission_id,
transaction: Arc::clone(&self.transaction),
priority: self.priority.clone(), priority: self.priority.clone(),
} }
} }
} }
impl<T: TransactionOrdering> Eq for PoolTransactionRef<T> {} impl<T: TransactionOrdering> Eq for PendingTransactionRef<T> {}
impl<T: TransactionOrdering> PartialEq<Self> for PoolTransactionRef<T> { impl<T: TransactionOrdering> PartialEq<Self> for PendingTransactionRef<T> {
fn eq(&self, other: &Self) -> bool { fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal self.cmp(other) == Ordering::Equal
} }
} }
impl<T: TransactionOrdering> PartialOrd<Self> for PoolTransactionRef<T> { impl<T: TransactionOrdering> PartialOrd<Self> for PendingTransactionRef<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> { fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other)) Some(self.cmp(other))
} }
} }
impl<T: TransactionOrdering> Ord for PoolTransactionRef<T> { impl<T: TransactionOrdering> Ord for PendingTransactionRef<T> {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
// This compares by `priority` and only if two tx have the exact same priority this compares // This compares by `priority` and only if two tx have the exact same priority this compares
// the unique `submission_id`. This ensures that transactions with same priority are not // the unique `submission_id`. This ensures that transactions with same priority are not
@ -444,163 +214,3 @@ impl<T: TransactionOrdering> Ord for PoolTransactionRef<T> {
.then_with(|| other.submission_id.cmp(&self.submission_id)) .then_with(|| other.submission_id.cmp(&self.submission_id))
} }
} }
/// Pending Transactions that are currently parked until their set baseFee becomes valid
struct ParkedTransactions<T: TransactionOrdering> {
/// Keeps track of transactions inserted in the pool.
///
/// This way we can determine when transactions where submitted to the pool.
id: u64,
/// All transactions that are currently parked due to their fee.
parked_transactions: HashMap<TxHash, ParkedTransaction<T>>,
/// Same transactions but sorted by their fee and priority
sorted_transactions: BTreeSet<ParkedTransactionRef<T>>,
}
impl<T: TransactionOrdering> Default for ParkedTransactions<T> {
fn default() -> Self {
Self {
id: 0,
parked_transactions: Default::default(),
sorted_transactions: Default::default(),
}
}
}
/// A transaction that is ready to be included in a block.
#[derive(Debug, Clone)]
pub(crate) struct ParkedTransaction<T: TransactionOrdering> {
/// Reference to the actual transaction.
transaction: PoolTransactionRef<T>,
/// Tracks the transactions that get unlocked by this transaction.
unlocks: Vec<TxHash>,
/// Amount of required dependencies that are inherently provided
requires_offset: usize,
}
/// A reference to a currently _parked_ transaction.
struct ParkedTransactionRef<T: TransactionOrdering> {
/// Actual transaction.
transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Identifier that tags when transaction was submitted in the pool.
submission_id: u64,
/// The priority value assigned by the used `Ordering` function.
priority: T::Priority,
/// EIP-1559 Max base fee the caller is willing to pay.
max_fee_per_gas: U256,
}
impl<T: TransactionOrdering> Eq for ParkedTransactionRef<T> {}
impl<T: TransactionOrdering> PartialEq<Self> for ParkedTransactionRef<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: TransactionOrdering> PartialOrd<Self> for ParkedTransactionRef<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: TransactionOrdering> Ord for ParkedTransactionRef<T> {
fn cmp(&self, other: &Self) -> Ordering {
// This compares the `max_fee_per_gas` value of the transaction
self.max_fee_per_gas
.cmp(&other.max_fee_per_gas)
.then_with(|| self.priority.cmp(&other.priority))
.then_with(|| other.submission_id.cmp(&self.submission_id))
}
}
/// An iterator that returns transactions that can be executed on the current state.
pub struct TransactionsIterator<T: TransactionOrdering> {
all: HashMap<TxHash, PendingTransaction<T>>,
awaiting: HashMap<TxHash, (usize, PoolTransactionRef<T>)>,
independent: BTreeSet<PoolTransactionRef<T>>,
invalid: HashSet<TxHash>,
}
// == impl TransactionsIterator ==
impl<T: TransactionOrdering> TransactionsIterator<T> {
/// Mark the transaction as invalid.
///
/// As a consequence, all values that depend on the invalid one will be skipped.
/// When given transaction is not in the pool it has no effect.
pub(crate) fn mark_invalid(&mut self, tx: &Arc<ValidPoolTransaction<T::Transaction>>) {
if let Some(invalid_transaction) = self.all.get(tx.hash()) {
debug!(
target: "txpool",
"[{:?}] Marked as invalid",
invalid_transaction.transaction.transaction.hash()
);
for hash in &invalid_transaction.unlocks {
self.invalid.insert(*hash);
}
}
}
/// Depending on number of satisfied requirements insert given ref
/// either to awaiting set or to best set.
fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef<T>) {
if satisfied >= tx_ref.transaction.depends_on.len() {
// If we have satisfied all deps insert to the best set
self.independent.insert(tx_ref);
} else {
// otherwise we're still waiting for some deps
self.awaiting.insert(*tx_ref.transaction.hash(), (satisfied, tx_ref));
}
}
}
impl<T: TransactionOrdering> BestTransactions for TransactionsIterator<T> {
fn mark_invalid(&mut self, tx: &Self::Item) {
TransactionsIterator::mark_invalid(self, tx)
}
}
impl<T: TransactionOrdering> Iterator for TransactionsIterator<T> {
type Item = Arc<ValidPoolTransaction<T::Transaction>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.independent.iter().next_back()?.clone();
let best = self.independent.take(&best)?;
let hash = best.transaction.hash();
// skip transactions that were marked as invalid
if self.invalid.contains(hash) {
debug!(
target: "txpool",
"[{:?}] skipping invalid transaction",
hash
);
continue
}
let ready =
if let Some(ready) = self.all.get(hash).cloned() { ready } else { continue };
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
Some((satisfied, tx_ref))
// then get from the pool
} else {
self.all
.get(hash)
.map(|next| (next.requires_offset + 1, next.transaction.clone()))
};
if let Some((satisfied, tx_ref)) = res {
self.independent_or_awaiting(satisfied, tx_ref)
}
}
return Some(best.transaction)
}
}
}

View File

@ -1,232 +0,0 @@
use crate::{
error::PoolResult, identifier::TransactionId, traits::PoolTransaction,
validate::ValidPoolTransaction, TransactionOrdering,
};
use reth_primitives::TxHash;
use std::{
collections::{HashMap, HashSet},
fmt,
sync::Arc,
time::Instant,
};
/// A pool of transactions that are not ready on the current state and are waiting for state changes
/// that turn them valid.
///
/// This could include transactions with nonce gaps: Transactions that are waiting until for a
/// transaction to arrive that closes the nonce gap.
///
/// Keeps a set of transactions that are waiting until their dependencies are unlocked.
pub(crate) struct QueuedTransactions<T: TransactionOrdering> {
/// Dependencies that aren't yet provided by any transaction.
required_dependencies: HashMap<TransactionId, HashSet<TxHash>>,
/// Mapping of the dependencies of a transaction to the hash of the transaction,
waiting_dependencies: HashMap<TransactionId, TxHash>,
/// Transactions that are not ready yet are waiting for another tx to finish,
waiting_queue: HashMap<TxHash, QueuedPoolTransaction<T>>,
}
// == impl QueuedTransactions ==
impl<T: TransactionOrdering> QueuedTransactions<T> {
/// Returns the number of transactions that are currently waiting in this pool for new
/// transactions to satisfy their dependencies.
pub(crate) fn len(&self) -> usize {
self.waiting_queue.len()
}
/// Whether this pool is empty.
pub(crate) fn is_empty(&self) -> bool {
self.waiting_queue.is_empty()
}
/// Returns an iterator over all transactions waiting in this pool.
pub(crate) fn transactions(
&self,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
self.waiting_queue.values().map(|tx| Arc::clone(&tx.transaction))
}
/// Adds a transaction to the queue of transactions
pub(crate) fn add_transaction(&mut self, tx: QueuedPoolTransaction<T>) -> PoolResult<()> {
assert!(!tx.is_satisfied(), "transaction must not be ready");
assert!(
!self.waiting_queue.contains_key(tx.transaction.hash()),
"transaction is already added"
);
if let Some(_replace) = self
.waiting_dependencies
.get(&tx.transaction.transaction_id)
.and_then(|hash| self.waiting_queue.get(hash))
{
// TODO handle transaction underpriced
// // check if underpriced
// if tx.transaction.gas_price() < replace.transaction.gas_price() {
// warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]",
// tx.transaction.hash()); return Err(Error::ReplacementUnderpriced)
// }
}
// add all missing dependencies
for dependency in &tx.missing_dependencies {
self.required_dependencies
.entry(*dependency)
.or_default()
.insert(*tx.transaction.hash());
}
// also track identifying dependencies
self.waiting_dependencies.insert(tx.transaction.transaction_id, *tx.transaction.hash());
// add tx to the queue
self.waiting_queue.insert(*tx.transaction.hash(), tx);
Ok(())
}
/// Returns true if given transaction is part of the queue
pub(crate) fn contains(&self, hash: &TxHash) -> bool {
self.waiting_queue.contains_key(hash)
}
/// Returns the transaction for the hash if it's waiting
pub(crate) fn get(&self, tx_hash: &TxHash) -> Option<&QueuedPoolTransaction<T>> {
self.waiting_queue.get(tx_hash)
}
/// Returns the transactions for the given hashes, `None` if no transaction exists
pub(crate) fn get_all(
&self,
tx_hashes: &[TxHash],
) -> Vec<Option<Arc<ValidPoolTransaction<T::Transaction>>>> {
tx_hashes
.iter()
.map(|hash| self.waiting_queue.get(hash).map(|tx| Arc::clone(&tx.transaction)))
.collect()
}
/// This will check off the dependencies of queued transactions.
///
/// Returns the those transactions that become unlocked (all dependencies checked) and can be
/// moved to the ready queue.
pub(crate) fn satisfy_and_unlock(
&mut self,
id: &TransactionId,
) -> Vec<QueuedPoolTransaction<T>> {
let mut unlocked_ready = Vec::new();
if let Some(tx_hashes) = self.required_dependencies.remove(id) {
for hash in tx_hashes {
let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
tx.satisfy(id);
if tx.is_satisfied() {
let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
self.waiting_dependencies.remove(&tx.transaction.transaction_id);
unlocked_ready.push(tx);
}
}
}
unlocked_ready
}
/// Removes the transactions associated with the given hashes
///
/// Returns all removed transactions.
pub(crate) fn remove(
&mut self,
hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = vec![];
for hash in hashes {
if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
self.waiting_dependencies.remove(&waiting_tx.transaction.transaction_id);
for dependency in waiting_tx.missing_dependencies {
let remove =
if let Some(required) = self.required_dependencies.get_mut(&dependency) {
required.remove(&hash);
required.is_empty()
} else {
false
};
if remove {
self.required_dependencies.remove(&dependency);
}
}
removed.push(waiting_tx.transaction)
}
}
removed
}
}
/// A transaction submitted to the pool.
#[derive(Clone)]
pub(crate) struct QueuedPoolTransaction<T: TransactionOrdering> {
/// The actual validated transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Transactions required for and have not been satisfied yet by other transactions in the
/// pool.
///
/// This will be an empty list if there are no nonce gaps across multiple transactions of the
/// same sender in the pool. If there are gaps, this will include the missing transactions.
pub(crate) missing_dependencies: HashSet<TransactionId>,
/// Timestamp when the tx was added.
pub(crate) added_at: Instant,
}
impl<T: TransactionOrdering> Default for QueuedTransactions<T> {
fn default() -> Self {
Self {
required_dependencies: Default::default(),
waiting_dependencies: Default::default(),
waiting_queue: Default::default(),
}
}
}
// === impl QuQueuedPoolTransaction ===
impl<T: TransactionOrdering> QueuedPoolTransaction<T> {
/// Creates a new `QueuedPoolTransaction`.
///
/// Determines the dependent transaction that are still missing before this transaction can be
/// moved to the queue.
pub(crate) fn new(
transaction: ValidPoolTransaction<T::Transaction>,
provided: &HashMap<TransactionId, TxHash>,
) -> Self {
let missing_dependencies = transaction
.depends_on
.iter()
.filter(|id| {
// is true if the dependency id is already satisfied either via transaction in the
// pool
!provided.contains_key(&**id)
})
.cloned()
.collect();
Self { transaction: Arc::new(transaction), missing_dependencies, added_at: Instant::now() }
}
/// Removes the required dependency.
pub(crate) fn satisfy(&mut self, id: &TransactionId) {
self.missing_dependencies.remove(id);
}
/// Returns true if transaction has all dependencies are satisfied.
pub(crate) fn is_satisfied(&self) -> bool {
self.missing_dependencies.is_empty()
}
}
impl<T: TransactionOrdering> fmt::Debug for QueuedPoolTransaction<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "QueuedPoolTransaction {{ ")?;
write!(fmt, "added_at: {:?}, ", self.added_at)?;
write!(fmt, "tx: {:?}, ", self.transaction)?;
write!(fmt, "missing_dependencies: [{:?}]", &self.missing_dependencies)?;
write!(fmt, "}}")
}
}

View File

@ -0,0 +1,77 @@
bitflags::bitflags! {
/// Marker to represents the current state of a transaction in the pool and from which the corresponding sub-pool is derived, depending on what bits are set.
///
/// This mirrors [erigon's ephemeral state field](https://github.com/ledgerwatch/erigon/wiki/Transaction-Pool-Design#ordering-function).
#[derive(Default)]
pub(crate) struct TxState: u8 {
/// Set to `1` if the `feeCap` of the transaction meets the chain's minimum `feeCap` requirement.
///
/// This is different from `ENOUGH_FEE_CAP_BLOCK` which tracks on a per-block basis.
const ENOUGH_FEE_CAP_PROTOCOL = 0b100000;
/// Set to `1` of the transaction is either the next transaction of the sender (on chain nonce == tx.nonce) or all prior transactions are also present in the pool.
const NO_NONCE_GAPS = 0b010000;
/// Bit derived from the sender's balance.
///
/// Set to `1` if the sender's balance can cover the maximum cost for this transaction (`feeCap * gasLimit + value`).
/// This includes cumulative costs of prior transactions, which ensures that the sender has enough funds for all max cost of prior transactions.
const ENOUGH_BALANCE = 0b001000;
/// Bit set to true if the transaction has a lower gas limit than the block's gas limit
const NOT_TOO_MUCH_GAS = 0b000100;
/// Covers the Dynamic fee requirement.
///
/// Set to 1 if `feeCap` of the transaction meets the requirement of the pending block.
const ENOUGH_FEE_CAP_BLOCK = 0b000010;
const IS_LOCAL = 0b000001;
const BASE_FEE_POOL_BITS = Self::ENOUGH_FEE_CAP_PROTOCOL.bits | Self::NO_NONCE_GAPS.bits | Self::ENOUGH_BALANCE.bits | Self::NOT_TOO_MUCH_GAS.bits;
const QUEUED_POOL_BITS = Self::ENOUGH_FEE_CAP_PROTOCOL.bits;
}
}
/// Identifier for the used Sub-pool
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
pub(crate) enum SubPool {
Queued = 0,
Pending,
BaseFee,
}
// === impl PoolDestination ===
impl SubPool {
/// Whether this transaction is to be moved to the pending sub-pool.
pub(crate) fn is_pending(&self) -> bool {
matches!(self, SubPool::Pending)
}
/// Returns whether this is a promotion depending on the current sub-pool location.
pub(crate) fn is_promoted(&self, other: SubPool) -> bool {
self > &other
}
}
impl From<TxState> for SubPool {
fn from(value: TxState) -> Self {
if value > TxState::BASE_FEE_POOL_BITS {
return SubPool::Pending
}
if value < TxState::QUEUED_POOL_BITS {
return SubPool::Queued
}
SubPool::BaseFee
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tx_state() {
let mut state = TxState::default();
state |= TxState::NO_NONCE_GAPS;
assert!(state.intersects(TxState::NO_NONCE_GAPS))
}
}

View File

@ -0,0 +1,864 @@
//! The internal transaction pool implementation.
use crate::{
error::PoolError,
identifier::{SenderId, TransactionId},
pool::{
best::BestTransactions,
parked::{BasefeeOrd, ParkedPool, QueuedOrd},
pending::PendingPool,
state::{SubPool, TxState},
AddedPendingTransaction, AddedTransaction,
},
PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256,
};
use fnv::FnvHashMap;
use reth_primitives::TxHash;
use std::{
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap},
fmt,
ops::Bound::{Excluded, Included, Unbounded},
sync::Arc,
};
/// A pool that manages transactions.
///
/// This pool maintains the state of all transactions and stores them accordingly.
pub struct TxPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: Arc<T>,
/// Contains the currently known info
sender_info: FnvHashMap<SenderId, SenderInfo>,
/// pending subpool
pending_pool: PendingPool<T>,
/// queued subpool
///
/// Holds all parked transactions that depend on external changes from the sender:
///
/// - blocked by missing ancestor transaction (has nonce gaps)
/// - sender lacks funds to pay for this transaction.
queued_pool: ParkedPool<QueuedOrd<T::Transaction>>,
/// base fee subpool
///
/// Holds all parked transactions that currently violate the dynamic fee requirement but could
/// be moved to pending if the base fee changes in their favor (decreases) in future blocks.
basefee_pool: ParkedPool<BasefeeOrd<T::Transaction>>,
/// All transactions in the pool.
all_transactions: AllTransactions<T::Transaction>,
}
impl<T: TransactionOrdering> TxPool<T> {
/// Create a new graph pool instance.
pub fn new(ordering: Arc<T>) -> Self {
Self {
sender_info: Default::default(),
pending_pool: PendingPool::new(ordering.clone()),
queued_pool: Default::default(),
basefee_pool: Default::default(),
all_transactions: Default::default(),
ordering,
}
}
/// Updates the pool based on the changed base fee.
///
/// This enforces the dynamic fee requirement.
pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) {
// TODO update according to the changed base_fee
todo!()
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn best_transactions(&self) -> BestTransactions<T> {
self.pending_pool.best()
}
/// Returns if the transaction for the given hash is already included in this pool
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
self.all_transactions.contains(tx_hash)
}
/// Returns the transaction for the given hash.
pub(crate) fn get(
&self,
tx_hash: &TxHash,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
self.all_transactions.by_hash.get(tx_hash).cloned()
}
/// Adds the transaction into the pool.
///
/// This pool consists of two three-pools: `Queued`, `Pending` and `BaseFee`.
///
/// The `Queued` pool contains transactions with gaps in its dependency tree: It requires
/// additional transaction that are note yet present in the pool. And transactions that the
/// sender can not afford with the current balance.
///
/// The `Pending` pool contains all transactions that have no nonce gaps, and can be afforded by
/// the sender. It only contains transactions that are ready to be included in the pending
/// block.
///
/// The `BaseFee` pool contains transaction that currently can't satisfy the dynamic fee
/// requirement. With EIP-1559, transactions can become executable or not without any changes to
/// the sender's balance or nonce and instead their feeCap determines whether the
/// transaction is _currently_ (on the current state) ready or needs to be parked until the
/// feeCap satisfies the block's baseFee.
pub(crate) fn add_transaction(
&mut self,
tx: ValidPoolTransaction<T::Transaction>,
on_chain_balance: U256,
on_chain_nonce: u64,
) -> PoolResult<AddedTransaction<T::Transaction>> {
// Update sender info
self.sender_info.entry(tx.sender_id).or_default().update(on_chain_nonce, on_chain_balance);
let hash = *tx.hash();
match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
InsertResult::Inserted { transaction, move_to, replaced_tx, updates, .. } => {
self.add_new_transaction(transaction, replaced_tx, move_to);
let UpdateOutcome { promoted, discarded, removed } = self.process_updates(updates);
// This transaction was moved to the pending pool.
let res = if move_to.is_pending() {
AddedTransaction::Pending(AddedPendingTransaction {
hash,
promoted,
discarded,
removed,
})
} else {
AddedTransaction::Queued { hash }
};
Ok(res)
}
InsertResult::Underpriced { existing, .. } => {
Err(PoolError::AlreadyAdded(Box::new(existing)))
}
}
}
/// Maintenance task to apply a series of updates.
///
/// This will move/discard the given transaction according to the `PoolUpdate`
fn process_updates(
&mut self,
updates: impl IntoIterator<Item = PoolUpdate>,
) -> UpdateOutcome<T::Transaction> {
let mut outcome = UpdateOutcome::default();
for update in updates {
let PoolUpdate { id, hash, current, destination } = update;
match destination {
Destination::Discard => {
outcome.discarded.push(hash);
}
Destination::Pool(move_to) => {
debug_assert!(!move_to.eq(&current), "destination must be different");
self.move_transaction(current, move_to, &id);
}
}
}
outcome
}
/// Moves a transaction from one sub pool to another.
///
/// This will remove the given transaction from one sub-pool and insert it in the other
/// sub-pool.
fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) {
if let Some(tx) = self.remove_transaction(from, id) {
self.add_transaction_to_pool(to, tx);
}
}
/// Removes the transaction from the given pool
fn remove_transaction(
&mut self,
pool: SubPool,
tx: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
match pool {
SubPool::Queued => self.queued_pool.remove_transaction(tx),
SubPool::Pending => self.pending_pool.remove_transaction(tx),
SubPool::BaseFee => self.basefee_pool.remove_transaction(tx),
}
}
/// Removes the transaction from the given pool
fn add_transaction_to_pool(
&mut self,
pool: SubPool,
tx: Arc<ValidPoolTransaction<T::Transaction>>,
) {
match pool {
SubPool::Queued => {
self.queued_pool.add_transaction(tx);
}
SubPool::Pending => {
self.pending_pool.add_transaction(tx);
}
SubPool::BaseFee => {
self.basefee_pool.add_transaction(tx);
}
}
}
/// Inserts the transaction into the given sub-pool
fn add_new_transaction(
&mut self,
transaction: Arc<ValidPoolTransaction<T::Transaction>>,
replaced: Option<(Arc<ValidPoolTransaction<T::Transaction>>, SubPool)>,
pool: SubPool,
) {
if let Some((replaced, replaced_pool)) = replaced {
// Remove the replaced transaction
self.remove_transaction(replaced_pool, replaced.id());
}
self.add_transaction_to_pool(pool, transaction)
}
/// Returns the current size of the entire pool
pub fn size_of(&self) -> usize {
unimplemented!()
}
/// Ensures that the transactions in the sub-pools are within the given bounds.
///
/// If the current size exceeds the given bounds, the worst transactions are evicted from the
/// pool and returned.
pub fn enforce_size_limits(&mut self) {
unimplemented!()
}
}
/// Container for _all_ transaction in the pool.
///
/// This is the sole entrypoint that's guarding all sub-pools, all sub-pool actions are always
/// derived from this set. Updates returned from this type must be applied to the sub-pools.
pub struct AllTransactions<T: PoolTransaction> {
/// Expected base fee for the pending block.
pending_basefee: U256,
/// Minimum base fee required by the protol.
///
/// Transactions with a lower base fee will never be included by the chain
minimal_protocol_basefee: U256,
/// The max gas limit of the block
block_gas_limit: u64,
/// _All_ transactions identified by their hash.
by_hash: HashMap<TxHash, Arc<ValidPoolTransaction<T>>>,
/// _All_ transaction in the pool sorted by their sender and nonce pair.
txs: BTreeMap<TransactionId, PoolInternalTransaction<T>>,
/// Tracks the number of transactions by sender that are currently in the pool.
tx_counter: FnvHashMap<SenderId, usize>,
}
impl<T: PoolTransaction> AllTransactions<T> {
/// Returns if the transaction for the given hash is already included in this pool
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
self.by_hash.contains_key(tx_hash)
}
/// Returns the internal transaction with additional metadata
pub(crate) fn get(&self, id: &TransactionId) -> Option<&PoolInternalTransaction<T>> {
self.txs.get(id)
}
/// Increments the transaction counter for the sender
pub(crate) fn tx_inc(&mut self, sender: SenderId) {
let count = self.tx_counter.entry(sender).or_default();
*count += 1;
}
/// Decrements the transaction counter for the sender
pub(crate) fn tx_decr(&mut self, sender: SenderId) {
if let hash_map::Entry::Occupied(mut entry) = self.tx_counter.entry(sender) {
let count = entry.get_mut();
if *count == 1 {
entry.remove();
return
}
*count -= 1;
}
}
/// Returns an iterator over all transactions for the given sender, starting with the lowest
/// nonce
pub(crate) fn txs_iter(
&self,
sender: SenderId,
) -> impl Iterator<Item = (&TransactionId, &PoolInternalTransaction<T>)> + '_ {
self.txs
.range((sender.start_bound(), Unbounded))
.take_while(move |(other, _)| sender == other.sender)
}
/// Returns a mutable iterator over all transactions for the given sender, starting with the
/// lowest nonce
pub(crate) fn txs_iter_mut(
&mut self,
sender: SenderId,
) -> impl Iterator<Item = (&TransactionId, &mut PoolInternalTransaction<T>)> + '_ {
self.txs
.range_mut((sender.start_bound(), Unbounded))
.take_while(move |(other, _)| sender == other.sender)
}
/// Returns all transactions that predates the given transaction.
///
/// NOTE: The range is _inclusive_
pub(crate) fn ancestor_txs<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs
.range((Unbounded, Included(id)))
.rev()
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all mutable transactions that predates the given transaction.
///
/// NOTE: The range is _inclusive_
pub(crate) fn ancestor_txs_mut<'a, 'b: 'a>(
&'a mut self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + '_ {
self.txs
.range_mut((Unbounded, Included(id)))
.rev()
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that predates the given transaction.
///
/// NOTE: The range is _exclusive_: This does not return the transaction itself
pub(crate) fn ancestor_txs_exclusive<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range(..id).rev().take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
pub(crate) fn descendant_txs_exclusive_mut<'a, 'b: 'a>(
&'a mut self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + '_ {
self.txs
.range_mut((Excluded(id), Unbounded))
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the
/// first value.
pub(crate) fn descendant_txs<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range(id..).take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all mutable transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the
/// first value.
pub(crate) fn descendant_txs_mut<'a, 'b: 'a>(
&'a mut self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + '_ {
self.txs.range_mut(id..).take_while(|(other, _)| id.sender == other.sender)
}
/// Removes a transaction from the pool after it was mined.
///
/// This will not trigger additional updates since, because descendants without nonce gaps are
/// already in the pending pool.
pub(crate) fn remove_mined_tx(&mut self, _id: &TransactionId) {
// TODO decrease nonce gap
}
/// Inserts a new transaction into the pool.
///
/// If the transaction already exists, it will be replaced if not underpriced.
/// Returns info to which sub-pool the transaction should be moved.
/// Also returns a set of pool updates triggered by this insert, that need to be handled by the
/// caller.
///
/// These can include:
/// - closing nonce gaps of descendant transactions
/// - enough balance updates
pub(crate) fn insert_tx(
&mut self,
transaction: ValidPoolTransaction<T>,
on_chain_balance: U256,
on_chain_nonce: u64,
) -> InsertResult<T> {
assert!(on_chain_nonce <= transaction.nonce(), "Invalid transaction");
let tx_id = *transaction.id();
let transaction = Arc::new(transaction);
let mut state = TxState::default();
let mut cumulative_cost = U256::zero();
let mut updates = Vec::new();
let predecessor =
TransactionId::ancestor(transaction.transaction.nonce(), on_chain_nonce, tx_id.sender);
// If there's no predecessor then this is the next transaction
if predecessor.is_none() {
state.insert(TxState::NO_NONCE_GAPS);
}
// Check dynamic fee
if let Some(fee) = transaction.max_fee_per_gas() {
if fee >= self.pending_basefee {
state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
if fee > self.minimal_protocol_basefee {
state.insert(TxState::ENOUGH_FEE_CAP_PROTOCOL);
}
} else {
// legacy transactions always satisfy the condition
state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
state.insert(TxState::ENOUGH_FEE_CAP_PROTOCOL);
}
// Ensure tx does not exceed block gas limit
if transaction.gas_limit() < self.block_gas_limit {
state.insert(TxState::NOT_TOO_MUCH_GAS);
}
let mut replaced_tx = None;
let pool_tx = PoolInternalTransaction {
transaction: transaction.clone(),
subpool: SubPool::Queued,
state,
cumulative_cost,
};
// try to insert the transaction
match self.txs.entry(*transaction.id()) {
Entry::Vacant(entry) => {
// Insert the transaction in both maps
self.by_hash.insert(*pool_tx.transaction.hash(), pool_tx.transaction.clone());
entry.insert(pool_tx);
}
Entry::Occupied(mut entry) => {
// Transaction already exists
// Ensure the new transaction is not underpriced
if transaction.is_underpriced(entry.get().transaction.as_ref()) {
return InsertResult::Underpriced {
transaction: pool_tx.transaction,
existing: *entry.get().transaction.hash(),
}
}
let new_hash = *pool_tx.transaction.hash();
let new_transaction = pool_tx.transaction.clone();
let replaced = entry.insert(pool_tx);
self.by_hash.remove(replaced.transaction.hash());
self.by_hash.insert(new_hash, new_transaction);
// also remove the hash
replaced_tx = Some((replaced.transaction, replaced.subpool));
}
}
// The next transaction of this sender
let on_chain_id = TransactionId::new(transaction.sender_id, on_chain_nonce);
{
// Tracks the next nonce we expect if the transactions are gapless
let mut next_nonce = on_chain_id.nonce;
// Traverse all transactions of the sender and update existing transactions
for (id, tx) in self.descendant_txs_mut(&on_chain_id) {
let current_pool = tx.subpool;
if next_nonce != id.nonce {
// nothing to update
break
}
// close the nonce gap
tx.state.insert(TxState::NO_NONCE_GAPS);
// set cumulative cost
tx.cumulative_cost = cumulative_cost;
// Update for next transaction
cumulative_cost = tx.cumulative_cost + tx.transaction.cost;
if cumulative_cost > on_chain_balance {
// sender lacks sufficient funds to pay for this transaction
tx.state.remove(TxState::ENOUGH_BALANCE);
} else {
tx.state.insert(TxState::ENOUGH_BALANCE);
}
if tx_id.eq(id) {
// if it is the new transaction, track the state
state = tx.state;
} else {
tx.subpool = tx.state.into();
if current_pool != tx.subpool {
updates.push(PoolUpdate {
id: *id,
hash: *tx.transaction.hash(),
current: current_pool,
destination: Destination::Pool(tx.subpool),
})
}
}
// increment for next iteration
next_nonce = id.next_nonce();
}
}
// If this wasn't a replacement transaction we need to update the counter.
if replaced_tx.is_none() {
self.tx_inc(tx_id.sender);
}
InsertResult::Inserted { transaction, move_to: state.into(), state, replaced_tx, updates }
}
/// Rechecks the transaction of the given sender and returns a set of updates.
pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) {
todo!()
}
/// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize {
self.txs.len()
}
/// Whether the pool is empty
pub(crate) fn is_empty(&self) -> bool {
self.txs.is_empty()
}
}
impl<T: PoolTransaction> Default for AllTransactions<T> {
fn default() -> Self {
Self {
pending_basefee: Default::default(),
// TODO(mattsse): document
minimal_protocol_basefee: 7u64.into(),
block_gas_limit: 30_000_000,
by_hash: Default::default(),
txs: Default::default(),
tx_counter: Default::default(),
}
}
}
/// Where to move an existing transaction.
#[derive(Debug)]
pub(crate) enum Destination {
/// Discard the transaction.
Discard,
/// Move transaction to pool
Pool(SubPool),
}
/// A change of the transaction's location
///
/// NOTE: this guarantees that `current` and `destination` differ.
#[derive(Debug)]
pub(crate) struct PoolUpdate {
pub(crate) id: TransactionId,
pub(crate) hash: TxHash,
/// Where the transaction is currently held.
pub(crate) current: SubPool,
/// Where to move the transaction to
pub(crate) destination: Destination,
}
/// The outcome of [TxPool::insert_tx]
#[derive(Debug)]
pub(crate) enum InsertResult<T: PoolTransaction> {
/// Transaction was successfully inserted into the pool
Inserted {
transaction: Arc<ValidPoolTransaction<T>>,
move_to: SubPool,
state: TxState,
replaced_tx: Option<(Arc<ValidPoolTransaction<T>>, SubPool)>,
/// Additional updates to transactions affected by this change.
updates: Vec<PoolUpdate>,
},
/// Attempted to replace existing transaction, but was underpriced
Underpriced { transaction: Arc<ValidPoolTransaction<T>>, existing: TxHash },
}
// === impl InsertResult ===
#[allow(missing_docs)]
impl<T: PoolTransaction> InsertResult<T> {
fn is_underpriced(&self) -> bool {
matches!(self, InsertResult::Underpriced { .. })
}
}
/// The internal transaction typed used by `AllTransactions` which also additional info used for
/// determining the current state of the transaction.
pub(crate) struct PoolInternalTransaction<T: PoolTransaction> {
/// The actual transaction object.
transaction: Arc<ValidPoolTransaction<T>>,
/// The `SubPool` that currently contains this transaction.
subpool: SubPool,
/// Keeps track of the current state of the transaction and therefor in which subpool it should
/// reside
state: TxState,
/// The total cost all transactions before this transaction.
///
/// This is the combined `cost` of all transactions from the same sender that currently
/// come before this transaction.
cumulative_cost: U256,
}
// === impl PoolInternalTransaction ===
impl<T: PoolTransaction> PoolInternalTransaction<T> {
fn next_cumulative_cost(&self) -> U256 {
self.cumulative_cost + self.transaction.cost
}
}
/// Tracks the result after updating the pool
#[derive(Debug)]
pub struct UpdateOutcome<T: PoolTransaction> {
/// transactions promoted to the ready queue
promoted: Vec<TxHash>,
/// transaction that failed and became discarded
discarded: Vec<TxHash>,
/// Transactions removed from the Ready pool
removed: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> Default for UpdateOutcome<T> {
fn default() -> Self {
Self { promoted: vec![], discarded: vec![], removed: vec![] }
}
}
/// Represents the outcome of a prune
pub struct PruneResult<T: PoolTransaction> {
/// A list of added transactions that a pruned marker satisfied
pub promoted: Vec<AddedTransaction<T>>,
/// all transactions that failed to be promoted and now are discarded
pub failed: Vec<TxHash>,
/// all transactions that were pruned from the ready pool
pub pruned: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> fmt::Debug for PruneResult<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "PruneResult {{ ")?;
write!(
fmt,
"promoted: {:?}, ",
self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "failed: {:?}, ", self.failed)?;
write!(
fmt,
"pruned: {:?}, ",
self.pruned.iter().map(|tx| *tx.transaction.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "}}")?;
Ok(())
}
}
/// Stores relevant context about a sender.
#[derive(Debug, Clone, Default)]
struct SenderInfo {
/// current nonce of the sender.
state_nonce: u64,
/// Balance of the sender at the current point.
balance: U256,
}
// === impl SenderInfo ===
impl SenderInfo {
/// Creates a new entry for an incoming, not yet tracked sender.
fn new_incoming(state_nonce: u64, balance: U256) -> Self {
Self { state_nonce, balance }
}
/// Updates the info with the new values.
fn update(&mut self, state_nonce: u64, balance: U256) {
*self = Self { state_nonce, balance };
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_util::{mock_tx_pool, MockTransaction, MockTransactionFactory};
#[test]
fn test_simple_insert() {
let on_chain_balance = U256::zero();
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_price().inc_limit();
let valid_tx = f.validated(tx.clone());
let res = pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce);
match res {
InsertResult::Inserted { updates, replaced_tx, move_to, state, .. } => {
assert!(updates.is_empty());
assert!(replaced_tx.is_none());
assert!(state.contains(TxState::NO_NONCE_GAPS));
assert!(!state.contains(TxState::ENOUGH_BALANCE));
assert_eq!(move_to, SubPool::Queued);
}
InsertResult::Underpriced { .. } => {
panic!("not underpriced")
}
};
assert_eq!(pool.len(), 1);
assert!(pool.contains(valid_tx.hash()));
let expected_state = TxState::ENOUGH_FEE_CAP_BLOCK | TxState::NO_NONCE_GAPS;
let inserted = pool.get(valid_tx.id()).unwrap();
assert!(inserted.state.intersects(expected_state));
// insert the same tx again
let res = pool.insert_tx(valid_tx, on_chain_balance, on_chain_nonce);
assert!(res.is_underpriced());
assert_eq!(pool.len(), 1);
let valid_tx = f.validated(tx.next());
let res = pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce);
match res {
InsertResult::Inserted { updates, replaced_tx, move_to, state, .. } => {
assert!(updates.is_empty());
assert!(replaced_tx.is_none());
assert!(state.contains(TxState::NO_NONCE_GAPS));
assert!(!state.contains(TxState::ENOUGH_BALANCE));
assert_eq!(move_to, SubPool::Queued);
}
InsertResult::Underpriced { .. } => {
panic!("not underpriced")
}
};
assert!(pool.contains(valid_tx.hash()));
assert_eq!(pool.len(), 2);
let inserted = pool.get(valid_tx.id()).unwrap();
assert!(inserted.state.intersects(expected_state));
}
#[test]
fn insert_replace() {
let on_chain_balance = U256::zero();
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_price().inc_limit();
let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let replacement = f.validated(tx.rng_hash().inc_price());
let res = pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce);
match res {
InsertResult::Inserted { updates, replaced_tx, .. } => {
assert!(updates.is_empty());
let replaced = replaced_tx.unwrap();
assert_eq!(replaced.0.hash(), first.hash());
}
InsertResult::Underpriced { .. } => {
panic!("not underpriced")
}
};
assert!(!pool.contains(first.hash()));
assert!(pool.contains(replacement.hash()));
assert_eq!(pool.len(), 1);
}
// insert nonce then nonce - 1
#[test]
fn insert_previous() {
let on_chain_balance = U256::zero();
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit();
let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let first_in_pool = pool.get(first.id()).unwrap();
// has nonce gap
assert!(!first_in_pool.state.contains(TxState::NO_NONCE_GAPS));
let prev = f.validated(tx.prev());
let res = pool.insert_tx(prev, on_chain_balance, on_chain_nonce);
match res {
InsertResult::Inserted { updates, replaced_tx, state, move_to, .. } => {
// no updates since still in queued pool
assert!(updates.is_empty());
assert!(replaced_tx.is_none());
assert!(state.contains(TxState::NO_NONCE_GAPS));
assert_eq!(move_to, SubPool::Queued);
}
InsertResult::Underpriced { .. } => {
panic!("not underpriced")
}
};
let first_in_pool = pool.get(first.id()).unwrap();
// has non nonce gap
assert!(first_in_pool.state.contains(TxState::NO_NONCE_GAPS));
}
// insert nonce then nonce - 1
#[test]
fn insert_with_updates() {
let on_chain_balance = U256::from(10_000);
let on_chain_nonce = 0;
let mut f = MockTransactionFactory::default();
let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_nonce().set_gas_price(100u64.into()).inc_limit();
let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let first_in_pool = pool.get(first.id()).unwrap();
// has nonce gap
assert!(!first_in_pool.state.contains(TxState::NO_NONCE_GAPS));
assert_eq!(SubPool::Queued, first_in_pool.subpool);
let prev = f.validated(tx.prev());
let res = pool.insert_tx(prev, on_chain_balance, on_chain_nonce);
match res {
InsertResult::Inserted { updates, replaced_tx, state, move_to, .. } => {
// updated previous tx
assert_eq!(updates.len(), 1);
assert!(replaced_tx.is_none());
assert!(state.contains(TxState::NO_NONCE_GAPS));
assert_eq!(move_to, SubPool::Pending);
}
InsertResult::Underpriced { .. } => {
panic!("not underpriced")
}
};
let first_in_pool = pool.get(first.id()).unwrap();
// has non nonce gap
assert!(first_in_pool.state.contains(TxState::NO_NONCE_GAPS));
assert_eq!(SubPool::Pending, first_in_pool.subpool);
}
}

View File

@ -0,0 +1,375 @@
//! Internal helpers for testing.
#![allow(missing_docs, unused)]
use crate::{
identifier::{SenderIdentifiers, TransactionId},
pool::txpool::TxPool,
PoolTransaction, TransactionOrdering, ValidPoolTransaction,
};
use paste::paste;
use reth_primitives::{Address, TxHash, H256, U256};
use std::{sync::Arc, time::Instant};
pub type MockTxPool = TxPool<MockOrdering>;
pub type MockValidTx = ValidPoolTransaction<MockTransaction>;
/// Create an empty `TxPool`
pub fn mock_tx_pool() -> MockTxPool {
MockTxPool::new(Arc::new(Default::default()))
}
/// Sets the value for the field
macro_rules! set_value {
($this:ident => $field:ident) => {
let new_value = $field;
match $this {
MockTransaction::Legacy { ref mut $field, .. } => {
*$field = new_value;
}
MockTransaction::Eip1559 { ref mut $field, .. } => {
*$field = new_value;
}
}
};
}
/// Sets the value fo the field
macro_rules! get_value {
($this:ident => $field:ident) => {
match $this {
MockTransaction::Legacy { $field, .. } => $field,
MockTransaction::Eip1559 { $field, .. } => $field,
}
};
}
// Generates all setters and getters
macro_rules! make_setters_getters {
($($name:ident => $t:ty);*) => {
paste! {
$(
pub fn [<set_ $name>](&mut self, $name: $t) -> &mut Self {
set_value!(self => $name);
self
}
pub fn [<with_$name>](mut self, $name: $t) -> Self {
set_value!(self => $name);
self
}
pub fn [<get_$name>](&self) -> $t {
get_value!(self => $name).clone()
}
)*
}
};
}
/// A Bare transaction type used for testing.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum MockTransaction {
Legacy {
hash: H256,
sender: Address,
nonce: u64,
gas_price: U256,
gas_limit: u64,
value: U256,
},
Eip1559 {
hash: H256,
sender: Address,
nonce: u64,
max_fee_per_gas: U256,
max_priority_fee_per_gas: U256,
gas_limit: u64,
value: U256,
},
}
// === impl MockTransaction ===
impl MockTransaction {
make_setters_getters! {
nonce => u64;
hash => H256;
sender => Address;
gas_limit => u64;
value => U256
}
/// Returns a new legacy transaction with random address and hash and empty values
pub fn legacy() -> Self {
MockTransaction::Legacy {
hash: H256::random(),
sender: Address::random(),
nonce: 0,
gas_price: U256::zero(),
gas_limit: 0,
value: Default::default(),
}
}
/// Returns a new EIP1559 transaction with random address and hash and empty values
pub fn eip1559() -> Self {
MockTransaction::Eip1559 {
hash: H256::random(),
sender: Address::random(),
nonce: 0,
max_fee_per_gas: U256::zero(),
max_priority_fee_per_gas: U256::zero(),
gas_limit: 0,
value: Default::default(),
}
}
pub fn set_priority_fee(&mut self, val: U256) -> &mut Self {
if let MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } = self {
*max_priority_fee_per_gas = val;
}
self
}
pub fn with_priority_fee(mut self, val: U256) -> Self {
if let MockTransaction::Eip1559 { ref mut max_priority_fee_per_gas, .. } = self {
*max_priority_fee_per_gas = val;
}
self
}
pub fn get_priority_fee(&self) -> Option<U256> {
if let MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } = self {
Some(*max_priority_fee_per_gas)
} else {
None
}
}
pub fn set_max_fee(&mut self, val: U256) -> &mut Self {
if let MockTransaction::Eip1559 { max_fee_per_gas, .. } = self {
*max_fee_per_gas = val;
}
self
}
pub fn with_max_fee(mut self, val: U256) -> Self {
if let MockTransaction::Eip1559 { ref mut max_fee_per_gas, .. } = self {
*max_fee_per_gas = val;
}
self
}
pub fn get_max_fee(&self) -> Option<U256> {
if let MockTransaction::Eip1559 { max_fee_per_gas, .. } = self {
Some(*max_fee_per_gas)
} else {
None
}
}
pub fn set_gas_price(&mut self, val: U256) -> &mut Self {
match self {
MockTransaction::Legacy { gas_price, .. } => {
*gas_price = val;
}
MockTransaction::Eip1559 { max_fee_per_gas, max_priority_fee_per_gas, .. } => {
*max_fee_per_gas = val;
*max_priority_fee_per_gas = val;
}
}
self
}
pub fn with_gas_price(mut self, val: U256) -> Self {
match self {
MockTransaction::Legacy { ref mut gas_price, .. } => {
*gas_price = val;
}
MockTransaction::Eip1559 {
ref mut max_fee_per_gas,
ref mut max_priority_fee_per_gas,
..
} => {
*max_fee_per_gas = val;
*max_priority_fee_per_gas = val;
}
}
self
}
pub fn get_gas_price(&self) -> U256 {
match self {
MockTransaction::Legacy { gas_price, .. } => *gas_price,
MockTransaction::Eip1559 { max_fee_per_gas, .. } => *max_fee_per_gas,
}
}
/// Returns a clone with a decreased nonce
pub fn prev(&self) -> Self {
let mut next = self.clone().with_hash(H256::random());
next.with_nonce(self.get_nonce() - 1)
}
/// Returns a clone with an increased nonce
pub fn next(&self) -> Self {
let mut next = self.clone().with_hash(H256::random());
next.with_nonce(self.get_nonce() + 1)
}
/// Returns a clone with an increased nonce
pub fn skip(&self, skip: u64) -> Self {
let mut next = self.clone().with_hash(H256::random());
next.with_nonce(self.get_nonce() + skip + 1)
}
/// Returns a clone with incremented nonce
pub fn inc_nonce(mut self) -> Self {
let nonce = self.get_nonce() + 1;
self.with_nonce(nonce)
}
/// Sets a new random hash
pub fn rng_hash(mut self) -> Self {
self.with_hash(H256::random())
}
/// Returns a new transaction with a higher gas price +1
pub fn inc_price(&self) -> Self {
let mut next = self.clone();
let gas = self.get_gas_price() + 1;
next.with_gas_price(gas)
}
/// Returns a new transaction with a higher value
pub fn inc_value(&self) -> Self {
let mut next = self.clone();
let val = self.get_value() + 1;
next.with_value(val)
}
/// Returns a new transaction with a higher gas limit
pub fn inc_limit(&self) -> Self {
let mut next = self.clone();
let gas = self.get_gas_limit() + 1;
next.with_gas_limit(gas)
}
}
impl PoolTransaction for MockTransaction {
fn hash(&self) -> &TxHash {
match self {
MockTransaction::Legacy { hash, .. } => hash,
MockTransaction::Eip1559 { hash, .. } => hash,
}
}
fn sender(&self) -> &Address {
match self {
MockTransaction::Legacy { sender, .. } => sender,
MockTransaction::Eip1559 { sender, .. } => sender,
}
}
fn nonce(&self) -> u64 {
match self {
MockTransaction::Legacy { nonce, .. } => *nonce,
MockTransaction::Eip1559 { nonce, .. } => *nonce,
}
}
fn cost(&self) -> U256 {
match self {
MockTransaction::Legacy { gas_price, value, gas_limit, .. } => {
U256::from(*gas_limit) * *gas_price + *value
}
MockTransaction::Eip1559 { max_fee_per_gas, value, gas_limit, .. } => {
U256::from(*gas_limit) * *max_fee_per_gas + *value
}
}
}
fn effective_gas_price(&self) -> U256 {
self.get_gas_price()
}
fn gas_limit(&self) -> u64 {
self.get_gas_limit()
}
fn max_fee_per_gas(&self) -> Option<U256> {
match self {
MockTransaction::Legacy { .. } => None,
MockTransaction::Eip1559 { max_fee_per_gas, .. } => Some(*max_fee_per_gas),
}
}
fn max_priority_fee_per_gas(&self) -> Option<U256> {
match self {
MockTransaction::Legacy { .. } => None,
MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } => {
Some(*max_priority_fee_per_gas)
}
}
}
}
#[derive(Default)]
pub struct MockTransactionFactory {
ids: SenderIdentifiers,
}
// === impl MockTransactionFactory ===
impl MockTransactionFactory {
pub fn tx_id(&mut self, tx: &MockTransaction) -> TransactionId {
let sender = self.ids.sender_id_or_create(tx.get_sender());
TransactionId::new(sender, tx.get_nonce())
}
/// Converts the transaction into a validated transaction
pub fn validated(&mut self, transaction: MockTransaction) -> MockValidTx {
let transaction_id = self.tx_id(&transaction);
MockValidTx {
propagate: false,
is_local: false,
sender_id: transaction_id.sender,
transaction_id,
cost: transaction.cost(),
transaction,
timestamp: Instant::now(),
}
}
pub fn create_legacy(&mut self) -> MockValidTx {
self.validated(MockTransaction::legacy())
}
pub fn create_eip1559(&mut self) -> MockValidTx {
self.validated(MockTransaction::eip1559())
}
}
#[derive(Default)]
#[non_exhaustive]
pub struct MockOrdering;
impl TransactionOrdering for MockOrdering {
type Priority = U256;
type Transaction = MockTransaction;
fn priority(&self, transaction: &Self::Transaction) -> Self::Priority {
transaction.cost()
}
}
#[test]
fn test_mock_priority() {
let o = MockOrdering;
let lo = MockTransaction::eip1559();
let hi = lo.next().inc_value();
assert!(o.priority(&hi) > o.priority(&lo));
}

View File

@ -96,7 +96,7 @@ impl<T> BestTransactions for std::iter::Empty<T> {
} }
/// Trait for transaction types used inside the pool /// Trait for transaction types used inside the pool
pub trait PoolTransaction: fmt::Debug + Send + Send + 'static { pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static {
/// Hash of the transaction /// Hash of the transaction
fn hash(&self) -> &TxHash; fn hash(&self) -> &TxHash;
@ -111,13 +111,19 @@ pub trait PoolTransaction: fmt::Debug + Send + Send + 'static {
/// For EIP-1559 transactions that is `feeCap x gasLimit + transferred_value` /// For EIP-1559 transactions that is `feeCap x gasLimit + transferred_value`
fn cost(&self) -> U256; fn cost(&self) -> U256;
/// Returns the effective gas price for this transaction.
fn effective_gas_price(&self) -> U256;
/// Amount of gas that should be used in executing this transaction. This is paid up-front.
fn gas_limit(&self) -> u64;
/// Returns the EIP-1559 Max base fee the caller is willing to pay. /// Returns the EIP-1559 Max base fee the caller is willing to pay.
/// ///
/// This will return `None` for non-EIP1559 transactions /// This will return `None` for non-EIP1559 transactions
fn max_fee_per_gas(&self) -> Option<&U256>; fn max_fee_per_gas(&self) -> Option<U256>;
/// Returns the EIP-1559 Priority fee the caller is paying to the block author. /// Returns the EIP-1559 Priority fee the caller is paying to the block author.
/// ///
/// This will return `None` for non-EIP1559 transactions /// This will return `None` for non-EIP1559 transactions
fn max_priority_fee_per_gas(&self) -> Option<&U256>; fn max_priority_fee_per_gas(&self) -> Option<U256>;
} }

View File

@ -6,7 +6,7 @@ use crate::{
traits::PoolTransaction, traits::PoolTransaction,
}; };
use reth_primitives::{BlockID, TxHash, U256}; use reth_primitives::{BlockID, TxHash, U256};
use std::fmt; use std::{fmt, time::Instant};
/// A Result type returned after checking a transaction's validity. /// A Result type returned after checking a transaction's validity.
pub enum TransactionValidationOutcome<T: PoolTransaction> { pub enum TransactionValidationOutcome<T: PoolTransaction> {
@ -51,29 +51,67 @@ pub trait TransactionValidator: Send + Sync {
pub struct ValidPoolTransaction<T: PoolTransaction> { pub struct ValidPoolTransaction<T: PoolTransaction> {
/// The transaction /// The transaction
pub transaction: T, pub transaction: T,
/// Ids required by the transaction.
///
/// This lists all unique transactions that need to be mined before this transaction can be
/// considered `pending` and itself be included.
pub depends_on: Vec<TransactionId>,
/// The identifier for this transaction. /// The identifier for this transaction.
pub transaction_id: TransactionId, pub transaction_id: TransactionId,
/// Whether to propagate the transaction. /// Whether to propagate the transaction.
pub propagate: bool, pub propagate: bool,
/// Internal `Sender` identifier /// Whether the tx is from a local source.
pub is_local: bool,
/// Internal `Sender` identifier.
pub sender_id: SenderId, pub sender_id: SenderId,
/// Total cost of the transaction: `feeCap x gasLimit + transferred_value` /// Total cost of the transaction: `feeCap x gasLimit + transferred_value`.
pub cost: U256, pub cost: U256,
// TODO add a block timestamp that marks validity /// Timestamp when this was added to the pool.
pub timestamp: Instant,
} }
// === impl ValidPoolTransaction === // === impl ValidPoolTransaction ===
impl<T: PoolTransaction> ValidPoolTransaction<T> { impl<T: PoolTransaction> ValidPoolTransaction<T> {
/// Returns the hash of the transaction /// Returns the hash of the transaction.
pub fn hash(&self) -> &TxHash { pub fn hash(&self) -> &TxHash {
self.transaction.hash() self.transaction.hash()
} }
/// Returns the internal identifier for this transaction.
pub(crate) fn id(&self) -> &TransactionId {
&self.transaction_id
}
/// Returns the nonce set for this transaction.
pub(crate) fn nonce(&self) -> u64 {
self.transaction.nonce()
}
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
pub(crate) fn max_fee_per_gas(&self) -> Option<U256> {
self.transaction.max_fee_per_gas()
}
/// Amount of gas that should be used in executing this transaction. This is paid up-front.
pub(crate) fn gas_limit(&self) -> u64 {
self.transaction.gas_limit()
}
/// Returns true if this transaction is underpriced compared to the other.
pub(crate) fn is_underpriced(&self, other: &Self) -> bool {
self.transaction.effective_gas_price() <= other.transaction.effective_gas_price()
}
}
#[cfg(test)]
impl<T: PoolTransaction + Clone> Clone for ValidPoolTransaction<T> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
transaction_id: self.transaction_id,
propagate: self.propagate,
is_local: self.is_local,
sender_id: self.sender_id,
cost: self.cost,
timestamp: self.timestamp,
}
}
} }
impl<T: PoolTransaction> fmt::Debug for ValidPoolTransaction<T> { impl<T: PoolTransaction> fmt::Debug for ValidPoolTransaction<T> {
@ -81,7 +119,6 @@ impl<T: PoolTransaction> fmt::Debug for ValidPoolTransaction<T> {
write!(fmt, "Transaction {{ ")?; write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.transaction.hash())?; write!(fmt, "hash: {:?}, ", &self.transaction.hash())?;
write!(fmt, "provides: {:?}, ", &self.transaction_id)?; write!(fmt, "provides: {:?}, ", &self.transaction_id)?;
write!(fmt, "depends_on: {:?}, ", &self.depends_on)?;
write!(fmt, "raw tx: {:?}", &self.transaction)?; write!(fmt, "raw tx: {:?}", &self.transaction)?;
write!(fmt, "}}")?; write!(fmt, "}}")?;
Ok(()) Ok(())