feat(txpool): initial sketch (#7)

This commit is contained in:
Matthias Seitz
2022-10-07 22:00:19 +02:00
committed by GitHub
parent 95ed994fd1
commit 791ee2839f
18 changed files with 2173 additions and 0 deletions

33
Cargo.lock generated
View File

@ -503,6 +503,7 @@ checksum = "7f21eda599937fba36daeb58a22e8f5cee2d14c4a17b5b7739c7c8e5e3b8230c"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
@ -525,6 +526,17 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e5aa3de05362c3fb88de6531e6296e85cde7739cccad4b9dfeeb7f6ebce56bf"
[[package]]
name = "futures-executor"
version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff63c23854bee61b6e9cd331d523909f238fc7636290b96826e9cfa5faa00ab"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.24"
@ -975,6 +987,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"
[[package]]
name = "lock_api"
version = "0.4.9"
@ -1494,6 +1512,21 @@ dependencies = [
"tracing-futures",
]
[[package]]
name = "reth-transaction-pool"
version = "0.1.0"
dependencies = [
"async-trait",
"fnv",
"futures",
"linked-hash-map",
"parking_lot",
"reth-primitives",
"serde",
"thiserror",
"tracing",
]
[[package]]
name = "revm"
version = "2.1.0"

View File

@ -15,6 +15,8 @@ members = [
"crates/net/rpc-types",
"crates/primitives",
"crates/stages",
"crates/transaction-pool",
"crates/db"
]
[dependencies]

View File

@ -0,0 +1,27 @@
[package]
name = "reth-transaction-pool"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = """
Transaction pool implementation
"""
[dependencies]
# eth
reth-primitives = { path = "../primitives" }
# async/futures
async-trait = "0.1"
futures = "0.3"
parking_lot = "0.12"
# misc
thiserror = "1.0"
tracing = "0.1"
serde = { version = "1.0", features = ["derive"] }
linked-hash-map = "0.5"
fnv = "1.0.7"

View File

@ -0,0 +1,24 @@
//! Provides access to the chain's storage
use crate::{
error::{PoolError, PoolResult},
validate::TransactionValidator,
};
use reth_primitives::{BlockID, U64};
/// The interface used to interact with the blockchain and access storage.
#[async_trait::async_trait]
pub trait PoolClient: Send + Sync + TransactionValidator {
/// Error type that can be converted to the crate's internal Error.
type Error: Into<PoolError>;
/// Returns the block number for the given block identifier.
fn convert_block_id(&self, block_id: &BlockID) -> PoolResult<Option<U64>>;
/// Same as [`PoolClient::convert_block_id()`] but returns an error if no matching block number
/// was found
fn ensure_block_number(&self, block_id: &BlockID) -> PoolResult<U64> {
self.convert_block_id(block_id)
.and_then(|number| number.ok_or(PoolError::BlockNumberNotFound(*block_id)))
}
}

View File

@ -0,0 +1,13 @@
///! Configuration options for the Transaction pool.
#[derive(Debug, Clone)]
pub struct PoolConfig {
// TODO add limits for subpools
// TODO limits for per peer
// TODO config whether to check if transactions are banned
}
impl Default for PoolConfig {
fn default() -> Self {
todo!()
}
}

View File

@ -0,0 +1,23 @@
//! Transaction pool errors
use reth_primitives::BlockID;
/// Transaction pool result type.
pub type PoolResult<T> = Result<T, PoolError>;
/// All errors the Transaction pool can throw.
#[derive(Debug, thiserror::Error)]
pub enum PoolError {
/// Thrown if a replacement transaction's gas price is below the already imported transaction
#[error("Tx: insufficient gas price to replace existing transaction")]
ReplacementUnderpriced,
/// Encountered a transaction that was already added into the poll
#[error("[{0:?}] Already added")]
AlreadyAdded(Box<dyn std::any::Any + Send + Sync>),
/// Encountered a cycle in the graph pool
#[error("Transaction with cyclic dependent transactions")]
CyclicTransaction,
/// Thrown if no number was found for the given block id
#[error("Invalid block id: {0:?}")]
BlockNumberNotFound(BlockID),
}

View File

@ -0,0 +1,95 @@
use crate::U256;
use fnv::FnvHashMap;
use reth_primitives::Address;
use std::collections::HashMap;
/// An internal mapping of addresses.
///
/// This assigns a _unique_ `SenderId` for a new `Address`.
#[derive(Debug)]
pub struct SenderIdentifiers {
/// The identifier to use next.
id: u64,
/// Assigned `SenderId` for an `Address`.
address_to_id: HashMap<Address, SenderId>,
/// Reverse mapping of `SenderId` to `Address`.
sender_to_address: FnvHashMap<SenderId, Address>,
}
impl SenderIdentifiers {
/// Returns the address for the given identifier.
pub fn address(&self, id: &SenderId) -> Option<&Address> {
self.sender_to_address.get(id)
}
/// Returns the `SenderId` that belongs to the given address, if it exists
pub fn sender_id(&self, addr: &Address) -> Option<SenderId> {
self.address_to_id.get(addr).copied()
}
/// Returns the existing `SendId` or assigns a new one if it's missing
pub fn sender_id_or_create(&mut self, addr: Address) -> SenderId {
if let Some(id) = self.sender_id(&addr) {
return id
}
let id = self.next_id();
self.address_to_id.insert(addr, id);
self.sender_to_address.insert(id, addr);
id
}
/// Returns a new address
fn next_id(&mut self) -> SenderId {
let id = self.id;
self.id = self.id.wrapping_add(1);
SenderId(id)
}
}
/// A _unique_ identifier for a sender of an address.
///
/// This is the identifier of an internal `address` mapping that is valid in the context of this
/// program.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct SenderId(u64);
/// A unique identifier of a transaction of a Sender.
///
/// This serves as an identifier for dependencies of a transaction:
/// A transaction with a nonce higher than the current state nonce depends on `tx.nonce - 1`.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct TransactionId {
/// Sender of this transaction
pub sender: SenderId,
/// Nonce of this transaction
pub nonce: u64,
}
// === impl TransactionId ===
impl TransactionId {
/// Create a new identifier pair
pub fn new(sender: SenderId, nonce: u64) -> Self {
Self { sender, nonce }
}
/// Returns the id a transactions depends on
///
/// This returns `transaction_nonce - 1` if `transaction_nonce` is higher than the
/// `on_chain_none`
pub fn dependency(
transaction_nonce: u64,
on_chain_nonce: u64,
sender: SenderId,
) -> Option<TransactionId> {
if transaction_nonce == on_chain_nonce {
return None
}
let prev_nonce = transaction_nonce.saturating_sub(1);
if on_chain_nonce <= prev_nonce {
Some(Self::new(sender, prev_nonce))
} else {
None
}
}
}

View File

@ -0,0 +1,114 @@
#![warn(missing_docs)] // unreachable_pub, missing_debug_implementations
#![allow(unused)] // TODO(mattsse) remove after progress was made
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Reth's transaction pool implementation
use futures::channel::mpsc::Receiver;
use parking_lot::Mutex;
use reth_primitives::{BlockID, TxHash, U256, U64};
use std::sync::Arc;
mod client;
mod config;
pub mod error;
mod identifier;
mod ordering;
pub mod pool;
mod traits;
mod validate;
pub use crate::{
client::PoolClient,
config::PoolConfig,
ordering::TransactionOrdering,
pool::BasicPool,
traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool},
validate::{TransactionValidationOutcome, TransactionValidator},
};
use crate::{error::PoolResult, validate::ValidPoolTransaction};
/// A generic, customizable `TransactionPool` implementation.
pub struct Pool<P: PoolClient, T: TransactionOrdering> {
/// The actual transaction pool where transactions and subscriptions are handled.
pool: BasicPool<P, T>,
/// Tracks status updates linked to chain events.
update_status: Arc<Mutex<UpdateStatus>>,
/// Chain/Storage access.
client: Arc<P>,
}
// === impl Pool ===
impl<P, T> Pool<P, T>
where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
/// Creates a new `Pool` with the given config and client and ordering.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
let pool = BasicPool::new(Arc::clone(&client), ordering, config);
Self { pool, update_status: Arc::new(Default::default()), client }
}
}
/// implements the `TransactionPool` interface for various transaction pool API consumers.
#[async_trait::async_trait]
impl<P, T> TransactionPool for Pool<P, T>
where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
type Transaction = T::Transaction;
async fn on_new_block(&self, _event: NewBlockEvent) {
// TODO perform maintenance: update pool accordingly
todo!()
}
async fn add_transaction(
&self,
block_id: BlockID,
transaction: Self::Transaction,
) -> PoolResult<TxHash> {
self.pool.clone().add_transaction(&block_id, transaction).await
}
async fn add_transactions(
&self,
block_id: BlockID,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
self.pool.clone().add_transactions(&block_id, transactions).await
}
fn ready_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.ready_transactions_listener()
}
fn best_transactions(
&self,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
Box::new(self.pool.inner().ready_transactions())
}
fn remove_invalid(
&self,
_tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>> {
todo!()
}
}
/// Tracks the current update status of the pool.
#[derive(Debug, Clone, Default)]
struct UpdateStatus {
/// Block number when the pool was last updated.
updated_at: U64,
/// Current base fee that needs to be enforced
base_fee: U256,
}

View File

@ -0,0 +1,20 @@
use crate::traits::PoolTransaction;
use std::fmt;
/// Transaction ordering.
///
/// Decides how transactions should be ordered within the pool.
///
/// The returned priority must reflect natural `Ordering`.
// TODO: for custom, more advanced scoring it would be ideal to determine the priority in the
// context of the entire pool instead of standalone by alone looking at a single transaction
pub trait TransactionOrdering: Send + Sync + 'static {
/// Priority of a transaction.
type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync;
/// The transaction type to score.
type Transaction: PoolTransaction + Send + Sync + 'static;
/// Returns the priority score for the given transaction.
fn priority(&self, transaction: &Self::Transaction) -> Self::Priority;
}

View File

@ -0,0 +1,24 @@
use reth_primitives::H256;
use serde::{Deserialize, Serialize};
/// Various events that describe status changes of a transaction.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub enum TransactionEvent<Hash> {
/// Transaction has been added to the ready queue.
Ready,
/// Transaction has been added to the pending pool.
Pending,
/// Transaction has been added to the queued pool.
Queued,
/// Transaction has been included in the block belonging to this hash.
Included(H256),
/// Transaction has been replaced by the transaction belonging to the hash.
///
/// E.g. same (sender + nonce) pair
Replaced(Hash),
/// Transaction was dropped due to configured limits.
Dropped,
/// Transaction became invalid indefinitely.
Invalid,
// TODO Timedout?, broadcasted(peers)
}

View File

@ -0,0 +1,89 @@
//! Listeners for the transaction-pool
use crate::pool::events::TransactionEvent;
use futures::channel::mpsc::UnboundedSender;
use std::{collections::HashMap, hash};
type EventSink<Hash> = UnboundedSender<TransactionEvent<Hash>>;
/// Transaction pool event listeners.
pub(crate) struct PoolEventListener<Hash: hash::Hash + Eq> {
/// All listeners for certain transaction events.
listeners: HashMap<Hash, PoolEventNotifier<Hash>>,
}
impl<Hash: hash::Hash + Eq + Clone> PoolEventListener<Hash> {
/// Calls the notification callback with the `PoolEventListenerSender` that belongs to the hash.
fn notify_with<F>(&mut self, hash: &Hash, callback: F)
where
F: FnOnce(&mut PoolEventNotifier<Hash>),
{
let is_done = if let Some(sink) = self.listeners.get_mut(hash) {
callback(sink);
sink.is_done()
} else {
false
};
if is_done {
self.listeners.remove(hash);
}
}
/// Notify listeners about a transaction that was added to the ready queue.
pub(crate) fn ready(&mut self, tx: &Hash, replaced: Option<&Hash>) {
self.notify_with(tx, |notifier| notifier.ready());
if let Some(replaced) = replaced {
// notify listeners that this transaction was replaced
self.notify_with(replaced, |notifier| notifier.replaced(tx.clone()));
}
}
/// Notify listeners about a transaction that was added to the queued pool.
pub(crate) fn queued(&mut self, tx: &Hash) {
self.notify_with(tx, |notifier| notifier.queued());
}
}
impl<Hash: hash::Hash + Eq> Default for PoolEventListener<Hash> {
fn default() -> Self {
Self { listeners: Default::default() }
}
}
/// Sender half(s) of the event channels for a specific transaction
#[derive(Debug)]
struct PoolEventNotifier<Hash> {
/// Tracks whether the transaction this notifier can stop because the transaction was
/// completed, or removed.
is_done: bool,
/// Corresponding sender half(s) for event listener channel
senders: Vec<EventSink<Hash>>,
}
impl<Hash: Clone> PoolEventNotifier<Hash> {
fn notify(&mut self, event: TransactionEvent<Hash>) {
self.senders.retain(|sender| sender.unbounded_send(event.clone()).is_ok())
}
fn is_done(&self) -> bool {
self.senders.is_empty() || self.is_done
}
/// Transaction became ready.
fn ready(&mut self) {
self.notify(TransactionEvent::Pending)
}
/// Transaction was moved to the queued pool
fn queued(&mut self) {
self.notify(TransactionEvent::Queued)
}
/// Transaction was replaced with the given transaction
fn replaced(&mut self, hash: Hash) {
self.notify(TransactionEvent::Replaced(hash));
self.is_done = true;
}
}

View File

@ -0,0 +1,655 @@
//! Transaction Pool internals.
//!
//! Incoming transactions are validated first. The validation outcome can have 3 states:
//!
//! 1. Transaction can _never_ be valid
//! 2. Transaction is _currently_ valid
//! 3. Transaction is _currently_ invalid, but could potentially become valid in the future
//!
//! However, (2.) and (3.) of a transaction can only be determined on the basis of the current
//! state, whereas (1.) holds indefinitely. This means once the state changes (2.) and (3.) need to
//! be reevaluated again.
//!
//! The transaction pool is responsible for storing new, valid transactions and providing the next
//! best transactions sorted by their priority. Where priority is determined by the transaction's
//! score.
//!
//! However, the score is also only valid for the current state.
//!
//! Furthermore, the following characteristics fall under (3.):
//!
//! a) Nonce of a transaction is higher than the expected nonce for the next transaction of its
//! sender. A distinction is made here whether multiple transactions from the same sender have
//! gapless nonce increments. a)(1) If _no_ transaction is missing in a chain of multiple
//! transactions from the same sender (all nonce in row), all of them can in principle be executed
//! on the current state one after the other. a)(2) If there's a nonce gap, then all
//! transactions after the missing transaction are blocked until the missing transaction arrives.
//! b) Transaction does not meet the dynamic fee cap requirement introduced by EIP-1559: The fee
//! cap of the transaction needs to be no less than the base fee of block.
//!
//!
//! In essence the transaction pool is made of two separate sub-pools:
//!
//! _Pending Pool_: Contains all transactions that are valid on the current state and satisfy
//! (3. a)(1): _No_ nonce gaps _Queued Pool_: Contains all transactions that are currently
//! blocked by missing transactions: (3. a)(2): _With_ nonce gaps
//!
//! To account for the dynamic base fee requirement (3. b) which could render an EIP-1559 and all
//! subsequent transactions of the sender currently invalid, the pending pool itself consists of two
//! queues:
//!
//! _Ready Queue_: Contains all transactions that can be executed on the current state
//! _Parked Queue_: Contains all transactions that either do not currently meet the dynamic
//! base fee requirement or are blocked by a previous transaction that violates it.
//!
//! The classification of transaction in which queue it belongs depends on the current base fee and
//! must be updated after changes:
//!
//! - Base Fee increases: recheck the _Ready Queue_ and evict transactions that don't satisfy
//! the new base fee, or depend on a transaction that no longer satisfies it, and move them
//! to the _Parked Queue_.
//! - Base Fee decreases: recheck the _Parked Queue_ and move all transactions that now satisfy
//! the new base fee to the _Ready Queue_.
//!
//!
//! Depending on the use case, consumers of the [`TransactionPool`](crate::traits::TransactionPool)
//! are interested in (2.) and/or (3.).
//! A generic [`TransactionPool`](crate::traits::TransactionPool) that only handles transactions.
//!
//! This Pool maintains two separate sub-pools for (2.) and (3.)
//!
//! ## Terminology
//!
//! - _Pending_: pending transactions are transactions that fall under (2.). Those transactions
//! are _currently_ ready to be executed and are stored in the `pending` sub-pool
//! - _Queued_: queued transactions are transactions that fall under category (3.). Those
//! transactions are _currently_ waiting for state changes that eventually move them into
//! category (2.) and become pending.
use crate::{
error::{PoolError, PoolResult},
pool::{
listener::PoolEventListener,
pending::PendingTransactions,
queued::{QueuedPoolTransaction, QueuedTransactions},
},
traits::PoolTransaction,
validate::ValidPoolTransaction,
BlockID, PoolClient, PoolConfig, TransactionOrdering, TransactionValidator, U256,
};
use fnv::FnvHashMap;
use futures::channel::mpsc::{channel, Receiver, Sender};
use parking_lot::{Mutex, RwLock};
use reth_primitives::{TxHash, U64};
use std::{
collections::{HashMap, VecDeque},
fmt,
sync::Arc,
};
use tracing::{debug, trace, warn};
mod events;
mod listener;
mod pending;
mod queued;
mod transaction;
use crate::{
identifier::{SenderId, TransactionId},
validate::TransactionValidationOutcome,
};
pub use events::TransactionEvent;
pub use pending::TransactionsIterator;
/// Shareable Transaction pool.
pub struct BasicPool<P: PoolClient, T: TransactionOrdering> {
/// Arc'ed instance of the pool internals
pool: Arc<PoolInner<P, T>>,
}
// === impl Pool ===
impl<P: PoolClient, T: TransactionOrdering> BasicPool<P, T>
where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
/// Create a new transaction pool instance.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
Self { pool: Arc::new(PoolInner::new(client, ordering, config)) }
}
/// Returns the wrapped pool
pub(crate) fn inner(&self) -> &PoolInner<P, T> {
&self.pool
}
/// Returns the actual block number for the block id
fn resolve_block_number(&self, block_id: &BlockID) -> PoolResult<U64> {
self.pool.client().ensure_block_number(block_id)
}
/// Add a single _unverified_ transaction into the pool.
pub async fn add_transaction(
&self,
block_id: &BlockID,
transaction: P::Transaction,
) -> PoolResult<TxHash> {
self.add_transactions(block_id, Some(transaction))
.await?
.pop()
.expect("transaction exists; qed")
}
/// Adds all given transactions into the pool
pub async fn add_transactions(
&self,
block_id: &BlockID,
transactions: impl IntoIterator<Item = P::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
let validated = self.validate_all(block_id, transactions).await?;
let transactions = self.pool.add_transactions(validated.into_values());
Ok(transactions)
}
/// Returns future that validates all transaction in the given iterator at the block the
/// `block_id` points to.
async fn validate_all(
&self,
block_id: &BlockID,
transactions: impl IntoIterator<Item = P::Transaction>,
) -> PoolResult<HashMap<TxHash, TransactionValidationOutcome<P::Transaction>>> {
// get the actual block number which is required to validate the transactions
let block_number = self.resolve_block_number(block_id)?;
let outcome = futures::future::join_all(
transactions.into_iter().map(|tx| self.validate(block_id, block_number, tx)),
)
.await
.into_iter()
.collect::<HashMap<_, _>>();
Ok(outcome)
}
/// Validates the given transaction at the given block
async fn validate(
&self,
block_id: &BlockID,
_block_number: U64,
transaction: P::Transaction,
) -> (TxHash, TransactionValidationOutcome<P::Transaction>) {
let _hash = *transaction.hash();
// TODO this is where additional validate checks would go, like banned senders etc...
let _res = self.pool.client().validate_transaction(block_id, transaction).await;
// TODO blockstamp the transaction
todo!()
}
/// Registers a new transaction listener and returns the receiver stream.
pub fn ready_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.add_ready_listener()
}
}
impl<P: PoolClient, O: TransactionOrdering> Clone for BasicPool<P, O> {
fn clone(&self) -> Self {
Self { pool: Arc::clone(&self.pool) }
}
}
/// Transaction pool internals.
pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Chain/Storage access.
client: Arc<P>,
/// The internal pool that manages
pool: RwLock<GraphPool<T>>,
/// Pool settings.
config: PoolConfig,
/// Manages listeners for transaction state change events.
event_listener: RwLock<PoolEventListener<TxHash>>,
/// Listeners for new ready transactions.
ready_transaction_listener: Mutex<Vec<Sender<TxHash>>>,
}
// === impl PoolInner ===
impl<P: PoolClient, T: TransactionOrdering> PoolInner<P, T>
where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
/// Create a new transaction pool instance.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
Self {
client,
config,
event_listener: Default::default(),
pool: RwLock::new(GraphPool::new(ordering)),
ready_transaction_listener: Default::default(),
}
}
/// Updates the pool
pub(crate) fn update_base_fee(&self, base_fee: U256) {
self.pool.write().update_base_fee(base_fee);
}
/// Get client reference.
pub fn client(&self) -> &P {
&self.client
}
/// Adds a new transaction listener to the pool that gets notified about every new ready
/// transaction
pub fn add_ready_listener(&self) -> Receiver<TxHash> {
const TX_LISTENER_BUFFER_SIZE: usize = 2048;
let (tx, rx) = channel(TX_LISTENER_BUFFER_SIZE);
self.ready_transaction_listener.lock().push(tx);
rx
}
/// Resubmits transactions back into the pool.
pub fn resubmit(&self, _transactions: HashMap<TxHash, ValidPoolTransaction<T::Transaction>>) {
unimplemented!()
}
/// Add a single validated transaction into the pool.
fn add_transaction(
&self,
tx: TransactionValidationOutcome<T::Transaction>,
) -> PoolResult<TxHash> {
match tx {
TransactionValidationOutcome::Valid { balance, state_nonce, transaction } => {
// TODO create `ValidPoolTransaction`
// let added = self.pool.write().add_transaction(tx)?;
//
// if let Some(ready) = added.as_ready() {
// self.on_new_ready_transaction(ready);
// }
//
// self.notify_event_listeners(&added);
//
// Ok(*added.hash())
todo!()
}
TransactionValidationOutcome::Invalid(_tx, err) => {
// TODO notify listeners about invalid
Err(err)
}
}
}
/// Adds all transactions in the iterator to the pool, returning a list of results.
pub fn add_transactions(
&self,
transactions: impl IntoIterator<Item = TransactionValidationOutcome<T::Transaction>>,
) -> Vec<PoolResult<TxHash>> {
// TODO check pool limits
transactions.into_iter().map(|tx| self.add_transaction(tx)).collect::<Vec<_>>()
}
/// Notify all listeners about the new transaction.
fn on_new_ready_transaction(&self, ready: &TxHash) {
let mut transaction_listeners = self.ready_transaction_listener.lock();
transaction_listeners.retain_mut(|listener| match listener.try_send(*ready) {
Ok(()) => true,
Err(e) => {
if e.is_full() {
warn!(
target: "txpool",
"[{:?}] dropping full ready transaction listener",
ready,
);
true
} else {
false
}
}
});
}
/// Fire events for the newly added transaction.
fn notify_event_listeners(&self, tx: &AddedTransaction<T::Transaction>) {
let mut listener = self.event_listener.write();
match tx {
AddedTransaction::Pending(tx) => {
listener.ready(&tx.hash, None);
// TODO more listeners for discarded, removed etc...
}
AddedTransaction::Queued { hash } => {
listener.queued(hash);
}
}
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn ready_transactions(&self) -> TransactionsIterator<T> {
self.pool.read().ready_transactions()
}
}
/// A pool that only manages transactions.
///
/// This pool maintains a dependency graph of transactions and provides the currently ready
/// transactions.
pub struct GraphPool<T: TransactionOrdering> {
/// How to order transactions.
ordering: Arc<T>,
/// Contains the currently known info
sender_info: FnvHashMap<SenderId, SenderInfo>,
/// Sub-Pool of transactions that are ready and waiting to be executed
pending: PendingTransactions<T>,
/// Sub-Pool of transactions that are waiting for state changes that eventually turn them
/// valid, so they can be moved in the `pending` pool.
queued: QueuedTransactions<T>,
}
// === impl PoolInner ===
impl<T: TransactionOrdering> GraphPool<T> {
/// Create a new graph pool instance.
pub fn new(ordering: Arc<T>) -> Self {
let pending = PendingTransactions::new(Arc::clone(&ordering));
Self { ordering, sender_info: Default::default(), pending, queued: Default::default() }
}
/// Updates the pool based on the changed base fee.
///
/// This enforces the dynamic fee requirement.
/// If the `new_base_fee` is _higher_ than previous base fee, all EIP-1559 transactions in the
/// ready queue that now violate the dynamic fee requirement need to parked.
/// If the `new_base_fee` is _lower_ than the previous base fee, all parked transactions that
/// now satisfy the dynamic fee requirement need to moved to the ready queue.
pub(crate) fn update_base_fee(&mut self, new_base_fee: U256) {
let _old_base_fee = self.pending.set_next_base_fee(new_base_fee);
// TODO update according to the changed base_fee
todo!()
}
/// Returns if the transaction for the given hash is already included in this pool
pub(crate) fn contains(&self, tx_hash: &TxHash) -> bool {
self.queued.contains(tx_hash) || self.pending.contains(tx_hash)
}
/// Returns an iterator that yields transactions that are ready to be included in the block.
pub(crate) fn ready_transactions(&self) -> TransactionsIterator<T> {
self.pending.get_transactions()
}
/// Adds the transaction into the pool
///
/// This pool consists of two sub-pools: `Queued` and `Pending`.
///
/// The `Queued` pool contains transaction with gaps in its dependency tree: It requires
/// additional transaction that are note yet present in the pool.
///
/// The `Pending` pool contains all transactions that have all their dependencies satisfied (no
/// nonce gaps). It consists of two parts: `Parked` and `Ready`.
///
/// The `Ready` queue contains transactions that are ready to be included in the pending block.
/// With EIP-1559, transactions can become executable or not without any changes to the
/// sender's balance or nonce and instead their feeCap determines whether the transaction is
/// _currently_ (on the current state) ready or needs to be parked until the feeCap satisfies
/// the block's baseFee.
fn add_transaction(
&mut self,
tx: ValidPoolTransaction<T::Transaction>,
) -> PoolResult<AddedTransaction<T::Transaction>> {
if self.contains(tx.hash()) {
warn!(target: "txpool", "[{:?}] Already added", tx.hash());
return Err(PoolError::AlreadyAdded(Box::new(*tx.hash())))
}
let tx = QueuedPoolTransaction::new(tx, self.pending.provided_dependencies());
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash(), tx);
// If all ids are not satisfied import to queued
if !tx.is_satisfied() {
let hash = *tx.transaction.hash();
self.queued.add_transaction(tx)?;
return Ok(AddedTransaction::Queued { hash })
}
self.add_pending_transaction(tx)
}
/// Adds the transaction to the pending pool.
///
/// This will also move all transaction that get unlocked by the dependency id this transaction
/// provides from the queued pool into the pending pool.
///
/// CAUTION: this expects that transaction's dependencies are fully satisfied
fn add_pending_transaction(
&mut self,
tx: QueuedPoolTransaction<T>,
) -> PoolResult<AddedTransaction<T::Transaction>> {
let hash = *tx.transaction.hash();
trace!(target: "txpool", "adding pending transaction [{:?}]", hash);
let mut pending = AddedPendingTransaction::new(hash);
// tracks all transaction that can be moved to the pending pool, starting the given
// transaction
let mut pending_transactions = VecDeque::from([tx]);
// tracks whether we're processing the given `tx`
let mut is_new_tx = true;
// take first transaction from the list
while let Some(current_tx) = pending_transactions.pop_front() {
// also add the transaction that the current transaction unlocks
pending_transactions
.extend(self.queued.satisfy_and_unlock(&current_tx.transaction.transaction_id));
let current_hash = *current_tx.transaction.hash();
// try to add the transaction to the ready pool
match self.pending.add_transaction(current_tx) {
Ok(replaced_transactions) => {
if !is_new_tx {
pending.promoted.push(current_hash);
}
// tx removed from ready pool
pending.removed.extend(replaced_transactions);
}
Err(err) => {
// failed to add transaction
if is_new_tx {
debug!(target: "txpool", "[{:?}] Failed to add tx: {:?}", current_hash,
err);
return Err(err)
} else {
pending.discarded.push(current_hash);
}
}
}
is_new_tx = false;
}
// check for a cycle where importing a transaction resulted in pending transactions to be
// added while removing current transaction. in which case we move this transaction back to
// the pending queue
if pending.removed.iter().any(|tx| *tx.hash() == hash) {
self.pending.clear_transactions(&pending.promoted);
return Err(PoolError::CyclicTransaction)
}
Ok(AddedTransaction::Pending(pending))
}
/// Prunes the transactions that provide the given dependencies.
///
/// This will effectively remove those transactions that satisfy the dependencies.
/// And queued transactions might get promoted if the pruned dependencies unlock them.
pub fn prune_transactions(
&mut self,
dependencies: impl IntoIterator<Item = TransactionId>,
) -> PruneResult<T::Transaction> {
let mut imports = vec![];
let mut pruned = vec![];
for dependency in dependencies {
// mark as satisfied and store the transactions that got unlocked
imports.extend(self.queued.satisfy_and_unlock(&dependency));
// prune transactions
pruned.extend(self.pending.remove_mined(dependency));
}
let mut promoted = vec![];
let mut failed = vec![];
for tx in imports {
let hash = *tx.transaction.hash();
match self.add_pending_transaction(tx) {
Ok(res) => promoted.push(res),
Err(e) => {
warn!(target: "txpool", "Failed to promote tx [{:?}] : {:?}", hash, e);
failed.push(hash)
}
}
}
PruneResult { pruned, failed, promoted }
}
/// Remove the given transactions from the pool.
pub fn remove_invalid(
&mut self,
tx_hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
// early exit in case there is no invalid transactions.
if tx_hashes.is_empty() {
return vec![]
}
trace!(target: "txpool", "Removing invalid transactions: {:?}", tx_hashes);
let mut removed = self.pending.remove_with_dependencies(tx_hashes.clone(), None);
removed.extend(self.queued.remove(tx_hashes));
trace!(target: "txpool", "Removed invalid transactions: {:?}", removed);
removed
}
/// Returns the current size of the entire pool
pub fn size_of(&self) -> usize {
unimplemented!()
}
/// Ensures that the transactions in the sub-pools are within the given bounds.
///
/// If the current size exceeds the given bounds, the worst transactions are evicted from the
/// pool and returned.
pub fn enforce_size_limits(&mut self) {
unimplemented!()
}
}
/// Represents the outcome of a prune
pub struct PruneResult<T: PoolTransaction> {
/// a list of added transactions that a pruned marker satisfied
pub promoted: Vec<AddedTransaction<T>>,
/// all transactions that failed to be promoted and now are discarded
pub failed: Vec<TxHash>,
/// all transactions that were pruned from the ready pool
pub pruned: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> fmt::Debug for PruneResult<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "PruneResult {{ ")?;
write!(
fmt,
"promoted: {:?}, ",
self.promoted.iter().map(|tx| *tx.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "failed: {:?}, ", self.failed)?;
write!(
fmt,
"pruned: {:?}, ",
self.pruned.iter().map(|tx| *tx.transaction.hash()).collect::<Vec<_>>()
)?;
write!(fmt, "}}")?;
Ok(())
}
}
/// Tracks an added transaction and all graph changes caused by adding it.
#[derive(Debug, Clone)]
pub struct AddedPendingTransaction<T: PoolTransaction> {
/// the hash of the submitted transaction
hash: TxHash,
/// transactions promoted to the ready queue
promoted: Vec<TxHash>,
/// transaction that failed and became discarded
discarded: Vec<TxHash>,
/// Transactions removed from the Ready pool
removed: Vec<Arc<ValidPoolTransaction<T>>>,
}
impl<T: PoolTransaction> AddedPendingTransaction<T> {
/// Create a new, empty transaction.
fn new(hash: TxHash) -> Self {
Self {
hash,
promoted: Default::default(),
discarded: Default::default(),
removed: Default::default(),
}
}
}
/// Stores relevant context about a sender.
#[derive(Debug, Clone)]
struct SenderInfo {
/// current nonce of the sender
state_nonce: u64,
/// Balance of the sender at the current point.
balance: U256,
/// How many transactions of this sender are currently in the pool.
num_transactions: u64,
}
// === impl SenderInfo ===
impl SenderInfo {
/// Creates a new entry for an incoming, not yet tracked sender.
fn new_incoming(state_nonce: u64, balance: U256) -> Self {
Self { state_nonce, balance, num_transactions: 1 }
}
}
/// Represents a transaction that was added into the pool and its state
#[derive(Debug, Clone)]
pub enum AddedTransaction<T: PoolTransaction> {
/// Transaction was successfully added and moved to the pending pool.
Pending(AddedPendingTransaction<T>),
/// Transaction was successfully added but not yet queued for processing and moved to the
/// queued pool instead.
Queued {
/// the hash of the submitted transaction
hash: TxHash,
},
}
impl<T: PoolTransaction> AddedTransaction<T> {
/// Returns the hash of the transaction if it's ready
pub fn as_ready(&self) -> Option<&TxHash> {
if let AddedTransaction::Pending(tx) = self {
Some(&tx.hash)
} else {
None
}
}
/// Returns the hash of the transaction
pub fn hash(&self) -> &TxHash {
match self {
AddedTransaction::Pending(tx) => &tx.hash,
AddedTransaction::Queued { hash } => hash,
}
}
}

View File

@ -0,0 +1,606 @@
use crate::{
error::PoolResult, identifier::TransactionId, pool::queued::QueuedPoolTransaction,
traits::BestTransactions, validate::ValidPoolTransaction, TransactionOrdering,
};
use parking_lot::RwLock;
use reth_primitives::{TxHash, U256};
use std::{
cmp::Ordering,
collections::{BTreeSet, HashMap, HashSet},
sync::Arc,
};
use tracing::debug;
/// Type alias for replaced transactions
pub(crate) type ReplacedTransactions<T> =
(Vec<Arc<ValidPoolTransaction<<T as TransactionOrdering>::Transaction>>>, Vec<TxHash>);
/// A pool of validated transactions that are ready on the current state and are waiting to be
/// included in a block.
///
/// Each transaction in this pool is valid on its own, i.e. they are not dependent on transaction
/// that must be executed first. Each of these transaction can be executed independently on the
/// current state
pub(crate) struct PendingTransactions<T: TransactionOrdering> {
/// Keeps track of transactions inserted in the pool.
///
/// This way we can determine when transactions where submitted to the pool.
id: u64,
/// How to order transactions.
ordering: Arc<T>,
/// Base fee of the next block.
pending_base_fee: U256,
/// Dependencies that are provided by `PendingTransaction`s
provided_dependencies: HashMap<TransactionId, TxHash>,
/// Pending transactions that are currently on hold until the `baseFee` of the pending block
/// changes in favor of the parked transactions: the `pendingBlock.baseFee` must decrease
/// before they can be moved to the ready pool and are ready to be executed.
parked: ParkedTransactions<T>,
/// All Transactions that are currently ready.
///
/// Meaning, there are no nonce gaps in these transactions and all of them satisfy the
/// `baseFee` condition: transaction `maxFeePerGas >= pendingBlock.baseFee`
ready_transactions: Arc<RwLock<HashMap<TxHash, PendingTransaction<T>>>>,
/// Independent transactions that can be included directly and don't require other
/// transactions.
///
/// Sorted by their scoring value.
independent_transactions: BTreeSet<PoolTransactionRef<T>>,
}
// === impl PendingTransactions ===
impl<T: TransactionOrdering> PendingTransactions<T> {
/// Create a new pool instance
pub(crate) fn new(ordering: Arc<T>) -> Self {
Self {
id: 0,
provided_dependencies: Default::default(),
parked: Default::default(),
ready_transactions: Arc::new(Default::default()),
ordering,
independent_transactions: Default::default(),
pending_base_fee: Default::default(),
}
}
/// Returns an iterator over all transactions that are _currently_ ready.
///
/// 1. The iterator _always_ returns transaction in order: It never returns a transaction with
/// an unsatisfied dependency and only returns them if dependency transaction were yielded
/// previously. In other words: The nonces of transactions with the same sender will _always_
/// increase by exactly 1.
///
/// The order of transactions which satisfy (1.) is determent by their computed priority: A
/// transaction with a higher priority is returned before a transaction with a lower priority.
///
/// If two transactions have the same priority score, then the transactions which spent more
/// time in pool (were added earlier) are returned first.
///
/// NOTE: while this iterator returns transaction that pool considers valid at this point, they
/// could potentially be become invalid at point of execution. Therefore, this iterator
/// provides a way to mark transactions that the consumer of this iterator considers invalid. In
/// which case the transaction's subgraph is also automatically marked invalid, See (1.).
/// Invalid transactions are skipped.
pub(crate) fn get_transactions(&self) -> TransactionsIterator<T> {
TransactionsIterator {
all: self.ready_transactions.read().clone(),
independent: self.independent_transactions.clone(),
awaiting: Default::default(),
invalid: Default::default(),
}
}
/// Sets the given base fee and returns the old one.
pub(crate) fn set_next_base_fee(&mut self, base_fee: U256) -> U256 {
std::mem::replace(&mut self.pending_base_fee, base_fee)
}
/// Returns true if the transaction is part of the queue.
pub(crate) fn contains(&self, hash: &TxHash) -> bool {
self.ready_transactions.read().contains_key(hash)
}
/// Returns the transaction for the hash if it's in the ready pool but not yet mined
pub(crate) fn get(&self, hash: &TxHash) -> Option<PendingTransaction<T>> {
self.ready_transactions.read().get(hash).cloned()
}
pub(crate) fn provided_dependencies(&self) -> &HashMap<TransactionId, TxHash> {
&self.provided_dependencies
}
fn next_id(&mut self) -> u64 {
let id = self.id;
self.id = self.id.wrapping_add(1);
id
}
/// Adds a new transactions to the pending queue.
///
/// Depending on the transaction's feeCap, this will either move it into the ready queue or park
/// it until a future baseFee unlocks it.
///
/// # Panics
///
/// if the pending transaction is not ready
/// or the transaction is already included
pub(crate) fn add_transaction(
&mut self,
tx: QueuedPoolTransaction<T>,
) -> PoolResult<Vec<Arc<ValidPoolTransaction<T::Transaction>>>> {
assert!(tx.is_satisfied(), "transaction must be ready",);
assert!(
!self.ready_transactions.read().contains_key(tx.transaction.hash()),
"transaction already included"
);
let (replaced_tx, unlocks) = self.replaced_transactions(&tx.transaction)?;
let submission_id = self.next_id();
let hash = *tx.transaction.hash();
let mut independent = true;
let mut requires_offset = 0;
let mut ready = self.ready_transactions.write();
// Add links to transactions that unlock the current one
for dependency in &tx.transaction.depends_on {
// Check if the transaction that satisfies the mark is still in the queue.
if let Some(other) = self.provided_dependencies.get(dependency) {
let tx = ready.get_mut(other).expect("hash included;");
tx.unlocks.push(hash);
// tx still depends on other tx
independent = false;
} else {
requires_offset += 1;
}
}
// update dependencies
self.provided_dependencies.insert(tx.transaction.transaction_id, hash);
let priority = self.ordering.priority(&tx.transaction.transaction);
let transaction =
PoolTransactionRef { submission_id, transaction: tx.transaction, priority };
// TODO check basefee requirement
// add to the independent set
if independent {
self.independent_transactions.insert(transaction.clone());
}
// insert to ready queue
ready.insert(hash, PendingTransaction { transaction, unlocks, requires_offset });
Ok(replaced_tx)
}
/// Removes and returns those transactions that got replaced by the `tx`
fn replaced_transactions(
&mut self,
tx: &ValidPoolTransaction<T::Transaction>,
) -> PoolResult<ReplacedTransactions<T>> {
// check if we are replacing transactions
let remove_hashes: HashSet<_> =
if let Some(hash) = self.provided_dependencies.get(&tx.transaction_id) {
HashSet::from([hash])
} else {
return Ok((Vec::new(), Vec::new()))
};
// early exit if we are not replacing anything.
if remove_hashes.is_empty() {
return Ok((Vec::new(), Vec::new()))
}
// check if we're replacing the same transaction and if it can be replaced
let mut unlocked_tx = Vec::new();
{
// construct a list of unlocked transactions
// also check for transactions that shouldn't be replaced because underpriced
let ready = self.ready_transactions.read();
for to_remove in remove_hashes.iter().filter_map(|hash| ready.get(hash)) {
// if we're attempting to replace a transaction that provides the exact same
// dependencies (addr + nonce) then we check for gas price
if to_remove.id().eq(&tx.transaction_id) {
// check if underpriced
// TODO check if underpriced
// if tx.pending_transaction.transaction.gas_price() <= to_remove.gas_price() {
// warn!(target: "txpool", "ready replacement transaction underpriced
// [{:?}]", tx.hash()); return
// Err(PoolError::ReplacementUnderpriced(Box::new(tx.clone())))
// } else {
// trace!(target: "txpool", "replacing ready transaction [{:?}] with higher
// gas price [{:?}]", to_remove.transaction.transaction.hash(), tx.hash());
// }
}
unlocked_tx.extend(to_remove.unlocks.iter().cloned())
}
}
let remove_hashes = remove_hashes.into_iter().copied().collect::<Vec<_>>();
let new_provides = HashSet::from([tx.transaction_id]);
let removed_tx = self.remove_with_dependencies(remove_hashes, Some(new_provides));
Ok((removed_tx, unlocked_tx))
}
/// Removes the transactions from the ready queue and returns the removed transactions.
/// This will also remove all transactions that depend on those.
pub(crate) fn clear_transactions(
&mut self,
tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
self.remove_with_dependencies(tx_hashes.to_vec(), None)
}
/// Removes the transactions that was mined.
///
/// This will also remove all transactions that lead to the transaction that provides the
/// id.
pub(crate) fn remove_mined(
&mut self,
id: TransactionId,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed_tx = vec![];
// the dependencies to remove
let mut remove = vec![id];
while let Some(dependency) = remove.pop() {
let res = self
.provided_dependencies
.remove(&dependency)
.and_then(|hash| self.ready_transactions.write().remove(&hash));
if let Some(tx) = res {
let unlocks = tx.unlocks;
self.independent_transactions.remove(&tx.transaction);
let tx = tx.transaction.transaction;
// also remove previous transactions
{
let hash = tx.hash();
let mut ready = self.ready_transactions.write();
let mut previous_dependency = |dependency| -> Option<Vec<TransactionId>> {
let prev_hash = self.provided_dependencies.get(dependency)?;
let tx2 = ready.get_mut(prev_hash)?;
// remove hash
if let Some(idx) = tx2.unlocks.iter().position(|i| i == hash) {
tx2.unlocks.swap_remove(idx);
}
if tx2.unlocks.is_empty() {
Some(vec![tx2.transaction.transaction.transaction_id])
} else {
None
}
};
// find previous transactions
for dep in &tx.depends_on {
if let Some(mut dependency_to_remove) = previous_dependency(dep) {
remove.append(&mut dependency_to_remove);
}
}
}
// add the transactions that just got unlocked to independent set
for hash in unlocks {
if let Some(tx) = self.ready_transactions.write().get_mut(&hash) {
tx.requires_offset += 1;
if tx.requires_offset == tx.transaction.transaction.depends_on.len() {
self.independent_transactions.insert(tx.transaction.clone());
}
}
}
// finally, remove the dependencies that this transaction provides
let current_dependency = &dependency;
let removed = self.provided_dependencies.remove(&tx.transaction_id);
assert_eq!(
removed.as_ref(),
if current_dependency.eq(&tx.transaction_id) { None } else { Some(tx.hash()) },
"The pool contains exactly one transaction providing given tag; the removed transaction
claims to provide that tag, so it has to be mapped to it's hash; qed"
);
removed_tx.push(tx);
}
}
removed_tx
}
/// Removes transactions and those that depend on them and satisfy at least one dependency in
/// the given filter set.
pub(crate) fn remove_with_dependencies(
&mut self,
mut tx_hashes: Vec<TxHash>,
dependency_filter: Option<HashSet<TransactionId>>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = Vec::new();
let mut ready = self.ready_transactions.write();
while let Some(hash) = tx_hashes.pop() {
if let Some(mut tx) = ready.remove(&hash) {
let id = &tx.transaction.transaction.transaction_id;
// remove the transactions
let removed_transaction = if dependency_filter
.as_ref()
.map(|filter| !filter.contains(id))
.unwrap_or(true)
{
self.provided_dependencies.remove(id);
true
} else {
false
};
// remove from unlocks
for dependency in &tx.transaction.transaction.depends_on {
if let Some(hash) = self.provided_dependencies.get(dependency) {
if let Some(tx) = ready.get_mut(hash) {
if let Some(idx) = tx.unlocks.iter().position(|i| i == hash) {
tx.unlocks.swap_remove(idx);
}
}
}
}
// remove from the independent set
self.independent_transactions.remove(&tx.transaction);
if removed_transaction {
// remove all transactions that the current one unlocks
tx_hashes.append(&mut tx.unlocks);
}
// remove transaction
removed.push(tx.transaction.transaction);
}
}
removed
}
}
/// A transaction that is ready to be included in a block.
#[derive(Debug)]
pub(crate) struct PendingTransaction<T: TransactionOrdering> {
/// Reference to the actual transaction.
transaction: PoolTransactionRef<T>,
/// Tracks the transactions that get unlocked by this transaction.
unlocks: Vec<TxHash>,
/// Amount of required dependencies that are inherently provided.
requires_offset: usize,
}
// == impl PendingTransaction ===
impl<T: TransactionOrdering> PendingTransaction<T> {
/// Returns all ids this transaction satisfies.
pub(crate) fn id(&self) -> &TransactionId {
&self.transaction.transaction.transaction_id
}
}
impl<T: TransactionOrdering> Clone for PendingTransaction<T> {
fn clone(&self) -> Self {
Self {
transaction: self.transaction.clone(),
unlocks: self.unlocks.clone(),
requires_offset: self.requires_offset,
}
}
}
/// A reference to a transaction in the _pending_ pool
#[derive(Debug)]
pub(crate) struct PoolTransactionRef<T: TransactionOrdering> {
/// Actual transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Identifier that tags when transaction was submitted in the pool.
pub(crate) submission_id: u64,
/// The priority value assigned by the used `Ordering` function.
pub(crate) priority: T::Priority,
}
impl<T: TransactionOrdering> Clone for PoolTransactionRef<T> {
fn clone(&self) -> Self {
Self {
transaction: Arc::clone(&self.transaction),
submission_id: self.submission_id,
priority: self.priority.clone(),
}
}
}
impl<T: TransactionOrdering> Eq for PoolTransactionRef<T> {}
impl<T: TransactionOrdering> PartialEq<Self> for PoolTransactionRef<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: TransactionOrdering> PartialOrd<Self> for PoolTransactionRef<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: TransactionOrdering> Ord for PoolTransactionRef<T> {
fn cmp(&self, other: &Self) -> Ordering {
// This compares by `priority` and only if two tx have the exact same priority this compares
// the unique `submission_id`. This ensures that transactions with same priority are not
// equal, so they're not replaced in the set
self.priority
.cmp(&other.priority)
.then_with(|| other.submission_id.cmp(&self.submission_id))
}
}
/// Pending Transactions that are currently parked until their set baseFee becomes valid
struct ParkedTransactions<T: TransactionOrdering> {
/// Keeps track of transactions inserted in the pool.
///
/// This way we can determine when transactions where submitted to the pool.
id: u64,
/// All transactions that are currently parked due to their fee.
parked_transactions: HashMap<TxHash, ParkedTransaction<T>>,
/// Same transactions but sorted by their fee and priority
sorted_transactions: BTreeSet<ParkedTransactionRef<T>>,
}
impl<T: TransactionOrdering> Default for ParkedTransactions<T> {
fn default() -> Self {
Self {
id: 0,
parked_transactions: Default::default(),
sorted_transactions: Default::default(),
}
}
}
/// A transaction that is ready to be included in a block.
#[derive(Debug, Clone)]
pub(crate) struct ParkedTransaction<T: TransactionOrdering> {
/// Reference to the actual transaction.
transaction: PoolTransactionRef<T>,
/// Tracks the transactions that get unlocked by this transaction.
unlocks: Vec<TxHash>,
/// Amount of required dependencies that are inherently provided
requires_offset: usize,
}
/// A reference to a currently _parked_ transaction.
struct ParkedTransactionRef<T: TransactionOrdering> {
/// Actual transaction.
transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Identifier that tags when transaction was submitted in the pool.
submission_id: u64,
/// The priority value assigned by the used `Ordering` function.
priority: T::Priority,
/// EIP-1559 Max base fee the caller is willing to pay.
max_fee_per_gas: U256,
}
impl<T: TransactionOrdering> Eq for ParkedTransactionRef<T> {}
impl<T: TransactionOrdering> PartialEq<Self> for ParkedTransactionRef<T> {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}
impl<T: TransactionOrdering> PartialOrd<Self> for ParkedTransactionRef<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T: TransactionOrdering> Ord for ParkedTransactionRef<T> {
fn cmp(&self, other: &Self) -> Ordering {
// This compares the `max_fee_per_gas` value of the transaction
self.max_fee_per_gas
.cmp(&other.max_fee_per_gas)
.then_with(|| self.priority.cmp(&other.priority))
.then_with(|| other.submission_id.cmp(&self.submission_id))
}
}
/// An iterator that returns transactions that can be executed on the current state.
pub struct TransactionsIterator<T: TransactionOrdering> {
all: HashMap<TxHash, PendingTransaction<T>>,
awaiting: HashMap<TxHash, (usize, PoolTransactionRef<T>)>,
independent: BTreeSet<PoolTransactionRef<T>>,
invalid: HashSet<TxHash>,
}
// == impl TransactionsIterator ==
impl<T: TransactionOrdering> TransactionsIterator<T> {
/// Mark the transaction as invalid.
///
/// As a consequence, all values that depend on the invalid one will be skipped.
/// When given transaction is not in the pool it has no effect.
pub(crate) fn mark_invalid(&mut self, tx: &Arc<ValidPoolTransaction<T::Transaction>>) {
if let Some(invalid_transaction) = self.all.get(tx.hash()) {
debug!(
target: "txpool",
"[{:?}] Marked as invalid",
invalid_transaction.transaction.transaction.hash()
);
for hash in &invalid_transaction.unlocks {
self.invalid.insert(*hash);
}
}
}
/// Depending on number of satisfied requirements insert given ref
/// either to awaiting set or to best set.
fn independent_or_awaiting(&mut self, satisfied: usize, tx_ref: PoolTransactionRef<T>) {
if satisfied >= tx_ref.transaction.depends_on.len() {
// If we have satisfied all deps insert to the best set
self.independent.insert(tx_ref);
} else {
// otherwise we're still waiting for some deps
self.awaiting.insert(*tx_ref.transaction.hash(), (satisfied, tx_ref));
}
}
}
impl<T: TransactionOrdering> BestTransactions for TransactionsIterator<T> {
fn mark_invalid(&mut self, tx: &Self::Item) {
TransactionsIterator::mark_invalid(self, tx)
}
}
impl<T: TransactionOrdering> Iterator for TransactionsIterator<T> {
type Item = Arc<ValidPoolTransaction<T::Transaction>>;
fn next(&mut self) -> Option<Self::Item> {
loop {
let best = self.independent.iter().next_back()?.clone();
let best = self.independent.take(&best)?;
let hash = best.transaction.hash();
// skip transactions that were marked as invalid
if self.invalid.contains(hash) {
debug!(
target: "txpool",
"[{:?}] skipping invalid transaction",
hash
);
continue
}
let ready =
if let Some(ready) = self.all.get(hash).cloned() { ready } else { continue };
// Insert transactions that just got unlocked.
for hash in &ready.unlocks {
// first check local awaiting transactions
let res = if let Some((mut satisfied, tx_ref)) = self.awaiting.remove(hash) {
satisfied += 1;
Some((satisfied, tx_ref))
// then get from the pool
} else {
self.all
.get(hash)
.map(|next| (next.requires_offset + 1, next.transaction.clone()))
};
if let Some((satisfied, tx_ref)) = res {
self.independent_or_awaiting(satisfied, tx_ref)
}
}
return Some(best.transaction)
}
}
}

View File

@ -0,0 +1,232 @@
use crate::{
error::PoolResult, identifier::TransactionId, traits::PoolTransaction,
validate::ValidPoolTransaction, TransactionOrdering,
};
use reth_primitives::TxHash;
use std::{
collections::{HashMap, HashSet},
fmt,
sync::Arc,
time::Instant,
};
/// A pool of transactions that are not ready on the current state and are waiting for state changes
/// that turn them valid.
///
/// This could include transactions with nonce gaps: Transactions that are waiting until for a
/// transaction to arrive that closes the nonce gap.
///
/// Keeps a set of transactions that are waiting until their dependencies are unlocked.
pub(crate) struct QueuedTransactions<T: TransactionOrdering> {
/// Dependencies that aren't yet provided by any transaction.
required_dependencies: HashMap<TransactionId, HashSet<TxHash>>,
/// Mapping of the dependencies of a transaction to the hash of the transaction,
waiting_dependencies: HashMap<TransactionId, TxHash>,
/// Transactions that are not ready yet are waiting for another tx to finish,
waiting_queue: HashMap<TxHash, QueuedPoolTransaction<T>>,
}
// == impl QueuedTransactions ==
impl<T: TransactionOrdering> QueuedTransactions<T> {
/// Returns the number of transactions that are currently waiting in this pool for new
/// transactions to satisfy their dependencies.
pub(crate) fn len(&self) -> usize {
self.waiting_queue.len()
}
/// Whether this pool is empty.
pub(crate) fn is_empty(&self) -> bool {
self.waiting_queue.is_empty()
}
/// Returns an iterator over all transactions waiting in this pool.
pub(crate) fn transactions(
&self,
) -> impl Iterator<Item = Arc<ValidPoolTransaction<T::Transaction>>> + '_ {
self.waiting_queue.values().map(|tx| Arc::clone(&tx.transaction))
}
/// Adds a transaction to the queue of transactions
pub(crate) fn add_transaction(&mut self, tx: QueuedPoolTransaction<T>) -> PoolResult<()> {
assert!(!tx.is_satisfied(), "transaction must not be ready");
assert!(
!self.waiting_queue.contains_key(tx.transaction.hash()),
"transaction is already added"
);
if let Some(_replace) = self
.waiting_dependencies
.get(&tx.transaction.transaction_id)
.and_then(|hash| self.waiting_queue.get(hash))
{
// TODO handle transaction underpriced
// // check if underpriced
// if tx.transaction.gas_price() < replace.transaction.gas_price() {
// warn!(target: "txpool", "pending replacement transaction underpriced [{:?}]",
// tx.transaction.hash()); return Err(Error::ReplacementUnderpriced)
// }
}
// add all missing dependencies
for dependency in &tx.missing_dependencies {
self.required_dependencies
.entry(*dependency)
.or_default()
.insert(*tx.transaction.hash());
}
// also track identifying dependencies
self.waiting_dependencies.insert(tx.transaction.transaction_id, *tx.transaction.hash());
// add tx to the queue
self.waiting_queue.insert(*tx.transaction.hash(), tx);
Ok(())
}
/// Returns true if given transaction is part of the queue
pub(crate) fn contains(&self, hash: &TxHash) -> bool {
self.waiting_queue.contains_key(hash)
}
/// Returns the transaction for the hash if it's waiting
pub(crate) fn get(&self, tx_hash: &TxHash) -> Option<&QueuedPoolTransaction<T>> {
self.waiting_queue.get(tx_hash)
}
/// Returns the transactions for the given hashes, `None` if no transaction exists
pub(crate) fn get_all(
&self,
tx_hashes: &[TxHash],
) -> Vec<Option<Arc<ValidPoolTransaction<T::Transaction>>>> {
tx_hashes
.iter()
.map(|hash| self.waiting_queue.get(hash).map(|tx| Arc::clone(&tx.transaction)))
.collect()
}
/// This will check off the dependencies of queued transactions.
///
/// Returns the those transactions that become unlocked (all dependencies checked) and can be
/// moved to the ready queue.
pub(crate) fn satisfy_and_unlock(
&mut self,
id: &TransactionId,
) -> Vec<QueuedPoolTransaction<T>> {
let mut unlocked_ready = Vec::new();
if let Some(tx_hashes) = self.required_dependencies.remove(id) {
for hash in tx_hashes {
let tx = self.waiting_queue.get_mut(&hash).expect("tx is included;");
tx.satisfy(id);
if tx.is_satisfied() {
let tx = self.waiting_queue.remove(&hash).expect("tx is included;");
self.waiting_dependencies.remove(&tx.transaction.transaction_id);
unlocked_ready.push(tx);
}
}
}
unlocked_ready
}
/// Removes the transactions associated with the given hashes
///
/// Returns all removed transactions.
pub(crate) fn remove(
&mut self,
hashes: Vec<TxHash>,
) -> Vec<Arc<ValidPoolTransaction<T::Transaction>>> {
let mut removed = vec![];
for hash in hashes {
if let Some(waiting_tx) = self.waiting_queue.remove(&hash) {
self.waiting_dependencies.remove(&waiting_tx.transaction.transaction_id);
for dependency in waiting_tx.missing_dependencies {
let remove =
if let Some(required) = self.required_dependencies.get_mut(&dependency) {
required.remove(&hash);
required.is_empty()
} else {
false
};
if remove {
self.required_dependencies.remove(&dependency);
}
}
removed.push(waiting_tx.transaction)
}
}
removed
}
}
/// A transaction submitted to the pool.
#[derive(Clone)]
pub(crate) struct QueuedPoolTransaction<T: TransactionOrdering> {
/// The actual validated transaction.
pub(crate) transaction: Arc<ValidPoolTransaction<T::Transaction>>,
/// Transactions required for and have not been satisfied yet by other transactions in the
/// pool.
///
/// This will be an empty list if there are no nonce gaps across multiple transactions of the
/// same sender in the pool. If there are gaps, this will include the missing transactions.
pub(crate) missing_dependencies: HashSet<TransactionId>,
/// Timestamp when the tx was added.
pub(crate) added_at: Instant,
}
impl<T: TransactionOrdering> Default for QueuedTransactions<T> {
fn default() -> Self {
Self {
required_dependencies: Default::default(),
waiting_dependencies: Default::default(),
waiting_queue: Default::default(),
}
}
}
// === impl QuQueuedPoolTransaction ===
impl<T: TransactionOrdering> QueuedPoolTransaction<T> {
/// Creates a new `QueuedPoolTransaction`.
///
/// Determines the dependent transaction that are still missing before this transaction can be
/// moved to the queue.
pub(crate) fn new(
transaction: ValidPoolTransaction<T::Transaction>,
provided: &HashMap<TransactionId, TxHash>,
) -> Self {
let missing_dependencies = transaction
.depends_on
.iter()
.filter(|id| {
// is true if the dependency id is already satisfied either via transaction in the
// pool
!provided.contains_key(&**id)
})
.cloned()
.collect();
Self { transaction: Arc::new(transaction), missing_dependencies, added_at: Instant::now() }
}
/// Removes the required dependency.
pub(crate) fn satisfy(&mut self, id: &TransactionId) {
self.missing_dependencies.remove(id);
}
/// Returns true if transaction has all dependencies are satisfied.
pub(crate) fn is_satisfied(&self) -> bool {
self.missing_dependencies.is_empty()
}
}
impl<T: TransactionOrdering> fmt::Debug for QueuedPoolTransaction<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "QueuedPoolTransaction {{ ")?;
write!(fmt, "added_at: {:?}, ", self.added_at)?;
write!(fmt, "tx: {:?}, ", self.transaction)?;
write!(fmt, "missing_dependencies: [{:?}]", &self.missing_dependencies)?;
write!(fmt, "}}")
}
}

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,123 @@
use crate::{error::PoolResult, validate::ValidPoolTransaction, BlockID};
use futures::channel::mpsc::Receiver;
use reth_primitives::{Address, TxHash, H256, U256};
use std::{fmt, hash::Hash, sync::Arc};
/// General purpose abstraction fo a transaction-pool.
///
/// This is intended to be used by API-consumers such as RPC that need inject new incoming,
/// unverified transactions. And by block production that needs to get transactions to execute in a
/// new block.
#[async_trait::async_trait]
pub trait TransactionPool: Send + Sync {
/// The transaction type of the pool
type Transaction: PoolTransaction + Send + Sync + 'static;
/// Event listener for when a new block was mined.
///
/// Implementers need to update the pool accordingly.
/// For example the base fee of the pending block is determined after a block is mined which
/// affects the dynamic fee requirement of pending transactions in the pool.
async fn on_new_block(&self, event: NewBlockEvent);
/// Adds an _unvalidated_ transaction into the pool.
///
/// Consumer: RPC
async fn add_transaction(
&self,
block_id: BlockID,
transaction: Self::Transaction,
) -> PoolResult<TxHash>;
/// Adds the given _unvalidated_ transaction into the pool.
///
/// Returns a list of results.
///
/// Consumer: RPC
async fn add_transactions(
&self,
block_id: BlockID,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>>;
/// Returns a new Stream that yields transactions hashes for new ready transactions.
///
/// Consumer: RPC
fn ready_transactions_listener(&self) -> Receiver<TxHash>;
/// Returns an iterator that yields transactions that are ready for block production.
///
/// Consumer: Block production
fn best_transactions(
&self,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>>;
/// Removes all transactions corresponding to the given hashes.
///
/// Also removes all dependent transactions.
///
/// Consumer: Block production
fn remove_invalid(
&self,
tx_hashes: &[TxHash],
) -> Vec<Arc<ValidPoolTransaction<Self::Transaction>>>;
}
/// Event fired when a new block was mined
#[derive(Debug, Clone)]
pub struct NewBlockEvent {
/// Hash of the added block.
pub hash: H256,
/// EIP-1559 Base fee of the _next_ (pending) block
///
/// The base fee of a block depends on the utilization of the last block and its base fee.
pub pending_block_base_fee: U256,
/// Provides a set of state changes that affected the accounts.
// TODO based on the account changes, we can recheck balance
pub state_changes: (),
}
/// An `Iterator` that only returns transactions that are ready to be executed.
///
/// This makes no assumptions about the order of the transactions, but expects that _all_
/// transactions are valid (no nonce gaps.).
pub trait BestTransactions: Iterator + Send {
/// Mark the transaction as invalid.
///
/// Implementers must ensure all subsequent transaction _don't_ depend on this transaction.
/// In other words, this must remove the given transaction _and_ drain all transaction that
/// depend on it.
fn mark_invalid(&mut self, transaction: &Self::Item);
}
/// A no-op implementation that yields no transactions.
impl<T> BestTransactions for std::iter::Empty<T> {
fn mark_invalid(&mut self, _tx: &T) {}
}
/// Trait for transaction types used inside the pool
pub trait PoolTransaction: fmt::Debug + Send + Send + 'static {
/// Hash of the transaction
fn hash(&self) -> &TxHash;
/// The Sender of the transaction.
fn sender(&self) -> &Address;
/// Returns the nonce for this transaction.
fn nonce(&self) -> u64;
/// Calculates the cost that this transaction is allowed to consume:
///
/// For EIP-1559 transactions that is `feeCap x gasLimit + transferred_value`
fn cost(&self) -> U256;
/// Returns the EIP-1559 Max base fee the caller is willing to pay.
///
/// This will return `None` for non-EIP1559 transactions
fn max_fee_per_gas(&self) -> Option<&U256>;
/// Returns the EIP-1559 Priority fee the caller is paying to the block author.
///
/// This will return `None` for non-EIP1559 transactions
fn max_priority_fee_per_gas(&self) -> Option<&U256>;
}

View File

@ -0,0 +1,89 @@
//! Transaction validation abstractions.
use crate::{
error::PoolError,
identifier::{SenderId, TransactionId},
traits::PoolTransaction,
};
use reth_primitives::{BlockID, TxHash, U256};
use std::fmt;
/// A Result type returned after checking a transaction's validity.
pub enum TransactionValidationOutcome<T: PoolTransaction> {
/// Transaction successfully validated
Valid {
/// Balance of the sender at the current point.
balance: U256,
/// current nonce of the sender
state_nonce: u64,
/// Validated transaction.
transaction: T,
},
/// The transaction is considered invalid.
///
/// Note: This does not indicate whether the transaction will not be valid in the future
Invalid(T, PoolError),
}
/// Provides support for validating transaction at any given state of the chain
#[async_trait::async_trait]
pub trait TransactionValidator: Send + Sync {
/// The transaction type to validate.
type Transaction: PoolTransaction + Send + Sync + 'static;
/// Validates the transaction and returns a validated outcome
///
/// This is used by the transaction-pool check the transaction's validity against the state of
/// the given block hash.
///
/// This is supposed to extend the `transaction` with its id in the graph of
/// transactions for the sender.
async fn validate_transaction(
&self,
_block_id: &BlockID,
_transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
unimplemented!()
}
}
/// A valida transaction in the pool.
pub struct ValidPoolTransaction<T: PoolTransaction> {
/// The transaction
pub transaction: T,
/// Ids required by the transaction.
///
/// This lists all unique transactions that need to be mined before this transaction can be
/// considered `pending` and itself be included.
pub depends_on: Vec<TransactionId>,
/// The identifier for this transaction.
pub transaction_id: TransactionId,
/// Whether to propagate the transaction.
pub propagate: bool,
/// Internal `Sender` identifier
pub sender_id: SenderId,
/// Total cost of the transaction: `feeCap x gasLimit + transferred_value`
pub cost: U256,
// TODO add a block timestamp that marks validity
}
// === impl ValidPoolTransaction ===
impl<T: PoolTransaction> ValidPoolTransaction<T> {
/// Returns the hash of the transaction
pub fn hash(&self) -> &TxHash {
self.transaction.hash()
}
}
impl<T: PoolTransaction> fmt::Debug for ValidPoolTransaction<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "Transaction {{ ")?;
write!(fmt, "hash: {:?}, ", &self.transaction.hash())?;
write!(fmt, "provides: {:?}, ", &self.transaction_id)?;
write!(fmt, "depends_on: {:?}, ", &self.depends_on)?;
write!(fmt, "raw tx: {:?}", &self.transaction)?;
write!(fmt, "}}")?;
Ok(())
}
}

View File

@ -0,0 +1,3 @@
//! transaction-pool integration tests
fn main() {}