perf: track last sender's submission id (#6718)

This commit is contained in:
Matthias Seitz
2024-02-21 20:06:08 +01:00
committed by GitHub
parent 619d0bfefd
commit 4a386d5782

View File

@ -1,11 +1,12 @@
use crate::{
identifier::{SenderId, TransactionId},
pool::size::SizeTracker,
PoolTransaction, SubPoolLimit, ValidPoolTransaction, TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER,
PoolTransaction, SubPoolLimit, ValidPoolTransaction,
};
use fnv::FnvHashMap;
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet, BinaryHeap},
collections::{hash_map::Entry, BTreeMap, BTreeSet},
ops::{Bound::Unbounded, Deref},
sync::Arc,
};
@ -31,6 +32,10 @@ pub struct ParkedPool<T: ParkedOrd> {
///
/// The higher, the better.
best: BTreeSet<ParkedPoolTransaction<T>>,
/// Keeps track of the number of transactions and the latest submission id for each sender.
last_sender_transaction: BTreeSet<SubmissionSenderId>,
/// Keeps track of the number of transactions by the sender and the last submission id.
sender_to_last_transaction: FnvHashMap<SenderId, SenderTransactionCount>,
/// Keeps track of the size of this pool.
///
/// See also [`PoolTransaction::size`].
@ -58,12 +63,66 @@ impl<T: ParkedOrd> ParkedPool<T> {
self.size_of += tx.size();
// update or create sender entry
self.add_sender_count(tx.sender_id(), submission_id);
let transaction = ParkedPoolTransaction { submission_id, transaction: tx.into() };
self.by_id.insert(id, transaction.clone());
self.best.insert(transaction);
}
/// Increments the count of transactions for the given sender and updates the tracked submission
/// id.
fn add_sender_count(&mut self, sender: SenderId, submission_id: u64) {
match self.sender_to_last_transaction.entry(sender) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
// remove the __currently__ tracked submission id
self.last_sender_transaction
.remove(&SubmissionSenderId::new(sender, value.last_submission_id));
value.count += 1;
value.last_submission_id = submission_id;
}
Entry::Vacant(entry) => {
entry
.insert(SenderTransactionCount { count: 1, last_submission_id: submission_id });
}
}
// insert a new entry
self.last_sender_transaction.insert(SubmissionSenderId::new(sender, submission_id));
}
/// Decrements the count of transactions for the given sender.
///
/// If the count reaches zero, the sender is removed from the map.
///
/// Note: this does not update the tracked submission id for the sender, because we're only
/// interested in the __last__ submission id when truncating the pool.
fn remove_sender_count(&mut self, sender_id: SenderId) {
let removed_sender = match self.sender_to_last_transaction.entry(sender_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
value.count -= 1;
if value.count == 0 {
entry.remove()
} else {
return
}
}
Entry::Vacant(_) => {
unreachable!("sender count not found {:?}", sender_id);
}
};
// all transactions for this sender have been removed
assert!(
self.last_sender_transaction
.remove(&SubmissionSenderId::new(sender_id, removed_sender.last_submission_id)),
"last sender transaction not found {:?}",
sender_id
);
}
/// Returns an iterator over all transactions in the pool
pub(crate) fn all(
&self,
@ -79,6 +138,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
// remove from queues
let tx = self.by_id.remove(id)?;
self.best.remove(&tx);
self.remove_sender_count(tx.transaction.sender_id());
// keep track of size
self.size_of -= tx.transaction.size();
@ -95,58 +155,21 @@ impl<T: ParkedOrd> ParkedPool<T> {
.collect()
}
/// Returns sender ids sorted by each sender's last submission id. Senders with older last
/// submission ids are first. Note that _last_ submission ids are the newest submission id for
/// that sender, so this sorts senders by the last time they submitted a transaction in
/// descending order. Senders that have least recently submitted a transaction are first.
///
/// Similar to `Heartbeat` in Geth
pub fn get_senders_by_submission_id(&self) -> Vec<SubmissionSenderId> {
// iterate through by_id, and get the last submission id for each sender
let senders = self
.by_id
.iter()
.fold(
// pre-allocate some capacity for unique senders, targeting 4 slots per sender
Vec::with_capacity(self.by_id.len() / (TXPOOL_MAX_ACCOUNT_SLOTS_PER_SENDER / 4)),
|mut set: Vec<SubmissionSenderId>, (_, tx)| {
if let Some(last) = set.last_mut() {
// sort by last
if last.sender_id == tx.transaction.sender_id() {
if last.submission_id < tx.submission_id {
// update last submission id
last.submission_id = tx.submission_id;
}
} else {
// new entry
set.push(SubmissionSenderId::new(
tx.transaction.sender_id(),
tx.submission_id,
));
}
} else {
// first entry
set.push(SubmissionSenderId::new(
tx.transaction.sender_id(),
tx.submission_id,
));
}
set
},
)
.into_iter()
// sort by submission id
.collect::<BinaryHeap<_>>();
// sort s.t. senders with older submission ids are first
senders.into_sorted_vec()
#[cfg(test)]
pub(crate) fn get_senders_by_submission_id(
&self,
) -> impl Iterator<Item = SubmissionSenderId> + '_ {
self.last_sender_transaction.iter().cloned()
}
/// Truncates the pool by removing transactions, until the given [SubPoolLimit] has been met.
///
/// This is done by first ordering senders by the last time they have submitted a transaction,
/// using [get_senders_by_submission_id](ParkedPool::get_senders_by_submission_id) to determine
/// this ordering.
/// This is done by first ordering senders by the last time they have submitted a transaction
///
/// Uses sender ids sorted by each sender's last submission id. Senders with older last
/// submission ids are first. Note that _last_ submission ids are the newest submission id for
/// that sender, so this sorts senders by the last time they submitted a transaction in
/// descending order. Senders that have least recently submitted a transaction are first.
///
/// Then, for each sender, all transactions for that sender are removed, until the pool limits
/// have been met.
@ -162,11 +185,11 @@ impl<T: ParkedOrd> ParkedPool<T> {
}
let mut removed = Vec::new();
let mut sender_ids = self.get_senders_by_submission_id();
while limit.is_exceeded(self.len(), self.size()) && !sender_ids.is_empty() {
while limit.is_exceeded(self.len(), self.size()) && !self.last_sender_transaction.is_empty()
{
// SAFETY: This will not panic due to `!addresses.is_empty()`
let sender_id = sender_ids.pop().unwrap().sender_id;
let sender_id = self.last_sender_transaction.last().expect("no empty").sender_id;
let list = self.get_txs_by_sender(sender_id);
// Drop transactions from this sender until the pool is under limits
@ -227,6 +250,12 @@ impl<T: ParkedOrd> ParkedPool<T> {
#[cfg(any(test, feature = "test-utils"))]
pub(crate) fn assert_invariants(&self) {
assert_eq!(self.by_id.len(), self.best.len(), "by_id.len() != best.len()");
assert_eq!(
self.last_sender_transaction.len(),
self.sender_to_last_transaction.len(),
"last_sender_transaction.len() != sender_to_last_transaction.len()"
);
}
}
@ -292,11 +321,20 @@ impl<T: ParkedOrd> Default for ParkedPool<T> {
submission_id: 0,
by_id: Default::default(),
best: Default::default(),
last_sender_transaction: Default::default(),
sender_to_last_transaction: Default::default(),
size_of: Default::default(),
}
}
}
/// Keeps track of the number of transactions and the latest submission id for each sender.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
struct SenderTransactionCount {
count: u64,
last_submission_id: u64,
}
/// Represents a transaction in this pool.
#[derive(Debug)]
struct ParkedPoolTransaction<T: ParkedOrd> {
@ -340,7 +378,7 @@ impl<T: ParkedOrd> Ord for ParkedPoolTransaction<T> {
/// Includes a [SenderId] and `submission_id`. This is used to sort senders by their last
/// submission id.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct SubmissionSenderId {
pub(crate) struct SubmissionSenderId {
/// The sender id
pub(crate) sender_id: SenderId,
/// The submission id
@ -672,11 +710,7 @@ mod tests {
}
// get senders by submission id - a4, b3, c3, d1, reversed
let senders = pool
.get_senders_by_submission_id()
.into_iter()
.map(|s| s.sender_id)
.collect::<Vec<_>>();
let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
let expected_senders = vec![d_sender, c_sender, b_sender, a_sender]
.into_iter()
@ -693,11 +727,7 @@ mod tests {
pool.add_transaction(f.validated_arc(tx));
}
let senders = pool
.get_senders_by_submission_id()
.into_iter()
.map(|s| s.sender_id)
.collect::<Vec<_>>();
let senders = pool.get_senders_by_submission_id().map(|s| s.sender_id).collect::<Vec<_>>();
assert_eq!(senders.len(), 4);
let expected_senders = vec![a_sender, c_sender, b_sender, d_sender]
.into_iter()