feat: integrate blobstore in validator (#4273)

This commit is contained in:
Matthias Seitz
2023-08-18 22:52:49 +02:00
committed by GitHub
parent 82a42c98a3
commit cbf3eb4333
5 changed files with 220 additions and 199 deletions

View File

@ -1,6 +1,7 @@
//! Storage for blob data of EIP4844 transactions.
use reth_primitives::{BlobTransactionSidecar, H256};
use std::fmt;
mod maintain;
mod mem;
mod noop;
@ -15,7 +16,7 @@ pub use noop::NoopBlobStore;
/// finalization).
///
/// Note: this is Clone because it is expected to be wrapped in an Arc.
pub trait BlobStore: Send + Sync + 'static {
pub trait BlobStore: fmt::Debug + Send + Sync + 'static {
/// Inserts the blob sidecar into the store
fn insert(&self, tx: H256, data: BlobTransactionSidecar) -> Result<(), BlobStoreError>;

View File

@ -105,9 +105,10 @@
//! use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool, TransactionPool};
//! use reth_transaction_pool::blobstore::InMemoryBlobStore;
//! async fn t<C>(client: C) where C: StateProviderFactory + ChainSpecProvider + Clone + 'static{
//! let blob_store = InMemoryBlobStore::default();
//! let pool = Pool::eth_pool(
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
//! InMemoryBlobStore::default(),
//! TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()),
//! blob_store,
//! Default::default(),
//! );
//! let mut transactions = pool.pending_transactions_listener();
@ -136,9 +137,10 @@
//! where C: StateProviderFactory + BlockReaderIdExt + ChainSpecProvider + Clone + 'static,
//! St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
//! {
//! let blob_store = InMemoryBlobStore::default();
//! let pool = Pool::eth_pool(
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), TokioTaskExecutor::default()),
//! InMemoryBlobStore::default(),
//! TransactionValidationTaskExecutor::eth(client.clone(), MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()),
//! blob_store,
//! Default::default(),
//! );
//!
@ -296,10 +298,11 @@ where
/// use reth_tasks::TokioTaskExecutor;
/// use reth_transaction_pool::{TransactionValidationTaskExecutor, Pool};
/// use reth_transaction_pool::blobstore::InMemoryBlobStore;
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static{
/// # fn t<C>(client: C) where C: StateProviderFactory + Clone + 'static {
/// let blob_store = InMemoryBlobStore::default();
/// let pool = Pool::eth_pool(
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), TokioTaskExecutor::default()),
/// InMemoryBlobStore::default(),
/// TransactionValidationTaskExecutor::eth(client, MAINNET.clone(), blob_store.clone(), TokioTaskExecutor::default()),
/// blob_store,
/// Default::default(),
/// );
/// # }

View File

@ -1,6 +1,7 @@
//! Ethereum transaction validator.
use crate::{
blobstore::BlobStore,
error::InvalidPoolTransactionError,
traits::{PoolTransaction, TransactionOrigin},
validate::{ValidTransaction, ValidationTask, MAX_INIT_CODE_SIZE, TX_MAX_SIZE},
@ -22,198 +23,16 @@ pub struct EthTransactionValidator<Client, T> {
pub inner: Arc<EthTransactionValidatorInner<Client, T>>,
}
/// A builder for [TransactionValidationTaskExecutor]
#[derive(Debug, Clone)]
pub struct EthTransactionValidatorBuilder {
chain_spec: Arc<ChainSpec>,
/// Fork indicator whether we are in the Shanghai stage.
shanghai: bool,
/// Fork indicator whether we are in the Cancun hardfork.
cancun: bool,
/// Fork indicator whether we are using EIP-2718 type transactions.
eip2718: bool,
/// Fork indicator whether we are using EIP-1559 type transactions.
eip1559: bool,
/// Fork indicator whether we are using EIP-4844 blob transactions.
eip4844: bool,
/// The current max gas limit
block_gas_limit: u64,
/// Minimum priority fee to enforce for acceptance into the pool.
minimum_priority_fee: Option<u128>,
/// Determines how many additional tasks to spawn
///
/// Default is 1
additional_tasks: usize,
/// Toggle to determine if a local transaction should be propagated
propagate_local_transactions: bool,
}
impl EthTransactionValidatorBuilder {
/// Creates a new builder for the given [ChainSpec]
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self {
chain_spec,
shanghai: true,
eip2718: true,
eip1559: true,
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
minimum_priority_fee: None,
additional_tasks: 1,
// default to true, can potentially take this as a param in the future
propagate_local_transactions: true,
// TODO: can hard enable by default once transitioned
cancun: false,
eip4844: false,
}
}
/// Disables the Cancun fork.
pub fn no_cancun(self) -> Self {
self.set_cancun(false)
}
/// Set the Cancun fork.
pub fn set_cancun(mut self, cancun: bool) -> Self {
self.cancun = cancun;
self
}
/// Disables the Shanghai fork.
pub fn no_shanghai(self) -> Self {
self.set_shanghai(false)
}
/// Set the Shanghai fork.
pub fn set_shanghai(mut self, shanghai: bool) -> Self {
self.shanghai = shanghai;
self
}
/// Disables the eip2718 support.
pub fn no_eip2718(self) -> Self {
self.set_eip2718(false)
}
/// Set eip2718 support.
pub fn set_eip2718(mut self, eip2718: bool) -> Self {
self.eip2718 = eip2718;
self
}
/// Disables the eip1559 support.
pub fn no_eip1559(self) -> Self {
self.set_eip1559(false)
}
/// Set the eip1559 support.
pub fn set_eip1559(mut self, eip1559: bool) -> Self {
self.eip1559 = eip1559;
self
}
/// Sets toggle to propagate transactions received locally by this client (e.g
/// transactions from eth_Sendtransaction to this nodes' RPC server)
///
/// If set to false, only transactions received by network peers (via
/// p2p) will be marked as propagated in the local transaction pool and returned on a
/// GetPooledTransactions p2p request
pub fn set_propagate_local_transactions(mut self, propagate_local_txs: bool) -> Self {
self.propagate_local_transactions = propagate_local_txs;
self
}
/// Disables propagating transactions recieved locally by this client
///
/// For more information, check docs for set_propagate_local_transactions
pub fn no_local_transaction_propagation(mut self) -> Self {
self.propagate_local_transactions = false;
self
}
/// Sets a minimum priority fee that's enforced for acceptance into the pool.
pub fn with_minimum_priority_fee(mut self, minimum_priority_fee: u128) -> Self {
self.minimum_priority_fee = Some(minimum_priority_fee);
self
}
/// Sets the number of additional tasks to spawn.
pub fn with_additional_tasks(mut self, additional_tasks: usize) -> Self {
self.additional_tasks = additional_tasks;
self
}
/// Builds a [TransactionValidationTaskExecutor]
///
/// The validator will spawn `additional_tasks` additional tasks for validation.
///
/// By default this will spawn 1 additional task.
pub fn build<Client, Tx, T>(
self,
client: Client,
tasks: T,
) -> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
T: TaskSpawner,
{
let Self {
chain_spec,
shanghai,
cancun,
eip2718,
eip1559,
eip4844,
block_gas_limit,
minimum_priority_fee,
additional_tasks,
propagate_local_transactions,
} = self;
let inner = EthTransactionValidatorInner {
chain_spec,
client,
shanghai,
eip2718,
eip1559,
cancun,
eip4844,
block_gas_limit,
minimum_priority_fee,
propagate_local_transactions,
_marker: Default::default(),
};
let (tx, task) = ValidationTask::new();
// Spawn validation tasks, they are blocking because they perform db lookups
for _ in 0..additional_tasks {
let task = task.clone();
tasks.spawn_blocking(Box::pin(async move {
task.run().await;
}));
}
tasks.spawn_critical_blocking(
"transaction-validation-service",
Box::pin(async move {
task.run().await;
}),
);
let to_validation_task = Arc::new(Mutex::new(tx));
TransactionValidationTaskExecutor {
validator: EthTransactionValidator { inner: Arc::new(inner) },
to_validation_task,
}
}
}
/// A [TransactionValidator] implementation that validates ethereum transaction.
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct EthTransactionValidatorInner<Client, T> {
/// Spec of the chain
chain_spec: Arc<ChainSpec>,
/// This type fetches account info from the db
client: Client,
/// Blobstore used for fetching re-injected blob transactions.
#[allow(unused)]
blob_store: Box<dyn BlobStore>,
/// Fork indicator whether we are in the Shanghai stage.
shanghai: bool,
/// Fork indicator whether we are in the Cancun hardfork.
@ -412,3 +231,192 @@ where
}
}
}
/// A builder for [TransactionValidationTaskExecutor]
#[derive(Debug, Clone)]
pub struct EthTransactionValidatorBuilder {
chain_spec: Arc<ChainSpec>,
/// Fork indicator whether we are in the Shanghai stage.
shanghai: bool,
/// Fork indicator whether we are in the Cancun hardfork.
cancun: bool,
/// Fork indicator whether we are using EIP-2718 type transactions.
eip2718: bool,
/// Fork indicator whether we are using EIP-1559 type transactions.
eip1559: bool,
/// Fork indicator whether we are using EIP-4844 blob transactions.
eip4844: bool,
/// The current max gas limit
block_gas_limit: u64,
/// Minimum priority fee to enforce for acceptance into the pool.
minimum_priority_fee: Option<u128>,
/// Determines how many additional tasks to spawn
///
/// Default is 1
additional_tasks: usize,
/// Toggle to determine if a local transaction should be propagated
propagate_local_transactions: bool,
}
impl EthTransactionValidatorBuilder {
/// Creates a new builder for the given [ChainSpec]
pub fn new(chain_spec: Arc<ChainSpec>) -> Self {
Self {
chain_spec,
shanghai: true,
eip2718: true,
eip1559: true,
block_gas_limit: ETHEREUM_BLOCK_GAS_LIMIT,
minimum_priority_fee: None,
additional_tasks: 1,
// default to true, can potentially take this as a param in the future
propagate_local_transactions: true,
// TODO: can hard enable by default once transitioned
cancun: false,
eip4844: false,
}
}
/// Disables the Cancun fork.
pub fn no_cancun(self) -> Self {
self.set_cancun(false)
}
/// Set the Cancun fork.
pub fn set_cancun(mut self, cancun: bool) -> Self {
self.cancun = cancun;
self
}
/// Disables the Shanghai fork.
pub fn no_shanghai(self) -> Self {
self.set_shanghai(false)
}
/// Set the Shanghai fork.
pub fn set_shanghai(mut self, shanghai: bool) -> Self {
self.shanghai = shanghai;
self
}
/// Disables the eip2718 support.
pub fn no_eip2718(self) -> Self {
self.set_eip2718(false)
}
/// Set eip2718 support.
pub fn set_eip2718(mut self, eip2718: bool) -> Self {
self.eip2718 = eip2718;
self
}
/// Disables the eip1559 support.
pub fn no_eip1559(self) -> Self {
self.set_eip1559(false)
}
/// Set the eip1559 support.
pub fn set_eip1559(mut self, eip1559: bool) -> Self {
self.eip1559 = eip1559;
self
}
/// Sets toggle to propagate transactions received locally by this client (e.g
/// transactions from eth_Sendtransaction to this nodes' RPC server)
///
/// If set to false, only transactions received by network peers (via
/// p2p) will be marked as propagated in the local transaction pool and returned on a
/// GetPooledTransactions p2p request
pub fn set_propagate_local_transactions(mut self, propagate_local_txs: bool) -> Self {
self.propagate_local_transactions = propagate_local_txs;
self
}
/// Disables propagating transactions recieved locally by this client
///
/// For more information, check docs for set_propagate_local_transactions
pub fn no_local_transaction_propagation(mut self) -> Self {
self.propagate_local_transactions = false;
self
}
/// Sets a minimum priority fee that's enforced for acceptance into the pool.
pub fn with_minimum_priority_fee(mut self, minimum_priority_fee: u128) -> Self {
self.minimum_priority_fee = Some(minimum_priority_fee);
self
}
/// Sets the number of additional tasks to spawn.
pub fn with_additional_tasks(mut self, additional_tasks: usize) -> Self {
self.additional_tasks = additional_tasks;
self
}
/// Builds a the [EthTransactionValidator] and spawns validation tasks via the
/// [TransactionValidationTaskExecutor]
///
/// The validator will spawn `additional_tasks` additional tasks for validation.
///
/// By default this will spawn 1 additional task.
pub fn build_with_tasks<Client, Tx, T, S>(
self,
client: Client,
tasks: T,
blob_store: S,
) -> TransactionValidationTaskExecutor<EthTransactionValidator<Client, Tx>>
where
T: TaskSpawner,
S: BlobStore,
{
let Self {
chain_spec,
shanghai,
cancun,
eip2718,
eip1559,
eip4844,
block_gas_limit,
minimum_priority_fee,
additional_tasks,
propagate_local_transactions,
} = self;
let inner = EthTransactionValidatorInner {
chain_spec,
client,
shanghai,
eip2718,
eip1559,
cancun,
eip4844,
block_gas_limit,
minimum_priority_fee,
propagate_local_transactions,
blob_store: Box::new(blob_store),
_marker: Default::default(),
};
let (tx, task) = ValidationTask::new();
// Spawn validation tasks, they are blocking because they perform db lookups
for _ in 0..additional_tasks {
let task = task.clone();
tasks.spawn_blocking(Box::pin(async move {
task.run().await;
}));
}
tasks.spawn_critical_blocking(
"transaction-validation-service",
Box::pin(async move {
task.run().await;
}),
);
let to_validation_task = Arc::new(Mutex::new(tx));
TransactionValidationTaskExecutor {
validator: EthTransactionValidator { inner: Arc::new(inner) },
to_validation_task,
}
}
}

View File

@ -1,6 +1,7 @@
//! A validation service for transactions.
use crate::{
blobstore::BlobStore,
validate::{EthTransactionValidatorBuilder, TransactionValidatorError},
EthTransactionValidator, PoolTransaction, TransactionOrigin, TransactionValidationOutcome,
TransactionValidator,
@ -96,11 +97,16 @@ impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Clien
///
/// This will spawn a single validation tasks that performs the actual validation.
/// See [TransactionValidationTaskExecutor::eth_with_additional_tasks]
pub fn eth<T>(client: Client, chain_spec: Arc<ChainSpec>, tasks: T) -> Self
pub fn eth<T, S: BlobStore>(
client: Client,
chain_spec: Arc<ChainSpec>,
blob_store: S,
tasks: T,
) -> Self
where
T: TaskSpawner,
{
Self::eth_with_additional_tasks(client, chain_spec, tasks, 0)
Self::eth_with_additional_tasks(client, chain_spec, blob_store, tasks, 0)
}
/// Creates a new instance for the given [ChainSpec]
@ -112,9 +118,10 @@ impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Clien
///
/// This will always spawn a validation task that performs the actual validation. It will spawn
/// `num_additional_tasks` additional tasks.
pub fn eth_with_additional_tasks<T>(
pub fn eth_with_additional_tasks<T, S: BlobStore>(
client: Client,
chain_spec: Arc<ChainSpec>,
blob_store: S,
tasks: T,
num_additional_tasks: usize,
) -> Self
@ -123,7 +130,7 @@ impl<Client, Tx> TransactionValidationTaskExecutor<EthTransactionValidator<Clien
{
EthTransactionValidatorBuilder::new(chain_spec)
.with_additional_tasks(num_additional_tasks)
.build::<Client, Tx, T>(client, tasks)
.build_with_tasks::<Client, Tx, T, S>(client, tasks, blob_store)
}
/// Returns the configured chain id