chore(txpool): remove hash generics (#211)

This commit is contained in:
Matthias Seitz
2022-11-16 12:19:30 +01:00
committed by GitHub
parent bdf41d39a8
commit 11404adf6c
3 changed files with 22 additions and 27 deletions

View File

@ -1,9 +1,9 @@
use reth_primitives::H256;
use reth_primitives::{TxHash, H256};
use serde::{Deserialize, Serialize};
/// Various events that describe status changes of a transaction.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TransactionEvent<Hash> {
pub enum TransactionEvent {
/// Transaction has been added to the ready queue.
Ready,
/// Transaction has been added to the pending pool.
@ -15,7 +15,7 @@ pub enum TransactionEvent<Hash> {
/// Transaction has been replaced by the transaction belonging to the hash.
///
/// E.g. same (sender + nonce) pair
Replaced(Hash),
Replaced(TxHash),
/// Transaction was dropped due to configured limits.
Discarded,
/// Transaction became invalid indefinitely.

View File

@ -1,23 +1,24 @@
//! Listeners for the transaction-pool
use crate::pool::events::TransactionEvent;
use reth_primitives::H256;
use reth_primitives::{rpc::TxHash, H256};
use std::{collections::HashMap, hash};
use tokio::sync::mpsc::UnboundedSender;
type EventSink<Hash> = UnboundedSender<TransactionEvent<Hash>>;
type EventSink = UnboundedSender<TransactionEvent>;
/// Transaction pool event listeners.
pub(crate) struct PoolEventListener<Hash: hash::Hash + Eq> {
#[derive(Default)]
pub(crate) struct PoolEventListener {
/// All listeners for certain transaction events.
listeners: HashMap<Hash, PoolEventNotifier<Hash>>,
listeners: HashMap<TxHash, PoolEventNotifier>,
}
impl<Hash: hash::Hash + Eq + Clone> PoolEventListener<Hash> {
impl PoolEventListener {
/// Calls the notification callback with the `PoolEventListenerSender` that belongs to the hash.
fn notify_with<F>(&mut self, hash: &Hash, callback: F)
fn notify_with<F>(&mut self, hash: &TxHash, callback: F)
where
F: FnOnce(&mut PoolEventNotifier<Hash>),
F: FnOnce(&mut PoolEventNotifier),
{
let is_done = if let Some(sink) = self.listeners.get_mut(hash) {
callback(sink);
@ -32,49 +33,43 @@ impl<Hash: hash::Hash + Eq + Clone> PoolEventListener<Hash> {
}
/// Notify listeners about a transaction that was added to the ready queue.
pub(crate) fn ready(&mut self, tx: &Hash, replaced: Option<&Hash>) {
pub(crate) fn ready(&mut self, tx: &TxHash, replaced: Option<&TxHash>) {
self.notify_with(tx, |notifier| notifier.ready());
if let Some(replaced) = replaced {
// notify listeners that this transaction was replaced
self.notify_with(replaced, |notifier| notifier.replaced(tx.clone()));
self.notify_with(replaced, |notifier| notifier.replaced(*tx));
}
}
/// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &Hash) {
pub(crate) fn queued(&mut self, tx: &TxHash) {
self.notify_with(tx, |notifier| notifier.queued());
}
/// Notify listeners about a transaction that was discarded.
pub(crate) fn discarded(&mut self, tx: &Hash) {
pub(crate) fn discarded(&mut self, tx: &TxHash) {
self.notify_with(tx, |notifier| notifier.discarded());
}
/// Notify listeners that the transaction was mined
pub(crate) fn mined(&mut self, tx: &Hash, block_hash: H256) {
pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) {
self.notify_with(tx, |notifier| notifier.mined(block_hash));
}
}
impl<Hash: hash::Hash + Eq> Default for PoolEventListener<Hash> {
fn default() -> Self {
Self { listeners: Default::default() }
}
}
/// Sender half(s) of the event channels for a specific transaction
#[derive(Debug)]
struct PoolEventNotifier<Hash> {
struct PoolEventNotifier {
/// Tracks whether the transaction this notifier can stop because the transaction was
/// completed, or removed.
is_done: bool,
/// Corresponding sender half(s) for event listener channel
senders: Vec<EventSink<Hash>>,
senders: Vec<EventSink>,
}
impl<Hash: Clone> PoolEventNotifier<Hash> {
fn notify(&mut self, event: TransactionEvent<Hash>) {
impl PoolEventNotifier {
fn notify(&mut self, event: TransactionEvent) {
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
}
@ -93,7 +88,7 @@ impl<Hash: Clone> PoolEventNotifier<Hash> {
}
/// Transaction was replaced with the given transaction
fn replaced(&mut self, hash: Hash) {
fn replaced(&mut self, hash: TxHash) {
self.notify(TransactionEvent::Replaced(hash));
self.is_done = true;
}

View File

@ -105,7 +105,7 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
/// Pool settings.
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventListener<TxHash>>,
event_listener: RwLock<PoolEventListener>,
/// Listeners for new ready transactions.
pending_transaction_listener: Mutex<Vec<mpsc::Sender<TxHash>>>,
/// Listeners for new transactions added to the pool.