refactor(txpool): simplify layers and add docs (#52)

* chore: some cleanup

* refactor(txpool): simplify layers and add docs
This commit is contained in:
Matthias Seitz
2022-10-12 17:58:36 +02:00
committed by GitHub
parent ae5935e6b2
commit 4fb99848fd
6 changed files with 161 additions and 143 deletions

View File

@ -6,12 +6,86 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Reth's transaction pool implementation
//! Reth's transaction pool implementation.
//!
//! This crate provides a generic transaction pool implementation.
//!
//! ## Functionality
//!
//! The transaction pool is responsible for
//!
//! - recording incoming transactions
//! - providing existing transactions
//! - ordering and providing the best transactions for block production
//! - monitoring memory footprint and enforce pool size limits
//!
//! ## Assumptions
//!
//! ### Transaction type
//!
//! The pool expects certain ethereum related information from the generic transaction type of the
//! pool ([`PoolTransaction`]), this includes gas price, base fee (EIP-1559 transactions), nonce
//! etc. It makes no assumptions about the encoding format, but the transaction type must report its
//! size so pool size limits (memory) can be enforced.
//!
//! ### Transaction ordering
//!
//! The pending pool contains transactions that can be mined on the current state.
//! The order in which they're returned are determined by a `Priority` value returned by the
//! `TransactionOrdering` type this pool is configured with.
//!
//! This is only used in the _pending_ pool to yield the best transactions for block production. The
//! _base pool_ is ordered by base fee, and the _queued pool_ by current distance.
//!
//! ### Validation
//!
//! The pool itself does not validate incoming transactions, instead this should be provided by
//! implementing `TransactionsValidator`. Only transactions that the validator returns as valid are
//! included in the pool. It is assumed that transaction that are in the pool are either valid on
//! the current state or could become valid after certain state changes. transaction that can never
//! become valid (e.g. nonce lower than current on chain nonce) will never be added to the pool and
//! instead are discarded right away.
//!
//! ### State Changes
//!
//! Once a new block is mined, the pool needs to be updated with a changeset in order to:
//!
//! - remove mined transactions
//! - update using account changes: balance changes
//! - base fee updates
//!
//! ## Implementation details
//!
//! The `TransactionPool` trait exposes all externally used functionality of the pool, such as
//! inserting, querying specific transactions by hash or retrieving the best transactions.
//! Additionally, it allows to register event listeners for new ready transactions or state changes.
//! Events are communicated via channels.
//!
//! ### Architecture
//!
//! The final `TransactionPool` is made up of two layers:
//!
//! The lowest layer is the actual pool implementations that manages (validated) transactions:
//! [`TxPool`](crate::pool::TxPool). This is contained in a higher level pool type that guards the
//! low level pool and handles additional listeners or metrics:
//! [`PoolInner`](crate::pool::PoolInner)
//!
//! The transaction pool will be used by separate consumers (RPC, P2P), to make sharing easier, the
//! [`Pool`](crate::Pool) type is just an `Arc` wrapper around `PoolInner`. This is the usable type
//! that provides the `TransactionPool` interface.
pub use crate::{
client::PoolClient,
config::PoolConfig,
ordering::TransactionOrdering,
traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool},
validate::{TransactionValidationOutcome, TransactionValidator},
};
use crate::{error::PoolResult, pool::PoolInner, validate::ValidPoolTransaction};
use futures::channel::mpsc::Receiver;
use parking_lot::Mutex;
use reth_primitives::{BlockID, TxHash, U256, U64};
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
mod client;
mod config;
@ -25,24 +99,10 @@ mod validate;
#[cfg(test)]
mod test_util;
pub use crate::{
client::PoolClient,
config::PoolConfig,
ordering::TransactionOrdering,
pool::BasicPool,
traits::{BestTransactions, NewBlockEvent, PoolTransaction, TransactionPool},
validate::{TransactionValidationOutcome, TransactionValidator},
};
use crate::{error::PoolResult, validate::ValidPoolTransaction};
/// A generic, customizable `TransactionPool` implementation.
/// A shareable, generic, customizable `TransactionPool` implementation.
pub struct Pool<P: PoolClient, T: TransactionOrdering> {
/// The actual transaction pool where transactions and subscriptions are handled.
pool: BasicPool<P, T>,
/// Tracks status updates linked to chain events.
update_status: Arc<Mutex<UpdateStatus>>,
/// Chain/Storage access.
client: Arc<P>,
/// Arc'ed instance of the pool internals
pool: Arc<PoolInner<P, T>>,
}
// === impl Pool ===
@ -52,10 +112,55 @@ where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
/// Creates a new `Pool` with the given config and client and ordering.
/// Create a new transaction pool instance.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
let pool = BasicPool::new(Arc::clone(&client), ordering, config);
Self { pool, update_status: Arc::new(Default::default()), client }
Self { pool: Arc::new(PoolInner::new(client, ordering, config)) }
}
/// Returns the wrapped pool
pub(crate) fn inner(&self) -> &PoolInner<P, T> {
&self.pool
}
/// Returns the actual block number for the block id
fn resolve_block_number(&self, block_id: &BlockID) -> PoolResult<U64> {
self.pool.client().ensure_block_number(block_id)
}
/// Returns future that validates all transaction in the given iterator at the block the
/// `block_id` points to.
async fn validate_all(
&self,
block_id: &BlockID,
transactions: impl IntoIterator<Item = P::Transaction>,
) -> PoolResult<HashMap<TxHash, TransactionValidationOutcome<P::Transaction>>> {
// get the actual block number which is required to validate the transactions
let block_number = self.resolve_block_number(block_id)?;
let outcome = futures::future::join_all(
transactions.into_iter().map(|tx| self.validate(block_id, block_number, tx)),
)
.await
.into_iter()
.collect::<HashMap<_, _>>();
Ok(outcome)
}
/// Validates the given transaction at the given block
async fn validate(
&self,
block_id: &BlockID,
_block_number: U64,
transaction: P::Transaction,
) -> (TxHash, TransactionValidationOutcome<P::Transaction>) {
let _hash = *transaction.hash();
// TODO this is where additional validate checks would go, like banned senders etc...
let _res = self.pool.client().validate_transaction(block_id, transaction).await;
// TODO blockstamp the transaction
todo!()
}
}
@ -78,7 +183,10 @@ where
block_id: BlockID,
transaction: Self::Transaction,
) -> PoolResult<TxHash> {
self.pool.clone().add_transaction(&block_id, transaction).await
self.add_transactions(block_id, vec![transaction])
.await?
.pop()
.expect("transaction exists; qed")
}
async fn add_transactions(
@ -86,17 +194,19 @@ where
block_id: BlockID,
transactions: Vec<Self::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
self.pool.clone().add_transactions(&block_id, transactions).await
let validated = self.validate_all(&block_id, transactions).await?;
let transactions = self.pool.add_transactions(validated.into_values());
Ok(transactions)
}
fn ready_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.ready_transactions_listener()
self.pool.add_ready_listener()
}
fn best_transactions(
&self,
) -> Box<dyn BestTransactions<Item = Arc<ValidPoolTransaction<Self::Transaction>>>> {
Box::new(self.pool.inner().ready_transactions())
Box::new(self.pool.ready_transactions())
}
fn remove_invalid(
@ -115,3 +225,9 @@ struct UpdateStatus {
/// Current base fee that needs to be enforced
base_fee: U256,
}
impl<P: PoolClient, O: TransactionOrdering> Clone for Pool<P, O> {
fn clone(&self) -> Self {
Self { pool: Arc::clone(&self.pool) }
}
}

View File

@ -1,18 +1,19 @@
use crate::traits::PoolTransaction;
use std::fmt;
/// Transaction ordering trait to determine the order of transactions.
///
/// Decides how transactions should be ordered within the pool.
/// Decides how transactions should be ordered within the pool, depending on a `Priority` value.
///
/// The returned priority must reflect natural `Ordering`
// TODO(mattsse) this should be extended so it provides a way to rank transaction in relation to
// each other.
/// The returned priority must reflect [total order](https://en.wikipedia.org/wiki/Total_order).
pub trait TransactionOrdering: Send + Sync + 'static {
/// Priority of a transaction.
///
/// Higher is better.
type Priority: Ord + Clone + Default + fmt::Debug + Send + Sync;
/// The transaction type to score.
type Transaction: PoolTransaction + Send + Sync + 'static;
/// The transaction type to determine the priority of.
type Transaction: PoolTransaction;
/// Returns the priority score for the given transaction.
fn priority(&self, transaction: &Self::Transaction) -> Self::Priority;

View File

@ -88,110 +88,11 @@ pub mod txpool;
use crate::{pool::txpool::TxPool, validate::TransactionValidationOutcome};
pub use events::TransactionEvent;
/// Shareable Transaction pool.
pub struct BasicPool<P: PoolClient, T: TransactionOrdering> {
/// Arc'ed instance of the pool internals
pool: Arc<PoolInner<P, T>>,
}
// === impl Pool ===
impl<P: PoolClient, T: TransactionOrdering> BasicPool<P, T>
where
P: PoolClient,
T: TransactionOrdering<Transaction = <P as TransactionValidator>::Transaction>,
{
/// Create a new transaction pool instance.
pub fn new(client: Arc<P>, ordering: Arc<T>, config: PoolConfig) -> Self {
Self { pool: Arc::new(PoolInner::new(client, ordering, config)) }
}
/// Returns the wrapped pool
pub(crate) fn inner(&self) -> &PoolInner<P, T> {
&self.pool
}
/// Returns the actual block number for the block id
fn resolve_block_number(&self, block_id: &BlockID) -> PoolResult<U64> {
self.pool.client().ensure_block_number(block_id)
}
/// Add a single _unverified_ transaction into the pool.
pub async fn add_transaction(
&self,
block_id: &BlockID,
transaction: P::Transaction,
) -> PoolResult<TxHash> {
self.add_transactions(block_id, Some(transaction))
.await?
.pop()
.expect("transaction exists; qed")
}
/// Adds all given transactions into the pool
pub async fn add_transactions(
&self,
block_id: &BlockID,
transactions: impl IntoIterator<Item = P::Transaction>,
) -> PoolResult<Vec<PoolResult<TxHash>>> {
let validated = self.validate_all(block_id, transactions).await?;
let transactions = self.pool.add_transactions(validated.into_values());
Ok(transactions)
}
/// Returns future that validates all transaction in the given iterator at the block the
/// `block_id` points to.
async fn validate_all(
&self,
block_id: &BlockID,
transactions: impl IntoIterator<Item = P::Transaction>,
) -> PoolResult<HashMap<TxHash, TransactionValidationOutcome<P::Transaction>>> {
// get the actual block number which is required to validate the transactions
let block_number = self.resolve_block_number(block_id)?;
let outcome = futures::future::join_all(
transactions.into_iter().map(|tx| self.validate(block_id, block_number, tx)),
)
.await
.into_iter()
.collect::<HashMap<_, _>>();
Ok(outcome)
}
/// Validates the given transaction at the given block
async fn validate(
&self,
block_id: &BlockID,
_block_number: U64,
transaction: P::Transaction,
) -> (TxHash, TransactionValidationOutcome<P::Transaction>) {
let _hash = *transaction.hash();
// TODO this is where additional validate checks would go, like banned senders etc...
let _res = self.pool.client().validate_transaction(block_id, transaction).await;
// TODO blockstamp the transaction
todo!()
}
/// Registers a new transaction listener and returns the receiver stream.
pub fn ready_transactions_listener(&self) -> Receiver<TxHash> {
self.pool.add_ready_listener()
}
}
impl<P: PoolClient, O: TransactionOrdering> Clone for BasicPool<P, O> {
fn clone(&self) -> Self {
Self { pool: Arc::clone(&self.pool) }
}
}
/// Transaction pool internals.
pub struct PoolInner<P: PoolClient, T: TransactionOrdering> {
/// Chain/Storage access.
client: Arc<P>,
/// The internal pool that manages
/// The internal pool that manages all transactions.
pool: RwLock<TxPool<T>>,
/// Pool settings.
config: PoolConfig,
@ -310,7 +211,7 @@ where
listener.ready(&tx.hash, None);
// TODO more listeners for discarded, removed etc...
}
AddedTransaction::Queued { hash } => {
AddedTransaction::Parked { hash } => {
listener.queued(hash);
}
}
@ -352,17 +253,17 @@ impl<T: PoolTransaction> AddedPendingTransaction<T> {
pub enum AddedTransaction<T: PoolTransaction> {
/// Transaction was successfully added and moved to the pending pool.
Pending(AddedPendingTransaction<T>),
/// Transaction was successfully added but not yet queued for processing and moved to the
/// queued pool instead.
Queued {
/// the hash of the submitted transaction
/// Transaction was successfully added but not yet ready for processing and moved to a
/// parked pool instead.
Parked {
/// Hash of the submitted transaction that is currently parked.
hash: TxHash,
},
}
impl<T: PoolTransaction> AddedTransaction<T> {
/// Returns the hash of the transaction if it's ready
pub fn as_ready(&self) -> Option<&TxHash> {
/// Returns the hash of the transaction if it's pending
pub fn as_pending(&self) -> Option<&TxHash> {
if let AddedTransaction::Pending(tx) = self {
Some(&tx.hash)
} else {
@ -374,7 +275,7 @@ impl<T: PoolTransaction> AddedTransaction<T> {
pub fn hash(&self) -> &TxHash {
match self {
AddedTransaction::Pending(tx) => &tx.hash,
AddedTransaction::Queued { hash } => hash,
AddedTransaction::Parked { hash } => hash,
}
}
}

View File

@ -162,7 +162,7 @@ impl<T: TransactionOrdering> TxPool<T> {
removed,
})
} else {
AddedTransaction::Queued { hash }
AddedTransaction::Parked { hash }
};
Ok(res)

View File

@ -11,7 +11,7 @@ use std::{fmt, hash::Hash, sync::Arc};
#[async_trait::async_trait]
pub trait TransactionPool: Send + Sync {
/// The transaction type of the pool
type Transaction: PoolTransaction + Send + Sync + 'static;
type Transaction: PoolTransaction;
/// Event listener for when a new block was mined.
///

View File

@ -29,7 +29,7 @@ pub enum TransactionValidationOutcome<T: PoolTransaction> {
#[async_trait::async_trait]
pub trait TransactionValidator: Send + Sync {
/// The transaction type to validate.
type Transaction: PoolTransaction + Send + Sync + 'static;
type Transaction: PoolTransaction;
/// Validates the transaction and returns a validated outcome
///