fix: broadcast promoted transactions (#4248)

This commit is contained in:
Matthias Seitz
2023-08-17 17:19:39 +02:00
committed by GitHub
parent e948928224
commit ca99ee2ec9
2 changed files with 115 additions and 25 deletions

View File

@ -269,11 +269,15 @@ where
last_seen_block_number: number,
pending_basefee: pending_block_base_fee,
};
// update the pool
let outcome = self.pool.write().on_canonical_state_change(
block_info,
mined_transactions,
changed_senders,
);
// notify listeners about updates
self.notify_on_new_state(outcome);
}
@ -285,7 +289,8 @@ where
let UpdateOutcome { promoted, discarded } =
self.pool.write().update_accounts(changed_senders);
let mut listener = self.event_listener.write();
promoted.iter().for_each(|tx| listener.pending(tx, None));
promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
discarded.iter().for_each(|tx| listener.discarded(tx));
}
@ -406,7 +411,7 @@ where
}
// broadcast all pending transactions to the listener
for tx_hash in pending.pending_transactions() {
for tx_hash in pending.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
@ -448,13 +453,39 @@ where
}
/// Notifies transaction listeners about changes after a block was processed.
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome) {
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
// notify about promoted pending transactions
{
let mut transaction_listeners = self.pending_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| {
// broadcast all pending transactions to the listener
for tx_hash in outcome.pending_transactions(listener.kind) {
match listener.sender.try_send(tx_hash) {
Ok(()) => {}
Err(err) => {
return if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send pending tx; channel full",
tx_hash,
);
true
} else {
false
}
}
}
}
true
});
}
let OnNewCanonicalStateOutcome { mined, promoted, discarded, block_hash } = outcome;
let mut listener = self.event_listener.write();
mined.iter().for_each(|tx| listener.mined(tx, block_hash));
promoted.iter().for_each(|tx| listener.pending(tx, None));
promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
discarded.iter().for_each(|tx| listener.discarded(tx));
}
@ -467,7 +498,7 @@ where
let AddedPendingTransaction { transaction, promoted, discarded, replaced } = tx;
listener.pending(transaction.hash(), replaced.clone());
promoted.iter().for_each(|tx| listener.pending(tx, None));
promoted.iter().for_each(|tx| listener.pending(tx.hash(), None));
discarded.iter().for_each(|tx| listener.discarded(tx));
}
AddedTransaction::Parked { transaction, replaced, .. } => {
@ -605,15 +636,23 @@ pub struct AddedPendingTransaction<T: PoolTransaction> {
/// Replaced transaction.
replaced: Option<Arc<ValidPoolTransaction<T>>>,
/// transactions promoted to the pending queue
promoted: Vec<H256>,
promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that failed and became discarded
discarded: Vec<TxHash>,
}
impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Returns all transactions that were promoted to the pending pool
pub(crate) fn pending_transactions(&self) -> impl Iterator<Item = H256> + '_ {
std::iter::once(self.transaction.hash()).chain(self.promoted.iter()).copied()
/// Returns all transactions that were promoted to the pending pool and adhere to the given
/// [PendingTransactionListenerKind].
///
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn pending_transactions(
&self,
kind: PendingTransactionListenerKind,
) -> impl Iterator<Item = H256> + '_ {
let iter = std::iter::once(&self.transaction).chain(self.promoted.iter());
PendingTransactionIter { kind, iter }
}
/// Returns if the transaction should be propagated.
@ -622,6 +661,29 @@ impl<T: PoolTransaction> AddedPendingTransaction<T> {
}
}
pub(crate) struct PendingTransactionIter<Iter> {
kind: PendingTransactionListenerKind,
iter: Iter,
}
impl<'a, Iter, T> Iterator for PendingTransactionIter<Iter>
where
Iter: Iterator<Item = &'a Arc<ValidPoolTransaction<T>>>,
T: PoolTransaction + 'a,
{
type Item = H256;
fn next(&mut self) -> Option<Self::Item> {
loop {
let next = self.iter.next()?;
if self.kind.is_propagate_only() && !next.propagate {
continue
}
return Some(*next.hash())
}
}
}
/// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
@ -671,13 +733,28 @@ impl<T: PoolTransaction> AddedTransaction<T> {
/// Contains all state changes after a [`CanonicalStateUpdate`] was processed
#[derive(Debug)]
pub(crate) struct OnNewCanonicalStateOutcome {
pub(crate) struct OnNewCanonicalStateOutcome<T: PoolTransaction> {
/// Hash of the block.
pub(crate) block_hash: H256,
/// All mined transactions.
pub(crate) mined: Vec<TxHash>,
/// Transactions promoted to the ready queue.
pub(crate) promoted: Vec<TxHash>,
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that were discarded during the update
pub(crate) discarded: Vec<TxHash>,
}
impl<T: PoolTransaction> OnNewCanonicalStateOutcome<T> {
/// Returns all transactions that were promoted to the pending pool and adhere to the given
/// [PendingTransactionListenerKind].
///
/// If the kind is [PendingTransactionListenerKind::PropagateOnly], then only transactions that
/// are allowed to be propagated are returned.
pub(crate) fn pending_transactions(
&self,
kind: PendingTransactionListenerKind,
) -> impl Iterator<Item = H256> + '_ {
let iter = self.promoted.iter();
PendingTransactionIter { kind, iter }
}
}

View File

@ -266,7 +266,7 @@ impl<T: TransactionOrdering> TxPool<T> {
pub(crate) fn update_accounts(
&mut self,
changed_senders: HashMap<SenderId, SenderInfo>,
) -> UpdateOutcome {
) -> UpdateOutcome<T::Transaction> {
// track changed accounts
self.sender_info.extend(changed_senders.clone());
// Apply the state changes to the total set of transactions which triggers sub-pool updates.
@ -287,7 +287,7 @@ impl<T: TransactionOrdering> TxPool<T> {
block_info: BlockInfo,
mined_transactions: Vec<TxHash>,
changed_senders: HashMap<SenderId, SenderInfo>,
) -> OnNewCanonicalStateOutcome {
) -> OnNewCanonicalStateOutcome<T::Transaction> {
// update block info
let block_hash = block_info.last_seen_block_hash;
self.all_transactions.set_block_info(block_info);
@ -409,7 +409,7 @@ impl<T: TransactionOrdering> TxPool<T> {
/// Maintenance task to apply a series of updates.
///
/// This will move/discard the given transaction according to the `PoolUpdate`
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome {
fn process_updates(&mut self, updates: Vec<PoolUpdate>) -> UpdateOutcome<T::Transaction> {
let mut outcome = UpdateOutcome::default();
for update in updates {
let PoolUpdate { id, hash, current, destination } = update;
@ -422,9 +422,11 @@ impl<T: TransactionOrdering> TxPool<T> {
}
Destination::Pool(move_to) => {
debug_assert!(!move_to.eq(&current), "destination must be different");
self.move_transaction(current, move_to, &id);
let moved = self.move_transaction(current, move_to, &id);
if matches!(move_to, SubPool::Pending) {
outcome.promoted.push(hash);
if let Some(tx) = moved {
outcome.promoted.push(tx);
}
}
}
}
@ -436,10 +438,15 @@ impl<T: TransactionOrdering> TxPool<T> {
///
/// This will remove the given transaction from one sub-pool and insert it into the other
/// sub-pool.
fn move_transaction(&mut self, from: SubPool, to: SubPool, id: &TransactionId) {
if let Some(tx) = self.remove_from_subpool(from, id) {
self.add_transaction_to_subpool(to, tx);
}
fn move_transaction(
&mut self,
from: SubPool,
to: SubPool,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
let tx = self.remove_from_subpool(from, id)?;
self.add_transaction_to_subpool(to, tx.clone());
Some(tx)
}
/// Removes and returns all matching transactions from the pool.
@ -1324,14 +1331,20 @@ impl<T: PoolTransaction> PoolInternalTransaction<T> {
}
/// Tracks the result after updating the pool
#[derive(Default, Debug)]
pub(crate) struct UpdateOutcome {
/// transactions promoted to the ready queue
pub(crate) promoted: Vec<TxHash>,
/// transaction that failed and became discarded
#[derive(Debug)]
pub(crate) struct UpdateOutcome<T: PoolTransaction> {
/// transactions promoted to the pending pool
pub(crate) promoted: Vec<Arc<ValidPoolTransaction<T>>>,
/// transaction that failed and were discarded
pub(crate) discarded: Vec<TxHash>,
}
impl<T: PoolTransaction> Default for UpdateOutcome<T> {
fn default() -> Self {
Self { promoted: vec![], discarded: vec![] }
}
}
/// Represents the outcome of a prune
pub struct PruneResult<T: PoolTransaction> {
/// A list of added transactions that a pruned marker satisfied