From 7c7baca9807e9a9556cb217d3df8cd03c21ba4ed Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 19 Nov 2024 19:25:01 +0100 Subject: [PATCH] chore: group tx manager functions (#12679) --- crates/net/network/src/network.rs | 2 +- crates/net/network/src/transactions/mod.rs | 673 +++++++++++---------- 2 files changed, 338 insertions(+), 337 deletions(-) diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 1715fa63e..2fa3fd90e 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -252,7 +252,7 @@ impl PeersInfo for NetworkHandle { } } -impl Peers for NetworkHandle { +impl Peers for NetworkHandle { fn add_trusted_peer_id(&self, peer: PeerId) { self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer)); } diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 125818da3..241f01ae8 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -313,22 +313,106 @@ impl TransactionsManager { } } -// === impl TransactionsManager === - -impl TransactionsManager -where - Pool: TransactionPool, -{ +impl TransactionsManager { /// Returns a new handle that can send commands to this type. pub fn handle(&self) -> TransactionsHandle { TransactionsHandle { manager_tx: self.command_tx.clone() } } -} -impl TransactionsManager -where - Pool: TransactionPool + 'static, -{ + /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns + /// `false` if [`TransactionsManager`] is operating close to full capacity. + fn has_capacity_for_fetching_pending_hashes(&self) -> bool { + self.pending_pool_imports_info + .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) && + self.transaction_fetcher.has_capacity_for_fetching_pending_hashes() + } + + fn report_peer_bad_transactions(&self, peer_id: PeerId) { + self.report_peer(peer_id, ReputationChangeKind::BadTransactions); + self.metrics.reported_bad_transactions.increment(1); + } + + fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) { + trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change"); + self.network.reputation_change(peer_id, kind); + } + + fn report_already_seen(&self, peer_id: PeerId) { + trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction"); + self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction); + } + + /// Clear the transaction + fn on_good_import(&mut self, hash: TxHash) { + self.transactions_by_peers.remove(&hash); + } + + /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid + /// fetching or importing it again. + /// + /// Errors that count as bad transactions are: + /// + /// - intrinsic gas too low + /// - exceeds gas limit + /// - gas uint overflow + /// - exceeds max init code size + /// - oversized data + /// - signer account has bytecode + /// - chain id mismatch + /// - old legacy chain id + /// - tx type not supported + /// + /// (and additionally for blobs txns...) + /// + /// - no blobs + /// - too many blobs + /// - invalid kzg proof + /// - kzg error + /// - not blob transaction (tx type mismatch) + /// - wrong versioned kzg commitment hash + fn on_bad_import(&mut self, err: PoolError) { + let peers = self.transactions_by_peers.remove(&err.hash); + + // if we're _currently_ syncing, we ignore a bad transaction + if !err.is_bad_transaction() || self.network.is_syncing() { + return + } + // otherwise we penalize the peer that sent the bad transaction, with the assumption that + // the peer should have known that this transaction is bad (e.g. violating consensus rules) + if let Some(peers) = peers { + for peer_id in peers { + self.report_peer_bad_transactions(peer_id); + } + } + self.metrics.bad_imports.increment(1); + self.bad_imports.insert(err.hash); + } + + /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`]. + fn on_fetch_hashes_pending_fetch(&mut self) { + // try drain transaction hashes pending fetch + let info = &self.pending_pool_imports_info; + let max_pending_pool_imports = info.max_pending_pool_imports; + let has_capacity_wrt_pending_pool_imports = + |divisor| info.has_capacity(max_pending_pool_imports / divisor); + + self.transaction_fetcher + .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports); + } + + fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) { + let kind = match req_err { + RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol, + RequestError::Timeout => ReputationChangeKind::Timeout, + RequestError::ChannelClosed | RequestError::ConnectionDropped => { + // peer is already disconnected + return + } + RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id), + }; + self.report_peer(peer_id, kind); + } + #[inline] fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) { let metrics = &self.metrics; @@ -354,7 +438,249 @@ where metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64()); metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64()); } +} +impl TransactionsManager +where + Pool: TransactionPool, + N: NetworkPrimitives, +{ + /// Processes a batch import results. + fn on_batch_import_result(&mut self, batch_results: Vec>) { + for res in batch_results { + match res { + Ok(hash) => { + self.on_good_import(hash); + } + Err(err) => { + self.on_bad_import(err); + } + } + } + } + + /// Request handler for an incoming `NewPooledTransactionHashes` + fn on_new_pooled_transaction_hashes( + &mut self, + peer_id: PeerId, + msg: NewPooledTransactionHashes, + ) { + // If the node is initially syncing, ignore transactions + if self.network.is_initially_syncing() { + return + } + if self.network.tx_gossip_disabled() { + return + } + + // get handle to peer's session, if the session is still active + let Some(peer) = self.peers.get_mut(&peer_id) else { + trace!( + peer_id = format!("{peer_id:#}"), + ?msg, + "discarding announcement from inactive peer" + ); + + return + }; + let client = peer.client_version.clone(); + + // keep track of the transactions the peer knows + let mut count_txns_already_seen_by_peer = 0; + for tx in msg.iter_hashes().copied() { + if !peer.seen_transactions.insert(tx) { + count_txns_already_seen_by_peer += 1; + } + } + if count_txns_already_seen_by_peer > 0 { + // this may occur if transactions are sent or announced to a peer, at the same time as + // the peer sends/announces those hashes to us. this is because, marking + // txns as seen by a peer is done optimistically upon sending them to the + // peer. + self.metrics.messages_with_hashes_already_seen_by_peer.increment(1); + self.metrics + .occurrences_hash_already_seen_by_peer + .increment(count_txns_already_seen_by_peer); + + trace!(target: "net::tx", + %count_txns_already_seen_by_peer, + peer_id=format!("{peer_id:#}"), + ?client, + "Peer sent hashes that have already been marked as seen by peer" + ); + + self.report_already_seen(peer_id); + } + + // 1. filter out spam + let (validation_outcome, mut partially_valid_msg) = + self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg); + + if validation_outcome == FilterOutcome::ReportPeer { + self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); + } + + // 2. filter out transactions pending import to pool + partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash)); + + // 3. filter out known hashes + // + // known txns have already been successfully fetched or received over gossip. + // + // most hashes will be filtered out here since this the mempool protocol is a gossip + // protocol, healthy peers will send many of the same hashes. + // + let hashes_count_pre_pool_filter = partially_valid_msg.len(); + self.pool.retain_unknown(&mut partially_valid_msg); + if hashes_count_pre_pool_filter > partially_valid_msg.len() { + let already_known_hashes_count = + hashes_count_pre_pool_filter - partially_valid_msg.len(); + self.metrics + .occurrences_hashes_already_in_pool + .increment(already_known_hashes_count as u64); + } + + if partially_valid_msg.is_empty() { + // nothing to request + return + } + + // 4. filter out invalid entries (spam) + // + // validates messages with respect to the given network, e.g. allowed tx types + // + let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg + .msg_version() + .expect("partially valid announcement should have version") + .is_eth68() + { + // validate eth68 announcement data + self.transaction_fetcher + .filter_valid_message + .filter_valid_entries_68(partially_valid_msg) + } else { + // validate eth66 announcement data + self.transaction_fetcher + .filter_valid_message + .filter_valid_entries_66(partially_valid_msg) + }; + + if validation_outcome == FilterOutcome::ReportPeer { + self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); + } + + if valid_announcement_data.is_empty() { + // no valid announcement data + return + } + + // 5. filter out already seen unknown hashes + // + // seen hashes are already in the tx fetcher, pending fetch. + // + // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx + // fetcher, hence they should be valid at this point. + let bad_imports = &self.bad_imports; + self.transaction_fetcher.filter_unseen_and_pending_hashes( + &mut valid_announcement_data, + |hash| bad_imports.contains(hash), + &peer_id, + |peer_id| self.peers.contains_key(&peer_id), + &client, + ); + + if valid_announcement_data.is_empty() { + // nothing to request + return + } + + trace!(target: "net::tx::propagation", + peer_id=format!("{peer_id:#}"), + hashes_len=valid_announcement_data.iter().count(), + hashes=?valid_announcement_data.keys().collect::>(), + msg_version=%valid_announcement_data.msg_version(), + client_version=%client, + "received previously unseen and pending hashes in announcement from peer" + ); + + // only send request for hashes to idle peer, otherwise buffer hashes storing peer as + // fallback + if !self.transaction_fetcher.is_idle(&peer_id) { + // load message version before announcement data is destructed in packing + let msg_version = valid_announcement_data.msg_version(); + let (hashes, _version) = valid_announcement_data.into_request_hashes(); + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=?*hashes, + %msg_version, + %client, + "buffering hashes announced by busy peer" + ); + + self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id)); + + return + } + + // load message version before announcement data type is destructed in packing + let msg_version = valid_announcement_data.msg_version(); + // + // demand recommended soft limit on response, however the peer may enforce an arbitrary + // limit on the response (2MB) + // + // request buffer is shrunk via call to pack request! + let init_capacity_req = + self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version); + let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); + let surplus_hashes = + self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data); + + if !surplus_hashes.is_empty() { + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + surplus_hashes=?*surplus_hashes, + %msg_version, + %client, + "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" + ); + + self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id)); + } + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + hashes=?*hashes_to_request, + %msg_version, + %client, + "sending hashes in `GetPooledTransactions` request to peer's session" + ); + + // request the missing transactions + // + // get handle to peer's session again, at this point we know it exists + let Some(peer) = self.peers.get_mut(&peer_id) else { return }; + if let Some(failed_to_request_hashes) = + self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer) + { + let conn_eth_version = peer.version; + + trace!(target: "net::tx", + peer_id=format!("{peer_id:#}"), + failed_to_request_hashes=?*failed_to_request_hashes, + %conn_eth_version, + %client, + "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" + ); + self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); + } + } +} + +impl TransactionsManager +where + Pool: TransactionPool + 'static, +{ /// Request handler for an incoming request for transactions fn on_get_pooled_transactions( &mut self, @@ -648,223 +974,6 @@ where self.pool.on_propagated(propagated); } - /// Request handler for an incoming `NewPooledTransactionHashes` - fn on_new_pooled_transaction_hashes( - &mut self, - peer_id: PeerId, - msg: NewPooledTransactionHashes, - ) { - // If the node is initially syncing, ignore transactions - if self.network.is_initially_syncing() { - return - } - if self.network.tx_gossip_disabled() { - return - } - - // get handle to peer's session, if the session is still active - let Some(peer) = self.peers.get_mut(&peer_id) else { - trace!( - peer_id = format!("{peer_id:#}"), - ?msg, - "discarding announcement from inactive peer" - ); - - return - }; - let client = peer.client_version.clone(); - - // keep track of the transactions the peer knows - let mut count_txns_already_seen_by_peer = 0; - for tx in msg.iter_hashes().copied() { - if !peer.seen_transactions.insert(tx) { - count_txns_already_seen_by_peer += 1; - } - } - if count_txns_already_seen_by_peer > 0 { - // this may occur if transactions are sent or announced to a peer, at the same time as - // the peer sends/announces those hashes to us. this is because, marking - // txns as seen by a peer is done optimistically upon sending them to the - // peer. - self.metrics.messages_with_hashes_already_seen_by_peer.increment(1); - self.metrics - .occurrences_hash_already_seen_by_peer - .increment(count_txns_already_seen_by_peer); - - trace!(target: "net::tx", - %count_txns_already_seen_by_peer, - peer_id=format!("{peer_id:#}"), - ?client, - "Peer sent hashes that have already been marked as seen by peer" - ); - - self.report_already_seen(peer_id); - } - - // 1. filter out spam - let (validation_outcome, mut partially_valid_msg) = - self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg); - - if validation_outcome == FilterOutcome::ReportPeer { - self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); - } - - // 2. filter out transactions pending import to pool - partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash)); - - // 3. filter out known hashes - // - // known txns have already been successfully fetched or received over gossip. - // - // most hashes will be filtered out here since this the mempool protocol is a gossip - // protocol, healthy peers will send many of the same hashes. - // - let hashes_count_pre_pool_filter = partially_valid_msg.len(); - self.pool.retain_unknown(&mut partially_valid_msg); - if hashes_count_pre_pool_filter > partially_valid_msg.len() { - let already_known_hashes_count = - hashes_count_pre_pool_filter - partially_valid_msg.len(); - self.metrics - .occurrences_hashes_already_in_pool - .increment(already_known_hashes_count as u64); - } - - if partially_valid_msg.is_empty() { - // nothing to request - return - } - - // 4. filter out invalid entries (spam) - // - // validates messages with respect to the given network, e.g. allowed tx types - // - let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg - .msg_version() - .expect("partially valid announcement should have version") - .is_eth68() - { - // validate eth68 announcement data - self.transaction_fetcher - .filter_valid_message - .filter_valid_entries_68(partially_valid_msg) - } else { - // validate eth66 announcement data - self.transaction_fetcher - .filter_valid_message - .filter_valid_entries_66(partially_valid_msg) - }; - - if validation_outcome == FilterOutcome::ReportPeer { - self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement); - } - - if valid_announcement_data.is_empty() { - // no valid announcement data - return - } - - // 5. filter out already seen unknown hashes - // - // seen hashes are already in the tx fetcher, pending fetch. - // - // for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx - // fetcher, hence they should be valid at this point. - let bad_imports = &self.bad_imports; - self.transaction_fetcher.filter_unseen_and_pending_hashes( - &mut valid_announcement_data, - |hash| bad_imports.contains(hash), - &peer_id, - |peer_id| self.peers.contains_key(&peer_id), - &client, - ); - - if valid_announcement_data.is_empty() { - // nothing to request - return - } - - trace!(target: "net::tx::propagation", - peer_id=format!("{peer_id:#}"), - hashes_len=valid_announcement_data.iter().count(), - hashes=?valid_announcement_data.keys().collect::>(), - msg_version=%valid_announcement_data.msg_version(), - client_version=%client, - "received previously unseen and pending hashes in announcement from peer" - ); - - // only send request for hashes to idle peer, otherwise buffer hashes storing peer as - // fallback - if !self.transaction_fetcher.is_idle(&peer_id) { - // load message version before announcement data is destructed in packing - let msg_version = valid_announcement_data.msg_version(); - let (hashes, _version) = valid_announcement_data.into_request_hashes(); - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hashes=?*hashes, - %msg_version, - %client, - "buffering hashes announced by busy peer" - ); - - self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id)); - - return - } - - // load message version before announcement data type is destructed in packing - let msg_version = valid_announcement_data.msg_version(); - // - // demand recommended soft limit on response, however the peer may enforce an arbitrary - // limit on the response (2MB) - // - // request buffer is shrunk via call to pack request! - let init_capacity_req = - self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version); - let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req); - let surplus_hashes = - self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data); - - if !surplus_hashes.is_empty() { - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - surplus_hashes=?*surplus_hashes, - %msg_version, - %client, - "some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes" - ); - - self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id)); - } - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - hashes=?*hashes_to_request, - %msg_version, - %client, - "sending hashes in `GetPooledTransactions` request to peer's session" - ); - - // request the missing transactions - // - // get handle to peer's session again, at this point we know it exists - let Some(peer) = self.peers.get_mut(&peer_id) else { return }; - if let Some(failed_to_request_hashes) = - self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer) - { - let conn_eth_version = peer.version; - - trace!(target: "net::tx", - peer_id=format!("{peer_id:#}"), - failed_to_request_hashes=?*failed_to_request_hashes, - %conn_eth_version, - %client, - "sending `GetPooledTransactions` request to peer's session failed, buffering hashes" - ); - self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id)); - } - } - /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { match event { @@ -1136,20 +1245,6 @@ where } } - /// Processes a batch import results. - fn on_batch_import_result(&mut self, batch_results: Vec>) { - for res in batch_results { - match res { - Ok(hash) => { - self.on_good_import(hash); - } - Err(err) => { - self.on_bad_import(err); - } - } - } - } - /// Processes a [`FetchEvent`]. fn on_fetch_event(&mut self, fetch_event: FetchEvent) { match fetch_event { @@ -1165,100 +1260,6 @@ where } } } - - /// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`]. - fn on_fetch_hashes_pending_fetch(&mut self) { - // try drain transaction hashes pending fetch - let info = &self.pending_pool_imports_info; - let max_pending_pool_imports = info.max_pending_pool_imports; - let has_capacity_wrt_pending_pool_imports = - |divisor| info.has_capacity(max_pending_pool_imports / divisor); - - self.transaction_fetcher - .on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports); - } - - fn report_peer_bad_transactions(&self, peer_id: PeerId) { - self.report_peer(peer_id, ReputationChangeKind::BadTransactions); - self.metrics.reported_bad_transactions.increment(1); - } - - fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) { - trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change"); - self.network.reputation_change(peer_id, kind); - } - - fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) { - let kind = match req_err { - RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol, - RequestError::Timeout => ReputationChangeKind::Timeout, - RequestError::ChannelClosed | RequestError::ConnectionDropped => { - // peer is already disconnected - return - } - RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id), - }; - self.report_peer(peer_id, kind); - } - - fn report_already_seen(&self, peer_id: PeerId) { - trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction"); - self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction); - } - - /// Clear the transaction - fn on_good_import(&mut self, hash: TxHash) { - self.transactions_by_peers.remove(&hash); - } - - /// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid - /// fetching or importing it again. - /// - /// Errors that count as bad transactions are: - /// - /// - intrinsic gas too low - /// - exceeds gas limit - /// - gas uint overflow - /// - exceeds max init code size - /// - oversized data - /// - signer account has bytecode - /// - chain id mismatch - /// - old legacy chain id - /// - tx type not supported - /// - /// (and additionally for blobs txns...) - /// - /// - no blobs - /// - too many blobs - /// - invalid kzg proof - /// - kzg error - /// - not blob transaction (tx type mismatch) - /// - wrong versioned kzg commitment hash - fn on_bad_import(&mut self, err: PoolError) { - let peers = self.transactions_by_peers.remove(&err.hash); - - // if we're _currently_ syncing, we ignore a bad transaction - if !err.is_bad_transaction() || self.network.is_syncing() { - return - } - // otherwise we penalize the peer that sent the bad transaction, with the assumption that - // the peer should have known that this transaction is bad (e.g. violating consensus rules) - if let Some(peers) = peers { - for peer_id in peers { - self.report_peer_bad_transactions(peer_id); - } - } - self.metrics.bad_imports.increment(1); - self.bad_imports.insert(err.hash); - } - - /// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns - /// `false` if [`TransactionsManager`] is operating close to full capacity. - fn has_capacity_for_fetching_pending_hashes(&self) -> bool { - self.pending_pool_imports_info - .has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) && - self.transaction_fetcher.has_capacity_for_fetching_pending_hashes() - } } /// An endless future. Preemption ensure that future is non-blocking, nonetheless. See