feat: support blob transactions in manager (#4294)

This commit is contained in:
Matthias Seitz
2023-08-21 17:05:11 +02:00
committed by GitHub
parent 0d47e4cf4f
commit 3b404acc7d
3 changed files with 54 additions and 14 deletions

View File

@ -80,6 +80,13 @@ pub struct Transactions(
pub Vec<TransactionSigned>,
);
impl Transactions {
/// Returns `true` if the list of transactions contains any blob transactions.
pub fn has_eip4844(&self) -> bool {
self.0.iter().any(|tx| tx.is_eip4844())
}
}
impl From<Vec<TransactionSigned>> for Transactions {
fn from(txs: Vec<TransactionSigned>) -> Self {
Transactions(txs)

View File

@ -19,8 +19,8 @@ use reth_interfaces::{
use reth_metrics::common::mpsc::UnboundedMeteredReceiver;
use reth_network_api::{Peers, ReputationChangeKind};
use reth_primitives::{
FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, TransactionSigned, TxHash, TxType,
H256,
FromRecoveredPooledTransaction, IntoRecoveredTransaction, PeerId, PooledTransactionsElement,
TransactionSigned, TxHash, TxType, H256,
};
use reth_rlp::Encodable;
use reth_transaction_pool::{
@ -164,7 +164,6 @@ impl<Pool: TransactionPool> TransactionsManager<Pool> {
impl<Pool> TransactionsManager<Pool>
where
Pool: TransactionPool + 'static,
<Pool as TransactionPool>::Transaction: IntoRecoveredTransaction,
{
/// Returns a new handle that can send commands to this type.
pub fn handle(&self) -> TransactionsHandle {
@ -375,7 +374,22 @@ where
fn on_network_tx_event(&mut self, event: NetworkTransactionEvent) {
match event {
NetworkTransactionEvent::IncomingTransactions { peer_id, msg } => {
self.import_transactions(peer_id, msg.0, TransactionSource::Broadcast);
// ensure we didn't receive any blob transactions as these are disallowed to be
// broadcasted in full
let has_blob_txs = msg.has_eip4844();
let non_blob_txs = msg
.0
.into_iter()
.map(PooledTransactionsElement::try_from_broadcast)
.filter_map(Result::ok);
self.import_transactions(peer_id, non_blob_txs, TransactionSource::Broadcast);
if has_blob_txs {
self.report_peer(peer_id, ReputationChangeKind::BadTransactions);
}
}
NetworkTransactionEvent::IncomingPooledTransactionHashes { peer_id, msg } => {
self.on_new_pooled_transaction_hashes(peer_id, msg)
@ -448,7 +462,7 @@ where
fn import_transactions(
&mut self,
peer_id: PeerId,
transactions: Vec<TransactionSigned>,
transactions: impl IntoIterator<Item = PooledTransactionsElement>,
source: TransactionSource,
) {
// If the node is pipeline syncing, ignore transactions
@ -463,7 +477,7 @@ where
if let Some(peer) = self.peers.get_mut(&peer_id) {
for tx in transactions {
// recover transaction
let tx = if let Some(tx) = tx.into_ecrecovered() {
let tx = if let Ok(tx) = tx.try_into_ecrecovered() {
tx
} else {
has_bad_transactions = true;
@ -474,18 +488,18 @@ where
// If we received the transactions as the response to our GetPooledTransactions
// requests (based on received `NewPooledTransactionHashes`) then we already
// recorded the hashes in [`Self::on_new_pooled_transaction_hashes`]
if source.is_broadcast() && !peer.transactions.insert(tx.hash()) {
if source.is_broadcast() && !peer.transactions.insert(*tx.hash()) {
num_already_seen += 1;
}
match self.transactions_by_peers.entry(tx.hash()) {
match self.transactions_by_peers.entry(*tx.hash()) {
Entry::Occupied(mut entry) => {
// transaction was already inserted
entry.get_mut().push(peer_id);
}
Entry::Vacant(entry) => {
// this is a new transaction that should be imported into the pool
let pool_transaction = <Pool::Transaction as FromRecoveredTransaction>::from_recovered_transaction(tx);
let pool_transaction = <Pool::Transaction as FromRecoveredPooledTransaction>::from_recovered_transaction(tx);
let pool = self.pool.clone();
@ -583,11 +597,7 @@ where
{
match result {
Ok(Ok(txs)) => {
// convert all transactions to the inner transaction type, ignoring any
// sidecars
// TODO: remove this! this will be different when we introduce the blobpool
let transactions = txs.0.into_iter().map(|tx| tx.into_transaction()).collect();
this.import_transactions(peer_id, transactions, TransactionSource::Response)
this.import_transactions(peer_id, txs.0, TransactionSource::Response)
}
Ok(Err(req_err)) => {
this.on_request_error(peer_id, req_err);
@ -825,6 +835,8 @@ enum TransactionsCommand {
#[allow(missing_docs)]
pub enum NetworkTransactionEvent {
/// Received list of transactions from the given peer.
///
/// This represents transactions that were broadcasted to use from the peer.
IncomingTransactions { peer_id: PeerId, msg: Transactions },
/// Received list of transactions hashes to the given peer.
IncomingPooledTransactionHashes { peer_id: PeerId, msg: NewPooledTransactionHashes },