diff --git a/crates/net/network/src/transactions/mod.rs b/crates/net/network/src/transactions/mod.rs index 2a5496dee..a4eef2fa9 100644 --- a/crates/net/network/src/transactions/mod.rs +++ b/crates/net/network/src/transactions/mod.rs @@ -48,7 +48,7 @@ use reth_network_p2p::{ }; use reth_network_peers::PeerId; use reth_network_types::ReputationChangeKind; -use reth_primitives::{PooledTransactionsElement, TransactionSigned, TransactionSignedEcRecovered}; +use reth_primitives::{PooledTransactionsElement, TransactionSigned}; use reth_primitives_traits::{SignedTransaction, TransactionExt, TxType}; use reth_tokio_util::EventStream; use reth_transaction_pool::{ @@ -678,40 +678,13 @@ where } } -impl TransactionsManager +impl TransactionsManager where - Pool: TransactionPool + 'static, + Pool: TransactionPool, + N: NetworkPrimitives, + <::Transaction as PoolTransaction>::Consensus: + Into, { - /// Request handler for an incoming request for transactions - fn on_get_pooled_transactions( - &mut self, - peer_id: PeerId, - request: GetPooledTransactions, - response: oneshot::Sender>, - ) { - if let Some(peer) = self.peers.get_mut(&peer_id) { - if self.network.tx_gossip_disabled() { - let _ = response.send(Ok(PooledTransactions::default())); - return - } - let transactions = self.pool.get_pooled_transaction_elements( - request.0, - GetPooledTransactionLimit::ResponseSizeSoftLimit( - self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response, - ), - ); - - trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| *tx.hash()), "Sending requested transactions to peer"); - - // we sent a response at which point we assume that the peer is aware of the - // transactions - peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash())); - - let resp = PooledTransactions(transactions); - let _ = response.send(Ok(resp)); - } - } - /// Invoked when transactions in the local mempool are considered __pending__. /// /// When a transaction in the local mempool is moved to the pending pool, we propagate them to @@ -737,15 +710,139 @@ where self.propagate_all(hashes); } - /// Propagates the given transactions to the peers + /// Propagate the full transactions to a specific peer. /// - /// This fetches all transaction from the pool, including the 4844 blob transactions but - /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes. - fn propagate_all(&mut self, hashes: Vec) { - let propagated = self.propagate_transactions( - self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(), - PropagationMode::Basic, - ); + /// Returns the propagated transactions. + fn propagate_full_transactions_to_peer( + &mut self, + txs: Vec, + peer_id: PeerId, + propagation_mode: PropagationMode, + ) -> Option { + trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer"); + + let peer = self.peers.get_mut(&peer_id)?; + let mut propagated = PropagatedTransactions::default(); + + // filter all transactions unknown to the peer + let mut full_transactions = FullTransactionsBuilder::new(peer.version); + + let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new); + + if propagation_mode.is_forced() { + // skip cache check if forced + full_transactions.extend(to_propagate); + } else { + // Iterate through the transactions to propagate and fill the hashes and full + // transaction + for tx in to_propagate { + if !peer.seen_transactions.contains(&tx.hash()) { + // Only include if the peer hasn't seen the transaction + full_transactions.push(&tx); + } + } + } + + if full_transactions.is_empty() { + // nothing to propagate + return None + } + + let PropagateTransactions { pooled, full } = full_transactions.build(); + + // send hashes if any + if let Some(new_pooled_hashes) = pooled { + for hash in new_pooled_hashes.iter_hashes().copied() { + propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); + // mark transaction as seen by peer + peer.seen_transactions.insert(hash); + } + + // send hashes of transactions + self.network.send_transactions_hashes(peer_id, new_pooled_hashes); + } + + // send full transactions, if any + if let Some(new_full_transactions) = full { + for tx in &new_full_transactions { + propagated.0.entry(*tx.tx_hash()).or_default().push(PropagateKind::Full(peer_id)); + // mark transaction as seen by peer + peer.seen_transactions.insert(*tx.tx_hash()); + } + + // send full transactions + self.network.send_transactions(peer_id, new_full_transactions); + } + + // Update propagated transactions metrics + self.metrics.propagated_transactions.increment(propagated.0.len() as u64); + + Some(propagated) + } + + /// Propagate the transaction hashes to the given peer + /// + /// Note: This will only send the hashes for transactions that exist in the pool. + fn propagate_hashes_to( + &mut self, + hashes: Vec, + peer_id: PeerId, + propagation_mode: PropagationMode, + ) { + trace!(target: "net::tx", "Start propagating transactions as hashes"); + + // This fetches a transactions from the pool, including the blob transactions, which are + // only ever sent as hashes. + let propagated = { + let Some(peer) = self.peers.get_mut(&peer_id) else { + // no such peer + return + }; + + let to_propagate = self + .pool + .get_all(hashes) + .into_iter() + .map(PropagateTransaction::new) + .collect::>(); + + let mut propagated = PropagatedTransactions::default(); + + // check if transaction is known to peer + let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); + + if propagation_mode.is_forced() { + hashes.extend(to_propagate) + } else { + for tx in to_propagate { + if !peer.seen_transactions.contains(&tx.hash()) { + // Include if the peer hasn't seen it + hashes.push(&tx); + } + } + } + + let new_pooled_hashes = hashes.build(); + + if new_pooled_hashes.is_empty() { + // nothing to propagate + return + } + + for hash in new_pooled_hashes.iter_hashes().copied() { + propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); + } + + trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer"); + + // send hashes of transactions + self.network.send_transactions_hashes(peer_id, new_pooled_hashes); + + // Update propagated transactions metrics + self.metrics.propagated_transactions.increment(propagated.0.len() as u64); + + propagated + }; // notify pool so events get fired self.pool.on_propagated(propagated); @@ -759,7 +856,7 @@ where /// Note: EIP-4844 are disallowed from being broadcast in full and are only ever sent as hashes, see also . fn propagate_transactions( &mut self, - to_propagate: Vec, + to_propagate: Vec>, propagation_mode: PropagationMode, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); @@ -823,9 +920,13 @@ where // send full transactions, if any if let Some(new_full_transactions) = full { for tx in &new_full_transactions { - propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id)); + propagated + .0 + .entry(*tx.tx_hash()) + .or_default() + .push(PropagateKind::Full(*peer_id)); // mark transaction as seen by peer - peer.seen_transactions.insert(tx.hash()); + peer.seen_transactions.insert(*tx.tx_hash()); } trace!(target: "net::tx", ?peer_id, num_txs=?new_full_transactions.len(), "Propagating full transactions to peer"); @@ -841,139 +942,55 @@ where propagated } - /// Propagate the full transactions to a specific peer. + /// Propagates the given transactions to the peers /// - /// Returns the propagated transactions. - fn propagate_full_transactions_to_peer( - &mut self, - txs: Vec, - peer_id: PeerId, - propagation_mode: PropagationMode, - ) -> Option { - trace!(target: "net::tx", ?peer_id, "Propagating transactions to peer"); - - let peer = self.peers.get_mut(&peer_id)?; - let mut propagated = PropagatedTransactions::default(); - - // filter all transactions unknown to the peer - let mut full_transactions = FullTransactionsBuilder::new(peer.version); - - let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new); - - if propagation_mode.is_forced() { - // skip cache check if forced - full_transactions.extend(to_propagate); - } else { - // Iterate through the transactions to propagate and fill the hashes and full - // transaction - for tx in to_propagate { - if !peer.seen_transactions.contains(&tx.hash()) { - // Only include if the peer hasn't seen the transaction - full_transactions.push(&tx); - } - } - } - - if full_transactions.is_empty() { - // nothing to propagate - return None - } - - let PropagateTransactions { pooled, full } = full_transactions.build(); - - // send hashes if any - if let Some(new_pooled_hashes) = pooled { - for hash in new_pooled_hashes.iter_hashes().copied() { - propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(hash); - } - - // send hashes of transactions - self.network.send_transactions_hashes(peer_id, new_pooled_hashes); - } - - // send full transactions, if any - if let Some(new_full_transactions) = full { - for tx in &new_full_transactions { - propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id)); - // mark transaction as seen by peer - peer.seen_transactions.insert(tx.hash()); - } - - // send full transactions - self.network.send_transactions(peer_id, new_full_transactions); - } - - // Update propagated transactions metrics - self.metrics.propagated_transactions.increment(propagated.0.len() as u64); - - Some(propagated) - } - - /// Propagate the transaction hashes to the given peer - /// - /// Note: This will only send the hashes for transactions that exist in the pool. - fn propagate_hashes_to( - &mut self, - hashes: Vec, - peer_id: PeerId, - propagation_mode: PropagationMode, - ) { - trace!(target: "net::tx", "Start propagating transactions as hashes"); - - // This fetches a transactions from the pool, including the blob transactions, which are - // only ever sent as hashes. - let propagated = { - let Some(peer) = self.peers.get_mut(&peer_id) else { - // no such peer - return - }; - - let to_propagate: Vec = - self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(); - - let mut propagated = PropagatedTransactions::default(); - - // check if transaction is known to peer - let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); - - if propagation_mode.is_forced() { - hashes.extend(to_propagate) - } else { - for tx in to_propagate { - if !peer.seen_transactions.contains(&tx.hash()) { - // Include if the peer hasn't seen it - hashes.push(&tx); - } - } - } - - let new_pooled_hashes = hashes.build(); - - if new_pooled_hashes.is_empty() { - // nothing to propagate - return - } - - for hash in new_pooled_hashes.iter_hashes().copied() { - propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id)); - } - - trace!(target: "net::tx::propagation", ?peer_id, ?new_pooled_hashes, "Propagating transactions to peer"); - - // send hashes of transactions - self.network.send_transactions_hashes(peer_id, new_pooled_hashes); - - // Update propagated transactions metrics - self.metrics.propagated_transactions.increment(propagated.0.len() as u64); - - propagated - }; + /// This fetches all transaction from the pool, including the 4844 blob transactions but + /// __without__ their sidecar, because 4844 transactions are only ever announced as hashes. + fn propagate_all(&mut self, hashes: Vec) { + let propagated = self.propagate_transactions( + self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(), + PropagationMode::Basic, + ); // notify pool so events get fired self.pool.on_propagated(propagated); } +} + +impl TransactionsManager +where + Pool: TransactionPool + 'static, + <::Transaction as PoolTransaction>::Consensus: Into, +{ + /// Request handler for an incoming request for transactions + fn on_get_pooled_transactions( + &mut self, + peer_id: PeerId, + request: GetPooledTransactions, + response: oneshot::Sender>, + ) { + if let Some(peer) = self.peers.get_mut(&peer_id) { + if self.network.tx_gossip_disabled() { + let _ = response.send(Ok(PooledTransactions::default())); + return + } + let transactions = self.pool.get_pooled_transaction_elements( + request.0, + GetPooledTransactionLimit::ResponseSizeSoftLimit( + self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response, + ), + ); + + trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| *tx.hash()), "Sending requested transactions to peer"); + + // we sent a response at which point we assume that the peer is aware of the + // transactions + peer.seen_transactions.extend(transactions.iter().map(|tx| *tx.hash())); + + let resp = PooledTransactions(transactions); + let _ = response.send(Ok(resp)); + } + } /// Handles dedicated transaction events related to the `eth` protocol. fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) { @@ -1273,6 +1290,7 @@ where impl Future for TransactionsManager where Pool: TransactionPool + Unpin + 'static, + <::Transaction as PoolTransaction>::Consensus: Into, { type Output = (); @@ -1456,21 +1474,18 @@ struct PropagateTransaction { transaction: Arc, } -impl PropagateTransaction { +impl PropagateTransaction { /// Create a new instance from a pooled transaction - fn new(tx: Arc>) -> Self + fn new

(tx: Arc>) -> Self where - T: PoolTransaction>, + P: PoolTransaction>, { let size = tx.encoded_length(); - let recovered: TransactionSignedEcRecovered = - tx.transaction.clone().into_consensus().into(); - let transaction = Arc::new(recovered.into_signed()); + let transaction = tx.transaction.clone().into_consensus().into(); + let transaction = Arc::new(transaction); Self { size, transaction } } -} -impl PropagateTransaction { fn hash(&self) -> TxHash { *self.transaction.tx_hash() } @@ -2372,7 +2387,8 @@ mod tests { #[test] fn test_transaction_builder_empty() { - let mut builder = PropagateTransactionsBuilder::pooled(EthVersion::Eth68); + let mut builder = + PropagateTransactionsBuilder::::pooled(EthVersion::Eth68); assert!(builder.is_empty()); let mut factory = MockTransactionFactory::default(); @@ -2388,7 +2404,8 @@ mod tests { #[test] fn test_transaction_builder_large() { - let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68); + let mut builder = + PropagateTransactionsBuilder::::full(EthVersion::Eth68); assert!(builder.is_empty()); let mut factory = MockTransactionFactory::default(); @@ -2416,7 +2433,8 @@ mod tests { #[test] fn test_transaction_builder_eip4844() { - let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68); + let mut builder = + PropagateTransactionsBuilder::::full(EthVersion::Eth68); assert!(builder.is_empty()); let mut factory = MockTransactionFactory::default(); diff --git a/crates/optimism/node/src/txpool.rs b/crates/optimism/node/src/txpool.rs index 0edfeec73..7df5888fb 100644 --- a/crates/optimism/node/src/txpool.rs +++ b/crates/optimism/node/src/txpool.rs @@ -3,7 +3,9 @@ use alloy_eips::eip2718::Encodable2718; use parking_lot::RwLock; use reth_chainspec::ChainSpec; use reth_optimism_evm::RethL1BlockInfo; -use reth_primitives::{Block, GotExpected, InvalidTransactionError, SealedBlock}; +use reth_primitives::{ + Block, GotExpected, InvalidTransactionError, SealedBlock, TransactionSigned, +}; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; use reth_revm::L1BlockInfo; use reth_transaction_pool::{ @@ -140,7 +142,8 @@ where let l1_block_info = self.block_info.l1_block_info.read().clone(); let mut encoded = Vec::with_capacity(valid_tx.transaction().encoded_length()); - valid_tx.transaction().clone().into_consensus().into().encode_2718(&mut encoded); + let tx: TransactionSigned = valid_tx.transaction().clone().into_consensus().into(); + tx.encode_2718(&mut encoded); let cost_addition = match l1_block_info.l1_tx_data_fee( &self.chain_spec(), diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index bcde571b0..68a911f2e 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -20,7 +20,8 @@ use reth_eth_wire_types::HandleMempoolData; use reth_execution_types::ChangedAccount; use reth_primitives::{ kzg::KzgSettings, transaction::TryFromRecoveredTransactionError, PooledTransactionsElement, - PooledTransactionsElementEcRecovered, SealedBlock, Transaction, TransactionSignedEcRecovered, + PooledTransactionsElementEcRecovered, SealedBlock, Transaction, TransactionSigned, + TransactionSignedEcRecovered, }; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; @@ -1068,7 +1069,9 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + Clone { /// Super trait for transactions that can be converted to and from Eth transactions pub trait EthPoolTransaction: PoolTransaction< - Consensus: From + Into, + Consensus: From + + Into + + Into, Pooled: From + Into, > {