Micro optimizations for pending hash fetching (#13220)

This commit is contained in:
Elvis
2024-12-14 13:00:08 +04:00
committed by GitHub
parent b525231224
commit fd2491a3f1
3 changed files with 51 additions and 21 deletions

View File

@ -23,7 +23,7 @@ pub fn tx_fetch_bench(c: &mut Criterion) {
let rt = TokioRuntime::new().unwrap();
let mut group = c.benchmark_group("Transaction Fetch");
group.sample_size(10);
group.sample_size(30);
group.bench_function("fetch_transactions", |b| {
b.to_async(&rt).iter_with_setup(

View File

@ -25,15 +25,21 @@ impl<T: Hash + Eq + fmt::Debug> LruCache<T> {
/// Insert an element into the set.
///
/// If the element is new (did not exist before [`insert`](Self::insert)) was called, then the
/// given length will be enforced and the oldest element will be removed if the limit was
/// exceeded.
/// This operation uses `get_or_insert` from the underlying `schnellru::LruMap` which:
/// - Automatically evicts the least recently used item if capacity is exceeded
///
/// This method is more efficient than [`insert_and_get_evicted`](Self::insert_and_get_evicted)
/// as it performs fewer checks. Use this method when you don't need information about
/// evicted values.
///
/// If the set did not have this value present, true is returned.
/// If the set did have this value present, false is returned.
pub fn insert(&mut self, entry: T) -> bool {
let (new_entry, _evicted_val) = self.insert_and_get_evicted(entry);
new_entry
let mut is_new = false;
self.inner.get_or_insert(entry, || {
is_new = true;
});
is_new
}
/// Same as [`insert`](Self::insert) but returns a tuple, where the second index is the evicted
@ -227,7 +233,7 @@ mod test {
#[test]
fn test_cache_should_remove_oldest_element_when_exceeding_limit() {
let mut cache = LruCache::new(2);
let mut cache = LruCache::new(1); // LruCache limit will be 2, check LruCache::new
let old_entry = "old_entry";
let new_entry = "new_entry";
cache.insert(old_entry);
@ -338,4 +344,28 @@ mod test {
assert_eq!(key_1.other, key.unwrap().other)
}
#[test]
fn test_insert_methods() {
let mut cache = LruCache::new(2);
// Test basic insert
assert!(cache.insert("first")); // new entry
assert!(!cache.insert("first")); // existing entry
assert!(cache.insert("second")); // new entry
// Test insert_and_get_evicted
let (is_new, evicted) = cache.insert_and_get_evicted("third");
assert!(is_new); // should be new entry
assert_eq!(evicted, Some("first")); // should evict
assert!(cache.contains(&"second"));
assert!(cache.contains(&"third"));
assert!(!cache.contains(&"first"));
// Test insert_and_get_evicted with existing entry
let (is_new, evicted) = cache.insert_and_get_evicted("second");
assert!(!is_new); // should not be new
assert_eq!(evicted, None); // should not evict anything
}
}

View File

@ -181,6 +181,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
}
/// Returns `true` if peer is idle with respect to `self.inflight_requests`.
#[inline]
pub fn is_idle(&self, peer_id: &PeerId) -> bool {
let Some(inflight_count) = self.active_peers.peek(peer_id) else { return true };
if *inflight_count < self.info.max_inflight_requests_per_peer {
@ -193,13 +194,13 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
pub fn get_idle_peer_for(
&self,
hash: TxHash,
is_session_active: impl Fn(&PeerId) -> bool,
peers: &HashMap<PeerId, PeerMetadata<N>>,
) -> Option<&PeerId> {
let TxFetchMetadata { fallback_peers, .. } =
self.hashes_fetch_inflight_and_pending_fetch.peek(&hash)?;
for peer_id in fallback_peers.iter() {
if self.is_idle(peer_id) && is_session_active(peer_id) {
if self.is_idle(peer_id) && peers.contains_key(peer_id) {
return Some(peer_id)
}
}
@ -215,7 +216,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
pub fn find_any_idle_fallback_peer_for_any_pending_hash(
&mut self,
hashes_to_request: &mut RequestTxHashes,
is_session_active: impl Fn(&PeerId) -> bool,
peers: &HashMap<PeerId, PeerMetadata<N>>,
mut budget: Option<usize>, // search fallback peers for max `budget` lru pending hashes
) -> Option<PeerId> {
let mut hashes_pending_fetch_iter = self.hashes_pending_fetch.iter();
@ -223,7 +224,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
let idle_peer = loop {
let &hash = hashes_pending_fetch_iter.next()?;
let idle_peer = self.get_idle_peer_for(hash, &is_session_active);
let idle_peer = self.get_idle_peer_for(hash, peers);
if idle_peer.is_some() {
hashes_to_request.insert(hash);
@ -381,8 +382,6 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
/// [`TransactionFetcher::try_buffer_hashes_for_retry`]. Hashes that have been re-requested
/// [`DEFAULT_MAX_RETRIES`], are dropped.
pub fn buffer_hashes(&mut self, hashes: RequestTxHashes, fallback_peer: Option<PeerId>) {
let mut max_retried_and_evicted_hashes = vec![];
for hash in hashes {
// hash could have been evicted from bounded lru map
if self.hashes_fetch_inflight_and_pending_fetch.peek(&hash).is_none() {
@ -406,18 +405,19 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
);
max_retried_and_evicted_hashes.push(hash);
self.hashes_fetch_inflight_and_pending_fetch.remove(&hash);
self.hashes_pending_fetch.remove(&hash);
continue
}
*retries += 1;
}
if let (_, Some(evicted_hash)) = self.hashes_pending_fetch.insert_and_get_evicted(hash)
{
max_retried_and_evicted_hashes.push(evicted_hash);
self.hashes_fetch_inflight_and_pending_fetch.remove(&evicted_hash);
self.hashes_pending_fetch.remove(&evicted_hash);
}
}
self.remove_hashes_from_transaction_fetcher(max_retried_and_evicted_hashes);
}
/// Tries to request hashes pending fetch.
@ -429,9 +429,9 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
peers: &HashMap<PeerId, PeerMetadata<N>>,
has_capacity_wrt_pending_pool_imports: impl Fn(usize) -> bool,
) {
let mut hashes_to_request = RequestTxHashes::default();
let is_session_active = |peer_id: &PeerId| peers.contains_key(peer_id);
let mut hashes_to_request = RequestTxHashes::with_capacity(
DEFAULT_MARGINAL_COUNT_HASHES_GET_POOLED_TRANSACTIONS_REQUEST,
);
let mut search_durations = TxFetcherSearchDurations::default();
// budget to look for an idle peer before giving up
@ -442,7 +442,7 @@ impl<N: NetworkPrimitives> TransactionFetcher<N> {
{
let Some(peer_id) = self.find_any_idle_fallback_peer_for_any_pending_hash(
&mut hashes_to_request,
is_session_active,
peers,
budget_find_idle_fallback_peer,
) else {
// no peers are idle or budget is depleted