diff --git a/crates/net/eth-wire/src/types/transactions.rs b/crates/net/eth-wire/src/types/transactions.rs index 46fc30865..18054a5ef 100644 --- a/crates/net/eth-wire/src/types/transactions.rs +++ b/crates/net/eth-wire/src/types/transactions.rs @@ -39,6 +39,13 @@ pub struct PooledTransactions( pub Vec, ); +impl PooledTransactions { + /// Returns an iterator over the transaction hashes in this response. + pub fn hashes(&self) -> impl Iterator + '_ { + self.0.iter().map(|tx| tx.hash()) + } +} + impl From> for PooledTransactions { fn from(txs: Vec) -> Self { PooledTransactions(txs.into_iter().map(Into::into).collect()) diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 79ef8737e..3e9ba3c4d 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -146,6 +146,14 @@ impl PeerRequest { } } } + + /// Consumes the type and returns the inner [`GetPooledTransactions`] variant. + pub fn into_get_pooled_transactions(self) -> Option { + match self { + PeerRequest::GetPooledTransactions { request, .. } => Some(request), + _ => None, + } + } } /// Corresponding variant for [`PeerRequest`]. diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index 05e253d17..a67405b6d 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -33,7 +33,7 @@ use std::{ sync::Arc, task::{Context, Poll}, }; -use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError}; +use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::error::RecvError}; use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use tracing::trace; @@ -55,6 +55,9 @@ const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256; const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit = GetPooledTransactionLimit::SizeSoftLimit(2 * 1024 * 1024); +/// How many peers we keep track of for each missing transaction. +const MAX_ALTERNATIVE_PEERS_PER_TX: usize = 3; + /// The future for inserting a function into the pool pub type PoolImportFuture = Pin> + Send + 'static>>; @@ -149,8 +152,8 @@ pub struct TransactionsManager { /// /// From which we get all new incoming transaction related messages. network_events: UnboundedReceiverStream, - /// All currently active requests for pooled transactions. - inflight_requests: FuturesUnordered, + /// Transaction fetcher to handle inflight and missing transaction requests. + transaction_fetcher: TransactionFetcher, /// All currently pending transactions grouped by peers. /// /// This way we can track incoming transactions and prevent multiple pool imports for the same @@ -192,7 +195,7 @@ impl TransactionsManager { pool, network, network_events, - inflight_requests: Default::default(), + transaction_fetcher: Default::default(), transactions_by_peers: Default::default(), pool_imports: Default::default(), peers: Default::default(), @@ -231,7 +234,9 @@ where #[inline] fn update_request_metrics(&self) { - self.metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64); + self.metrics + .inflight_transaction_requests + .set(self.transaction_fetcher.inflight_requests.len() as f64); } /// Request handler for an incoming request for transactions @@ -503,16 +508,9 @@ where hashes.truncate(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES); // request the missing transactions - let (response, rx) = oneshot::channel(); - let req = PeerRequest::GetPooledTransactions { - request: GetPooledTransactions(hashes), - response, - }; - - if peer.request_tx.try_send(req).is_ok() { - self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx)) - } else { - // peer channel is saturated, drop the request + let request_sent = + self.transaction_fetcher.request_transactions_from_peer(hashes, peer); + if !request_sent { self.metrics.egress_peer_channel_full.increment(1); return } @@ -542,7 +540,12 @@ where .into_iter() .map(PooledTransactionsElement::try_from_broadcast) .filter_map(Result::ok) - .collect(); + .collect::>(); + + // mark the transactions as received + self.transaction_fetcher.on_received_full_transactions_broadcast( + non_blob_txs.iter().map(|tx| tx.hash()), + ); self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast); @@ -775,22 +778,19 @@ where this.update_request_metrics(); - // Advance all requests. - while let Poll::Ready(Some(GetPooledTxResponse { peer_id, result })) = - this.inflight_requests.poll_next_unpin(cx) - { - match result { - Ok(Ok(txs)) => { - this.import_transactions(peer_id, txs.0, TransactionSource::Response) - } - Ok(Err(req_err)) => { - this.on_request_error(peer_id, req_err); - } - Err(_) => { - // request channel closed/dropped - this.on_request_error(peer_id, RequestError::ChannelClosed) + let fetch_event = this.transaction_fetcher.poll(cx); + match fetch_event { + Poll::Ready(FetchEvent::TransactionsFetched { peer_id, transactions }) => { + if let Some(txns) = transactions { + this.import_transactions(peer_id, txns, TransactionSource::Response); } } + Poll::Ready(FetchEvent::FetchError { peer_id, error }) => { + this.on_request_error(peer_id, error); + } + Poll::Pending => { + // No event ready at the moment, nothing to do here. + } } this.update_request_metrics(); @@ -965,6 +965,8 @@ struct GetPooledTxRequest { struct GetPooledTxResponse { peer_id: PeerId, + /// Transaction hashes that were requested, for cleanup purposes + requested_hashes: Vec, result: Result, RecvError>, } @@ -991,7 +993,18 @@ impl Future for GetPooledTxRequestFut { let mut req = self.as_mut().project().inner.take().expect("polled after completion"); match req.response.poll_unpin(cx) { Poll::Ready(result) => { - Poll::Ready(GetPooledTxResponse { peer_id: req.peer_id, result }) + let request_hashes: Vec = match &result { + Ok(Ok(pooled_txs)) => { + pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect() + } + _ => Vec::new(), + }; + + Poll::Ready(GetPooledTxResponse { + peer_id: req.peer_id, + requested_hashes: request_hashes, + result, + }) } Poll::Pending => { self.project().inner.set(Some(req)); @@ -1015,6 +1028,166 @@ struct Peer { client_version: Arc, } +/// The type responsible for fetching missing transactions from peers. +/// +/// This will keep track of unique transaction hashes that are currently being fetched and submits +/// new requests on announced hashes. +#[derive(Debug, Default)] +struct TransactionFetcher { + /// All currently active requests for pooled transactions. + inflight_requests: FuturesUnordered, + /// Set that tracks all hashes that are currently being fetched. + inflight_hash_to_fallback_peers: HashMap>, +} + +// === impl TransactionFetcher === + +impl TransactionFetcher { + /// Removes the specified hashes from inflight tracking. + #[inline] + fn remove_inflight_hashes<'a, I>(&mut self, hashes: I) + where + I: IntoIterator, + { + for &hash in hashes { + self.inflight_hash_to_fallback_peers.remove(&hash); + } + } + + /// Advances all inflight requests and returns the next event. + fn poll(&mut self, cx: &mut Context<'_>) -> Poll { + if let Poll::Ready(Some(GetPooledTxResponse { peer_id, requested_hashes, result })) = + self.inflight_requests.poll_next_unpin(cx) + { + return match result { + Ok(Ok(txs)) => { + // clear received hashes + self.remove_inflight_hashes(txs.hashes()); + + // TODO: re-request missing hashes, for now clear all of them + self.remove_inflight_hashes(requested_hashes.iter()); + + Poll::Ready(FetchEvent::TransactionsFetched { + peer_id, + transactions: Some(txs.0), + }) + } + Ok(Err(req_err)) => { + // TODO: re-request missing hashes + self.remove_inflight_hashes(&requested_hashes); + Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err }) + } + Err(_) => { + // TODO: re-request missing hashes + self.remove_inflight_hashes(&requested_hashes); + // request channel closed/dropped + Poll::Ready(FetchEvent::FetchError { + peer_id, + error: RequestError::ChannelClosed, + }) + } + } + } + Poll::Pending + } + + /// Removes the provided transaction hashes from the inflight requests set. + /// + /// This is called when we receive full transactions that are currently scheduled for fetching. + #[inline] + fn on_received_full_transactions_broadcast<'a>( + &mut self, + hashes: impl IntoIterator, + ) { + self.remove_inflight_hashes(hashes) + } + + /// Requests the missing transactions from the announced hashes of the peer + /// + /// This filters all announced hashes that are already in flight, and requests the missing, + /// while marking the given peer as an alternative peer for the hashes that are already in + /// flight. + fn request_transactions_from_peer( + &mut self, + mut announced_hashes: Vec, + peer: &Peer, + ) -> bool { + let peer_id: PeerId = peer.request_tx.peer_id; + // 1. filter out inflight hashes, and register the peer as fallback for all inflight hashes + announced_hashes.retain(|&hash| { + match self.inflight_hash_to_fallback_peers.entry(hash) { + Entry::Vacant(entry) => { + // the hash is not in inflight hashes, insert it and retain in the vector + entry.insert(vec![peer_id]); + true + } + Entry::Occupied(mut entry) => { + // the hash is already in inflight, add this peer as a backup if not more than 3 + // backups already + let backups = entry.get_mut(); + if backups.len() < MAX_ALTERNATIVE_PEERS_PER_TX { + backups.push(peer_id); + } + false + } + } + }); + + // 2. request all missing from peer + if announced_hashes.is_empty() { + // nothing to request + return false + } + + let (response, rx) = oneshot::channel(); + let req: PeerRequest = PeerRequest::GetPooledTransactions { + request: GetPooledTransactions(announced_hashes), + response, + }; + + // try to send the request to the peer + if let Err(err) = peer.request_tx.try_send(req) { + // peer channel is full + match err { + TrySendError::Full(req) | TrySendError::Closed(req) => { + // need to do some cleanup so + let req = req.into_get_pooled_transactions().expect("is get pooled tx"); + + // we know that the peer is the only entry in the map, so we can remove all + for hash in req.0.into_iter() { + self.inflight_hash_to_fallback_peers.remove(&hash); + } + } + } + return false + } else { + //create a new request for it, from that peer + self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx)) + } + + true + } +} + +/// Represents possible events from fetching transactions. +#[derive(Debug)] +enum FetchEvent { + /// Triggered when transactions are successfully fetched. + TransactionsFetched { + /// The ID of the peer from which transactions were fetched. + peer_id: PeerId, + /// The transactions that were fetched, if available. + transactions: Option>, + }, + /// Triggered when there is an error in fetching transactions. + FetchError { + /// The ID of the peer from which an attempt to fetch transactions resulted in an error. + peer_id: PeerId, + /// The specific error that occurred while fetching. + error: RequestError, + }, +} + /// Commands to send to the [`TransactionsManager`] #[derive(Debug)] enum TransactionsCommand {