diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 061555455..ffd23ded0 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -78,12 +78,12 @@ pub use crate::{ config::PoolConfig, ordering::TransactionOrdering, - traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool}, + traits::{BestTransactions, OnNewBlockEvent, PoolTransaction, TransactionPool}, validate::{TransactionValidationOutcome, TransactionValidator}, }; use crate::{ error::PoolResult, - pool::PoolInner, + pool::{OnNewBlockOutcome, PoolInner}, traits::{NewTransactionEvent, PoolStatus, TransactionOrigin}, validate::ValidPoolTransaction, }; @@ -178,9 +178,8 @@ where self.pool.status() } - fn on_new_block(&self, _event: NewBlockEvent) { - // TODO perform maintenance: update pool accordingly - todo!() + fn on_new_block(&self, event: OnNewBlockEvent) { + self.pool.on_new_block(event); } async fn add_transaction( diff --git a/crates/transaction-pool/src/pool/events.rs b/crates/transaction-pool/src/pool/events.rs index c7e860c01..7fc258f45 100644 --- a/crates/transaction-pool/src/pool/events.rs +++ b/crates/transaction-pool/src/pool/events.rs @@ -11,14 +11,13 @@ pub enum TransactionEvent { /// Transaction has been added to the queued pool. Queued, /// Transaction has been included in the block belonging to this hash. - Included(H256), + Mined(H256), /// Transaction has been replaced by the transaction belonging to the hash. /// /// E.g. same (sender + nonce) pair Replaced(Hash), /// Transaction was dropped due to configured limits. - Dropped, + Discarded, /// Transaction became invalid indefinitely. Invalid, - // TODO Timedout?, broadcasted(peers) } diff --git a/crates/transaction-pool/src/pool/listener.rs b/crates/transaction-pool/src/pool/listener.rs index 070d6e412..b9d259213 100644 --- a/crates/transaction-pool/src/pool/listener.rs +++ b/crates/transaction-pool/src/pool/listener.rs @@ -2,6 +2,7 @@ use crate::pool::events::TransactionEvent; use futures::channel::mpsc::UnboundedSender; +use reth_primitives::H256; use std::{collections::HashMap, hash}; type EventSink = UnboundedSender>; @@ -44,6 +45,16 @@ impl PoolEventListener { pub(crate) fn queued(&mut self, tx: &Hash) { self.notify_with(tx, |notifier| notifier.queued()); } + + /// Notify listeners about a transaction that was discarded. + pub(crate) fn discarded(&mut self, tx: &Hash) { + self.notify_with(tx, |notifier| notifier.discarded()); + } + + /// Notify listeners that the transaction was mined + pub(crate) fn mined(&mut self, tx: &Hash, block_hash: H256) { + self.notify_with(tx, |notifier| notifier.mined(block_hash)); + } } impl Default for PoolEventListener { @@ -86,4 +97,16 @@ impl PoolEventNotifier { self.notify(TransactionEvent::Replaced(hash)); self.is_done = true; } + + /// Transaction was mined. + fn mined(&mut self, block_hash: H256) { + self.notify(TransactionEvent::Mined(block_hash)); + self.is_done = true; + } + + /// Transaction was replaced with the given transaction + fn discarded(&mut self) { + self.notify(TransactionEvent::Discarded); + self.is_done = true; + } } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 94378331d..eb6dfda91 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -69,13 +69,13 @@ use crate::{ pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, traits::{NewTransactionEvent, PoolStatus, PoolTransaction, TransactionOrigin}, validate::{TransactionValidationOutcome, ValidPoolTransaction}, - PoolConfig, TransactionOrdering, TransactionValidator, U256, + OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, U256, }; use best::BestTransactions; pub use events::TransactionEvent; use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; -use reth_primitives::{Address, TxHash}; +use reth_primitives::{Address, TxHash, H256}; use std::{ collections::{HashMap, HashSet}, sync::Arc, @@ -164,6 +164,12 @@ where rx } + /// Updates the entire pool after a new block was mined. + pub(crate) fn on_new_block(&self, block: OnNewBlockEvent) { + let outcome = self.pool.write().on_new_block(block); + self.notify_on_new_block(outcome); + } + /// Resubmits transactions back into the pool. pub fn resubmit(&self, _transactions: HashMap>) { unimplemented!() @@ -285,14 +291,28 @@ where }); } + /// Notifies transaction listeners about changes after a block was processed. + fn notify_on_new_block(&self, outcome: OnNewBlockOutcome) { + let OnNewBlockOutcome { mined, promoted, discarded, block_hash } = outcome; + + let mut listener = self.event_listener.write(); + + mined.iter().for_each(|tx| listener.mined(tx, block_hash)); + promoted.iter().for_each(|tx| listener.ready(tx, None)); + discarded.iter().for_each(|tx| listener.discarded(tx)); + } + /// Fire events for the newly added transaction. fn notify_event_listeners(&self, tx: &AddedTransaction) { let mut listener = self.event_listener.write(); match tx { AddedTransaction::Pending(tx) => { - listener.ready(tx.transaction.hash(), None); - // TODO more listeners for discarded, removed etc... + let AddedPendingTransaction { transaction, promoted, discarded, .. } = tx; + + listener.ready(transaction.hash(), None); + promoted.iter().for_each(|tx| listener.ready(tx, None)); + discarded.iter().for_each(|tx| listener.discarded(tx)); } AddedTransaction::Parked { transaction, .. } => { listener.queued(transaction.hash()); @@ -409,3 +429,16 @@ impl AddedTransaction { } } } + +/// Contains all state changes after a [`NewBlockEvent`] was processed +#[derive(Debug)] +pub(crate) struct OnNewBlockOutcome { + /// Hash of the block. + pub(crate) block_hash: H256, + /// All mined transactions. + pub(crate) mined: Vec, + /// Transactions promoted to the ready queue. + pub(crate) promoted: Vec, + /// transaction that were discarded during the update + pub(crate) discarded: Vec, +} diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index 165784513..391496c57 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -9,14 +9,14 @@ use crate::{ pending::PendingPool, state::{SubPool, TxState}, update::{Destination, PoolUpdate}, - AddedPendingTransaction, AddedTransaction, + AddedPendingTransaction, AddedTransaction, OnNewBlockOutcome, }, traits::{PoolStatus, StateDiff}, - NewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, + OnNewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering, ValidPoolTransaction, U256, }; use fnv::FnvHashMap; -use reth_primitives::TxHash; +use reth_primitives::{TxHash, H256}; use std::{ cmp::Ordering, collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}, @@ -152,19 +152,25 @@ impl TxPool { /// /// This removes all mined transactions, updates according to the new base fee and rechecks /// sender allowance. - pub(crate) fn on_new_block(&mut self, block: NewBlockEvent) { + pub(crate) fn on_new_block(&mut self, event: OnNewBlockEvent) -> OnNewBlockOutcome { // Remove all transaction that were included in the block - for mined in &block.mined_transactions { - self.all_transactions.remove_transaction(mined.id()); - self.pending_pool.remove_transaction(mined.id()); + for tx_hash in &event.mined_transactions { + self.remove_transaction_by_hash(tx_hash); } // Apply the state changes to the total set of transactions which triggers sub-pool updates. let updates = - self.all_transactions.update(block.pending_block_base_fee, &block.state_changes); + self.all_transactions.update(event.pending_block_base_fee, &event.state_changes); // Process the sub-pool updates - self.process_updates(updates); + let UpdateOutcome { promoted, discarded, .. } = self.process_updates(updates); + + OnNewBlockOutcome { + block_hash: event.hash, + mined: event.mined_transactions, + promoted, + discarded, + } } /// Adds the transaction into the pool. @@ -273,6 +279,17 @@ impl TxPool { self.remove_from_subpool(pool, tx.id()) } + /// Remove the transaction from the entire pool via its hash. + /// + /// This includes the total set of transaction and the subpool it currently resides in. + fn remove_transaction_by_hash( + &mut self, + tx_hash: &H256, + ) -> Option>> { + let (tx, pool) = self.all_transactions.remove_transaction_by_hash(tx_hash)?; + self.remove_from_subpool(pool, tx.id()) + } + /// Removes the transaction from the given pool. /// /// Caution: this only removes the tx from the sub-pool and not from the pool itself @@ -675,6 +692,18 @@ impl AllTransactions { self.txs.range_mut(id..).take_while(|(other, _)| id.sender == other.sender) } + /// Removes a transaction from the set using its hash. + pub(crate) fn remove_transaction_by_hash( + &mut self, + tx_hash: &H256, + ) -> Option<(Arc>, SubPool)> { + let tx = self.by_hash.remove(tx_hash)?; + let internal = self.txs.remove(&tx.transaction_id)?; + // decrement the counter for the sender. + self.tx_decr(tx.sender_id()); + Some((tx, internal.subpool)) + } + /// Removes a transaction from the set. /// /// This will _not_ trigger additional updates, because descendants without nonce gaps are diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 090ec4227..fb7064dd3 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -21,7 +21,7 @@ pub trait TransactionPool: Send + Sync + 'static { /// Implementers need to update the pool accordingly. /// For example the base fee of the pending block is determined after a block is mined which /// affects the dynamic fee requirement of pending transactions in the pool. - fn on_new_block(&self, event: NewBlockEvent); + fn on_new_block(&self, event: OnNewBlockEvent); /// Adds an _unvalidated_ transaction into the pool. /// @@ -128,7 +128,7 @@ impl TransactionOrigin { /// Event fired when a new block was mined #[derive(Debug, Clone)] -pub struct NewBlockEvent { +pub struct OnNewBlockEvent { /// Hash of the added block. pub hash: H256, /// EIP-1559 Base fee of the _next_ (pending) block @@ -138,7 +138,7 @@ pub struct NewBlockEvent { /// Provides a set of state changes that affected the accounts. pub state_changes: StateDiff, /// All mined transactions in the block - pub mined_transactions: Vec>>, + pub mined_transactions: Vec, } /// Contains a list of changed state