feat(net): propagate new transactions (#256)

This commit is contained in:
Matthias Seitz
2022-11-25 22:13:20 +01:00
committed by GitHub
parent 37809ce774
commit b6d9fe87b9
7 changed files with 140 additions and 8 deletions

View File

@ -105,6 +105,11 @@ impl NetworkHandle {
pub fn send_request(&self, peer_id: PeerId, request: PeerRequest) {
self.send_message(NetworkHandleMessage::EthRequest { peer_id, request })
}
/// Send full transactions to the peer
pub fn send_transactions(&self, peer_id: PeerId, msg: Arc<Transactions>) {
self.send_message(NetworkHandleMessage::SendTransaction { peer_id, msg })
}
}
struct NetworkInner {

View File

@ -16,7 +16,9 @@ use reth_interfaces::p2p::error::RequestResult;
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
};
use reth_transaction_pool::{error::PoolResult, TransactionPool};
use reth_transaction_pool::{
error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool,
};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
@ -27,6 +29,7 @@ use std::{
};
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use tracing::trace;
/// Cache limit of transactions to keep track of for a single peer.
const PEER_TRANSACTION_CACHE_LIMIT: usize = 1024 * 10;
@ -151,6 +154,55 @@ where
}
}
/// Invoked when a new transaction is pending.
///
/// When new transactions appear in the pool, we propagate them to the network using the
/// `Transactions` and `NewPooledTransactionHashes` messages. The Transactions message relays
/// complete transaction objects and is typically sent to a small, random fraction of connected
/// peers.
///
/// All other peers receive a notification of the transaction hash and can request the
/// complete transaction object if it is unknown to them. The dissemination of complete
/// transactions to a fraction of peers usually ensures that all nodes receive the transaction
/// and won't need to request it.
fn on_new_transactions(&mut self, hashes: impl IntoIterator<Item = TxHash>) {
trace!(target: "net::tx", "Start propagating transactions");
let propagated = self.propagate_transactions(
self.pool
.get_all(hashes)
.into_iter()
.map(|tx| (*tx.hash(), tx.transaction.to_recovered_transaction().into_signed()))
.collect(),
);
// notify pool so events get fired
self.pool.on_propagated(propagated);
}
fn propagate_transactions(
&mut self,
txs: Vec<(TxHash, TransactionSigned)>,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
for (peer_id, peer) in self.peers.iter_mut() {
let (hashes, full): (Vec<_>, Vec<_>) =
txs.iter().filter(|(hash, _)| peer.transactions.insert(*hash)).cloned().unzip();
if !full.is_empty() {
// TODO select peer for full or hash
self.network.send_transactions(*peer_id, Arc::new(Transactions(full)));
for hash in hashes {
propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id));
}
}
}
propagated
}
/// Request handler for an incoming `NewPooledTransactionHashes`
fn on_new_pooled_transactions(
&mut self,
@ -323,8 +375,12 @@ where
}
// handle new transactions
while let Poll::Ready(Some(_hash)) = this.pending_transactions.poll_next_unpin(cx) {
// TODO(mattsse): propagate new transactions
let mut new_txs = Vec::new();
while let Poll::Ready(Some(hash)) = this.pending_transactions.poll_next_unpin(cx) {
new_txs.push(hash);
}
if !new_txs.is_empty() {
this.on_new_transactions(new_txs);
}
// Advance all requests.