diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index d3cb2d53c..dd10670ce 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -82,7 +82,9 @@ pub use crate::{ traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool}, validate::{TransactionValidationOutcome, TransactionValidator}, }; -use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction}; +use crate::{ + error::PoolResult, pool::PoolInner, traits::NewTransactionEvent, validate::ValidPoolTransaction, +}; use futures::channel::mpsc::Receiver; use reth_primitives::{BlockID, TxHash, U256, U64}; use std::{collections::HashMap, sync::Arc}; @@ -117,13 +119,12 @@ where Self { pool: Arc::new(PoolInner::new(client, ordering, config)) } } - /// Returns the wrapped pool + /// Returns the wrapped pool. pub(crate) fn inner(&self) -> &PoolInner { &self.pool } - /// Returns future that validates all transaction in the given iterator at the block the - /// `block_id` points to. + /// Returns future that validates all transaction in the given iterator. async fn validate_all( &self, transactions: impl IntoIterator, @@ -137,18 +138,17 @@ where Ok(outcome) } - /// Validates the given transaction at the given block + /// Validates the given transaction async fn validate( &self, transaction: P::Transaction, ) -> (TxHash, TransactionValidationOutcome) { - let _hash = *transaction.hash(); - // TODO this is where additional validate checks would go, like banned senders etc... - let _res = self.pool.client().validate_transaction(transaction).await; + let hash = *transaction.hash(); + // TODO(mattsse): this is where additional validate checks would go, like banned senders + // etc... + let outcome = self.pool.client().validate_transaction(transaction).await; - // TODO blockstamp the transaction - - todo!() + (hash, outcome) } /// Number of transactions in the entire pool @@ -190,8 +190,12 @@ where Ok(transactions) } - fn ready_transactions_listener(&self) -> Receiver { - self.pool.add_ready_listener() + fn pending_transactions_listener(&self) -> Receiver { + self.pool.add_pending_listener() + } + + fn transactions_listener(&self) -> Receiver> { + self.pool.add_transaction_listener() } fn best_transactions( diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 8c60f1425..e465d82de 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -64,12 +64,15 @@ //! category (2.) and become pending. use crate::{ - error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction, - validate::ValidPoolTransaction, PoolClient, PoolConfig, TransactionOrdering, - TransactionValidator, U256, + error::PoolResult, + identifier::{SenderId, SenderIdentifiers, TransactionId}, + pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, + traits::{NewTransactionEvent, PoolTransaction}, + validate::{TransactionValidationOutcome, ValidPoolTransaction}, + PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256, }; - use best::BestTransactions; +pub use events::TransactionEvent; use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; use reth_primitives::{Address, TxHash}; @@ -85,13 +88,6 @@ pub(crate) mod state; mod transaction; pub mod txpool; -use crate::{ - identifier::{SenderId, SenderIdentifiers, TransactionId}, - pool::txpool::TxPool, - validate::TransactionValidationOutcome, -}; -pub use events::TransactionEvent; - /// Transaction pool internals. pub struct PoolInner { /// Internal mapping of addresses to plain ints. @@ -105,7 +101,9 @@ pub struct PoolInner { /// Manages listeners for transaction state change events. event_listener: RwLock>, /// Listeners for new ready transactions. - ready_transaction_listener: Mutex>>, + pending_transaction_listener: Mutex>>, + /// Listeners for new transactions added to the pool. + transaction_listener: Mutex>>>, } // === impl PoolInner === @@ -123,7 +121,8 @@ where config, event_listener: Default::default(), pool: RwLock::new(TxPool::new(ordering)), - ready_transaction_listener: Default::default(), + pending_transaction_listener: Default::default(), + transaction_listener: Default::default(), } } @@ -142,12 +141,20 @@ where &self.client } - /// Adds a new transaction listener to the pool that gets notified about every new ready + /// Adds a new transaction listener to the pool that gets notified about every new _ready_ /// transaction - pub fn add_ready_listener(&self) -> Receiver { + pub fn add_pending_listener(&self) -> Receiver { const TX_LISTENER_BUFFER_SIZE: usize = 2048; let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); - self.ready_transaction_listener.lock().push(tx); + self.pending_transaction_listener.lock().push(tx); + rx + } + + /// Adds a new transaction listener to the pool that gets notified about every new transaction + pub fn add_transaction_listener(&self) -> Receiver> { + const TX_LISTENER_BUFFER_SIZE: usize = 1024; + let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); + self.transaction_listener.lock().push(tx); rx } @@ -176,14 +183,20 @@ where }; let added = self.pool.write().add_transaction(tx, balance, state_nonce)?; + let hash = *added.hash(); + // Notify about new pending transactions if let Some(pending_hash) = added.as_pending() { self.on_new_pending_transaction(pending_hash); } + // Notify tx event listeners self.notify_event_listeners(&added); - Ok(*added.hash()) + // Notify listeners for _all_ transactions + self.on_new_transaction(added.into_new_transaction_event()); + + Ok(hash) } TransactionValidationOutcome::Invalid(_tx, err) => { // TODO notify listeners about invalid @@ -202,9 +215,9 @@ where transactions.into_iter().map(|tx| self.add_transaction(tx)).collect::>() } - /// Notify all listeners about the new transaction. + /// Notify all listeners about a new pending transaction. fn on_new_pending_transaction(&self, ready: &TxHash) { - let mut transaction_listeners = self.ready_transaction_listener.lock(); + let mut transaction_listeners = self.pending_transaction_listener.lock(); transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) { Ok(()) => true, Err(e) => { @@ -222,17 +235,37 @@ where }); } + /// Notify all listeners about a new pending transaction. + fn on_new_transaction(&self, event: NewTransactionEvent) { + let mut transaction_listeners = self.transaction_listener.lock(); + + transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) { + Ok(()) => true, + Err(e) => { + if e.is_full() { + warn!( + target: "txpool", + "dropping full transaction listener", + ); + true + } else { + false + } + } + }); + } + /// Fire events for the newly added transaction. fn notify_event_listeners(&self, tx: &AddedTransaction) { let mut listener = self.event_listener.write(); match tx { AddedTransaction::Pending(tx) => { - listener.ready(&tx.hash, None); + listener.ready(tx.transaction.hash(), None); // TODO more listeners for discarded, removed etc... } - AddedTransaction::Parked { hash } => { - listener.queued(hash); + AddedTransaction::Parked { transaction, .. } => { + listener.queued(transaction.hash()); } } } @@ -264,8 +297,8 @@ where /// Tracks an added transaction and all graph changes caused by adding it. #[derive(Debug, Clone)] pub struct AddedPendingTransaction { - /// the hash of the submitted transaction - hash: TxHash, + /// Inserted transaction. + transaction: Arc>, /// transactions promoted to the ready queue promoted: Vec, /// transaction that failed and became discarded @@ -276,9 +309,9 @@ pub struct AddedPendingTransaction { impl AddedPendingTransaction { /// Create a new, empty transaction. - fn new(hash: TxHash) -> Self { + fn new(transaction: Arc>) -> Self { Self { - hash, + transaction, promoted: Default::default(), discarded: Default::default(), removed: Default::default(), @@ -294,26 +327,40 @@ pub enum AddedTransaction { /// Transaction was successfully added but not yet ready for processing and moved to a /// parked pool instead. Parked { - /// Hash of the submitted transaction that is currently parked. - hash: TxHash, + /// Inserted transaction. + transaction: Arc>, + /// The subpool it was moved to. + subpool: SubPool, }, } impl AddedTransaction { /// Returns the hash of the transaction if it's pending - pub fn as_pending(&self) -> Option<&TxHash> { + pub(crate) fn as_pending(&self) -> Option<&TxHash> { if let AddedTransaction::Pending(tx) = self { - Some(&tx.hash) + Some(tx.transaction.hash()) } else { None } } /// Returns the hash of the transaction - pub fn hash(&self) -> &TxHash { + pub(crate) fn hash(&self) -> &TxHash { match self { - AddedTransaction::Pending(tx) => &tx.hash, - AddedTransaction::Parked { hash } => hash, + AddedTransaction::Pending(tx) => tx.transaction.hash(), + AddedTransaction::Parked { transaction, .. } => transaction.hash(), + } + } + + /// Converts this type into the event type for listeners + pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent { + match self { + AddedTransaction::Pending(tx) => { + NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction } + } + AddedTransaction::Parked { transaction, subpool } => { + NewTransactionEvent { transaction, subpool } + } } } } diff --git a/crates/transaction-pool/src/pool/state.rs b/crates/transaction-pool/src/pool/state.rs index 42e6e7891..7171bfd72 100644 --- a/crates/transaction-pool/src/pool/state.rs +++ b/crates/transaction-pool/src/pool/state.rs @@ -32,7 +32,7 @@ bitflags::bitflags! { /// Identifier for the used Sub-pool #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] #[repr(u8)] -pub(crate) enum SubPool { +pub enum SubPool { Queued = 0, Pending, BaseFee, @@ -42,12 +42,12 @@ pub(crate) enum SubPool { impl SubPool { /// Whether this transaction is to be moved to the pending sub-pool. - pub(crate) fn is_pending(&self) -> bool { + pub fn is_pending(&self) -> bool { matches!(self, SubPool::Pending) } /// Returns whether this is a promotion depending on the current sub-pool location. - pub(crate) fn is_promoted(&self, other: SubPool) -> bool { + pub fn is_promoted(&self, other: SubPool) -> bool { self > &other } } diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index c72244544..9fba8da70 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -152,19 +152,19 @@ impl TxPool { match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { InsertResult::Inserted { transaction, move_to, replaced_tx, updates, .. } => { - self.add_new_transaction(transaction, replaced_tx, move_to); + self.add_new_transaction(transaction.clone(), replaced_tx, move_to); let UpdateOutcome { promoted, discarded, removed } = self.process_updates(updates); // This transaction was moved to the pending pool. let res = if move_to.is_pending() { AddedTransaction::Pending(AddedPendingTransaction { - hash, + transaction, promoted, discarded, removed, }) } else { - AddedTransaction::Parked { hash } + AddedTransaction::Parked { transaction, subpool: move_to } }; Ok(res) diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index 3a1052339..8a680b20c 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,5 +1,5 @@ -use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID}; -use futures::channel::mpsc::Receiver; +use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID}; +use futures::{channel::mpsc::Receiver, future::Shared}; use reth_primitives::{Address, TxHash, H256, U256}; use std::{fmt, sync::Arc}; @@ -38,7 +38,10 @@ pub trait TransactionPool: Send + Sync { /// Returns a new Stream that yields transactions hashes for new ready transactions. /// /// Consumer: RPC - fn ready_transactions_listener(&self) -> Receiver; + fn pending_transactions_listener(&self) -> Receiver; + + /// Returns a new stream that yields new valid transactions added to the pool. + fn transactions_listener(&self) -> Receiver>; /// Returns an iterator that yields transactions that are ready for block production. /// @@ -66,6 +69,21 @@ pub trait TransactionPool: Send + Sync { fn get(&self, tx_hash: &TxHash) -> Option>>; } +/// Represents a new transaction +#[derive(Debug)] +pub struct NewTransactionEvent { + /// The pool which the transaction was moved to. + pub subpool: SubPool, + /// Actual transaction + pub transaction: Arc>, +} + +impl Clone for NewTransactionEvent { + fn clone(&self) -> Self { + Self { subpool: self.subpool, transaction: self.transaction.clone() } + } +} + /// Event fired when a new block was mined #[derive(Debug, Clone)] pub struct NewBlockEvent {