diff --git a/crates/net/eth-wire/src/types/broadcast.rs b/crates/net/eth-wire/src/types/broadcast.rs index 581387389..c6fef5457 100644 --- a/crates/net/eth-wire/src/types/broadcast.rs +++ b/crates/net/eth-wire/src/types/broadcast.rs @@ -1,4 +1,5 @@ //! Types for broadcasting new data. +use crate::{EthMessage, EthVersion}; use reth_codecs::derive_arbitrary; use reth_primitives::{Block, TransactionSigned, H256, U128}; use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper}; @@ -99,6 +100,78 @@ pub struct SharedTransactions( pub Vec>, ); +/// A wrapper type for all different new pooled transaction types +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum NewPooledTransactionHashes { + /// A list of transaction hashes valid for [66-68) + Eth66(NewPooledTransactionHashes66), + /// A list of transaction hashes valid from [68..] + /// + /// Note: it is assumed that the payload is valid (all vectors have the same length) + Eth68(NewPooledTransactionHashes68), +} + +// === impl NewPooledTransactionHashes === + +impl NewPooledTransactionHashes { + /// Returns `true` if the payload is valid for the given version + pub fn is_valid_for_version(&self, version: EthVersion) -> bool { + match self { + NewPooledTransactionHashes::Eth66(_) => { + matches!(version, EthVersion::Eth67 | EthVersion::Eth66) + } + NewPooledTransactionHashes::Eth68(_) => { + matches!(version, EthVersion::Eth68) + } + } + } + + /// Returns an iterator over all transaction hashes. + pub fn iter_hashes(&self) -> impl Iterator + '_ { + match self { + NewPooledTransactionHashes::Eth66(msg) => msg.0.iter(), + NewPooledTransactionHashes::Eth68(msg) => msg.hashes.iter(), + } + } + + /// Consumes the type and returns all hashes + pub fn into_hashes(self) -> Vec { + match self { + NewPooledTransactionHashes::Eth66(msg) => msg.0, + NewPooledTransactionHashes::Eth68(msg) => msg.hashes, + } + } + + /// Returns an iterator over all transaction hashes. + pub fn into_iter_hashes(self) -> impl Iterator { + match self { + NewPooledTransactionHashes::Eth66(msg) => msg.0.into_iter(), + NewPooledTransactionHashes::Eth68(msg) => msg.hashes.into_iter(), + } + } +} + +impl From for EthMessage { + fn from(value: NewPooledTransactionHashes) -> Self { + match value { + NewPooledTransactionHashes::Eth66(msg) => EthMessage::NewPooledTransactionHashes66(msg), + NewPooledTransactionHashes::Eth68(msg) => EthMessage::NewPooledTransactionHashes68(msg), + } + } +} + +impl From for NewPooledTransactionHashes { + fn from(hashes: NewPooledTransactionHashes66) -> Self { + Self::Eth66(hashes) + } +} + +impl From for NewPooledTransactionHashes { + fn from(hashes: NewPooledTransactionHashes68) -> Self { + Self::Eth68(hashes) + } +} + /// This informs peers of transaction hashes for transactions that have appeared on the network, /// but have not been included in a block. #[derive_arbitrary(rlp)] diff --git a/crates/net/network/src/message.rs b/crates/net/network/src/message.rs index 66f02863f..f2027c074 100644 --- a/crates/net/network/src/message.rs +++ b/crates/net/network/src/message.rs @@ -7,7 +7,7 @@ use futures::FutureExt; use reth_eth_wire::{ capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders, EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts, - NewBlock, NewBlockHashes, NewPooledTransactionHashes66, NodeData, PooledTransactions, Receipts, + NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts, SharedTransactions, Transactions, }; use reth_interfaces::p2p::error::{RequestError, RequestResult}; @@ -50,7 +50,7 @@ pub enum PeerMessage { /// Broadcast transactions _from_ local _to_ a peer. SendTransactions(SharedTransactions), /// Send new pooled transactions - PooledTransactions(NewPooledTransactionHashes66), + PooledTransactions(NewPooledTransactionHashes), /// All `eth` request variants. EthRequest(PeerRequest), /// Other than eth namespace message diff --git a/crates/net/network/src/network.rs b/crates/net/network/src/network.rs index 02024d9ae..a9bd08da3 100644 --- a/crates/net/network/src/network.rs +++ b/crates/net/network/src/network.rs @@ -4,7 +4,7 @@ use crate::{ }; use async_trait::async_trait; use parking_lot::Mutex; -use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes66, SharedTransactions}; +use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions}; use reth_interfaces::{ p2p::headers::client::StatusUpdater, sync::{SyncState, SyncStateProvider, SyncStateUpdater}, @@ -13,7 +13,7 @@ use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::{ NetworkError, NetworkInfo, NetworkStatus, PeerKind, Peers, PeersInfo, ReputationChangeKind, }; -use reth_primitives::{Head, NodeRecord, PeerId, TransactionSigned, TxHash, H256}; +use reth_primitives::{Head, NodeRecord, PeerId, TransactionSigned, H256}; use std::{ net::SocketAddr, sync::{ @@ -141,11 +141,8 @@ impl NetworkHandle { } /// Send transactions hashes to the peer. - pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: Vec) { - self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - peer_id, - msg: NewPooledTransactionHashes66(msg), - }) + pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) { + self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg }) } /// Send full transactions to the peer @@ -292,7 +289,7 @@ pub(crate) enum NetworkHandleMessage { /// Sends the list of transactions to the given peer. SendTransaction { peer_id: PeerId, msg: SharedTransactions }, /// Sends the list of transactions hashes to the given peer. - SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 }, + SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, /// Send an `eth` protocol request to the peer. EthRequest { /// The peer to send the request to. diff --git a/crates/net/network/src/session/active.rs b/crates/net/network/src/session/active.rs index 5d267b3db..d1e99e6cd 100644 --- a/crates/net/network/src/session/active.rs +++ b/crates/net/network/src/session/active.rs @@ -16,7 +16,7 @@ use reth_eth_wire::{ capability::Capabilities, errors::{EthHandshakeError, EthStreamError, P2PStreamError}, message::{EthBroadcastMessage, RequestPair}, - DisconnectReason, EthMessage, EthStream, EthVersion, P2PStream, + DisconnectReason, EthMessage, EthStream, P2PStream, }; use reth_interfaces::p2p::error::RequestError; use reth_metrics_common::metered_sender::MeteredSender; @@ -179,7 +179,7 @@ impl ActiveSession { self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into() } EthMessage::NewPooledTransactionHashes66(msg) => { - self.try_emit_broadcast(PeerMessage::PooledTransactions(msg)).into() + self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into() } EthMessage::NewPooledTransactionHashes68(msg) => { if msg.hashes.len() != msg.types.len() || msg.hashes.len() != msg.sizes.len() { @@ -192,8 +192,7 @@ impl ActiveSession { message: EthMessage::NewPooledTransactionHashes68(msg), } } - // TODO revise `PeerMessage::PooledTransactions` to have `types` and `sizes` - self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.hashes.into())).into() + self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into() } EthMessage::GetBlockHeaders(req) => { on_request!(req, BlockHeaders, GetBlockHeaders) @@ -251,12 +250,8 @@ impl ActiveSession { self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into()); } PeerMessage::PooledTransactions(msg) => { - if self.conn.version() >= EthVersion::Eth68 { - // TODO - // we don't know types and sizes yet - } else { - self.queued_outgoing - .push_back(EthMessage::NewPooledTransactionHashes66(msg).into()); + if msg.is_valid_for_version(self.conn.version()) { + self.queued_outgoing.push_back(EthMessage::from(msg).into()); } } PeerMessage::EthRequest(req) => { diff --git a/crates/net/network/src/transactions.rs b/crates/net/network/src/transactions.rs index ef6ebadea..3b4707813 100644 --- a/crates/net/network/src/transactions.rs +++ b/crates/net/network/src/transactions.rs @@ -5,18 +5,19 @@ use crate::{ manager::NetworkEvent, message::{PeerRequest, PeerRequestSender}, metrics::TransactionsManagerMetrics, - network::NetworkHandleMessage, NetworkHandle, }; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use reth_eth_wire::{ - GetPooledTransactions, NewPooledTransactionHashes66, PooledTransactions, Transactions, + EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66, + NewPooledTransactionHashes68, PooledTransactions, Transactions, }; use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider}; use reth_network_api::{Peers, ReputationChangeKind}; use reth_primitives::{ FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256, }; +use reth_rlp::Encodable; use reth_transaction_pool::{ error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool, }; @@ -197,7 +198,8 @@ where .get_all(hashes) .into_iter() .map(|tx| { - (*tx.hash(), Arc::new(tx.transaction.to_recovered_transaction().into_signed())) + let tx = Arc::new(tx.transaction.to_recovered_transaction().into_signed()); + PropagateTransaction::new(tx) }) .collect(), ); @@ -206,9 +208,13 @@ where self.pool.on_propagated(propagated); } + /// Propagate the transactions to all connected peers either as full objects or hashes + /// + /// The message for new pooled hashes depends on the negotiated version of the stream. + /// See [NewPooledTransactionHashes](NewPooledTransactionHashes) fn propagate_transactions( &mut self, - txs: Vec<(TxHash, Arc)>, + to_propagate: Vec, ) -> PropagatedTransactions { let mut propagated = PropagatedTransactions::default(); @@ -218,21 +224,29 @@ where // Note: Assuming ~random~ order due to random state of the peers map hasher for (idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() { - let (hashes, full): (Vec<_>, Vec<_>) = - txs.iter().filter(|(hash, _)| peer.transactions.insert(*hash)).cloned().unzip(); + // filter all transactions unknown to the peer + let mut hashes = PooledTransactionsHashesBuilder::new(peer.version); + let mut full_transactions = Vec::new(); + for tx in to_propagate.iter() { + if peer.transactions.insert(tx.hash()) { + hashes.push(tx); + full_transactions.push(Arc::clone(&tx.transaction)); + } + } + let hashes = hashes.build(); - if !full.is_empty() { + if !full_transactions.is_empty() { if idx > max_num_full { - for hash in &hashes { - propagated.0.entry(*hash).or_default().push(PropagateKind::Hash(*peer_id)); + for hash in hashes.iter_hashes().copied() { + propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id)); } // send hashes of transactions self.network.send_transactions_hashes(*peer_id, hashes); } else { // send full transactions - self.network.send_transactions(*peer_id, full); + self.network.send_transactions(*peer_id, full_transactions); - for hash in hashes { + for hash in hashes.into_iter_hashes() { propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id)); } } @@ -249,7 +263,7 @@ where fn on_new_pooled_transaction_hashes( &mut self, peer_id: PeerId, - msg: NewPooledTransactionHashes66, + msg: NewPooledTransactionHashes, ) { // If the node is currently syncing, ignore transactions if self.network.is_syncing() { @@ -259,18 +273,17 @@ where let mut num_already_seen = 0; if let Some(peer) = self.peers.get_mut(&peer_id) { - let mut transactions = msg.0; - + let mut hashes = msg.into_hashes(); // keep track of the transactions the peer knows - for tx in transactions.iter().copied() { + for tx in hashes.iter().copied() { if !peer.transactions.insert(tx) { num_already_seen += 1; } } - self.pool.retain_unknown(&mut transactions); + self.pool.retain_unknown(&mut hashes); - if transactions.is_empty() { + if hashes.is_empty() { // nothing to request return } @@ -278,7 +291,7 @@ where // request the missing transactions let (response, rx) = oneshot::channel(); let req = PeerRequest::GetPooledTransactions { - request: GetPooledTransactions(transactions), + request: GetPooledTransactions(hashes), response, }; @@ -323,7 +336,7 @@ where // remove the peer self.peers.remove(&peer_id); } - NetworkEvent::SessionEstablished { peer_id, messages, .. } => { + NetworkEvent::SessionEstablished { peer_id, messages, version, .. } => { // insert a new peer self.peers.insert( peer_id, @@ -332,17 +345,19 @@ where NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(), ), request_tx: messages, + version, }, ); // Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the // pool if !self.network.is_syncing() { - let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions()); - self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { - peer_id, - msg, - }) + todo!("get access to full tx"); + // let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions()); + // self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes { + // peer_id, + // msg, + // }) } } // TODO Add remaining events @@ -502,6 +517,64 @@ where } } +/// A transaction that's about to be propagated +struct PropagateTransaction { + tx_type: u8, + length: usize, + transaction: Arc, +} + +// === impl PropagateTransaction === + +impl PropagateTransaction { + fn hash(&self) -> TxHash { + self.transaction.hash + } + + fn new(transaction: Arc) -> Self { + Self { tx_type: transaction.tx_type().into(), length: transaction.length(), transaction } + } +} + +/// A helper type to create the pooled transactions message based on the negotiated version of the +/// session with the peer +enum PooledTransactionsHashesBuilder { + Eth66(NewPooledTransactionHashes66), + Eth68(NewPooledTransactionHashes68), +} + +// === impl PooledTransactionsHashesBuilder === + +impl PooledTransactionsHashesBuilder { + fn push(&mut self, tx: &PropagateTransaction) { + match self { + PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(tx.hash()), + PooledTransactionsHashesBuilder::Eth68(msg) => { + msg.hashes.push(tx.hash()); + msg.sizes.push(tx.length); + msg.types.push(tx.tx_type); + } + } + } + + /// Create a builder for the negotiated version of the peer's session + fn new(version: EthVersion) -> Self { + match version { + EthVersion::Eth66 | EthVersion::Eth67 => { + PooledTransactionsHashesBuilder::Eth66(Default::default()) + } + EthVersion::Eth68 => PooledTransactionsHashesBuilder::Eth68(Default::default()), + } + } + + fn build(self) -> NewPooledTransactionHashes { + match self { + PooledTransactionsHashesBuilder::Eth66(msg) => msg.into(), + PooledTransactionsHashesBuilder::Eth68(msg) => msg.into(), + } + } +} + /// How we received the transactions. enum TransactionSource { /// Transactions were broadcast to us via [`Transactions`] message. @@ -532,6 +605,8 @@ struct Peer { transactions: LruCache, /// A communication channel directly to the session task. request_tx: PeerRequestSender, + /// negotiated version of the session. + version: EthVersion, } /// Commands to send to the [`TransactionsManager`](crate::transactions::TransactionsManager) @@ -546,7 +621,7 @@ pub enum NetworkTransactionEvent { /// Received list of transactions from the given peer. IncomingTransactions { peer_id: PeerId, msg: Transactions }, /// Received list of transactions hashes to the given peer. - IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 }, + IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes }, /// Incoming `GetPooledTransactions` request from a peer. GetPooledTransactions { peer_id: PeerId, diff --git a/crates/primitives/src/transaction/tx_type.rs b/crates/primitives/src/transaction/tx_type.rs index 215ff5194..c2d86fe68 100644 --- a/crates/primitives/src/transaction/tx_type.rs +++ b/crates/primitives/src/transaction/tx_type.rs @@ -14,12 +14,22 @@ pub enum TxType { EIP1559 = 2_isize, } +impl From for u8 { + fn from(value: TxType) -> Self { + match value { + TxType::Legacy => 0, + TxType::EIP2930 => 1, + TxType::EIP1559 => 2, + } + } +} + impl Compact for TxType { fn to_compact(self, _: &mut impl bytes::BufMut) -> usize { match self { TxType::Legacy => 0, TxType::EIP2930 => 1, - _ => 2, + TxType::EIP1559 => 2, } }