feat(op, txpool): add additional update routine for conditionals (#14497)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Federico Gimenez
2025-02-15 09:30:09 +01:00
committed by GitHub
parent 9c1988b5cc
commit e4c8e479cf
7 changed files with 106 additions and 11 deletions

View File

@ -107,7 +107,10 @@ impl OpNode {
self.args;
ComponentsBuilder::default()
.node_types::<Node>()
.pool(OpPoolBuilder::default())
.pool(
OpPoolBuilder::default()
.with_enable_tx_conditional(self.args.enable_tx_conditional),
)
.payload(
OpPayloadBuilder::new(compute_pending_block).with_da_config(self.da_config.clone()),
)
@ -433,20 +436,33 @@ where
pub struct OpPoolBuilder<T = crate::txpool::OpPooledTransaction> {
/// Enforced overrides that are applied to the pool config.
pub pool_config_overrides: PoolBuilderConfigOverrides,
/// Enable transaction conditionals.
pub enable_tx_conditional: bool,
/// Marker for the pooled transaction type.
_pd: core::marker::PhantomData<T>,
}
impl<T> Default for OpPoolBuilder<T> {
fn default() -> Self {
Self { pool_config_overrides: Default::default(), _pd: Default::default() }
Self {
pool_config_overrides: Default::default(),
enable_tx_conditional: false,
_pd: Default::default(),
}
}
}
impl<T> OpPoolBuilder<T> {
fn with_enable_tx_conditional(mut self, enable_tx_conditional: bool) -> Self {
self.enable_tx_conditional = enable_tx_conditional;
self
}
}
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>> + MaybeConditionalTransaction,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;
@ -481,7 +497,7 @@ where
info!(target: "reth::cli", "Transaction pool initialized");
let transactions_path = data_dir.txpool_transactions();
// spawn txpool maintenance task
// spawn txpool maintenance tasks
{
let pool = transaction_pool.clone();
let chain_events = ctx.provider().canonical_state_stream();
@ -500,18 +516,31 @@ where
},
);
// spawn the maintenance task
// spawn the main maintenance task
ctx.task_executor().spawn_critical(
"txpool maintenance task",
reth_transaction_pool::maintain::maintain_transaction_pool_future(
client,
pool,
pool.clone(),
chain_events,
ctx.task_executor().clone(),
Default::default(),
),
);
debug!(target: "reth::cli", "Spawned txpool maintenance task");
if self.enable_tx_conditional {
// spawn the Op txpool maintenance task
let chain_events = ctx.provider().canonical_state_stream();
ctx.task_executor().spawn_critical(
"Op txpool maintenance task",
reth_optimism_txpool::maintain::maintain_transaction_pool_future(
pool,
chain_events,
),
);
debug!(target: "reth::cli", "Spawned Op txpool maintenance task");
}
}
Ok(transaction_pool)

View File

@ -21,6 +21,7 @@ alloy-rpc-types-eth.workspace = true
# reth
reth-chainspec.workspace = true
reth-primitives-traits.workspace = true
reth-chain-state.workspace = true
reth-storage-api.workspace = true
reth-transaction-pool.workspace = true
@ -37,6 +38,7 @@ reth-optimism-primitives = { workspace = true, features = ["reth-codec"] }
# misc
c-kzg.workspace = true
derive_more.workspace = true
futures-util.workspace = true
parking_lot.workspace = true
[dev-dependencies]

View File

@ -7,6 +7,9 @@ pub trait MaybeConditionalTransaction {
/// Attach a [`TransactionConditional`].
fn set_conditional(&mut self, conditional: TransactionConditional);
/// Get attached [`TransactionConditional`] if any.
fn conditional(&self) -> Option<&TransactionConditional>;
/// Helper that sets the conditional and returns the instance again
fn with_conditional(mut self, conditional: TransactionConditional) -> Self
where

View File

@ -16,6 +16,7 @@ pub use validator::{OpL1BlockInfo, OpTransactionValidator};
pub mod conditional;
mod transaction;
pub use transaction::OpPooledTransaction;
pub mod maintain;
use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor};

View File

@ -0,0 +1,59 @@
//! Support for maintaining the state of the transaction pool
use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader};
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_primitives_traits::NodePrimitives;
use reth_transaction_pool::TransactionPool;
use crate::conditional::MaybeConditionalTransaction;
/// Returns a spawnable future for maintaining the state of the transaction pool.
pub fn maintain_transaction_pool_future<N, Pool, St>(
pool: Pool,
events: St,
) -> BoxFuture<'static, ()>
where
N: NodePrimitives,
Pool: TransactionPool + 'static,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
async move {
maintain_transaction_pool(pool, events).await;
}
.boxed()
}
/// Maintains the state of the transaction pool by handling new blocks and reorgs.
///
/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly
pub async fn maintain_transaction_pool<N, Pool, St>(pool: Pool, mut events: St)
where
N: NodePrimitives,
Pool: TransactionPool,
Pool::Transaction: MaybeConditionalTransaction,
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
{
loop {
let Some(event) = events.next().await else { break };
if let CanonStateNotification::Commit { new } = event {
if new.is_empty() {
continue;
}
let block_attr = BlockConditionalAttributes {
number: new.tip().number(),
timestamp: new.tip().timestamp(),
};
let mut to_remove = Vec::new();
for tx in &pool.pooled_transactions() {
if let Some(conditional) = tx.transaction.conditional() {
if conditional.has_exceeded_block_attributes(&block_attr) {
to_remove.push(*tx.hash());
}
}
}
let _ = pool.remove_transactions(to_remove);
}
}
}

View File

@ -60,17 +60,16 @@ impl<Cons: SignedTransaction, Pooled> OpPooledTransaction<Cons, Pooled> {
self.conditional = Some(Box::new(conditional));
self
}
/// Conditional getter.
pub fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}
impl<Cons, Pooled> MaybeConditionalTransaction for OpPooledTransaction<Cons, Pooled> {
fn set_conditional(&mut self, conditional: TransactionConditional) {
self.conditional = Some(Box::new(conditional))
}
fn conditional(&self) -> Option<&TransactionConditional> {
self.conditional.as_deref()
}
}
impl<Cons, Pooled> PoolTransaction for OpPooledTransaction<Cons, Pooled>