fix: fix tx propagation when full (#10251)

This commit is contained in:
Matthias Seitz
2024-08-12 18:28:49 +02:00
committed by GitHub
parent 3fe862eb4f
commit b8a6f87884
3 changed files with 297 additions and 59 deletions

View File

@ -33,7 +33,7 @@ use std::{
use futures::{stream::FuturesUnordered, Future, StreamExt};
use reth_eth_wire::{
EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
DedupPayload, EthVersion, GetPooledTransactions, HandleMempoolData, HandleVersionedMempoolData,
NewPooledTransactionHashes, NewPooledTransactionHashes66, NewPooledTransactionHashes68,
PooledTransactions, RequestTxHashes, Transactions,
};
@ -425,9 +425,12 @@ where
// Note: Assuming ~random~ order due to random state of the peers map hasher
for (peer_idx, (peer_id, peer)) in self.peers.iter_mut().enumerate() {
// filter all transactions unknown to the peer
let mut hashes = PooledTransactionsHashesBuilder::new(peer.version);
let mut full_transactions = FullTransactionsBuilder::default();
// determine whether to send full tx objects or hashes.
let mut builder = if peer_idx > max_num_full {
PropagateTransactionsBuilder::pooled(peer.version)
} else {
PropagateTransactionsBuilder::full(peer.version)
};
// Iterate through the transactions to propagate and fill the hashes and full
// transaction lists, before deciding whether or not to send full transactions to the
@ -436,31 +439,19 @@ where
// Only proceed if the transaction is not in the peer's list of seen transactions
if !peer.seen_transactions.contains(&tx.hash()) {
// add transaction to the list of hashes to propagate
hashes.push(tx);
// Do not send full 4844 transaction hashes to peers.
//
// Nodes MUST NOT automatically broadcast blob transactions to their peers.
// Instead, those transactions are only announced using
// `NewPooledTransactionHashes` messages, and can then be manually requested
// via `GetPooledTransactions`.
//
// From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
if !tx.transaction.is_eip4844() {
full_transactions.push(tx);
}
builder.push(tx);
}
}
let mut new_pooled_hashes = hashes.build();
if new_pooled_hashes.is_empty() {
if builder.is_empty() {
trace!(target: "net::tx", ?peer_id, "Nothing to propagate to peer; has seen all transactions");
continue
}
// determine whether to send full tx objects or hashes. If there are no full
// transactions, try to send hashes.
if peer_idx > max_num_full || full_transactions.is_empty() {
let PropagateTransactions { pooled, full } = builder.build();
// send hashes if any
if let Some(mut new_pooled_hashes) = pooled {
// enforce tx soft limit per message for the (unlikely) event the number of
// hashes exceeds it
new_pooled_hashes
@ -476,9 +467,10 @@ where
// send hashes of transactions
self.network.send_transactions_hashes(*peer_id, new_pooled_hashes);
} else {
let new_full_transactions = full_transactions.build();
}
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(*peer_id));
// mark transaction as seen by peer
@ -498,9 +490,9 @@ where
propagated
}
/// Propagate the full transactions to a specific peer
/// Propagate the full transactions to a specific peer.
///
/// Returns the propagated transactions
/// Returns the propagated transactions.
fn propagate_full_transactions_to_peer(
&mut self,
txs: Vec<TxHash>,
@ -512,33 +504,45 @@ where
let mut propagated = PropagatedTransactions::default();
// filter all transactions unknown to the peer
let mut full_transactions = FullTransactionsBuilder::default();
let mut full_transactions = FullTransactionsBuilder::new(peer.version);
let to_propagate = self
.pool
.get_all(txs)
.into_iter()
.filter(|tx| !tx.transaction.is_eip4844())
.map(PropagateTransaction::new);
let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
// Iterate through the transactions to propagate and fill the hashes and full transaction
for tx in to_propagate {
if peer.seen_transactions.insert(tx.hash()) {
if !peer.seen_transactions.contains(&tx.hash()) {
full_transactions.push(&tx);
}
}
if full_transactions.transactions.is_empty() {
if full_transactions.is_empty() {
// nothing to propagate
return None
}
let new_full_transactions = full_transactions.build();
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
let PropagateTransactions { pooled, full } = full_transactions.build();
// send hashes if any
if let Some(new_pooled_hashes) = pooled {
for hash in new_pooled_hashes.iter_hashes().copied() {
propagated.0.entry(hash).or_default().push(PropagateKind::Hash(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(hash);
}
// send hashes of transactions
self.network.send_transactions_hashes(peer_id, new_pooled_hashes);
}
// send full transactions, if any
if let Some(new_full_transactions) = full {
for tx in &new_full_transactions {
propagated.0.entry(tx.hash()).or_default().push(PropagateKind::Full(peer_id));
// mark transaction as seen by peer
peer.seen_transactions.insert(tx.hash());
}
// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
}
// send full transactions
self.network.send_transactions(peer_id, new_full_transactions);
// Update propagated transactions metrics
self.metrics.propagated_transactions.increment(propagated.0.len() as u64);
@ -865,8 +869,8 @@ where
let peers = self.peers.keys().copied().collect::<HashSet<_>>();
tx.send(peers).ok();
}
TransactionsCommand::PropagateTransactionsTo(_txs, _peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(_txs, _peer) {
TransactionsCommand::PropagateTransactionsTo(txs, _peer) => {
if let Some(propagated) = self.propagate_full_transactions_to_peer(txs, _peer) {
self.pool.on_propagated(propagated);
}
}
@ -1368,6 +1372,7 @@ where
}
/// A transaction that's about to be propagated to multiple peers.
#[derive(Debug, Clone)]
struct PropagateTransaction {
size: usize,
transaction: Arc<TransactionSigned>,
@ -1388,27 +1393,114 @@ impl PropagateTransaction {
}
}
/// Helper type to construct the appropriate message to send to the peer based on whether the peer
/// should receive them in full or as pooled
#[derive(Debug, Clone)]
enum PropagateTransactionsBuilder {
Pooled(PooledTransactionsHashesBuilder),
Full(FullTransactionsBuilder),
}
impl PropagateTransactionsBuilder {
/// Create a builder for pooled transactions
fn pooled(version: EthVersion) -> Self {
Self::Pooled(PooledTransactionsHashesBuilder::new(version))
}
/// Create a builder that sends transactions in full and records transactions that don't fit.
fn full(version: EthVersion) -> Self {
Self::Full(FullTransactionsBuilder::new(version))
}
/// Appends a transaction to the list.
fn push(&mut self, transaction: &PropagateTransaction) {
match self {
Self::Pooled(builder) => builder.push(transaction),
Self::Full(builder) => builder.push(transaction),
}
}
/// Returns true if no transactions are recorded.
fn is_empty(&self) -> bool {
match self {
Self::Pooled(builder) => builder.is_empty(),
Self::Full(builder) => builder.is_empty(),
}
}
/// Consumes the type and returns the built messages that should be sent to the peer.
fn build(self) -> PropagateTransactions {
match self {
Self::Pooled(pooled) => {
PropagateTransactions { pooled: Some(pooled.build()), full: None }
}
Self::Full(full) => full.build(),
}
}
}
/// Represents how the transactions should be sent to a peer if any.
struct PropagateTransactions {
/// The pooled transaction hashes to send.
pooled: Option<NewPooledTransactionHashes>,
/// The transactions to send in full.
full: Option<Vec<Arc<TransactionSigned>>>,
}
/// Helper type for constructing the full transaction message that enforces the
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`].
#[derive(Default)]
/// [`DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE`] for full transaction broadcast
/// and enforces other propagation rules for EIP-4844 and tracks those transactions that can't be
/// broadcasted in full.
#[derive(Debug, Clone)]
struct FullTransactionsBuilder {
/// The soft limit to enforce for a single broadcast message of full transactions.
total_size: usize,
/// All transactions to be broadcasted.
transactions: Vec<Arc<TransactionSigned>>,
/// Transactions that didn't fit into the broadcast message
pooled: PooledTransactionsHashesBuilder,
}
// === impl FullTransactionsBuilder ===
impl FullTransactionsBuilder {
/// Append a transaction to the list if the total message bytes size doesn't exceed the soft
/// maximum target byte size. The limit is soft, meaning if one single transaction goes over
/// the limit, it will be broadcasted in its own [`Transactions`] message. The same pattern is
/// followed in filling a [`GetPooledTransactions`] request in
/// Create a builder for the negotiated version of the peer's session
fn new(version: EthVersion) -> Self {
Self {
total_size: 0,
pooled: PooledTransactionsHashesBuilder::new(version),
transactions: vec![],
}
}
/// Append a transaction to the list of full transaction if the total message bytes size doesn't
/// exceed the soft maximum target byte size. The limit is soft, meaning if one single
/// transaction goes over the limit, it will be broadcasted in its own [`Transactions`]
/// message. The same pattern is followed in filling a [`GetPooledTransactions`] request in
/// [`TransactionFetcher::fill_request_from_hashes_pending_fetch`].
///
/// If the transaction is unsuitable for broadcast or would exceed the softlimit, it is appended
/// to list of pooled transactions, (e.g. 4844 transactions).
fn push(&mut self, transaction: &PropagateTransaction) {
// Do not send full 4844 transaction hashes to peers.
//
// Nodes MUST NOT automatically broadcast blob transactions to their peers.
// Instead, those transactions are only announced using
// `NewPooledTransactionHashes` messages, and can then be manually requested
// via `GetPooledTransactions`.
//
// From: <https://eips.ethereum.org/EIPS/eip-4844#networking>
if transaction.transaction.is_eip4844() {
self.pooled.push(transaction);
return
}
let new_size = self.total_size + transaction.size;
if new_size > DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE &&
self.total_size > 0
{
// transaction does not fit into the message
self.pooled.push(transaction);
return
}
@ -1418,17 +1510,20 @@ impl FullTransactionsBuilder {
/// Returns whether or not any transactions are in the [`FullTransactionsBuilder`].
fn is_empty(&self) -> bool {
self.transactions.is_empty()
self.transactions.is_empty() && self.pooled.is_empty()
}
/// returns the list of transactions.
fn build(self) -> Vec<Arc<TransactionSigned>> {
self.transactions
/// Returns the messages that should be propagated to the peer.
fn build(self) -> PropagateTransactions {
let pooled = Some(self.pooled.build()).filter(|pooled| !pooled.is_empty());
let full = Some(self.transactions).filter(|full| !full.is_empty());
PropagateTransactions { pooled, full }
}
}
/// A helper type to create the pooled transactions message based on the negotiated version of the
/// session with the peer
#[derive(Debug, Clone)]
enum PooledTransactionsHashesBuilder {
Eth66(NewPooledTransactionHashes66),
Eth68(NewPooledTransactionHashes68),
@ -1449,6 +1544,14 @@ impl PooledTransactionsHashesBuilder {
}
}
/// Returns whether or not any transactions are in the [`PooledTransactionsHashesBuilder`].
fn is_empty(&self) -> bool {
match self {
Self::Eth66(hashes) => hashes.is_empty(),
Self::Eth68(hashes) => hashes.is_empty(),
}
}
fn push(&mut self, tx: &PropagateTransaction) {
match self {
Self::Eth66(msg) => msg.0.push(tx.hash()),
@ -1627,13 +1730,20 @@ mod tests {
};
use reth_primitives::hex;
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::test_utils::{testing_pool, MockTransaction};
use reth_transaction_pool::test_utils::{
testing_pool, MockTransaction, MockTransactionFactory, TestPool,
};
use secp256k1::SecretKey;
use std::{fmt, future::poll_fn, hash};
use std::{
fmt,
future::poll_fn,
hash,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use tests::fetcher::TxFetchMetadata;
use tracing::error;
async fn new_tx_manager() -> TransactionsManager<impl TransactionPool> {
async fn new_tx_manager() -> (TransactionsManager<TestPool>, NetworkManager) {
let secret_key = SecretKey::new(&mut rand::thread_rng());
let client = NoopProvider::default();
@ -1646,14 +1756,14 @@ mod tests {
let pool = testing_pool();
let transactions_manager_config = config.transactions_manager_config.clone();
let (_network_handle, _network, transactions, _) = NetworkManager::new(config)
let (_network_handle, network, transactions, _) = NetworkManager::new(config)
.await
.unwrap()
.into_builder()
.transactions(pool.clone(), transactions_manager_config)
.split_with_handle();
transactions
(transactions, network)
}
pub(super) fn default_cache<T: hash::Hash + Eq + fmt::Debug>() -> LruCache<T> {
@ -2034,7 +2144,7 @@ mod tests {
async fn test_max_retries_tx_request() {
reth_tracing::init_test_tracing();
let mut tx_manager = new_tx_manager().await;
let mut tx_manager = new_tx_manager().await.0;
let tx_fetcher = &mut tx_manager.transaction_fetcher;
let peer_id_1 = PeerId::new([1; 64]);
@ -2142,4 +2252,122 @@ mod tests {
assert!(tx_fetcher.hashes_pending_fetch.is_empty());
assert_eq!(tx_fetcher.active_peers.len(), 0);
}
#[test]
fn test_transaction_builder_empty() {
let mut builder = PropagateTransactionsBuilder::pooled(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
builder.push(&tx);
assert!(!builder.is_empty());
let txs = builder.build();
assert!(txs.full.is_none());
let txs = txs.pooled.unwrap();
assert_eq!(txs.len(), 1);
}
#[test]
fn test_transaction_builder_large() {
let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
let mut tx = factory.create_eip1559();
// create a transaction that still fits
tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
let tx = Arc::new(tx);
let tx = PropagateTransaction::new(tx);
builder.push(&tx);
assert!(!builder.is_empty());
let txs = builder.clone().build();
assert!(txs.pooled.is_none());
let txs = txs.full.unwrap();
assert_eq!(txs.len(), 1);
builder.push(&tx);
let txs = builder.clone().build();
let pooled = txs.pooled.unwrap();
assert_eq!(pooled.len(), 1);
let txs = txs.full.unwrap();
assert_eq!(txs.len(), 1);
}
#[test]
fn test_transaction_builder_eip4844() {
let mut builder = PropagateTransactionsBuilder::full(EthVersion::Eth68);
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
let tx = PropagateTransaction::new(Arc::new(factory.create_eip4844()));
builder.push(&tx);
assert!(!builder.is_empty());
let txs = builder.clone().build();
assert!(txs.full.is_none());
let txs = txs.pooled.unwrap();
assert_eq!(txs.len(), 1);
let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
builder.push(&tx);
let txs = builder.clone().build();
let pooled = txs.pooled.unwrap();
assert_eq!(pooled.len(), 1);
let txs = txs.full.unwrap();
assert_eq!(txs.len(), 1);
}
#[tokio::test]
async fn test_propagate_full() {
reth_tracing::init_test_tracing();
let (mut tx_manager, network) = new_tx_manager().await;
let peer_id = PeerId::random();
// ensure not syncing
network.handle().update_sync_state(SyncState::Idle);
// mock a peer
let (tx, _rx) = mpsc::channel(1);
tx_manager.on_network_event(NetworkEvent::SessionEstablished {
peer_id,
remote_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 0),
client_version: Arc::from(""),
capabilities: Arc::new(vec![].into()),
messages: PeerRequestSender::new(peer_id, tx),
status: Arc::new(Default::default()),
version: EthVersion::Eth68,
});
let mut propagate = vec![];
let mut factory = MockTransactionFactory::default();
let eip1559_tx = Arc::new(factory.create_eip1559());
propagate.push(PropagateTransaction::new(eip1559_tx.clone()));
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));
let propagated = tx_manager.propagate_transactions(propagate.clone());
assert_eq!(propagated.0.len(), 2);
let prop_txs = propagated.0.get(eip1559_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
assert!(prop_txs[0].is_full());
let prop_txs = propagated.0.get(eip4844_tx.transaction.hash()).unwrap();
assert_eq!(prop_txs.len(), 1);
assert!(prop_txs[0].is_hash());
let peer = tx_manager.peers.get(&peer_id).unwrap();
assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
assert!(peer.seen_transactions.contains(eip1559_tx.transaction.hash()));
peer.seen_transactions.contains(eip4844_tx.transaction.hash());
// propagate again
let propagated = tx_manager.propagate_transactions(propagate);
assert!(propagated.0.is_empty());
}
}

View File

@ -718,7 +718,7 @@ impl PoolTransaction for MockTransaction {
/// Returns the encoded length of the transaction.
fn encoded_length(&self) -> usize {
0
self.size()
}
/// Returns the chain ID associated with the transaction.

View File

@ -499,6 +499,16 @@ impl PropagateKind {
Self::Full(peer) | Self::Hash(peer) => peer,
}
}
/// Returns true if the transaction was sent as a full transaction
pub const fn is_full(&self) -> bool {
matches!(self, Self::Full(_))
}
/// Returns true if the transaction was sent as a hash
pub const fn is_hash(&self) -> bool {
matches!(self, Self::Hash(_))
}
}
impl From<PropagateKind> for PeerId {