feat(txpool): add listeners for all transactions (#59)

This commit is contained in:
Matthias Seitz
2022-10-13 19:47:32 +02:00
committed by GitHub
parent 84ec30db5b
commit 55768a534d
5 changed files with 124 additions and 55 deletions

View File

@ -82,7 +82,9 @@ pub use crate::{
traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool}, traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool},
validate::{TransactionValidationOutcome, TransactionValidator}, validate::{TransactionValidationOutcome, TransactionValidator},
}; };
use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction}; use crate::{
error::PoolResult, pool::PoolInner, traits::NewTransactionEvent, validate::ValidPoolTransaction,
};
use futures::channel::mpsc::Receiver; use futures::channel::mpsc::Receiver;
use reth_primitives::{BlockID, TxHash, U256, U64}; use reth_primitives::{BlockID, TxHash, U256, U64};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
@ -117,13 +119,12 @@ where
Self { pool: Arc::new(PoolInner::new(client, ordering, config)) } Self { pool: Arc::new(PoolInner::new(client, ordering, config)) }
} }
/// Returns the wrapped pool /// Returns the wrapped pool.
pub(crate) fn inner(&self) -> &PoolInner<P, T> { pub(crate) fn inner(&self) -> &PoolInner<P, T> {
&self.pool &self.pool
} }
/// Returns future that validates all transaction in the given iterator at the block the /// Returns future that validates all transaction in the given iterator.
/// `block_id` points to.
async fn validate_all( async fn validate_all(
&self, &self,
transactions: impl IntoIterator<Item = P::Transaction>, transactions: impl IntoIterator<Item = P::Transaction>,
@ -137,18 +138,17 @@ where
Ok(outcome) Ok(outcome)
} }
/// Validates the given transaction at the given block /// Validates the given transaction
async fn validate( async fn validate(
&self, &self,
transaction: P::Transaction, transaction: P::Transaction,
) -> (TxHash, TransactionValidationOutcome<P::Transaction>) { ) -> (TxHash, TransactionValidationOutcome<P::Transaction>) {
let _hash = *transaction.hash(); let hash = *transaction.hash();
// TODO this is where additional validate checks would go, like banned senders etc... // TODO(mattsse): this is where additional validate checks would go, like banned senders
let _res = self.pool.client().validate_transaction(transaction).await; // etc...
let outcome = self.pool.client().validate_transaction(transaction).await;
// TODO blockstamp the transaction (hash, outcome)
todo!()
} }
/// Number of transactions in the entire pool /// Number of transactions in the entire pool
@ -190,8 +190,12 @@ where
Ok(transactions) Ok(transactions)
} }
fn ready_transactions_listener(&self) -> Receiver<TxHash> { fn pending_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.add_ready_listener() self.pool.add_pending_listener()
}
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
self.pool.add_transaction_listener()
} }
fn best_transactions( fn best_transactions(

View File

@ -64,12 +64,15 @@
//! category (2.) and become pending. //! category (2.) and become pending.
use crate::{ use crate::{
error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction, error::PoolResult,
validate::ValidPoolTransaction, PoolClient, PoolConfig, TransactionOrdering, identifier::{SenderId, SenderIdentifiers, TransactionId},
TransactionValidator, U256, pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool},
traits::{NewTransactionEvent, PoolTransaction},
validate::{TransactionValidationOutcome, ValidPoolTransaction},
PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256,
}; };
use best::BestTransactions; use best::BestTransactions;
pub use events::TransactionEvent;
use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash}; use reth_primitives::{Address, TxHash};
@ -85,13 +88,6 @@ pub(crate) mod state;
mod transaction; mod transaction;
pub mod txpool; pub mod txpool;
use crate::{
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::txpool::TxPool,
validate::TransactionValidationOutcome,
};
pub use events::TransactionEvent;
/// Transaction pool internals. /// Transaction pool internals.
pub struct PoolInner<P: PoolClient, T: TransactionOrdering> { pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Internal mapping of addresses to plain ints. /// Internal mapping of addresses to plain ints.
@ -105,7 +101,9 @@ pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Manages listeners for transaction state change events. /// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventListener<TxHash>>, event_listener: RwLock<PoolEventListener<TxHash>>,
/// Listeners for new ready transactions. /// Listeners for new ready transactions.
ready_transaction_listener: Mutex<Vec<Sender<TxHash>>>, pending_transaction_listener: Mutex<Vec<Sender<TxHash>>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<Sender<NewTransactionEvent<T::Transaction>>>>,
} }
// === impl PoolInner === // === impl PoolInner ===
@ -123,7 +121,8 @@ where
config, config,
event_listener: Default::default(), event_listener: Default::default(),
pool: RwLock::new(TxPool::new(ordering)), pool: RwLock::new(TxPool::new(ordering)),
ready_transaction_listener: Default::default(), pending_transaction_listener: Default::default(),
transaction_listener: Default::default(),
} }
} }
@ -142,12 +141,20 @@ where
&self.client &self.client
} }
/// Adds a new transaction listener to the pool that gets notified about every new ready /// Adds a new transaction listener to the pool that gets notified about every new _ready_
/// transaction /// transaction
pub fn add_ready_listener(&self) -> Receiver<TxHash> { pub fn add_pending_listener(&self) -> Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048; const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE); let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.ready_transaction_listener.lock().push(tx); self.pending_transaction_listener.lock().push(tx);
rx
}
/// Adds a new transaction listener to the pool that gets notified about every new transaction
pub fn add_transaction_listener(&self) -> Receiver<NewTransactionEvent<T::Transaction>> {
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.transaction_listener.lock().push(tx);
rx rx
} }
@ -176,14 +183,20 @@ where
}; };
let added = self.pool.write().add_transaction(tx, balance, state_nonce)?; let added = self.pool.write().add_transaction(tx, balance, state_nonce)?;
let hash = *added.hash();
// Notify about new pending transactions
if let Some(pending_hash) = added.as_pending() { if let Some(pending_hash) = added.as_pending() {
self.on_new_pending_transaction(pending_hash); self.on_new_pending_transaction(pending_hash);
} }
// Notify tx event listeners
self.notify_event_listeners(&added); self.notify_event_listeners(&added);
Ok(*added.hash()) // Notify listeners for _all_ transactions
self.on_new_transaction(added.into_new_transaction_event());
Ok(hash)
} }
TransactionValidationOutcome::Invalid(_tx, err) => { TransactionValidationOutcome::Invalid(_tx, err) => {
// TODO notify listeners about invalid // TODO notify listeners about invalid
@ -202,9 +215,9 @@ where
transactions.into_iter().map(|tx| self.add_transaction(tx)).collect::<Vec<_>>() transactions.into_iter().map(|tx| self.add_transaction(tx)).collect::<Vec<_>>()
} }
/// Notify all listeners about the new transaction. /// Notify all listeners about a new pending transaction.
fn on_new_pending_transaction(&self, ready: &TxHash) { fn on_new_pending_transaction(&self, ready: &TxHash) {
let mut transaction_listeners = self.ready_transaction_listener.lock(); let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) { transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) {
Ok(()) => true, Ok(()) => true,
Err(e) => { Err(e) => {
@ -222,17 +235,37 @@ where
}); });
} }
/// Notify all listeners about a new pending transaction.
fn on_new_transaction(&self, event: NewTransactionEvent<T::Transaction>) {
let mut transaction_listeners = self.transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(event.clone()) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(
target: "txpool",
"dropping full transaction listener",
);
true
} else {
false
}
}
});
}
/// Fire events for the newly added transaction. /// Fire events for the newly added transaction.
fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) { fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
let mut listener = self.event_listener.write(); let mut listener = self.event_listener.write();
match tx { match tx {
AddedTransaction::Pending(tx) => { AddedTransaction::Pending(tx) => {
listener.ready(&tx.hash, None); listener.ready(tx.transaction.hash(), None);
// TODO more listeners for discarded, removed etc... // TODO more listeners for discarded, removed etc...
} }
AddedTransaction::Parked { hash } => { AddedTransaction::Parked { transaction, .. } => {
listener.queued(hash); listener.queued(transaction.hash());
} }
} }
} }
@ -264,8 +297,8 @@ where
/// Tracks an added transaction and all graph changes caused by adding it. /// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> { pub struct AddedPendingTransaction<T: PoolTransaction> {
/// the hash of the submitted transaction /// Inserted transaction.
hash: TxHash, transaction: Arc<ValidPoolTransaction<T>>,
/// transactions promoted to the ready queue /// transactions promoted to the ready queue
promoted: Vec<TxHash>, promoted: Vec<TxHash>,
/// transaction that failed and became discarded /// transaction that failed and became discarded
@ -276,9 +309,9 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
impl<T: PoolTransaction> AddedPendingTransaction<T> { impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Create a new, empty transaction. /// Create a new, empty transaction.
fn new(hash: TxHash) -> Self { fn new(transaction: Arc<ValidPoolTransaction<T>>) -> Self {
Self { Self {
hash, transaction,
promoted: Default::default(), promoted: Default::default(),
discarded: Default::default(), discarded: Default::default(),
removed: Default::default(), removed: Default::default(),
@ -294,26 +327,40 @@ pub enum AddedTransaction<T: PoolTransaction> {
/// Transaction was successfully added but not yet ready for processing and moved to a /// Transaction was successfully added but not yet ready for processing and moved to a
/// parked pool instead. /// parked pool instead.
Parked { Parked {
/// Hash of the submitted transaction that is currently parked. /// Inserted transaction.
hash: TxHash, transaction: Arc<ValidPoolTransaction<T>>,
/// The subpool it was moved to.
subpool: SubPool,
}, },
} }
impl<T: PoolTransaction> AddedTransaction<T> { impl<T: PoolTransaction> AddedTransaction<T> {
/// Returns the hash of the transaction if it's pending /// Returns the hash of the transaction if it's pending
pub fn as_pending(&self) -> Option<&TxHash> { pub(crate) fn as_pending(&self) -> Option<&TxHash> {
if let AddedTransaction::Pending(tx) = self { if let AddedTransaction::Pending(tx) = self {
Some(&tx.hash) Some(tx.transaction.hash())
} else { } else {
None None
} }
} }
/// Returns the hash of the transaction /// Returns the hash of the transaction
pub fn hash(&self) -> &TxHash { pub(crate) fn hash(&self) -> &TxHash {
match self { match self {
AddedTransaction::Pending(tx) => &tx.hash, AddedTransaction::Pending(tx) => tx.transaction.hash(),
AddedTransaction::Parked { hash } => hash, AddedTransaction::Parked { transaction, .. } => transaction.hash(),
}
}
/// Converts this type into the event type for listeners
pub(crate) fn into_new_transaction_event(self) -> NewTransactionEvent<T> {
match self {
AddedTransaction::Pending(tx) => {
NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
}
AddedTransaction::Parked { transaction, subpool } => {
NewTransactionEvent { transaction, subpool }
}
} }
} }
} }

View File

@ -32,7 +32,7 @@ bitflags::bitflags! {
/// Identifier for the used Sub-pool /// Identifier for the used Sub-pool
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)] #[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)] #[repr(u8)]
pub(crate) enum SubPool { pub enum SubPool {
Queued = 0, Queued = 0,
Pending, Pending,
BaseFee, BaseFee,
@ -42,12 +42,12 @@ pub(crate) enum SubPool {
impl SubPool { impl SubPool {
/// Whether this transaction is to be moved to the pending sub-pool. /// Whether this transaction is to be moved to the pending sub-pool.
pub(crate) fn is_pending(&self) -> bool { pub fn is_pending(&self) -> bool {
matches!(self, SubPool::Pending) matches!(self, SubPool::Pending)
} }
/// Returns whether this is a promotion depending on the current sub-pool location. /// Returns whether this is a promotion depending on the current sub-pool location.
pub(crate) fn is_promoted(&self, other: SubPool) -> bool { pub fn is_promoted(&self, other: SubPool) -> bool {
self > &other self > &other
} }
} }

View File

@ -152,19 +152,19 @@ impl<T: TransactionOrdering> TxPool<T> {
match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
InsertResult::Inserted { transaction, move_to, replaced_tx, updates, .. } => { InsertResult::Inserted { transaction, move_to, replaced_tx, updates, .. } => {
self.add_new_transaction(transaction, replaced_tx, move_to); self.add_new_transaction(transaction.clone(), replaced_tx, move_to);
let UpdateOutcome { promoted, discarded, removed } = self.process_updates(updates); let UpdateOutcome { promoted, discarded, removed } = self.process_updates(updates);
// This transaction was moved to the pending pool. // This transaction was moved to the pending pool.
let res = if move_to.is_pending() { let res = if move_to.is_pending() {
AddedTransaction::Pending(AddedPendingTransaction { AddedTransaction::Pending(AddedPendingTransaction {
hash, transaction,
promoted, promoted,
discarded, discarded,
removed, removed,
}) })
} else { } else {
AddedTransaction::Parked { hash } AddedTransaction::Parked { transaction, subpool: move_to }
}; };
Ok(res) Ok(res)

View File

@ -1,5 +1,5 @@
use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID}; use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID};
use futures::channel::mpsc::Receiver; use futures::{channel::mpsc::Receiver, future::Shared};
use reth_primitives::{Address, TxHash, H256, U256}; use reth_primitives::{Address, TxHash, H256, U256};
use std::{fmt, sync::Arc}; use std::{fmt, sync::Arc};
@ -38,7 +38,10 @@ pub trait TransactionPool: Send + Sync {
/// Returns a new Stream that yields transactions hashes for new ready transactions. /// Returns a new Stream that yields transactions hashes for new ready transactions.
/// ///
/// Consumer: RPC /// Consumer: RPC
fn ready_transactions_listener(&self) -> Receiver<TxHash>; fn pending_transactions_listener(&self) -> Receiver<TxHash>;
/// Returns a new stream that yields new valid transactions added to the pool.
fn transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>>;
/// Returns an iterator that yields transactions that are ready for block production. /// Returns an iterator that yields transactions that are ready for block production.
/// ///
@ -66,6 +69,21 @@ pub trait TransactionPool: Send + Sync {
fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>; fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
} }
/// Represents a new transaction
#[derive(Debug)]
pub struct NewTransactionEvent<T: PoolTransaction> {
/// The pool which the transaction was moved to.
pub subpool: SubPool,
/// Actual transaction
pub transaction: Arc<ValidPoolTransaction<T>>,
}
impl<T: PoolTransaction> Clone for NewTransactionEvent<T> {
fn clone(&self) -> Self {
Self { subpool: self.subpool, transaction: self.transaction.clone() }
}
}
/// Event fired when a new block was mined /// Event fired when a new block was mined
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct NewBlockEvent { pub struct NewBlockEvent {