feat: add manual broadcast in full (#13453)

This commit is contained in:
Matthias Seitz
2024-12-19 12:44:20 +01:00
committed by GitHub
parent 726d064afb
commit 320a0b9af9

View File

@ -136,7 +136,7 @@ impl<N: NetworkPrimitives> TransactionsHandle<N> {
rx.await
}
/// Manually propagate full transactions to a specific peer.
/// Manually propagate full transaction hashes to a specific peer.
///
/// Do nothing if transactions are empty.
pub fn propagate_transactions_to(&self, transactions: Vec<TxHash>, peer: PeerId) {
@ -146,12 +146,10 @@ impl<N: NetworkPrimitives> TransactionsHandle<N> {
self.send(TransactionsCommand::PropagateTransactionsTo(transactions, peer))
}
/// Manually propagate the given transactions to all peers.
/// Manually propagate the given transaction hashes to all peers.
///
/// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
/// full.
///
/// Do nothing if transactions are empty.
pub fn propagate_transactions(&self, transactions: Vec<TxHash>) {
if transactions.is_empty() {
return
@ -159,6 +157,22 @@ impl<N: NetworkPrimitives> TransactionsHandle<N> {
self.send(TransactionsCommand::PropagateTransactions(transactions))
}
/// Manually propagate the given transactions to all peers.
///
/// It's up to the [`TransactionsManager`] whether the transactions are sent as hashes or in
/// full.
pub fn broadcast_transactions(
&self,
transactions: impl IntoIterator<Item = N::BroadcastedTransaction>,
) {
let transactions =
transactions.into_iter().map(PropagateTransaction::new).collect::<Vec<_>>();
if transactions.is_empty() {
return
}
self.send(TransactionsCommand::BroadcastTransactions(transactions))
}
/// Request the transaction hashes known by specific peers.
pub async fn get_transaction_hashes(
&self,
@ -735,7 +749,7 @@ where
// filter all transactions unknown to the peer
let mut full_transactions = FullTransactionsBuilder::new(peer.version);
let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::new);
let to_propagate = self.pool.get_all(txs).into_iter().map(PropagateTransaction::pool_tx);
if propagation_mode.is_forced() {
// skip cache check if forced
@ -811,7 +825,7 @@ where
.pool
.get_all(hashes)
.into_iter()
.map(PropagateTransaction::new)
.map(PropagateTransaction::pool_tx)
.collect::<Vec<_>>();
let mut propagated = PropagatedTransactions::default();
@ -956,7 +970,7 @@ where
/// __without__ their sidecar, because 4844 transactions are only ever announced as hashes.
fn propagate_all(&mut self, hashes: Vec<TxHash>) {
let propagated = self.propagate_transactions(
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::new).collect(),
self.pool.get_all(hashes).into_iter().map(PropagateTransaction::pool_tx).collect(),
PropagationMode::Basic,
);
@ -1014,6 +1028,9 @@ where
}
}
TransactionsCommand::PropagateTransactions(txs) => self.propagate_all(txs),
TransactionsCommand::BroadcastTransactions(txs) => {
self.propagate_transactions(txs, PropagationMode::Forced);
}
TransactionsCommand::GetTransactionHashes { peers, tx } => {
let mut res = HashMap::with_capacity(peers.len());
for peer_id in peers {
@ -1508,8 +1525,14 @@ struct PropagateTransaction<T = TransactionSigned> {
}
impl<T: SignedTransaction> PropagateTransaction<T> {
/// Create a new instance from a transaction.
pub fn new(transaction: T) -> Self {
let size = transaction.length();
Self { size, transaction: Arc::new(transaction) }
}
/// Create a new instance from a pooled transaction
fn new<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
fn pool_tx<P>(tx: Arc<ValidPoolTransaction<P>>) -> Self
where
P: PoolTransaction<Consensus = T>,
{
@ -1797,8 +1820,10 @@ enum TransactionsCommand<N: NetworkPrimitives = EthNetworkPrimitives> {
GetActivePeers(oneshot::Sender<HashSet<PeerId>>),
/// Propagate a collection of full transactions to a specific peer.
PropagateTransactionsTo(Vec<TxHash>, PeerId),
/// Propagate a collection of full transactions to all peers.
/// Propagate a collection of hashes to all peers.
PropagateTransactions(Vec<TxHash>),
/// Propagate a collection of broadcastable transactions in full to all peers.
BroadcastTransactions(Vec<PropagateTransaction<N::BroadcastedTransaction>>),
/// Request transaction hashes known by specific peers from the [`TransactionsManager`].
GetTransactionHashes {
peers: Vec<PeerId>,
@ -2370,7 +2395,7 @@ mod tests {
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
builder.push(&tx);
assert!(!builder.is_empty());
@ -2391,7 +2416,7 @@ mod tests {
// create a transaction that still fits
tx.transaction.set_size(DEFAULT_SOFT_LIMIT_BYTE_SIZE_TRANSACTIONS_BROADCAST_MESSAGE + 1);
let tx = Arc::new(tx);
let tx = PropagateTransaction::new(tx);
let tx = PropagateTransaction::pool_tx(tx);
builder.push(&tx);
assert!(!builder.is_empty());
@ -2416,7 +2441,7 @@ mod tests {
assert!(builder.is_empty());
let mut factory = MockTransactionFactory::default();
let tx = PropagateTransaction::new(Arc::new(factory.create_eip4844()));
let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip4844()));
builder.push(&tx);
assert!(!builder.is_empty());
@ -2425,7 +2450,7 @@ mod tests {
let txs = txs.pooled.unwrap();
assert_eq!(txs.len(), 1);
let tx = PropagateTransaction::new(Arc::new(factory.create_eip1559()));
let tx = PropagateTransaction::pool_tx(Arc::new(factory.create_eip1559()));
builder.push(&tx);
let txs = builder.clone().build();
@ -2461,9 +2486,9 @@ mod tests {
let mut propagate = vec![];
let mut factory = MockTransactionFactory::default();
let eip1559_tx = Arc::new(factory.create_eip1559());
propagate.push(PropagateTransaction::new(eip1559_tx.clone()));
propagate.push(PropagateTransaction::pool_tx(eip1559_tx.clone()));
let eip4844_tx = Arc::new(factory.create_eip4844());
propagate.push(PropagateTransaction::new(eip4844_tx.clone()));
propagate.push(PropagateTransaction::pool_tx(eip4844_tx.clone()));
let propagated =
tx_manager.propagate_transactions(propagate.clone(), PropagationMode::Basic);