feat(txpool) add blob transaction sidecar listeners (#4722)

This commit is contained in:
prames
2023-09-23 17:02:55 +05:30
committed by GitHub
parent 344ffbd38f
commit c24a6b2e08
6 changed files with 80 additions and 11 deletions

View File

@ -180,9 +180,9 @@ pub use crate::{
traits::{
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
EthBlobTransactionSidecar, EthPoolTransaction, EthPooledTransaction,
GetPooledTransactionLimit, NewTransactionEvent, PoolSize, PoolTransaction, PropagateKind,
PropagatedTransactions, TransactionListenerKind, TransactionOrigin, TransactionPool,
TransactionPoolExt,
GetPooledTransactionLimit, NewBlobSidecar, NewTransactionEvent, PoolSize, PoolTransaction,
PropagateKind, PropagatedTransactions, TransactionListenerKind, TransactionOrigin,
TransactionPool, TransactionPoolExt,
},
validate::{
EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
@ -377,6 +377,10 @@ where
self.pool.add_pending_listener(kind)
}
fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
self.pool.add_blob_sidecar_listener()
}
fn new_transactions_listener_for(
&self,
kind: TransactionListenerKind,

View File

@ -6,7 +6,7 @@
use crate::{
blobstore::BlobStoreError,
error::PoolError,
traits::{GetPooledTransactionLimit, TransactionListenerKind},
traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
validate::ValidTransaction,
AllPoolTransactions, AllTransactionsEvents, BestTransactions, BlockInfo, EthPooledTransaction,
NewTransactionEvent, PoolResult, PoolSize, PoolTransaction, PropagatedTransactions,
@ -89,6 +89,10 @@ impl TransactionPool for NoopTransactionPool {
mpsc::channel(1).1
}
fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar> {
mpsc::channel(1).1
}
fn new_transactions_listener(&self) -> Receiver<NewTransactionEvent<Self::Transaction>> {
mpsc::channel(1).1
}

View File

@ -103,7 +103,7 @@ use crate::{
blobstore::BlobStore,
metrics::BlobStoreMetrics,
pool::txpool::UpdateOutcome,
traits::{GetPooledTransactionLimit, TransactionListenerKind},
traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
validate::ValidTransaction,
};
pub use listener::{AllTransactionsEvents, TransactionEvents};
@ -118,6 +118,10 @@ pub(crate) mod state;
pub mod txpool;
mod update;
const PENDING_TX_LISTENER_BUFFER_SIZE: usize = 2048;
const NEW_TX_LISTENER_BUFFER_SIZE: usize = 1024;
const BLOB_SIDECAR_LISTENER_BUFFER_SIZE: usize = 512;
/// Transaction pool internals.
pub struct PoolInner<V, T, S>
where
@ -139,6 +143,8 @@ where
pending_transaction_listener: Mutex<Vec<PendingTransactionListener>>,
/// Listeners for new transactions added to the pool.
transaction_listener: Mutex<Vec<TransactionListener<T::Transaction>>>,
/// Listener for new blob transaction sidecars added to the pool.
blob_transaction_sidecar_listener: Mutex<Vec<BlobTransactionSidecarListener>>,
/// Metrics for the blob store
blob_store_metrics: BlobStoreMetrics,
}
@ -160,6 +166,7 @@ where
pool: RwLock::new(TxPool::new(ordering, config.clone())),
pending_transaction_listener: Default::default(),
transaction_listener: Default::default(),
blob_transaction_sidecar_listener: Default::default(),
config,
blob_store,
blob_store_metrics: Default::default(),
@ -224,8 +231,7 @@ where
/// Adds a new transaction listener to the pool that gets notified about every new _pending_
/// transaction inserted into the pool
pub fn add_pending_listener(&self, kind: TransactionListenerKind) -> mpsc::Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
let (sender, rx) = mpsc::channel(PENDING_TX_LISTENER_BUFFER_SIZE);
let listener = PendingTransactionListener { sender, kind };
self.pending_transaction_listener.lock().push(listener);
rx
@ -236,12 +242,19 @@ where
&self,
kind: TransactionListenerKind,
) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
let (sender, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
let (sender, rx) = mpsc::channel(NEW_TX_LISTENER_BUFFER_SIZE);
let listener = TransactionListener { sender, kind };
self.transaction_listener.lock().push(listener);
rx
}
/// Adds a new blob sidecar listener to the pool that gets notified about every new
/// eip4844 transaction's blob sidecar.
pub fn add_blob_sidecar_listener(&self) -> mpsc::Receiver<NewBlobSidecar> {
let (sender, rx) = mpsc::channel(BLOB_SIDECAR_LISTENER_BUFFER_SIZE);
let listener = BlobTransactionSidecarListener { sender };
self.blob_transaction_sidecar_listener.lock().push(listener);
rx
}
/// If the pool contains the transaction, this adds a new listener that gets notified about
/// transaction events.
@ -407,6 +420,8 @@ where
// transaction was successfully inserted into the pool
if let Some(sidecar) = maybe_sidecar {
// notify blob sidecar listeners
self.on_new_blob_sidecar(&hash, &sidecar);
// store the sidecar in the blob store
self.insert_blob(hash, sidecar);
}
@ -553,6 +568,29 @@ where
});
}
/// Notify all listeners about a blob sidecar for a newly inserted blob (eip4844) transaction.
fn on_new_blob_sidecar(&self, tx_hash: &TxHash, sidecar: &BlobTransactionSidecar) {
let mut sidecar_listeners = self.blob_transaction_sidecar_listener.lock();
sidecar_listeners.retain_mut(|listener| {
let new_blob_event = NewBlobSidecar { tx_hash: *tx_hash, sidecar: sidecar.clone() };
match listener.sender.try_send(new_blob_event) {
Ok(()) => true,
Err(err) => {
if matches!(err, mpsc::error::TrySendError::Full(_)) {
debug!(
target: "txpool",
"[{:?}] failed to send blob sidecar; channel full",
sidecar,
);
true
} else {
false
}
}
}
})
}
/// Notifies transaction listeners about changes after a block was processed.
fn notify_on_new_state(&self, outcome: OnNewCanonicalStateOutcome<T::Transaction>) {
// notify about promoted pending transactions
@ -791,6 +829,12 @@ struct TransactionListener<T: PoolTransaction> {
kind: TransactionListenerKind,
}
/// An active listener for new blobs
#[derive(Debug)]
struct BlobTransactionSidecarListener {
sender: mpsc::Sender<NewBlobSidecar>,
}
/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {

View File

@ -153,7 +153,7 @@ impl MockTransaction {
}
}
/// Returns a new EIP1559 transaction with random address and hash and empty values
/// Returns a new EIP4844 transaction with random address and hash and empty values
pub fn eip4844() -> Self {
MockTransaction::Eip4844 {
hash: H256::random(),

View File

@ -130,6 +130,10 @@ pub trait TransactionPool: Send + Sync + Clone {
self.new_transactions_listener_for(TransactionListenerKind::PropagateOnly)
}
/// Returns a new Stream that yields blob "sidecars" (blobs w/ assoc. kzg
/// commitments/proofs) for eip-4844 transactions inserted into the pool
fn blob_transaction_sidecars_listener(&self) -> Receiver<NewBlobSidecar>;
/// Returns a new stream that yields new valid transactions added to the pool
/// depending on the given [TransactionListenerKind] argument.
fn new_transactions_listener_for(
@ -448,6 +452,17 @@ impl<T: PoolTransaction> Clone for NewTransactionEvent<T> {
}
}
/// This type represents a new blob sidecar that has been stored in the transaction pool's
/// blobstore; it includes the TransasctionHash of the blob transaction along with the assoc.
/// sidecar (blobs, commitments, proofs)
#[derive(Debug, Clone)]
pub struct NewBlobSidecar {
/// hash of the EIP-4844 transaction.
pub tx_hash: TxHash,
/// the blob transaction sidecar.
pub sidecar: BlobTransactionSidecar,
}
/// Where the transaction originates from.
///
/// Depending on where the transaction was picked up, it affects how the transaction is handled
@ -727,7 +742,7 @@ pub struct EthPooledTransaction {
/// max_blob_fee_per_gas * blob_gas_used`.
pub(crate) cost: U256,
/// The blob side car this transaction
/// The blob side car for this transaction
pub(crate) blob_sidecar: EthBlobTransactionSidecar,
}

View File

@ -10,6 +10,8 @@ use reth_transaction_pool::{
async fn blobs_exclusive() {
let txpool = testing_pool();
let mut mock_tx_factory = MockTransactionFactory::default();
// TODO: add blob sidecar to mock_eip4844_tx returned here so we can test the
// BlobTxSidecarListener in tx_pool
let blob_tx = mock_tx_factory.create_eip4844();
let hash = txpool