feat(transaction-pool): make EthTransactionValidator generic over Validator (#4258)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Thomas Coratger
2023-08-18 13:28:09 +02:00
committed by GitHub
parent 5039b3b582
commit efab153cd9
6 changed files with 163 additions and 132 deletions

View File

@ -65,7 +65,7 @@ use reth_stages::{
MetricEventsSender, MetricsListener,
};
use reth_tasks::TaskExecutor;
use reth_transaction_pool::{EthTransactionValidator, TransactionPool};
use reth_transaction_pool::{TransactionPool, TransactionValidationTaskExecutor};
use secp256k1::SecretKey;
use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
@ -263,7 +263,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
let blockchain_db = BlockchainProvider::new(factory, blockchain_tree.clone())?;
let transaction_pool = reth_transaction_pool::Pool::eth_pool(
EthTransactionValidator::with_additional_tasks(
TransactionValidationTaskExecutor::eth_with_additional_tasks(
blockchain_db.clone(),
Arc::clone(&self.chain),
ctx.task_executor.clone(),

View File

@ -94,10 +94,10 @@
//! use reth_primitives::MAINNET;
//! use reth_provider::{ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{EthTransactionValidator, Pool, TransactionPool};
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
//! async fn t<C>(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{
//! let pool = Pool::eth_pool(
//! EthTransactionValidator::new(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! Default::default(),
//! );
//! let mut transactions = pool.pending_transactions_listener();
@ -119,14 +119,14 @@
//! use reth_primitives::MAINNET;
//! use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider, StateProviderFactory};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::{EthTransactionValidator, Pool};
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
//! use reth_transaction_pool::maintain::maintain_transaction_pool_future;
//! async fn t<C, St>(client: C, stream: St)
//! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
//! St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
//! {
//! let pool = Pool::eth_pool(
//! EthTransactionValidator::new(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! Default::default(),
//! );
//!
@ -170,8 +170,8 @@ pub use crate::{
TransactionPoolExt,
},
validate::{
EthTransactionValidator, TransactionValidationOutcome, TransactionValidator,
ValidPoolTransaction,
EthTransactionValidator, TransactionValidationOutcome, TransactionValidationTaskExecutor,
TransactionValidator, ValidPoolTransaction,
},
};
@ -263,14 +263,14 @@ where
impl<Client>
Pool<
EthTransactionValidator<Client, EthPooledTransaction>,
TransactionValidationTaskExecutor<EthTransactionValidator<Client, EthPooledTransaction>>,
CoinbaseTipOrdering<EthPooledTransaction>,
>
where
Client: StateProviderFactory + Clone + 'static,
{
/// Returns a new [Pool] that uses the default [EthTransactionValidator] when validating
/// [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
/// Returns a new [Pool] that uses the default [TransactionValidationTaskExecutor] when
/// validating [EthPooledTransaction]s and ords via [CoinbaseTipOrdering]
///
/// # Example
///
@ -278,16 +278,18 @@ where
/// use reth_provider::StateProviderFactory;
/// use reth_primitives::MAINNET;
/// use reth_tasks::TokioTaskExecutor;
/// use reth_transaction_pool::{EthTransactionValidator, Pool};
/// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static{
/// let pool = Pool::eth_pool(
/// EthTransactionValidator::new(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// Default::default(),
/// );
/// # }
/// ```
pub fn eth_pool(
validator: EthTransactionValidator<Client, EthPooledTransaction>,
validator: TransactionValidationTaskExecutor<
EthTransactionValidator<Client, EthPooledTransaction>,
>,
config: PoolConfig,
) -> Self {
Self::new(validator, CoinbaseTipOrdering::default(), config)

View File

@ -3,11 +3,8 @@
use crate::{
error::InvalidPoolTransactionError,
traits::{PoolTransaction, TransactionOrigin},
validate::{
task::ValidationJobSender, TransactionValidatorError, ValidTransaction, ValidationTask,
MAX_INIT_CODE_SIZE, TX_MAX_SIZE,
},
TransactionValidationOutcome, TransactionValidator,
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_SIZE, TX_MAX_SIZE},
TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
};
use reth_primitives::{
constants::ETHEREUM_BLOCK_GAS_LIMIT, ChainSpec, InvalidTransactionError, EIP1559_TX_TYPE_ID,
@ -16,113 +13,16 @@ use reth_primitives::{
use reth_provider::{AccountReader, StateProviderFactory};
use reth_tasks::TaskSpawner;
use std::{marker::PhantomData, sync::Arc};
use tokio::sync::{oneshot, Mutex};
use tokio::sync::Mutex;
/// A [TransactionValidator] implementation that validates ethereum transaction.
///
/// This validator is non-blocking, all validation work is done in a separate task.
#[derive(Debug, Clone)]
/// Validator for Ethereum transactions.
#[derive(Debug)]
pub struct EthTransactionValidator<Client, T> {
/// The type that performs the actual validation.
inner: Arc<EthTransactionValidatorInner<Client, T>>,
/// The sender half to validation tasks that perform the actual validation.
to_validation_task: Arc<Mutex<ValidationJobSender>>,
pub inner: Arc<EthTransactionValidatorInner<Client, T>>,
}
// === impl EthTransactionValidator ===
impl EthTransactionValidator<(), ()> {
/// Convenience method to create a [EthTransactionValidatorBuilder]
pub fn builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
EthTransactionValidatorBuilder::new(chain_spec)
}
}
impl<Client, Tx> EthTransactionValidator<Client, Tx> {
/// Creates a new instance for the given [ChainSpec]
///
/// This will spawn a single validation tasks that performs the actual validation.
/// See [EthTransactionValidator::with_additional_tasks]
pub fn new<T>(client: Client, chain_spec: Arc<ChainSpec>, tasks: T) -> Self
where
T: TaskSpawner,
{
Self::with_additional_tasks(client, chain_spec, tasks, 0)
}
/// Creates a new instance for the given [ChainSpec]
///
/// By default this will enable support for:
/// - shanghai
/// - eip1559
/// - eip2930
///
/// This will always spawn a validation task that performs the actual validation. It will spawn
/// `num_additional_tasks` additional tasks.
pub fn with_additional_tasks<T>(
client: Client,
chain_spec: Arc<ChainSpec>,
tasks: T,
num_additional_tasks: usize,
) -> Self
where
T: TaskSpawner,
{
EthTransactionValidatorBuilder::new(chain_spec)
.with_additional_tasks(num_additional_tasks)
.build(client, tasks)
}
/// Returns the configured chain id
pub fn chain_id(&self) -> u64 {
self.inner.chain_id()
}
}
#[async_trait::async_trait]
impl<Client, Tx> TransactionValidator for EthTransactionValidator<Client, Tx>
where
Client: StateProviderFactory + Clone + 'static,
Tx: PoolTransaction + 'static,
{
type Transaction = Tx;
async fn validate_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
let hash = *transaction.hash();
let (tx, rx) = oneshot::channel();
{
let to_validation_task = self.to_validation_task.clone();
let to_validation_task = to_validation_task.lock().await;
let validator = Arc::clone(&self.inner);
let res = to_validation_task
.send(Box::pin(async move {
let res = validator.validate_transaction(origin, transaction).await;
let _ = tx.send(res);
}))
.await;
if res.is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
}
}
match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
),
}
}
}
/// A builder for [EthTransactionValidator]
/// A builder for [TransactionValidationTaskExecutor]
#[derive(Debug, Clone)]
pub struct EthTransactionValidatorBuilder {
chain_spec: Arc<ChainSpec>,
@ -241,7 +141,7 @@ impl EthTransactionValidatorBuilder {
self
}
/// Builds a [EthTransactionValidator]
/// Builds a [TransactionValidationTaskExecutor]
///
/// The validator will spawn `additional_tasks` additional tasks for validation.
///
@ -250,7 +150,7 @@ impl EthTransactionValidatorBuilder {
self,
client: Client,
tasks: T,
) -> EthTransactionValidator<Client, Tx>
) -> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
T: TaskSpawner,
{
@ -300,13 +200,16 @@ impl EthTransactionValidatorBuilder {
let to_validation_task = Arc::new(Mutex::new(tx));
EthTransactionValidator { inner: Arc::new(inner), to_validation_task }
TransactionValidationTaskExecutor {
validator: EthTransactionValidator { inner: Arc::new(inner) },
to_validation_task,
}
}
}
/// A [TransactionValidator] implementation that validates ethereum transaction.
#[derive(Debug, Clone)]
struct EthTransactionValidatorInner<Client, T> {
pub struct EthTransactionValidatorInner<Client, T> {
/// Spec of the chain
chain_spec: Arc<ChainSpec>,
/// This type fetches account info from the db
@ -335,7 +238,7 @@ struct EthTransactionValidatorInner<Client, T> {
impl<Client, Tx> EthTransactionValidatorInner<Client, Tx> {
/// Returns the configured chain id
fn chain_id(&self) -> u64 {
pub fn chain_id(&self) -> u64 {
self.chain_spec.chain().id()
}
}

View File

@ -19,7 +19,7 @@ mod task;
pub use eth::{EthTransactionValidator, EthTransactionValidatorBuilder};
/// A spawnable task that performs transaction validation.
pub use task::ValidationTask;
pub use task::{TransactionValidationTaskExecutor, ValidationTask};
/// Validation constants.
pub use constants::{MAX_CODE_SIZE, MAX_INIT_CODE_SIZE, TX_MAX_SIZE, TX_SLOT_SIZE};
@ -150,7 +150,7 @@ pub trait TransactionValidator: Send + Sync {
/// example nonce or balance changes. Hence, any validation checks must be applied in this
/// function.
///
/// See [EthTransactionValidator] for a reference implementation.
/// See [TransactionValidationTaskExecutor] for a reference implementation.
async fn validate_transaction(
&self,
origin: TransactionOrigin,

View File

@ -1,9 +1,19 @@
//! A validation service for transactions.
use crate::validate::TransactionValidatorError;
use crate::{
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
TransactionValidator,
};
use futures_util::{lock::Mutex, StreamExt};
use reth_primitives::ChainSpec;
use reth_provider::StateProviderFactory;
use reth_tasks::TaskSpawner;
use std::{future::Future, pin::Pin, sync::Arc};
use tokio::sync::mpsc;
use tokio::{
sync,
sync::{mpsc, oneshot},
};
use tokio_stream::wrappers::ReceiverStream;
/// A service that performs validation jobs.
@ -60,3 +70,119 @@ impl ValidationJobSender {
self.tx.send(job).await.map_err(|_| TransactionValidatorError::ValidationServiceUnreachable)
}
}
/// A [TransactionValidator] implementation that validates ethereum transaction.
///
/// This validator is non-blocking, all validation work is done in a separate task.
#[derive(Debug, Clone)]
pub struct TransactionValidationTaskExecutor<V> {
/// The validator that will validate transactions on a separate task.
pub validator: V,
/// The sender half to validation tasks that perform the actual validation.
pub to_validation_task: Arc<sync::Mutex<ValidationJobSender>>,
}
// === impl TransactionValidationTaskExecutor ===
impl TransactionValidationTaskExecutor<()> {
/// Convenience method to create a [EthTransactionValidatorBuilder]
pub fn eth_builder(chain_spec: Arc<ChainSpec>) -> EthTransactionValidatorBuilder {
EthTransactionValidatorBuilder::new(chain_spec)
}
}
impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>> {
/// Creates a new instance for the given [ChainSpec]
///
/// This will spawn a single validation tasks that performs the actual validation.
/// See [TransactionValidationTaskExecutor::eth_with_additional_tasks]
pub fn eth<T>(client: Client, chain_spec: Arc<ChainSpec>, tasks: T) -> Self
where
T: TaskSpawner,
{
Self::eth_with_additional_tasks(client, chain_spec, tasks, 0)
}
/// Creates a new instance for the given [ChainSpec]
///
/// By default this will enable support for:
/// - shanghai
/// - eip1559
/// - eip2930
///
/// This will always spawn a validation task that performs the actual validation. It will spawn
/// `num_additional_tasks` additional tasks.
pub fn eth_with_additional_tasks<T>(
client: Client,
chain_spec: Arc<ChainSpec>,
tasks: T,
num_additional_tasks: usize,
) -> Self
where
T: TaskSpawner,
{
EthTransactionValidatorBuilder::new(chain_spec)
.with_additional_tasks(num_additional_tasks)
.build::<Client, Tx, T>(client, tasks)
}
/// Returns the configured chain id
pub fn chain_id(&self) -> u64 {
self.validator.inner.chain_id()
}
}
impl<V: TransactionValidator + Clone> TransactionValidationTaskExecutor<V> {
/// Creates a new executor instance with the given validator for transaction validation.
///
/// Initializes the executor with the provided validator and sets up communication for
/// validation tasks.
pub fn new(validator: V) -> Self {
let (tx, _) = ValidationTask::new();
Self { validator, to_validation_task: Arc::new(sync::Mutex::new(tx)) }
}
}
#[async_trait::async_trait]
impl<Client, Tx> TransactionValidator
for TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
Client: StateProviderFactory + Clone + 'static,
Tx: PoolTransaction + Clone + 'static,
{
type Transaction = Tx;
async fn validate_transaction(
&self,
origin: TransactionOrigin,
transaction: Self::Transaction,
) -> TransactionValidationOutcome<Self::Transaction> {
let hash = *transaction.hash();
let (tx, rx) = oneshot::channel();
{
let to_validation_task = self.to_validation_task.clone();
let to_validation_task = to_validation_task.lock().await;
let validator = Arc::clone(&self.validator.inner);
let res = to_validation_task
.send(Box::pin(async move {
let res = validator.validate_transaction(origin, transaction).await;
let _ = tx.send(res);
}))
.await;
if res.is_err() {
return TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
)
}
}
match rx.await {
Ok(res) => res,
Err(_) => TransactionValidationOutcome::Error(
hash,
Box::new(TransactionValidatorError::ValidationServiceUnreachable),
),
}
}
}

View File

@ -57,8 +57,8 @@ async fn main() -> eyre::Result<()> {
/// A transaction validator that determines all transactions to be valid.
///
/// An actual validator impl like
/// [EthTransactionValidator](reth_transaction_pool::EthTransactionValidator) would require up to
/// date db access.
/// [TransactionValidationTaskExecutor](reth_transaction_pool::TransactionValidationTaskExecutor)
/// would require up to date db access.
///
/// CAUTION: This validator is not safe to use since it doesn't actually validate the transaction's
/// properties such as chain id, balance, nonce, etc.