refactor(txpool): cleanup pass (#53)

* chore: some cleanup

* refactor(txpool): simplify layers and add docs

* refactor: more cleanup

* refactor: cleanup and simplifications
This commit is contained in:
Matthias Seitz
2022-10-12 18:23:08 +02:00
committed by GitHub
parent 4fb99848fd
commit 6d4e39deef
10 changed files with 156 additions and 114 deletions

8
Cargo.lock generated
View File

@ -276,9 +276,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.0.12" version = "4.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "385007cbbed899260395a4107435fead4cad80684461b3cc78238bdcb0bad58f" checksum = "6ea54a38e4bce14ff6931c72e5b3c43da7051df056913d4e7e1fcdb1c03df69d"
dependencies = [ dependencies = [
"atty", "atty",
"bitflags", "bitflags",
@ -291,9 +291,9 @@ dependencies = [
[[package]] [[package]]
name = "clap_derive" name = "clap_derive"
version = "4.0.10" version = "4.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db342ce9fda24fb191e2ed4e102055a4d381c1086a06630174cd8da8d5d917ce" checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad"
dependencies = [ dependencies = [
"heck", "heck",
"proc-macro-error", "proc-macro-error",

View File

@ -1,4 +1,5 @@
#![warn(missing_docs)] // unreachable_pub, missing_debug_implementations #![warn(missing_docs)]
// unreachable_pub, missing_debug_implementations
#![allow(unused)] // TODO(mattsse) remove after progress was made #![allow(unused)] // TODO(mattsse) remove after progress was made
#![deny(unused_must_use, rust_2018_idioms)] #![deny(unused_must_use, rust_2018_idioms)]
#![doc(test( #![doc(test(
@ -83,7 +84,7 @@ pub use crate::{
}; };
use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction}; use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction};
use futures::channel::mpsc::Receiver; use futures::channel::mpsc::Receiver;
use parking_lot::Mutex;
use reth_primitives::{BlockID, TxHash, U256, U64}; use reth_primitives::{BlockID, TxHash, U256, U64};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
@ -162,6 +163,16 @@ where
todo!() todo!()
} }
/// Number of transactions in the entire pool
pub fn len(&self) -> usize {
self.pool.len()
}
/// Whether the pool is empty
pub fn is_empty(&self) -> bool {
self.pool.is_empty()
}
} }
/// implements the `TransactionPool` interface for various transaction pool API consumers. /// implements the `TransactionPool` interface for various transaction pool API consumers.
@ -215,15 +226,10 @@ where
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> { ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
todo!() todo!()
} }
}
/// Tracks the current update status of the pool. fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>> {
#[derive(Debug, Clone, Default)] self.inner().get(tx_hash)
struct UpdateStatus { }
/// Block number when the pool was last updated.
updated_at: U64,
/// Current base fee that needs to be enforced
base_fee: U256,
} }
impl<P: PoolClient, O: TransactionOrdering> Clone for Pool<P, O> { impl<P: PoolClient, O: TransactionOrdering> Clone for Pool<P, O> {

View File

@ -65,15 +65,15 @@
use crate::{ use crate::{
error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction, error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction,
validate::ValidPoolTransaction, BlockID, PoolClient, PoolConfig, TransactionOrdering, validate::ValidPoolTransaction, PoolClient, PoolConfig, TransactionOrdering,
TransactionValidator, U256, TransactionValidator, U256,
}; };
use best::BestTransactions; use best::BestTransactions;
use futures::channel::mpsc::{channel, Receiver, Sender}; use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock}; use parking_lot::{Mutex, RwLock};
use reth_primitives::{TxHash, U64}; use reth_primitives::{Address, TxHash};
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc, time::Instant};
use tracing::warn; use tracing::warn;
mod best; mod best;
@ -85,11 +85,17 @@ pub(crate) mod state;
mod transaction; mod transaction;
pub mod txpool; pub mod txpool;
use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome}; use crate::{
identifier::{SenderId, SenderIdentifiers, TransactionId},
pool::txpool::TxPool,
validate::TransactionValidationOutcome,
};
pub use events::TransactionEvent; pub use events::TransactionEvent;
/// Transaction pool internals. /// Transaction pool internals.
pub struct PoolInner<P: PoolClient, T: TransactionOrdering> { pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Internal mapping of addresses to plain ints.
identifiers: RwLock<SenderIdentifiers>,
/// Chain/Storage access. /// Chain/Storage access.
client: Arc<P>, client: Arc<P>,
/// The internal pool that manages all transactions. /// The internal pool that manages all transactions.
@ -112,6 +118,7 @@ where
/// Create a new transaction pool instance. /// Create a new transaction pool instance.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self { pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
Self { Self {
identifiers: Default::default(),
client, client,
config, config,
event_listener: Default::default(), event_listener: Default::default(),
@ -120,6 +127,11 @@ where
} }
} }
/// Returns the internal `SenderId` for this address
pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId {
self.identifiers.write().sender_id_or_create(addr)
}
/// Updates the pool /// Updates the pool
pub(crate) fn update_base_fee(&self, base_fee: U256) { pub(crate) fn update_base_fee(&self, base_fee: U256) {
self.pool.write().update_base_fee(base_fee); self.pool.write().update_base_fee(base_fee);
@ -150,20 +162,28 @@ where
tx: TransactionValidationOutcome<T::Transaction>, tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> { ) -> PoolResult<TxHash> {
match tx { match tx {
TransactionValidationOutcome::Valid { balance: _, state_nonce: _, transaction: _ } => { TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => {
// TODO create `ValidPoolTransaction` let sender_id = self.get_sender_id(*transaction.sender());
let transaction_id = TransactionId::new(sender_id, transaction.nonce());
// let added = self.pool.write().add_transaction(tx)?; let tx = ValidPoolTransaction {
// cost: transaction.cost(),
// if let Some(ready) = added.as_ready() { transaction,
// self.on_new_ready_transaction(ready); transaction_id,
// } propagate: false,
// is_local: false,
// self.notify_event_listeners(&added); timestamp: Instant::now(),
// };
// Ok(*added.hash())
todo!() let added = self.pool.write().add_transaction(tx, balance, state_nonce)?;
if let Some(pending_hash) = added.as_pending() {
self.on_new_pending_transaction(pending_hash);
}
self.notify_event_listeners(&added);
Ok(*added.hash())
} }
TransactionValidationOutcome::Invalid(_tx, err) => { TransactionValidationOutcome::Invalid(_tx, err) => {
// TODO notify listeners about invalid // TODO notify listeners about invalid
@ -183,7 +203,7 @@ where
} }
/// Notify all listeners about the new transaction. /// Notify all listeners about the new transaction.
fn on_new_ready_transaction(&self, ready: &TxHash) { fn on_new_pending_transaction(&self, ready: &TxHash) {
let mut transaction_listeners = self.ready_transaction_listener.lock(); let mut transaction_listeners = self.ready_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) { transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) {
Ok(()) => true, Ok(()) => true,
@ -221,6 +241,24 @@ where
pub(crate) fn ready_transactions(&self) -> BestTransactions<T> { pub(crate) fn ready_transactions(&self) -> BestTransactions<T> {
self.pool.read().best_transactions() self.pool.read().best_transactions()
} }
/// Returns the transaction by hash.
pub(crate) fn get(
&self,
tx_hash: &TxHash,
) -> Option<Arc<ValidPoolTransaction<T::Transaction>>> {
self.pool.read().get(tx_hash)
}
/// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize {
self.pool.read().len()
}
/// Whether the pool is empty
pub(crate) fn is_empty(&self) -> bool {
self.pool.read().is_empty()
}
} }
/// Tracks an added transaction and all graph changes caused by adding it. /// Tracks an added transaction and all graph changes caused by adding it.

View File

@ -172,7 +172,7 @@ impl<T: PoolTransaction> Ord for BasefeeOrd<T> {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, other: &Self) -> Ordering {
match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) { match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) {
(Some(fee), Some(other)) => fee.cmp(&other), (Some(fee), Some(other)) => fee.cmp(&other),
(None, Some(other)) => Ordering::Less, (None, Some(_other)) => Ordering::Less,
(Some(_), None) => Ordering::Greater, (Some(_), None) => Ordering::Greater,
_ => Ordering::Equal, _ => Ordering::Equal,
} }
@ -187,7 +187,7 @@ pub struct QueuedOrd<T: PoolTransaction>(Arc<ValidPoolTransaction<T>>);
impl_ord_wrapper!(QueuedOrd); impl_ord_wrapper!(QueuedOrd);
impl<T: PoolTransaction> Ord for QueuedOrd<T> { impl<T: PoolTransaction> Ord for QueuedOrd<T> {
fn cmp(&self, other: &Self) -> Ordering { fn cmp(&self, _other: &Self) -> Ordering {
// TODO ideally compare by distance here. // TODO ideally compare by distance here.
Ordering::Equal Ordering::Equal
} }

View File

@ -5,10 +5,9 @@ use crate::{
use reth_primitives::rpc::TxHash; use reth_primitives::rpc::TxHash;
use std::{ use std::{
cmp::Ordering, cmp::Ordering,
collections::{BTreeMap, BTreeSet, HashSet}, collections::{BTreeMap, BTreeSet},
sync::Arc, sync::Arc,
}; };
use tracing::debug;
/// A pool of validated and gapless transactions that are ready on the current state and are waiting /// A pool of validated and gapless transactions that are ready on the current state and are waiting
/// to be included in a block. /// to be included in a block.

View File

@ -16,7 +16,7 @@ use reth_primitives::TxHash;
use std::{ use std::{
collections::{btree_map::Entry, hash_map, BTreeMap, HashMap}, collections::{btree_map::Entry, hash_map, BTreeMap, HashMap},
fmt, fmt,
ops::Bound::{Excluded, Included, Unbounded}, ops::Bound::{Excluded, Unbounded},
sync::Arc, sync::Arc,
}; };
@ -60,8 +60,6 @@ const MIN_PROTOCOL_BASE_FEE: U256 = U256([7, 0, 0, 0]);
/// new --> |apply state changes| pool /// new --> |apply state changes| pool
/// ``` /// ```
pub struct TxPool<T: TransactionOrdering> { pub struct TxPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: Arc<T>,
/// Contains the currently known info /// Contains the currently known info
sender_info: FnvHashMap<SenderId, SenderInfo>, sender_info: FnvHashMap<SenderId, SenderInfo>,
/// pending subpool /// pending subpool
@ -87,17 +85,16 @@ impl<T: TransactionOrdering> TxPool<T> {
pub fn new(ordering: Arc<T>) -> Self { pub fn new(ordering: Arc<T>) -> Self {
Self { Self {
sender_info: Default::default(), sender_info: Default::default(),
pending_pool: PendingPool::new(ordering.clone()), pending_pool: PendingPool::new(ordering),
queued_pool: Default::default(), queued_pool: Default::default(),
basefee_pool: Default::default(), basefee_pool: Default::default(),
all_transactions: Default::default(), all_transactions: Default::default(),
ordering,
} }
} }
/// Updates the pool based on the changed base fee. /// Updates the pool based on the changed base fee.
/// ///
/// This enforces the dynamic fee requirement. /// This enforces the dynamic fee requirement.
pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) { pub(crate) fn update_base_fee(&mut self, _new_base_fee: U256) {
// TODO update according to the changed base_fee // TODO update according to the changed base_fee
todo!() todo!()
} }
@ -144,7 +141,10 @@ impl<T: TransactionOrdering> TxPool<T> {
on_chain_nonce: u64, on_chain_nonce: u64,
) -> PoolResult<AddedTransaction<T::Transaction>> { ) -> PoolResult<AddedTransaction<T::Transaction>> {
// Update sender info // Update sender info
self.sender_info.entry(tx.sender_id).or_default().update(on_chain_nonce, on_chain_balance); self.sender_info
.entry(tx.sender_id())
.or_default()
.update(on_chain_nonce, on_chain_balance);
let hash = *tx.hash(); let hash = *tx.hash();
@ -265,6 +265,16 @@ impl<T: TransactionOrdering> TxPool<T> {
pub fn enforce_size_limits(&mut self) { pub fn enforce_size_limits(&mut self) {
unimplemented!() unimplemented!()
} }
/// Number of transactions in the entire pool
pub(crate) fn len(&self) -> usize {
self.all_transactions.len()
}
/// Whether the pool is empty
pub(crate) fn is_empty(&self) -> bool {
self.all_transactions.is_empty()
}
} }
/// Container for _all_ transaction in the pool. /// Container for _all_ transaction in the pool.
@ -319,6 +329,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Returns an iterator over all transactions for the given sender, starting with the lowest /// Returns an iterator over all transactions for the given sender, starting with the lowest
/// nonce /// nonce
#[cfg(test)]
pub(crate) fn txs_iter( pub(crate) fn txs_iter(
&self, &self,
sender: SenderId, sender: SenderId,
@ -330,6 +341,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Returns a mutable iterator over all transactions for the given sender, starting with the /// Returns a mutable iterator over all transactions for the given sender, starting with the
/// lowest nonce /// lowest nonce
#[cfg(test)]
pub(crate) fn txs_iter_mut( pub(crate) fn txs_iter_mut(
&mut self, &mut self,
sender: SenderId, sender: SenderId,
@ -339,51 +351,6 @@ impl<T: PoolTransaction> AllTransactions<T> {
.take_while(move |(other, _)| sender == other.sender) .take_while(move |(other, _)| sender == other.sender)
} }
/// Returns all transactions that predates the given transaction.
///
/// NOTE: The range is _inclusive_
pub(crate) fn ancestor_txs<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs
.range((Unbounded, Included(id)))
.rev()
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all mutable transactions that predates the given transaction.
///
/// NOTE: The range is _inclusive_
pub(crate) fn ancestor_txs_mut<'a, 'b: 'a>(
&'a mut self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a mut PoolInternalTransaction<T>)> + '_ {
self.txs
.range_mut((Unbounded, Included(id)))
.rev()
.take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that predates the given transaction.
///
/// NOTE: The range is _exclusive_: This does not return the transaction itself
pub(crate) fn ancestor_txs_exclusive<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range(..id).rev().take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender.
///
/// NOTE: The range is _exclusive_
pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>(
&'a self,
id: &'b TransactionId,
) -> impl Iterator<Item = (&'a TransactionId, &'a PoolInternalTransaction<T>)> + '_ {
self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender)
}
/// Returns all transactions that _follow_ after the given id but have the same sender. /// Returns all transactions that _follow_ after the given id but have the same sender.
/// ///
/// NOTE: The range is _exclusive_ /// NOTE: The range is _exclusive_
@ -420,10 +387,19 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Removes a transaction from the pool after it was mined. /// Removes a transaction from the pool after it was mined.
/// ///
/// This will not trigger additional updates since, because descendants without nonce gaps are /// This will _not_ trigger additional updates, because descendants without nonce gaps are
/// already in the pending pool. /// already in the pending pool, and this transaction will be the first transaction of the
pub(crate) fn remove_mined_tx(&mut self, _id: &TransactionId) { /// sender in this pool.
// TODO decrease nonce gap pub(crate) fn remove_mined_tx(
&mut self,
id: &TransactionId,
) -> Option<Arc<ValidPoolTransaction<T>>> {
let tx = self.txs.remove(id)?;
// decrement the counter for the sender.
self.tx_decr(tx.transaction.sender_id());
self.by_hash.remove(tx.transaction.hash())
} }
/// Inserts a new transaction into the pool. /// Inserts a new transaction into the pool.
@ -513,7 +489,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
} }
// The next transaction of this sender // The next transaction of this sender
let on_chain_id = TransactionId::new(transaction.sender_id, on_chain_nonce); let on_chain_id = TransactionId::new(transaction.sender_id(), on_chain_nonce);
{ {
// Tracks the next nonce we expect if the transactions are gapless // Tracks the next nonce we expect if the transactions are gapless
let mut next_nonce = on_chain_id.nonce; let mut next_nonce = on_chain_id.nonce;
@ -533,7 +509,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
tx.cumulative_cost = cumulative_cost; tx.cumulative_cost = cumulative_cost;
// Update for next transaction // Update for next transaction
cumulative_cost = tx.cumulative_cost + tx.transaction.cost; cumulative_cost = tx.next_cumulative_cost();
if cumulative_cost > on_chain_balance { if cumulative_cost > on_chain_balance {
// sender lacks sufficient funds to pay for this transaction // sender lacks sufficient funds to pay for this transaction
@ -572,7 +548,7 @@ impl<T: PoolTransaction> AllTransactions<T> {
/// Rechecks the transaction of the given sender and returns a set of updates. /// Rechecks the transaction of the given sender and returns a set of updates.
pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) { pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) {
todo!() todo!("ideally we want to process updates in bulk")
} }
/// Number of transactions in the entire pool /// Number of transactions in the entire pool
@ -639,9 +615,13 @@ pub(crate) enum InsertResult<T: PoolTransaction> {
// === impl InsertResult === // === impl InsertResult ===
// Some test helpers
#[allow(missing_docs)] #[allow(missing_docs)]
#[cfg(test)]
impl<T: PoolTransaction> InsertResult<T> { impl<T: PoolTransaction> InsertResult<T> {
fn is_underpriced(&self) -> bool { /// Returns true if the result is underpriced variant
pub(crate) fn is_underpriced(&self) -> bool {
matches!(self, InsertResult::Underpriced { .. }) matches!(self, InsertResult::Underpriced { .. })
} }
} }
@ -743,7 +723,7 @@ impl SenderInfo {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::test_util::{mock_tx_pool, MockTransaction, MockTransactionFactory}; use crate::test_util::{MockTransaction, MockTransactionFactory};
#[test] #[test]
fn test_simple_insert() { fn test_simple_insert() {
@ -805,7 +785,7 @@ mod tests {
let mut pool = AllTransactions::default(); let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_price().inc_limit(); let tx = MockTransaction::eip1559().inc_price().inc_limit();
let first = f.validated(tx.clone()); let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let replacement = f.validated(tx.rng_hash().inc_price()); let replacement = f.validated(tx.rng_hash().inc_price());
let res = pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce); let res = pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce);
match res { match res {
@ -832,7 +812,7 @@ mod tests {
let mut pool = AllTransactions::default(); let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit(); let tx = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit();
let first = f.validated(tx.clone()); let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let first_in_pool = pool.get(first.id()).unwrap(); let first_in_pool = pool.get(first.id()).unwrap();
// has nonce gap // has nonce gap
@ -868,7 +848,7 @@ mod tests {
let mut pool = AllTransactions::default(); let mut pool = AllTransactions::default();
let tx = MockTransaction::eip1559().inc_nonce().set_gas_price(100u64.into()).inc_limit(); let tx = MockTransaction::eip1559().inc_nonce().set_gas_price(100u64.into()).inc_limit();
let first = f.validated(tx.clone()); let first = f.validated(tx.clone());
let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce);
let first_in_pool = pool.get(first.id()).unwrap(); let first_in_pool = pool.get(first.id()).unwrap();
// has nonce gap // has nonce gap

View File

@ -1,5 +1,4 @@
//! Internal helpers for testing. //! Mock Types
#![allow(missing_docs, unused)]
use crate::{ use crate::{
identifier::{SenderIdentifiers, TransactionId}, identifier::{SenderIdentifiers, TransactionId},
@ -336,7 +335,6 @@ impl MockTransactionFactory {
MockValidTx { MockValidTx {
propagate: false, propagate: false,
is_local: false, is_local: false,
sender_id: transaction_id.sender,
transaction_id, transaction_id,
cost: transaction.cost(), cost: transaction.cost(),
transaction, transaction,

View File

@ -0,0 +1,6 @@
//! Internal helpers for testing.
#![allow(missing_docs, unused)]
mod mock;
pub use mock::*;

View File

@ -1,7 +1,7 @@
use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID}; use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID};
use futures::channel::mpsc::Receiver; use futures::channel::mpsc::Receiver;
use reth_primitives::{Address, TxHash, H256, U256}; use reth_primitives::{Address, TxHash, H256, U256};
use std::{fmt, hash::Hash, sync::Arc}; use std::{fmt, sync::Arc};
/// General purpose abstraction fo a transaction-pool. /// General purpose abstraction fo a transaction-pool.
/// ///
@ -61,6 +61,14 @@ pub trait TransactionPool: Send + Sync {
&self, &self,
tx_hashes: &[TxHash], tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>; ) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
/// Returns if the transaction for the given hash is already included in this pool.
fn contains(&self, tx_hash: &TxHash) -> bool {
self.get(tx_hash).is_some()
}
/// Returns the transaction for the given hash.
fn get(&self, tx_hash: &TxHash) -> Option<Arc<ValidPoolTransaction<Self::Transaction>>>;
} }
/// Event fired when a new block was mined /// Event fired when a new block was mined
@ -97,7 +105,7 @@ impl<T> BestTransactions for std::iter::Empty<T> {
/// Trait for transaction types used inside the pool /// Trait for transaction types used inside the pool
pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static { pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static {
/// Hash of the transaction /// Hash of the transaction.
fn hash(&self) -> &TxHash; fn hash(&self) -> &TxHash;
/// The Sender of the transaction. /// The Sender of the transaction.
@ -112,6 +120,8 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static {
fn cost(&self) -> U256; fn cost(&self) -> U256;
/// Returns the effective gas price for this transaction. /// Returns the effective gas price for this transaction.
///
/// This is `priority + basefee`for EIP-1559 and `gasPrice` for legacy transactions.
fn effective_gas_price(&self) -> U256; fn effective_gas_price(&self) -> U256;
/// Amount of gas that should be used in executing this transaction. This is paid up-front. /// Amount of gas that should be used in executing this transaction. This is paid up-front.

View File

@ -5,7 +5,7 @@ use crate::{
identifier::{SenderId, TransactionId}, identifier::{SenderId, TransactionId},
traits::PoolTransaction, traits::PoolTransaction,
}; };
use reth_primitives::{BlockID, TxHash, U256}; use reth_primitives::{rpc::Address, BlockID, TxHash, U256};
use std::{fmt, time::Instant}; use std::{fmt, time::Instant};
/// A Result type returned after checking a transaction's validity. /// A Result type returned after checking a transaction's validity.
@ -19,9 +19,7 @@ pub enum TransactionValidationOutcome<T: PoolTransaction> {
/// Validated transaction. /// Validated transaction.
transaction: T, transaction: T,
}, },
/// The transaction is considered invalid. /// The transaction is considered invalid indefinitely.
///
/// Note: This does not indicate whether the transaction will not be valid in the future
Invalid(T, PoolError), Invalid(T, PoolError),
} }
@ -57,8 +55,6 @@ pub struct ValidPoolTransaction<T: PoolTransaction> {
pub propagate: bool, pub propagate: bool,
/// Whether the tx is from a local source. /// Whether the tx is from a local source.
pub is_local: bool, pub is_local: bool,
/// Internal `Sender` identifier.
pub sender_id: SenderId,
/// Total cost of the transaction: `feeCap x gasLimit + transferred_value`. /// Total cost of the transaction: `feeCap x gasLimit + transferred_value`.
pub cost: U256, pub cost: U256,
/// Timestamp when this was added to the pool. /// Timestamp when this was added to the pool.
@ -73,23 +69,33 @@ impl<T: PoolTransaction> ValidPoolTransaction<T> {
self.transaction.hash() self.transaction.hash()
} }
/// Returns the address of the sender
pub fn sender(&self) -> &Address {
self.transaction.sender()
}
/// Returns the internal identifier for the sender of this transaction
pub(crate) fn sender_id(&self) -> SenderId {
self.transaction_id.sender
}
/// Returns the internal identifier for this transaction. /// Returns the internal identifier for this transaction.
pub(crate) fn id(&self) -> &TransactionId { pub(crate) fn id(&self) -> &TransactionId {
&self.transaction_id &self.transaction_id
} }
/// Returns the nonce set for this transaction. /// Returns the nonce set for this transaction.
pub(crate) fn nonce(&self) -> u64 { pub fn nonce(&self) -> u64 {
self.transaction.nonce() self.transaction.nonce()
} }
/// Returns the EIP-1559 Max base fee the caller is willing to pay. /// Returns the EIP-1559 Max base fee the caller is willing to pay.
pub(crate) fn max_fee_per_gas(&self) -> Option<U256> { pub fn max_fee_per_gas(&self) -> Option<U256> {
self.transaction.max_fee_per_gas() self.transaction.max_fee_per_gas()
} }
/// Amount of gas that should be used in executing this transaction. This is paid up-front. /// Amount of gas that should be used in executing this transaction. This is paid up-front.
pub(crate) fn gas_limit(&self) -> u64 { pub fn gas_limit(&self) -> u64 {
self.transaction.gas_limit() self.transaction.gas_limit()
} }
@ -107,7 +113,6 @@ impl<T: PoolTransaction + Clone> Clone for ValidPoolTransaction<T> {
transaction_id: self.transaction_id, transaction_id: self.transaction_id,
propagate: self.propagate, propagate: self.propagate,
is_local: self.is_local, is_local: self.is_local,
sender_id: self.sender_id,
cost: self.cost, cost: self.cost,
timestamp: self.timestamp, timestamp: self.timestamp,
} }