feat(txpool): add add_and_subscribe (#2603)

This commit is contained in:
Matthias Seitz
2023-05-08 15:28:53 +02:00
committed by GitHub
parent ab55ea5e04
commit 7ca8a297a8
5 changed files with 106 additions and 16 deletions

View File

@ -83,6 +83,7 @@
pub use crate::{
config::PoolConfig,
ordering::{CostOrdering, TransactionOrdering},
pool::TransactionEvents,
traits::{
AllPoolTransactions, BestTransactions, BlockInfo, CanonicalStateUpdate, ChangedAccount,
PoolTransaction, PooledTransaction, PropagateKind, PropagatedTransactions,
@ -94,7 +95,7 @@ pub use crate::{
},
};
use crate::{
error::{PoolError, PoolResult},
error::PoolResult,
pool::PoolInner,
traits::{NewTransactionEvent, PoolSize},
};
@ -249,24 +250,22 @@ where
self.pool.on_canonical_state_change(update);
}
async fn add_transaction_and_subscribe(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> PoolResult<TransactionEvents> {
let (_, tx) = self.validate(origin, transaction).await;
self.pool.add_transaction_and_subscribe(origin, tx)
}
async fn add_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> PoolResult<TxHash> {
let (_, tx) = self.validate(origin, transaction).await;
match tx {
TransactionValidationOutcome::Valid { .. } => {
self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")
}
TransactionValidationOutcome::Invalid(transaction, error) => {
Err(PoolError::InvalidTransaction(*transaction.hash(), error))
}
TransactionValidationOutcome::Error(transaction, error) => {
Err(PoolError::Other(*transaction.hash(), error))
}
}
self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")
}
async fn add_transactions(

View File

@ -1,9 +1,39 @@
//! Listeners for the transaction-pool
use crate::{pool::events::TransactionEvent, traits::PropagateKind};
use futures_util::Stream;
use reth_primitives::{TxHash, H256};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::UnboundedSender;
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
};
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// A Stream that receives [TransactionEvent] for the transactions
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TransactionEvents {
hash: TxHash,
events: UnboundedReceiver<TransactionEvent>,
}
impl TransactionEvents {
/// The hash for this transaction
pub fn hash(&self) -> TxHash {
self.hash
}
}
impl Stream for TransactionEvents {
type Item = TransactionEvent;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.get_mut().events.poll_recv(cx)
}
}
type EventBroadcast = UnboundedSender<TransactionEvent>;
@ -35,6 +65,21 @@ impl PoolEventBroadcast {
}
}
/// Create a new subscription for the given transaction hash.
pub(crate) fn subscribe(&mut self, tx_hash: TxHash) -> TransactionEvents {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
match self.broadcasters.entry(tx_hash) {
Entry::Occupied(mut entry) => {
entry.get_mut().senders.push(tx);
}
Entry::Vacant(entry) => {
entry.insert(PoolEventBroadcaster { is_done: false, senders: vec![tx] });
}
};
TransactionEvents { hash: tx_hash, events: rx }
}
/// Notify listeners about a transaction that was added to the pending queue.
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) {
self.broadcast_with(tx, |notifier| notifier.pending());

View File

@ -102,6 +102,7 @@ pub(crate) mod size;
pub(crate) mod state;
pub mod txpool;
mod update;
pub use listener::TransactionEvents;
/// Transaction pool internals.
pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
@ -292,6 +293,19 @@ where
}
}
pub(crate) fn add_transaction_and_subscribe(
&self,
origin: TransactionOrigin,
tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TransactionEvents> {
let listener = {
let mut listener = self.event_listener.write();
listener.subscribe(tx.tx_hash())
};
self.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed")?;
Ok(listener)
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
pub fn add_transactions(
&self,

View File

@ -1,4 +1,8 @@
use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction};
use crate::{
error::PoolResult,
pool::{state::SubPool, TransactionEvents},
validate::ValidPoolTransaction,
};
use reth_primitives::{
Address, FromRecoveredTransaction, IntoRecoveredTransaction, PeerId, Transaction,
TransactionKind, TransactionSignedEcRecovered, TxHash, EIP1559_TX_TYPE_ID, H256, U256,
@ -60,6 +64,18 @@ pub trait TransactionPool: Send + Sync + Clone {
self.add_transactions(TransactionOrigin::External, transactions).await
}
/// Adds an _unvalidated_ transaction into the pool and subscribe to state changes.
///
/// This is the same as [TransactionPool::add_transaction] but returns an event stream for the
/// given transaction.
///
/// Consumer: Custom
async fn add_transaction_and_subscribe(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> PoolResult<TransactionEvents>;
/// Adds an _unvalidated_ transaction into the pool.
///
/// Consumer: RPC

View File

@ -33,6 +33,22 @@ pub enum TransactionValidationOutcome<T: PoolTransaction> {
Error(T, Box<dyn std::error::Error + Send + Sync>),
}
impl<T: PoolTransaction> TransactionValidationOutcome<T> {
/// Returns the transaction that was validated.
pub fn transaction(&self) -> &T {
match self {
Self::Valid { transaction, .. } => transaction,
Self::Invalid(transaction, ..) => transaction,
Self::Error(transaction, ..) => transaction,
}
}
/// Returns the hash of the transactions
pub fn tx_hash(&self) -> TxHash {
*self.transaction().hash()
}
}
/// Provides support for validating transaction at any given state of the chain
#[async_trait::async_trait]
pub trait TransactionValidator: Send + Sync {