feat: relax more tx manager bounds (#12744)

This commit is contained in:
Matthias Seitz
2024-11-21 16:35:43 +01:00
committed by GitHub
parent c73dadacb2
commit 54ff4c7349
3 changed files with 211 additions and 187 deletions

View File

@ -48,7 +48,7 @@ use reth_network_p2p::{
};
use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind;
use reth_primitives::{PooledTransactionsElement, TransactionSigned, TransactionSignedEcRecovered};
use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_primitives_traits::{SignedTransaction, TransactionExt, TxType};
use reth_tokio_util::EventStream;
use reth_transaction_pool::{
@ -678,40 +678,13 @@ where
}
}
impl<Pool> TransactionsManager<Pool>
impl<Pool, N> TransactionsManager<Pool, N>
where
Pool: TransactionPool + 'static,
Pool: TransactionPool,
N: NetworkPrimitives<BroadcastedTransaction: SignedTransaction>,
<<Pool as TransactionPool>::Transaction as PoolTransaction>::Consensus:
Into<N::BroadcastedTransaction>,
{
/// Request handler for an incoming request for transactions
fn on_get_pooled_transactions(
&mut self,
peer_id: PeerId,
request: GetPooledTransactions,
response: oneshot::Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
if self.network.tx_gossip_disabled() {
let _ = response.send(Ok(PooledTransactions::default()));
return
}
let transactions = self.pool.get_pooled_transaction_elements(
request.0,
GetPooledTransactionLimit::ResponseSizeSoftLimit(
self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
),
);
trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| *tx.hash()), "Sending requested transactions to peer");
// we sent a response at which point we assume that the peer is aware of the
// transactions
peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash()));
let resp = PooledTransactions(transactions);
let _ = response.send(Ok(resp));
}
}
/// Invoked when transactions in the local mempool are considered __pending__.
///
/// When a transaction in the local mempool is moved to the pending pool, we propagate them to
@ -737,15 +710,139 @@ where
self.propagate_all(hashes);
}
/// Propagates the given transactions to the peers
/// Propagate the full transactions to a specific peer.
///
/// This fetches all transaction from the pool, including the 4844 blob transactions but
/// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropagationMode::Basic,
);
/// Returns the propagated transactions.
fn propagate_full_transactions_to_peer(
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
let peer = self.peers.get_mut(&peer_id)?;
let mut propagated = PropagatedTransactions::default();
// filter all transactions unknown to the peer
let mut full_transactions = FullTransactionsBuilder::new(peer.version);
let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
if propagation_mode.is_forced() {
// skip cache check if forced
full_transactions.extend(to_propagate);
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
}
if full_transactions.is_empty() {
// nothing to propagate
return None
}
let PropagateTransactions { pooled, full } = full_transactions.build();
// send hashes if any
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(*tx.tx_hash());
}
// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
Some(propagated)
}
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");
// This fetches a transactions from the pool, including the blob transactions, which are
// only ever sent as hashes.
let propagated = {
let Some(peer) = self.peers.get_mut(&peer_id) else {
// no such peer
return
};
let to_propagate = self
.pool
.get_all(hashes)
.into_iter()
.map(PropagateTransaction::new)
.collect::<Vec<_>>();
let mut propagated = PropagatedTransactions::default();
// check if transaction is known to peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
if propagation_mode.is_forced() {
hashes.extend(to_propagate)
} else {
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Include if the peer hasn't seen it
hashes.push(&tx);
}
}
}
let new_pooled_hashes = hashes.build();
if new_pooled_hashes.is_empty() {
// nothing to propagate
return
}
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}
trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
propagated
};
// notify pool so events get fired
self.pool.on_propagated(propagated);
@ -759,7 +856,7 @@ where
/// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also <https://eips.ethereum.org/EIPS/eip-4844#networking>.
fn propagate_transactions(
&mut self,
to_propagate: Vec<PropagateTransaction>,
to_propagate: Vec<PropagateTransaction<N::BroadcastedTransaction>>,
propagation_mode: PropagationMode,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
@ -823,9 +920,13 @@ where
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id));
propagated
.0
.entry(*tx.tx_hash())
.or_default()
.push(PropagateKind::Full(*peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
peer.seen_transactions.insert(*tx.tx_hash());
}
trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer");
@ -841,139 +942,55 @@ where
propagated
}
/// Propagate the full transactions to a specific peer.
/// Propagates the given transactions to the peers
///
/// Returns the propagated transactions.
fn propagate_full_transactions_to_peer(
&mut self,
txs: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) -> Option<PropagatedTransactions> {
trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer");
let peer = self.peers.get_mut(&peer_id)?;
let mut propagated = PropagatedTransactions::default();
// filter all transactions unknown to the peer
let mut full_transactions = FullTransactionsBuilder::new(peer.version);
let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
if propagation_mode.is_forced() {
// skip cache check if forced
full_transactions.extend(to_propagate);
} else {
// Iterate through the transactions to propagate and fill the hashes and full
// transaction
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Only include if the peer hasn't seen the transaction
full_transactions.push(&tx);
}
}
}
if full_transactions.is_empty() {
// nothing to propagate
return None
}
let PropagateTransactions { pooled, full } = full_transactions.build();
// send hashes if any
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}
// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
Some(propagated)
}
/// Propagate the transaction hashes to the given peer
///
/// Note: This will only send the hashes for transactions that exist in the pool.
fn propagate_hashes_to(
&mut self,
hashes: Vec<TxHash>,
peer_id: PeerId,
propagation_mode: PropagationMode,
) {
trace!(target: "net::tx", "Start propagating transactions as hashes");
// This fetches a transactions from the pool, including the blob transactions, which are
// only ever sent as hashes.
let propagated = {
let Some(peer) = self.peers.get_mut(&peer_id) else {
// no such peer
return
};
let to_propagate: Vec<PropagateTransaction> =
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect();
let mut propagated = PropagatedTransactions::default();
// check if transaction is known to peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
if propagation_mode.is_forced() {
hashes.extend(to_propagate)
} else {
for tx in to_propagate {
if !peer.seen_transactions.contains(&tx.hash()) {
// Include if the peer hasn't seen it
hashes.push(&tx);
}
}
}
let new_pooled_hashes = hashes.build();
if new_pooled_hashes.is_empty() {
// nothing to propagate
return
}
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
}
trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer");
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
propagated
};
/// This fetches all transaction from the pool, including the 4844 blob transactions but
/// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
PropagationMode::Basic,
);
// notify pool so events get fired
self.pool.on_propagated(propagated);
}
}
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
<<Pool as TransactionPool>::Transaction as PoolTransaction>::Consensus: Into<TransactionSigned>,
{
/// Request handler for an incoming request for transactions
fn on_get_pooled_transactions(
&mut self,
peer_id: PeerId,
request: GetPooledTransactions,
response: oneshot::Sender<RequestResult<PooledTransactions>>,
) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
if self.network.tx_gossip_disabled() {
let _ = response.send(Ok(PooledTransactions::default()));
return
}
let transactions = self.pool.get_pooled_transaction_elements(
request.0,
GetPooledTransactionLimit::ResponseSizeSoftLimit(
self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
),
);
trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| *tx.hash()), "Sending requested transactions to peer");
// we sent a response at which point we assume that the peer is aware of the
// transactions
peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash()));
let resp = PooledTransactions(transactions);
let _ = response.send(Ok(resp));
}
}
/// Handles dedicated transaction events related to the `eth` protocol.
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
@ -1273,6 +1290,7 @@ where
impl<Pool> Future for TransactionsManager<Pool>
where
Pool: TransactionPool + Unpin + 'static,
<<Pool as TransactionPool>::Transaction as PoolTransaction>::Consensus: Into<TransactionSigned>,
{
type Output = ();
@ -1456,21 +1474,18 @@ struct PropagateTransaction<T = TransactionSigned> {
transaction: Arc<T>,
}
impl PropagateTransaction {
impl<T: SignedTransaction> PropagateTransaction<T> {
/// Create a new instance from a pooled transaction
fn new<T>(tx: Arc<ValidPoolTransaction<T>>) -> Self
fn new<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
where
T: PoolTransaction<Consensus: Into<TransactionSignedEcRecovered>>,
P: PoolTransaction<Consensus: Into<T>>,
{
let size = tx.encoded_length();
let recovered: TransactionSignedEcRecovered =
tx.transaction.clone().into_consensus().into();
let transaction = Arc::new(recovered.into_signed());
let transaction = tx.transaction.clone().into_consensus().into();
let transaction = Arc::new(transaction);
Self { size, transaction }
}
}
impl<T: SignedTransaction> PropagateTransaction<T> {
fn hash(&self) -> TxHash {
*self.transaction.tx_hash()
}
@ -2372,7 +2387,8 @@ mod tests {
#[test]
fn test_transaction_builder_empty() {
let mut builder = PropagateTransactionsBuilder::pooled(EthVersion::Eth68);
let mut builder =
PropagateTransactionsBuilder::<TransactionSigned>::pooled(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
@ -2388,7 +2404,8 @@ mod tests {
#[test]
fn test_transaction_builder_large() {
let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68);
let mut builder =
PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
@ -2416,7 +2433,8 @@ mod tests {
#[test]
fn test_transaction_builder_eip4844() {
let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68);
let mut builder =
PropagateTransactionsBuilder::<TransactionSigned>::full(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();