optimise TransactionFetcher (#6012)

Co-authored-by: zerosnacks <95942363+zerosnacks@users.noreply.github.com>
Emilia Hane
2024-01-19 00:37:14 +01:00
committed by GitHub
parent 9300e53927
commit 8349cb3205
8 changed files with 1169 additions and 261 deletions

Cargo.lock (generated)

@ -6288,6 +6288,7 @@ dependencies = [
"fnv",
"futures",
"humantime-serde",
"itertools 0.12.0",
"linked-hash-map",
"linked_hash_set",
"metrics",


@ -144,6 +144,14 @@ impl NewPooledTransactionHashes {
}
}
/// Returns a mutable reference to transaction hashes.
pub fn hashes_mut(&mut self) -> &mut Vec<B256> {
match self {
NewPooledTransactionHashes::Eth66(msg) => &mut msg.0,
NewPooledTransactionHashes::Eth68(msg) => &mut msg.hashes,
}
}
/// Consumes the type and returns all hashes
pub fn into_hashes(self) -> Vec<B256> {
match self {
@ -188,6 +196,15 @@ impl NewPooledTransactionHashes {
NewPooledTransactionHashes::Eth68(msg) => msg.hashes.len(),
}
}
/// Returns a reference to the inner eth68 announcement if this is an eth68 message.
pub fn as_eth68(&self) -> Option<&NewPooledTransactionHashes68> {
match self {
NewPooledTransactionHashes::Eth66(_) => None,
NewPooledTransactionHashes::Eth68(msg) => Some(msg),
}
}
}
impl From<NewPooledTransactionHashes> for EthMessage {
@ -265,6 +282,13 @@ pub struct NewPooledTransactionHashes68 {
pub hashes: Vec<B256>,
}
impl NewPooledTransactionHashes68 {
/// Returns an iterator over tx hashes zipped with corresponding metadata.
pub fn metadata_iter(&self) -> impl Iterator<Item = (&B256, (u8, usize))> {
self.hashes.iter().zip(self.types.iter().copied().zip(self.sizes.iter().copied()))
}
}
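As a usage illustration of the two new accessors, here is a minimal sketch; the helper name `collect_eth68_sizes` and the plain `HashMap` are assumptions for the example, but it mirrors how the manager feeds `eth68_meta` later in this diff:

use std::collections::HashMap;

use reth_eth_wire::{NewPooledTransactionHashes, NewPooledTransactionHashes68};
use reth_primitives::B256;

/// Collects announced sizes per hash, if the announcement carries eth68 metadata.
fn collect_eth68_sizes(msg: &NewPooledTransactionHashes) -> HashMap<B256, usize> {
    let mut sizes = HashMap::new();
    // `as_eth68` returns `None` for eth66 announcements, which carry no metadata
    if let Some(eth68_msg) = msg.as_eth68() {
        for (&hash, (_tx_type, size)) in eth68_msg.metadata_iter() {
            sizes.insert(hash, size);
        }
    }
    sizes
}

fn main() {
    let hash = B256::from_slice(&[1; 32]);
    let msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
        types: vec![0],
        sizes: vec![128],
        hashes: vec![hash],
    });
    assert_eq!(collect_eth68_sizes(&msg).get(&hash), Some(&128));
}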
impl Encodable for NewPooledTransactionHashes68 {
fn encode(&self, out: &mut dyn bytes::BufMut) {
#[derive(RlpEncodable)]


@ -65,6 +65,7 @@ rand.workspace = true
secp256k1 = { workspace = true, features = ["global-context", "rand-std", "recovery"] }
derive_more.workspace = true
schnellru.workspace = true
itertools.workspace = true
enr = { workspace = true, features = ["rust-secp256k1"], optional = true }
tempfile = { workspace = true, optional = true }


@ -53,13 +53,13 @@ impl<T: Hash + Eq> LruCache<T> {
/// Remove the least recently used entry and return it.
///
/// If the `LruCache` is empty this will return None.
/// If the `LruCache` is empty or if the eviction feedback is
/// configured, this will return None.
#[inline]
fn remove_lru(&mut self) -> Option<T> {
self.inner.pop_front()
}
#[allow(dead_code)]
/// Expels the given value. Returns true if the value existed.
pub fn remove(&mut self, value: &T) -> bool {
self.inner.remove(value)
@ -80,14 +80,12 @@ impl<T: Hash + Eq> LruCache<T> {
}
/// Returns number of elements currently in cache.
#[cfg(test)]
#[allow(dead_code)]
pub fn len(&self) -> usize {
self.inner.len()
}
/// Returns `true` if there are currently no elements in the cache.
#[cfg(test)]
#[allow(dead_code)]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
@ -136,7 +134,6 @@ where
K: Hash + PartialEq,
{
/// Returns a new cache with default limiter and hash builder.
#[allow(dead_code)]
pub fn new(max_length: u32) -> Self {
LruMap(schnellru::LruMap::new(ByLength::new(max_length)))
}
@ -147,7 +144,6 @@ where
K: Hash + PartialEq,
{
/// Returns a new cache with [`Unlimited`] limiter and default hash builder.
#[allow(dead_code)]
pub fn new_unlimited() -> Self {
LruMap(schnellru::LruMap::new(Unlimited))
}


@ -160,7 +160,7 @@ impl ActiveSession {
// request was already timed out internally
self.update_request_timeout(req.timestamp, Instant::now());
}
};
}
} else {
// we received a response to a request we never sent
self.on_bad_message();


@ -21,6 +21,7 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tracing::trace;
#[cfg_attr(doc, aquamarine::aquamarine)]


@ -0,0 +1,711 @@
use crate::{
cache::{LruCache, LruMap},
message::PeerRequest,
};
use futures::{stream::FuturesUnordered, Future, FutureExt, Stream, StreamExt};
use itertools::Itertools;
use pin_project::pin_project;
use reth_eth_wire::GetPooledTransactions;
use reth_interfaces::p2p::error::{RequestError, RequestResult};
use reth_primitives::{PeerId, PooledTransactionsElement, TxHash};
use schnellru::{ByLength, Unlimited};
use std::{
num::NonZeroUsize,
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::{mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tracing::{debug, trace};
use super::{Peer, PooledTransactions, MAX_FULL_TRANSACTIONS_PACKET_SIZE};
/// Maximum concurrent [`GetPooledTxRequest`]s to allow per peer.
pub(super) const MAX_CONCURRENT_TX_REQUESTS_PER_PEER: u8 = 1;
/// How many peers we keep track of for each missing transaction.
pub(super) const MAX_ALTERNATIVE_PEERS_PER_TX: u8 =
MAX_REQUEST_RETRIES_PER_TX_HASH + MARGINAL_FALLBACK_PEERS_PER_TX;
/// Marginal on fallback peers. If all fallback peers are idle, at most
/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`] of them can ever be needed.
const MARGINAL_FALLBACK_PEERS_PER_TX: u8 = 1;
/// Maximum request retries per [`TxHash`]. Note that this is reset should the [`TxHash`]
/// re-appear in an announcement after it has been ejected from the hash buffer.
const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2;
/// Maximum concurrent [`GetPooledTxRequest`]s.
const MAX_CONCURRENT_TX_REQUESTS: u32 = 10000;
/// Cache limit of transactions waiting for idle peer to be fetched.
const MAX_CAPACITY_BUFFERED_HASHES: usize = 100 * GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES;
/// Recommended soft limit for the number of hashes in a GetPooledTransactions message (8kb)
///
/// <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x08>
const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256;
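For orientation, a standalone sketch of how the limits above compose; plain constant arithmetic copied from this diff, nothing reth-specific:

const MARGINAL_FALLBACK_PEERS_PER_TX: u8 = 1;
const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2;
const MAX_ALTERNATIVE_PEERS_PER_TX: u8 =
    MAX_REQUEST_RETRIES_PER_TX_HASH + MARGINAL_FALLBACK_PEERS_PER_TX;
const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256;
const MAX_CAPACITY_BUFFERED_HASHES: usize = 100 * GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES;

fn main() {
    // at most 2 retries plus 1 spare fallback peer are tracked per missing hash
    assert_eq!(MAX_ALTERNATIVE_PEERS_PER_TX, 3);
    // the buffer can absorb 100 full announcements' worth of hashes
    assert_eq!(MAX_CAPACITY_BUFFERED_HASHES, 25_600);
}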
/// The type responsible for fetching missing transactions from peers.
///
/// This will keep track of unique transaction hashes that are currently being fetched and submits
/// new requests on announced hashes.
#[derive(Debug)]
#[pin_project]
pub(super) struct TransactionFetcher {
/// All peers to which a request for pooled transactions is currently active. Maps 1-1 to
/// `inflight_requests`.
pub(super) active_peers: LruMap<PeerId, u8, ByLength>,
/// All currently active requests for pooled transactions.
#[pin]
pub(super) inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Hashes that are awaiting fetch from an idle peer.
pub(super) buffered_hashes: LruCache<TxHash>,
/// Tracks all hashes that are currently being fetched or are buffered, mapping them to
/// request retries and last recently seen fallback peers (max one request try for any peer).
pub(super) unknown_hashes: LruMap<TxHash, (u8, LruCache<PeerId>), Unlimited>,
/// Size metadata for unknown eth68 hashes.
pub(super) eth68_meta: LruMap<TxHash, usize, Unlimited>,
}
// === impl TransactionFetcher ===
impl TransactionFetcher {
/// Removes the specified hashes from inflight tracking.
#[inline]
fn remove_from_unknown_hashes<I>(&mut self, hashes: I)
where
I: IntoIterator<Item = TxHash>,
{
for hash in hashes {
self.unknown_hashes.remove(&hash);
self.eth68_meta.remove(&hash);
}
}
/// Updates peer's activity status upon a resolved [`GetPooledTxRequest`].
fn update_peer_activity(&mut self, resp: &GetPooledTxResponse) {
let GetPooledTxResponse { peer_id, .. } = resp;
debug_assert!(
self.active_peers.get(peer_id).is_some(),
"broken invariant `active-peers` and `inflight-requests`"
);
let remove = || -> bool {
if let Some(inflight_count) = self.active_peers.get(peer_id) {
if *inflight_count <= 1 {
return true
}
*inflight_count -= 1;
}
false
}();
if remove {
self.active_peers.remove(peer_id);
}
}
/// Returns `true` if peer is idle.
pub(super) fn is_idle(&self, peer_id: PeerId) -> bool {
let Some(inflight_count) = self.active_peers.peek(&peer_id) else { return true };
if *inflight_count < MAX_CONCURRENT_TX_REQUESTS_PER_PEER {
return true
}
false
}
/// Returns any idle peer for the given hash. Writes the peer IDs of any ended sessions to the
/// buffer passed as a parameter.
pub(super) fn get_idle_peer_for(
&self,
hash: TxHash,
ended_sessions_buf: &mut Vec<PeerId>,
is_session_active: impl Fn(PeerId) -> bool,
) -> Option<PeerId> {
let (_, peers) = self.unknown_hashes.peek(&hash)?;
for &peer_id in peers.iter() {
if self.is_idle(peer_id) {
if is_session_active(peer_id) {
return Some(peer_id)
} else {
ended_sessions_buf.push(peer_id);
}
}
}
None
}
/// Packages hashes for [`GetPooledTxRequest`] up to limit. Returns left over hashes.
pub(super) fn pack_hashes(&mut self, hashes: &mut Vec<TxHash>, peer_id: PeerId) -> Vec<TxHash> {
let Some(hash) = hashes.first() else { return vec![] };
if self.eth68_meta.get(hash).is_some() {
return self.pack_hashes_eth68(hashes, peer_id)
}
self.pack_hashes_eth66(hashes, peer_id)
}
/// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 66.
/// If necessary, takes hashes from buffer for which peer is listed as fallback peer.
///
/// Returns left over hashes.
pub(super) fn pack_hashes_eth66(
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
) -> Vec<TxHash> {
if hashes.len() < GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
self.fill_request_for_peer(hashes, peer_id, None);
return vec![]
}
hashes.split_off(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES)
}
/// Evaluates whether or not to include a hash in a `GetPooledTransactions` version eth68
/// request, based on the size of the transaction and the accumulated size of the
/// corresponding `PooledTransactions` response.
///
/// Returns `true` if hash is included in request. If there is still space in the respective
/// response but not enough for the transaction of given hash, `false` is returned.
fn include_eth68_hash(&self, acc_size_response: &mut usize, eth68_hash: TxHash) -> bool {
debug_assert!(
self.eth68_meta.peek(&eth68_hash).is_some(),
"broken invariant `eth68-hash` and `eth68-meta`"
);
if let Some(size) = self.eth68_meta.peek(&eth68_hash) {
let next_acc_size = *acc_size_response + size;
if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
// only update accumulated size of tx response if tx will fit in
*acc_size_response = next_acc_size;
return true
}
}
false
}
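To make the accumulation rule concrete, a standalone sketch; the free function `include` is a hypothetical stand-in for `include_eth68_hash` without the metadata lookup, and the limit matches `MAX_FULL_TRANSACTIONS_PACKET_SIZE` in this diff:

const MAX_FULL_TRANSACTIONS_PACKET_SIZE: usize = 100 * 1024;

/// Includes an announced size only if the accumulated response stays under the packet limit.
fn include(acc_size_response: &mut usize, announced_size: usize) -> bool {
    let next_acc_size = *acc_size_response + announced_size;
    if next_acc_size <= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
        *acc_size_response = next_acc_size;
        return true
    }
    false
}

fn main() {
    let mut acc = 0;
    assert!(include(&mut acc, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 4)); // fits
    assert!(!include(&mut acc, 5)); // would overshoot the limit by 1 byte, skipped
    assert!(include(&mut acc, 2)); // a smaller tx still fits
    assert_eq!(acc, MAX_FULL_TRANSACTIONS_PACKET_SIZE - 2);
}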
/// Packages hashes for [`GetPooledTxRequest`] up to limit as defined by protocol version 68.
/// If necessary, takes hashes from buffer for which peer is listed as fallback peer. Returns
/// left over hashes.
///
/// 1. Loops through hashes passed as parameter, calculating the accumulated size of the
/// response that this request would generate if filled with requested hashes.
/// 2.a. All hashes fit in response and there is no more space. Returns empty vector.
/// 2.b. Some hashes didn't fit in and there is no more space. Returns surplus hashes.
/// 2.c. All hashes fit in response and there is still space. Surplus hashes = empty vector.
/// 2.d. Some hashes didn't fit in but there is still space. Surplus hashes != empty vector.
/// 3. Try to fill remaining space with hashes from buffer.
/// 4. Return surplus hashes.
pub(super) fn pack_hashes_eth68(
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
) -> Vec<TxHash> {
let mut acc_size_response = 0;
let mut surplus_hashes = vec![];
hashes.retain(|&hash| match self.include_eth68_hash(&mut acc_size_response, hash) {
true => true,
false => {
trace!(
target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(&hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"no space for hash in `GetPooledTransactions` request to peer"
);
surplus_hashes.push(hash);
false
}
});
// all hashes included in request and there is still space
// todo: compare free space with min tx size
if acc_size_response < MAX_FULL_TRANSACTIONS_PACKET_SIZE {
self.fill_request_for_peer(hashes, peer_id, Some(acc_size_response));
}
surplus_hashes
}
pub(super) fn buffer_hashes_for_retry(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
self.buffer_hashes(hashes, None)
}
/// Buffers hashes. Note: only peers that haven't yet tried to request the hashes should be
/// passed as the `fallback_peer` parameter! Hashes that have already been re-requested
/// [`MAX_REQUEST_RETRIES_PER_TX_HASH`] times are dropped.
pub(super) fn buffer_hashes(
&mut self,
hashes: impl IntoIterator<Item = TxHash>,
fallback_peer: Option<PeerId>,
) {
let mut max_retried_hashes = vec![];
for hash in hashes {
// todo: enforce by adding new type UnknownTxHash
debug_assert!(
self.unknown_hashes.peek(&hash).is_some(),
"only hashes that are confirmed as unknown should be buffered"
);
let Some((retries, peers)) = self.unknown_hashes.get(&hash) else { return };
if let Some(peer_id) = fallback_peer {
// peer has not yet requested hash
peers.insert(peer_id);
} else {
// peer in caller's context has requested hash and is hence not eligible as
// fallback peer.
if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
debug!(target: "net::tx",
hash=format!("{hash:#}"),
retries=retries,
"retry limit for `GetPooledTransactions` requests reached for hash, dropping hash"
);
max_retried_hashes.push(hash);
continue;
}
*retries += 1;
}
if let (_, Some(evicted_hash)) = self.buffered_hashes.insert_and_get_evicted(hash) {
_ = self.unknown_hashes.remove(&evicted_hash);
_ = self.eth68_meta.remove(&evicted_hash);
}
}
self.remove_from_unknown_hashes(max_retried_hashes);
}
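The retry bookkeeping above boils down to the following standalone sketch; `should_retry` is an illustrative helper, and the constant value is the one from this diff:

const MAX_REQUEST_RETRIES_PER_TX_HASH: u8 = 2;

/// Returns `true` if the hash should be buffered again, bumping its retry counter.
fn should_retry(retries: &mut u8) -> bool {
    if *retries >= MAX_REQUEST_RETRIES_PER_TX_HASH {
        // retry limit reached, the hash is dropped
        return false
    }
    *retries += 1;
    true
}

fn main() {
    let mut retries = 0;
    assert!(should_retry(&mut retries)); // first failed request: buffer for retry
    assert!(should_retry(&mut retries)); // second failed request: buffer for retry
    assert!(!should_retry(&mut retries)); // limit reached: drop the hash
}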
/// Removes the provided transaction hashes from the inflight requests set.
///
/// This is called when we receive full transactions that are currently scheduled for fetching.
#[inline]
pub(super) fn on_received_full_transactions_broadcast(
&mut self,
hashes: impl IntoIterator<Item = TxHash>,
) {
self.remove_from_unknown_hashes(hashes)
}
pub(super) fn filter_unseen_hashes(
&mut self,
new_announced_hashes: &mut Vec<TxHash>,
peer_id: PeerId,
is_session_active: impl Fn(PeerId) -> bool,
) {
// filter out inflight hashes, and register the peer as fallback for all inflight hashes
new_announced_hashes.retain(|hash| {
// occupied entry
if let Some((_retries, ref mut backups)) = self.unknown_hashes.peek_mut(hash) {
// hash has been seen but is not inflight
if self.buffered_hashes.remove(hash) {
return true
}
// hash has been seen and is in flight. store peer as fallback peer.
//
// remove any ended sessions, so that in case of a full cache, alive peers aren't
// removed in favour of lru dead peers
let mut ended_sessions = vec![];
for &peer_id in backups.iter() {
if !is_session_active(peer_id) {
ended_sessions.push(peer_id);
}
}
for peer_id in ended_sessions {
backups.remove(&peer_id);
}
backups.insert(peer_id);
return false
}
// vacant entry
trace!(
target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
"new hash seen in announcement by peer"
);
// todo: allow `MAX_ALTERNATIVE_PEERS_PER_TX` to be zero
let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).expect("MAX_ALTERNATIVE_PEERS_PER_TX should be non-zero");
if self.unknown_hashes.get_or_insert(*hash, ||
(0, LruCache::new(limit))
).is_none() {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
"failed to cache new announced hash from peer in schnellru::LruMap, dropping hash"
);
return false
}
true
});
}
/// Requests the missing transactions from the announced hashes of the peer. Returns the
/// requested hashes if concurrency limit is reached or if the request fails to send over the
/// channel to the peer's session task.
///
/// This filters all announced hashes that are already in flight, and requests the missing,
/// while marking the given peer as an alternative peer for the hashes that are already in
/// flight.
pub(super) fn request_transactions_from_peer(
&mut self,
new_announced_hashes: Vec<TxHash>,
peer: &Peer,
metrics_increment_egress_peer_channel_full: impl FnOnce(),
) -> Option<Vec<TxHash>> {
let peer_id: PeerId = peer.request_tx.peer_id;
if self.active_peers.len() as u32 >= MAX_CONCURRENT_TX_REQUESTS {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
limit=MAX_CONCURRENT_TX_REQUESTS,
"limit for concurrent `GetPooledTransactions` requests reached, dropping request for hashes to peer"
);
return Some(new_announced_hashes)
}
let Some(inflight_count) = self.active_peers.get_or_insert(peer_id, || 0) else {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
"failed to cache active peer in schnellru::LruMap, dropping request to peer"
);
return Some(new_announced_hashes)
};
if *inflight_count >= MAX_CONCURRENT_TX_REQUESTS_PER_PEER {
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", new_announced_hashes.iter().format(", ")),
limit=MAX_CONCURRENT_TX_REQUESTS_PER_PEER,
"limit for concurrent `GetPooledTransactions` requests per peer reached"
);
return Some(new_announced_hashes)
}
*inflight_count += 1;
let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(new_announced_hashes.clone()),
response,
};
// try to send the request to the peer
if let Err(err) = peer.request_tx.try_send(req) {
// peer channel is full
match err {
TrySendError::Full(req) | TrySendError::Closed(req) => {
// the request never reached the peer's session, so stop tracking the hashes it contained
let req = req.into_get_pooled_transactions().expect("is get pooled tx");
self.remove_from_unknown_hashes(req.0);
}
}
metrics_increment_egress_peer_channel_full();
return Some(new_announced_hashes)
} else {
// remove requested hashes from buffered hashes
debug_assert!(
|| -> bool {
for hash in &new_announced_hashes {
if self.buffered_hashes.contains(hash) {
return false
}
}
true
}(),
"broken invariant `buffered-hashes` and `unknown-hashes`"
);
// store a new future for the inflight request
self.inflight_requests.push(GetPooledTxRequestFut::new(
peer_id,
new_announced_hashes,
rx,
))
}
None
}
/// Tries to fill the request so that the corresponding tx response is at its size limit. It
/// does so by taking buffered hashes for which the peer is listed as a fallback peer. If this
/// is an eth68 request, the accumulated size of the transactions corresponding to the hashes
/// passed as a parameter must also be given.
pub(super) fn fill_request_for_peer(
&mut self,
hashes: &mut Vec<TxHash>,
peer_id: PeerId,
mut acc_eth68_size: Option<usize>,
) {
debug_assert!(
acc_eth68_size.is_none() || {
let mut acc_size = 0;
for &hash in hashes.iter() {
_ = self.include_eth68_hash(&mut acc_size, hash);
}
Some(acc_size) == acc_eth68_size
},
"broken invariant `acc-eth68-size` and `hashes`"
);
for hash in self.buffered_hashes.iter() {
// if this request is eth68 txns, check the size metadata
if let Some(acc_size_response) = acc_eth68_size.as_mut() {
if *acc_size_response >= MAX_FULL_TRANSACTIONS_PACKET_SIZE {
trace!(
target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for peer but can't fit it into request"
);
break
}
if !self.include_eth68_hash(acc_size_response, *hash) {
trace!(
target: "net::tx",
peer_id=format!("{peer_id:#}"),
hash=format!("{hash:#}"),
size=self.eth68_meta.get(hash).expect("should find size in `eth68-meta`"),
acc_size_response=acc_size_response,
MAX_FULL_TRANSACTIONS_PACKET_SIZE=MAX_FULL_TRANSACTIONS_PACKET_SIZE,
"found buffered hash for peer but can't fit it into request"
);
continue
}
// otherwise fill request based on hashes count
} else if hashes.len() >= GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES {
break
}
debug_assert!(
self.unknown_hashes.peek(hash).is_some(),
"broken invariant `buffered-hashes` and `unknown-hashes`"
);
if let Some((_, fallback_peers)) = self.unknown_hashes.get(hash) {
// upgrade this peer from fallback peer
if fallback_peers.remove(&peer_id) {
hashes.push(*hash)
}
}
}
for hash in hashes {
self.buffered_hashes.remove(hash);
}
}
}
impl Stream for TransactionFetcher {
type Item = FetchEvent;
/// Advances all inflight requests and returns the next event.
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
let res = this.inflight_requests.poll_next_unpin(cx);
if let Poll::Ready(Some(response)) = res {
// update peer activity, requests for buffered hashes can only be made to idle
// fallback peers
self.update_peer_activity(&response);
let GetPooledTxResponse { peer_id, mut requested_hashes, result } = response;
return match result {
Ok(Ok(transactions)) => {
// clear received hashes
requested_hashes.retain(|requested_hash| {
if transactions.hashes().any(|hash| hash == requested_hash) {
// hash is now known, stop tracking
self.unknown_hashes.remove(requested_hash);
self.eth68_meta.remove(requested_hash);
return false
}
true
});
// buffer left over hashes
self.buffer_hashes_for_retry(requested_hashes);
Poll::Ready(Some(FetchEvent::TransactionsFetched {
peer_id,
transactions: transactions.0,
}))
}
Ok(Err(req_err)) => {
self.buffer_hashes_for_retry(requested_hashes);
Poll::Ready(Some(FetchEvent::FetchError { peer_id, error: req_err }))
}
Err(_) => {
self.buffer_hashes_for_retry(requested_hashes);
// request channel closed/dropped
Poll::Ready(Some(FetchEvent::FetchError {
peer_id,
error: RequestError::ChannelClosed,
}))
}
}
}
Poll::Pending
}
}
impl Default for TransactionFetcher {
fn default() -> Self {
Self {
active_peers: LruMap::new(MAX_CONCURRENT_TX_REQUESTS),
inflight_requests: Default::default(),
buffered_hashes: LruCache::new(
NonZeroUsize::new(MAX_CAPACITY_BUFFERED_HASHES)
.expect("buffered cache limit should be non-zero"),
),
unknown_hashes: LruMap::new_unlimited(),
eth68_meta: LruMap::new_unlimited(),
}
}
}
/// Represents possible events from fetching transactions.
#[derive(Debug)]
pub(super) enum FetchEvent {
/// Triggered when transactions are successfully fetched.
TransactionsFetched {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Vec<PooledTransactionsElement>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
/// The ID of the peer from which an attempt to fetch transactions resulted in an error.
peer_id: PeerId,
/// The specific error that occurred while fetching.
error: RequestError,
},
}
/// An inflight request for `PooledTransactions` from a peer
pub(super) struct GetPooledTxRequest {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}
pub(super) struct GetPooledTxResponse {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
result: Result<RequestResult<PooledTransactions>, RecvError>,
}
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
pub(super) struct GetPooledTxRequestFut {
#[pin]
inner: Option<GetPooledTxRequest>,
}
impl GetPooledTxRequestFut {
#[inline]
fn new(
peer_id: PeerId,
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
}
}
impl Future for GetPooledTxRequestFut {
type Output = GetPooledTxResponse;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
match req.response.poll_unpin(cx) {
Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: req.requested_hashes,
result,
}),
Poll::Pending => {
self.project().inner.set(Some(req));
Poll::Pending
}
}
}
}
#[cfg(test)]
mod test {
use reth_primitives::B256;
use crate::transactions::tests::default_cache;
use super::*;
#[test]
fn pack_eth68_request_surplus_hashes() {
reth_tracing::init_test_tracing();
let tx_fetcher = &mut TransactionFetcher::default();
let peer_id = PeerId::new([1; 64]);
let eth68_hashes = [
B256::from_slice(&[1; 32]),
B256::from_slice(&[2; 32]),
B256::from_slice(&[3; 32]),
B256::from_slice(&[4; 32]),
B256::from_slice(&[5; 32]),
B256::from_slice(&[6; 32]),
];
let eth68_hashes_sizes = [
MAX_FULL_TRANSACTIONS_PACKET_SIZE - 4,
MAX_FULL_TRANSACTIONS_PACKET_SIZE, // this one will not fit
2, // this one will fit
3, // but now this one won't
2, /* this one will, no more txns will fit
* after this */
1,
];
// load unseen hashes
for i in 0..6 {
tx_fetcher.unknown_hashes.insert(eth68_hashes[i], (0, default_cache()));
tx_fetcher.eth68_meta.insert(eth68_hashes[i], eth68_hashes_sizes[i]);
}
let mut eth68_hashes_to_request = eth68_hashes.clone().to_vec();
let surplus_eth68_hashes =
tx_fetcher.pack_hashes_eth68(&mut eth68_hashes_to_request, peer_id);
assert_eq!(surplus_eth68_hashes, vec!(eth68_hashes[1], eth68_hashes[3], eth68_hashes[5]));
assert_eq!(
eth68_hashes_to_request,
vec!(eth68_hashes[0], eth68_hashes[2], eth68_hashes[4])
);
}
}


@ -1,4 +1,31 @@
//! Transactions management for the p2p network.
//!
//! `TransactionFetcher` is responsible for the rate limiting and retry logic used when fetching
//! transactions. Upon receiving an announcement, the `TransactionFetcher` filters out hashes
//! 1) for which the transaction is already known and 2) which are unknown but were already seen
//! in a previous announcement. The hashes that remain from an announcement are then packed into
//! a request with respect to the [`EthVersion`] of the announcement. Any hashes that don't fit
//! into the request are buffered in the `TransactionFetcher`. If, on the other hand, space
//! remains, hashes that the peer has previously announced are taken out of the buffered hashes
//! to fill the request up. The [`GetPooledTransactions`] request is then sent to the peer's
//! session; this marks the peer as active with respect to
//! `fetcher::MAX_CONCURRENT_TX_REQUESTS_PER_PEER`.
//!
//! When hashes are buffered for a peer in the
//! `TransactionsManager::on_new_pooled_transaction_hashes` pipeline, that peer is stored as a
//! fallback peer for those hashes. When the [`TransactionsManager`] is polled, it checks if any
//! fallback peer is idle. If so, it packs a request for that peer, filling it from the buffered
//! hashes. It does so until there are no more idle peers or until the hashes buffer is empty.
//!
//! If a [`GetPooledTransactions`] request resolves with an error, the hashes in the request are
//! buffered again, subject to `MAX_REQUEST_RETRIES_PER_TX_HASH`. The same holds if the request
//! resolves with partial success, that is, some of the requested hashes are not in the response;
//! those hashes are then buffered.
//!
//! Most healthy peers will send the same hashes in their announcements, as RLPx is a gossip
//! protocol. This means it's unlikely that a valid hash will be buffered for very long before
//! it's re-tried. Nonetheless, the capacity of the buffered hashes cache must be large enough
//! to buffer many hashes during network failure, to allow for recovery.
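As a rough, self-contained illustration of the filter-and-pack flow described above; `split_announcement` is a hypothetical helper over plain `std` types, while the real logic lives in `fetcher.rs` in this diff and additionally consults per-hash sizes for eth68:

use std::collections::HashSet;

// corresponds to GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES in fetcher.rs
const SOFT_LIMIT_NUM_HASHES: usize = 256;

/// Splits announced hashes into (hashes to request now, hashes to buffer for later),
/// after dropping hashes that are already known or already being fetched.
fn split_announcement(
    mut announced: Vec<[u8; 32]>,
    already_tracked: &HashSet<[u8; 32]>,
) -> (Vec<[u8; 32]>, Vec<[u8; 32]>) {
    // 1. filter out hashes that are already tracked (known txns or inflight requests)
    announced.retain(|hash| !already_tracked.contains(hash));
    // 2. pack hashes up to the soft limit into the request, buffer the surplus
    let surplus = if announced.len() > SOFT_LIMIT_NUM_HASHES {
        announced.split_off(SOFT_LIMIT_NUM_HASHES)
    } else {
        vec![]
    };
    (announced, surplus)
}

fn main() {
    let tracked: HashSet<[u8; 32]> = [[1u8; 32]].into_iter().collect();
    let announced = vec![[1u8; 32], [2u8; 32]];
    let (request, surplus) = split_announcement(announced, &tracked);
    assert_eq!(request, vec![[2u8; 32]]); // [1u8; 32] was already tracked
    assert!(surplus.is_empty());
}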
use crate::{
cache::LruCache,
@ -7,7 +34,8 @@ use crate::{
metrics::{TransactionsManagerMetrics, NETWORK_POOL_TRANSACTIONS_SCOPE},
NetworkEvents, NetworkHandle,
};
use futures::{stream::FuturesUnordered, Future, FutureExt, StreamExt};
use futures::{stream::FuturesUnordered, Future, StreamExt};
use itertools::Itertools;
use reth_eth_wire::{
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, PooledTransactions, Transactions,
@ -33,31 +61,27 @@ use std::{
sync::Arc,
task::{Context, Poll},
};
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::{debug, trace};
mod fetcher;
use fetcher::{FetchEvent, TransactionFetcher};
/// Cache limit of transactions to keep track of for a single peer.
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
/// Soft limit for NewPooledTransactions
const NEW_POOLED_TRANSACTION_HASHES_SOFT_LIMIT: usize = 4096;
/// The target size for the message of full transactions in bytes.
const MAX_FULL_TRANSACTIONS_PACKET_SIZE: usize = 100 * 1024;
/// Recommended soft limit for the number of hashes in a GetPooledTransactions message (8kb)
///
/// <https://github.com/ethereum/devp2p/blob/master/caps/eth.md#newpooledtransactionhashes-0x08>
const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256;
/// Softlimit for the response size of a GetPooledTransactions message (2MB)
const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
GetPooledTransactionLimit::SizeSoftLimit(2 * 1024 * 1024);
/// How many peers we keep track of for each missing transaction.
const MAX_ALTERNATIVE_PEERS_PER_TX: usize = 3;
/// The future for inserting a function into the pool
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
@ -524,47 +548,202 @@ where
return
}
// get handle to peer's session, if the session is still active
let Some(peer) = self.peers.get_mut(&peer_id) else {
debug!(
peer_id=format!("{peer_id:#}"),
msg=?msg,
"discarding announcement from inactive peer"
);
return
};
// message version decides how hashes are packed
// if this is a eth68 message, store eth68 tx metadata
if let Some(eth68_msg) = msg.as_eth68() {
for (&hash, (_type, size)) in eth68_msg.metadata_iter() {
self.transaction_fetcher.eth68_meta.insert(hash, size);
}
}
// extract hashes payload
let mut hashes = msg.into_hashes();
// keep track of the transactions the peer knows
let mut num_already_seen = 0;
for tx in hashes.iter().copied() {
if !peer.transactions.insert(tx) {
num_already_seen += 1;
}
}
// filter out known hashes, those txns have already been successfully fetched
self.pool.retain_unknown(&mut hashes);
if hashes.is_empty() {
// nothing to request
return
}
// filter out already seen unknown hashes, for those hashes add the peer as fallback
let peers = &self.peers;
self.transaction_fetcher
.filter_unseen_hashes(&mut hashes, peer_id, |peer_id| peers.contains_key(&peer_id));
if hashes.is_empty() {
// nothing to request
return
}
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
"received previously unseen hashes in announcement from peer"
);
// only send request for hashes to idle peer, otherwise buffer hashes storing peer as
// fallback
if !self.transaction_fetcher.is_idle(peer_id) {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
"buffering hashes announced by busy peer"
);
self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
return
}
// demand recommended soft limit on response, however the peer may enforce an arbitrary
// limit on the response (2MB)
let surplus_hashes = self.transaction_fetcher.pack_hashes(&mut hashes, peer_id);
if !surplus_hashes.is_empty() {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=format!("{surplus_hashes:#?}"),
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
}
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
"sending hashes in `GetPooledTransactions` request to peer's session"
);
// request the missing transactions
//
// get handle to peer's session again, at this point we know it exists
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
let metrics = &self.metrics;
if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || {
metrics.egress_peer_channel_full.increment(1)
})
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")),
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
return
}
if num_already_seen > 0 {
self.metrics.messages_with_already_seen_hashes.increment(1);
trace!(target: "net::tx", num_hashes=%num_already_seen, ?peer_id, client=?peer.client_version, "Peer sent already seen hashes");
self.report_already_seen(peer_id);
}
}
/// Tries to request hashes in buffer.
///
/// If any idle fallback peer exists for any hash in the buffer, that hash is taken out of the
/// buffer and put into a request to that peer. Before sending, the request is filled with
/// additional hashes out of the buffer, for which the peer is listed as fallback, as long as
/// space remains in the respective transactions response.
fn request_buffered_hashes(&mut self) {
loop {
let mut hashes = vec![];
let Some(peer_id) = self.pop_any_idle_peer(&mut hashes) else { return };
debug_assert!(
self.peers.contains_key(&peer_id),
"broken invariant `peers` and `transaction-fetcher`"
);
// fill the request with other buffered hashes that have been announced by the peer
let Some(peer) = self.peers.get(&peer_id) else { return };
let Some(hash) = hashes.first() else { return };
let acc_eth68_size = self.transaction_fetcher.eth68_meta.get(hash).copied();
self.transaction_fetcher.fill_request_for_peer(&mut hashes, peer_id, acc_eth68_size);
trace!(
target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", hashes.iter().format(", ")),
"requesting buffered hashes from idle peer"
);
// request the buffered missing transactions
let metrics = &self.metrics;
if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes, peer, || {
metrics.egress_peer_channel_full.increment(1)
})
{
debug!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=format!("[{:#}]", failed_to_request_hashes.iter().format(", ")),
"failed sending request to peer's session, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
return
}
}
}
/// Returns any idle peer for any buffered unknown hash, and writes that hash to the request's
/// hashes buffer that is passed as parameter.
///
/// Loops through the fallback peers of each buffered hash until an idle fallback peer is
/// found. As a side effect, dead fallback peers are filtered out for visited hashes.
fn pop_any_idle_peer(&mut self, hashes: &mut Vec<TxHash>) -> Option<PeerId> {
let mut ended_sessions = vec![];
let mut buffered_hashes_iter = self.transaction_fetcher.buffered_hashes.iter();
let peers = &self.peers;
let idle_peer = loop {
let Some(&hash) = buffered_hashes_iter.next() else { break None };
let idle_peer =
self.transaction_fetcher.get_idle_peer_for(hash, &mut ended_sessions, |peer_id| {
peers.contains_key(&peer_id)
});
for peer_id in ended_sessions.drain(..) {
let (_, peers) = self.transaction_fetcher.unknown_hashes.peek_mut(&hash)?;
_ = peers.remove(&peer_id);
}
if idle_peer.is_some() {
hashes.push(hash);
break idle_peer
}
};
let peer_id = &idle_peer?;
let hash = hashes.first()?;
let (_, peers) = self.transaction_fetcher.unknown_hashes.get(hash)?;
// pop peer from fallback peers
_ = peers.remove(peer_id);
// pop hash that is loaded in request buffer from buffered hashes
drop(buffered_hashes_iter);
_ = self.transaction_fetcher.buffered_hashes.remove(hash);
idle_peer
}
/// Handles dedicated transaction events related to the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
@ -583,7 +762,7 @@ where
// mark the transactions as received
self.transaction_fetcher.on_received_full_transactions_broadcast(
non_blob_txs.iter().map(|tx| tx.hash()),
non_blob_txs.iter().map(|tx| *tx.hash()),
);
self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
@ -829,7 +1008,7 @@ where
this.update_request_metrics();
// drain fetching transaction events
while let Poll::Ready(fetch_event) = this.transaction_fetcher.poll(cx) {
while let Poll::Ready(Some(fetch_event)) = this.transaction_fetcher.poll_next_unpin(cx) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
this.import_transactions(peer_id, transactions, TransactionSource::Response);
@ -841,6 +1020,9 @@ where
}
}
// try drain buffered transactions
this.request_buffered_hashes();
this.update_request_metrics();
this.update_import_metrics();
@ -992,7 +1174,7 @@ impl PooledTransactionsHashesBuilder {
enum TransactionSource {
/// Transactions were broadcast to us via [`Transactions`] message.
Broadcast,
/// Transactions were sent as the response of [`GetPooledTxRequest`] issued by us.
/// Transactions were sent as the response of [`fetcher::GetPooledTxRequest`] issued by us.
Response,
}
@ -1005,58 +1187,6 @@ impl TransactionSource {
}
}
/// An inflight request for `PooledTransactions` from a peer
struct GetPooledTxRequest {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}
struct GetPooledTxResponse {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
result: Result<RequestResult<PooledTransactions>, RecvError>,
}
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
struct GetPooledTxRequestFut {
#[pin]
inner: Option<GetPooledTxRequest>,
}
impl GetPooledTxRequestFut {
#[inline]
fn new(
peer_id: PeerId,
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
}
}
impl Future for GetPooledTxRequestFut {
type Output = GetPooledTxResponse;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
match req.response.poll_unpin(cx) {
Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: req.requested_hashes,
result,
}),
Poll::Pending => {
self.project().inner.set(Some(req));
Poll::Pending
}
}
}
}
/// Tracks a single peer
#[derive(Debug)]
struct Peer {
@ -1070,166 +1200,6 @@ struct Peer {
client_version: Arc<str>,
}
/// The type responsible for fetching missing transactions from peers.
///
/// This will keep track of unique transaction hashes that are currently being fetched and submits
/// new requests on announced hashes.
#[derive(Debug, Default)]
struct TransactionFetcher {
/// All currently active requests for pooled transactions.
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
/// Set that tracks all hashes that are currently being fetched.
inflight_hash_to_fallback_peers: HashMap<TxHash, Vec<PeerId>>,
}
// === impl TransactionFetcher ===
impl TransactionFetcher {
/// Removes the specified hashes from inflight tracking.
#[inline]
fn remove_inflight_hashes<'a, I>(&mut self, hashes: I)
where
I: IntoIterator<Item = &'a TxHash>,
{
for &hash in hashes {
self.inflight_hash_to_fallback_peers.remove(&hash);
}
}
/// Advances all inflight requests and returns the next event.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchEvent> {
if let Poll::Ready(Some(GetPooledTxResponse { peer_id, requested_hashes, result })) =
self.inflight_requests.poll_next_unpin(cx)
{
return match result {
Ok(Ok(transactions)) => {
// clear received hashes
self.remove_inflight_hashes(transactions.hashes());
// TODO: re-request missing hashes, for now clear all of them
self.remove_inflight_hashes(requested_hashes.iter());
Poll::Ready(FetchEvent::TransactionsFetched {
peer_id,
transactions: transactions.0,
})
}
Ok(Err(req_err)) => {
// TODO: re-request missing hashes
self.remove_inflight_hashes(&requested_hashes);
Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err })
}
Err(_) => {
// TODO: re-request missing hashes
self.remove_inflight_hashes(&requested_hashes);
// request channel closed/dropped
Poll::Ready(FetchEvent::FetchError {
peer_id,
error: RequestError::ChannelClosed,
})
}
}
}
Poll::Pending
}
/// Removes the provided transaction hashes from the inflight requests set.
///
/// This is called when we receive full transactions that are currently scheduled for fetching.
#[inline]
fn on_received_full_transactions_broadcast<'a>(
&mut self,
hashes: impl IntoIterator<Item = &'a TxHash>,
) {
self.remove_inflight_hashes(hashes)
}
/// Requests the missing transactions from the announced hashes of the peer
///
/// This filters all announced hashes that are already in flight, and requests the missing,
/// while marking the given peer as an alternative peer for the hashes that are already in
/// flight.
fn request_transactions_from_peer(
&mut self,
mut announced_hashes: Vec<TxHash>,
peer: &Peer,
) -> bool {
let peer_id: PeerId = peer.request_tx.peer_id;
// 1. filter out inflight hashes, and register the peer as fallback for all inflight hashes
announced_hashes.retain(|&hash| {
match self.inflight_hash_to_fallback_peers.entry(hash) {
Entry::Vacant(entry) => {
// the hash is not in inflight hashes, insert it and retain in the vector
entry.insert(vec![peer_id]);
true
}
Entry::Occupied(mut entry) => {
// the hash is already in inflight, add this peer as a backup if not more than 3
// backups already
let backups = entry.get_mut();
if backups.len() < MAX_ALTERNATIVE_PEERS_PER_TX {
backups.push(peer_id);
}
false
}
}
});
// 2. request all missing from peer
if announced_hashes.is_empty() {
// nothing to request
return false
}
let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(announced_hashes.clone()),
response,
};
// try to send the request to the peer
if let Err(err) = peer.request_tx.try_send(req) {
// peer channel is full
match err {
TrySendError::Full(req) | TrySendError::Closed(req) => {
// need to do some cleanup so
let req = req.into_get_pooled_transactions().expect("is get pooled tx");
// we know that the peer is the only entry in the map, so we can remove all
for hash in req.0.into_iter() {
self.inflight_hash_to_fallback_peers.remove(&hash);
}
}
}
return false
} else {
//create a new request for it, from that peer
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, announced_hashes, rx))
}
true
}
}
/// Represents possible events from fetching transactions.
#[derive(Debug)]
enum FetchEvent {
/// Triggered when transactions are successfully fetched.
TransactionsFetched {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Vec<PooledTransactionsElement>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {
/// The ID of the peer from which an attempt to fetch transactions resulted in an error.
peer_id: PeerId,
/// The specific error that occurred while fetching.
error: RequestError,
},
}
/// Commands to send to the [`TransactionsManager`]
#[derive(Debug)]
enum TransactionsCommand {
@ -1288,6 +1258,7 @@ mod tests {
use super::*;
use crate::{test_utils::Testnet, NetworkConfigBuilder, NetworkManager};
use alloy_rlp::Decodable;
use futures::FutureExt;
use reth_interfaces::sync::{NetworkSyncUpdater, SyncState};
use reth_network_api::NetworkInfo;
use reth_primitives::hex;
@ -1295,7 +1266,51 @@ mod tests {
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
use secp256k1::SecretKey;
use std::future::poll_fn;
use std::{future::poll_fn, hash};
use fetcher::MAX_ALTERNATIVE_PEERS_PER_TX;
async fn new_tx_manager() -> TransactionsManager<impl TransactionPool> {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();
let config = NetworkConfigBuilder::new(secret_key).disable_discovery().build(client);
let pool = testing_pool();
let (_network_handle, _network, transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone())
.split_with_handle();
transactions
}
pub(super) fn default_cache<T: hash::Hash + Eq>() -> LruCache<T> {
let limit = NonZeroUsize::new(MAX_ALTERNATIVE_PEERS_PER_TX.into()).unwrap();
LruCache::new(limit)
}
// Returns (peer, channel-to-send-get-pooled-tx-response-on).
fn new_mock_session(
peer_id: PeerId,
version: EthVersion,
) -> (Peer, mpsc::Receiver<PeerRequest>) {
let (to_mock_session_tx, to_mock_session_rx) = mpsc::channel(1);
(
Peer {
transactions: default_cache(),
request_tx: PeerRequestSender::new(peer_id, to_mock_session_tx),
version,
client_version: Arc::from(""),
},
to_mock_session_rx,
)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_ignored_tx_broadcasts_while_initially_syncing() {
reth_tracing::init_test_tracing();
@ -1643,4 +1658,163 @@ mod tests {
}
}
}
#[tokio::test]
async fn max_retries_tx_request() {
reth_tracing::init_test_tracing();
let mut tx_manager = new_tx_manager().await;
let tx_fetcher = &mut tx_manager.transaction_fetcher;
let peer_id_1 = PeerId::new([1; 64]);
let peer_id_2 = PeerId::new([2; 64]);
let eth_version = EthVersion::Eth66;
let seen_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
let (peer_1, mut to_mock_session_rx) = new_mock_session(peer_id_1, eth_version);
tx_manager.peers.insert(peer_id_1, peer_1);
// hashes are seen and currently not inflight, with one fallback peer, and are buffered
// for first retry.
let retries = 1;
let mut backups = default_cache();
backups.insert(peer_id_1);
tx_fetcher.unknown_hashes.insert(seen_hashes[0], (retries, backups.clone()));
tx_fetcher.unknown_hashes.insert(seen_hashes[1], (retries, backups));
tx_fetcher.buffered_hashes.insert(seen_hashes[0]);
tx_fetcher.buffered_hashes.insert(seen_hashes[1]);
// peer_1 is idle
assert!(tx_fetcher.is_idle(peer_id_1));
// sends request for buffered hashes to peer_1
tx_manager.request_buffered_hashes();
let tx_fetcher = &mut tx_manager.transaction_fetcher;
assert!(tx_fetcher.buffered_hashes.is_empty());
// as long as request is in inflight peer_1 is not idle
assert!(!tx_fetcher.is_idle(peer_id_1));
// mock session of peer_1 receives request
let req = to_mock_session_rx
.recv()
.await
.expect("peer_1 session should receive request with buffered hashes");
let PeerRequest::GetPooledTransactions { request, response } = req else { unreachable!() };
let GetPooledTransactions(hashes) = request;
assert_eq!(hashes, seen_hashes);
// fail request to peer_1
response
.send(Err(RequestError::BadResponse))
.expect("should send peer_1 response to tx manager");
let Some(FetchEvent::FetchError { peer_id, .. }) = tx_fetcher.next().await else {
unreachable!()
};
// request has resolved, peer_1 is idle again
assert!(tx_fetcher.is_idle(peer_id));
// failing peer_1's request buffers requested hashes for retry
assert_eq!(tx_fetcher.buffered_hashes.len(), 2);
let (peer_2, mut to_mock_session_rx) = new_mock_session(peer_id_2, eth_version);
tx_manager.peers.insert(peer_id_2, peer_2);
// peer_2 announces same hashes as peer_1
let msg =
NewPooledTransactionHashes::Eth66(NewPooledTransactionHashes66(seen_hashes.to_vec()));
tx_manager.on_new_pooled_transaction_hashes(peer_id_2, msg);
let tx_fetcher = &mut tx_manager.transaction_fetcher;
// since hashes are already seen, no changes to length of unknown hashes
assert_eq!(tx_fetcher.unknown_hashes.len(), 2);
// but hashes are taken out of buffer and packed into request to peer_2
assert!(tx_fetcher.buffered_hashes.is_empty());
// mock session of peer_2 receives request
let req = to_mock_session_rx
.recv()
.await
.expect("peer_2 session should receive request with buffered hashes");
let PeerRequest::GetPooledTransactions { response, .. } = req else { unreachable!() };
// report failed request to tx manager
response
.send(Err(RequestError::BadResponse))
.expect("should send peer_2 response to tx manager");
let Some(FetchEvent::FetchError { .. }) = tx_fetcher.next().await else { unreachable!() };
// `MAX_REQUEST_RETRIES_PER_TX_HASH` (2) has been reached for these hashes, so this time
// they won't be buffered for retry
assert!(tx_fetcher.buffered_hashes.is_empty());
}
#[tokio::test]
async fn fill_eth68_request_for_peer() {
reth_tracing::init_test_tracing();
let mut tx_manager = new_tx_manager().await;
let tx_fetcher = &mut tx_manager.transaction_fetcher;
let peer_id = PeerId::new([1; 64]);
let eth_version = EthVersion::Eth68;
let unseen_eth68_hashes = [B256::from_slice(&[1; 32]), B256::from_slice(&[2; 32])];
let unseen_eth68_hashes_sizes =
[MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2, MAX_FULL_TRANSACTIONS_PACKET_SIZE / 2 - 4];
let seen_eth68_hashes =
[B256::from_slice(&[3; 32]), B256::from_slice(&[4; 32]), B256::from_slice(&[5; 32])];
let seen_eth68_hashes_sizes = [
5,
3, // the second hash should be filled into the request because there is space for it
4, // then there is no space for the last anymore
];
// insert peer in tx manager
let (peer, _to_mock_session_rx) = new_mock_session(peer_id, eth_version);
tx_manager.peers.insert(peer_id, peer);
// hashes are seen and currently not inflight, with one fallback peer, and are buffered
// for first try to fetch.
let mut backups = default_cache();
backups.insert(peer_id);
for i in 0..3 {
tx_fetcher.unknown_hashes.insert(seen_eth68_hashes[i], (0, backups.clone()));
tx_fetcher.eth68_meta.insert(seen_eth68_hashes[i], seen_eth68_hashes_sizes[i]);
tx_fetcher.buffered_hashes.insert(seen_eth68_hashes[i]);
}
let (peer, mut to_mock_session_rx) = new_mock_session(peer_id, eth_version);
tx_manager.peers.insert(peer_id, peer);
// peer announces previously unseen hashes
let msg = NewPooledTransactionHashes::Eth68(NewPooledTransactionHashes68 {
hashes: unseen_eth68_hashes.to_vec(),
sizes: unseen_eth68_hashes_sizes.to_vec(),
types: [0; 2].to_vec(),
});
tx_manager.on_new_pooled_transaction_hashes(peer_id, msg);
let tx_fetcher = &mut tx_manager.transaction_fetcher;
// since hashes are unseen, length of unknown hashes increases
assert_eq!(tx_fetcher.unknown_hashes.len(), 5);
// seen_eth68_hashes[1] should be taken out of buffer and packed into request
assert_eq!(tx_fetcher.buffered_hashes.len(), 2);
assert!(tx_fetcher.buffered_hashes.contains(&seen_eth68_hashes[0]));
// mock session of peer receives request
let req = to_mock_session_rx
.recv()
.await
.expect("peer session should receive request with buffered hashes");
let PeerRequest::GetPooledTransactions { request, .. } = req else { unreachable!() };
let GetPooledTransactions(hashes) = request;
let mut expected_request = unseen_eth68_hashes.to_vec();
expected_request.push(seen_eth68_hashes[1]);
assert_eq!(hashes, expected_request);
}
}