notify full transaction listeners (4902) (#4978)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
SleepingShell
2023-10-12 11:23:21 +00:00
committed by GitHub
parent d2845afdf7
commit bd7b8f536d
2 changed files with 54 additions and 6 deletions

View File

@ -88,7 +88,7 @@ where
} }
} }
/// The actual handler for and accepted [`EthPubSub::subscribe`] call. /// The actual handler for an accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Provider, Pool, Events, Network>( async fn handle_accepted<Provider, Pool, Events, Network>(
pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>, pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
accepted_sink: SubscriptionSink, accepted_sink: SubscriptionSink,
@ -267,7 +267,7 @@ impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Net
where where
Pool: TransactionPool + 'static, Pool: TransactionPool + 'static,
{ {
/// Returns a stream that yields all transactions emitted by the txpool. /// Returns a stream that yields all transaction hashes emitted by the txpool.
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> { fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener()) ReceiverStream::new(self.pool.pending_transactions_listener())
} }

View File

@ -561,18 +561,26 @@ where
}) })
} }
/// Notifies transaction listeners about changes after a block was processed. /// Notifies transaction listeners about changes once a block was processed.
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) { fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
// notify about promoted pending transactions // notify about promoted pending transactions
{ {
let mut transaction_listeners = self.pending_transaction_listener.lock(); // emit hashes
transaction_listeners.retain_mut(|listener| { let mut transaction_hash_listeners = self.pending_transaction_listener.lock();
transaction_hash_listeners.retain_mut(|listener| {
listener.send_all(outcome.pending_transactions(listener.kind)) listener.send_all(outcome.pending_transactions(listener.kind))
}); });
// emit full transactions
let mut transaction_full_listeners = self.transaction_listener.lock();
transaction_full_listeners.retain_mut(|listener| {
listener.send_all(outcome.full_pending_transactions(listener.kind))
})
} }
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
// broadcast specific transaction events
let mut listener = self.event_listener.write(); let mut listener = self.event_listener.write();
mined.iter().for_each(|tx| listener.mined(tx, block_hash)); mined.iter().for_each(|tx| listener.mined(tx, block_hash));
@ -918,6 +926,33 @@ where
} }
} }
/// An iterator over full pending transactions
pub(crate) struct FullPendingTransactionIter<Iter> {
kind: TransactionListenerKind,
iter: Iter,
}
impl<'a, Iter, T> Iterator for FullPendingTransactionIter<Iter>
where
Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
T: PoolTransaction + 'a,
{
type Item = NewTransactionEvent<T>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let next = self.iter.next()?;
if self.kind.is_propagate_only() && !next.propagate {
continue
}
return Some(NewTransactionEvent {
subpool: SubPool::Pending,
transaction: next.clone(),
})
}
}
}
/// Represents a transaction that was added into the pool and its state /// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> { pub enum AddedTransaction<T: PoolTransaction> {
@ -1011,7 +1046,7 @@ pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
pub(crate) block_hash: B256, pub(crate) block_hash: B256,
/// All mined transactions. /// All mined transactions.
pub(crate) mined: Vec<TxHash>, pub(crate) mined: Vec<TxHash>,
/// Transactions promoted to the ready queue. /// Transactions promoted to the pending pool.
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>, pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that were discarded during the update /// transaction that were discarded during the update
pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>, pub(crate) discarded: Vec<Arc<ValidPoolTransaction<T>>>,
@ -1030,4 +1065,17 @@ impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
let iter = self.promoted.iter(); let iter = self.promoted.iter();
PendingTransactionIter { kind, iter } PendingTransactionIter { kind, iter }
} }
/// Returns all FULL transactions that were promoted to the pending pool and adhere to the given
/// [TransactionListenerKind].
///
/// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn full_pending_transactions(
&self,
kind: TransactionListenerKind,
) -> impl Iterator<Item = NewTransactionEvent<T>> + '_ {
let iter = self.promoted.iter();
FullPendingTransactionIter { kind, iter }
}
} }