From 3fed7cfe219e237e0c6fba805786c9c8a1c465dc Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 11 Oct 2022 17:10:02 +0200 Subject: [PATCH] feat(txpool): new txpool design (#22) --- Cargo.lock | 8 + crates/transaction-pool/Cargo.toml | 4 + crates/transaction-pool/src/identifier.rs | 68 +- crates/transaction-pool/src/lib.rs | 3 + crates/transaction-pool/src/ordering.rs | 9 +- crates/transaction-pool/src/pool/best.rs | 60 ++ crates/transaction-pool/src/pool/mod.rs | 377 ++------- crates/transaction-pool/src/pool/parked.rs | 194 +++++ crates/transaction-pool/src/pool/pending.rs | 564 ++----------- crates/transaction-pool/src/pool/queued.rs | 232 ------ crates/transaction-pool/src/pool/state.rs | 77 ++ crates/transaction-pool/src/pool/txpool.rs | 864 ++++++++++++++++++++ crates/transaction-pool/src/test_util.rs | 375 +++++++++ crates/transaction-pool/src/traits.rs | 12 +- crates/transaction-pool/src/validate.rs | 59 +- 15 files changed, 1847 insertions(+), 1059 deletions(-) create mode 100644 crates/transaction-pool/src/pool/best.rs create mode 100644 crates/transaction-pool/src/pool/parked.rs delete mode 100644 crates/transaction-pool/src/pool/queued.rs create mode 100644 crates/transaction-pool/src/pool/state.rs create mode 100644 crates/transaction-pool/src/pool/txpool.rs create mode 100644 crates/transaction-pool/src/test_util.rs diff --git a/Cargo.lock b/Cargo.lock index a51aa0fb6..b37b3d459 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1488,6 +1488,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "paste" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1de2e551fb905ac83f73f7aedf2f0cb4a0da7e35efa24a202a936269f1f18e1" + [[package]] name = "peeking_take_while" version = "0.1.2" @@ -1811,10 +1817,12 @@ name = "reth-transaction-pool" version = "0.1.0" dependencies = [ "async-trait", + "bitflags", "fnv", "futures", "linked-hash-map", "parking_lot", + "paste", "reth-primitives", "serde", "thiserror", diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 157c6ef32..e0bcb6650 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -25,3 +25,7 @@ tracing = "0.1" serde = { version = "1.0", features = ["derive"] } linked-hash-map = "0.5" fnv = "1.0.7" +bitflags = "1.3" + +[dev-dependencies] +paste = "1.0" diff --git a/crates/transaction-pool/src/identifier.rs b/crates/transaction-pool/src/identifier.rs index cbebe4cbc..df31fc7eb 100644 --- a/crates/transaction-pool/src/identifier.rs +++ b/crates/transaction-pool/src/identifier.rs @@ -1,12 +1,11 @@ -use crate::U256; use fnv::FnvHashMap; use reth_primitives::Address; -use std::collections::HashMap; +use std::{collections::HashMap, ops::Bound}; /// An internal mapping of addresses. /// /// This assigns a _unique_ `SenderId` for a new `Address`. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct SenderIdentifiers { /// The identifier to use next. id: u64, @@ -53,6 +52,21 @@ impl SenderIdentifiers { #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] pub struct SenderId(u64); +// === impl SenderId === + +impl SenderId { + /// Returns a `Bound` for `TransactionId` starting with nonce `0` + pub(crate) fn start_bound(self) -> Bound { + Bound::Included(TransactionId::new(self, 0)) + } +} + +impl From for SenderId { + fn from(value: u64) -> Self { + SenderId(value) + } +} + /// A unique identifier of a transaction of a Sender. /// /// This serves as an identifier for dependencies of a transaction: @@ -73,11 +87,11 @@ impl TransactionId { Self { sender, nonce } } - /// Returns the id a transactions depends on + /// Returns the `TransactionId` this transaction depends on. /// /// This returns `transaction_nonce - 1` if `transaction_nonce` is higher than the /// `on_chain_none` - pub fn dependency( + pub fn ancestor( transaction_nonce: u64, on_chain_nonce: u64, sender: SenderId, @@ -92,4 +106,48 @@ impl TransactionId { None } } + + /// Returns the `TransactionId` that would come before this transaction. + pub(crate) fn unchecked_ancestor(&self) -> Option { + if self.nonce == 0 { + None + } else { + Some(TransactionId::new(self.sender, self.nonce - 1)) + } + } + + /// Returns the `TransactionId` that directly follows this transaction: `self.nonce + 1` + pub fn descendant(&self) -> TransactionId { + TransactionId::new(self.sender, self.nonce + 1) + } + + /// Returns the nonce the follows directly after this. + #[inline] + pub(crate) fn next_nonce(&self) -> u64 { + self.nonce + 1 + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::collections::BTreeSet; + + #[test] + fn test_transaction_id_ord_eq_sender() { + let tx1 = TransactionId::new(100u64.into(), 0u64); + let tx2 = TransactionId::new(100u64.into(), 1u64); + assert!(tx2 > tx1); + let set = BTreeSet::from([tx1, tx2]); + assert_eq!(set.into_iter().collect::>(), vec![tx1, tx2]); + } + + #[test] + fn test_transaction_id_ord() { + let tx1 = TransactionId::new(99u64.into(), 0u64); + let tx2 = TransactionId::new(100u64.into(), 1u64); + assert!(tx2 > tx1); + let set = BTreeSet::from([tx1, tx2]); + assert_eq!(set.into_iter().collect::>(), vec![tx1, tx2]); + } } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 55648a6b4..cff266160 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -22,6 +22,9 @@ pub mod pool; mod traits; mod validate; +#[cfg(test)] +mod test_util; + pub use crate::{ client::PoolClient, config::PoolConfig, diff --git a/crates/transaction-pool/src/ordering.rs b/crates/transaction-pool/src/ordering.rs index 456bd5b5f..164cb6780 100644 --- a/crates/transaction-pool/src/ordering.rs +++ b/crates/transaction-pool/src/ordering.rs @@ -1,13 +1,12 @@ use crate::traits::PoolTransaction; use std::fmt; - -/// Transaction ordering. +/// Transaction ordering trait to determine the order of transactions. /// /// Decides how transactions should be ordered within the pool. /// -/// The returned priority must reflect natural `Ordering`. -// TODO: for custom, more advanced scoring it would be ideal to determine the priority in the -// context of the entire pool instead of standalone by alone looking at a single transaction +/// The returned priority must reflect natural `Ordering` +// TODO(mattsse) this should be extended so it provides a way to rank transaction in relation to +// each other. pub trait TransactionOrdering: Send + Sync + 'static { /// Priority of a transaction. type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync; diff --git a/crates/transaction-pool/src/pool/best.rs b/crates/transaction-pool/src/pool/best.rs new file mode 100644 index 000000000..fa40d3d34 --- /dev/null +++ b/crates/transaction-pool/src/pool/best.rs @@ -0,0 +1,60 @@ +use crate::{ + identifier::TransactionId, + pool::pending::{PendingTransaction, PendingTransactionRef}, + TransactionOrdering, ValidPoolTransaction, +}; +use reth_primitives::H256 as TxHash; +use std::{ + collections::{BTreeMap, BTreeSet, HashSet}, + sync::Arc, +}; +use tracing::debug; + +/// An iterator that returns transactions that can be executed on the current state. +pub struct BestTransactions { + pub(crate) all: BTreeMap>>, + pub(crate) independent: BTreeSet>, + pub(crate) invalid: HashSet, +} + +impl BestTransactions { + /// Mark the transaction and it's descendants as invalid. + pub(crate) fn mark_invalid(&mut self, tx: &Arc>) { + self.invalid.insert(*tx.hash()); + } +} + +impl crate::traits::BestTransactions for BestTransactions { + fn mark_invalid(&mut self, tx: &Self::Item) { + BestTransactions::mark_invalid(self, tx) + } +} + +impl Iterator for BestTransactions { + type Item = Arc>; + + fn next(&mut self) -> Option { + loop { + let best = self.independent.iter().next_back()?.clone(); + let best = self.independent.take(&best)?; + let hash = best.transaction.hash(); + + // skip transactions that were marked as invalid + if self.invalid.contains(hash) { + debug!( + target: "txpool", + "[{:?}] skipping invalid transaction", + hash + ); + continue + } + + // Insert transactions that just got unlocked. + if let Some(unlocked) = self.all.get(&best.unlocks()) { + self.independent.insert(unlocked.transaction.clone()); + } + + return Some(best.transaction) + } + } +} diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 869af787e..ea0006d3d 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -1,55 +1,51 @@ //! Transaction Pool internals. //! -//! Incoming transactions are validated first. The validation outcome can have 3 states: +//! Incoming transactions are before they enter the pool first. The validation outcome can have 3 +//! states: //! -//! 1. Transaction can _never_ be valid -//! 2. Transaction is _currently_ valid -//! 3. Transaction is _currently_ invalid, but could potentially become valid in the future +//! 1. Transaction can _never_ be valid +//! 2. Transaction is _currently_ valid +//! 3. Transaction is _currently_ invalid, but could potentially become valid in the future //! //! However, (2.) and (3.) of a transaction can only be determined on the basis of the current -//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) need to -//! be reevaluated again. +//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) the +//! state of a transaction needs to be reevaluated again. //! //! The transaction pool is responsible for storing new, valid transactions and providing the next //! best transactions sorted by their priority. Where priority is determined by the transaction's -//! score. -//! -//! However, the score is also only valid for the current state. +//! score ([`TransactionOrdering`]). //! //! Furthermore, the following characteristics fall under (3.): //! -//! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its +//! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its //! sender. A distinction is made here whether multiple transactions from the same sender have -//! gapless nonce increments. a)(1) If _no_ transaction is missing in a chain of multiple +//! gapless nonce increments. +//! +//! a)(1) If _no_ transaction is missing in a chain of multiple //! transactions from the same sender (all nonce in row), all of them can in principle be executed -//! on the current state one after the other. a)(2) If there's a nonce gap, then all +//! on the current state one after the other. +//! +//! a)(2) If there's a nonce gap, then all //! transactions after the missing transaction are blocked until the missing transaction arrives. -//! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The fee -//! cap of the transaction needs to be no less than the base fee of block. +//! +//! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The +//! fee cap of the transaction needs to be no less than the base fee of block. //! //! -//! In essence the transaction pool is made of two separate sub-pools: +//! In essence the transaction pool is made of three separate sub-pools: //! -//! _Pending Pool_: Contains all transactions that are valid on the current state and satisfy -//! (3. a)(1): _No_ nonce gaps _Queued Pool_: Contains all transactions that are currently -//! blocked by missing transactions: (3. a)(2): _With_ nonce gaps +//! - Pending Pool: Contains all transactions that are valid on the current state and satisfy +//! (3. a)(1): _No_ nonce gaps //! -//! To account for the dynamic base fee requirement (3. b) which could render an EIP-1559 and all -//! subsequent transactions of the sender currently invalid, the pending pool itself consists of two -//! queues: +//! - Queued Pool: Contains all transactions that are currently blocked by missing +//! transactions: (3. a)(2): _With_ nonce gaps or due to lack of funds. //! -//! _Ready Queue_: Contains all transactions that can be executed on the current state -//! _Parked Queue_: Contains all transactions that either do not currently meet the dynamic -//! base fee requirement or are blocked by a previous transaction that violates it. +//! - Basefee Pool: To account for the dynamic base fee requirement (3. b) which could render +//! an EIP-1559 and all subsequent transactions of the sender currently invalid. //! -//! The classification of transaction in which queue it belongs depends on the current base fee and -//! must be updated after changes: -//! -//! - Base Fee increases: recheck the _Ready Queue_ and evict transactions that don't satisfy -//! the new base fee, or depend on a transaction that no longer satisfies it, and move them -//! to the _Parked Queue_. -//! - Base Fee decreases: recheck the _Parked Queue_ and move all transactions that now satisfy -//! the new base fee to the _Ready Queue_. +//! The classification of transactions is always dependent on the current state that is changed as +//! soon as a new block is mined. Once a new block is mined, the account changeset must be applied +//! to the transaction pool. //! //! //! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool) @@ -61,45 +57,36 @@ //! //! ## Terminology //! -//! - _Pending_: pending transactions are transactions that fall under (2.). Those transactions -//! are _currently_ ready to be executed and are stored in the `pending` sub-pool -//! - _Queued_: queued transactions are transactions that fall under category (3.). Those -//! transactions are _currently_ waiting for state changes that eventually move them into -//! category (2.) and become pending. +//! - _Pending_: pending transactions are transactions that fall under (2.). Those transactions are +//! _currently_ ready to be executed and are stored in the pending sub-pool +//! - _Queued_: queued transactions are transactions that fall under category (3.). Those +//! transactions are _currently_ waiting for state changes that eventually move them into +//! category (2.) and become pending. + use crate::{ - error::{PoolError, PoolResult}, - pool::{ - listener::PoolEventListener, - pending::PendingTransactions, - queued::{QueuedPoolTransaction, QueuedTransactions}, - }, - traits::PoolTransaction, - validate::ValidPoolTransaction, - BlockID, PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256, + error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction, + validate::ValidPoolTransaction, BlockID, PoolClient, PoolConfig, TransactionOrdering, + TransactionValidator, U256, }; -use fnv::FnvHashMap; + +use best::BestTransactions; use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; use reth_primitives::{TxHash, U64}; -use std::{ - collections::{HashMap, VecDeque}, - fmt, - sync::Arc, -}; -use tracing::{debug, trace, warn}; +use std::{collections::HashMap, sync::Arc}; +use tracing::warn; +mod best; mod events; mod listener; +mod parked; mod pending; -mod queued; +pub(crate) mod state; mod transaction; +pub mod txpool; -use crate::{ - identifier::{SenderId, TransactionId}, - validate::TransactionValidationOutcome, -}; +use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome}; pub use events::TransactionEvent; -pub use pending::TransactionsIterator; /// Shareable Transaction pool. pub struct BasicPool { @@ -205,7 +192,7 @@ pub struct PoolInner { /// Chain/Storage access. client: Arc

, /// The internal pool that manages - pool: RwLock>, + pool: RwLock>, /// Pool settings. config: PoolConfig, /// Manages listeners for transaction state change events. @@ -227,7 +214,7 @@ where client, config, event_listener: Default::default(), - pool: RwLock::new(GraphPool::new(ordering)), + pool: RwLock::new(TxPool::new(ordering)), ready_transaction_listener: Default::default(), } } @@ -262,7 +249,7 @@ where tx: TransactionValidationOutcome, ) -> PoolResult { match tx { - TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => { + TransactionValidationOutcome::Valid { balance: _, state_nonce: _, transaction: _ } => { // TODO create `ValidPoolTransaction` // let added = self.pool.write().add_transaction(tx)?; @@ -330,250 +317,8 @@ where } /// Returns an iterator that yields transactions that are ready to be included in the block. - pub(crate) fn ready_transactions(&self) -> TransactionsIterator { - self.pool.read().ready_transactions() - } -} - -/// A pool that only manages transactions. -/// -/// This pool maintains a dependency graph of transactions and provides the currently ready -/// transactions. -pub struct GraphPool { - /// How to order transactions. - ordering: Arc, - /// Contains the currently known info - sender_info: FnvHashMap, - /// Sub-Pool of transactions that are ready and waiting to be executed - pending: PendingTransactions, - /// Sub-Pool of transactions that are waiting for state changes that eventually turn them - /// valid, so they can be moved in the `pending` pool. - queued: QueuedTransactions, -} - -// === impl PoolInner === - -impl GraphPool { - /// Create a new graph pool instance. - pub fn new(ordering: Arc) -> Self { - let pending = PendingTransactions::new(Arc::clone(&ordering)); - Self { ordering, sender_info: Default::default(), pending, queued: Default::default() } - } - - /// Updates the pool based on the changed base fee. - /// - /// This enforces the dynamic fee requirement. - /// If the `new_base_fee` is _higher_ than previous base fee, all EIP-1559 transactions in the - /// ready queue that now violate the dynamic fee requirement need to parked. - /// If the `new_base_fee` is _lower_ than the previous base fee, all parked transactions that - /// now satisfy the dynamic fee requirement need to moved to the ready queue. - pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) { - let _old_base_fee = self.pending.set_next_base_fee(new_base_fee); - // TODO update according to the changed base_fee - todo!() - } - - /// Returns if the transaction for the given hash is already included in this pool - pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool { - self.queued.contains(tx_hash) || self.pending.contains(tx_hash) - } - - /// Returns an iterator that yields transactions that are ready to be included in the block. - pub(crate) fn ready_transactions(&self) -> TransactionsIterator { - self.pending.get_transactions() - } - - /// Adds the transaction into the pool - /// - /// This pool consists of two sub-pools: `Queued` and `Pending`. - /// - /// The `Queued` pool contains transaction with gaps in its dependency tree: It requires - /// additional transaction that are note yet present in the pool. - /// - /// The `Pending` pool contains all transactions that have all their dependencies satisfied (no - /// nonce gaps). It consists of two parts: `Parked` and `Ready`. - /// - /// The `Ready` queue contains transactions that are ready to be included in the pending block. - /// With EIP-1559, transactions can become executable or not without any changes to the - /// sender's balance or nonce and instead their feeCap determines whether the transaction is - /// _currently_ (on the current state) ready or needs to be parked until the feeCap satisfies - /// the block's baseFee. - fn add_transaction( - &mut self, - tx: ValidPoolTransaction, - ) -> PoolResult> { - if self.contains(tx.hash()) { - warn!(target: "txpool", "[{:?}] Already added", tx.hash()); - return Err(PoolError::AlreadyAdded(Box::new(*tx.hash()))) - } - - let tx = QueuedPoolTransaction::new(tx, self.pending.provided_dependencies()); - trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx); - - // If all ids are not satisfied import to queued - if !tx.is_satisfied() { - let hash = *tx.transaction.hash(); - self.queued.add_transaction(tx)?; - return Ok(AddedTransaction::Queued { hash }) - } - self.add_pending_transaction(tx) - } - - /// Adds the transaction to the pending pool. - /// - /// This will also move all transaction that get unlocked by the dependency id this transaction - /// provides from the queued pool into the pending pool. - /// - /// CAUTION: this expects that transaction's dependencies are fully satisfied - fn add_pending_transaction( - &mut self, - tx: QueuedPoolTransaction, - ) -> PoolResult> { - let hash = *tx.transaction.hash(); - trace!(target: "txpool", "adding pending transaction [{:?}]", hash); - let mut pending = AddedPendingTransaction::new(hash); - - // tracks all transaction that can be moved to the pending pool, starting the given - // transaction - let mut pending_transactions = VecDeque::from([tx]); - // tracks whether we're processing the given `tx` - let mut is_new_tx = true; - - // take first transaction from the list - while let Some(current_tx) = pending_transactions.pop_front() { - // also add the transaction that the current transaction unlocks - pending_transactions - .extend(self.queued.satisfy_and_unlock(¤t_tx.transaction.transaction_id)); - - let current_hash = *current_tx.transaction.hash(); - - // try to add the transaction to the ready pool - match self.pending.add_transaction(current_tx) { - Ok(replaced_transactions) => { - if !is_new_tx { - pending.promoted.push(current_hash); - } - // tx removed from ready pool - pending.removed.extend(replaced_transactions); - } - Err(err) => { - // failed to add transaction - if is_new_tx { - debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash, - err); - return Err(err) - } else { - pending.discarded.push(current_hash); - } - } - } - is_new_tx = false; - } - - // check for a cycle where importing a transaction resulted in pending transactions to be - // added while removing current transaction. in which case we move this transaction back to - // the pending queue - if pending.removed.iter().any(|tx| *tx.hash() == hash) { - self.pending.clear_transactions(&pending.promoted); - return Err(PoolError::CyclicTransaction) - } - - Ok(AddedTransaction::Pending(pending)) - } - - /// Prunes the transactions that provide the given dependencies. - /// - /// This will effectively remove those transactions that satisfy the dependencies. - /// And queued transactions might get promoted if the pruned dependencies unlock them. - pub fn prune_transactions( - &mut self, - dependencies: impl IntoIterator, - ) -> PruneResult { - let mut imports = vec![]; - let mut pruned = vec![]; - - for dependency in dependencies { - // mark as satisfied and store the transactions that got unlocked - imports.extend(self.queued.satisfy_and_unlock(&dependency)); - // prune transactions - pruned.extend(self.pending.remove_mined(dependency)); - } - - let mut promoted = vec![]; - let mut failed = vec![]; - for tx in imports { - let hash = *tx.transaction.hash(); - match self.add_pending_transaction(tx) { - Ok(res) => promoted.push(res), - Err(e) => { - warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e); - failed.push(hash) - } - } - } - - PruneResult { pruned, failed, promoted } - } - - /// Remove the given transactions from the pool. - pub fn remove_invalid( - &mut self, - tx_hashes: Vec, - ) -> Vec>> { - // early exit in case there is no invalid transactions. - if tx_hashes.is_empty() { - return vec![] - } - trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes); - - let mut removed = self.pending.remove_with_dependencies(tx_hashes.clone(), None); - removed.extend(self.queued.remove(tx_hashes)); - - trace!(target: "txpool", "Removed invalid transactions: {:?}", removed); - - removed - } - - /// Returns the current size of the entire pool - pub fn size_of(&self) -> usize { - unimplemented!() - } - - /// Ensures that the transactions in the sub-pools are within the given bounds. - /// - /// If the current size exceeds the given bounds, the worst transactions are evicted from the - /// pool and returned. - pub fn enforce_size_limits(&mut self) { - unimplemented!() - } -} - -/// Represents the outcome of a prune -pub struct PruneResult { - /// a list of added transactions that a pruned marker satisfied - pub promoted: Vec>, - /// all transactions that failed to be promoted and now are discarded - pub failed: Vec, - /// all transactions that were pruned from the ready pool - pub pruned: Vec>>, -} - -impl fmt::Debug for PruneResult { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "PruneResult {{ ")?; - write!( - fmt, - "promoted: {:?}, ", - self.promoted.iter().map(|tx| *tx.hash()).collect::>() - )?; - write!(fmt, "failed: {:?}, ", self.failed)?; - write!( - fmt, - "pruned: {:?}, ", - self.pruned.iter().map(|tx| *tx.transaction.hash()).collect::>() - )?; - write!(fmt, "}}")?; - Ok(()) + pub(crate) fn ready_transactions(&self) -> BestTransactions { + self.pool.read().best_transactions() } } @@ -602,26 +347,6 @@ impl AddedPendingTransaction { } } -/// Stores relevant context about a sender. -#[derive(Debug, Clone)] -struct SenderInfo { - /// current nonce of the sender - state_nonce: u64, - /// Balance of the sender at the current point. - balance: U256, - /// How many transactions of this sender are currently in the pool. - num_transactions: u64, -} - -// === impl SenderInfo === - -impl SenderInfo { - /// Creates a new entry for an incoming, not yet tracked sender. - fn new_incoming(state_nonce: u64, balance: U256) -> Self { - Self { state_nonce, balance, num_transactions: 1 } - } -} - /// Represents a transaction that was added into the pool and its state #[derive(Debug, Clone)] pub enum AddedTransaction { diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs new file mode 100644 index 000000000..398122cce --- /dev/null +++ b/crates/transaction-pool/src/pool/parked.rs @@ -0,0 +1,194 @@ +use crate::{identifier::TransactionId, PoolTransaction, ValidPoolTransaction}; +use fnv::FnvHashMap; +use std::{cmp::Ordering, collections::BTreeSet, ops::Deref, sync::Arc}; + +/// A pool of transaction that are currently parked and wait for external changes that eventually +/// move the transaction into the pending pool. +/// +/// This pool is a bijection: at all times each set contains the same transactions. +pub(crate) struct ParkedPool { + /// Keeps track of transactions inserted in the pool. + /// + /// This way we can determine when transactions where submitted to the pool. + submission_id: u64, + /// _All_ Transactions that are currently inside the pool grouped by their identifier. + by_id: FnvHashMap>, + /// All transactions sorted by their priority function. + best: BTreeSet>, +} + +// === impl QueuedPool === + +impl ParkedPool { + /// Adds a new transactions to the pending queue. + /// + /// # Panics + /// + /// If the transaction is already included. + pub(crate) fn add_transaction(&mut self, tx: Arc>) { + let id = *tx.id(); + assert!(!self.by_id.contains_key(&id), "transaction already included"); + let submission_id = self.next_id(); + + let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() }; + + self.by_id.insert(id, transaction.clone()); + self.best.insert(transaction); + } + + /// Removes the transaction from the pool + pub(crate) fn remove_transaction( + &mut self, + id: &TransactionId, + ) -> Option>> { + let tx = self.by_id.remove(id)?; + self.best.remove(&tx); + Some(tx.transaction.into()) + } + + fn next_id(&mut self) -> u64 { + let id = self.submission_id; + self.submission_id = self.submission_id.wrapping_add(1); + id + } +} + +impl Default for ParkedPool { + fn default() -> Self { + Self { submission_id: 0, by_id: Default::default(), best: Default::default() } + } +} + +/// Represents a transaction in this pool. +struct ParkedPoolTransaction { + /// Identifier that tags when transaction was submitted in the pool. + submission_id: u64, + /// Actual transaction. + transaction: T, +} + +impl Clone for ParkedPoolTransaction { + fn clone(&self) -> Self { + Self { submission_id: self.submission_id, transaction: self.transaction.clone() } + } +} + +impl Eq for ParkedPoolTransaction {} + +impl PartialEq for ParkedPoolTransaction { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl PartialOrd for ParkedPoolTransaction { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for ParkedPoolTransaction { + fn cmp(&self, other: &Self) -> Ordering { + // This compares by the transactions first, and only if two tx are equal this compares + // the unique `transaction_id`. + self.transaction + .cmp(&other.transaction) + .then_with(|| self.transaction.id().cmp(other.transaction.id())) + } +} + +/// Helper trait used for custom `Ord` wrappers around a transaction. +/// +/// This is effectively a wrapper for `Arc` with custom `Ord` implementation. +pub(crate) trait ParkedOrd: + Ord + + Clone + + From>> + + Into>> + + Deref>> +{ + /// The wrapper transaction type. + type Transaction: PoolTransaction; +} + +/// Helper macro to implement necessary conversions for `ParkedOrd` trait +macro_rules! impl_ord_wrapper { + ($name:ident) => { + impl Clone for $name { + fn clone(&self) -> Self { + Self(self.0.clone()) + } + } + + impl Eq for $name {} + + impl PartialEq for $name { + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } + } + + impl PartialOrd for $name { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } + } + impl Deref for $name { + type Target = Arc>; + + fn deref(&self) -> &Self::Target { + &self.0 + } + } + + impl ParkedOrd for $name { + type Transaction = T; + } + + impl From>> for $name { + fn from(value: Arc>) -> Self { + Self(value) + } + } + + impl From<$name> for Arc> { + fn from(value: $name) -> Arc> { + value.0 + } + } + }; +} + +/// A new type wrapper for [`ValidPoolTransaction`] +/// +/// This sorts transactions by their base fee. +/// +/// Caution: This assumes all transaction in the `BaseFee` sub-pool have a fee value. +pub struct BasefeeOrd(Arc>); + +impl_ord_wrapper!(BasefeeOrd); + +impl Ord for BasefeeOrd { + fn cmp(&self, other: &Self) -> Ordering { + match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) { + (Some(fee), Some(other)) => fee.cmp(&other), + (None, Some(other)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + _ => Ordering::Equal, + } + } +} + +/// A new type wrapper for [`ValidPoolTransaction`] +/// +/// This sorts transactions by their distance. +pub struct QueuedOrd(Arc>); + +impl_ord_wrapper!(QueuedOrd); + +impl Ord for QueuedOrd { + fn cmp(&self, other: &Self) -> Ordering { + // TODO ideally compare by distance here. + Ordering::Equal + } +} diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 11eb6f3f7..0030f42a2 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -1,68 +1,46 @@ use crate::{ - error::PoolResult, identifier::TransactionId, pool::queued::QueuedPoolTransaction, - traits::BestTransactions, validate::ValidPoolTransaction, TransactionOrdering, + identifier::TransactionId, pool::best::BestTransactions, TransactionOrdering, + ValidPoolTransaction, }; -use parking_lot::RwLock; -use reth_primitives::{TxHash, U256}; +use reth_primitives::rpc::TxHash; use std::{ cmp::Ordering, - collections::{BTreeSet, HashMap, HashSet}, + collections::{BTreeMap, BTreeSet, HashSet}, sync::Arc, }; use tracing::debug; -/// Type alias for replaced transactions -pub(crate) type ReplacedTransactions = - (Vec::Transaction>>>, Vec); - -/// A pool of validated transactions that are ready on the current state and are waiting to be -/// included in a block. -/// -/// Each transaction in this pool is valid on its own, i.e. they are not dependent on transaction -/// that must be executed first. Each of these transaction can be executed independently on the -/// current state -pub(crate) struct PendingTransactions { +/// A pool of validated and gapless transactions that are ready on the current state and are waiting +/// to be included in a block. +pub(crate) struct PendingPool { + /// How to order transactions. + ordering: Arc, /// Keeps track of transactions inserted in the pool. /// /// This way we can determine when transactions where submitted to the pool. - id: u64, - /// How to order transactions. - ordering: Arc, - /// Base fee of the next block. - pending_base_fee: U256, - /// Dependencies that are provided by `PendingTransaction`s - provided_dependencies: HashMap, - /// Pending transactions that are currently on hold until the `baseFee` of the pending block - /// changes in favor of the parked transactions: the `pendingBlock.baseFee` must decrease - /// before they can be moved to the ready pool and are ready to be executed. - parked: ParkedTransactions, - /// All Transactions that are currently ready. - /// - /// Meaning, there are no nonce gaps in these transactions and all of them satisfy the - /// `baseFee` condition: transaction `maxFeePerGas >= pendingBlock.baseFee` - ready_transactions: Arc>>>, + submission_id: u64, + /// _All_ Transactions that are currently inside the pool grouped by their identifier. + by_id: BTreeMap>>, /// Independent transactions that can be included directly and don't require other /// transactions. /// /// Sorted by their scoring value. - independent_transactions: BTreeSet>, + independent_transactions: BTreeSet>, } -// === impl PendingTransactions === +// === impl PendingPool === -impl PendingTransactions { - /// Create a new pool instance +impl PendingPool { + /// Create a new pool instance. pub(crate) fn new(ordering: Arc) -> Self { Self { - id: 0, - provided_dependencies: Default::default(), - parked: Default::default(), - ready_transactions: Arc::new(Default::default()), ordering, + submission_id: 0, + by_id: Default::default(), independent_transactions: Default::default(), - pending_base_fee: Default::default(), } } + /// Returns an iterator over all transactions that are _currently_ ready. /// /// 1. The iterator _always_ returns transaction in order: It never returns a transaction with @@ -81,303 +59,88 @@ impl PendingTransactions { /// provides a way to mark transactions that the consumer of this iterator considers invalid. In /// which case the transaction's subgraph is also automatically marked invalid, See (1.). /// Invalid transactions are skipped. - pub(crate) fn get_transactions(&self) -> TransactionsIterator { - TransactionsIterator { - all: self.ready_transactions.read().clone(), + pub(crate) fn best(&self) -> BestTransactions { + BestTransactions { + all: self.by_id.clone(), independent: self.independent_transactions.clone(), - awaiting: Default::default(), invalid: Default::default(), } } - /// Sets the given base fee and returns the old one. - pub(crate) fn set_next_base_fee(&mut self, base_fee: U256) -> U256 { - std::mem::replace(&mut self.pending_base_fee, base_fee) - } - - /// Returns true if the transaction is part of the queue. - pub(crate) fn contains(&self, hash: &TxHash) -> bool { - self.ready_transactions.read().contains_key(hash) - } - - /// Returns the transaction for the hash if it's in the ready pool but not yet mined - pub(crate) fn get(&self, hash: &TxHash) -> Option> { - self.ready_transactions.read().get(hash).cloned() - } - - pub(crate) fn provided_dependencies(&self) -> &HashMap { - &self.provided_dependencies - } - - fn next_id(&mut self) -> u64 { - let id = self.id; - self.id = self.id.wrapping_add(1); - id + /// Returns the ancestor the given transaction, the transaction with `nonce - 1`. + /// + /// Note: for a transaction with nonce higher than the current on chain nonce this will always + /// return an ancestor since all transaction in this pool are gapless. + fn ancestor(&self, id: &TransactionId) -> Option<&Arc>> { + self.by_id.get(&id.unchecked_ancestor()?) } /// Adds a new transactions to the pending queue. /// - /// Depending on the transaction's feeCap, this will either move it into the ready queue or park - /// it until a future baseFee unlocks it. - /// /// # Panics /// - /// if the pending transaction is not ready - /// or the transaction is already included - pub(crate) fn add_transaction( - &mut self, - tx: QueuedPoolTransaction, - ) -> PoolResult>>> { - assert!(tx.is_satisfied(), "transaction must be ready",); - assert!( - !self.ready_transactions.read().contains_key(tx.transaction.hash()), - "transaction already included" - ); - - let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?; + /// if the transaction is already included + pub(crate) fn add_transaction(&mut self, tx: Arc>) { + assert!(!self.by_id.contains_key(tx.id()), "transaction already included"); + let tx_id = *tx.id(); let submission_id = self.next_id(); - let hash = *tx.transaction.hash(); - let mut independent = true; - let mut requires_offset = 0; - let mut ready = self.ready_transactions.write(); + let priority = self.ordering.priority(&tx.transaction); - // Add links to transactions that unlock the current one - for dependency in &tx.transaction.depends_on { - // Check if the transaction that satisfies the mark is still in the queue. - if let Some(other) = self.provided_dependencies.get(dependency) { - let tx = ready.get_mut(other).expect("hash included;"); - tx.unlocks.push(hash); - // tx still depends on other tx - independent = false; - } else { - requires_offset += 1; - } - } + let transaction = PendingTransactionRef { submission_id, transaction: tx, priority }; - // update dependencies - self.provided_dependencies.insert(tx.transaction.transaction_id, hash); - - let priority = self.ordering.priority(&tx.transaction.transaction); - - let transaction = - PoolTransactionRef { submission_id, transaction: tx.transaction, priority }; - - // TODO check basefee requirement - - // add to the independent set - if independent { + // If there's __no__ ancestor in the pool, then this transaction is independent, this is + // guaranteed because this pool is gapless. + if self.ancestor(&tx_id).is_none() { self.independent_transactions.insert(transaction.clone()); } - // insert to ready queue - ready.insert(hash, PendingTransaction { transaction, unlocks, requires_offset }); + let transaction = Arc::new(PendingTransaction { transaction }); - Ok(replaced_tx) + self.by_id.insert(tx_id, transaction); } - /// Removes and returns those transactions that got replaced by the `tx` - fn replaced_transactions( - &mut self, - tx: &ValidPoolTransaction, - ) -> PoolResult> { - // check if we are replacing transactions - let remove_hashes: HashSet<_> = - if let Some(hash) = self.provided_dependencies.get(&tx.transaction_id) { - HashSet::from([hash]) - } else { - return Ok((Vec::new(), Vec::new())) - }; - - // early exit if we are not replacing anything. - if remove_hashes.is_empty() { - return Ok((Vec::new(), Vec::new())) - } - - // check if we're replacing the same transaction and if it can be replaced - - let mut unlocked_tx = Vec::new(); - { - // construct a list of unlocked transactions - // also check for transactions that shouldn't be replaced because underpriced - let ready = self.ready_transactions.read(); - for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(hash)) { - // if we're attempting to replace a transaction that provides the exact same - // dependencies (addr + nonce) then we check for gas price - if to_remove.id().eq(&tx.transaction_id) { - // check if underpriced - // TODO check if underpriced - // if tx.pending_transaction.transaction.gas_price() <= to_remove.gas_price() { - // warn!(target: "txpool", "ready replacement transaction underpriced - // [{:?}]", tx.hash()); return - // Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone()))) - // } else { - // trace!(target: "txpool", "replacing ready transaction [{:?}] with higher - // gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash()); - // } - } - - unlocked_tx.extend(to_remove.unlocks.iter().cloned()) - } - } - - let remove_hashes = remove_hashes.into_iter().copied().collect::>(); - - let new_provides = HashSet::from([tx.transaction_id]); - let removed_tx = self.remove_with_dependencies(remove_hashes, Some(new_provides)); - - Ok((removed_tx, unlocked_tx)) - } - - /// Removes the transactions from the ready queue and returns the removed transactions. - /// This will also remove all transactions that depend on those. - pub(crate) fn clear_transactions( - &mut self, - tx_hashes: &[TxHash], - ) -> Vec>> { - self.remove_with_dependencies(tx_hashes.to_vec(), None) - } - - /// Removes the transactions that was mined. + /// Removes a _mined_ transaction from the pool. /// - /// This will also remove all transactions that lead to the transaction that provides the - /// id. - pub(crate) fn remove_mined( - &mut self, - id: TransactionId, - ) -> Vec>> { - let mut removed_tx = vec![]; + /// If the transactions has a descendant transaction it will advance it to the best queue. + pub(crate) fn remove_mined(&mut self, id: &TransactionId) { + if let Some(tx) = self.by_id.remove(id) { + self.independent_transactions.remove(&tx.transaction); - // the dependencies to remove - let mut remove = vec![id]; - - while let Some(dependency) = remove.pop() { - let res = self - .provided_dependencies - .remove(&dependency) - .and_then(|hash| self.ready_transactions.write().remove(&hash)); - - if let Some(tx) = res { - let unlocks = tx.unlocks; - self.independent_transactions.remove(&tx.transaction); - let tx = tx.transaction.transaction; - - // also remove previous transactions - { - let hash = tx.hash(); - let mut ready = self.ready_transactions.write(); - - let mut previous_dependency = |dependency| -> Option> { - let prev_hash = self.provided_dependencies.get(dependency)?; - let tx2 = ready.get_mut(prev_hash)?; - // remove hash - if let Some(idx) = tx2.unlocks.iter().position(|i| i == hash) { - tx2.unlocks.swap_remove(idx); - } - if tx2.unlocks.is_empty() { - Some(vec![tx2.transaction.transaction.transaction_id]) - } else { - None - } - }; - - // find previous transactions - for dep in &tx.depends_on { - if let Some(mut dependency_to_remove) = previous_dependency(dep) { - remove.append(&mut dependency_to_remove); - } - } - } - - // add the transactions that just got unlocked to independent set - for hash in unlocks { - if let Some(tx) = self.ready_transactions.write().get_mut(&hash) { - tx.requires_offset += 1; - if tx.requires_offset == tx.transaction.transaction.depends_on.len() { - self.independent_transactions.insert(tx.transaction.clone()); - } - } - } - // finally, remove the dependencies that this transaction provides - let current_dependency = &dependency; - let removed = self.provided_dependencies.remove(&tx.transaction_id); - assert_eq!( - removed.as_ref(), - if current_dependency.eq(&tx.transaction_id) { None } else { Some(tx.hash()) }, - "The pool contains exactly one transaction providing given tag; the removed transaction - claims to provide that tag, so it has to be mapped to it's hash; qed" - ); - removed_tx.push(tx); + // mark the next as independent if it exists + if let Some(unlocked) = self.by_id.get(&id.descendant()) { + self.independent_transactions.insert(unlocked.transaction.clone()); } } - - removed_tx } - /// Removes transactions and those that depend on them and satisfy at least one dependency in - /// the given filter set. - pub(crate) fn remove_with_dependencies( + /// Removes the transaction from the pool. + pub(crate) fn remove_transaction( &mut self, - mut tx_hashes: Vec, - dependency_filter: Option>, - ) -> Vec>> { - let mut removed = Vec::new(); - let mut ready = self.ready_transactions.write(); + id: &TransactionId, + ) -> Option>> { + let tx = self.by_id.remove(id)?; + self.independent_transactions.remove(&tx.transaction); + Some(tx.transaction.transaction.clone()) + } - while let Some(hash) = tx_hashes.pop() { - if let Some(mut tx) = ready.remove(&hash) { - let id = &tx.transaction.transaction.transaction_id; + fn next_id(&mut self) -> u64 { + let id = self.submission_id; + self.submission_id = self.submission_id.wrapping_add(1); + id + } - // remove the transactions - let removed_transaction = if dependency_filter - .as_ref() - .map(|filter| !filter.contains(id)) - .unwrap_or(true) - { - self.provided_dependencies.remove(id); - true - } else { - false - }; - - // remove from unlocks - for dependency in &tx.transaction.transaction.depends_on { - if let Some(hash) = self.provided_dependencies.get(dependency) { - if let Some(tx) = ready.get_mut(hash) { - if let Some(idx) = tx.unlocks.iter().position(|i| i == hash) { - tx.unlocks.swap_remove(idx); - } - } - } - } - - // remove from the independent set - self.independent_transactions.remove(&tx.transaction); - - if removed_transaction { - // remove all transactions that the current one unlocks - tx_hashes.append(&mut tx.unlocks); - } - - // remove transaction - removed.push(tx.transaction.transaction); - } - } - - removed + /// Returns the transaction for the id if it's in the pool but not yet mined. + pub(crate) fn get(&self, id: &TransactionId) -> Option>> { + self.by_id.get(id).cloned() } } /// A transaction that is ready to be included in a block. -#[derive(Debug)] pub(crate) struct PendingTransaction { /// Reference to the actual transaction. - transaction: PoolTransactionRef, - /// Tracks the transactions that get unlocked by this transaction. - unlocks: Vec, - /// Amount of required dependencies that are inherently provided. - requires_offset: usize, + pub(crate) transaction: PendingTransactionRef, } // == impl PendingTransaction === @@ -391,50 +154,57 @@ impl PendingTransaction { impl Clone for PendingTransaction { fn clone(&self) -> Self { - Self { - transaction: self.transaction.clone(), - unlocks: self.unlocks.clone(), - requires_offset: self.requires_offset, - } + Self { transaction: self.transaction.clone() } } } -/// A reference to a transaction in the _pending_ pool -#[derive(Debug)] -pub(crate) struct PoolTransactionRef { - /// Actual transaction. - pub(crate) transaction: Arc>, +/// A transaction that is ready to be included in a block. +pub(crate) struct PendingTransactionRef { /// Identifier that tags when transaction was submitted in the pool. pub(crate) submission_id: u64, + /// Actual transaction. + pub(crate) transaction: Arc>, /// The priority value assigned by the used `Ordering` function. pub(crate) priority: T::Priority, } -impl Clone for PoolTransactionRef { +impl PendingTransactionRef { + /// The next transaction of the sender: `nonce + 1` + pub(crate) fn unlocks(&self) -> TransactionId { + self.transaction.transaction_id.descendant() + } + + /// The hash for this transaction + pub(crate) fn hash(&self) -> &TxHash { + self.transaction.hash() + } +} + +impl Clone for PendingTransactionRef { fn clone(&self) -> Self { Self { - transaction: Arc::clone(&self.transaction), submission_id: self.submission_id, + transaction: Arc::clone(&self.transaction), priority: self.priority.clone(), } } } -impl Eq for PoolTransactionRef {} +impl Eq for PendingTransactionRef {} -impl PartialEq for PoolTransactionRef { +impl PartialEq for PendingTransactionRef { fn eq(&self, other: &Self) -> bool { self.cmp(other) == Ordering::Equal } } -impl PartialOrd for PoolTransactionRef { +impl PartialOrd for PendingTransactionRef { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } -impl Ord for PoolTransactionRef { +impl Ord for PendingTransactionRef { fn cmp(&self, other: &Self) -> Ordering { // This compares by `priority` and only if two tx have the exact same priority this compares // the unique `submission_id`. This ensures that transactions with same priority are not @@ -444,163 +214,3 @@ impl Ord for PoolTransactionRef { .then_with(|| other.submission_id.cmp(&self.submission_id)) } } - -/// Pending Transactions that are currently parked until their set baseFee becomes valid -struct ParkedTransactions { - /// Keeps track of transactions inserted in the pool. - /// - /// This way we can determine when transactions where submitted to the pool. - id: u64, - /// All transactions that are currently parked due to their fee. - parked_transactions: HashMap>, - /// Same transactions but sorted by their fee and priority - sorted_transactions: BTreeSet>, -} - -impl Default for ParkedTransactions { - fn default() -> Self { - Self { - id: 0, - parked_transactions: Default::default(), - sorted_transactions: Default::default(), - } - } -} - -/// A transaction that is ready to be included in a block. -#[derive(Debug, Clone)] -pub(crate) struct ParkedTransaction { - /// Reference to the actual transaction. - transaction: PoolTransactionRef, - /// Tracks the transactions that get unlocked by this transaction. - unlocks: Vec, - /// Amount of required dependencies that are inherently provided - requires_offset: usize, -} - -/// A reference to a currently _parked_ transaction. -struct ParkedTransactionRef { - /// Actual transaction. - transaction: Arc>, - /// Identifier that tags when transaction was submitted in the pool. - submission_id: u64, - /// The priority value assigned by the used `Ordering` function. - priority: T::Priority, - /// EIP-1559 Max base fee the caller is willing to pay. - max_fee_per_gas: U256, -} - -impl Eq for ParkedTransactionRef {} - -impl PartialEq for ParkedTransactionRef { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - -impl PartialOrd for ParkedTransactionRef { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for ParkedTransactionRef { - fn cmp(&self, other: &Self) -> Ordering { - // This compares the `max_fee_per_gas` value of the transaction - self.max_fee_per_gas - .cmp(&other.max_fee_per_gas) - .then_with(|| self.priority.cmp(&other.priority)) - .then_with(|| other.submission_id.cmp(&self.submission_id)) - } -} - -/// An iterator that returns transactions that can be executed on the current state. -pub struct TransactionsIterator { - all: HashMap>, - awaiting: HashMap)>, - independent: BTreeSet>, - invalid: HashSet, -} - -// == impl TransactionsIterator == - -impl TransactionsIterator { - /// Mark the transaction as invalid. - /// - /// As a consequence, all values that depend on the invalid one will be skipped. - /// When given transaction is not in the pool it has no effect. - pub(crate) fn mark_invalid(&mut self, tx: &Arc>) { - if let Some(invalid_transaction) = self.all.get(tx.hash()) { - debug!( - target: "txpool", - "[{:?}] Marked as invalid", - invalid_transaction.transaction.transaction.hash() - ); - for hash in &invalid_transaction.unlocks { - self.invalid.insert(*hash); - } - } - } - - /// Depending on number of satisfied requirements insert given ref - /// either to awaiting set or to best set. - fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef) { - if satisfied >= tx_ref.transaction.depends_on.len() { - // If we have satisfied all deps insert to the best set - self.independent.insert(tx_ref); - } else { - // otherwise we're still waiting for some deps - self.awaiting.insert(*tx_ref.transaction.hash(), (satisfied, tx_ref)); - } - } -} - -impl BestTransactions for TransactionsIterator { - fn mark_invalid(&mut self, tx: &Self::Item) { - TransactionsIterator::mark_invalid(self, tx) - } -} - -impl Iterator for TransactionsIterator { - type Item = Arc>; - - fn next(&mut self) -> Option { - loop { - let best = self.independent.iter().next_back()?.clone(); - let best = self.independent.take(&best)?; - let hash = best.transaction.hash(); - - // skip transactions that were marked as invalid - if self.invalid.contains(hash) { - debug!( - target: "txpool", - "[{:?}] skipping invalid transaction", - hash - ); - continue - } - - let ready = - if let Some(ready) = self.all.get(hash).cloned() { ready } else { continue }; - - // Insert transactions that just got unlocked. - for hash in &ready.unlocks { - // first check local awaiting transactions - let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) { - satisfied += 1; - Some((satisfied, tx_ref)) - // then get from the pool - } else { - self.all - .get(hash) - .map(|next| (next.requires_offset + 1, next.transaction.clone())) - }; - if let Some((satisfied, tx_ref)) = res { - self.independent_or_awaiting(satisfied, tx_ref) - } - } - - return Some(best.transaction) - } - } -} diff --git a/crates/transaction-pool/src/pool/queued.rs b/crates/transaction-pool/src/pool/queued.rs deleted file mode 100644 index 284542680..000000000 --- a/crates/transaction-pool/src/pool/queued.rs +++ /dev/null @@ -1,232 +0,0 @@ -use crate::{ - error::PoolResult, identifier::TransactionId, traits::PoolTransaction, - validate::ValidPoolTransaction, TransactionOrdering, -}; -use reth_primitives::TxHash; -use std::{ - collections::{HashMap, HashSet}, - fmt, - sync::Arc, - time::Instant, -}; - -/// A pool of transactions that are not ready on the current state and are waiting for state changes -/// that turn them valid. -/// -/// This could include transactions with nonce gaps: Transactions that are waiting until for a -/// transaction to arrive that closes the nonce gap. -/// -/// Keeps a set of transactions that are waiting until their dependencies are unlocked. -pub(crate) struct QueuedTransactions { - /// Dependencies that aren't yet provided by any transaction. - required_dependencies: HashMap>, - /// Mapping of the dependencies of a transaction to the hash of the transaction, - waiting_dependencies: HashMap, - /// Transactions that are not ready yet are waiting for another tx to finish, - waiting_queue: HashMap>, -} - -// == impl QueuedTransactions == - -impl QueuedTransactions { - /// Returns the number of transactions that are currently waiting in this pool for new - /// transactions to satisfy their dependencies. - pub(crate) fn len(&self) -> usize { - self.waiting_queue.len() - } - - /// Whether this pool is empty. - pub(crate) fn is_empty(&self) -> bool { - self.waiting_queue.is_empty() - } - - /// Returns an iterator over all transactions waiting in this pool. - pub(crate) fn transactions( - &self, - ) -> impl Iterator>> + '_ { - self.waiting_queue.values().map(|tx| Arc::clone(&tx.transaction)) - } - - /// Adds a transaction to the queue of transactions - pub(crate) fn add_transaction(&mut self, tx: QueuedPoolTransaction) -> PoolResult<()> { - assert!(!tx.is_satisfied(), "transaction must not be ready"); - assert!( - !self.waiting_queue.contains_key(tx.transaction.hash()), - "transaction is already added" - ); - - if let Some(_replace) = self - .waiting_dependencies - .get(&tx.transaction.transaction_id) - .and_then(|hash| self.waiting_queue.get(hash)) - { - // TODO handle transaction underpriced - // // check if underpriced - // if tx.transaction.gas_price() < replace.transaction.gas_price() { - // warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]", - // tx.transaction.hash()); return Err(Error::ReplacementUnderpriced) - // } - } - - // add all missing dependencies - for dependency in &tx.missing_dependencies { - self.required_dependencies - .entry(*dependency) - .or_default() - .insert(*tx.transaction.hash()); - } - - // also track identifying dependencies - self.waiting_dependencies.insert(tx.transaction.transaction_id, *tx.transaction.hash()); - - // add tx to the queue - self.waiting_queue.insert(*tx.transaction.hash(), tx); - - Ok(()) - } - - /// Returns true if given transaction is part of the queue - pub(crate) fn contains(&self, hash: &TxHash) -> bool { - self.waiting_queue.contains_key(hash) - } - - /// Returns the transaction for the hash if it's waiting - pub(crate) fn get(&self, tx_hash: &TxHash) -> Option<&QueuedPoolTransaction> { - self.waiting_queue.get(tx_hash) - } - - /// Returns the transactions for the given hashes, `None` if no transaction exists - pub(crate) fn get_all( - &self, - tx_hashes: &[TxHash], - ) -> Vec>>> { - tx_hashes - .iter() - .map(|hash| self.waiting_queue.get(hash).map(|tx| Arc::clone(&tx.transaction))) - .collect() - } - - /// This will check off the dependencies of queued transactions. - /// - /// Returns the those transactions that become unlocked (all dependencies checked) and can be - /// moved to the ready queue. - pub(crate) fn satisfy_and_unlock( - &mut self, - id: &TransactionId, - ) -> Vec> { - let mut unlocked_ready = Vec::new(); - if let Some(tx_hashes) = self.required_dependencies.remove(id) { - for hash in tx_hashes { - let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;"); - tx.satisfy(id); - if tx.is_satisfied() { - let tx = self.waiting_queue.remove(&hash).expect("tx is included;"); - self.waiting_dependencies.remove(&tx.transaction.transaction_id); - unlocked_ready.push(tx); - } - } - } - - unlocked_ready - } - - /// Removes the transactions associated with the given hashes - /// - /// Returns all removed transactions. - pub(crate) fn remove( - &mut self, - hashes: Vec, - ) -> Vec>> { - let mut removed = vec![]; - for hash in hashes { - if let Some(waiting_tx) = self.waiting_queue.remove(&hash) { - self.waiting_dependencies.remove(&waiting_tx.transaction.transaction_id); - for dependency in waiting_tx.missing_dependencies { - let remove = - if let Some(required) = self.required_dependencies.get_mut(&dependency) { - required.remove(&hash); - required.is_empty() - } else { - false - }; - if remove { - self.required_dependencies.remove(&dependency); - } - } - removed.push(waiting_tx.transaction) - } - } - removed - } -} - -/// A transaction submitted to the pool. -#[derive(Clone)] -pub(crate) struct QueuedPoolTransaction { - /// The actual validated transaction. - pub(crate) transaction: Arc>, - /// Transactions required for and have not been satisfied yet by other transactions in the - /// pool. - /// - /// This will be an empty list if there are no nonce gaps across multiple transactions of the - /// same sender in the pool. If there are gaps, this will include the missing transactions. - pub(crate) missing_dependencies: HashSet, - /// Timestamp when the tx was added. - pub(crate) added_at: Instant, -} - -impl Default for QueuedTransactions { - fn default() -> Self { - Self { - required_dependencies: Default::default(), - waiting_dependencies: Default::default(), - waiting_queue: Default::default(), - } - } -} - -// === impl QuQueuedPoolTransaction === - -impl QueuedPoolTransaction { - /// Creates a new `QueuedPoolTransaction`. - /// - /// Determines the dependent transaction that are still missing before this transaction can be - /// moved to the queue. - pub(crate) fn new( - transaction: ValidPoolTransaction, - provided: &HashMap, - ) -> Self { - let missing_dependencies = transaction - .depends_on - .iter() - .filter(|id| { - // is true if the dependency id is already satisfied either via transaction in the - // pool - !provided.contains_key(&**id) - }) - .cloned() - .collect(); - - Self { transaction: Arc::new(transaction), missing_dependencies, added_at: Instant::now() } - } - - /// Removes the required dependency. - pub(crate) fn satisfy(&mut self, id: &TransactionId) { - self.missing_dependencies.remove(id); - } - - /// Returns true if transaction has all dependencies are satisfied. - pub(crate) fn is_satisfied(&self) -> bool { - self.missing_dependencies.is_empty() - } -} - -impl fmt::Debug for QueuedPoolTransaction { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(fmt, "QueuedPoolTransaction {{ ")?; - write!(fmt, "added_at: {:?}, ", self.added_at)?; - write!(fmt, "tx: {:?}, ", self.transaction)?; - write!(fmt, "missing_dependencies: [{:?}]", &self.missing_dependencies)?; - write!(fmt, "}}") - } -} diff --git a/crates/transaction-pool/src/pool/state.rs b/crates/transaction-pool/src/pool/state.rs new file mode 100644 index 000000000..42e6e7891 --- /dev/null +++ b/crates/transaction-pool/src/pool/state.rs @@ -0,0 +1,77 @@ +bitflags::bitflags! { + /// Marker to represents the current state of a transaction in the pool and from which the corresponding sub-pool is derived, depending on what bits are set. + /// + /// This mirrors [erigon's ephemeral state field](https://github.com/ledgerwatch/erigon/wiki/Transaction-Pool-Design#ordering-function). + #[derive(Default)] + pub(crate) struct TxState: u8 { + /// Set to `1` if the `feeCap` of the transaction meets the chain's minimum `feeCap` requirement. + /// + /// This is different from `ENOUGH_FEE_CAP_BLOCK` which tracks on a per-block basis. + const ENOUGH_FEE_CAP_PROTOCOL = 0b100000; + /// Set to `1` of the transaction is either the next transaction of the sender (on chain nonce == tx.nonce) or all prior transactions are also present in the pool. + const NO_NONCE_GAPS = 0b010000; + /// Bit derived from the sender's balance. + /// + /// Set to `1` if the sender's balance can cover the maximum cost for this transaction (`feeCap * gasLimit + value`). + /// This includes cumulative costs of prior transactions, which ensures that the sender has enough funds for all max cost of prior transactions. + const ENOUGH_BALANCE = 0b001000; + /// Bit set to true if the transaction has a lower gas limit than the block's gas limit + const NOT_TOO_MUCH_GAS = 0b000100; + /// Covers the Dynamic fee requirement. + /// + /// Set to 1 if `feeCap` of the transaction meets the requirement of the pending block. + const ENOUGH_FEE_CAP_BLOCK = 0b000010; + const IS_LOCAL = 0b000001; + + const BASE_FEE_POOL_BITS = Self::ENOUGH_FEE_CAP_PROTOCOL.bits | Self::NO_NONCE_GAPS.bits | Self::ENOUGH_BALANCE.bits | Self::NOT_TOO_MUCH_GAS.bits; + + const QUEUED_POOL_BITS = Self::ENOUGH_FEE_CAP_PROTOCOL.bits; + } +} + +/// Identifier for the used Sub-pool +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[repr(u8)] +pub(crate) enum SubPool { + Queued = 0, + Pending, + BaseFee, +} + +// === impl PoolDestination === + +impl SubPool { + /// Whether this transaction is to be moved to the pending sub-pool. + pub(crate) fn is_pending(&self) -> bool { + matches!(self, SubPool::Pending) + } + + /// Returns whether this is a promotion depending on the current sub-pool location. + pub(crate) fn is_promoted(&self, other: SubPool) -> bool { + self > &other + } +} + +impl From for SubPool { + fn from(value: TxState) -> Self { + if value > TxState::BASE_FEE_POOL_BITS { + return SubPool::Pending + } + if value < TxState::QUEUED_POOL_BITS { + return SubPool::Queued + } + SubPool::BaseFee + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_tx_state() { + let mut state = TxState::default(); + state |= TxState::NO_NONCE_GAPS; + assert!(state.intersects(TxState::NO_NONCE_GAPS)) + } +} diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs new file mode 100644 index 000000000..20fbc0b63 --- /dev/null +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -0,0 +1,864 @@ +//! The internal transaction pool implementation. +use crate::{ + error::PoolError, + identifier::{SenderId, TransactionId}, + pool::{ + best::BestTransactions, + parked::{BasefeeOrd, ParkedPool, QueuedOrd}, + pending::PendingPool, + state::{SubPool, TxState}, + AddedPendingTransaction, AddedTransaction, + }, + PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256, +}; +use fnv::FnvHashMap; +use reth_primitives::TxHash; +use std::{ + collections::{btree_map::Entry, hash_map, BTreeMap, HashMap}, + fmt, + ops::Bound::{Excluded, Included, Unbounded}, + sync::Arc, +}; + +/// A pool that manages transactions. +/// +/// This pool maintains the state of all transactions and stores them accordingly. +pub struct TxPool { + /// How to order transactions. + ordering: Arc, + /// Contains the currently known info + sender_info: FnvHashMap, + /// pending subpool + pending_pool: PendingPool, + /// queued subpool + /// + /// Holds all parked transactions that depend on external changes from the sender: + /// + /// - blocked by missing ancestor transaction (has nonce gaps) + /// - sender lacks funds to pay for this transaction. + queued_pool: ParkedPool>, + /// base fee subpool + /// + /// Holds all parked transactions that currently violate the dynamic fee requirement but could + /// be moved to pending if the base fee changes in their favor (decreases) in future blocks. + basefee_pool: ParkedPool>, + /// All transactions in the pool. + all_transactions: AllTransactions, +} + +impl TxPool { + /// Create a new graph pool instance. + pub fn new(ordering: Arc) -> Self { + Self { + sender_info: Default::default(), + pending_pool: PendingPool::new(ordering.clone()), + queued_pool: Default::default(), + basefee_pool: Default::default(), + all_transactions: Default::default(), + ordering, + } + } + /// Updates the pool based on the changed base fee. + /// + /// This enforces the dynamic fee requirement. + pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) { + // TODO update according to the changed base_fee + todo!() + } + + /// Returns an iterator that yields transactions that are ready to be included in the block. + pub(crate) fn best_transactions(&self) -> BestTransactions { + self.pending_pool.best() + } + + /// Returns if the transaction for the given hash is already included in this pool + pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool { + self.all_transactions.contains(tx_hash) + } + + /// Returns the transaction for the given hash. + pub(crate) fn get( + &self, + tx_hash: &TxHash, + ) -> Option>> { + self.all_transactions.by_hash.get(tx_hash).cloned() + } + + /// Adds the transaction into the pool. + /// + /// This pool consists of two three-pools: `Queued`, `Pending` and `BaseFee`. + /// + /// The `Queued` pool contains transactions with gaps in its dependency tree: It requires + /// additional transaction that are note yet present in the pool. And transactions that the + /// sender can not afford with the current balance. + /// + /// The `Pending` pool contains all transactions that have no nonce gaps, and can be afforded by + /// the sender. It only contains transactions that are ready to be included in the pending + /// block. + /// + /// The `BaseFee` pool contains transaction that currently can't satisfy the dynamic fee + /// requirement. With EIP-1559, transactions can become executable or not without any changes to + /// the sender's balance or nonce and instead their feeCap determines whether the + /// transaction is _currently_ (on the current state) ready or needs to be parked until the + /// feeCap satisfies the block's baseFee. + pub(crate) fn add_transaction( + &mut self, + tx: ValidPoolTransaction, + on_chain_balance: U256, + on_chain_nonce: u64, + ) -> PoolResult> { + // Update sender info + self.sender_info.entry(tx.sender_id).or_default().update(on_chain_nonce, on_chain_balance); + + let hash = *tx.hash(); + + match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { + InsertResult::Inserted { transaction, move_to, replaced_tx, updates, .. } => { + self.add_new_transaction(transaction, replaced_tx, move_to); + let UpdateOutcome { promoted, discarded, removed } = self.process_updates(updates); + + // This transaction was moved to the pending pool. + let res = if move_to.is_pending() { + AddedTransaction::Pending(AddedPendingTransaction { + hash, + promoted, + discarded, + removed, + }) + } else { + AddedTransaction::Queued { hash } + }; + + Ok(res) + } + InsertResult::Underpriced { existing, .. } => { + Err(PoolError::AlreadyAdded(Box::new(existing))) + } + } + } + + /// Maintenance task to apply a series of updates. + /// + /// This will move/discard the given transaction according to the `PoolUpdate` + fn process_updates( + &mut self, + updates: impl IntoIterator, + ) -> UpdateOutcome { + let mut outcome = UpdateOutcome::default(); + for update in updates { + let PoolUpdate { id, hash, current, destination } = update; + match destination { + Destination::Discard => { + outcome.discarded.push(hash); + } + Destination::Pool(move_to) => { + debug_assert!(!move_to.eq(¤t), "destination must be different"); + self.move_transaction(current, move_to, &id); + } + } + } + outcome + } + + /// Moves a transaction from one sub pool to another. + /// + /// This will remove the given transaction from one sub-pool and insert it in the other + /// sub-pool. + fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) { + if let Some(tx) = self.remove_transaction(from, id) { + self.add_transaction_to_pool(to, tx); + } + } + + /// Removes the transaction from the given pool + fn remove_transaction( + &mut self, + pool: SubPool, + tx: &TransactionId, + ) -> Option>> { + match pool { + SubPool::Queued => self.queued_pool.remove_transaction(tx), + SubPool::Pending => self.pending_pool.remove_transaction(tx), + SubPool::BaseFee => self.basefee_pool.remove_transaction(tx), + } + } + + /// Removes the transaction from the given pool + fn add_transaction_to_pool( + &mut self, + pool: SubPool, + tx: Arc>, + ) { + match pool { + SubPool::Queued => { + self.queued_pool.add_transaction(tx); + } + SubPool::Pending => { + self.pending_pool.add_transaction(tx); + } + SubPool::BaseFee => { + self.basefee_pool.add_transaction(tx); + } + } + } + + /// Inserts the transaction into the given sub-pool + fn add_new_transaction( + &mut self, + transaction: Arc>, + replaced: Option<(Arc>, SubPool)>, + pool: SubPool, + ) { + if let Some((replaced, replaced_pool)) = replaced { + // Remove the replaced transaction + self.remove_transaction(replaced_pool, replaced.id()); + } + + self.add_transaction_to_pool(pool, transaction) + } + + /// Returns the current size of the entire pool + pub fn size_of(&self) -> usize { + unimplemented!() + } + + /// Ensures that the transactions in the sub-pools are within the given bounds. + /// + /// If the current size exceeds the given bounds, the worst transactions are evicted from the + /// pool and returned. + pub fn enforce_size_limits(&mut self) { + unimplemented!() + } +} + +/// Container for _all_ transaction in the pool. +/// +/// This is the sole entrypoint that's guarding all sub-pools, all sub-pool actions are always +/// derived from this set. Updates returned from this type must be applied to the sub-pools. +pub struct AllTransactions { + /// Expected base fee for the pending block. + pending_basefee: U256, + /// Minimum base fee required by the protol. + /// + /// Transactions with a lower base fee will never be included by the chain + minimal_protocol_basefee: U256, + /// The max gas limit of the block + block_gas_limit: u64, + /// _All_ transactions identified by their hash. + by_hash: HashMap>>, + /// _All_ transaction in the pool sorted by their sender and nonce pair. + txs: BTreeMap>, + /// Tracks the number of transactions by sender that are currently in the pool. + tx_counter: FnvHashMap, +} + +impl AllTransactions { + /// Returns if the transaction for the given hash is already included in this pool + pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool { + self.by_hash.contains_key(tx_hash) + } + + /// Returns the internal transaction with additional metadata + pub(crate) fn get(&self, id: &TransactionId) -> Option<&PoolInternalTransaction> { + self.txs.get(id) + } + + /// Increments the transaction counter for the sender + pub(crate) fn tx_inc(&mut self, sender: SenderId) { + let count = self.tx_counter.entry(sender).or_default(); + *count += 1; + } + + /// Decrements the transaction counter for the sender + pub(crate) fn tx_decr(&mut self, sender: SenderId) { + if let hash_map::Entry::Occupied(mut entry) = self.tx_counter.entry(sender) { + let count = entry.get_mut(); + if *count == 1 { + entry.remove(); + return + } + *count -= 1; + } + } + + /// Returns an iterator over all transactions for the given sender, starting with the lowest + /// nonce + pub(crate) fn txs_iter( + &self, + sender: SenderId, + ) -> impl Iterator)> + '_ { + self.txs + .range((sender.start_bound(), Unbounded)) + .take_while(move |(other, _)| sender == other.sender) + } + + /// Returns a mutable iterator over all transactions for the given sender, starting with the + /// lowest nonce + pub(crate) fn txs_iter_mut( + &mut self, + sender: SenderId, + ) -> impl Iterator)> + '_ { + self.txs + .range_mut((sender.start_bound(), Unbounded)) + .take_while(move |(other, _)| sender == other.sender) + } + + /// Returns all transactions that predates the given transaction. + /// + /// NOTE: The range is _inclusive_ + pub(crate) fn ancestor_txs<'a, 'b: 'a>( + &'a self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs + .range((Unbounded, Included(id))) + .rev() + .take_while(|(other, _)| id.sender == other.sender) + } + + /// Returns all mutable transactions that predates the given transaction. + /// + /// NOTE: The range is _inclusive_ + pub(crate) fn ancestor_txs_mut<'a, 'b: 'a>( + &'a mut self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs + .range_mut((Unbounded, Included(id))) + .rev() + .take_while(|(other, _)| id.sender == other.sender) + } + + /// Returns all transactions that predates the given transaction. + /// + /// NOTE: The range is _exclusive_: This does not return the transaction itself + pub(crate) fn ancestor_txs_exclusive<'a, 'b: 'a>( + &'a self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs.range(..id).rev().take_while(|(other, _)| id.sender == other.sender) + } + + /// Returns all transactions that _follow_ after the given id but have the same sender. + /// + /// NOTE: The range is _exclusive_ + pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>( + &'a self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender) + } + /// Returns all transactions that _follow_ after the given id but have the same sender. + /// + /// NOTE: The range is _exclusive_ + pub(crate) fn descendant_txs_exclusive_mut<'a, 'b: 'a>( + &'a mut self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs + .range_mut((Excluded(id), Unbounded)) + .take_while(|(other, _)| id.sender == other.sender) + } + + /// Returns all transactions that _follow_ after the given id but have the same sender. + /// + /// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the + /// first value. + pub(crate) fn descendant_txs<'a, 'b: 'a>( + &'a self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs.range(id..).take_while(|(other, _)| id.sender == other.sender) + } + + /// Returns all mutable transactions that _follow_ after the given id but have the same sender. + /// + /// NOTE: The range is _inclusive_: if the transaction that belongs to `id` it field be the + /// first value. + pub(crate) fn descendant_txs_mut<'a, 'b: 'a>( + &'a mut self, + id: &'b TransactionId, + ) -> impl Iterator)> + '_ { + self.txs.range_mut(id..).take_while(|(other, _)| id.sender == other.sender) + } + + /// Removes a transaction from the pool after it was mined. + /// + /// This will not trigger additional updates since, because descendants without nonce gaps are + /// already in the pending pool. + pub(crate) fn remove_mined_tx(&mut self, _id: &TransactionId) { + // TODO decrease nonce gap + } + + /// Inserts a new transaction into the pool. + /// + /// If the transaction already exists, it will be replaced if not underpriced. + /// Returns info to which sub-pool the transaction should be moved. + /// Also returns a set of pool updates triggered by this insert, that need to be handled by the + /// caller. + /// + /// These can include: + /// - closing nonce gaps of descendant transactions + /// - enough balance updates + pub(crate) fn insert_tx( + &mut self, + transaction: ValidPoolTransaction, + on_chain_balance: U256, + on_chain_nonce: u64, + ) -> InsertResult { + assert!(on_chain_nonce <= transaction.nonce(), "Invalid transaction"); + + let tx_id = *transaction.id(); + let transaction = Arc::new(transaction); + let mut state = TxState::default(); + let mut cumulative_cost = U256::zero(); + let mut updates = Vec::new(); + + let predecessor = + TransactionId::ancestor(transaction.transaction.nonce(), on_chain_nonce, tx_id.sender); + + // If there's no predecessor then this is the next transaction + if predecessor.is_none() { + state.insert(TxState::NO_NONCE_GAPS); + } + + // Check dynamic fee + if let Some(fee) = transaction.max_fee_per_gas() { + if fee >= self.pending_basefee { + state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + } + if fee > self.minimal_protocol_basefee { + state.insert(TxState::ENOUGH_FEE_CAP_PROTOCOL); + } + } else { + // legacy transactions always satisfy the condition + state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + state.insert(TxState::ENOUGH_FEE_CAP_PROTOCOL); + } + + // Ensure tx does not exceed block gas limit + if transaction.gas_limit() < self.block_gas_limit { + state.insert(TxState::NOT_TOO_MUCH_GAS); + } + + let mut replaced_tx = None; + + let pool_tx = PoolInternalTransaction { + transaction: transaction.clone(), + subpool: SubPool::Queued, + state, + cumulative_cost, + }; + + // try to insert the transaction + match self.txs.entry(*transaction.id()) { + Entry::Vacant(entry) => { + // Insert the transaction in both maps + self.by_hash.insert(*pool_tx.transaction.hash(), pool_tx.transaction.clone()); + entry.insert(pool_tx); + } + Entry::Occupied(mut entry) => { + // Transaction already exists + // Ensure the new transaction is not underpriced + if transaction.is_underpriced(entry.get().transaction.as_ref()) { + return InsertResult::Underpriced { + transaction: pool_tx.transaction, + existing: *entry.get().transaction.hash(), + } + } + let new_hash = *pool_tx.transaction.hash(); + let new_transaction = pool_tx.transaction.clone(); + let replaced = entry.insert(pool_tx); + self.by_hash.remove(replaced.transaction.hash()); + self.by_hash.insert(new_hash, new_transaction); + // also remove the hash + replaced_tx = Some((replaced.transaction, replaced.subpool)); + } + } + + // The next transaction of this sender + let on_chain_id = TransactionId::new(transaction.sender_id, on_chain_nonce); + { + // Tracks the next nonce we expect if the transactions are gapless + let mut next_nonce = on_chain_id.nonce; + + // Traverse all transactions of the sender and update existing transactions + for (id, tx) in self.descendant_txs_mut(&on_chain_id) { + let current_pool = tx.subpool; + if next_nonce != id.nonce { + // nothing to update + break + } + + // close the nonce gap + tx.state.insert(TxState::NO_NONCE_GAPS); + + // set cumulative cost + tx.cumulative_cost = cumulative_cost; + + // Update for next transaction + cumulative_cost = tx.cumulative_cost + tx.transaction.cost; + + if cumulative_cost > on_chain_balance { + // sender lacks sufficient funds to pay for this transaction + tx.state.remove(TxState::ENOUGH_BALANCE); + } else { + tx.state.insert(TxState::ENOUGH_BALANCE); + } + + if tx_id.eq(id) { + // if it is the new transaction, track the state + state = tx.state; + } else { + tx.subpool = tx.state.into(); + if current_pool != tx.subpool { + updates.push(PoolUpdate { + id: *id, + hash: *tx.transaction.hash(), + current: current_pool, + destination: Destination::Pool(tx.subpool), + }) + } + } + + // increment for next iteration + next_nonce = id.next_nonce(); + } + } + + // If this wasn't a replacement transaction we need to update the counter. + if replaced_tx.is_none() { + self.tx_inc(tx_id.sender); + } + + InsertResult::Inserted { transaction, move_to: state.into(), state, replaced_tx, updates } + } + + /// Rechecks the transaction of the given sender and returns a set of updates. + pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) { + todo!() + } + + /// Number of transactions in the entire pool + pub(crate) fn len(&self) -> usize { + self.txs.len() + } + + /// Whether the pool is empty + pub(crate) fn is_empty(&self) -> bool { + self.txs.is_empty() + } +} + +impl Default for AllTransactions { + fn default() -> Self { + Self { + pending_basefee: Default::default(), + // TODO(mattsse): document + minimal_protocol_basefee: 7u64.into(), + block_gas_limit: 30_000_000, + by_hash: Default::default(), + txs: Default::default(), + tx_counter: Default::default(), + } + } +} + +/// Where to move an existing transaction. +#[derive(Debug)] +pub(crate) enum Destination { + /// Discard the transaction. + Discard, + /// Move transaction to pool + Pool(SubPool), +} + +/// A change of the transaction's location +/// +/// NOTE: this guarantees that `current` and `destination` differ. +#[derive(Debug)] +pub(crate) struct PoolUpdate { + pub(crate) id: TransactionId, + pub(crate) hash: TxHash, + /// Where the transaction is currently held. + pub(crate) current: SubPool, + /// Where to move the transaction to + pub(crate) destination: Destination, +} + +/// The outcome of [TxPool::insert_tx] +#[derive(Debug)] +pub(crate) enum InsertResult { + /// Transaction was successfully inserted into the pool + Inserted { + transaction: Arc>, + move_to: SubPool, + state: TxState, + replaced_tx: Option<(Arc>, SubPool)>, + /// Additional updates to transactions affected by this change. + updates: Vec, + }, + /// Attempted to replace existing transaction, but was underpriced + Underpriced { transaction: Arc>, existing: TxHash }, +} + +// === impl InsertResult === + +#[allow(missing_docs)] +impl InsertResult { + fn is_underpriced(&self) -> bool { + matches!(self, InsertResult::Underpriced { .. }) + } +} + +/// The internal transaction typed used by `AllTransactions` which also additional info used for +/// determining the current state of the transaction. +pub(crate) struct PoolInternalTransaction { + /// The actual transaction object. + transaction: Arc>, + /// The `SubPool` that currently contains this transaction. + subpool: SubPool, + /// Keeps track of the current state of the transaction and therefor in which subpool it should + /// reside + state: TxState, + /// The total cost all transactions before this transaction. + /// + /// This is the combined `cost` of all transactions from the same sender that currently + /// come before this transaction. + cumulative_cost: U256, +} + +// === impl PoolInternalTransaction === + +impl PoolInternalTransaction { + fn next_cumulative_cost(&self) -> U256 { + self.cumulative_cost + self.transaction.cost + } +} + +/// Tracks the result after updating the pool +#[derive(Debug)] +pub struct UpdateOutcome { + /// transactions promoted to the ready queue + promoted: Vec, + /// transaction that failed and became discarded + discarded: Vec, + /// Transactions removed from the Ready pool + removed: Vec>>, +} + +impl Default for UpdateOutcome { + fn default() -> Self { + Self { promoted: vec![], discarded: vec![], removed: vec![] } + } +} + +/// Represents the outcome of a prune +pub struct PruneResult { + /// A list of added transactions that a pruned marker satisfied + pub promoted: Vec>, + /// all transactions that failed to be promoted and now are discarded + pub failed: Vec, + /// all transactions that were pruned from the ready pool + pub pruned: Vec>>, +} + +impl fmt::Debug for PruneResult { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "PruneResult {{ ")?; + write!( + fmt, + "promoted: {:?}, ", + self.promoted.iter().map(|tx| *tx.hash()).collect::>() + )?; + write!(fmt, "failed: {:?}, ", self.failed)?; + write!( + fmt, + "pruned: {:?}, ", + self.pruned.iter().map(|tx| *tx.transaction.hash()).collect::>() + )?; + write!(fmt, "}}")?; + Ok(()) + } +} + +/// Stores relevant context about a sender. +#[derive(Debug, Clone, Default)] +struct SenderInfo { + /// current nonce of the sender. + state_nonce: u64, + /// Balance of the sender at the current point. + balance: U256, +} + +// === impl SenderInfo === + +impl SenderInfo { + /// Creates a new entry for an incoming, not yet tracked sender. + fn new_incoming(state_nonce: u64, balance: U256) -> Self { + Self { state_nonce, balance } + } + + /// Updates the info with the new values. + fn update(&mut self, state_nonce: u64, balance: U256) { + *self = Self { state_nonce, balance }; + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_util::{mock_tx_pool, MockTransaction, MockTransactionFactory}; + + #[test] + fn test_simple_insert() { + let on_chain_balance = U256::zero(); + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = AllTransactions::default(); + let tx = MockTransaction::eip1559().inc_price().inc_limit(); + let valid_tx = f.validated(tx.clone()); + let res = pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce); + match res { + InsertResult::Inserted { updates, replaced_tx, move_to, state, .. } => { + assert!(updates.is_empty()); + assert!(replaced_tx.is_none()); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert!(!state.contains(TxState::ENOUGH_BALANCE)); + assert_eq!(move_to, SubPool::Queued); + } + InsertResult::Underpriced { .. } => { + panic!("not underpriced") + } + }; + assert_eq!(pool.len(), 1); + assert!(pool.contains(valid_tx.hash())); + let expected_state = TxState::ENOUGH_FEE_CAP_BLOCK | TxState::NO_NONCE_GAPS; + let inserted = pool.get(valid_tx.id()).unwrap(); + assert!(inserted.state.intersects(expected_state)); + + // insert the same tx again + let res = pool.insert_tx(valid_tx, on_chain_balance, on_chain_nonce); + assert!(res.is_underpriced()); + assert_eq!(pool.len(), 1); + + let valid_tx = f.validated(tx.next()); + let res = pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce); + match res { + InsertResult::Inserted { updates, replaced_tx, move_to, state, .. } => { + assert!(updates.is_empty()); + assert!(replaced_tx.is_none()); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert!(!state.contains(TxState::ENOUGH_BALANCE)); + assert_eq!(move_to, SubPool::Queued); + } + InsertResult::Underpriced { .. } => { + panic!("not underpriced") + } + }; + assert!(pool.contains(valid_tx.hash())); + assert_eq!(pool.len(), 2); + let inserted = pool.get(valid_tx.id()).unwrap(); + assert!(inserted.state.intersects(expected_state)); + } + + #[test] + fn insert_replace() { + let on_chain_balance = U256::zero(); + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = AllTransactions::default(); + let tx = MockTransaction::eip1559().inc_price().inc_limit(); + let first = f.validated(tx.clone()); + let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + let replacement = f.validated(tx.rng_hash().inc_price()); + let res = pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce); + match res { + InsertResult::Inserted { updates, replaced_tx, .. } => { + assert!(updates.is_empty()); + let replaced = replaced_tx.unwrap(); + assert_eq!(replaced.0.hash(), first.hash()); + } + InsertResult::Underpriced { .. } => { + panic!("not underpriced") + } + }; + assert!(!pool.contains(first.hash())); + assert!(pool.contains(replacement.hash())); + assert_eq!(pool.len(), 1); + } + + // insert nonce then nonce - 1 + #[test] + fn insert_previous() { + let on_chain_balance = U256::zero(); + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = AllTransactions::default(); + let tx = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit(); + let first = f.validated(tx.clone()); + let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + + let first_in_pool = pool.get(first.id()).unwrap(); + // has nonce gap + assert!(!first_in_pool.state.contains(TxState::NO_NONCE_GAPS)); + + let prev = f.validated(tx.prev()); + let res = pool.insert_tx(prev, on_chain_balance, on_chain_nonce); + + match res { + InsertResult::Inserted { updates, replaced_tx, state, move_to, .. } => { + // no updates since still in queued pool + assert!(updates.is_empty()); + assert!(replaced_tx.is_none()); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert_eq!(move_to, SubPool::Queued); + } + InsertResult::Underpriced { .. } => { + panic!("not underpriced") + } + }; + + let first_in_pool = pool.get(first.id()).unwrap(); + // has non nonce gap + assert!(first_in_pool.state.contains(TxState::NO_NONCE_GAPS)); + } + + // insert nonce then nonce - 1 + #[test] + fn insert_with_updates() { + let on_chain_balance = U256::from(10_000); + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = AllTransactions::default(); + let tx = MockTransaction::eip1559().inc_nonce().set_gas_price(100u64.into()).inc_limit(); + let first = f.validated(tx.clone()); + let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + + let first_in_pool = pool.get(first.id()).unwrap(); + // has nonce gap + assert!(!first_in_pool.state.contains(TxState::NO_NONCE_GAPS)); + assert_eq!(SubPool::Queued, first_in_pool.subpool); + + let prev = f.validated(tx.prev()); + let res = pool.insert_tx(prev, on_chain_balance, on_chain_nonce); + + match res { + InsertResult::Inserted { updates, replaced_tx, state, move_to, .. } => { + // updated previous tx + assert_eq!(updates.len(), 1); + assert!(replaced_tx.is_none()); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert_eq!(move_to, SubPool::Pending); + } + InsertResult::Underpriced { .. } => { + panic!("not underpriced") + } + }; + + let first_in_pool = pool.get(first.id()).unwrap(); + // has non nonce gap + assert!(first_in_pool.state.contains(TxState::NO_NONCE_GAPS)); + assert_eq!(SubPool::Pending, first_in_pool.subpool); + } +} diff --git a/crates/transaction-pool/src/test_util.rs b/crates/transaction-pool/src/test_util.rs new file mode 100644 index 000000000..852e574c4 --- /dev/null +++ b/crates/transaction-pool/src/test_util.rs @@ -0,0 +1,375 @@ +//! Internal helpers for testing. +#![allow(missing_docs, unused)] + +use crate::{ + identifier::{SenderIdentifiers, TransactionId}, + pool::txpool::TxPool, + PoolTransaction, TransactionOrdering, ValidPoolTransaction, +}; +use paste::paste; +use reth_primitives::{Address, TxHash, H256, U256}; +use std::{sync::Arc, time::Instant}; + +pub type MockTxPool = TxPool; + +pub type MockValidTx = ValidPoolTransaction; + +/// Create an empty `TxPool` +pub fn mock_tx_pool() -> MockTxPool { + MockTxPool::new(Arc::new(Default::default())) +} + +/// Sets the value for the field +macro_rules! set_value { + ($this:ident => $field:ident) => { + let new_value = $field; + match $this { + MockTransaction::Legacy { ref mut $field, .. } => { + *$field = new_value; + } + MockTransaction::Eip1559 { ref mut $field, .. } => { + *$field = new_value; + } + } + }; +} + +/// Sets the value fo the field +macro_rules! get_value { + ($this:ident => $field:ident) => { + match $this { + MockTransaction::Legacy { $field, .. } => $field, + MockTransaction::Eip1559 { $field, .. } => $field, + } + }; +} + +// Generates all setters and getters +macro_rules! make_setters_getters { + ($($name:ident => $t:ty);*) => { + paste! { + $( + pub fn [](&mut self, $name: $t) -> &mut Self { + set_value!(self => $name); + self + } + + pub fn [](mut self, $name: $t) -> Self { + set_value!(self => $name); + self + } + + pub fn [](&self) -> $t { + get_value!(self => $name).clone() + } + + )* + + } + }; +} + +/// A Bare transaction type used for testing. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum MockTransaction { + Legacy { + hash: H256, + sender: Address, + nonce: u64, + gas_price: U256, + gas_limit: u64, + value: U256, + }, + Eip1559 { + hash: H256, + sender: Address, + nonce: u64, + max_fee_per_gas: U256, + max_priority_fee_per_gas: U256, + gas_limit: u64, + value: U256, + }, +} + +// === impl MockTransaction === + +impl MockTransaction { + make_setters_getters! { + nonce => u64; + hash => H256; + sender => Address; + gas_limit => u64; + value => U256 + } + + /// Returns a new legacy transaction with random address and hash and empty values + pub fn legacy() -> Self { + MockTransaction::Legacy { + hash: H256::random(), + sender: Address::random(), + nonce: 0, + gas_price: U256::zero(), + gas_limit: 0, + value: Default::default(), + } + } + + /// Returns a new EIP1559 transaction with random address and hash and empty values + pub fn eip1559() -> Self { + MockTransaction::Eip1559 { + hash: H256::random(), + sender: Address::random(), + nonce: 0, + max_fee_per_gas: U256::zero(), + max_priority_fee_per_gas: U256::zero(), + gas_limit: 0, + value: Default::default(), + } + } + + pub fn set_priority_fee(&mut self, val: U256) -> &mut Self { + if let MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } = self { + *max_priority_fee_per_gas = val; + } + self + } + + pub fn with_priority_fee(mut self, val: U256) -> Self { + if let MockTransaction::Eip1559 { ref mut max_priority_fee_per_gas, .. } = self { + *max_priority_fee_per_gas = val; + } + self + } + + pub fn get_priority_fee(&self) -> Option { + if let MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } = self { + Some(*max_priority_fee_per_gas) + } else { + None + } + } + + pub fn set_max_fee(&mut self, val: U256) -> &mut Self { + if let MockTransaction::Eip1559 { max_fee_per_gas, .. } = self { + *max_fee_per_gas = val; + } + self + } + + pub fn with_max_fee(mut self, val: U256) -> Self { + if let MockTransaction::Eip1559 { ref mut max_fee_per_gas, .. } = self { + *max_fee_per_gas = val; + } + self + } + + pub fn get_max_fee(&self) -> Option { + if let MockTransaction::Eip1559 { max_fee_per_gas, .. } = self { + Some(*max_fee_per_gas) + } else { + None + } + } + + pub fn set_gas_price(&mut self, val: U256) -> &mut Self { + match self { + MockTransaction::Legacy { gas_price, .. } => { + *gas_price = val; + } + MockTransaction::Eip1559 { max_fee_per_gas, max_priority_fee_per_gas, .. } => { + *max_fee_per_gas = val; + *max_priority_fee_per_gas = val; + } + } + self + } + + pub fn with_gas_price(mut self, val: U256) -> Self { + match self { + MockTransaction::Legacy { ref mut gas_price, .. } => { + *gas_price = val; + } + MockTransaction::Eip1559 { + ref mut max_fee_per_gas, + ref mut max_priority_fee_per_gas, + .. + } => { + *max_fee_per_gas = val; + *max_priority_fee_per_gas = val; + } + } + self + } + + pub fn get_gas_price(&self) -> U256 { + match self { + MockTransaction::Legacy { gas_price, .. } => *gas_price, + MockTransaction::Eip1559 { max_fee_per_gas, .. } => *max_fee_per_gas, + } + } + + /// Returns a clone with a decreased nonce + pub fn prev(&self) -> Self { + let mut next = self.clone().with_hash(H256::random()); + next.with_nonce(self.get_nonce() - 1) + } + + /// Returns a clone with an increased nonce + pub fn next(&self) -> Self { + let mut next = self.clone().with_hash(H256::random()); + next.with_nonce(self.get_nonce() + 1) + } + + /// Returns a clone with an increased nonce + pub fn skip(&self, skip: u64) -> Self { + let mut next = self.clone().with_hash(H256::random()); + next.with_nonce(self.get_nonce() + skip + 1) + } + + /// Returns a clone with incremented nonce + pub fn inc_nonce(mut self) -> Self { + let nonce = self.get_nonce() + 1; + self.with_nonce(nonce) + } + + /// Sets a new random hash + pub fn rng_hash(mut self) -> Self { + self.with_hash(H256::random()) + } + + /// Returns a new transaction with a higher gas price +1 + pub fn inc_price(&self) -> Self { + let mut next = self.clone(); + let gas = self.get_gas_price() + 1; + next.with_gas_price(gas) + } + + /// Returns a new transaction with a higher value + pub fn inc_value(&self) -> Self { + let mut next = self.clone(); + let val = self.get_value() + 1; + next.with_value(val) + } + + /// Returns a new transaction with a higher gas limit + pub fn inc_limit(&self) -> Self { + let mut next = self.clone(); + let gas = self.get_gas_limit() + 1; + next.with_gas_limit(gas) + } +} + +impl PoolTransaction for MockTransaction { + fn hash(&self) -> &TxHash { + match self { + MockTransaction::Legacy { hash, .. } => hash, + MockTransaction::Eip1559 { hash, .. } => hash, + } + } + + fn sender(&self) -> &Address { + match self { + MockTransaction::Legacy { sender, .. } => sender, + MockTransaction::Eip1559 { sender, .. } => sender, + } + } + + fn nonce(&self) -> u64 { + match self { + MockTransaction::Legacy { nonce, .. } => *nonce, + MockTransaction::Eip1559 { nonce, .. } => *nonce, + } + } + + fn cost(&self) -> U256 { + match self { + MockTransaction::Legacy { gas_price, value, gas_limit, .. } => { + U256::from(*gas_limit) * *gas_price + *value + } + MockTransaction::Eip1559 { max_fee_per_gas, value, gas_limit, .. } => { + U256::from(*gas_limit) * *max_fee_per_gas + *value + } + } + } + + fn effective_gas_price(&self) -> U256 { + self.get_gas_price() + } + + fn gas_limit(&self) -> u64 { + self.get_gas_limit() + } + + fn max_fee_per_gas(&self) -> Option { + match self { + MockTransaction::Legacy { .. } => None, + MockTransaction::Eip1559 { max_fee_per_gas, .. } => Some(*max_fee_per_gas), + } + } + + fn max_priority_fee_per_gas(&self) -> Option { + match self { + MockTransaction::Legacy { .. } => None, + MockTransaction::Eip1559 { max_priority_fee_per_gas, .. } => { + Some(*max_priority_fee_per_gas) + } + } + } +} + +#[derive(Default)] +pub struct MockTransactionFactory { + ids: SenderIdentifiers, +} + +// === impl MockTransactionFactory === + +impl MockTransactionFactory { + pub fn tx_id(&mut self, tx: &MockTransaction) -> TransactionId { + let sender = self.ids.sender_id_or_create(tx.get_sender()); + TransactionId::new(sender, tx.get_nonce()) + } + + /// Converts the transaction into a validated transaction + pub fn validated(&mut self, transaction: MockTransaction) -> MockValidTx { + let transaction_id = self.tx_id(&transaction); + MockValidTx { + propagate: false, + is_local: false, + sender_id: transaction_id.sender, + transaction_id, + cost: transaction.cost(), + transaction, + timestamp: Instant::now(), + } + } + + pub fn create_legacy(&mut self) -> MockValidTx { + self.validated(MockTransaction::legacy()) + } + + pub fn create_eip1559(&mut self) -> MockValidTx { + self.validated(MockTransaction::eip1559()) + } +} + +#[derive(Default)] +#[non_exhaustive] +pub struct MockOrdering; + +impl TransactionOrdering for MockOrdering { + type Priority = U256; + type Transaction = MockTransaction; + + fn priority(&self, transaction: &Self::Transaction) -> Self::Priority { + transaction.cost() + } +} + +#[test] +fn test_mock_priority() { + let o = MockOrdering; + let lo = MockTransaction::eip1559(); + let hi = lo.next().inc_value(); + assert!(o.priority(&hi) > o.priority(&lo)); +} diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 9996f6a94..4efe83bb2 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -96,7 +96,7 @@ impl BestTransactions for std::iter::Empty { } /// Trait for transaction types used inside the pool -pub trait PoolTransaction: fmt::Debug + Send + Send + 'static { +pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static { /// Hash of the transaction fn hash(&self) -> &TxHash; @@ -111,13 +111,19 @@ pub trait PoolTransaction: fmt::Debug + Send + Send + 'static { /// For EIP-1559 transactions that is `feeCap x gasLimit + transferred_value` fn cost(&self) -> U256; + /// Returns the effective gas price for this transaction. + fn effective_gas_price(&self) -> U256; + + /// Amount of gas that should be used in executing this transaction. This is paid up-front. + fn gas_limit(&self) -> u64; + /// Returns the EIP-1559 Max base fee the caller is willing to pay. /// /// This will return `None` for non-EIP1559 transactions - fn max_fee_per_gas(&self) -> Option<&U256>; + fn max_fee_per_gas(&self) -> Option; /// Returns the EIP-1559 Priority fee the caller is paying to the block author. /// /// This will return `None` for non-EIP1559 transactions - fn max_priority_fee_per_gas(&self) -> Option<&U256>; + fn max_priority_fee_per_gas(&self) -> Option; } diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 904f5c387..859aed692 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -6,7 +6,7 @@ use crate::{ traits::PoolTransaction, }; use reth_primitives::{BlockID, TxHash, U256}; -use std::fmt; +use std::{fmt, time::Instant}; /// A Result type returned after checking a transaction's validity. pub enum TransactionValidationOutcome { @@ -51,29 +51,67 @@ pub trait TransactionValidator: Send + Sync { pub struct ValidPoolTransaction { /// The transaction pub transaction: T, - /// Ids required by the transaction. - /// - /// This lists all unique transactions that need to be mined before this transaction can be - /// considered `pending` and itself be included. - pub depends_on: Vec, /// The identifier for this transaction. pub transaction_id: TransactionId, /// Whether to propagate the transaction. pub propagate: bool, - /// Internal `Sender` identifier + /// Whether the tx is from a local source. + pub is_local: bool, + /// Internal `Sender` identifier. pub sender_id: SenderId, - /// Total cost of the transaction: `feeCap x gasLimit + transferred_value` + /// Total cost of the transaction: `feeCap x gasLimit + transferred_value`. pub cost: U256, - // TODO add a block timestamp that marks validity + /// Timestamp when this was added to the pool. + pub timestamp: Instant, } // === impl ValidPoolTransaction === impl ValidPoolTransaction { - /// Returns the hash of the transaction + /// Returns the hash of the transaction. pub fn hash(&self) -> &TxHash { self.transaction.hash() } + + /// Returns the internal identifier for this transaction. + pub(crate) fn id(&self) -> &TransactionId { + &self.transaction_id + } + + /// Returns the nonce set for this transaction. + pub(crate) fn nonce(&self) -> u64 { + self.transaction.nonce() + } + + /// Returns the EIP-1559 Max base fee the caller is willing to pay. + pub(crate) fn max_fee_per_gas(&self) -> Option { + self.transaction.max_fee_per_gas() + } + + /// Amount of gas that should be used in executing this transaction. This is paid up-front. + pub(crate) fn gas_limit(&self) -> u64 { + self.transaction.gas_limit() + } + + /// Returns true if this transaction is underpriced compared to the other. + pub(crate) fn is_underpriced(&self, other: &Self) -> bool { + self.transaction.effective_gas_price() <= other.transaction.effective_gas_price() + } +} + +#[cfg(test)] +impl Clone for ValidPoolTransaction { + fn clone(&self) -> Self { + Self { + transaction: self.transaction.clone(), + transaction_id: self.transaction_id, + propagate: self.propagate, + is_local: self.is_local, + sender_id: self.sender_id, + cost: self.cost, + timestamp: self.timestamp, + } + } } impl fmt::Debug for ValidPoolTransaction { @@ -81,7 +119,6 @@ impl fmt::Debug for ValidPoolTransaction { write!(fmt, "Transaction {{ ")?; write!(fmt, "hash: {:?}, ", &self.transaction.hash())?; write!(fmt, "provides: {:?}, ", &self.transaction_id)?; - write!(fmt, "depends_on: {:?}, ", &self.depends_on)?; write!(fmt, "raw tx: {:?}", &self.transaction)?; write!(fmt, "}}")?; Ok(())