From edd0e852f2c764ee73941f83cdbfca7e9cdd4a22 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 17 Oct 2022 16:14:18 +0200 Subject: [PATCH] feat(txpool): introduce transaction origin (#85) --- crates/transaction-pool/src/lib.rs | 26 +++++++++++----- crates/transaction-pool/src/pool/mod.rs | 7 +++-- crates/transaction-pool/src/test_util/mock.rs | 2 ++ crates/transaction-pool/src/traits.rs | 31 ++++++++++++++++++- crates/transaction-pool/src/validate.rs | 6 +++- 5 files changed, 60 insertions(+), 12 deletions(-) diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 3d9db9bc1..69a9dc5c3 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -82,7 +82,10 @@ pub use crate::{ validate::{TransactionValidationOutcome, TransactionValidator}, }; use crate::{ - error::PoolResult, pool::PoolInner, traits::NewTransactionEvent, validate::ValidPoolTransaction, + error::PoolResult, + pool::PoolInner, + traits::{NewTransactionEvent, TransactionOrigin}, + validate::ValidPoolTransaction, }; use futures::channel::mpsc::Receiver; use reth_primitives::{BlockID, TxHash, U256, U64}; @@ -125,10 +128,11 @@ where /// Returns future that validates all transaction in the given iterator. async fn validate_all( &self, + origin: TransactionOrigin, transactions: impl IntoIterator, ) -> PoolResult>> { let outcome = - futures::future::join_all(transactions.into_iter().map(|tx| self.validate(tx))) + futures::future::join_all(transactions.into_iter().map(|tx| self.validate(origin, tx))) .await .into_iter() .collect::>(); @@ -139,12 +143,13 @@ where /// Validates the given transaction async fn validate( &self, + origin: TransactionOrigin, transaction: V::Transaction, ) -> (TxHash, TransactionValidationOutcome) { let hash = *transaction.hash(); // TODO(mattsse): this is where additional validate checks would go, like banned senders // etc... - let outcome = self.pool.validator().validate_transaction(transaction).await; + let outcome = self.pool.validator().validate_transaction(origin, transaction).await; (hash, outcome) } @@ -174,17 +179,22 @@ where todo!() } - async fn add_transaction(&self, transaction: Self::Transaction) -> PoolResult { - let (_, tx) = self.validate(transaction).await; - self.pool.add_transactions(std::iter::once(tx)).pop().expect("exists; qed") + async fn add_transaction( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> PoolResult { + let (_, tx) = self.validate(origin, transaction).await; + self.pool.add_transactions(origin, std::iter::once(tx)).pop().expect("exists; qed") } async fn add_transactions( &self, + origin: TransactionOrigin, transactions: Vec, ) -> PoolResult>> { - let validated = self.validate_all(transactions).await?; - let transactions = self.pool.add_transactions(validated.into_values()); + let validated = self.validate_all(origin, transactions).await?; + let transactions = self.pool.add_transactions(origin, validated.into_values()); Ok(transactions) } diff --git a/crates/transaction-pool/src/pool/mod.rs b/crates/transaction-pool/src/pool/mod.rs index d59f12a25..83810c2c2 100644 --- a/crates/transaction-pool/src/pool/mod.rs +++ b/crates/transaction-pool/src/pool/mod.rs @@ -67,7 +67,7 @@ use crate::{ error::PoolResult, identifier::{SenderId, SenderIdentifiers, TransactionId}, pool::{listener::PoolEventListener, state::SubPool, txpool::TxPool}, - traits::{NewTransactionEvent, PoolTransaction}, + traits::{NewTransactionEvent, PoolTransaction, TransactionOrigin}, validate::{TransactionValidationOutcome, ValidPoolTransaction}, PoolConfig, TransactionOrdering, TransactionValidator, U256, }; @@ -166,6 +166,7 @@ where /// Add a single validated transaction into the pool. fn add_transaction( &self, + origin: TransactionOrigin, tx: TransactionValidationOutcome, ) -> PoolResult { match tx { @@ -180,6 +181,7 @@ where propagate: false, is_local: false, timestamp: Instant::now(), + origin, }; let added = self.pool.write().add_transaction(tx, balance, state_nonce)?; @@ -208,11 +210,12 @@ where /// Adds all transactions in the iterator to the pool, returning a list of results. pub fn add_transactions( &self, + origin: TransactionOrigin, transactions: impl IntoIterator>, ) -> Vec> { // TODO check pool limits - transactions.into_iter().map(|tx| self.add_transaction(tx)).collect::>() + transactions.into_iter().map(|tx| self.add_transaction(origin, tx)).collect::>() } /// Notify all listeners about a new pending transaction. diff --git a/crates/transaction-pool/src/test_util/mock.rs b/crates/transaction-pool/src/test_util/mock.rs index 7022b3102..5aa496c95 100644 --- a/crates/transaction-pool/src/test_util/mock.rs +++ b/crates/transaction-pool/src/test_util/mock.rs @@ -3,6 +3,7 @@ use crate::{ identifier::{SenderIdentifiers, TransactionId}, pool::txpool::{TxPool, MIN_PROTOCOL_BASE_FEE}, + traits::TransactionOrigin, PoolTransaction, TransactionOrdering, ValidPoolTransaction, }; use paste::paste; @@ -351,6 +352,7 @@ impl MockTransactionFactory { cost: transaction.cost(), transaction, timestamp: Instant::now(), + origin: TransactionOrigin::External, } } diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index d67a052fa..576ef15c5 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -23,7 +23,11 @@ pub trait TransactionPool: Send + Sync + 'static { /// Adds an _unvalidated_ transaction into the pool. /// /// Consumer: RPC - async fn add_transaction(&self, transaction: Self::Transaction) -> PoolResult; + async fn add_transaction( + &self, + origin: TransactionOrigin, + transaction: Self::Transaction, + ) -> PoolResult; /// Adds the given _unvalidated_ transaction into the pool. /// @@ -32,6 +36,7 @@ pub trait TransactionPool: Send + Sync + 'static { /// Consumer: RPC async fn add_transactions( &self, + origin: TransactionOrigin, transactions: Vec, ) -> PoolResult>>; @@ -94,6 +99,30 @@ impl Clone for NewTransactionEvent { } } +/// Where the transaction originates from. +/// +/// Depending on where the transaction was picked up, it affects how the transaction is handled +/// internally, e.g. limits for simultaneous transaction of one sender. +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum TransactionOrigin { + /// Transaction is coming from a local source. + Local, + /// Transaction has been received externally. + /// + /// This is usually considered an "untrusted" source, for example received from another in the + /// network. + External, +} + +// === impl TransactionOrigin === + +impl TransactionOrigin { + /// Whether the transaction originates from a local source. + pub fn is_local(&self) -> bool { + matches!(self, TransactionOrigin::Local) + } +} + /// Event fired when a new block was mined #[derive(Debug, Clone)] pub struct NewBlockEvent { diff --git a/crates/transaction-pool/src/validate.rs b/crates/transaction-pool/src/validate.rs index 146d253e4..ed0762c08 100644 --- a/crates/transaction-pool/src/validate.rs +++ b/crates/transaction-pool/src/validate.rs @@ -3,7 +3,7 @@ use crate::{ error::PoolError, identifier::{SenderId, TransactionId}, - traits::PoolTransaction, + traits::{PoolTransaction, TransactionOrigin}, }; use reth_primitives::{rpc::Address, BlockID, TxHash, U256}; use std::{fmt, time::Instant}; @@ -38,6 +38,7 @@ pub trait TransactionValidator: Send + Sync + 'static { /// transactions for the sender. async fn validate_transaction( &self, + origin: TransactionOrigin, _transaction: Self::Transaction, ) -> TransactionValidationOutcome; } @@ -56,6 +57,8 @@ pub struct ValidPoolTransaction { pub cost: U256, /// Timestamp when this was added to the pool. pub timestamp: Instant, + /// Where this transaction originated from. + pub origin: TransactionOrigin, } // === impl ValidPoolTransaction === @@ -112,6 +115,7 @@ impl Clone for ValidPoolTransaction { is_local: self.is_local, cost: self.cost, timestamp: self.timestamp, + origin: self.origin, } } }