mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: implement transaction fetcher, w/o redundant tx hash request (#5058)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
@ -39,6 +39,13 @@ pub struct PooledTransactions(
|
||||
pub Vec<PooledTransactionsElement>,
|
||||
);
|
||||
|
||||
impl PooledTransactions {
|
||||
/// Returns an iterator over the transaction hashes in this response.
|
||||
pub fn hashes(&self) -> impl Iterator<Item = &B256> + '_ {
|
||||
self.0.iter().map(|tx| tx.hash())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Vec<TransactionSigned>> for PooledTransactions {
|
||||
fn from(txs: Vec<TransactionSigned>) -> Self {
|
||||
PooledTransactions(txs.into_iter().map(Into::into).collect())
|
||||
|
||||
@ -146,6 +146,14 @@ impl PeerRequest {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Consumes the type and returns the inner [`GetPooledTransactions`] variant.
|
||||
pub fn into_get_pooled_transactions(self) -> Option<GetPooledTransactions> {
|
||||
match self {
|
||||
PeerRequest::GetPooledTransactions { request, .. } => Some(request),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Corresponding variant for [`PeerRequest`].
|
||||
|
||||
@ -33,7 +33,7 @@ use std::{
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::sync::{mpsc, oneshot, oneshot::error::RecvError};
|
||||
use tokio::sync::{mpsc, mpsc::error::TrySendError, oneshot, oneshot::error::RecvError};
|
||||
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
|
||||
use tracing::trace;
|
||||
|
||||
@ -55,6 +55,9 @@ const GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES: usize = 256;
|
||||
const GET_POOLED_TRANSACTION_SOFT_LIMIT_SIZE: GetPooledTransactionLimit =
|
||||
GetPooledTransactionLimit::SizeSoftLimit(2 * 1024 * 1024);
|
||||
|
||||
/// How many peers we keep track of for each missing transaction.
|
||||
const MAX_ALTERNATIVE_PEERS_PER_TX: usize = 3;
|
||||
|
||||
/// The future for inserting a function into the pool
|
||||
pub type PoolImportFuture = Pin<Box<dyn Future<Output = PoolResult<TxHash>> + Send + 'static>>;
|
||||
|
||||
@ -149,8 +152,8 @@ pub struct TransactionsManager<Pool> {
|
||||
///
|
||||
/// From which we get all new incoming transaction related messages.
|
||||
network_events: UnboundedReceiverStream<NetworkEvent>,
|
||||
/// All currently active requests for pooled transactions.
|
||||
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
|
||||
/// Transaction fetcher to handle inflight and missing transaction requests.
|
||||
transaction_fetcher: TransactionFetcher,
|
||||
/// All currently pending transactions grouped by peers.
|
||||
///
|
||||
/// This way we can track incoming transactions and prevent multiple pool imports for the same
|
||||
@ -192,7 +195,7 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
|
||||
pool,
|
||||
network,
|
||||
network_events,
|
||||
inflight_requests: Default::default(),
|
||||
transaction_fetcher: Default::default(),
|
||||
transactions_by_peers: Default::default(),
|
||||
pool_imports: Default::default(),
|
||||
peers: Default::default(),
|
||||
@ -231,7 +234,9 @@ where
|
||||
|
||||
#[inline]
|
||||
fn update_request_metrics(&self) {
|
||||
self.metrics.inflight_transaction_requests.set(self.inflight_requests.len() as f64);
|
||||
self.metrics
|
||||
.inflight_transaction_requests
|
||||
.set(self.transaction_fetcher.inflight_requests.len() as f64);
|
||||
}
|
||||
|
||||
/// Request handler for an incoming request for transactions
|
||||
@ -503,16 +508,9 @@ where
|
||||
hashes.truncate(GET_POOLED_TRANSACTION_SOFT_LIMIT_NUM_HASHES);
|
||||
|
||||
// request the missing transactions
|
||||
let (response, rx) = oneshot::channel();
|
||||
let req = PeerRequest::GetPooledTransactions {
|
||||
request: GetPooledTransactions(hashes),
|
||||
response,
|
||||
};
|
||||
|
||||
if peer.request_tx.try_send(req).is_ok() {
|
||||
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
|
||||
} else {
|
||||
// peer channel is saturated, drop the request
|
||||
let request_sent =
|
||||
self.transaction_fetcher.request_transactions_from_peer(hashes, peer);
|
||||
if !request_sent {
|
||||
self.metrics.egress_peer_channel_full.increment(1);
|
||||
return
|
||||
}
|
||||
@ -542,7 +540,12 @@ where
|
||||
.into_iter()
|
||||
.map(PooledTransactionsElement::try_from_broadcast)
|
||||
.filter_map(Result::ok)
|
||||
.collect();
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
// mark the transactions as received
|
||||
self.transaction_fetcher.on_received_full_transactions_broadcast(
|
||||
non_blob_txs.iter().map(|tx| tx.hash()),
|
||||
);
|
||||
|
||||
self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
|
||||
|
||||
@ -775,22 +778,19 @@ where
|
||||
|
||||
this.update_request_metrics();
|
||||
|
||||
// Advance all requests.
|
||||
while let Poll::Ready(Some(GetPooledTxResponse { peer_id, result })) =
|
||||
this.inflight_requests.poll_next_unpin(cx)
|
||||
{
|
||||
match result {
|
||||
Ok(Ok(txs)) => {
|
||||
this.import_transactions(peer_id, txs.0, TransactionSource::Response)
|
||||
}
|
||||
Ok(Err(req_err)) => {
|
||||
this.on_request_error(peer_id, req_err);
|
||||
}
|
||||
Err(_) => {
|
||||
// request channel closed/dropped
|
||||
this.on_request_error(peer_id, RequestError::ChannelClosed)
|
||||
let fetch_event = this.transaction_fetcher.poll(cx);
|
||||
match fetch_event {
|
||||
Poll::Ready(FetchEvent::TransactionsFetched { peer_id, transactions }) => {
|
||||
if let Some(txns) = transactions {
|
||||
this.import_transactions(peer_id, txns, TransactionSource::Response);
|
||||
}
|
||||
}
|
||||
Poll::Ready(FetchEvent::FetchError { peer_id, error }) => {
|
||||
this.on_request_error(peer_id, error);
|
||||
}
|
||||
Poll::Pending => {
|
||||
// No event ready at the moment, nothing to do here.
|
||||
}
|
||||
}
|
||||
|
||||
this.update_request_metrics();
|
||||
@ -965,6 +965,8 @@ struct GetPooledTxRequest {
|
||||
|
||||
struct GetPooledTxResponse {
|
||||
peer_id: PeerId,
|
||||
/// Transaction hashes that were requested, for cleanup purposes
|
||||
requested_hashes: Vec<TxHash>,
|
||||
result: Result<RequestResult<PooledTransactions>, RecvError>,
|
||||
}
|
||||
|
||||
@ -991,7 +993,18 @@ impl Future for GetPooledTxRequestFut {
|
||||
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
|
||||
match req.response.poll_unpin(cx) {
|
||||
Poll::Ready(result) => {
|
||||
Poll::Ready(GetPooledTxResponse { peer_id: req.peer_id, result })
|
||||
let request_hashes: Vec<TxHash> = match &result {
|
||||
Ok(Ok(pooled_txs)) => {
|
||||
pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect()
|
||||
}
|
||||
_ => Vec::new(),
|
||||
};
|
||||
|
||||
Poll::Ready(GetPooledTxResponse {
|
||||
peer_id: req.peer_id,
|
||||
requested_hashes: request_hashes,
|
||||
result,
|
||||
})
|
||||
}
|
||||
Poll::Pending => {
|
||||
self.project().inner.set(Some(req));
|
||||
@ -1015,6 +1028,166 @@ struct Peer {
|
||||
client_version: Arc<String>,
|
||||
}
|
||||
|
||||
/// The type responsible for fetching missing transactions from peers.
|
||||
///
|
||||
/// This will keep track of unique transaction hashes that are currently being fetched and submits
|
||||
/// new requests on announced hashes.
|
||||
#[derive(Debug, Default)]
|
||||
struct TransactionFetcher {
|
||||
/// All currently active requests for pooled transactions.
|
||||
inflight_requests: FuturesUnordered<GetPooledTxRequestFut>,
|
||||
/// Set that tracks all hashes that are currently being fetched.
|
||||
inflight_hash_to_fallback_peers: HashMap<TxHash, Vec<PeerId>>,
|
||||
}
|
||||
|
||||
// === impl TransactionFetcher ===
|
||||
|
||||
impl TransactionFetcher {
|
||||
/// Removes the specified hashes from inflight tracking.
|
||||
#[inline]
|
||||
fn remove_inflight_hashes<'a, I>(&mut self, hashes: I)
|
||||
where
|
||||
I: IntoIterator<Item = &'a TxHash>,
|
||||
{
|
||||
for &hash in hashes {
|
||||
self.inflight_hash_to_fallback_peers.remove(&hash);
|
||||
}
|
||||
}
|
||||
|
||||
/// Advances all inflight requests and returns the next event.
|
||||
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<FetchEvent> {
|
||||
if let Poll::Ready(Some(GetPooledTxResponse { peer_id, requested_hashes, result })) =
|
||||
self.inflight_requests.poll_next_unpin(cx)
|
||||
{
|
||||
return match result {
|
||||
Ok(Ok(txs)) => {
|
||||
// clear received hashes
|
||||
self.remove_inflight_hashes(txs.hashes());
|
||||
|
||||
// TODO: re-request missing hashes, for now clear all of them
|
||||
self.remove_inflight_hashes(requested_hashes.iter());
|
||||
|
||||
Poll::Ready(FetchEvent::TransactionsFetched {
|
||||
peer_id,
|
||||
transactions: Some(txs.0),
|
||||
})
|
||||
}
|
||||
Ok(Err(req_err)) => {
|
||||
// TODO: re-request missing hashes
|
||||
self.remove_inflight_hashes(&requested_hashes);
|
||||
Poll::Ready(FetchEvent::FetchError { peer_id, error: req_err })
|
||||
}
|
||||
Err(_) => {
|
||||
// TODO: re-request missing hashes
|
||||
self.remove_inflight_hashes(&requested_hashes);
|
||||
// request channel closed/dropped
|
||||
Poll::Ready(FetchEvent::FetchError {
|
||||
peer_id,
|
||||
error: RequestError::ChannelClosed,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
|
||||
/// Removes the provided transaction hashes from the inflight requests set.
|
||||
///
|
||||
/// This is called when we receive full transactions that are currently scheduled for fetching.
|
||||
#[inline]
|
||||
fn on_received_full_transactions_broadcast<'a>(
|
||||
&mut self,
|
||||
hashes: impl IntoIterator<Item = &'a TxHash>,
|
||||
) {
|
||||
self.remove_inflight_hashes(hashes)
|
||||
}
|
||||
|
||||
/// Requests the missing transactions from the announced hashes of the peer
|
||||
///
|
||||
/// This filters all announced hashes that are already in flight, and requests the missing,
|
||||
/// while marking the given peer as an alternative peer for the hashes that are already in
|
||||
/// flight.
|
||||
fn request_transactions_from_peer(
|
||||
&mut self,
|
||||
mut announced_hashes: Vec<TxHash>,
|
||||
peer: &Peer,
|
||||
) -> bool {
|
||||
let peer_id: PeerId = peer.request_tx.peer_id;
|
||||
// 1. filter out inflight hashes, and register the peer as fallback for all inflight hashes
|
||||
announced_hashes.retain(|&hash| {
|
||||
match self.inflight_hash_to_fallback_peers.entry(hash) {
|
||||
Entry::Vacant(entry) => {
|
||||
// the hash is not in inflight hashes, insert it and retain in the vector
|
||||
entry.insert(vec![peer_id]);
|
||||
true
|
||||
}
|
||||
Entry::Occupied(mut entry) => {
|
||||
// the hash is already in inflight, add this peer as a backup if not more than 3
|
||||
// backups already
|
||||
let backups = entry.get_mut();
|
||||
if backups.len() < MAX_ALTERNATIVE_PEERS_PER_TX {
|
||||
backups.push(peer_id);
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// 2. request all missing from peer
|
||||
if announced_hashes.is_empty() {
|
||||
// nothing to request
|
||||
return false
|
||||
}
|
||||
|
||||
let (response, rx) = oneshot::channel();
|
||||
let req: PeerRequest = PeerRequest::GetPooledTransactions {
|
||||
request: GetPooledTransactions(announced_hashes),
|
||||
response,
|
||||
};
|
||||
|
||||
// try to send the request to the peer
|
||||
if let Err(err) = peer.request_tx.try_send(req) {
|
||||
// peer channel is full
|
||||
match err {
|
||||
TrySendError::Full(req) | TrySendError::Closed(req) => {
|
||||
// need to do some cleanup so
|
||||
let req = req.into_get_pooled_transactions().expect("is get pooled tx");
|
||||
|
||||
// we know that the peer is the only entry in the map, so we can remove all
|
||||
for hash in req.0.into_iter() {
|
||||
self.inflight_hash_to_fallback_peers.remove(&hash);
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
} else {
|
||||
//create a new request for it, from that peer
|
||||
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Represents possible events from fetching transactions.
|
||||
#[derive(Debug)]
|
||||
enum FetchEvent {
|
||||
/// Triggered when transactions are successfully fetched.
|
||||
TransactionsFetched {
|
||||
/// The ID of the peer from which transactions were fetched.
|
||||
peer_id: PeerId,
|
||||
/// The transactions that were fetched, if available.
|
||||
transactions: Option<Vec<PooledTransactionsElement>>,
|
||||
},
|
||||
/// Triggered when there is an error in fetching transactions.
|
||||
FetchError {
|
||||
/// The ID of the peer from which an attempt to fetch transactions resulted in an error.
|
||||
peer_id: PeerId,
|
||||
/// The specific error that occurred while fetching.
|
||||
error: RequestError,
|
||||
},
|
||||
}
|
||||
|
||||
/// Commands to send to the [`TransactionsManager`]
|
||||
#[derive(Debug)]
|
||||
enum TransactionsCommand {
|
||||
|
||||
Reference in New Issue
Block a user