chore(typool): clippy cleanup, docs and renames (#277)

This commit is contained in:
Matthias Seitz
2022-11-28 14:16:33 +01:00
committed by GitHub
parent b30e8241ce
commit 7867c67eaa
11 changed files with 96 additions and 87 deletions

View File

@ -1,6 +1,6 @@
//! Transaction pool errors
use reth_primitives::{Address, BlockID, TxHash, U256};
use reth_primitives::{Address, TxHash, U256};
/// Transaction pool result type.
pub type PoolResult<T> = Result<T, PoolError>;

View File

@ -1,6 +1,6 @@
use fnv::FnvHashMap;
use reth_primitives::Address;
use std::{collections::HashMap, ops::Bound};
use std::collections::HashMap;
/// An internal mapping of addresses.
///
@ -17,6 +17,7 @@ pub struct SenderIdentifiers {
impl SenderIdentifiers {
/// Returns the address for the given identifier.
#[allow(unused)]
pub fn address(&self, id: &SenderId) -> Option<&Address> {
self.sender_to_address.get(id)
}
@ -56,8 +57,9 @@ pub struct SenderId(u64);
impl SenderId {
/// Returns a `Bound` for `TransactionId` starting with nonce `0`
pub(crate) fn start_bound(self) -> Bound<TransactionId> {
Bound::Included(TransactionId::new(self, 0))
#[cfg(test)]
pub(crate) fn start_bound(self) -> std::ops::Bound<TransactionId> {
std::ops::Bound::Included(TransactionId::new(self, 0))
}
}

View File

@ -1,6 +1,5 @@
#![warn(missing_docs)]
// unreachable_pub, missing_debug_implementations
#![allow(unused)] // TODO(mattsse) remove after progress was made
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
@ -86,11 +85,11 @@ pub use crate::{
};
use crate::{
error::PoolResult,
pool::{OnNewBlockOutcome, PoolInner},
traits::{NewTransactionEvent, PoolStatus, TransactionOrigin},
pool::PoolInner,
traits::{NewTransactionEvent, PoolSize, TransactionOrigin},
validate::ValidPoolTransaction,
};
use reth_primitives::{BlockID, TxHash, U256, U64};
use reth_primitives::{TxHash, U256};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::mpsc::Receiver;
@ -128,6 +127,11 @@ where
&self.pool
}
/// Get the config the pool was configured with.
pub fn config(&self) -> &PoolConfig {
self.inner().config()
}
/// Returns future that validates all transaction in the given iterator.
async fn validate_all(
&self,
@ -178,8 +182,8 @@ where
{
type Transaction = T::Transaction;
fn status(&self) -> PoolStatus {
self.pool.status()
fn status(&self) -> PoolSize {
self.pool.size()
}
fn on_new_block(&self, event: OnNewBlockEvent) {

View File

@ -2,25 +2,28 @@
use crate::{pool::events::TransactionEvent, traits::PropagateKind};
use reth_primitives::{rpc::TxHash, H256};
use std::{collections::HashMap, hash};
use std::collections::HashMap;
use tokio::sync::mpsc::UnboundedSender;
type EventSink = UnboundedSender<TransactionEvent>;
type EventBroadcast = UnboundedSender<TransactionEvent>;
/// Transaction pool event listeners.
/// A type that broadcasts [`TransactionEvent`] to installed listeners.
///
/// This is essentially a multi-producer, multi-consumer channel where each event is broadcasted to
/// all active receivers.
#[derive(Default)]
pub(crate) struct PoolEventListener {
pub(crate) struct PoolEventBroadcast {
/// All listeners for certain transaction events.
listeners: HashMap<TxHash, PoolEventNotifier>,
broadcasters: HashMap<TxHash, PoolEventBroadcaster>,
}
impl PoolEventListener {
/// Calls the notification callback with the `PoolEventListenerSender` that belongs to the hash.
fn notify_with<F>(&mut self, hash: &TxHash, callback: F)
impl PoolEventBroadcast {
/// Calls the broadcast callback with the `PoolEventBroadcaster` that belongs to the hash.
fn broadcast_with<F>(&mut self, hash: &TxHash, callback: F)
where
F: FnOnce(&mut PoolEventNotifier),
F: FnOnce(&mut PoolEventBroadcaster),
{
let is_done = if let Some(sink) = self.listeners.get_mut(hash) {
let is_done = if let Some(sink) = self.broadcasters.get_mut(hash) {
callback(sink);
sink.is_done()
} else {
@ -28,53 +31,55 @@ impl PoolEventListener {
};
if is_done {
self.listeners.remove(hash);
self.broadcasters.remove(hash);
}
}
/// Notify listeners about a transaction that was added to the pending queue.
pub(crate) fn pending(&mut self, tx: &TxHash, replaced: Option<&TxHash>) {
self.notify_with(tx, |notifier| notifier.pending());
self.broadcast_with(tx, |notifier| notifier.pending());
if let Some(replaced) = replaced {
// notify listeners that this transaction was replaced
self.notify_with(replaced, |notifier| notifier.replaced(*tx));
self.broadcast_with(replaced, |notifier| notifier.replaced(*tx));
}
}
/// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &TxHash) {
self.notify_with(tx, |notifier| notifier.queued());
self.broadcast_with(tx, |notifier| notifier.queued());
}
/// Notify listeners about a transaction that was propagated.
pub(crate) fn propagated(&mut self, tx: &TxHash, peers: Vec<PropagateKind>) {
self.notify_with(tx, |notifier| notifier.propagated(peers));
self.broadcast_with(tx, |notifier| notifier.propagated(peers));
}
/// Notify listeners about a transaction that was discarded.
pub(crate) fn discarded(&mut self, tx: &TxHash) {
self.notify_with(tx, |notifier| notifier.discarded());
self.broadcast_with(tx, |notifier| notifier.discarded());
}
/// Notify listeners that the transaction was mined
pub(crate) fn mined(&mut self, tx: &TxHash, block_hash: H256) {
self.notify_with(tx, |notifier| notifier.mined(block_hash));
self.broadcast_with(tx, |notifier| notifier.mined(block_hash));
}
}
/// Sender half(s) of the event channels for a specific transaction
/// All Sender half(s) of the event channels for a specific transaction.
///
/// This mimics [tokio::sync::broadcast] but uses separate channels.
#[derive(Debug)]
struct PoolEventNotifier {
struct PoolEventBroadcaster {
/// Tracks whether the transaction this notifier can stop because the transaction was
/// completed, or removed.
is_done: bool,
/// Corresponding sender half(s) for event listener channel
senders: Vec<EventSink>,
senders: Vec<EventBroadcast>,
}
impl PoolEventNotifier {
fn notify(&mut self, event: TransactionEvent) {
impl PoolEventBroadcaster {
fn broadcast(&mut self, event: TransactionEvent) {
self.senders.retain(|sender| sender.send(event.clone()).is_ok())
}
@ -84,34 +89,34 @@ impl PoolEventNotifier {
/// Transaction was moved to the pending queue.
fn pending(&mut self) {
self.notify(TransactionEvent::Pending)
self.broadcast(TransactionEvent::Pending)
}
/// Transaction was moved to the queued pool
fn queued(&mut self) {
self.notify(TransactionEvent::Queued)
self.broadcast(TransactionEvent::Queued)
}
/// Transaction was replaced with the given transaction
fn replaced(&mut self, hash: TxHash) {
self.notify(TransactionEvent::Replaced(hash));
self.broadcast(TransactionEvent::Replaced(hash));
self.is_done = true;
}
/// Transaction was mined.
fn mined(&mut self, block_hash: H256) {
self.notify(TransactionEvent::Mined(block_hash));
self.broadcast(TransactionEvent::Mined(block_hash));
self.is_done = true;
}
/// Transaction was propagated.
fn propagated(&mut self, peers: Vec<PropagateKind>) {
self.notify(TransactionEvent::Propagated(peers));
self.broadcast(TransactionEvent::Propagated(peers));
}
/// Transaction was replaced with the given transaction
fn discarded(&mut self) {
self.notify(TransactionEvent::Discarded);
self.broadcast(TransactionEvent::Discarded);
self.is_done = true;
}
}

View File

@ -65,25 +65,23 @@
//! transactions are _currently_ waiting for state changes that eventually move them into
//! category (2.) and become pending.
#![allow(dead_code)] // TODO(mattsse): remove once remaining checks implemented
use crate::{
error::{PoolError, PoolResult},
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool},
pool::{listener::PoolEventBroadcast, state::SubPool, txpool::TxPool},
traits::{
NewTransactionEvent, PoolStatus, PoolTransaction, PropagatedTransactions, TransactionOrigin,
NewTransactionEvent, PoolSize, PoolTransaction, PropagatedTransactions, TransactionOrigin,
},
validate::{TransactionValidationOutcome, ValidPoolTransaction},
OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator, U256,
OnNewBlockEvent, PoolConfig, TransactionOrdering, TransactionValidator,
};
use best::BestTransactions;
pub use events::TransactionEvent;
use parking_lot::{Mutex, RwLock};
use reth_primitives::{Address, TxHash, H256};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Instant,
};
use std::{collections::HashSet, sync::Arc, time::Instant};
use tokio::sync::mpsc;
use tracing::warn;
@ -109,7 +107,7 @@ pub struct PoolInner<V: TransactionValidator, T: TransactionOrdering> {
/// Pool settings.
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventListener>,
event_listener: RwLock<PoolEventBroadcast>,
/// Listeners for new ready transactions.
pending_transaction_listener: Mutex<Vec<mpsc::Sender<TxHash>>>,
/// Listeners for new transactions added to the pool.
@ -136,9 +134,9 @@ where
}
}
/// Returns stats about the pool.
pub(crate) fn status(&self) -> PoolStatus {
self.pool.read().status()
/// Returns stats about the size of the pool.
pub(crate) fn size(&self) -> PoolSize {
self.pool.read().size()
}
/// Returns the internal `SenderId` for this address
@ -146,13 +144,18 @@ where
self.identifiers.write().sender_id_or_create(addr)
}
/// Get the config the pool was configured with.
pub fn config(&self) -> &PoolConfig {
&self.config
}
/// Get the validator reference.
pub fn validator(&self) -> &V {
&self.validator
}
/// Adds a new transaction listener to the pool that gets notified about every new _ready_
/// transaction
/// Adds a new transaction listener to the pool that gets notified about every new _pending_
/// transaction.
pub fn add_pending_listener(&self) -> mpsc::Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = mpsc::channel(TX_LISTENER_BUFFER_SIZE);
@ -160,12 +163,6 @@ where
rx
}
/// Returns hashes of _all_ transactions in the pool.
pub(crate) fn pooled_transactions(&self) -> Vec<TxHash> {
let pool = self.pool.read();
pool.all().hashes_iter().collect()
}
/// Adds a new transaction listener to the pool that gets notified about every new transaction
pub fn add_transaction_listener(&self) -> mpsc::Receiver<NewTransactionEvent<T::Transaction>> {
const TX_LISTENER_BUFFER_SIZE: usize = 1024;
@ -174,17 +171,18 @@ where
rx
}
/// Updates the entire pool after a new block was mined.
/// Returns hashes of _all_ transactions in the pool.
pub(crate) fn pooled_transactions(&self) -> Vec<TxHash> {
let pool = self.pool.read();
pool.all().hashes_iter().collect()
}
/// Updates the entire pool after a new block was executed.
pub(crate) fn on_new_block(&self, block: OnNewBlockEvent) {
let outcome = self.pool.write().on_new_block(block);
self.notify_on_new_block(outcome);
}
/// Resubmits transactions back into the pool.
pub fn resubmit(&self, _transactions: HashMap<TxHash, ValidPoolTransaction<T::Transaction>>) {
unimplemented!()
}
/// Add a single validated transaction into the pool.
///
/// Note: this is only used internally by [`Self::add_transactions()`], all new transaction(s)
@ -224,8 +222,9 @@ where
Ok(hash)
}
TransactionValidationOutcome::Invalid(_tx, err) => {
// TODO notify listeners about invalid
TransactionValidationOutcome::Invalid(tx, err) => {
let mut listener = self.event_listener.write();
listener.discarded(tx.hash());
Err(err)
}
}
@ -291,7 +290,7 @@ where
if matches!(err, mpsc::error::TrySendError::Full(_)) {
warn!(
target: "txpool",
"dropping full transaction listener",
"skipping transaction on full transaction listener",
);
true
} else {

View File

@ -83,6 +83,7 @@ impl<T: ParkedOrd> ParkedPool<T> {
}
/// Whether the pool is empty
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
self.by_id.is_empty()
}

View File

@ -12,13 +12,13 @@ pub struct SizeTracker(isize);
impl AddAssign<usize> for SizeTracker {
fn add_assign(&mut self, rhs: usize) {
self.0 += (rhs as isize)
self.0 += rhs as isize
}
}
impl SubAssign<usize> for SizeTracker {
fn sub_assign(&mut self, rhs: usize) {
self.0 -= (rhs as isize)
self.0 -= rhs as isize
}
}

View File

@ -11,7 +11,7 @@ use crate::{
update::{Destination, PoolUpdate},
AddedPendingTransaction, AddedTransaction, OnNewBlockOutcome,
},
traits::{PoolStatus, StateDiff},
traits::{PoolSize, StateDiff},
OnNewBlockEvent, PoolConfig, PoolResult, PoolTransaction, TransactionOrdering,
ValidPoolTransaction, U256,
};
@ -19,7 +19,7 @@ use fnv::FnvHashMap;
use reth_primitives::{TxHash, H256};
use std::{
cmp::Ordering,
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet},
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap},
fmt,
ops::Bound::{Excluded, Unbounded},
sync::Arc,
@ -107,9 +107,9 @@ impl<T: TransactionOrdering> TxPool<T> {
&self.all_transactions
}
/// Returns stats about the pool.
pub(crate) fn status(&self) -> PoolStatus {
PoolStatus {
/// Returns stats about the size of pool.
pub(crate) fn size(&self) -> PoolSize {
PoolSize {
pending: self.pending_pool.len(),
pending_size: self.pending_pool.size(),
basefee: self.basefee_pool.len(),
@ -210,7 +210,7 @@ impl<T: TransactionOrdering> TxPool<T> {
.or_default()
.update(on_chain_nonce, on_chain_balance);
let hash = *tx.hash();
let _hash = *tx.hash();
match self.all_transactions.insert_tx(tx, on_chain_balance, on_chain_nonce) {
Ok(InsertOk { transaction, move_to, replaced_tx, updates, .. }) => {
@ -478,6 +478,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
}
/// Returns the internal transaction with additional metadata
#[cfg(test)]
pub(crate) fn get(&self, id: &TransactionId) -> Option<&PoolInternalTransaction<T>> {
self.txs.get(id)
}
@ -518,7 +519,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
pub(crate) fn update(
&mut self,
pending_block_base_fee: U256,
state_diffs: &StateDiff,
_state_diffs: &StateDiff,
) -> Vec<PoolUpdate> {
// update new basefee
self.pending_basefee = pending_block_base_fee;
@ -636,6 +637,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Returns an iterator over all transactions for the given sender, starting with the lowest
/// nonce
#[cfg(test)]
#[allow(unused)]
pub(crate) fn txs_iter(
&self,
sender: SenderId,
@ -648,6 +650,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Returns a mutable iterator over all transactions for the given sender, starting with the
/// lowest nonce
#[cfg(test)]
#[allow(unused)]
pub(crate) fn txs_iter_mut(
&mut self,
sender: SenderId,

View File

@ -1,11 +1,6 @@
//! Support types for updating the pool.
use crate::{
identifier::TransactionId,
pool::{state::SubPool, txpool::PoolInternalTransaction},
PoolTransaction,
};
use crate::{identifier::TransactionId, pool::state::SubPool};
use reth_primitives::TxHash;
use std::ops::{Deref, DerefMut};
/// A change of the transaction's location
///

View File

@ -1,4 +1,4 @@
use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction, BlockID};
use crate::{error::PoolResult, pool::state::SubPool, validate::ValidPoolTransaction};
use reth_primitives::{Address, FromRecoveredTransaction, PeerId, TxHash, H256, U256};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt, sync::Arc};
@ -15,7 +15,7 @@ pub trait TransactionPool: Send + Sync + 'static {
type Transaction: PoolTransaction;
/// Returns stats about the pool.
fn status(&self) -> PoolStatus;
fn status(&self) -> PoolSize;
/// Event listener for when a new block was mined.
///
@ -271,7 +271,7 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + FromRecoveredTransaction {
/// Represents the current status of the pool.
#[derive(Debug, Clone)]
pub struct PoolStatus {
pub struct PoolSize {
/// Number of transactions in the _pending_ sub-pool.
pub pending: usize,
/// Reported size of transactions in the _pending_ sub-pool.

View File

@ -5,7 +5,7 @@ use crate::{
identifier::{SenderId, TransactionId},
traits::{PoolTransaction, TransactionOrigin},
};
use reth_primitives::{rpc::Address, BlockID, TxHash, U256};
use reth_primitives::{rpc::Address, TxHash, U256};
use std::{fmt, time::Instant};
/// A Result type returned after checking a transaction's validity.
@ -103,7 +103,7 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
}
/// Whether the transaction originated locally.
pub(crate) fn is_local(&self) -> bool {
pub fn is_local(&self) -> bool {
self.origin.is_local()
}