diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 4d71d3810..3f3c671e7 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -88,7 +88,7 @@ where } } -/// The actual handler for and accepted [`EthPubSub::subscribe`] call. +/// The actual handler for an accepted [`EthPubSub::subscribe`] call. async fn handle_accepted( pubsub: Arc>, accepted_sink: SubscriptionSink, @@ -267,7 +267,7 @@ impl EthPubSubInner impl Stream { ReceiverStream::new(self.pool.pending_transactions_listener()) } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 055064fa5..fd595ae04 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -561,18 +561,26 @@ where }) } - /// Notifies transaction listeners about changes after a block was processed. + /// Notifies transaction listeners about changes once a block was processed. fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) { // notify about promoted pending transactions { - let mut transaction_listeners = self.pending_transaction_listener.lock(); - transaction_listeners.retain_mut(|listener| { + // emit hashes + let mut transaction_hash_listeners = self.pending_transaction_listener.lock(); + transaction_hash_listeners.retain_mut(|listener| { listener.send_all(outcome.pending_transactions(listener.kind)) }); + + // emit full transactions + let mut transaction_full_listeners = self.transaction_listener.lock(); + transaction_full_listeners.retain_mut(|listener| { + listener.send_all(outcome.full_pending_transactions(listener.kind)) + }) } let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome; + // broadcast specific transaction events let mut listener = self.event_listener.write(); mined.iter().for_each(|tx| listener.mined(tx, block_hash)); @@ -918,6 +926,33 @@ where } } +/// An iterator over full pending transactions +pub(crate) struct FullPendingTransactionIter { + kind: TransactionListenerKind, + iter: Iter, +} + +impl<'a, Iter, T> Iterator for FullPendingTransactionIter +where + Iter: Iterator>>, + T: PoolTransaction + 'a, +{ + type Item = NewTransactionEvent; + + fn next(&mut self) -> Option { + loop { + let next = self.iter.next()?; + if self.kind.is_propagate_only() && !next.propagate { + continue + } + return Some(NewTransactionEvent { + subpool: SubPool::Pending, + transaction: next.clone(), + }) + } + } +} + /// Represents a transaction that was added into the pool and its state #[derive(Debug, Clone)] pub enum AddedTransaction { @@ -1011,7 +1046,7 @@ pub(crate) struct OnNewCanonicalStateOutcome { pub(crate) block_hash: B256, /// All mined transactions. pub(crate) mined: Vec, - /// Transactions promoted to the ready queue. + /// Transactions promoted to the pending pool. pub(crate) promoted: Vec>>, /// transaction that were discarded during the update pub(crate) discarded: Vec>>, @@ -1030,4 +1065,17 @@ impl OnNewCanonicalStateOutcome { let iter = self.promoted.iter(); PendingTransactionIter { kind, iter } } + + /// Returns all FULL transactions that were promoted to the pending pool and adhere to the given + /// [TransactionListenerKind]. + /// + /// If the kind is [TransactionListenerKind::PropagateOnly], then only transactions that + /// are allowed to be propagated are returned. + pub(crate) fn full_pending_transactions( + &self, + kind: TransactionListenerKind, + ) -> impl Iterator> + '_ { + let iter = self.promoted.iter(); + FullPendingTransactionIter { kind, iter } + } }