chore: add dedicated send functions (#4903)

This commit is contained in:
Matthias Seitz
2023-10-04 16:16:59 +02:00
committed by GitHub
parent afebb2b20b
commit 5f56e529ab

View File

@ -139,8 +139,8 @@ where
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventBroadcast<T::Transaction>>,
/// Listeners for new pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new _full_ pending transactions.
pending_transaction_listener: Mutex<Vec<PendingTransactionHashListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
/// Listener for new blob transaction sidecars added to the pool.
@ -232,7 +232,7 @@ where
/// transaction inserted into the pool
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
let (sender, rx) = mpsc::channel(PENDING_TX_LISTENER_BUFFER_SIZE);
let listener = PendingTransactionListener { sender, kind };
let listener = PendingTransactionHashListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
rx
}
@ -520,24 +520,7 @@ where
}
// broadcast all pending transactions to the listener
for tx_hash in pending.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
listener.send_all(pending.pending_transactions(listener.kind))
});
}
@ -551,20 +534,7 @@ where
return !listener.sender.is_closed()
}
match listener.sender.try_send(event.clone()) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"skipping transaction on full transaction listener",
);
true
} else {
false
}
}
}
listener.send(event.clone())
});
}
@ -597,25 +567,7 @@ where
{
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// broadcast all pending transactions to the listener
for tx_hash in outcome.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
listener.send_all(outcome.pending_transactions(listener.kind))
});
}
@ -825,12 +777,38 @@ impl<V, T: TransactionOrdering, S> fmt::Debug for PoolInner<V, T, S> {
/// An active listener for new pending transactions.
#[derive(Debug)]
struct PendingTransactionListener {
struct PendingTransactionHashListener {
sender: mpsc::Sender<TxHash>,
/// Whether to include transactions that should not be propagated over the network.
kind: TransactionListenerKind,
}
impl PendingTransactionHashListener {
/// Attempts to send all hashes to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, hashes: impl IntoIterator<Item = TxHash>) -> bool {
for tx_hash in hashes.into_iter() {
match self.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
}
}
/// An active listener for new pending transactions.
#[derive(Debug)]
struct TransactionListener<T: PoolTransaction> {
@ -839,6 +817,39 @@ struct TransactionListener<T: PoolTransaction> {
kind: TransactionListenerKind,
}
impl<T: PoolTransaction> TransactionListener<T> {
/// Attempts to send the event to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send(&self, event: NewTransactionEvent<T>) -> bool {
self.send_all(std::iter::once(event))
}
/// Attempts to send all events to the listener.
///
/// Returns false if the channel is closed (receiver dropped)
fn send_all(&self, events: impl IntoIterator<Item = NewTransactionEvent<T>>) -> bool {
for event in events.into_iter() {
match self.sender.try_send(event) {
Ok(()) => {}
Err(err) => {
return if let mpsc::error::TrySendError::Full(event) = err {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
event.transaction.hash(),
);
true
} else {
false
}
}
}
}
true
}
}
/// An active listener for new blobs
#[derive(Debug)]
struct BlobTransactionSidecarListener {