wip: feat(txpool): add on block update (#69)

* feat(txpool): add on new block handler

* feat(txpool): work on update routine

* feat(txpool): start update process

* feat: implement update loop
This commit is contained in:
Matthias Seitz
2022-10-20 17:34:04 +02:00
committed by GitHub
parent 483bcdf9ab
commit 2e6b18a154
6 changed files with 218 additions and 43 deletions

View File

@ -178,7 +178,7 @@ where
self.pool.status()
}
async fn on_new_block(&self, _event: NewBlockEvent) {
fn on_new_block(&self, _event: NewBlockEvent<Self::Transaction>) {
// TODO perform maintenance: update pool accordingly
todo!()
}

View File

@ -92,6 +92,7 @@ pub(crate) mod size;
pub(crate) mod state;
mod transaction;
pub mod txpool;
mod update;
/// Transaction pool internals.
pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
@ -141,11 +142,6 @@ where
self.identifiers.write().sender_id_or_create(addr)
}
/// Updates the pool
pub(crate) fn update_base_fee(&self, base_fee: U256) {
self.pool.write().update_base_fee(base_fee);
}
/// Get the validator reference.
pub fn validator(&self) -> &V {
&self.validator

View File

@ -40,6 +40,18 @@ impl TxState {
pub(crate) fn is_pending(&self) -> bool {
*self >= TxState::PENDING_POOL_BITS
}
/// Returns `true` if the `ENOUGH_FEE_CAP_BLOCK` bit is set.
#[inline]
pub(crate) fn has_enough_fee_cap(&self) -> bool {
self.intersects(TxState::ENOUGH_FEE_CAP_BLOCK)
}
/// Returns `true` if the transaction has a nonce gap.
#[inline]
pub(crate) fn has_nonce_gap(&self) -> bool {
!self.intersects(TxState::NO_NONCE_GAPS)
}
}
/// Identifier for the used Sub-pool

View File

@ -8,14 +8,17 @@ use crate::{
parked::{BasefeeOrd, ParkedPool, QueuedOrd},
pending::PendingPool,
state::{SubPool, TxState},
update::{Destination, PoolUpdate},
AddedPendingTransaction, AddedTransaction,
},
traits::PoolStatus,
PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256,
traits::{PoolStatus, StateDiff},
NewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering,
ValidPoolTransaction, U256,
};
use fnv::FnvHashMap;
use reth_primitives::TxHash;
use std::{
cmp::Ordering,
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
fmt,
ops::Bound::{Excluded, Unbounded},
@ -145,6 +148,25 @@ impl<T: TransactionOrdering> TxPool<T> {
txs.into_iter().filter_map(|tx| self.get(&tx))
}
/// Updates the entire pool after a new block was mined.
///
/// This removes all mined transactions, updates according to the new base fee and rechecks
/// sender allowance.
pub(crate) fn on_new_block(&mut self, block: NewBlockEvent<T::Transaction>) {
// Remove all transaction that were included in the block
for mined in &block.mined_transactions {
self.all_transactions.remove_transaction(mined.id());
self.pending_pool.remove_transaction(mined.id());
}
// Apply the state changes to the total set of transactions which triggers sub-pool updates.
let updates =
self.all_transactions.update(block.pending_block_base_fee, &block.state_changes);
// Process the sub-pool updates
self.process_updates(updates);
}
/// Adds the transaction into the pool.
///
/// This pool consists of two three-pools: `Queued`, `Pending` and `BaseFee`.
@ -452,6 +474,139 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
}
/// Rechecks all transactions in the pool against the changes.
///
/// Possible changes are:
///
/// For all transactions:
/// - decreased basefee: promotes from `basefee` to `pending` sub-pool.
/// - increased basefee: demotes from `pending` to `basefee` sub-pool.
/// Individually:
/// - decreased sender allowance: demote from (`basefee`|`pending`) to `queued`.
/// - increased sender allowance: promote from `queued` to
/// - `pending` if basefee condition is met.
/// - `basefee` if basefee condition is _not_ met.
///
/// Additionally, this will also update the `cumulative_gas_used` for transactions of a sender
/// that got transaction included in the block.
pub(crate) fn update(
&mut self,
pending_block_base_fee: U256,
state_diffs: &StateDiff,
) -> Vec<PoolUpdate> {
// update new basefee
self.pending_basefee = pending_block_base_fee;
// TODO(mattsse): probably good idea to allocate some capacity here.
let mut updates = Vec::new();
let mut iter = self.txs.iter_mut().peekable();
// Loop over all individual senders and update all affected transactions.
// One sender may have up to `max_account_slots` transactions here, which means, worst case
// `max_accounts_slots` need to be updated, for example if the first transaction is blocked
// due to too low base fee.
// However, we don't have to necessarily check every transaction of a sender. If no updates
// are possible (nonce gap) then we can skip to the next sender.
// The `unique_sender` loop will process the first transaction of all senders, update its
// state and internally update all consecutive transactions
'unique_sender: while let Some((id, tx)) = iter.next() {
// Advances the iterator to the next sender
macro_rules! next_sender {
($iter:ident) => {
'this: while let Some((peek, _)) = iter.peek() {
if peek.sender != id.sender {
break 'this
}
iter.next();
}
};
}
// If there's a nonce gap, we can shortcircuit, because there's nothing to update.
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue
}
// TODO(mattsse): if account has balance changes or mined transactions the balance needs
// to be checked here
// Since this is the first transaction of the sender, it has no parked ancestors
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
// Update the first transaction of this sender.
Self::update_base_fee(&pending_block_base_fee, tx);
// Track if the transaction's sub-pool changed.
Self::record_subpool_update(&mut updates, tx);
// Track blocking transactions.
let mut has_parked_ancestor = !tx.state.is_pending();
// Update all consecutive transaction of this sender
while let Some((peek, ref mut tx)) = iter.peek_mut() {
if peek.sender != id.sender {
// Found the next sender
continue 'unique_sender
}
if tx.state.has_nonce_gap() {
next_sender!(iter);
continue 'unique_sender
}
// Update ancestor condition.
if has_parked_ancestor {
tx.state.remove(TxState::NO_PARKED_ANCESTORS);
} else {
tx.state.insert(TxState::NO_PARKED_ANCESTORS);
}
has_parked_ancestor = !tx.state.is_pending();
// Update and record sub-pool changes.
Self::update_base_fee(&pending_block_base_fee, tx);
Self::record_subpool_update(&mut updates, tx);
// Advance iterator
iter.next();
}
}
updates
}
/// This will update the transaction's `subpool` based on its state.
///
/// If the sub-pool derived from the state differs from the current pool, it will record a
/// `PoolUpdate` for this transaction to move it to the new sub-pool.
fn record_subpool_update(updates: &mut Vec<PoolUpdate>, tx: &mut PoolInternalTransaction<T>) {
let current_pool = tx.subpool;
tx.subpool = tx.state.into();
if current_pool != tx.subpool {
updates.push(PoolUpdate {
id: *tx.transaction.id(),
hash: *tx.transaction.hash(),
current: current_pool,
destination: Destination::Pool(tx.subpool),
})
}
}
/// Rechecks the transaction's dynamic fee condition.
fn update_base_fee(pending_block_base_fee: &U256, tx: &mut PoolInternalTransaction<T>) {
// Recheck dynamic fee condition.
if let Some(fee_cap) = tx.transaction.max_fee_per_gas() {
match fee_cap.cmp(pending_block_base_fee) {
Ordering::Greater | Ordering::Equal => {
tx.state.insert(TxState::ENOUGH_FEE_CAP_BLOCK);
}
Ordering::Less => {
tx.state.remove(TxState::ENOUGH_FEE_CAP_BLOCK);
}
}
}
}
/// Returns an iterator over all transactions for the given sender, starting with the lowest
/// nonce
#[cfg(test)]
@ -729,11 +884,6 @@ impl<T: PoolTransaction> AllTransactions<T> {
Ok(InsertOk { transaction, move_to: state.into(), state, replaced_tx, updates })
}
/// Rechecks the transaction of the given sender and returns a set of updates.
pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) {
todo!("ideally we want to process updates in bulk")
}
/// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize {
self.txs.len()
@ -767,28 +917,6 @@ impl<T: PoolTransaction> Default for AllTransactions<T> {
}
}
/// Where to move an existing transaction.
#[derive(Debug)]
pub(crate) enum Destination {
/// Discard the transaction.
Discard,
/// Move transaction to pool
Pool(SubPool),
}
/// A change of the transaction's location
///
/// NOTE: this guarantees that `current` and `destination` differ.
#[derive(Debug)]
pub(crate) struct PoolUpdate {
pub(crate) id: TransactionId,
pub(crate) hash: TxHash,
/// Where the transaction is currently held.
pub(crate) current: SubPool,
/// Where to move the transaction to
pub(crate) destination: Destination,
}
/// Result type for inserting a transaction
pub(crate) type InsertResult<T> = Result<InsertOk<T>, InsertErr<T>>;
@ -826,17 +954,17 @@ pub(crate) struct InsertOk<T: PoolTransaction> {
/// determining the current state of the transaction.
pub(crate) struct PoolInternalTransaction<T: PoolTransaction> {
/// The actual transaction object.
transaction: Arc<ValidPoolTransaction<T>>,
pub(crate) transaction: Arc<ValidPoolTransaction<T>>,
/// The `SubPool` that currently contains this transaction.
subpool: SubPool,
pub(crate) subpool: SubPool,
/// Keeps track of the current state of the transaction and therefor in which subpool it should
/// reside
state: TxState,
pub(crate) state: TxState,
/// The total cost all transactions before this transaction.
///
/// This is the combined `cost` of all transactions from the same sender that currently
/// come before this transaction.
cumulative_cost: U256,
pub(crate) cumulative_cost: U256,
}
// === impl PoolInternalTransaction ===

View File

@ -0,0 +1,32 @@
//! Support types for updating the pool.
use crate::{
identifier::TransactionId,
pool::{state::SubPool, txpool::PoolInternalTransaction},
PoolTransaction,
};
use reth_primitives::TxHash;
use std::ops::{Deref, DerefMut};
/// A change of the transaction's location
///
/// NOTE: this guarantees that `current` and `destination` differ.
#[derive(Debug)]
pub(crate) struct PoolUpdate {
/// Internal tx id.
pub(crate) id: TransactionId,
/// Hash of the transaction.
pub(crate) hash: TxHash,
/// Where the transaction is currently held.
pub(crate) current: SubPool,
/// Where to move the transaction to.
pub(crate) destination: Destination,
}
/// Where to move an existing transaction.
#[derive(Debug)]
pub(crate) enum Destination {
/// Discard the transaction.
Discard,
/// Move transaction to pool
Pool(SubPool),
}

View File

@ -21,7 +21,7 @@ pub trait TransactionPool: Send + Sync + 'static {
/// Implementers need to update the pool accordingly.
/// For example the base fee of the pending block is determined after a block is mined which
/// affects the dynamic fee requirement of pending transactions in the pool.
async fn on_new_block(&self, event: NewBlockEvent);
fn on_new_block(&self, event: NewBlockEvent<Self::Transaction>);
/// Adds an _unvalidated_ transaction into the pool.
///
@ -128,7 +128,7 @@ impl TransactionOrigin {
/// Event fired when a new block was mined
#[derive(Debug, Clone)]
pub struct NewBlockEvent {
pub struct NewBlockEvent<T: PoolTransaction> {
/// Hash of the added block.
pub hash: H256,
/// EIP-1559 Base fee of the _next_ (pending) block
@ -136,8 +136,15 @@ pub struct NewBlockEvent {
/// The base fee of a block depends on the utilization of the last block and its base fee.
pub pending_block_base_fee: U256,
/// Provides a set of state changes that affected the accounts.
// TODO based on the account changes, we can recheck balance
pub state_changes: (),
pub state_changes: StateDiff,
/// All mined transactions in the block
pub mined_transactions: Vec<Arc<ValidPoolTransaction<T>>>,
}
/// Contains a list of changed state
#[derive(Debug, Clone)]
pub struct StateDiff {
// TODO(mattsse) this could be an `Arc<revm::State>>`
}
/// An `Iterator` that only returns transactions that are ready to be executed.