feat(net): support eth68 transactions (#1482)

This commit is contained in:
Matthias Seitz
2023-02-21 20:34:59 +01:00
committed by GitHub
parent f0f8f417d6
commit f78da81e1e
6 changed files with 196 additions and 46 deletions

View File

@ -1,4 +1,5 @@
//! Types for broadcasting new data.
use crate::{EthMessage, EthVersion};
use reth_codecs::derive_arbitrary;
use reth_primitives::{Block, TransactionSigned, H256, U128};
use reth_rlp::{RlpDecodable, RlpDecodableWrapper, RlpEncodable, RlpEncodableWrapper};
@ -99,6 +100,78 @@ pub struct SharedTransactions(
pub Vec<Arc<TransactionSigned>>,
);
/// A wrapper type for all different new pooled transaction types
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum NewPooledTransactionHashes {
/// A list of transaction hashes valid for [66-68)
Eth66(NewPooledTransactionHashes66),
/// A list of transaction hashes valid from [68..]
///
/// Note: it is assumed that the payload is valid (all vectors have the same length)
Eth68(NewPooledTransactionHashes68),
}
// === impl NewPooledTransactionHashes ===
impl NewPooledTransactionHashes {
/// Returns `true` if the payload is valid for the given version
pub fn is_valid_for_version(&self, version: EthVersion) -> bool {
match self {
NewPooledTransactionHashes::Eth66(_) => {
matches!(version, EthVersion::Eth67 | EthVersion::Eth66)
}
NewPooledTransactionHashes::Eth68(_) => {
matches!(version, EthVersion::Eth68)
}
}
}
/// Returns an iterator over all transaction hashes.
pub fn iter_hashes(&self) -> impl Iterator<Item = &H256> + '_ {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.0.iter(),
NewPooledTransactionHashes::Eth68(msg) => msg.hashes.iter(),
}
}
/// Consumes the type and returns all hashes
pub fn into_hashes(self) -> Vec<H256> {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.0,
NewPooledTransactionHashes::Eth68(msg) => msg.hashes,
}
}
/// Returns an iterator over all transaction hashes.
pub fn into_iter_hashes(self) -> impl Iterator<Item = H256> {
match self {
NewPooledTransactionHashes::Eth66(msg) => msg.0.into_iter(),
NewPooledTransactionHashes::Eth68(msg) => msg.hashes.into_iter(),
}
}
}
impl From<NewPooledTransactionHashes> for EthMessage {
fn from(value: NewPooledTransactionHashes) -> Self {
match value {
NewPooledTransactionHashes::Eth66(msg) => EthMessage::NewPooledTransactionHashes66(msg),
NewPooledTransactionHashes::Eth68(msg) => EthMessage::NewPooledTransactionHashes68(msg),
}
}
}
impl From<NewPooledTransactionHashes66> for NewPooledTransactionHashes {
fn from(hashes: NewPooledTransactionHashes66) -> Self {
Self::Eth66(hashes)
}
}
impl From<NewPooledTransactionHashes68> for NewPooledTransactionHashes {
fn from(hashes: NewPooledTransactionHashes68) -> Self {
Self::Eth68(hashes)
}
}
/// This informs peers of transaction hashes for transactions that have appeared on the network,
/// but have not been included in a block.
#[derive_arbitrary(rlp)]

View File

@ -7,7 +7,7 @@ use futures::FutureExt;
use reth_eth_wire::{
capability::RawCapabilityMessage, message::RequestPair, BlockBodies, BlockBody, BlockHeaders,
EthMessage, GetBlockBodies, GetBlockHeaders, GetNodeData, GetPooledTransactions, GetReceipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes66, NodeData, PooledTransactions, Receipts,
NewBlock, NewBlockHashes, NewPooledTransactionHashes, NodeData, PooledTransactions, Receipts,
SharedTransactions, Transactions,
};
use reth_interfaces::p2p::error::{RequestError, RequestResult};
@ -50,7 +50,7 @@ pub enum PeerMessage {
/// Broadcast transactions _from_ local _to_ a peer.
SendTransactions(SharedTransactions),
/// Send new pooled transactions
PooledTransactions(NewPooledTransactionHashes66),
PooledTransactions(NewPooledTransactionHashes),
/// All `eth` request variants.
EthRequest(PeerRequest),
/// Other than eth namespace message

View File

@ -4,7 +4,7 @@ use crate::{
};
use async_trait::async_trait;
use parking_lot::Mutex;
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes66, SharedTransactions};
use reth_eth_wire::{DisconnectReason, NewBlock, NewPooledTransactionHashes, SharedTransactions};
use reth_interfaces::{
p2p::headers::client::StatusUpdater,
sync::{SyncState, SyncStateProvider, SyncStateUpdater},
@ -13,7 +13,7 @@ use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{
NetworkError, NetworkInfo, NetworkStatus, PeerKind, Peers, PeersInfo, ReputationChangeKind,
};
use reth_primitives::{Head, NodeRecord, PeerId, TransactionSigned, TxHash, H256};
use reth_primitives::{Head, NodeRecord, PeerId, TransactionSigned, H256};
use std::{
net::SocketAddr,
sync::{
@ -141,11 +141,8 @@ impl NetworkHandle {
}
/// Send transactions hashes to the peer.
pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: Vec<TxHash>) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg: NewPooledTransactionHashes66(msg),
})
pub fn send_transactions_hashes(&self, peer_id: PeerId, msg: NewPooledTransactionHashes) {
self.send_message(NetworkHandleMessage::SendPooledTransactionHashes { peer_id, msg })
}
/// Send full transactions to the peer
@ -292,7 +289,7 @@ pub(crate) enum NetworkHandleMessage {
/// Sends the list of transactions to the given peer.
SendTransaction { peer_id: PeerId, msg: SharedTransactions },
/// Sends the list of transactions hashes to the given peer.
SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 },
SendPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
/// Send an `eth` protocol request to the peer.
EthRequest {
/// The peer to send the request to.

View File

@ -16,7 +16,7 @@ use reth_eth_wire::{
capability::Capabilities,
errors::{EthHandshakeError, EthStreamError, P2PStreamError},
message::{EthBroadcastMessage, RequestPair},
DisconnectReason, EthMessage, EthStream, EthVersion, P2PStream,
DisconnectReason, EthMessage, EthStream, P2PStream,
};
use reth_interfaces::p2p::error::RequestError;
use reth_metrics_common::metered_sender::MeteredSender;
@ -179,7 +179,7 @@ impl ActiveSession {
self.try_emit_broadcast(PeerMessage::ReceivedTransaction(msg)).into()
}
EthMessage::NewPooledTransactionHashes66(msg) => {
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg)).into()
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
}
EthMessage::NewPooledTransactionHashes68(msg) => {
if msg.hashes.len() != msg.types.len() || msg.hashes.len() != msg.sizes.len() {
@ -192,8 +192,7 @@ impl ActiveSession {
message: EthMessage::NewPooledTransactionHashes68(msg),
}
}
// TODO revise `PeerMessage::PooledTransactions` to have `types` and `sizes`
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.hashes.into())).into()
self.try_emit_broadcast(PeerMessage::PooledTransactions(msg.into())).into()
}
EthMessage::GetBlockHeaders(req) => {
on_request!(req, BlockHeaders, GetBlockHeaders)
@ -251,12 +250,8 @@ impl ActiveSession {
self.queued_outgoing.push_back(EthBroadcastMessage::NewBlock(msg.block).into());
}
PeerMessage::PooledTransactions(msg) => {
if self.conn.version() >= EthVersion::Eth68 {
// TODO
// we don't know types and sizes yet
} else {
self.queued_outgoing
.push_back(EthMessage::NewPooledTransactionHashes66(msg).into());
if msg.is_valid_for_version(self.conn.version()) {
self.queued_outgoing.push_back(EthMessage::from(msg).into());
}
}
PeerMessage::EthRequest(req) => {

View File

@ -5,18 +5,19 @@ use crate::{
manager::NetworkEvent,
message::{PeerRequest, PeerRequestSender},
metrics::TransactionsManagerMetrics,
network::NetworkHandleMessage,
NetworkHandle,
};
use futures::{stream::FuturesUnordered, FutureExt, StreamExt};
use reth_eth_wire::{
GetPooledTransactions, NewPooledTransactionHashes66, PooledTransactions, Transactions,
EthVersion, GetPooledTransactions, NewPooledTransactionHashes, NewPooledTransactionHashes66,
NewPooledTransactionHashes68, PooledTransactions, Transactions,
};
use reth_interfaces::{p2p::error::RequestResult, sync::SyncStateProvider};
use reth_network_api::{Peers, ReputationChangeKind};
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, H256,
};
use reth_rlp::Encodable;
use reth_transaction_pool::{
error::PoolResult, PropagateKind, PropagatedTransactions, TransactionPool,
};
@ -197,7 +198,8 @@ where
.get_all(hashes)
.into_iter()
.map(|tx| {
(*tx.hash(), Arc::new(tx.transaction.to_recovered_transaction().into_signed()))
let tx = Arc::new(tx.transaction.to_recovered_transaction().into_signed());
PropagateTransaction::new(tx)
})
.collect(),
);
@ -206,9 +208,13 @@ where
self.pool.on_propagated(propagated);
}
/// Propagate the transactions to all connected peers either as full objects or hashes
///
/// The message for new pooled hashes depends on the negotiated version of the stream.
/// See [NewPooledTransactionHashes](NewPooledTransactionHashes)
fn propagate_transactions(
&mut self,
txs: Vec<(TxHash, Arc<TransactionSigned>)>,
to_propagate: Vec<PropagateTransaction>,
) -> PropagatedTransactions {
let mut propagated = PropagatedTransactions::default();
@ -218,21 +224,29 @@ where
// Note: Assuming ~random~ order due to random state of the peers map hasher
for (idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
let (hashes, full): (Vec<_>, Vec<_>) =
txs.iter().filter(|(hash, _)| peer.transactions.insert(*hash)).cloned().unzip();
// filter all transactions unknown to the peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
let mut full_transactions = Vec::new();
for tx in to_propagate.iter() {
if peer.transactions.insert(tx.hash()) {
hashes.push(tx);
full_transactions.push(Arc::clone(&tx.transaction));
}
}
let hashes = hashes.build();
if !full.is_empty() {
if !full_transactions.is_empty() {
if idx > max_num_full {
for hash in &hashes {
propagated.0.entry(*hash).or_default().push(PropagateKind::Hash(*peer_id));
for hash in hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(*peer_id));
}
// send hashes of transactions
self.network.send_transactions_hashes(*peer_id, hashes);
} else {
// send full transactions
self.network.send_transactions(*peer_id, full);
self.network.send_transactions(*peer_id, full_transactions);
for hash in hashes {
for hash in hashes.into_iter_hashes() {
propagated.0.entry(hash).or_default().push(PropagateKind::Full(*peer_id));
}
}
@ -249,7 +263,7 @@ where
fn on_new_pooled_transaction_hashes(
&mut self,
peer_id: PeerId,
msg: NewPooledTransactionHashes66,
msg: NewPooledTransactionHashes,
) {
// If the node is currently syncing, ignore transactions
if self.network.is_syncing() {
@ -259,18 +273,17 @@ where
let mut num_already_seen = 0;
if let Some(peer) = self.peers.get_mut(&peer_id) {
let mut transactions = msg.0;
let mut hashes = msg.into_hashes();
// keep track of the transactions the peer knows
for tx in transactions.iter().copied() {
for tx in hashes.iter().copied() {
if !peer.transactions.insert(tx) {
num_already_seen += 1;
}
}
self.pool.retain_unknown(&mut transactions);
self.pool.retain_unknown(&mut hashes);
if transactions.is_empty() {
if hashes.is_empty() {
// nothing to request
return
}
@ -278,7 +291,7 @@ where
// request the missing transactions
let (response, rx) = oneshot::channel();
let req = PeerRequest::GetPooledTransactions {
request: GetPooledTransactions(transactions),
request: GetPooledTransactions(hashes),
response,
};
@ -323,7 +336,7 @@ where
// remove the peer
self.peers.remove(&peer_id);
}
NetworkEvent::SessionEstablished { peer_id, messages, .. } => {
NetworkEvent::SessionEstablished { peer_id, messages, version, .. } => {
// insert a new peer
self.peers.insert(
peer_id,
@ -332,17 +345,19 @@ where
NonZeroUsize::new(PEER_TRANSACTION_CACHE_LIMIT).unwrap(),
),
request_tx: messages,
version,
},
);
// Send a `NewPooledTransactionHashes` to the peer with _all_ transactions in the
// pool
if !self.network.is_syncing() {
let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions());
self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
peer_id,
msg,
})
todo!("get access to full tx");
// let msg = NewPooledTransactionHashes66(self.pool.pooled_transactions());
// self.network.send_message(NetworkHandleMessage::SendPooledTransactionHashes {
// peer_id,
// msg,
// })
}
}
// TODO Add remaining events
@ -502,6 +517,64 @@ where
}
}
/// A transaction that's about to be propagated
struct PropagateTransaction {
tx_type: u8,
length: usize,
transaction: Arc<TransactionSigned>,
}
// === impl PropagateTransaction ===
impl PropagateTransaction {
fn hash(&self) -> TxHash {
self.transaction.hash
}
fn new(transaction: Arc<TransactionSigned>) -> Self {
Self { tx_type: transaction.tx_type().into(), length: transaction.length(), transaction }
}
}
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
enum PooledTransactionsHashesBuilder {
Eth66(NewPooledTransactionHashes66),
Eth68(NewPooledTransactionHashes68),
}
// === impl PooledTransactionsHashesBuilder ===
impl PooledTransactionsHashesBuilder {
fn push(&mut self, tx: &PropagateTransaction) {
match self {
PooledTransactionsHashesBuilder::Eth66(msg) => msg.0.push(tx.hash()),
PooledTransactionsHashesBuilder::Eth68(msg) => {
msg.hashes.push(tx.hash());
msg.sizes.push(tx.length);
msg.types.push(tx.tx_type);
}
}
}
/// Create a builder for the negotiated version of the peer's session
fn new(version: EthVersion) -> Self {
match version {
EthVersion::Eth66 | EthVersion::Eth67 => {
PooledTransactionsHashesBuilder::Eth66(Default::default())
}
EthVersion::Eth68 => PooledTransactionsHashesBuilder::Eth68(Default::default()),
}
}
fn build(self) -> NewPooledTransactionHashes {
match self {
PooledTransactionsHashesBuilder::Eth66(msg) => msg.into(),
PooledTransactionsHashesBuilder::Eth68(msg) => msg.into(),
}
}
}
/// How we received the transactions.
enum TransactionSource {
/// Transactions were broadcast to us via [`Transactions`] message.
@ -532,6 +605,8 @@ struct Peer {
transactions: LruCache<H256>,
/// A communication channel directly to the session task.
request_tx: PeerRequestSender,
/// negotiated version of the session.
version: EthVersion,
}
/// Commands to send to the [`TransactionsManager`](crate::transactions::TransactionsManager)
@ -546,7 +621,7 @@ pub enum NetworkTransactionEvent {
/// Received list of transactions from the given peer.
IncomingTransactions { peer_id: PeerId, msg: Transactions },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes66 },
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },
/// Incoming `GetPooledTransactions` request from a peer.
GetPooledTransactions {
peer_id: PeerId,