feat: expose pool transaction in PayloadTransactions (#14249)

Co-authored-by: Hamdi Allam <hamdi.allam97@gmail.com>
This commit is contained in:
Arsenii Kulikov
2025-02-06 05:16:20 +04:00
committed by GitHub
parent c1a305ca5c
commit 14a51b5292
14 changed files with 212 additions and 202 deletions

View File

@ -64,6 +64,7 @@ exclude_crates=(
reth-stages-api # reth-provider, reth-prune
reth-static-file # tokio
reth-transaction-pool # c-kzg
reth-payload-util # reth-transaction-pool
reth-trie-parallel # tokio
reth-testing-utils
)

3
Cargo.lock generated
View File

@ -8548,7 +8548,7 @@ version = "1.1.5"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"reth-primitives",
"reth-transaction-pool",
]
[[package]]
@ -9330,7 +9330,6 @@ dependencies = [
"reth-execution-types",
"reth-fs-util",
"reth-metrics",
"reth-payload-util",
"reth-primitives",
"reth-primitives-traits",
"reth-provider",

View File

@ -473,16 +473,10 @@ impl OpPayloadBuilder {
}
}
impl<Txs> OpPayloadBuilder<Txs>
where
Txs: OpPayloadTransactions,
{
impl<Txs> OpPayloadBuilder<Txs> {
/// Configures the type responsible for yielding the transactions that should be included in the
/// payload.
pub fn with_transactions<T: OpPayloadTransactions>(
self,
best_transactions: T,
) -> OpPayloadBuilder<T> {
pub fn with_transactions<T>(self, best_transactions: T) -> OpPayloadBuilder<T> {
let Self { compute_pending_block, da_config, .. } = self;
OpPayloadBuilder { compute_pending_block, best_transactions, da_config }
}
@ -506,7 +500,7 @@ where
+ Unpin
+ 'static,
Evm: ConfigureEvmFor<PrimitivesTy<Node::Types>>,
Txs: OpPayloadTransactions<TxTy<Node::Types>>,
Txs: OpPayloadTransactions<Pool::Transaction>,
{
let payload_builder = reth_optimism_payload_builder::OpPayloadBuilder::with_builder_config(
pool,
@ -551,7 +545,7 @@ where
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>>
+ Unpin
+ 'static,
Txs: OpPayloadTransactions<TxTy<Node::Types>>,
Txs: OpPayloadTransactions<Pool::Transaction>,
{
async fn spawn_payload_service(
self,

View File

@ -1,6 +1,6 @@
//! Node builder test that customizes priority of transactions in the block.
use alloy_consensus::TxEip1559;
use alloy_consensus::{SignableTransaction, TxEip1559};
use alloy_genesis::Genesis;
use alloy_network::TxSignerSync;
use alloy_primitives::{Address, ChainId, TxKind};
@ -22,16 +22,19 @@ use reth_optimism_node::{
OpAddOns, OpConsensusBuilder, OpExecutorBuilder, OpNetworkBuilder, OpPayloadBuilder,
OpPoolBuilder,
},
txpool::OpPooledTransaction,
utils::optimism_payload_attributes,
OpEngineTypes, OpNode,
};
use reth_optimism_payload_builder::builder::OpPayloadTransactions;
use reth_optimism_primitives::{OpPrimitives, OpTransactionSigned};
use reth_payload_util::{PayloadTransactions, PayloadTransactionsChain, PayloadTransactionsFixed};
use reth_optimism_primitives::OpPrimitives;
use reth_payload_util::{
BestPayloadTransactions, PayloadTransactions, PayloadTransactionsChain,
PayloadTransactionsFixed,
};
use reth_primitives::Recovered;
use reth_provider::providers::BlockchainProvider;
use reth_tasks::TaskManager;
use reth_transaction_pool::{pool::BestPayloadTransactions, PoolTransaction};
use std::sync::Arc;
use tokio::sync::Mutex;
@ -40,16 +43,14 @@ struct CustomTxPriority {
chain_id: ChainId,
}
impl OpPayloadTransactions for CustomTxPriority {
impl OpPayloadTransactions<OpPooledTransaction> for CustomTxPriority {
fn best_transactions<Pool>(
&self,
pool: Pool,
attr: reth_transaction_pool::BestTransactionsAttributes,
) -> impl PayloadTransactions<Transaction = OpTransactionSigned>
) -> impl PayloadTransactions<Transaction = OpPooledTransaction>
where
Pool: reth_transaction_pool::TransactionPool<
Transaction: PoolTransaction<Consensus = OpTransactionSigned>,
>,
Pool: reth_transaction_pool::TransactionPool<Transaction = OpPooledTransaction>,
{
// Block composition:
// 1. Best transactions from the pool (up to 250k gas)
@ -68,12 +69,12 @@ impl OpPayloadTransactions for CustomTxPriority {
};
let signature = sender.sign_transaction_sync(&mut end_of_block_tx).unwrap();
let end_of_block_tx = Recovered::new_unchecked(
OpTransactionSigned::new_unhashed(
OpTypedTransaction::Eip1559(end_of_block_tx),
signature,
op_alloy_consensus::OpPooledTransaction::Eip1559(
end_of_block_tx.into_signed(signature),
),
sender.address(),
);
)
.into();
PayloadTransactionsChain::new(
BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)),

View File

@ -30,11 +30,11 @@ use reth_optimism_consensus::calculate_receipt_root_no_memo_optimism;
use reth_optimism_evm::{OpReceiptBuilder, ReceiptBuilderCtx};
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::{
transaction::signed::OpTransaction, OpTransactionSigned, ADDRESS_L2_TO_L1_MESSAGE_PASSER,
transaction::signed::OpTransaction, ADDRESS_L2_TO_L1_MESSAGE_PASSER,
};
use reth_payload_builder_primitives::PayloadBuilderError;
use reth_payload_primitives::PayloadBuilderAttributes;
use reth_payload_util::{NoopPayloadTransactions, PayloadTransactions};
use reth_payload_util::{BestPayloadTransactions, NoopPayloadTransactions, PayloadTransactions};
use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, BlockBody, NodePrimitives, SealedHeader,
};
@ -46,9 +46,7 @@ use reth_provider::{
use reth_revm::{
cancelled::CancelOnDrop, database::StateProviderDatabase, witness::ExecutionWitnessRecord,
};
use reth_transaction_pool::{
pool::BestPayloadTransactions, BestTransactionsAttributes, PoolTransaction, TransactionPool,
};
use reth_transaction_pool::{BestTransactionsAttributes, PoolTransaction, TransactionPool};
use revm::{
db::{states::bundle_state::BundleRetention, State},
primitives::{ExecutionResult, ResultAndState},
@ -122,7 +120,7 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives, Txs>
/// Configures the type responsible for yielding the transactions that should be included in the
/// payload.
pub fn with_transactions<T: OpPayloadTransactions>(
pub fn with_transactions<T>(
self,
best_transactions: T,
) -> OpPayloadBuilder<Pool, Client, EvmConfig, N, T> {
@ -150,8 +148,10 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives, Txs>
self.compute_pending_block
}
}
impl<Pool, Client, EvmConfig, N, T> OpPayloadBuilder<Pool, Client, EvmConfig, N, T>
where
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
Client: StateProviderFactory + ChainSpecProvider<ChainSpec = OpChainSpec>,
N: OpPayloadPrimitives,
EvmConfig: ConfigureEvmFor<N>,
@ -170,7 +170,7 @@ where
best: impl FnOnce(BestTransactionsAttributes) -> Txs + Send + Sync + 'a,
) -> Result<BuildOutcome<OpBuiltPayload<N>>, PayloadBuilderError>
where
Txs: PayloadTransactions<Transaction = N::SignedTx>,
Txs: PayloadTransactions<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
{
let evm_env = self
.evm_env(&args.config.attributes, &args.config.parent_header)
@ -251,7 +251,7 @@ where
let state = StateProviderDatabase::new(state_provider);
let mut state = State::builder().with_database(state).with_bundle_update().build();
let builder = OpBuilder::new(|_| NoopPayloadTransactions::default());
let builder = OpBuilder::new(|_| NoopPayloadTransactions::<Pool::Transaction>::default());
builder.witness(&mut state, &ctx)
}
}
@ -264,7 +264,7 @@ where
N: OpPayloadPrimitives,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
EvmConfig: ConfigureEvmFor<N>,
Txs: OpPayloadTransactions<N::SignedTx>,
Txs: OpPayloadTransactions<Pool::Transaction>,
{
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
type BuiltPayload = OpBuiltPayload<N>;
@ -298,7 +298,7 @@ where
cancel: Default::default(),
best_payload: None,
};
self.build_payload(args, |_| NoopPayloadTransactions::default())?
self.build_payload(args, |_| NoopPayloadTransactions::<Pool::Transaction>::default())?
.into_payload()
.ok_or_else(|| PayloadBuilderError::MissingPayload)
}
@ -332,10 +332,7 @@ impl<'a, Txs> OpBuilder<'a, Txs> {
}
}
impl<Txs> OpBuilder<'_, Txs>
where
Txs: PayloadTransactions,
{
impl<Txs> OpBuilder<'_, Txs> {
/// Executes the payload and returns the outcome.
pub fn execute<EvmConfig, N, DB, P>(
self,
@ -344,7 +341,7 @@ where
) -> Result<BuildOutcomeKind<ExecutedPayload<N>>, PayloadBuilderError>
where
N: OpPayloadPrimitives,
Txs: PayloadTransactions<Transaction = N::SignedTx>,
Txs: PayloadTransactions<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
EvmConfig: ConfigureEvmFor<N>,
DB: Database<Error = ProviderError> + AsRef<P>,
P: StorageRootProvider,
@ -408,7 +405,7 @@ where
where
EvmConfig: ConfigureEvmFor<N>,
N: OpPayloadPrimitives,
Txs: PayloadTransactions<Transaction = N::SignedTx>,
Txs: PayloadTransactions<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateRootProvider + HashedPostStateProvider + StorageRootProvider,
{
@ -533,7 +530,7 @@ where
where
EvmConfig: ConfigureEvmFor<N>,
N: OpPayloadPrimitives,
Txs: PayloadTransactions<Transaction = N::SignedTx>,
Txs: PayloadTransactions<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
DB: Database<Error = ProviderError> + AsRef<P>,
P: StateProofProvider + StorageRootProvider,
{
@ -546,22 +543,18 @@ where
}
/// A type that returns a the [`PayloadTransactions`] that should be included in the pool.
pub trait OpPayloadTransactions<Transaction = OpTransactionSigned>:
Clone + Send + Sync + Unpin + 'static
{
pub trait OpPayloadTransactions<Transaction>: Clone + Send + Sync + Unpin + 'static {
/// Returns an iterator that yields the transaction in the order they should get included in the
/// new payload.
fn best_transactions<
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = Transaction>>,
>(
fn best_transactions<Pool: TransactionPool<Transaction = Transaction>>(
&self,
pool: Pool,
attr: BestTransactionsAttributes,
) -> impl PayloadTransactions<Transaction = Transaction>;
}
impl<T> OpPayloadTransactions<T> for () {
fn best_transactions<Pool: TransactionPool<Transaction: PoolTransaction<Consensus = T>>>(
impl<T: PoolTransaction> OpPayloadTransactions<T> for () {
fn best_transactions<Pool: TransactionPool<Transaction = T>>(
&self,
pool: Pool,
attr: BestTransactionsAttributes,
@ -948,7 +941,9 @@ where
&self,
info: &mut ExecutionInfo<N>,
db: &mut State<DB>,
mut best_txs: impl PayloadTransactions<Transaction = EvmConfig::Transaction>,
mut best_txs: impl PayloadTransactions<
Transaction: PoolTransaction<Consensus = EvmConfig::Transaction>,
>,
) -> Result<Option<()>, PayloadBuilderError>
where
DB: Database<Error = ProviderError>,
@ -961,6 +956,7 @@ where
let mut evm = self.evm_config.evm_with_env(&mut *db, self.evm_env.clone());
while let Some(tx) = best_txs.next(()) {
let tx = tx.into_consensus();
if info.is_tx_over_limits(tx.tx(), block_gas_limit, tx_da_limit, block_da_limit) {
// we can't fit this transaction into the block, so we need to mark it as
// invalid which also removes all dependent transaction from

View File

@ -6,6 +6,7 @@ use jsonrpsee_core::{async_trait, RpcResult};
use op_alloy_rpc_types_engine::OpPayloadAttributes;
use reth_chainspec::ChainSpecProvider;
use reth_evm::ConfigureEvmFor;
use reth_node_api::NodePrimitives;
use reth_optimism_chainspec::OpChainSpec;
use reth_optimism_payload_builder::{OpPayloadBuilder, OpPayloadPrimitives};
use reth_primitives::SealedHeader;
@ -15,6 +16,7 @@ use reth_provider::{
pub use reth_rpc_api::DebugExecutionWitnessApiServer;
use reth_rpc_server_types::{result::internal_rpc_err, ToRpcResult};
use reth_tasks::TaskSpawner;
use reth_transaction_pool::{PoolTransaction, TransactionPool};
use std::{fmt::Debug, sync::Arc};
use tokio::sync::{oneshot, Semaphore};
@ -55,7 +57,11 @@ where
impl<Pool, Provider, EvmConfig> DebugExecutionWitnessApiServer<OpPayloadAttributes>
for OpDebugWitnessApi<Pool, Provider, EvmConfig>
where
Pool: Send + Sync + 'static,
Pool: TransactionPool<
Transaction: PoolTransaction<
Consensus = <Provider::Primitives as NodePrimitives>::SignedTx,
>,
> + 'static,
Provider: BlockReaderIdExt<Header = reth_primitives::Header>
+ NodePrimitivesProvider<Primitives: OpPayloadPrimitives>
+ StateProviderFactory

View File

@ -13,7 +13,7 @@ workspace = true
[dependencies]
# reth
reth-primitives.workspace = true
reth-transaction-pool.workspace = true
# alloy
alloy-primitives.workspace = true

View File

@ -11,5 +11,5 @@
mod traits;
mod transaction;
pub use traits::{NoopPayloadTransactions, PayloadTransactions};
pub use traits::{BestPayloadTransactions, NoopPayloadTransactions, PayloadTransactions};
pub use transaction::{PayloadTransactionsChain, PayloadTransactionsFixed};

View File

@ -1,5 +1,7 @@
use alloy_primitives::Address;
use reth_primitives::Recovered;
use std::sync::Arc;
use alloy_primitives::{map::HashSet, Address};
use reth_transaction_pool::{PoolTransaction, ValidPoolTransaction};
/// Iterator that returns transactions for the block building process in the order they should be
/// included in the block.
@ -15,7 +17,7 @@ pub trait PayloadTransactions {
&mut self,
// In the future, `ctx` can include access to state for block building purposes.
ctx: (),
) -> Option<Recovered<Self::Transaction>>;
) -> Option<Self::Transaction>;
/// Exclude descendants of the transaction with given sender and nonce from the iterator,
/// because this transaction won't be included in the block.
@ -35,9 +37,149 @@ impl<T> Default for NoopPayloadTransactions<T> {
impl<T> PayloadTransactions for NoopPayloadTransactions<T> {
type Transaction = T;
fn next(&mut self, _ctx: ()) -> Option<Recovered<Self::Transaction>> {
fn next(&mut self, _ctx: ()) -> Option<Self::Transaction> {
None
}
fn mark_invalid(&mut self, _sender: Address, _nonce: u64) {}
}
/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to
/// `PayloadTransactions` (used in block composition).
#[derive(Debug)]
pub struct BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
invalid: HashSet<Address>,
best: I,
}
impl<T, I> BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
/// Create a new `BestPayloadTransactions` with the given iterator.
pub fn new(best: I) -> Self {
Self { invalid: Default::default(), best }
}
}
impl<T, I> PayloadTransactions for BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
type Transaction = T;
fn next(&mut self, _ctx: ()) -> Option<Self::Transaction> {
loop {
let tx = self.best.next()?;
if self.invalid.contains(&tx.sender()) {
continue
}
return Some(tx.transaction.clone())
}
}
fn mark_invalid(&mut self, sender: Address, _nonce: u64) {
self.invalid.insert(sender);
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::{
BestPayloadTransactions, PayloadTransactions, PayloadTransactionsChain,
PayloadTransactionsFixed,
};
use alloy_primitives::{map::HashSet, Address};
use reth_transaction_pool::{
pool::{BestTransactionsWithPrioritizedSenders, PendingPool},
test_utils::{MockOrdering, MockTransaction, MockTransactionFactory},
PoolTransaction,
};
#[test]
fn test_best_transactions_chained_iterators() {
let mut priority_pool = PendingPool::new(MockOrdering::default());
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
// Block composition
// ===
// (1) up to 100 gas: custom top-of-block transaction
// (2) up to 100 gas: transactions from the priority pool
// (3) up to 200 gas: only transactions from address A
// (4) up to 200 gas: only transactions from address B
// (5) until block gas limit: all transactions from the main pool
// Notes:
// - If prioritized addresses overlap, a single transaction will be prioritized twice and
// therefore use the per-segment gas limit twice.
// - Priority pool and main pool must synchronize between each other to make sure there are
// no conflicts for the same nonce. For example, in this scenario, pools can't reject
// transactions with seemingly incorrect nonces, because previous transactions might be in
// the other pool.
let address_top_of_block = Address::random();
let address_in_priority_pool = Address::random();
let address_a = Address::random();
let address_b = Address::random();
let address_regular = Address::random();
// Add transactions to the main pool
{
let prioritized_tx_a =
MockTransaction::eip1559().with_gas_price(5).with_sender(address_a);
// without our custom logic, B would be prioritized over A due to gas price:
let prioritized_tx_b =
MockTransaction::eip1559().with_gas_price(10).with_sender(address_b);
let regular_tx =
MockTransaction::eip1559().with_gas_price(15).with_sender(address_regular);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_a)), 0);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_b)), 0);
pool.add_transaction(Arc::new(f.validated(regular_tx)), 0);
}
// Add transactions to the priority pool
{
let prioritized_tx =
MockTransaction::eip1559().with_gas_price(0).with_sender(address_in_priority_pool);
let valid_prioritized_tx = f.validated(prioritized_tx);
priority_pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
}
let mut block = PayloadTransactionsChain::new(
PayloadTransactionsFixed::single(
MockTransaction::eip1559().with_sender(address_top_of_block),
),
Some(100),
PayloadTransactionsChain::new(
BestPayloadTransactions::new(priority_pool.best()),
Some(100),
BestPayloadTransactions::new(BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_a]),
200,
BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_b]),
200,
pool.best(),
),
)),
None,
),
None,
);
assert_eq!(block.next(()).unwrap().sender(), address_top_of_block);
assert_eq!(block.next(()).unwrap().sender(), address_in_priority_pool);
assert_eq!(block.next(()).unwrap().sender(), address_a);
assert_eq!(block.next(()).unwrap().sender(), address_b);
assert_eq!(block.next(()).unwrap().sender(), address_regular);
}
}

View File

@ -1,7 +1,7 @@
use crate::PayloadTransactions;
use alloy_consensus::Transaction;
use alloy_primitives::Address;
use reth_primitives::Recovered;
use reth_transaction_pool::PoolTransaction;
/// An implementation of [`crate::traits::PayloadTransactions`] that yields
/// a pre-defined set of transactions.
@ -26,10 +26,10 @@ impl<T> PayloadTransactionsFixed<T> {
}
}
impl<T: Clone> PayloadTransactions for PayloadTransactionsFixed<Recovered<T>> {
impl<T: Clone> PayloadTransactions for PayloadTransactionsFixed<T> {
type Transaction = T;
fn next(&mut self, _ctx: ()) -> Option<Recovered<T>> {
fn next(&mut self, _ctx: ()) -> Option<T> {
(self.index < self.transactions.len()).then(|| {
let tx = self.transactions[self.index].clone();
self.index += 1;
@ -91,20 +91,20 @@ impl<B: PayloadTransactions, A: PayloadTransactions> PayloadTransactionsChain<B,
impl<A, B> PayloadTransactions for PayloadTransactionsChain<A, B>
where
A: PayloadTransactions<Transaction: Transaction>,
A: PayloadTransactions<Transaction: PoolTransaction>,
B: PayloadTransactions<Transaction = A::Transaction>,
{
type Transaction = A::Transaction;
fn next(&mut self, ctx: ()) -> Option<Recovered<Self::Transaction>> {
fn next(&mut self, ctx: ()) -> Option<Self::Transaction> {
while let Some(tx) = self.before.next(ctx) {
if let Some(before_max_gas) = self.before_max_gas {
if self.before_gas + tx.tx().gas_limit() <= before_max_gas {
self.before_gas += tx.tx().gas_limit();
if self.before_gas + tx.gas_limit() <= before_max_gas {
self.before_gas += tx.gas_limit();
return Some(tx);
}
self.before.mark_invalid(tx.signer(), tx.tx().nonce());
self.after.mark_invalid(tx.signer(), tx.tx().nonce());
self.before.mark_invalid(tx.sender(), tx.nonce());
self.after.mark_invalid(tx.sender(), tx.nonce());
} else {
return Some(tx);
}
@ -112,11 +112,11 @@ where
while let Some(tx) = self.after.next(ctx) {
if let Some(after_max_gas) = self.after_max_gas {
if self.after_gas + tx.tx().gas_limit() <= after_max_gas {
self.after_gas += tx.tx().gas_limit();
if self.after_gas + tx.gas_limit() <= after_max_gas {
self.after_gas += tx.gas_limit();
return Some(tx);
}
self.after.mark_invalid(tx.signer(), tx.tx().nonce());
self.after.mark_invalid(tx.sender(), tx.nonce());
} else {
return Some(tx);
}

View File

@ -18,7 +18,6 @@ reth-chainspec.workspace = true
reth-eth-wire-types.workspace = true
reth-primitives = { workspace = true, features = ["c-kzg", "secp256k1"] }
reth-primitives-traits.workspace = true
reth-payload-util.workspace = true
reth-execution-types.workspace = true
reth-fs-util.workspace = true
reth-storage-api.workspace = true

View File

@ -8,8 +8,7 @@ use alloy_consensus::Transaction;
use alloy_eips::Typed2718;
use alloy_primitives::Address;
use core::fmt;
use reth_payload_util::PayloadTransactions;
use reth_primitives::{InvalidTransactionError, Recovered};
use reth_primitives::InvalidTransactionError;
use std::{
collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
sync::Arc,
@ -223,51 +222,6 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
}
}
/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to
/// `PayloadTransactions` (used in block composition).
#[derive(Debug)]
pub struct BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
invalid: HashSet<Address>,
best: I,
}
impl<T, I> BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
/// Create a new `BestPayloadTransactions` with the given iterator.
pub fn new(best: I) -> Self {
Self { invalid: Default::default(), best }
}
}
impl<T, I> PayloadTransactions for BestPayloadTransactions<T, I>
where
T: PoolTransaction,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
type Transaction = T::Consensus;
fn next(&mut self, _ctx: ()) -> Option<Recovered<Self::Transaction>> {
loop {
let tx = self.best.next()?;
if self.invalid.contains(&tx.sender()) {
continue
}
return Some(tx.to_consensus())
}
}
fn mark_invalid(&mut self, sender: Address, _nonce: u64) {
self.invalid.insert(sender);
}
}
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of iter with predicate.
///
@ -425,7 +379,6 @@ mod tests {
BestTransactions, Priority,
};
use alloy_primitives::U256;
use reth_payload_util::{PayloadTransactionsChain, PayloadTransactionsFixed};
#[test]
fn test_best_iter() {
@ -846,85 +799,6 @@ mod tests {
// TODO: Test that gas limits for prioritized transactions are respected
}
#[test]
fn test_best_transactions_chained_iterators() {
let mut priority_pool = PendingPool::new(MockOrdering::default());
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
// Block composition
// ===
// (1) up to 100 gas: custom top-of-block transaction
// (2) up to 100 gas: transactions from the priority pool
// (3) up to 200 gas: only transactions from address A
// (4) up to 200 gas: only transactions from address B
// (5) until block gas limit: all transactions from the main pool
// Notes:
// - If prioritized addresses overlap, a single transaction will be prioritized twice and
// therefore use the per-segment gas limit twice.
// - Priority pool and main pool must synchronize between each other to make sure there are
// no conflicts for the same nonce. For example, in this scenario, pools can't reject
// transactions with seemingly incorrect nonces, because previous transactions might be in
// the other pool.
let address_top_of_block = Address::random();
let address_in_priority_pool = Address::random();
let address_a = Address::random();
let address_b = Address::random();
let address_regular = Address::random();
// Add transactions to the main pool
{
let prioritized_tx_a =
MockTransaction::eip1559().with_gas_price(5).with_sender(address_a);
// without our custom logic, B would be prioritized over A due to gas price:
let prioritized_tx_b =
MockTransaction::eip1559().with_gas_price(10).with_sender(address_b);
let regular_tx =
MockTransaction::eip1559().with_gas_price(15).with_sender(address_regular);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_a)), 0);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_b)), 0);
pool.add_transaction(Arc::new(f.validated(regular_tx)), 0);
}
// Add transactions to the priority pool
{
let prioritized_tx =
MockTransaction::eip1559().with_gas_price(0).with_sender(address_in_priority_pool);
let valid_prioritized_tx = f.validated(prioritized_tx);
priority_pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
}
let mut block = PayloadTransactionsChain::new(
PayloadTransactionsFixed::single(
MockTransaction::eip1559().with_sender(address_top_of_block).into(),
),
Some(100),
PayloadTransactionsChain::new(
BestPayloadTransactions::new(priority_pool.best()),
Some(100),
BestPayloadTransactions::new(BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_a]),
200,
BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_b]),
200,
pool.best(),
),
)),
None,
),
None,
);
assert_eq!(block.next(()).unwrap().signer(), address_top_of_block);
assert_eq!(block.next(()).unwrap().signer(), address_in_priority_pool);
assert_eq!(block.next(()).unwrap().signer(), address_a);
assert_eq!(block.next(()).unwrap().signer(), address_b);
assert_eq!(block.next(()).unwrap().signer(), address_regular);
}
#[test]
fn test_best_with_fees_iter_no_blob_fee_required() {
// Tests transactions without blob fees where base fees are checked.

View File

@ -101,9 +101,7 @@ use crate::{
traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
validate::ValidTransaction,
};
pub use best::{
BestPayloadTransactions, BestTransactionFilter, BestTransactionsWithPrioritizedSenders,
};
pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
pub use blob::{blob_tx_priority, fee_delta};
pub use events::{FullTransactionEvent, TransactionEvent};
pub use listener::{AllTransactionsEvents, TransactionEvents};

View File

@ -98,7 +98,7 @@ impl<T: TransactionOrdering> PendingPool<T> {
/// provides a way to mark transactions that the consumer of this iterator considers invalid. In
/// which case the transaction's subgraph is also automatically marked invalid, See (1.).
/// Invalid transactions are skipped.
pub(crate) fn best(&self) -> BestTransactions<T> {
pub fn best(&self) -> BestTransactions<T> {
BestTransactions {
all: self.by_id.clone(),
independent: self.independent_transactions.values().cloned().collect(),