diff --git a/crates/transaction-pool/src/pool/blob.rs b/crates/transaction-pool/src/pool/blob.rs index 65845f965..4503bf267 100644 --- a/crates/transaction-pool/src/pool/blob.rs +++ b/crates/transaction-pool/src/pool/blob.rs @@ -9,10 +9,12 @@ use std::{ sync::Arc, }; -/// A set of __all__ validated blob transactions in the pool. +use super::txpool::PendingFees; + +/// A set of validated blob transactions in the pool that are __not pending__. /// -/// The purpose of this pool is keep track of blob transactions that are either pending or queued -/// and to evict the worst blob transactions once the sub-pool is full. +/// The purpose of this pool is keep track of blob transactions that are queued and to evict the +/// worst blob transactions once the sub-pool is full. /// /// This expects that certain constraints are met: /// - blob transactions are always gap less @@ -22,7 +24,7 @@ pub(crate) struct BlobTransactions { /// This way we can determine when transactions were submitted to the pool. submission_id: u64, /// _All_ Transactions that are currently inside the pool grouped by their identifier. - by_id: BTreeMap>>, + by_id: BTreeMap>, /// _All_ transactions sorted by blob priority. all: BTreeSet>, /// Keeps track of the size of this pool. @@ -53,10 +55,10 @@ impl BlobTransactions { // keep track of size self.size_of += tx.size(); - self.by_id.insert(id, tx.clone()); - let ord = BlobOrd { submission_id }; let transaction = BlobTransaction { ord, transaction: tx }; + + self.by_id.insert(id, transaction.clone()); self.all.insert(transaction); } @@ -68,13 +70,12 @@ impl BlobTransactions { // remove from queues let tx = self.by_id.remove(id)?; - // TODO: remove from ordered set - // self.best.remove(&tx); + self.all.remove(&tx); // keep track of size self.size_of -= tx.transaction.size(); - Some(tx) + Some(tx.transaction) } /// Returns all transactions that satisfy the given basefee and blob_fee. @@ -101,6 +102,59 @@ impl BlobTransactions { self.by_id.len() } + /// Returns whether the pool is empty + #[cfg(test)] + #[allow(unused)] + pub(crate) fn is_empty(&self) -> bool { + self.by_id.is_empty() + } + + /// Returns all transactions which: + /// * have a `max_fee_per_blob_gas` greater than or equal to the given `blob_fee`, _and_ + /// * have a `max_fee_per_gas` greater than or equal to the given `base_fee` + fn satisfy_pending_fee_ids(&self, pending_fees: &PendingFees) -> Vec { + let mut transactions = Vec::new(); + { + let mut iter = self.by_id.iter().peekable(); + + while let Some((id, tx)) = iter.next() { + if tx.transaction.max_fee_per_blob_gas() < Some(pending_fees.blob_fee) || + tx.transaction.max_fee_per_gas() < pending_fees.base_fee as u128 + { + // still parked in blob pool -> skip descendant transactions + 'this: while let Some((peek, _)) = iter.peek() { + if peek.sender != id.sender { + break 'this + } + iter.next(); + } + } else { + transactions.push(*id); + } + } + } + transactions + } + + /// Removes all transactions (and their descendants) which: + /// * have a `max_fee_per_blob_gas` greater than or equal to the given `blob_fee`, _and_ + /// * have a `max_fee_per_gas` greater than or equal to the given `base_fee` + /// + /// Note: the transactions are not returned in a particular order. + pub(crate) fn enforce_pending_fees( + &mut self, + pending_fees: &PendingFees, + ) -> Vec>> { + let to_remove = self.satisfy_pending_fee_ids(pending_fees); + + let mut removed = Vec::with_capacity(to_remove.len()); + for id in to_remove { + removed.push(self.remove_transaction(&id).expect("transaction exists")); + } + + removed + } + /// Returns `true` if the transaction with the given id is already included in this pool. #[cfg(test)] #[allow(unused)] @@ -134,6 +188,12 @@ struct BlobTransaction { ord: BlobOrd, } +impl Clone for BlobTransaction { + fn clone(&self) -> Self { + Self { transaction: self.transaction.clone(), ord: self.ord.clone() } + } +} + impl Eq for BlobTransaction {} impl PartialEq for BlobTransaction { @@ -154,7 +214,7 @@ impl Ord for BlobTransaction { } } -#[derive(Debug)] +#[derive(Debug, Clone)] struct BlobOrd { /// Identifier that tags when transaction was submitted in the pool. pub(crate) submission_id: u64, diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index 633e526f7..fa741d32f 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -101,7 +101,7 @@ impl ParkedPool { self.by_id.len() } - /// Whether the pool is empty + /// Returns whether the pool is empty #[cfg(test)] #[allow(unused)] pub(crate) fn is_empty(&self) -> bool { diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index b4e71484f..31acc9132 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -150,6 +150,51 @@ impl PendingPool { self.by_id.values().map(|tx| tx.transaction.clone()) } + /// Updates the pool with the new blob fee. Removes + /// from the subpool all transactions and their dependents that no longer satisfy the given + /// base fee (`tx.max_blob_fee < blob_fee`). + /// + /// Note: the transactions are not returned in a particular order. + /// + /// # Returns + /// + /// Removed transactions that no longer satisfy the blob fee. + pub(crate) fn update_blob_fee( + &mut self, + blob_fee: u128, + ) -> Vec>> { + // Create a collection for removed transactions. + let mut removed = Vec::new(); + + // Drain and iterate over all transactions. + let mut transactions_iter = self.clear_transactions().into_iter().peekable(); + while let Some((id, tx)) = transactions_iter.next() { + if tx.transaction.max_fee_per_blob_gas() < Some(blob_fee) { + // Add this tx to the removed collection since it no longer satisfies the blob fee + // condition. Decrease the total pool size. + removed.push(Arc::clone(&tx.transaction)); + + // Remove all dependent transactions. + 'this: while let Some((next_id, next_tx)) = transactions_iter.peek() { + if next_id.sender != id.sender { + break 'this + } + removed.push(Arc::clone(&next_tx.transaction)); + transactions_iter.next(); + } + } else { + self.size_of += tx.transaction.size(); + if self.ancestor(&id).is_none() { + self.independent_transactions.insert(tx.clone()); + } + self.all.insert(tx.clone()); + self.by_id.insert(id, tx); + } + } + + removed + } + /// Updates the pool with the new base fee. Reorders transactions by new priorities. Removes /// from the subpool all transactions and their dependents that no longer satisfy the given /// base fee (`tx.fee < base_fee`). diff --git a/crates/transaction-pool/src/pool/state.rs b/crates/transaction-pool/src/pool/state.rs index 27a7f1486..b7058c7ad 100644 --- a/crates/transaction-pool/src/pool/state.rs +++ b/crates/transaction-pool/src/pool/state.rs @@ -66,7 +66,7 @@ impl TxState { } /// Identifier for the transaction Sub-pool -#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)] #[repr(u8)] pub enum SubPool { /// The queued sub-pool contains transactions that are not ready to be included in the next @@ -190,5 +190,10 @@ mod tests { state.remove(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); assert!(state.is_blob()); assert!(!state.is_pending()); + + state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); + state.remove(TxState::ENOUGH_FEE_CAP_BLOCK); + assert!(state.is_blob()); + assert!(!state.is_pending()); } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 8dbe28db3..0330deed2 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -47,12 +47,14 @@ use std::{ /// B3[(Queued)] /// B1[(Pending)] /// B2[(Basefee)] +/// B4[(Blob)] /// end /// end /// discard([discard]) /// production([Block Production]) /// new([New Block]) /// A[Incoming Tx] --> B[Validation] -->|insert| pool +/// pool --> |if ready + blobfee too low| B4 /// pool --> |if ready| B1 /// pool --> |if ready + basfee too low| B2 /// pool --> |nonce gap or lack of funds| B3 @@ -60,8 +62,11 @@ use std::{ /// B1 --> |best| production /// B2 --> |worst| discard /// B3 --> |worst| discard -/// B1 --> |increased fee| B2 -/// B2 --> |decreased fee| B1 +/// B4 --> |worst| discard +/// B1 --> |increased blob fee| B4 +/// B4 --> |decreased blob fee| B1 +/// B1 --> |increased base fee| B2 +/// B2 --> |decreased base fee| B1 /// B3 --> |promote| B1 /// B3 --> |promote| B2 /// new --> |apply state changes| pool @@ -131,6 +136,8 @@ impl TxPool { basefee_size: self.basefee_pool.size(), queued: self.queued_pool.len(), queued_size: self.queued_pool.size(), + blob: self.blob_transactions.len(), + blob_size: self.blob_transactions.size(), total: self.all_transactions.len(), } } @@ -140,31 +147,108 @@ impl TxPool { BlockInfo { last_seen_block_hash: self.all_transactions.last_seen_block_hash, last_seen_block_number: self.all_transactions.last_seen_block_number, - pending_basefee: self.all_transactions.pending_basefee, - pending_blob_fee: Some(self.all_transactions.pending_blob_fee), + pending_basefee: self.all_transactions.pending_fees.base_fee, + pending_blob_fee: Some(self.all_transactions.pending_fees.blob_fee), } } /// Updates the tracked blob fee - fn update_blob_fee(&mut self, _pending_blob_fee: u128) { - // TODO: std::mem::swap pending_blob_fee - // TODO(mattsse): update blob txs + fn update_blob_fee(&mut self, mut pending_blob_fee: u128, base_fee_update: Ordering) { + std::mem::swap(&mut self.all_transactions.pending_fees.blob_fee, &mut pending_blob_fee); + match (self.all_transactions.pending_fees.blob_fee.cmp(&pending_blob_fee), base_fee_update) + { + (Ordering::Equal, Ordering::Equal) => { + // fee unchanged, nothing to update + } + (Ordering::Greater, Ordering::Equal) | + (Ordering::Equal, Ordering::Greater) | + (Ordering::Greater, Ordering::Greater) => { + // increased blob fee: recheck pending pool and remove all that are no longer valid + let removed = + self.pending_pool.update_blob_fee(self.all_transactions.pending_fees.blob_fee); + for tx in removed { + let to = { + let tx = + self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set"); + + // we unset the blob fee cap block flag, if the base fee is too high now + tx.state.remove(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); + tx.subpool = tx.state.into(); + tx.subpool + }; + self.add_transaction_to_subpool(to, tx); + } + } + (Ordering::Less, Ordering::Equal) | (_, Ordering::Less) => { + // decreased blob fee or base fee: recheck blob pool and promote all that are now + // valid + let removed = self + .blob_transactions + .enforce_pending_fees(&self.all_transactions.pending_fees); + for tx in removed { + let to = { + let tx = + self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set"); + tx.state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); + tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + tx.subpool = tx.state.into(); + tx.subpool + }; + self.add_transaction_to_subpool(to, tx); + } + } + (Ordering::Less, Ordering::Greater) => { + // increased blob fee: recheck pending pool and remove all that are no longer valid + let removed = + self.pending_pool.update_blob_fee(self.all_transactions.pending_fees.blob_fee); + for tx in removed { + let to = { + let tx = + self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set"); + + // we unset the blob fee cap block flag, if the base fee is too high now + tx.state.remove(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); + tx.subpool = tx.state.into(); + tx.subpool + }; + self.add_transaction_to_subpool(to, tx); + } + + // decreased blob fee or base fee: recheck blob pool and promote all that are now + // valid + let removed = self + .blob_transactions + .enforce_pending_fees(&self.all_transactions.pending_fees); + for tx in removed { + let to = { + let tx = + self.all_transactions.txs.get_mut(tx.id()).expect("tx exists in set"); + tx.state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); + tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + tx.subpool = tx.state.into(); + tx.subpool + }; + self.add_transaction_to_subpool(to, tx); + } + } + } } /// Updates the tracked basefee /// /// Depending on the change in direction of the basefee, this will promote or demote /// transactions from the basefee pool. - fn update_basefee(&mut self, mut pending_basefee: u64) { - std::mem::swap(&mut self.all_transactions.pending_basefee, &mut pending_basefee); - match self.all_transactions.pending_basefee.cmp(&pending_basefee) { + fn update_basefee(&mut self, mut pending_basefee: u64) -> Ordering { + std::mem::swap(&mut self.all_transactions.pending_fees.base_fee, &mut pending_basefee); + match self.all_transactions.pending_fees.base_fee.cmp(&pending_basefee) { Ordering::Equal => { // fee unchanged, nothing to update + Ordering::Equal } Ordering::Greater => { // increased base fee: recheck pending pool and remove all that are no longer valid let removed = - self.pending_pool.update_base_fee(self.all_transactions.pending_basefee); + self.pending_pool.update_base_fee(self.all_transactions.pending_fees.base_fee); for tx in removed { let to = { let tx = @@ -175,11 +259,13 @@ impl TxPool { }; self.add_transaction_to_subpool(to, tx); } + + Ordering::Greater } Ordering::Less => { // decreased base fee: recheck basefee pool and promote all that are now valid let removed = - self.basefee_pool.enforce_basefee(self.all_transactions.pending_basefee); + self.basefee_pool.enforce_basefee(self.all_transactions.pending_fees.base_fee); for tx in removed { let to = { let tx = @@ -190,6 +276,8 @@ impl TxPool { }; self.add_transaction_to_subpool(to, tx); } + + Ordering::Less } } } @@ -206,10 +294,10 @@ impl TxPool { } = info; self.all_transactions.last_seen_block_hash = last_seen_block_hash; self.all_transactions.last_seen_block_number = last_seen_block_number; - self.update_basefee(pending_basefee); + let basefee_ordering = self.update_basefee(pending_basefee); if let Some(blob_fee) = pending_blob_fee { - self.update_blob_fee(blob_fee) + self.update_blob_fee(blob_fee, basefee_ordering) } } @@ -225,7 +313,7 @@ impl TxPool { basefee: u64, ) -> Box>>> { - match basefee.cmp(&self.all_transactions.pending_basefee) { + match basefee.cmp(&self.all_transactions.pending_fees.base_fee) { Ordering::Equal => { // fee unchanged, nothing to shift Box::new(self.best_transactions()) @@ -240,7 +328,7 @@ impl TxPool { let unlocked = self.basefee_pool.satisfy_base_fee_transactions(basefee); Box::new( self.pending_pool - .best_with_unlocked(unlocked, self.all_transactions.pending_basefee), + .best_with_unlocked(unlocked, self.all_transactions.pending_fees.base_fee), ) } } @@ -253,7 +341,8 @@ impl TxPool { best_transactions_attributes: BestTransactionsAttributes, ) -> Box>>> { - match best_transactions_attributes.basefee.cmp(&self.all_transactions.pending_basefee) { + match best_transactions_attributes.basefee.cmp(&self.all_transactions.pending_fees.base_fee) + { Ordering::Equal => { // fee unchanged, nothing to shift Box::new(self.best_transactions()) @@ -268,12 +357,10 @@ impl TxPool { let unlocked_with_blob = self.blob_transactions.satisfy_attributes(best_transactions_attributes); - Box::new( - self.pending_pool.best_with_unlocked( - unlocked_with_blob, - self.all_transactions.pending_basefee, - ), - ) + Box::new(self.pending_pool.best_with_unlocked( + unlocked_with_blob, + self.all_transactions.pending_fees.base_fee, + )) } } } @@ -658,7 +745,7 @@ impl TxPool { self.queued_pool.add_transaction(tx); } SubPool::Pending => { - self.pending_pool.add_transaction(tx, self.all_transactions.pending_basefee); + self.pending_pool.add_transaction(tx, self.all_transactions.pending_fees.base_fee); } SubPool::BaseFee => { self.basefee_pool.add_transaction(tx); @@ -751,8 +838,8 @@ impl TxPool { #[cfg(any(test, feature = "test-utils"))] pub fn assert_invariants(&self) { let size = self.size(); - let actual = size.basefee + size.pending + size.queued; - assert_eq!(size.total, actual, "total size must be equal to the sum of all sub-pools, basefee:{}, pending:{}, queued:{}", size.basefee, size.pending, size.queued); + let actual = size.basefee + size.pending + size.queued + size.blob; + assert_eq!(size.total, actual, "total size must be equal to the sum of all sub-pools, basefee:{}, pending:{}, queued:{}, blob:{}", size.basefee, size.pending, size.queued, size.blob); self.all_transactions.assert_invariants(); self.pending_pool.assert_invariants(); self.basefee_pool.assert_invariants(); @@ -822,10 +909,8 @@ pub(crate) struct AllTransactions { last_seen_block_number: u64, /// The current block hash the pool keeps track of. last_seen_block_hash: B256, - /// Expected base fee for the pending block. - pending_basefee: u64, - /// Expected blob fee for the pending block. - pending_blob_fee: u128, + /// Expected blob and base fee for the pending block. + pending_fees: PendingFees, /// Configured price bump settings for replacements price_bumps: PriceBumpConfig, } @@ -892,9 +977,9 @@ impl AllTransactions { } = block_info; self.last_seen_block_number = last_seen_block_number; self.last_seen_block_hash = last_seen_block_hash; - self.pending_basefee = pending_basefee; + self.pending_fees.base_fee = pending_basefee; if let Some(pending_blob_fee) = pending_blob_fee { - self.pending_blob_fee = pending_blob_fee; + self.pending_fees.blob_fee = pending_blob_fee; } } @@ -985,7 +1070,7 @@ impl AllTransactions { tx.state.insert(TxState::NO_PARKED_ANCESTORS); // Update the first transaction of this sender. - Self::update_tx_base_fee(self.pending_basefee, tx); + Self::update_tx_base_fee(self.pending_fees.base_fee, tx); // Track if the transaction's sub-pool changed. Self::record_subpool_update(&mut updates, tx); @@ -1031,7 +1116,7 @@ impl AllTransactions { has_parked_ancestor = !tx.state.is_pending(); // Update and record sub-pool changes. - Self::update_tx_base_fee(self.pending_basefee, tx); + Self::update_tx_base_fee(self.pending_fees.base_fee, tx); Self::record_subpool_update(&mut updates, tx); // Advance iterator @@ -1376,7 +1461,7 @@ impl AllTransactions { transaction = self.ensure_valid_blob_transaction(transaction, on_chain_balance, ancestor)?; let blob_fee_cap = transaction.transaction.max_fee_per_blob_gas().unwrap_or_default(); - if blob_fee_cap >= self.pending_blob_fee { + if blob_fee_cap >= self.pending_fees.blob_fee { state.insert(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK); } } else { @@ -1398,7 +1483,7 @@ impl AllTransactions { if fee_cap < self.minimal_protocol_basefee as u128 { return Err(InsertErr::FeeCapBelowMinimumProtocolFeeCap { transaction, fee_cap }) } - if fee_cap >= self.pending_basefee as u128 { + if fee_cap >= self.pending_fees.base_fee as u128 { state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); } @@ -1572,13 +1657,27 @@ impl Default for AllTransactions { tx_counter: Default::default(), last_seen_block_number: 0, last_seen_block_hash: Default::default(), - pending_basefee: Default::default(), - pending_blob_fee: BLOB_TX_MIN_BLOB_GASPRICE, + pending_fees: Default::default(), price_bumps: Default::default(), } } } +/// Represents updated fees for the pending block. +#[derive(Debug, Clone)] +pub(crate) struct PendingFees { + /// The pending base fee + pub(crate) base_fee: u64, + /// The pending blob fee + pub(crate) blob_fee: u128, +} + +impl Default for PendingFees { + fn default() -> Self { + PendingFees { base_fee: Default::default(), blob_fee: BLOB_TX_MIN_BLOB_GASPRICE } + } +} + /// Result type for inserting a transaction pub(crate) type InsertResult = Result, InsertErr>; @@ -1752,9 +1851,12 @@ mod tests { let on_chain_balance = U256::MAX; let on_chain_nonce = 0; let mut f = MockTransactionFactory::default(); - let mut pool = AllTransactions { pending_blob_fee: 10_000_000, ..Default::default() }; - pool.pending_blob_fee = 10_000_000; + let mut pool = AllTransactions { + pending_fees: PendingFees { blob_fee: 10_000_000, ..Default::default() }, + ..Default::default() + }; let tx = MockTransaction::eip4844().inc_price().inc_limit(); + pool.pending_fees.blob_fee = tx.max_fee_per_blob_gas().unwrap() + 1; let valid_tx = f.validated(tx); let InsertOk { state, .. } = pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce).unwrap(); @@ -1764,6 +1866,321 @@ mod tests { let _ = pool.txs.get(&valid_tx.transaction_id).unwrap(); } + #[test] + fn test_valid_tx_with_decreasing_blob_fee() { + let on_chain_balance = U256::MAX; + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = AllTransactions { + pending_fees: PendingFees { blob_fee: 10_000_000, ..Default::default() }, + ..Default::default() + }; + let tx = MockTransaction::eip4844().inc_price().inc_limit(); + + pool.pending_fees.blob_fee = tx.max_fee_per_blob_gas().unwrap() + 1; + let valid_tx = f.validated(tx.clone()); + let InsertOk { state, .. } = + pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce).unwrap(); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert!(!state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + + let _ = pool.txs.get(&valid_tx.transaction_id).unwrap(); + pool.remove_transaction(&valid_tx.transaction_id); + + pool.pending_fees.blob_fee = tx.max_fee_per_blob_gas().unwrap(); + let InsertOk { state, .. } = + pool.insert_tx(valid_tx.clone(), on_chain_balance, on_chain_nonce).unwrap(); + assert!(state.contains(TxState::NO_NONCE_GAPS)); + assert!(state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + } + + #[test] + fn test_demote_valid_tx_with_increasing_blob_fee() { + let on_chain_balance = U256::MAX; + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = TxPool::new(MockOrdering::default(), Default::default()); + let tx = MockTransaction::eip4844().inc_price().inc_limit(); + + // set block info so the tx is initially underpriced w.r.t. blob fee + let mut block_info = pool.block_info(); + block_info.pending_blob_fee = Some(tx.max_fee_per_blob_gas().unwrap()); + pool.set_block_info(block_info); + + let validated = f.validated(tx.clone()); + let id = *validated.id(); + pool.add_transaction(validated, on_chain_balance, on_chain_nonce).unwrap(); + + // assert pool lengths + assert!(pool.blob_transactions.is_empty()); + assert_eq!(pool.pending_pool.len(), 1); + + // check tx state and derived subpool + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert!(internal_tx.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + assert_eq!(internal_tx.subpool, SubPool::Pending); + + // set block info so the pools are updated + block_info.pending_blob_fee = Some(tx.max_fee_per_blob_gas().unwrap() + 1); + pool.set_block_info(block_info); + + // check that the tx is promoted + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert!(!internal_tx.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + assert_eq!(internal_tx.subpool, SubPool::Blob); + + // make sure the blob transaction was promoted into the pending pool + assert_eq!(pool.blob_transactions.len(), 1); + assert!(pool.pending_pool.is_empty()); + } + + #[test] + fn test_promote_valid_tx_with_decreasing_blob_fee() { + let on_chain_balance = U256::MAX; + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let mut pool = TxPool::new(MockOrdering::default(), Default::default()); + let tx = MockTransaction::eip4844().inc_price().inc_limit(); + + // set block info so the tx is initially underpriced w.r.t. blob fee + let mut block_info = pool.block_info(); + block_info.pending_blob_fee = Some(tx.max_fee_per_blob_gas().unwrap() + 1); + pool.set_block_info(block_info); + + let validated = f.validated(tx.clone()); + let id = *validated.id(); + pool.add_transaction(validated, on_chain_balance, on_chain_nonce).unwrap(); + + // assert pool lengths + assert!(pool.pending_pool.is_empty()); + assert_eq!(pool.blob_transactions.len(), 1); + + // check tx state and derived subpool + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert!(!internal_tx.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + assert_eq!(internal_tx.subpool, SubPool::Blob); + + // set block info so the pools are updated + block_info.pending_blob_fee = Some(tx.max_fee_per_blob_gas().unwrap()); + pool.set_block_info(block_info); + + // check that the tx is promoted + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert!(internal_tx.state.contains(TxState::ENOUGH_BLOB_FEE_CAP_BLOCK)); + assert_eq!(internal_tx.subpool, SubPool::Pending); + + // make sure the blob transaction was promoted into the pending pool + assert_eq!(pool.pending_pool.len(), 1); + assert!(pool.blob_transactions.is_empty()); + } + + /// A struct representing a txpool promotion test instance + #[derive(Debug, PartialEq, Eq, Clone, Hash)] + struct PromotionTest { + /// The basefee at the start of the test + basefee: u64, + /// The blobfee at the start of the test + blobfee: u128, + /// The subpool at the start of the test + subpool: SubPool, + /// The basefee update + basefee_update: u64, + /// The blobfee update + blobfee_update: u128, + /// The subpool after the update + new_subpool: SubPool, + } + + impl PromotionTest { + /// Returns the test case for the opposite update + fn opposite(&self) -> Self { + Self { + basefee: self.basefee_update, + blobfee: self.blobfee_update, + subpool: self.new_subpool, + blobfee_update: self.blobfee, + basefee_update: self.basefee, + new_subpool: self.subpool, + } + } + + fn assert_subpool_lengths( + &self, + pool: &TxPool, + failure_message: String, + check_subpool: SubPool, + ) { + match check_subpool { + SubPool::Blob => { + assert_eq!(pool.blob_transactions.len(), 1, "{failure_message}"); + assert!(pool.pending_pool.is_empty(), "{failure_message}"); + assert!(pool.basefee_pool.is_empty(), "{failure_message}"); + assert!(pool.queued_pool.is_empty(), "{failure_message}"); + } + SubPool::Pending => { + assert!(pool.blob_transactions.is_empty(), "{failure_message}"); + assert_eq!(pool.pending_pool.len(), 1, "{failure_message}"); + assert!(pool.basefee_pool.is_empty(), "{failure_message}"); + assert!(pool.queued_pool.is_empty(), "{failure_message}"); + } + SubPool::BaseFee => { + assert!(pool.blob_transactions.is_empty(), "{failure_message}"); + assert!(pool.pending_pool.is_empty(), "{failure_message}"); + assert_eq!(pool.basefee_pool.len(), 1, "{failure_message}"); + assert!(pool.queued_pool.is_empty(), "{failure_message}"); + } + SubPool::Queued => { + assert!(pool.blob_transactions.is_empty(), "{failure_message}"); + assert!(pool.pending_pool.is_empty(), "{failure_message}"); + assert!(pool.basefee_pool.is_empty(), "{failure_message}"); + assert_eq!(pool.queued_pool.len(), 1, "{failure_message}"); + } + } + } + + /// Runs an assertion on the provided pool, ensuring that the transaction is in the correct + /// subpool based on the starting condition of the test, assuming the pool contains only a + /// single transaction. + fn assert_single_tx_starting_subpool(&self, pool: &TxPool) { + self.assert_subpool_lengths( + pool, + format!("pool length check failed at start of test: {self:?}"), + self.subpool, + ); + } + + /// Runs an assertion on the provided pool, ensuring that the transaction is in the correct + /// subpool based on the ending condition of the test, assuming the pool contains only a + /// single transaction. + fn assert_single_tx_ending_subpool(&self, pool: &TxPool) { + self.assert_subpool_lengths( + pool, + format!("pool length check failed at end of test: {self:?}"), + self.new_subpool, + ); + } + } + + #[test] + fn test_promote_blob_tx_with_both_pending_fee_updates() { + // this exhaustively tests all possible promotion scenarios for a single transaction moving + // between the blob and pending pool + let on_chain_balance = U256::MAX; + let on_chain_nonce = 0; + let mut f = MockTransactionFactory::default(); + let tx = MockTransaction::eip4844().inc_price().inc_limit(); + + let max_fee_per_blob_gas = tx.max_fee_per_blob_gas().unwrap(); + let max_fee_per_gas = tx.max_fee_per_gas() as u64; + + // These are all _promotion_ tests or idempotent tests. + let mut expected_promotions = vec![ + PromotionTest { + blobfee: max_fee_per_blob_gas + 1, + basefee: max_fee_per_gas + 1, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas + 1, + basefee_update: max_fee_per_gas + 1, + new_subpool: SubPool::Blob, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas + 1, + basefee: max_fee_per_gas + 1, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas, + basefee_update: max_fee_per_gas + 1, + new_subpool: SubPool::Blob, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas + 1, + basefee: max_fee_per_gas + 1, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas + 1, + basefee_update: max_fee_per_gas, + new_subpool: SubPool::Blob, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas + 1, + basefee: max_fee_per_gas + 1, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas, + basefee_update: max_fee_per_gas, + new_subpool: SubPool::Pending, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas, + basefee: max_fee_per_gas + 1, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas, + basefee_update: max_fee_per_gas, + new_subpool: SubPool::Pending, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas + 1, + basefee: max_fee_per_gas, + subpool: SubPool::Blob, + blobfee_update: max_fee_per_blob_gas, + basefee_update: max_fee_per_gas, + new_subpool: SubPool::Pending, + }, + PromotionTest { + blobfee: max_fee_per_blob_gas, + basefee: max_fee_per_gas, + subpool: SubPool::Pending, + blobfee_update: max_fee_per_blob_gas, + basefee_update: max_fee_per_gas, + new_subpool: SubPool::Pending, + }, + ]; + + // extend the test cases with reversed updates - this will add all _demotion_ tests + let reversed = expected_promotions.iter().map(|test| test.opposite()).collect::>(); + expected_promotions.extend(reversed); + + // dedup the test cases + let expected_promotions = expected_promotions.into_iter().collect::>(); + + for promotion_test in expected_promotions.iter() { + let mut pool = TxPool::new(MockOrdering::default(), Default::default()); + + // set block info so the tx is initially underpriced w.r.t. blob fee + let mut block_info = pool.block_info(); + + block_info.pending_blob_fee = Some(promotion_test.blobfee); + block_info.pending_basefee = promotion_test.basefee; + pool.set_block_info(block_info); + + let validated = f.validated(tx.clone()); + let id = *validated.id(); + pool.add_transaction(validated, on_chain_balance, on_chain_nonce).unwrap(); + + // assert pool lengths + promotion_test.assert_single_tx_starting_subpool(&pool); + + // check tx state and derived subpool, it should not move into the blob pool + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert_eq!( + internal_tx.subpool, promotion_test.subpool, + "Subpools do not match at start of test: {promotion_test:?}" + ); + + // set block info with new base fee + block_info.pending_basefee = promotion_test.basefee_update; + block_info.pending_blob_fee = Some(promotion_test.blobfee_update); + pool.set_block_info(block_info); + + // check tx state and derived subpool, it should not move into the blob pool + let internal_tx = pool.all_transactions.txs.get(&id).unwrap(); + assert_eq!( + internal_tx.subpool, promotion_test.new_subpool, + "Subpools do not match at end of test: {promotion_test:?}" + ); + + // assert new pool lengths + promotion_test.assert_single_tx_ending_subpool(&pool); + } + } + #[test] fn test_insert_pending() { let on_chain_balance = U256::MAX; @@ -2006,7 +2423,7 @@ mod tests { let on_chain_nonce = 0; let mut f = MockTransactionFactory::default(); let mut pool = AllTransactions::default(); - pool.pending_basefee = pool.minimal_protocol_basefee.checked_add(1).unwrap(); + pool.pending_fees.base_fee = pool.minimal_protocol_basefee.checked_add(1).unwrap(); let tx = MockTransaction::eip1559().inc_nonce().inc_limit(); let first = f.validated(tx.clone()); @@ -2014,7 +2431,7 @@ mod tests { let first_in_pool = pool.get(first.id()).unwrap(); - assert!(tx.get_gas_price() < pool.pending_basefee as u128); + assert!(tx.get_gas_price() < pool.pending_fees.base_fee as u128); // has nonce gap assert!(!first_in_pool.state.contains(TxState::NO_NONCE_GAPS)); diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 01162f427..398fcec1b 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1031,6 +1031,10 @@ pub struct PoolSize { pub pending: usize, /// Reported size of transactions in the _pending_ sub-pool. pub pending_size: usize, + /// Number of transactions in the _blob_ pool. + pub blob: usize, + /// Reported size of transactions in the _blob_ pool. + pub blob_size: usize, /// Number of transactions in the _basefee_ pool. pub basefee: usize, /// Reported size of transactions in the _basefee_ sub-pool. @@ -1051,7 +1055,7 @@ impl PoolSize { /// Asserts that the invariants of the pool size are met. #[cfg(test)] pub(crate) fn assert_invariants(&self) { - assert_eq!(self.total, self.pending + self.basefee + self.queued); + assert_eq!(self.total, self.pending + self.basefee + self.queued + self.blob); } } diff --git a/crates/transaction-pool/src/validate/mod.rs b/crates/transaction-pool/src/validate/mod.rs index 1d72170a7..54eac2787 100644 --- a/crates/transaction-pool/src/validate/mod.rs +++ b/crates/transaction-pool/src/validate/mod.rs @@ -275,6 +275,13 @@ impl ValidPoolTransaction { self.transaction.cost() } + /// Returns the EIP-4844 max blob fee the caller is willing to pay. + /// + /// For non-EIP-4844 transactions, this returns [None]. + pub fn max_fee_per_blob_gas(&self) -> Option { + self.transaction.max_fee_per_blob_gas() + } + /// Returns the EIP-1559 Max base fee the caller is willing to pay. /// /// For legacy transactions this is `gas_price`.