From 2e6b18a1547ef01bf30a5141a29ec8ffe081f331 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 20 Oct 2022 17:34:04 +0200 Subject: [PATCH] wip: feat(txpool): add on block update (#69) * feat(txpool): add on new block handler * feat(txpool): work on update routine * feat(txpool): start update process * feat: implement update loop --- crates/transaction-pool/src/lib.rs | 2 +- crates/transaction-pool/src/pool/mod.rs | 6 +- crates/transaction-pool/src/pool/state.rs | 12 ++ crates/transaction-pool/src/pool/txpool.rs | 194 +++++++++++++++++---- crates/transaction-pool/src/pool/update.rs | 32 ++++ crates/transaction-pool/src/traits.rs | 15 +- 6 files changed, 218 insertions(+), 43 deletions(-) create mode 100644 crates/transaction-pool/src/pool/update.rs diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 305652edc..061555455 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -178,7 +178,7 @@ where self.pool.status() } - async fn on_new_block(&self, _event: NewBlockEvent) { + fn on_new_block(&self, _event: NewBlockEvent) { // TODO perform maintenance: update pool accordingly todo!() } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 130c99816..94378331d 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -92,6 +92,7 @@ pub(crate) mod size; pub(crate) mod state; mod transaction; pub mod txpool; +mod update; /// Transaction pool internals. pub struct PoolInner { @@ -141,11 +142,6 @@ where self.identifiers.write().sender_id_or_create(addr) } - /// Updates the pool - pub(crate) fn update_base_fee(&self, base_fee: U256) { - self.pool.write().update_base_fee(base_fee); - } - /// Get the validator reference. pub fn validator(&self) -> &V { &self.validator diff --git a/crates/transaction-pool/src/pool/state.rs b/crates/transaction-pool/src/pool/state.rs index 0a3af14aa..e5a759997 100644 --- a/crates/transaction-pool/src/pool/state.rs +++ b/crates/transaction-pool/src/pool/state.rs @@ -40,6 +40,18 @@ impl TxState { pub(crate) fn is_pending(&self) -> bool { *self >= TxState::PENDING_POOL_BITS } + + /// Returns `true` if the `ENOUGH_FEE_CAP_BLOCK` bit is set. + #[inline] + pub(crate) fn has_enough_fee_cap(&self) -> bool { + self.intersects(TxState::ENOUGH_FEE_CAP_BLOCK) + } + + /// Returns `true` if the transaction has a nonce gap. + #[inline] + pub(crate) fn has_nonce_gap(&self) -> bool { + !self.intersects(TxState::NO_NONCE_GAPS) + } } /// Identifier for the used Sub-pool diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 815056a7c..165784513 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -8,14 +8,17 @@ use crate::{ parked::{BasefeeOrd, ParkedPool, QueuedOrd}, pending::PendingPool, state::{SubPool, TxState}, + update::{Destination, PoolUpdate}, AddedPendingTransaction, AddedTransaction, }, - traits::PoolStatus, - PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256, + traits::{PoolStatus, StateDiff}, + NewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, + ValidPoolTransaction, U256, }; use fnv::FnvHashMap; use reth_primitives::TxHash; use std::{ + cmp::Ordering, collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}, fmt, ops::Bound::{Excluded, Unbounded}, @@ -145,6 +148,25 @@ impl TxPool { txs.into_iter().filter_map(|tx| self.get(&tx)) } + /// Updates the entire pool after a new block was mined. + /// + /// This removes all mined transactions, updates according to the new base fee and rechecks + /// sender allowance. + pub(crate) fn on_new_block(&mut self, block: NewBlockEvent) { + // Remove all transaction that were included in the block + for mined in &block.mined_transactions { + self.all_transactions.remove_transaction(mined.id()); + self.pending_pool.remove_transaction(mined.id()); + } + + // Apply the state changes to the total set of transactions which triggers sub-pool updates. + let updates = + self.all_transactions.update(block.pending_block_base_fee, &block.state_changes); + + // Process the sub-pool updates + self.process_updates(updates); + } + /// Adds the transaction into the pool. /// /// This pool consists of two three-pools: `Queued`, `Pending` and `BaseFee`. @@ -452,6 +474,139 @@ impl AllTransactions { } } + /// Rechecks all transactions in the pool against the changes. + /// + /// Possible changes are: + /// + /// For all transactions: + /// - decreased basefee: promotes from `basefee` to `pending` sub-pool. + /// - increased basefee: demotes from `pending` to `basefee` sub-pool. + /// Individually: + /// - decreased sender allowance: demote from (`basefee`|`pending`) to `queued`. + /// - increased sender allowance: promote from `queued` to + /// - `pending` if basefee condition is met. + /// - `basefee` if basefee condition is _not_ met. + /// + /// Additionally, this will also update the `cumulative_gas_used` for transactions of a sender + /// that got transaction included in the block. + pub(crate) fn update( + &mut self, + pending_block_base_fee: U256, + state_diffs: &StateDiff, + ) -> Vec { + // update new basefee + self.pending_basefee = pending_block_base_fee; + + // TODO(mattsse): probably good idea to allocate some capacity here. + let mut updates = Vec::new(); + + let mut iter = self.txs.iter_mut().peekable(); + + // Loop over all individual senders and update all affected transactions. + // One sender may have up to `max_account_slots` transactions here, which means, worst case + // `max_accounts_slots` need to be updated, for example if the first transaction is blocked + // due to too low base fee. + // However, we don't have to necessarily check every transaction of a sender. If no updates + // are possible (nonce gap) then we can skip to the next sender. + + // The `unique_sender` loop will process the first transaction of all senders, update its + // state and internally update all consecutive transactions + 'unique_sender: while let Some((id, tx)) = iter.next() { + // Advances the iterator to the next sender + macro_rules! next_sender { + ($iter:ident) => { + 'this: while let Some((peek, _)) = iter.peek() { + if peek.sender != id.sender { + break 'this + } + iter.next(); + } + }; + } + // If there's a nonce gap, we can shortcircuit, because there's nothing to update. + if tx.state.has_nonce_gap() { + next_sender!(iter); + continue + } + + // TODO(mattsse): if account has balance changes or mined transactions the balance needs + // to be checked here + + // Since this is the first transaction of the sender, it has no parked ancestors + tx.state.insert(TxState::NO_PARKED_ANCESTORS); + + // Update the first transaction of this sender. + Self::update_base_fee(&pending_block_base_fee, tx); + // Track if the transaction's sub-pool changed. + Self::record_subpool_update(&mut updates, tx); + + // Track blocking transactions. + let mut has_parked_ancestor = !tx.state.is_pending(); + + // Update all consecutive transaction of this sender + while let Some((peek, ref mut tx)) = iter.peek_mut() { + if peek.sender != id.sender { + // Found the next sender + continue 'unique_sender + } + + if tx.state.has_nonce_gap() { + next_sender!(iter); + continue 'unique_sender + } + + // Update ancestor condition. + if has_parked_ancestor { + tx.state.remove(TxState::NO_PARKED_ANCESTORS); + } else { + tx.state.insert(TxState::NO_PARKED_ANCESTORS); + } + has_parked_ancestor = !tx.state.is_pending(); + + // Update and record sub-pool changes. + Self::update_base_fee(&pending_block_base_fee, tx); + Self::record_subpool_update(&mut updates, tx); + + // Advance iterator + iter.next(); + } + } + + updates + } + + /// This will update the transaction's `subpool` based on its state. + /// + /// If the sub-pool derived from the state differs from the current pool, it will record a + /// `PoolUpdate` for this transaction to move it to the new sub-pool. + fn record_subpool_update(updates: &mut Vec, tx: &mut PoolInternalTransaction) { + let current_pool = tx.subpool; + tx.subpool = tx.state.into(); + if current_pool != tx.subpool { + updates.push(PoolUpdate { + id: *tx.transaction.id(), + hash: *tx.transaction.hash(), + current: current_pool, + destination: Destination::Pool(tx.subpool), + }) + } + } + + /// Rechecks the transaction's dynamic fee condition. + fn update_base_fee(pending_block_base_fee: &U256, tx: &mut PoolInternalTransaction) { + // Recheck dynamic fee condition. + if let Some(fee_cap) = tx.transaction.max_fee_per_gas() { + match fee_cap.cmp(pending_block_base_fee) { + Ordering::Greater | Ordering::Equal => { + tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK); + } + Ordering::Less => { + tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK); + } + } + } + } + /// Returns an iterator over all transactions for the given sender, starting with the lowest /// nonce #[cfg(test)] @@ -729,11 +884,6 @@ impl AllTransactions { Ok(InsertOk { transaction, move_to: state.into(), state, replaced_tx, updates }) } - /// Rechecks the transaction of the given sender and returns a set of updates. - pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) { - todo!("ideally we want to process updates in bulk") - } - /// Number of transactions in the entire pool pub(crate) fn len(&self) -> usize { self.txs.len() @@ -767,28 +917,6 @@ impl Default for AllTransactions { } } -/// Where to move an existing transaction. -#[derive(Debug)] -pub(crate) enum Destination { - /// Discard the transaction. - Discard, - /// Move transaction to pool - Pool(SubPool), -} - -/// A change of the transaction's location -/// -/// NOTE: this guarantees that `current` and `destination` differ. -#[derive(Debug)] -pub(crate) struct PoolUpdate { - pub(crate) id: TransactionId, - pub(crate) hash: TxHash, - /// Where the transaction is currently held. - pub(crate) current: SubPool, - /// Where to move the transaction to - pub(crate) destination: Destination, -} - /// Result type for inserting a transaction pub(crate) type InsertResult = Result, InsertErr>; @@ -826,17 +954,17 @@ pub(crate) struct InsertOk { /// determining the current state of the transaction. pub(crate) struct PoolInternalTransaction { /// The actual transaction object. - transaction: Arc>, + pub(crate) transaction: Arc>, /// The `SubPool` that currently contains this transaction. - subpool: SubPool, + pub(crate) subpool: SubPool, /// Keeps track of the current state of the transaction and therefor in which subpool it should /// reside - state: TxState, + pub(crate) state: TxState, /// The total cost all transactions before this transaction. /// /// This is the combined `cost` of all transactions from the same sender that currently /// come before this transaction. - cumulative_cost: U256, + pub(crate) cumulative_cost: U256, } // === impl PoolInternalTransaction === diff --git a/crates/transaction-pool/src/pool/update.rs b/crates/transaction-pool/src/pool/update.rs new file mode 100644 index 000000000..f07972b16 --- /dev/null +++ b/crates/transaction-pool/src/pool/update.rs @@ -0,0 +1,32 @@ +//! Support types for updating the pool. +use crate::{ + identifier::TransactionId, + pool::{state::SubPool, txpool::PoolInternalTransaction}, + PoolTransaction, +}; +use reth_primitives::TxHash; +use std::ops::{Deref, DerefMut}; + +/// A change of the transaction's location +/// +/// NOTE: this guarantees that `current` and `destination` differ. +#[derive(Debug)] +pub(crate) struct PoolUpdate { + /// Internal tx id. + pub(crate) id: TransactionId, + /// Hash of the transaction. + pub(crate) hash: TxHash, + /// Where the transaction is currently held. + pub(crate) current: SubPool, + /// Where to move the transaction to. + pub(crate) destination: Destination, +} + +/// Where to move an existing transaction. +#[derive(Debug)] +pub(crate) enum Destination { + /// Discard the transaction. + Discard, + /// Move transaction to pool + Pool(SubPool), +} diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 100cdafca..090ec4227 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -21,7 +21,7 @@ pub trait TransactionPool: Send + Sync + 'static { /// Implementers need to update the pool accordingly. /// For example the base fee of the pending block is determined after a block is mined which /// affects the dynamic fee requirement of pending transactions in the pool. - async fn on_new_block(&self, event: NewBlockEvent); + fn on_new_block(&self, event: NewBlockEvent); /// Adds an _unvalidated_ transaction into the pool. /// @@ -128,7 +128,7 @@ impl TransactionOrigin { /// Event fired when a new block was mined #[derive(Debug, Clone)] -pub struct NewBlockEvent { +pub struct NewBlockEvent { /// Hash of the added block. pub hash: H256, /// EIP-1559 Base fee of the _next_ (pending) block @@ -136,8 +136,15 @@ pub struct NewBlockEvent { /// The base fee of a block depends on the utilization of the last block and its base fee. pub pending_block_base_fee: U256, /// Provides a set of state changes that affected the accounts. - // TODO based on the account changes, we can recheck balance - pub state_changes: (), + pub state_changes: StateDiff, + /// All mined transactions in the block + pub mined_transactions: Vec>>, +} + +/// Contains a list of changed state +#[derive(Debug, Clone)] +pub struct StateDiff { + // TODO(mattsse) this could be an `Arc>` } /// An `Iterator` that only returns transactions that are ready to be executed.