chore: group tx manager functions (#12679)

Author: Matthias Seitz
Date: 2024-11-19 19:25:01 +01:00
Committed by: GitHub
Parent: 8c467e4291
Commit: 7c7baca980
2 changed files with 338 additions and 337 deletions
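
The reorganization is easiest to see as a pattern: the manager's methods are regrouped into separate `impl` blocks according to the trait bounds they actually need, instead of one block carrying the strongest bounds for everything. Below is a minimal sketch of that pattern; `Manager`, `TransactionPool`, and `NetworkPrimitives` here are simplified stand-ins, not the real reth definitions.

```rust
// Sketch only: simplified stand-in types, not the real reth definitions.
use std::marker::PhantomData;

trait TransactionPool {}
trait NetworkPrimitives {}

struct Manager<Pool, N> {
    pool: Pool,
    _net: PhantomData<N>,
}

// Methods that don't need the pool trait (e.g. returning a command handle)
// live in a block that only constrains the network primitives.
impl<Pool, N: NetworkPrimitives> Manager<Pool, N> {
    fn handle(&self) -> &Self {
        self
    }
}

// Methods that operate on the pool get the `TransactionPool` bound.
impl<Pool: TransactionPool, N: NetworkPrimitives> Manager<Pool, N> {
    fn on_event(&mut self) {
        // pool-dependent logic
    }
}

// Methods that spawn background work additionally require `'static`.
impl<Pool: TransactionPool + 'static, N: NetworkPrimitives> Manager<Pool, N> {
    fn spawn_import(&self) {
        // work that needs the `'static` bound
    }
}
```

Grouping by bounds keeps each method available under the weakest constraints that suffice, which is what the hunks below do for `TransactionsManager<Pool, N>`.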


@@ -252,7 +252,7 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
}
}
impl Peers for NetworkHandle {
impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
fn add_trusted_peer_id(&self, peer: PeerId) {
self.send_message(NetworkHandleMessage::AddTrustedPeerId(peer));
}

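The only change in the first file is widening the `Peers` implementation from the concrete `NetworkHandle` to the generic `NetworkHandle<N>`, mirroring the `PeersInfo` impl shown in the hunk header. A minimal, self-contained sketch of that kind of widening, with placeholder types standing in for the real ones:

```rust
// Sketch only: placeholder types illustrating how an impl written for a
// concrete type is widened to cover every instantiation of a generic type.
use std::marker::PhantomData;

trait NetworkPrimitives {}
struct EthPrimitives;
impl NetworkPrimitives for EthPrimitives {}

type PeerId = u64; // stand-in for the real peer identifier

struct NetworkHandle<N = EthPrimitives> {
    _marker: PhantomData<N>,
}

trait Peers {
    fn add_trusted_peer_id(&self, peer: PeerId);
}

// Before: `impl Peers for NetworkHandle` only covered the default primitives.
// After: generic over `N`, so every `NetworkHandle<N>` implements `Peers`.
impl<N: NetworkPrimitives> Peers for NetworkHandle<N> {
    fn add_trusted_peer_id(&self, _peer: PeerId) {
        // forward the command to the network manager task (elided here)
    }
}
```

The remaining hunks below belong to the transactions manager code.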

@@ -313,22 +313,106 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
}
}
// === impl TransactionsManager ===
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool,
{
impl<Pool, N: NetworkPrimitives> TransactionsManager<Pool, N> {
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle {
TransactionsHandle { manager_tx: self.command_tx.clone() }
}
}
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
{
/// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
/// `false` if [`TransactionsManager`] is operating close to full capacity.
fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
self.pending_pool_imports_info
.has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
}
fn report_peer_bad_transactions(&self, peer_id: PeerId) {
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
self.metrics.reported_bad_transactions.increment(1);
}
fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
self.network.reputation_change(peer_id, kind);
}
fn report_already_seen(&self, peer_id: PeerId) {
trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
}
/// Clears the successfully imported transaction hash from the tracked `transactions_by_peers` map.
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
}
/// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
/// fetching or importing it again.
///
/// Errors that count as bad transactions are:
///
/// - intrinsic gas too low
/// - exceeds gas limit
/// - gas uint overflow
/// - exceeds max init code size
/// - oversized data
/// - signer account has bytecode
/// - chain id mismatch
/// - old legacy chain id
/// - tx type not supported
///
/// (and additionally for blob txns...)
///
/// - no blobs
/// - too many blobs
/// - invalid kzg proof
/// - kzg error
/// - not blob transaction (tx type mismatch)
/// - wrong versioned kzg commitment hash
fn on_bad_import(&mut self, err: PoolError) {
let peers = self.transactions_by_peers.remove(&err.hash);
// if we're _currently_ syncing, we ignore a bad transaction
if !err.is_bad_transaction() || self.network.is_syncing() {
return
}
// otherwise we penalize the peer that sent the bad transaction, with the assumption that
// the peer should have known that this transaction is bad (e.g. violating consensus rules)
if let Some(peers) = peers {
for peer_id in peers {
self.report_peer_bad_transactions(peer_id);
}
}
self.metrics.bad_imports.increment(1);
self.bad_imports.insert(err.hash);
}
/// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
fn on_fetch_hashes_pending_fetch(&mut self) {
// try drain transaction hashes pending fetch
let info = &self.pending_pool_imports_info;
let max_pending_pool_imports = info.max_pending_pool_imports;
let has_capacity_wrt_pending_pool_imports =
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
self.transaction_fetcher
.on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
}
fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
let kind = match req_err {
RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
RequestError::Timeout => ReputationChangeKind::Timeout,
RequestError::ChannelClosed | RequestError::ConnectionDropped => {
// peer is already disconnected
return
}
RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
};
self.report_peer(peer_id, kind);
}
#[inline]
fn update_poll_metrics(&self, start: Instant, poll_durations: TxManagerPollDurations) {
let metrics = &self.metrics;
@@ -354,7 +438,249 @@ where
metrics.acc_duration_fetch_pending_hashes.set(acc_pending_fetch.as_secs_f64());
metrics.acc_duration_poll_commands.set(acc_cmds.as_secs_f64());
}
}
impl<Pool, N> TransactionsManager<Pool, N>
where
Pool: TransactionPool,
N: NetworkPrimitives,
{
/// Processes a batch of import results.
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
for res in batch_results {
match res {
Ok(hash) => {
self.on_good_import(hash);
}
Err(err) => {
self.on_bad_import(err);
}
}
}
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transaction_hashes(
&mut self,
peer_id: PeerId,
msg: NewPooledTransactionHashes,
) {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}
if self.network.tx_gossip_disabled() {
return
}
// get handle to peer's session, if the session is still active
let Some(peer) = self.peers.get_mut(&peer_id) else {
trace!(
peer_id = format!("{peer_id:#}"),
?msg,
"discarding announcement from inactive peer"
);
return
};
let client = peer.client_version.clone();
// keep track of the transactions the peer knows
let mut count_txns_already_seen_by_peer = 0;
for tx in msg.iter_hashes().copied() {
if !peer.seen_transactions.insert(tx) {
count_txns_already_seen_by_peer += 1;
}
}
if count_txns_already_seen_by_peer > 0 {
// this may occur if transactions are sent or announced to a peer at the same time as
// the peer sends/announces those hashes to us. this is because marking
// txns as seen by a peer is done optimistically upon sending them to the
// peer.
self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
self.metrics
.occurrences_hash_already_seen_by_peer
.increment(count_txns_already_seen_by_peer);
trace!(target: "net::tx",
%count_txns_already_seen_by_peer,
peer_id=format!("{peer_id:#}"),
?client,
"Peer sent hashes that have already been marked as seen by peer"
);
self.report_already_seen(peer_id);
}
// 1. filter out spam
let (validation_outcome, mut partially_valid_msg) =
self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
if validation_outcome == FilterOutcome::ReportPeer {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
// 2. filter out transactions pending import to pool
partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
// 3. filter out known hashes
//
// known txns have already been successfully fetched or received over gossip.
//
// most hashes will be filtered out here: since the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let hashes_count_pre_pool_filter = partially_valid_msg.len();
self.pool.retain_unknown(&mut partially_valid_msg);
if hashes_count_pre_pool_filter > partially_valid_msg.len() {
let already_known_hashes_count =
hashes_count_pre_pool_filter - partially_valid_msg.len();
self.metrics
.occurrences_hashes_already_in_pool
.increment(already_known_hashes_count as u64);
}
if partially_valid_msg.is_empty() {
// nothing to request
return
}
// 4. filter out invalid entries (spam)
//
// validates messages with respect to the given network, e.g. allowed tx types
//
let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
.msg_version()
.expect("partially valid announcement should have version")
.is_eth68()
{
// validate eth68 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_68(partially_valid_msg)
} else {
// validate eth66 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_66(partially_valid_msg)
};
if validation_outcome == FilterOutcome::ReportPeer {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
if valid_announcement_data.is_empty() {
// no valid announcement data
return
}
// 5. filter out already seen unknown hashes
//
// seen hashes are already in the tx fetcher, pending fetch.
//
// for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
// fetcher, hence they should be valid at this point.
let bad_imports = &self.bad_imports;
self.transaction_fetcher.filter_unseen_and_pending_hashes(
&mut valid_announcement_data,
|hash| bad_imports.contains(hash),
&peer_id,
|peer_id| self.peers.contains_key(&peer_id),
&client,
);
if valid_announcement_data.is_empty() {
// nothing to request
return
}
trace!(target: "net::tx::propagation",
peer_id=format!("{peer_id:#}"),
hashes_len=valid_announcement_data.iter().count(),
hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
msg_version=%valid_announcement_data.msg_version(),
client_version=%client,
"received previously unseen and pending hashes in announcement from peer"
);
// only send request for hashes to idle peer, otherwise buffer hashes storing peer as
// fallback
if !self.transaction_fetcher.is_idle(&peer_id) {
// load message version before announcement data is destructed in packing
let msg_version = valid_announcement_data.msg_version();
let (hashes, _version) = valid_announcement_data.into_request_hashes();
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes,
%msg_version,
%client,
"buffering hashes announced by busy peer"
);
self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
return
}
// load message version before announcement data type is destructed in packing
let msg_version = valid_announcement_data.msg_version();
//
// demand the recommended soft limit on the response; however, the peer may enforce an
// arbitrary limit on the response (2MB)
//
// request buffer is shrunk via call to pack request!
let init_capacity_req =
self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let surplus_hashes =
self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
if !surplus_hashes.is_empty() {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=?*surplus_hashes,
%msg_version,
%client,
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
}
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes_to_request,
%msg_version,
%client,
"sending hashes in `GetPooledTransactions` request to peer's session"
);
// request the missing transactions
//
// get handle to peer's session again, at this point we know it exists
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
{
let conn_eth_version = peer.version;
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?*failed_to_request_hashes,
%conn_eth_version,
%client,
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
}
}
}
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
{
/// Request handler for an incoming request for transactions
fn on_get_pooled_transactions(
&mut self,
@@ -648,223 +974,6 @@ where
self.pool.on_propagated(propagated);
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transaction_hashes(
&mut self,
peer_id: PeerId,
msg: NewPooledTransactionHashes,
) {
// If the node is initially syncing, ignore transactions
if self.network.is_initially_syncing() {
return
}
if self.network.tx_gossip_disabled() {
return
}
// get handle to peer's session, if the session is still active
let Some(peer) = self.peers.get_mut(&peer_id) else {
trace!(
peer_id = format!("{peer_id:#}"),
?msg,
"discarding announcement from inactive peer"
);
return
};
let client = peer.client_version.clone();
// keep track of the transactions the peer knows
let mut count_txns_already_seen_by_peer = 0;
for tx in msg.iter_hashes().copied() {
if !peer.seen_transactions.insert(tx) {
count_txns_already_seen_by_peer += 1;
}
}
if count_txns_already_seen_by_peer > 0 {
// this may occur if transactions are sent or announced to a peer at the same time as
// the peer sends/announces those hashes to us. this is because marking
// txns as seen by a peer is done optimistically upon sending them to the
// peer.
self.metrics.messages_with_hashes_already_seen_by_peer.increment(1);
self.metrics
.occurrences_hash_already_seen_by_peer
.increment(count_txns_already_seen_by_peer);
trace!(target: "net::tx",
%count_txns_already_seen_by_peer,
peer_id=format!("{peer_id:#}"),
?client,
"Peer sent hashes that have already been marked as seen by peer"
);
self.report_already_seen(peer_id);
}
// 1. filter out spam
let (validation_outcome, mut partially_valid_msg) =
self.transaction_fetcher.filter_valid_message.partially_filter_valid_entries(msg);
if validation_outcome == FilterOutcome::ReportPeer {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
// 2. filter out transactions pending import to pool
partially_valid_msg.retain_by_hash(|hash| !self.transactions_by_peers.contains_key(hash));
// 3. filter out known hashes
//
// known txns have already been successfully fetched or received over gossip.
//
// most hashes will be filtered out here: since the mempool protocol is a gossip
// protocol, healthy peers will send many of the same hashes.
//
let hashes_count_pre_pool_filter = partially_valid_msg.len();
self.pool.retain_unknown(&mut partially_valid_msg);
if hashes_count_pre_pool_filter > partially_valid_msg.len() {
let already_known_hashes_count =
hashes_count_pre_pool_filter - partially_valid_msg.len();
self.metrics
.occurrences_hashes_already_in_pool
.increment(already_known_hashes_count as u64);
}
if partially_valid_msg.is_empty() {
// nothing to request
return
}
// 4. filter out invalid entries (spam)
//
// validates messages with respect to the given network, e.g. allowed tx types
//
let (validation_outcome, mut valid_announcement_data) = if partially_valid_msg
.msg_version()
.expect("partially valid announcement should have version")
.is_eth68()
{
// validate eth68 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_68(partially_valid_msg)
} else {
// validate eth66 announcement data
self.transaction_fetcher
.filter_valid_message
.filter_valid_entries_66(partially_valid_msg)
};
if validation_outcome == FilterOutcome::ReportPeer {
self.report_peer(peer_id, ReputationChangeKind::BadAnnouncement);
}
if valid_announcement_data.is_empty() {
// no valid announcement data
return
}
// 5. filter out already seen unknown hashes
//
// seen hashes are already in the tx fetcher, pending fetch.
//
// for any seen hashes add the peer as fallback. unseen hashes are loaded into the tx
// fetcher, hence they should be valid at this point.
let bad_imports = &self.bad_imports;
self.transaction_fetcher.filter_unseen_and_pending_hashes(
&mut valid_announcement_data,
|hash| bad_imports.contains(hash),
&peer_id,
|peer_id| self.peers.contains_key(&peer_id),
&client,
);
if valid_announcement_data.is_empty() {
// nothing to request
return
}
trace!(target: "net::tx::propagation",
peer_id=format!("{peer_id:#}"),
hashes_len=valid_announcement_data.iter().count(),
hashes=?valid_announcement_data.keys().collect::<Vec<_>>(),
msg_version=%valid_announcement_data.msg_version(),
client_version=%client,
"received previously unseen and pending hashes in announcement from peer"
);
// only send request for hashes to idle peer, otherwise buffer hashes storing peer as
// fallback
if !self.transaction_fetcher.is_idle(&peer_id) {
// load message version before announcement data is destructed in packing
let msg_version = valid_announcement_data.msg_version();
let (hashes, _version) = valid_announcement_data.into_request_hashes();
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes,
%msg_version,
%client,
"buffering hashes announced by busy peer"
);
self.transaction_fetcher.buffer_hashes(hashes, Some(peer_id));
return
}
// load message version before announcement data type is destructed in packing
let msg_version = valid_announcement_data.msg_version();
//
// demand the recommended soft limit on the response; however, the peer may enforce an
// arbitrary limit on the response (2MB)
//
// request buffer is shrunk via call to pack request!
let init_capacity_req =
self.transaction_fetcher.approx_capacity_get_pooled_transactions_req(msg_version);
let mut hashes_to_request = RequestTxHashes::with_capacity(init_capacity_req);
let surplus_hashes =
self.transaction_fetcher.pack_request(&mut hashes_to_request, valid_announcement_data);
if !surplus_hashes.is_empty() {
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
surplus_hashes=?*surplus_hashes,
%msg_version,
%client,
"some hashes in announcement from peer didn't fit in `GetPooledTransactions` request, buffering surplus hashes"
);
self.transaction_fetcher.buffer_hashes(surplus_hashes, Some(peer_id));
}
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
hashes=?*hashes_to_request,
%msg_version,
%client,
"sending hashes in `GetPooledTransactions` request to peer's session"
);
// request the missing transactions
//
// get handle to peer's session again, at this point we know it exists
let Some(peer) = self.peers.get_mut(&peer_id) else { return };
if let Some(failed_to_request_hashes) =
self.transaction_fetcher.request_transactions_from_peer(hashes_to_request, peer)
{
let conn_eth_version = peer.version;
trace!(target: "net::tx",
peer_id=format!("{peer_id:#}"),
failed_to_request_hashes=?*failed_to_request_hashes,
%conn_eth_version,
%client,
"sending `GetPooledTransactions` request to peer's session failed, buffering hashes"
);
self.transaction_fetcher.buffer_hashes(failed_to_request_hashes, Some(peer_id));
}
}
/// Handles dedicated transaction events related to the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
@@ -1136,20 +1245,6 @@ where
}
}
/// Processes a batch of import results.
fn on_batch_import_result(&mut self, batch_results: Vec<PoolResult<TxHash>>) {
for res in batch_results {
match res {
Ok(hash) => {
self.on_good_import(hash);
}
Err(err) => {
self.on_bad_import(err);
}
}
}
}
/// Processes a [`FetchEvent`].
fn on_fetch_event(&mut self, fetch_event: FetchEvent) {
match fetch_event {
@@ -1165,100 +1260,6 @@ where
}
}
}
/// Runs an operation to fetch hashes that are cached in [`TransactionFetcher`].
fn on_fetch_hashes_pending_fetch(&mut self) {
// try drain transaction hashes pending fetch
let info = &self.pending_pool_imports_info;
let max_pending_pool_imports = info.max_pending_pool_imports;
let has_capacity_wrt_pending_pool_imports =
|divisor| info.has_capacity(max_pending_pool_imports / divisor);
self.transaction_fetcher
.on_fetch_pending_hashes(&self.peers, has_capacity_wrt_pending_pool_imports);
}
fn report_peer_bad_transactions(&self, peer_id: PeerId) {
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
self.metrics.reported_bad_transactions.increment(1);
}
fn report_peer(&self, peer_id: PeerId, kind: ReputationChangeKind) {
trace!(target: "net::tx", ?peer_id, ?kind, "reporting reputation change");
self.network.reputation_change(peer_id, kind);
}
fn on_request_error(&self, peer_id: PeerId, req_err: RequestError) {
let kind = match req_err {
RequestError::UnsupportedCapability => ReputationChangeKind::BadProtocol,
RequestError::Timeout => ReputationChangeKind::Timeout,
RequestError::ChannelClosed | RequestError::ConnectionDropped => {
// peer is already disconnected
return
}
RequestError::BadResponse => return self.report_peer_bad_transactions(peer_id),
};
self.report_peer(peer_id, kind);
}
fn report_already_seen(&self, peer_id: PeerId) {
trace!(target: "net::tx", ?peer_id, "Penalizing peer for already seen transaction");
self.network.reputation_change(peer_id, ReputationChangeKind::AlreadySeenTransaction);
}
/// Clears the successfully imported transaction hash from the tracked `transactions_by_peers` map.
fn on_good_import(&mut self, hash: TxHash) {
self.transactions_by_peers.remove(&hash);
}
/// Penalize the peers that intentionally sent the bad transaction, and cache it to avoid
/// fetching or importing it again.
///
/// Errors that count as bad transactions are:
///
/// - intrinsic gas too low
/// - exceeds gas limit
/// - gas uint overflow
/// - exceeds max init code size
/// - oversized data
/// - signer account has bytecode
/// - chain id mismatch
/// - old legacy chain id
/// - tx type not supported
///
/// (and additionally for blob txns...)
///
/// - no blobs
/// - too many blobs
/// - invalid kzg proof
/// - kzg error
/// - not blob transaction (tx type mismatch)
/// - wrong versioned kzg commitment hash
fn on_bad_import(&mut self, err: PoolError) {
let peers = self.transactions_by_peers.remove(&err.hash);
// if we're _currently_ syncing, we ignore a bad transaction
if !err.is_bad_transaction() || self.network.is_syncing() {
return
}
// otherwise we penalize the peer that sent the bad transaction, with the assumption that
// the peer should have known that this transaction is bad (e.g. violating consensus rules)
if let Some(peers) = peers {
for peer_id in peers {
self.report_peer_bad_transactions(peer_id);
}
}
self.metrics.bad_imports.increment(1);
self.bad_imports.insert(err.hash);
}
/// Returns `true` if [`TransactionsManager`] has capacity to request pending hashes. Returns
/// `false` if [`TransactionsManager`] is operating close to full capacity.
fn has_capacity_for_fetching_pending_hashes(&self) -> bool {
self.pending_pool_imports_info
.has_capacity(self.pending_pool_imports_info.max_pending_pool_imports) &&
self.transaction_fetcher.has_capacity_for_fetching_pending_hashes()
}
}
/// An endless future. Preemption ensures that the future is non-blocking, nonetheless. See