fix(txpool): emit replaced events (#3642)

This commit is contained in:
Roman Krasiuk
2023-07-06 21:55:37 +03:00
committed by GitHub
parent 5d904eba1e
commit fdc8a05320
3 changed files with 21 additions and 7 deletions

View File

@ -116,10 +116,15 @@ impl PoolEventBroadcast {
if let Some(replaced) = replaced { if let Some(replaced) = replaced {
// notify listeners that this transaction was replaced // notify listeners that this transaction was replaced
self.broadcast_event(replaced, TransactionEvent::Replaced(*tx)); self.replaced(replaced, tx);
} }
} }
/// Notify listeners about a transaction that was replaced.
pub(crate) fn replaced(&mut self, tx: &TxHash, replaced_by: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Replaced(*replaced_by));
}
/// Notify listeners about a transaction that was added to the queued pool. /// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &TxHash) { pub(crate) fn queued(&mut self, tx: &TxHash) {
self.broadcast_event(tx, TransactionEvent::Queued); self.broadcast_event(tx, TransactionEvent::Queued);

View File

@ -415,14 +415,17 @@ where
match tx { match tx {
AddedTransaction::Pending(tx) => { AddedTransaction::Pending(tx) => {
let AddedPendingTransaction { transaction, promoted, discarded, .. } = tx; let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
listener.pending(transaction.hash(), None); listener.pending(transaction.hash(), replaced.as_ref().map(|tx| tx.hash()));
promoted.iter().for_each(|tx| listener.pending(tx, None)); promoted.iter().for_each(|tx| listener.pending(tx, None));
discarded.iter().for_each(|tx| listener.discarded(tx)); discarded.iter().for_each(|tx| listener.discarded(tx));
} }
AddedTransaction::Parked { transaction, .. } => { AddedTransaction::Parked { transaction, replaced, .. } => {
listener.queued(transaction.hash()); listener.queued(transaction.hash());
if let Some(replaced) = replaced {
listener.replaced(replaced.hash(), transaction.hash());
}
} }
} }
} }
@ -532,6 +535,8 @@ impl<V: TransactionValidator, T: TransactionOrdering> fmt::Debug for PoolInner<V
pub struct AddedPendingTransaction<T: PoolTransaction> { pub struct AddedPendingTransaction<T: PoolTransaction> {
/// Inserted transaction. /// Inserted transaction.
transaction: Arc<ValidPoolTransaction<T>>, transaction: Arc<ValidPoolTransaction<T>>,
/// Replaced transaction.
replaced: Option<Arc<ValidPoolTransaction<T>>>,
/// transactions promoted to the ready queue /// transactions promoted to the ready queue
promoted: Vec<TxHash>, promoted: Vec<TxHash>,
/// transaction that failed and became discarded /// transaction that failed and became discarded
@ -548,6 +553,8 @@ pub enum AddedTransaction<T: PoolTransaction> {
Parked { Parked {
/// Inserted transaction. /// Inserted transaction.
transaction: Arc<ValidPoolTransaction<T>>, transaction: Arc<ValidPoolTransaction<T>>,
/// Replaced transaction.
replaced: Option<Arc<ValidPoolTransaction<T>>>,
/// The subpool it was moved to. /// The subpool it was moved to.
subpool: SubPool, subpool: SubPool,
}, },
@ -577,7 +584,7 @@ impl<T: PoolTransaction> AddedTransaction<T> {
AddedTransaction::Pending(tx) => { AddedTransaction::Pending(tx) => {
NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction } NewTransactionEvent { subpool: SubPool::Pending, transaction: tx.transaction }
} }
AddedTransaction::Parked { transaction, subpool } => { AddedTransaction::Parked { transaction, subpool, .. } => {
NewTransactionEvent { transaction, subpool } NewTransactionEvent { transaction, subpool }
} }
} }

View File

@ -314,20 +314,22 @@ impl<T: TransactionOrdering> TxPool<T> {
match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) { match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => { Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => {
self.add_new_transaction(transaction.clone(), replaced_tx, move_to); self.add_new_transaction(transaction.clone(), replaced_tx.clone(), move_to);
// Update inserted transactions metric // Update inserted transactions metric
self.metrics.inserted_transactions.increment(1); self.metrics.inserted_transactions.increment(1);
let UpdateOutcome { promoted, discarded } = self.process_updates(updates); let UpdateOutcome { promoted, discarded } = self.process_updates(updates);
// This transaction was moved to the pending pool. // This transaction was moved to the pending pool.
let replaced = replaced_tx.map(|(tx, _)| tx);
let res = if move_to.is_pending() { let res = if move_to.is_pending() {
AddedTransaction::Pending(AddedPendingTransaction { AddedTransaction::Pending(AddedPendingTransaction {
transaction, transaction,
promoted, promoted,
discarded, discarded,
replaced,
}) })
} else { } else {
AddedTransaction::Parked { transaction, subpool: move_to } AddedTransaction::Parked { transaction, subpool: move_to, replaced }
}; };
Ok(res) Ok(res)