diff --git a/Cargo.lock b/Cargo.lock index d02f756a7..d6befb71c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8720,9 +8720,11 @@ dependencies = [ "alloy-rpc-types-eth", "c-kzg", "derive_more", + "futures-util", "op-alloy-consensus", "op-alloy-flz", "parking_lot", + "reth-chain-state", "reth-chainspec", "reth-optimism-chainspec", "reth-optimism-evm", diff --git a/crates/optimism/node/src/node.rs b/crates/optimism/node/src/node.rs index 83438e40d..23d020b44 100644 --- a/crates/optimism/node/src/node.rs +++ b/crates/optimism/node/src/node.rs @@ -107,7 +107,10 @@ impl OpNode { self.args; ComponentsBuilder::default() .node_types::() - .pool(OpPoolBuilder::default()) + .pool( + OpPoolBuilder::default() + .with_enable_tx_conditional(self.args.enable_tx_conditional), + ) .payload( OpPayloadBuilder::new(compute_pending_block).with_da_config(self.da_config.clone()), ) @@ -433,20 +436,33 @@ where pub struct OpPoolBuilder { /// Enforced overrides that are applied to the pool config. pub pool_config_overrides: PoolBuilderConfigOverrides, + /// Enable transaction conditionals. + pub enable_tx_conditional: bool, /// Marker for the pooled transaction type. _pd: core::marker::PhantomData, } impl Default for OpPoolBuilder { fn default() -> Self { - Self { pool_config_overrides: Default::default(), _pd: Default::default() } + Self { + pool_config_overrides: Default::default(), + enable_tx_conditional: false, + _pd: Default::default(), + } + } +} + +impl OpPoolBuilder { + fn with_enable_tx_conditional(mut self, enable_tx_conditional: bool) -> Self { + self.enable_tx_conditional = enable_tx_conditional; + self } } impl PoolBuilder for OpPoolBuilder where Node: FullNodeTypes>, - T: EthPoolTransaction>, + T: EthPoolTransaction> + MaybeConditionalTransaction, { type Pool = OpTransactionPool; @@ -481,7 +497,7 @@ where info!(target: "reth::cli", "Transaction pool initialized"); let transactions_path = data_dir.txpool_transactions(); - // spawn txpool maintenance task + // spawn txpool maintenance tasks { let pool = transaction_pool.clone(); let chain_events = ctx.provider().canonical_state_stream(); @@ -500,18 +516,31 @@ where }, ); - // spawn the maintenance task + // spawn the main maintenance task ctx.task_executor().spawn_critical( "txpool maintenance task", reth_transaction_pool::maintain::maintain_transaction_pool_future( client, - pool, + pool.clone(), chain_events, ctx.task_executor().clone(), Default::default(), ), ); debug!(target: "reth::cli", "Spawned txpool maintenance task"); + + if self.enable_tx_conditional { + // spawn the Op txpool maintenance task + let chain_events = ctx.provider().canonical_state_stream(); + ctx.task_executor().spawn_critical( + "Op txpool maintenance task", + reth_optimism_txpool::maintain::maintain_transaction_pool_future( + pool, + chain_events, + ), + ); + debug!(target: "reth::cli", "Spawned Op txpool maintenance task"); + } } Ok(transaction_pool) diff --git a/crates/optimism/txpool/Cargo.toml b/crates/optimism/txpool/Cargo.toml index 09569854f..1331377d0 100644 --- a/crates/optimism/txpool/Cargo.toml +++ b/crates/optimism/txpool/Cargo.toml @@ -21,6 +21,7 @@ alloy-rpc-types-eth.workspace = true # reth reth-chainspec.workspace = true reth-primitives-traits.workspace = true +reth-chain-state.workspace = true reth-storage-api.workspace = true reth-transaction-pool.workspace = true @@ -37,6 +38,7 @@ reth-optimism-primitives = { workspace = true, features = ["reth-codec"] } # misc c-kzg.workspace = true derive_more.workspace = true +futures-util.workspace = true parking_lot.workspace = true [dev-dependencies] diff --git a/crates/optimism/txpool/src/conditional.rs b/crates/optimism/txpool/src/conditional.rs index 7103feb8a..2a4b6b518 100644 --- a/crates/optimism/txpool/src/conditional.rs +++ b/crates/optimism/txpool/src/conditional.rs @@ -7,6 +7,9 @@ pub trait MaybeConditionalTransaction { /// Attach a [`TransactionConditional`]. fn set_conditional(&mut self, conditional: TransactionConditional); + /// Get attached [`TransactionConditional`] if any. + fn conditional(&self) -> Option<&TransactionConditional>; + /// Helper that sets the conditional and returns the instance again fn with_conditional(mut self, conditional: TransactionConditional) -> Self where diff --git a/crates/optimism/txpool/src/lib.rs b/crates/optimism/txpool/src/lib.rs index 205585254..793fa7a28 100644 --- a/crates/optimism/txpool/src/lib.rs +++ b/crates/optimism/txpool/src/lib.rs @@ -16,6 +16,7 @@ pub use validator::{OpL1BlockInfo, OpTransactionValidator}; pub mod conditional; mod transaction; pub use transaction::OpPooledTransaction; +pub mod maintain; use reth_transaction_pool::{CoinbaseTipOrdering, Pool, TransactionValidationTaskExecutor}; diff --git a/crates/optimism/txpool/src/maintain.rs b/crates/optimism/txpool/src/maintain.rs new file mode 100644 index 000000000..d47cac207 --- /dev/null +++ b/crates/optimism/txpool/src/maintain.rs @@ -0,0 +1,59 @@ +//! Support for maintaining the state of the transaction pool + +use alloy_consensus::{conditional::BlockConditionalAttributes, BlockHeader}; +use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; +use reth_chain_state::CanonStateNotification; +use reth_primitives_traits::NodePrimitives; +use reth_transaction_pool::TransactionPool; + +use crate::conditional::MaybeConditionalTransaction; + +/// Returns a spawnable future for maintaining the state of the transaction pool. +pub fn maintain_transaction_pool_future( + pool: Pool, + events: St, +) -> BoxFuture<'static, ()> +where + N: NodePrimitives, + Pool: TransactionPool + 'static, + Pool::Transaction: MaybeConditionalTransaction, + St: Stream> + Send + Unpin + 'static, +{ + async move { + maintain_transaction_pool(pool, events).await; + } + .boxed() +} + +/// Maintains the state of the transaction pool by handling new blocks and reorgs. +/// +/// This listens for any new blocks and reorgs and updates the transaction pool's state accordingly +pub async fn maintain_transaction_pool(pool: Pool, mut events: St) +where + N: NodePrimitives, + Pool: TransactionPool, + Pool::Transaction: MaybeConditionalTransaction, + St: Stream> + Send + Unpin + 'static, +{ + loop { + let Some(event) = events.next().await else { break }; + if let CanonStateNotification::Commit { new } = event { + if new.is_empty() { + continue; + } + let block_attr = BlockConditionalAttributes { + number: new.tip().number(), + timestamp: new.tip().timestamp(), + }; + let mut to_remove = Vec::new(); + for tx in &pool.pooled_transactions() { + if let Some(conditional) = tx.transaction.conditional() { + if conditional.has_exceeded_block_attributes(&block_attr) { + to_remove.push(*tx.hash()); + } + } + } + let _ = pool.remove_transactions(to_remove); + } + } +} diff --git a/crates/optimism/txpool/src/transaction.rs b/crates/optimism/txpool/src/transaction.rs index 55ebd0226..6446be51d 100644 --- a/crates/optimism/txpool/src/transaction.rs +++ b/crates/optimism/txpool/src/transaction.rs @@ -60,17 +60,16 @@ impl OpPooledTransaction { self.conditional = Some(Box::new(conditional)); self } - - /// Conditional getter. - pub fn conditional(&self) -> Option<&TransactionConditional> { - self.conditional.as_deref() - } } impl MaybeConditionalTransaction for OpPooledTransaction { fn set_conditional(&mut self, conditional: TransactionConditional) { self.conditional = Some(Box::new(conditional)) } + + fn conditional(&self) -> Option<&TransactionConditional> { + self.conditional.as_deref() + } } impl PoolTransaction for OpPooledTransaction