fix: track actually requested transactions (#5483)

This commit is contained in:
Matthias Seitz
2023-11-18 09:34:57 +01:00
committed by GitHub
parent d05dda72dd
commit f29e04dadc

View File

@ -760,7 +760,7 @@ where
}
fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
trace!(target: "net::tx", ?peer_id, ?kind);
trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
self.network.reputation_change(peer_id, kind);
self.metrics.reported_bad_transactions.increment(1);
}
@ -831,11 +831,10 @@ where
while let Poll::Ready(fetch_event) = this.transaction_fetcher.poll(cx) {
match fetch_event {
FetchEvent::TransactionsFetched { peer_id, transactions } => {
if let Some(txns) = transactions {
this.import_transactions(peer_id, txns, TransactionSource::Response);
}
this.import_transactions(peer_id, transactions, TransactionSource::Response);
}
FetchEvent::FetchError { peer_id, error } => {
trace!(target: "net::tx", ?peer_id, ?error, "requesting transactions from peer failed");
this.on_request_error(peer_id, error);
}
}
@ -857,7 +856,7 @@ where
// known that this transaction is bad. (e.g. consensus
// rules)
if err.is_bad_transaction() && !this.network.is_syncing() {
trace!(target: "net::tx", ?err, "Bad transaction import");
trace!(target: "net::tx", ?err, "bad pool transaction import");
this.on_bad_import(err.hash);
continue
}
@ -1008,6 +1007,8 @@ impl TransactionSource {
/// An inflight request for `PooledTransactions` from a peer
struct GetPooledTxRequest {
peer_id: PeerId,
/// Transaction hashes that were requested, for cleanup purposes
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
}
@ -1026,11 +1027,13 @@ struct GetPooledTxRequestFut {
}
impl GetPooledTxRequestFut {
#[inline]
fn new(
peer_id: PeerId,
requested_hashes: Vec<TxHash>,
response: oneshot::Receiver<RequestResult<PooledTransactions>>,
) -> Self {
Self { inner: Some(GetPooledTxRequest { peer_id, response }) }
Self { inner: Some(GetPooledTxRequest { peer_id, requested_hashes, response }) }
}
}
@ -1040,20 +1043,11 @@ impl Future for GetPooledTxRequestFut {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut req = self.as_mut().project().inner.take().expect("polled after completion");
match req.response.poll_unpin(cx) {
Poll::Ready(result) => {
let request_hashes: Vec<TxHash> = match &result {
Ok(Ok(pooled_txs)) => {
pooled_txs.0.iter().map(|tx_elem| *tx_elem.hash()).collect()
}
_ => Vec::new(),
};
Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: request_hashes,
result,
})
}
Poll::Ready(result) => Poll::Ready(GetPooledTxResponse {
peer_id: req.peer_id,
requested_hashes: req.requested_hashes,
result,
}),
Poll::Pending => {
self.project().inner.set(Some(req));
Poll::Pending
@ -1108,16 +1102,16 @@ impl TransactionFetcher {
self.inflight_requests.poll_next_unpin(cx)
{
return match result {
Ok(Ok(txs)) => {
Ok(Ok(transactions)) => {
// clear received hashes
self.remove_inflight_hashes(txs.hashes());
self.remove_inflight_hashes(transactions.hashes());
// TODO: re-request missing hashes, for now clear all of them
self.remove_inflight_hashes(requested_hashes.iter());
Poll::Ready(FetchEvent::TransactionsFetched {
peer_id,
transactions: Some(txs.0),
transactions: transactions.0,
})
}
Ok(Err(req_err)) => {
@ -1189,7 +1183,7 @@ impl TransactionFetcher {
let (response, rx) = oneshot::channel();
let req: PeerRequest = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(announced_hashes),
request: GetPooledTransactions(announced_hashes.clone()),
response,
};
@ -1210,7 +1204,7 @@ impl TransactionFetcher {
return false
} else {
//create a new request for it, from that peer
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, rx))
self.inflight_requests.push(GetPooledTxRequestFut::new(peer_id, announced_hashes, rx))
}
true
@ -1225,7 +1219,7 @@ enum FetchEvent {
/// The ID of the peer from which transactions were fetched.
peer_id: PeerId,
/// The transactions that were fetched, if available.
transactions: Option<Vec<PooledTransactionsElement>>,
transactions: Vec<PooledTransactionsElement>,
},
/// Triggered when there is an error in fetching transactions.
FetchError {