feat(network): implement PropagateHashTo helper in TransactionsCommand (#4713)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
duguorong009
2023-09-26 20:33:56 +08:00
committed by GitHub
parent e255dd8d49
commit 321ded5b6f

View File

@ -76,9 +76,18 @@ impl TransactionsHandle {
self.send(TransactionsCommand::PropagateHash(hash))
}
/// Manually propagate the transaction that belongs to the hash to a specific peer.
/// Manually propagate the transaction hash to a specific peer.
///
/// Note: this only propagates if the pool contains the transaction.
pub fn propagate_hash_to(&self, hash: TxHash, peer: PeerId) {
self.send(TransactionsCommand::PropagateHashTo(hash, peer))
self.propagate_hashes_to(Some(hash), peer)
}
/// Manually propagate the transaction hashes to a specific peer.
///
/// Note: this only propagates the transactions that are known to the pool.
pub fn propagate_hashes_to(&self, hash: impl IntoIterator<Item = TxHash>, peer: PeerId) {
self.send(TransactionsCommand::PropagateHashesTo(hash.into_iter().collect(), peer))
}
/// Request the active peer IDs from the [`TransactionsManager`].
@ -337,6 +346,57 @@ where
propagated
}
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(&mut self, hashes: Vec<TxHash>, peer_id: PeerId) {
trace!(target: "net::tx", "Start propagating transactions as hashes");
// This fetches a transactions from the pool, including the blob transactions, which are
// only ever sent as hashes.
let propagated = {
let Some(peer) = self.peers.get_mut(&peer_id) else {
// no such peer
return
};
let to_propagate: Vec<PropagateTransaction> =
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect();
let mut propagated = PropagatedTransactions::default();
// check if transaction is known to peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
for tx in to_propagate {
if peer.transactions.insert(tx.hash()) {
hashes.push(&tx);
}
}
let new_pooled_hashes = hashes.build();
if new_pooled_hashes.is_empty() {
// nothing to propagate
return
}
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
propagated
};
// notify pool so events get fired
self.pool.on_propagated(propagated);
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transaction_hashes(
&mut self,
@ -430,7 +490,9 @@ where
fn on_command(&mut self, cmd: TransactionsCommand) {
match cmd {
TransactionsCommand::PropagateHash(hash) => self.on_new_transactions(vec![hash]),
TransactionsCommand::PropagateHashTo(_hash, _peer) => todo!(),
TransactionsCommand::PropagateHashesTo(hashes, peer) => {
self.propagate_hashes_to(hashes, peer)
}
TransactionsCommand::GetActivePeers => todo!(),
TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => todo!(),
TransactionsCommand::GetTransactionHashes(_peers) => todo!(),
@ -864,8 +926,8 @@ struct Peer {
enum TransactionsCommand {
/// Propagate a transaction hash to the network.
PropagateHash(H256),
/// Propagate a transaction hash to a specific peer.
PropagateHashTo(H256, PeerId),
/// Propagate transaction hashes to a specific peer.
PropagateHashesTo(Vec<H256>, PeerId),
/// Request the list of active peer IDs from the [`TransactionsManager`].
GetActivePeers,
/// Propagate a collection of full transactions to a specific peer.