diff --git a/Cargo.lock b/Cargo.lock index 038a13ca1..a9f68173a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,9 +276,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.0.12" +version = "4.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "385007cbbed899260395a4107435fead4cad80684461b3cc78238bdcb0bad58f" +checksum = "6ea54a38e4bce14ff6931c72e5b3c43da7051df056913d4e7e1fcdb1c03df69d" dependencies = [ "atty", "bitflags", @@ -291,9 +291,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.0.10" +version = "4.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db342ce9fda24fb191e2ed4e102055a4d381c1086a06630174cd8da8d5d917ce" +checksum = "c42f169caba89a7d512b5418b09864543eeb4d497416c917d7137863bd2076ad" dependencies = [ "heck", "proc-macro-error", diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index e39f9ae31..4d7039de3 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -1,4 +1,5 @@ -#![warn(missing_docs)] // unreachable_pub, missing_debug_implementations +#![warn(missing_docs)] +// unreachable_pub, missing_debug_implementations #![allow(unused)] // TODO(mattsse) remove after progress was made #![deny(unused_must_use, rust_2018_idioms)] #![doc(test( @@ -83,7 +84,7 @@ pub use crate::{ }; use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction}; use futures::channel::mpsc::Receiver; -use parking_lot::Mutex; + use reth_primitives::{BlockID, TxHash, U256, U64}; use std::{collections::HashMap, sync::Arc}; @@ -162,6 +163,16 @@ where todo!() } + + /// Number of transactions in the entire pool + pub fn len(&self) -> usize { + self.pool.len() + } + + /// Whether the pool is empty + pub fn is_empty(&self) -> bool { + self.pool.is_empty() + } } /// implements the `TransactionPool` interface for various transaction pool API consumers. @@ -215,15 +226,10 @@ where ) -> Vec>> { todo!() } -} -/// Tracks the current update status of the pool. -#[derive(Debug, Clone, Default)] -struct UpdateStatus { - /// Block number when the pool was last updated. - updated_at: U64, - /// Current base fee that needs to be enforced - base_fee: U256, + fn get(&self, tx_hash: &TxHash) -> Option>> { + self.inner().get(tx_hash) + } } impl Clone for Pool { diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index 4c6990746..8c60f1425 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -65,15 +65,15 @@ use crate::{ error::PoolResult, pool::listener::PoolEventListener, traits::PoolTransaction, - validate::ValidPoolTransaction, BlockID, PoolClient, PoolConfig, TransactionOrdering, + validate::ValidPoolTransaction, PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256, }; use best::BestTransactions; use futures::channel::mpsc::{channel, Receiver, Sender}; use parking_lot::{Mutex, RwLock}; -use reth_primitives::{TxHash, U64}; -use std::{collections::HashMap, sync::Arc}; +use reth_primitives::{Address, TxHash}; +use std::{collections::HashMap, sync::Arc, time::Instant}; use tracing::warn; mod best; @@ -85,11 +85,17 @@ pub(crate) mod state; mod transaction; pub mod txpool; -use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome}; +use crate::{ + identifier::{SenderId, SenderIdentifiers, TransactionId}, + pool::txpool::TxPool, + validate::TransactionValidationOutcome, +}; pub use events::TransactionEvent; /// Transaction pool internals. pub struct PoolInner { + /// Internal mapping of addresses to plain ints. + identifiers: RwLock, /// Chain/Storage access. client: Arc

, /// The internal pool that manages all transactions. @@ -112,6 +118,7 @@ where /// Create a new transaction pool instance. pub fn new(client: Arc

, ordering: Arc, config: PoolConfig) -> Self { Self { + identifiers: Default::default(), client, config, event_listener: Default::default(), @@ -120,6 +127,11 @@ where } } + /// Returns the internal `SenderId` for this address + pub(crate) fn get_sender_id(&self, addr: Address) -> SenderId { + self.identifiers.write().sender_id_or_create(addr) + } + /// Updates the pool pub(crate) fn update_base_fee(&self, base_fee: U256) { self.pool.write().update_base_fee(base_fee); @@ -150,20 +162,28 @@ where tx: TransactionValidationOutcome, ) -> PoolResult { match tx { - TransactionValidationOutcome::Valid { balance: _, state_nonce: _, transaction: _ } => { - // TODO create `ValidPoolTransaction` + TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => { + let sender_id = self.get_sender_id(*transaction.sender()); + let transaction_id = TransactionId::new(sender_id, transaction.nonce()); - // let added = self.pool.write().add_transaction(tx)?; - // - // if let Some(ready) = added.as_ready() { - // self.on_new_ready_transaction(ready); - // } - // - // self.notify_event_listeners(&added); - // - // Ok(*added.hash()) + let tx = ValidPoolTransaction { + cost: transaction.cost(), + transaction, + transaction_id, + propagate: false, + is_local: false, + timestamp: Instant::now(), + }; - todo!() + let added = self.pool.write().add_transaction(tx, balance, state_nonce)?; + + if let Some(pending_hash) = added.as_pending() { + self.on_new_pending_transaction(pending_hash); + } + + self.notify_event_listeners(&added); + + Ok(*added.hash()) } TransactionValidationOutcome::Invalid(_tx, err) => { // TODO notify listeners about invalid @@ -183,7 +203,7 @@ where } /// Notify all listeners about the new transaction. - fn on_new_ready_transaction(&self, ready: &TxHash) { + fn on_new_pending_transaction(&self, ready: &TxHash) { let mut transaction_listeners = self.ready_transaction_listener.lock(); transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) { Ok(()) => true, @@ -221,6 +241,24 @@ where pub(crate) fn ready_transactions(&self) -> BestTransactions { self.pool.read().best_transactions() } + + /// Returns the transaction by hash. + pub(crate) fn get( + &self, + tx_hash: &TxHash, + ) -> Option>> { + self.pool.read().get(tx_hash) + } + + /// Number of transactions in the entire pool + pub(crate) fn len(&self) -> usize { + self.pool.read().len() + } + + /// Whether the pool is empty + pub(crate) fn is_empty(&self) -> bool { + self.pool.read().is_empty() + } } /// Tracks an added transaction and all graph changes caused by adding it. diff --git a/crates/transaction-pool/src/pool/parked.rs b/crates/transaction-pool/src/pool/parked.rs index 398122cce..b92422776 100644 --- a/crates/transaction-pool/src/pool/parked.rs +++ b/crates/transaction-pool/src/pool/parked.rs @@ -172,7 +172,7 @@ impl Ord for BasefeeOrd { fn cmp(&self, other: &Self) -> Ordering { match (self.0.transaction.max_fee_per_gas(), other.0.transaction.max_fee_per_gas()) { (Some(fee), Some(other)) => fee.cmp(&other), - (None, Some(other)) => Ordering::Less, + (None, Some(_other)) => Ordering::Less, (Some(_), None) => Ordering::Greater, _ => Ordering::Equal, } @@ -187,7 +187,7 @@ pub struct QueuedOrd(Arc>); impl_ord_wrapper!(QueuedOrd); impl Ord for QueuedOrd { - fn cmp(&self, other: &Self) -> Ordering { + fn cmp(&self, _other: &Self) -> Ordering { // TODO ideally compare by distance here. Ordering::Equal } diff --git a/crates/transaction-pool/src/pool/pending.rs b/crates/transaction-pool/src/pool/pending.rs index 0030f42a2..e1bff4c5e 100644 --- a/crates/transaction-pool/src/pool/pending.rs +++ b/crates/transaction-pool/src/pool/pending.rs @@ -5,10 +5,9 @@ use crate::{ use reth_primitives::rpc::TxHash; use std::{ cmp::Ordering, - collections::{BTreeMap, BTreeSet, HashSet}, + collections::{BTreeMap, BTreeSet}, sync::Arc, }; -use tracing::debug; /// A pool of validated and gapless transactions that are ready on the current state and are waiting /// to be included in a block. diff --git a/crates/transaction-pool/src/pool/txpool.rs b/crates/transaction-pool/src/pool/txpool.rs index b352ee711..bac08f04c 100644 --- a/crates/transaction-pool/src/pool/txpool.rs +++ b/crates/transaction-pool/src/pool/txpool.rs @@ -16,7 +16,7 @@ use reth_primitives::TxHash; use std::{ collections::{btree_map::Entry, hash_map, BTreeMap, HashMap}, fmt, - ops::Bound::{Excluded, Included, Unbounded}, + ops::Bound::{Excluded, Unbounded}, sync::Arc, }; @@ -60,8 +60,6 @@ const MIN_PROTOCOL_BASE_FEE: U256 = U256([7, 0, 0, 0]); /// new --> |apply state changes| pool /// ``` pub struct TxPool { - /// How to order transactions. - ordering: Arc, /// Contains the currently known info sender_info: FnvHashMap, /// pending subpool @@ -87,17 +85,16 @@ impl TxPool { pub fn new(ordering: Arc) -> Self { Self { sender_info: Default::default(), - pending_pool: PendingPool::new(ordering.clone()), + pending_pool: PendingPool::new(ordering), queued_pool: Default::default(), basefee_pool: Default::default(), all_transactions: Default::default(), - ordering, } } /// Updates the pool based on the changed base fee. /// /// This enforces the dynamic fee requirement. - pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) { + pub(crate) fn update_base_fee(&mut self, _new_base_fee: U256) { // TODO update according to the changed base_fee todo!() } @@ -144,7 +141,10 @@ impl TxPool { on_chain_nonce: u64, ) -> PoolResult> { // Update sender info - self.sender_info.entry(tx.sender_id).or_default().update(on_chain_nonce, on_chain_balance); + self.sender_info + .entry(tx.sender_id()) + .or_default() + .update(on_chain_nonce, on_chain_balance); let hash = *tx.hash(); @@ -265,6 +265,16 @@ impl TxPool { pub fn enforce_size_limits(&mut self) { unimplemented!() } + + /// Number of transactions in the entire pool + pub(crate) fn len(&self) -> usize { + self.all_transactions.len() + } + + /// Whether the pool is empty + pub(crate) fn is_empty(&self) -> bool { + self.all_transactions.is_empty() + } } /// Container for _all_ transaction in the pool. @@ -319,6 +329,7 @@ impl AllTransactions { /// Returns an iterator over all transactions for the given sender, starting with the lowest /// nonce + #[cfg(test)] pub(crate) fn txs_iter( &self, sender: SenderId, @@ -330,6 +341,7 @@ impl AllTransactions { /// Returns a mutable iterator over all transactions for the given sender, starting with the /// lowest nonce + #[cfg(test)] pub(crate) fn txs_iter_mut( &mut self, sender: SenderId, @@ -339,51 +351,6 @@ impl AllTransactions { .take_while(move |(other, _)| sender == other.sender) } - /// Returns all transactions that predates the given transaction. - /// - /// NOTE: The range is _inclusive_ - pub(crate) fn ancestor_txs<'a, 'b: 'a>( - &'a self, - id: &'b TransactionId, - ) -> impl Iterator)> + '_ { - self.txs - .range((Unbounded, Included(id))) - .rev() - .take_while(|(other, _)| id.sender == other.sender) - } - - /// Returns all mutable transactions that predates the given transaction. - /// - /// NOTE: The range is _inclusive_ - pub(crate) fn ancestor_txs_mut<'a, 'b: 'a>( - &'a mut self, - id: &'b TransactionId, - ) -> impl Iterator)> + '_ { - self.txs - .range_mut((Unbounded, Included(id))) - .rev() - .take_while(|(other, _)| id.sender == other.sender) - } - - /// Returns all transactions that predates the given transaction. - /// - /// NOTE: The range is _exclusive_: This does not return the transaction itself - pub(crate) fn ancestor_txs_exclusive<'a, 'b: 'a>( - &'a self, - id: &'b TransactionId, - ) -> impl Iterator)> + '_ { - self.txs.range(..id).rev().take_while(|(other, _)| id.sender == other.sender) - } - - /// Returns all transactions that _follow_ after the given id but have the same sender. - /// - /// NOTE: The range is _exclusive_ - pub(crate) fn descendant_txs_exclusive<'a, 'b: 'a>( - &'a self, - id: &'b TransactionId, - ) -> impl Iterator)> + '_ { - self.txs.range((Excluded(id), Unbounded)).take_while(|(other, _)| id.sender == other.sender) - } /// Returns all transactions that _follow_ after the given id but have the same sender. /// /// NOTE: The range is _exclusive_ @@ -420,10 +387,19 @@ impl AllTransactions { /// Removes a transaction from the pool after it was mined. /// - /// This will not trigger additional updates since, because descendants without nonce gaps are - /// already in the pending pool. - pub(crate) fn remove_mined_tx(&mut self, _id: &TransactionId) { - // TODO decrease nonce gap + /// This will _not_ trigger additional updates, because descendants without nonce gaps are + /// already in the pending pool, and this transaction will be the first transaction of the + /// sender in this pool. + pub(crate) fn remove_mined_tx( + &mut self, + id: &TransactionId, + ) -> Option>> { + let tx = self.txs.remove(id)?; + + // decrement the counter for the sender. + self.tx_decr(tx.transaction.sender_id()); + + self.by_hash.remove(tx.transaction.hash()) } /// Inserts a new transaction into the pool. @@ -513,7 +489,7 @@ impl AllTransactions { } // The next transaction of this sender - let on_chain_id = TransactionId::new(transaction.sender_id, on_chain_nonce); + let on_chain_id = TransactionId::new(transaction.sender_id(), on_chain_nonce); { // Tracks the next nonce we expect if the transactions are gapless let mut next_nonce = on_chain_id.nonce; @@ -533,7 +509,7 @@ impl AllTransactions { tx.cumulative_cost = cumulative_cost; // Update for next transaction - cumulative_cost = tx.cumulative_cost + tx.transaction.cost; + cumulative_cost = tx.next_cumulative_cost(); if cumulative_cost > on_chain_balance { // sender lacks sufficient funds to pay for this transaction @@ -572,7 +548,7 @@ impl AllTransactions { /// Rechecks the transaction of the given sender and returns a set of updates. pub(crate) fn on_mined(&mut self, _sender: &SenderId, _new_balance: U256, _old_balance: U256) { - todo!() + todo!("ideally we want to process updates in bulk") } /// Number of transactions in the entire pool @@ -639,9 +615,13 @@ pub(crate) enum InsertResult { // === impl InsertResult === +// Some test helpers #[allow(missing_docs)] +#[cfg(test)] impl InsertResult { - fn is_underpriced(&self) -> bool { + /// Returns true if the result is underpriced variant + + pub(crate) fn is_underpriced(&self) -> bool { matches!(self, InsertResult::Underpriced { .. }) } } @@ -743,7 +723,7 @@ impl SenderInfo { #[cfg(test)] mod tests { use super::*; - use crate::test_util::{mock_tx_pool, MockTransaction, MockTransactionFactory}; + use crate::test_util::{MockTransaction, MockTransactionFactory}; #[test] fn test_simple_insert() { @@ -805,7 +785,7 @@ mod tests { let mut pool = AllTransactions::default(); let tx = MockTransaction::eip1559().inc_price().inc_limit(); let first = f.validated(tx.clone()); - let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let replacement = f.validated(tx.rng_hash().inc_price()); let res = pool.insert_tx(replacement.clone(), on_chain_balance, on_chain_nonce); match res { @@ -832,7 +812,7 @@ mod tests { let mut pool = AllTransactions::default(); let tx = MockTransaction::eip1559().inc_nonce().inc_price().inc_limit(); let first = f.validated(tx.clone()); - let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let first_in_pool = pool.get(first.id()).unwrap(); // has nonce gap @@ -868,7 +848,7 @@ mod tests { let mut pool = AllTransactions::default(); let tx = MockTransaction::eip1559().inc_nonce().set_gas_price(100u64.into()).inc_limit(); let first = f.validated(tx.clone()); - let res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); + let _res = pool.insert_tx(first.clone(), on_chain_balance, on_chain_nonce); let first_in_pool = pool.get(first.id()).unwrap(); // has nonce gap diff --git a/crates/transaction-pool/src/test_util.rs b/crates/transaction-pool/src/test_util/mock.rs similarity index 98% rename from crates/transaction-pool/src/test_util.rs rename to crates/transaction-pool/src/test_util/mock.rs index 852e574c4..0f32d3997 100644 --- a/crates/transaction-pool/src/test_util.rs +++ b/crates/transaction-pool/src/test_util/mock.rs @@ -1,5 +1,4 @@ -//! Internal helpers for testing. -#![allow(missing_docs, unused)] +//! Mock Types use crate::{ identifier::{SenderIdentifiers, TransactionId}, @@ -336,7 +335,6 @@ impl MockTransactionFactory { MockValidTx { propagate: false, is_local: false, - sender_id: transaction_id.sender, transaction_id, cost: transaction.cost(), transaction, diff --git a/crates/transaction-pool/src/test_util/mod.rs b/crates/transaction-pool/src/test_util/mod.rs new file mode 100644 index 000000000..05bd0d925 --- /dev/null +++ b/crates/transaction-pool/src/test_util/mod.rs @@ -0,0 +1,6 @@ +//! Internal helpers for testing. +#![allow(missing_docs, unused)] + +mod mock; + +pub use mock::*; diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index f07d88378..034bfce00 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -1,7 +1,7 @@ use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID}; use futures::channel::mpsc::Receiver; use reth_primitives::{Address, TxHash, H256, U256}; -use std::{fmt, hash::Hash, sync::Arc}; +use std::{fmt, sync::Arc}; /// General purpose abstraction fo a transaction-pool. /// @@ -61,6 +61,14 @@ pub trait TransactionPool: Send + Sync { &self, tx_hashes: &[TxHash], ) -> Vec>>; + + /// Returns if the transaction for the given hash is already included in this pool. + fn contains(&self, tx_hash: &TxHash) -> bool { + self.get(tx_hash).is_some() + } + + /// Returns the transaction for the given hash. + fn get(&self, tx_hash: &TxHash) -> Option>>; } /// Event fired when a new block was mined @@ -97,7 +105,7 @@ impl BestTransactions for std::iter::Empty { /// Trait for transaction types used inside the pool pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static { - /// Hash of the transaction + /// Hash of the transaction. fn hash(&self) -> &TxHash; /// The Sender of the transaction. @@ -112,6 +120,8 @@ pub trait PoolTransaction: fmt::Debug + Send + Sync + 'static { fn cost(&self) -> U256; /// Returns the effective gas price for this transaction. + /// + /// This is `priority + basefee`for EIP-1559 and `gasPrice` for legacy transactions. fn effective_gas_price(&self) -> U256; /// Amount of gas that should be used in executing this transaction. This is paid up-front. diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 4cbdf92e5..4b3891806 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -5,7 +5,7 @@ use crate::{ identifier::{SenderId, TransactionId}, traits::PoolTransaction, }; -use reth_primitives::{BlockID, TxHash, U256}; +use reth_primitives::{rpc::Address, BlockID, TxHash, U256}; use std::{fmt, time::Instant}; /// A Result type returned after checking a transaction's validity. @@ -19,9 +19,7 @@ pub enum TransactionValidationOutcome { /// Validated transaction. transaction: T, }, - /// The transaction is considered invalid. - /// - /// Note: This does not indicate whether the transaction will not be valid in the future + /// The transaction is considered invalid indefinitely. Invalid(T, PoolError), } @@ -57,8 +55,6 @@ pub struct ValidPoolTransaction { pub propagate: bool, /// Whether the tx is from a local source. pub is_local: bool, - /// Internal `Sender` identifier. - pub sender_id: SenderId, /// Total cost of the transaction: `feeCap x gasLimit + transferred_value`. pub cost: U256, /// Timestamp when this was added to the pool. @@ -73,23 +69,33 @@ impl ValidPoolTransaction { self.transaction.hash() } + /// Returns the address of the sender + pub fn sender(&self) -> &Address { + self.transaction.sender() + } + + /// Returns the internal identifier for the sender of this transaction + pub(crate) fn sender_id(&self) -> SenderId { + self.transaction_id.sender + } + /// Returns the internal identifier for this transaction. pub(crate) fn id(&self) -> &TransactionId { &self.transaction_id } /// Returns the nonce set for this transaction. - pub(crate) fn nonce(&self) -> u64 { + pub fn nonce(&self) -> u64 { self.transaction.nonce() } /// Returns the EIP-1559 Max base fee the caller is willing to pay. - pub(crate) fn max_fee_per_gas(&self) -> Option { + pub fn max_fee_per_gas(&self) -> Option { self.transaction.max_fee_per_gas() } /// Amount of gas that should be used in executing this transaction. This is paid up-front. - pub(crate) fn gas_limit(&self) -> u64 { + pub fn gas_limit(&self) -> u64 { self.transaction.gas_limit() } @@ -107,7 +113,6 @@ impl Clone for ValidPoolTransaction { transaction_id: self.transaction_id, propagate: self.propagate, is_local: self.is_local, - sender_id: self.sender_id, cost: self.cost, timestamp: self.timestamp, }