feat(transaction-pool): chaining & static txs for best transactions trait (#12320)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Seva Zhidkov
2024-11-08 11:45:27 +00:00
committed by GitHub
parent 9f6f63d45a
commit 02d2593b2e
12 changed files with 555 additions and 42 deletions

4
Cargo.lock generated
View File

@ -8235,10 +8235,13 @@ dependencies = [
name = "reth-optimism-node"
version = "1.1.1"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-network",
"alloy-primitives",
"alloy-rpc-types-engine",
"alloy-signer-local",
"clap",
"eyre",
"futures",
@ -8261,6 +8264,7 @@ dependencies = [
"reth-optimism-consensus",
"reth-optimism-evm",
"reth-optimism-forks",
"reth-optimism-node",
"reth-optimism-payload-builder",
"reth-optimism-rpc",
"reth-payload-builder",

View File

@ -55,17 +55,23 @@ parking_lot.workspace = true
# rpc
serde_json.workspace = true
# test-utils dependencies
reth = { workspace = true, optional = true }
reth-e2e-test-utils = { workspace = true, optional = true }
alloy-genesis = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
[dev-dependencies]
reth.workspace = true
reth-optimism-node = { workspace = true, features = ["test-utils"] }
reth-db.workspace = true
reth-e2e-test-utils.workspace = true
reth-node-builder = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
reth-revm = { workspace = true, features = ["test-utils"] }
tokio.workspace = true
alloy-primitives.workspace = true
alloy-genesis.workspace = true
op-alloy-consensus.workspace = true
alloy-signer-local.workspace = true
alloy-network.workspace = true
alloy-consensus.workspace = true
futures.workspace = true
[features]
@ -79,15 +85,21 @@ optimism = [
"reth-optimism-rpc/optimism",
"reth-engine-local/optimism",
"reth-optimism-consensus/optimism",
"reth-db/optimism"
"reth-db/optimism",
"reth-optimism-node/optimism"
]
asm-keccak = [
"reth-primitives/asm-keccak",
"reth/asm-keccak",
"alloy-primitives/asm-keccak",
"revm/asm-keccak"
"revm/asm-keccak",
"reth-optimism-node/asm-keccak"
]
test-utils = [
"reth",
"reth-e2e-test-utils",
"alloy-genesis",
"tokio",
"reth-node-builder/test-utils",
"reth-chainspec/test-utils",
"reth-consensus/test-utils",
@ -100,5 +112,6 @@ test-utils = [
"reth-provider/test-utils",
"reth-transaction-pool/test-utils",
"reth-trie-db/test-utils",
"revm/test-utils"
"revm/test-utils",
"reth-optimism-node/test-utils"
]

View File

@ -22,6 +22,10 @@ pub use node::OpNode;
pub mod txpool;
/// Helpers for running test node instances.
#[cfg(feature = "test-utils")]
pub mod utils;
pub use reth_optimism_payload_builder::{
OpBuiltPayload, OpPayloadBuilder, OpPayloadBuilderAttributes,
};

View File

@ -1,3 +1,4 @@
use crate::{node::OpAddOns, OpBuiltPayload, OpNode as OtherOpNode, OpPayloadBuilderAttributes};
use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256};
use reth::{rpc::types::engine::PayloadAttributes, tasks::TaskManager};
@ -5,9 +6,6 @@ use reth_e2e_test_utils::{
transaction::TransactionTestContext, wallet::Wallet, Adapter, NodeHelperType,
};
use reth_optimism_chainspec::OpChainSpecBuilder;
use reth_optimism_node::{
node::OpAddOns, OpBuiltPayload, OpNode as OtherOpNode, OpPayloadBuilderAttributes,
};
use reth_payload_builder::EthPayloadBuilderAttributes;
use std::sync::Arc;
use tokio::sync::Mutex;
@ -15,8 +13,10 @@ use tokio::sync::Mutex;
/// Optimism Node Helper type
pub(crate) type OpNode = NodeHelperType<OtherOpNode, OpAddOns<Adapter<OtherOpNode>>>;
pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec<OpNode>, TaskManager, Wallet)> {
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
/// Creates the initial setup with `num_nodes` of the node config, started and connected.
pub async fn setup(num_nodes: usize) -> eyre::Result<(Vec<OpNode>, TaskManager, Wallet)> {
let genesis: Genesis =
serde_json::from_str(include_str!("../tests/assets/genesis.json")).unwrap();
reth_e2e_test_utils::setup(
num_nodes,
Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).ecotone_activated().build()),
@ -27,7 +27,7 @@ pub(crate) async fn setup(num_nodes: usize) -> eyre::Result<(Vec<OpNode>, TaskMa
}
/// Advance the chain with sequential payloads returning them in the end.
pub(crate) async fn advance_chain(
pub async fn advance_chain(
length: usize,
node: &mut OpNode,
wallet: Arc<Mutex<Wallet>>,
@ -49,7 +49,7 @@ pub(crate) async fn advance_chain(
}
/// Helper function to create a new eth payload attributes
pub(crate) fn optimism_payload_attributes(timestamp: u64) -> OpPayloadBuilderAttributes {
pub fn optimism_payload_attributes(timestamp: u64) -> OpPayloadBuilderAttributes {
let attributes = PayloadAttributes {
timestamp,
prev_randao: B256::ZERO,

View File

@ -3,7 +3,4 @@
#[cfg(feature = "optimism")]
mod p2p;
#[cfg(feature = "optimism")]
mod utils;
const fn main() {}

View File

@ -1,7 +1,7 @@
use crate::utils::{advance_chain, setup};
use alloy_rpc_types_engine::PayloadStatusEnum;
use futures::StreamExt;
use reth::blockchain_tree::error::BlockchainTreeError;
use reth_optimism_node::utils::{advance_chain, setup};
use std::sync::Arc;
use tokio::sync::Mutex;

View File

@ -3,4 +3,7 @@
#[cfg(feature = "optimism")]
mod builder;
#[cfg(feature = "optimism")]
mod priority;
const fn main() {}

View File

@ -0,0 +1,190 @@
//! Node builder test that customizes priority of transactions in the block.
use alloy_consensus::TxEip1559;
use alloy_genesis::Genesis;
use alloy_network::TxSignerSync;
use alloy_primitives::{Address, ChainId, TxKind};
use reth::{args::DatadirArgs, tasks::TaskManager};
use reth_chainspec::EthChainSpec;
use reth_db::test_utils::create_test_rw_db_with_path;
use reth_e2e_test_utils::{
node::NodeTestContext, transaction::TransactionTestContext, wallet::Wallet,
};
use reth_node_api::{FullNodeTypes, NodeTypesWithEngine};
use reth_node_builder::{
components::ComponentsBuilder, EngineNodeLauncher, NodeBuilder, NodeConfig,
};
use reth_optimism_chainspec::{OpChainSpec, OpChainSpecBuilder};
use reth_optimism_node::{
args::RollupArgs,
node::{
OpAddOns, OpConsensusBuilder, OpExecutorBuilder, OpNetworkBuilder, OpPayloadBuilder,
OpPoolBuilder,
},
utils::optimism_payload_attributes,
OpEngineTypes, OpNode,
};
use reth_optimism_payload_builder::builder::OpPayloadTransactions;
use reth_primitives::{SealedBlock, Transaction, TransactionSigned, TransactionSignedEcRecovered};
use reth_provider::providers::BlockchainProvider2;
use reth_transaction_pool::{
pool::{BestPayloadTransactions, PayloadTransactionsChain, PayloadTransactionsFixed},
PayloadTransactions,
};
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone, Debug)]
struct CustomTxPriority {
chain_id: ChainId,
}
impl OpPayloadTransactions for CustomTxPriority {
fn best_transactions<Pool>(
&self,
pool: Pool,
attr: reth_transaction_pool::BestTransactionsAttributes,
) -> impl PayloadTransactions
where
Pool: reth_transaction_pool::TransactionPool,
{
// Block composition:
// 1. Best transactions from the pool (up to 250k gas)
// 2. End-of-block transaction created by the node (up to 100k gas)
// End of block transaction should send a 0-value transfer to a random address.
let sender = Wallet::default().inner;
let mut end_of_block_tx = TxEip1559 {
chain_id: self.chain_id,
nonce: 1, // it will be 2nd tx after L1 block info tx that uses the same sender
gas_limit: 21000,
max_fee_per_gas: 20e9 as u128,
to: TxKind::Call(Address::random()),
value: 0.try_into().unwrap(),
..Default::default()
};
let signature = sender.sign_transaction_sync(&mut end_of_block_tx).unwrap();
let end_of_block_tx = TransactionSignedEcRecovered::from_signed_transaction(
TransactionSigned::from_transaction_and_signature(
Transaction::Eip1559(end_of_block_tx),
signature,
),
sender.address(),
);
PayloadTransactionsChain::new(
BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)),
// Allow 250k gas for the transactions from the pool
Some(250_000),
PayloadTransactionsFixed::single(end_of_block_tx),
// Allow 100k gas for the end-of-block transaction
Some(100_000),
)
}
}
/// Builds the node with custom transaction priority service within default payload builder.
fn build_components<Node>(
chain_id: ChainId,
) -> ComponentsBuilder<
Node,
OpPoolBuilder,
OpPayloadBuilder<CustomTxPriority>,
OpNetworkBuilder,
OpExecutorBuilder,
OpConsensusBuilder,
>
where
Node:
FullNodeTypes<Types: NodeTypesWithEngine<Engine = OpEngineTypes, ChainSpec = OpChainSpec>>,
{
let RollupArgs { disable_txpool_gossip, compute_pending_block, discovery_v4, .. } =
RollupArgs::default();
ComponentsBuilder::default()
.node_types::<Node>()
.pool(OpPoolBuilder::default())
.payload(
OpPayloadBuilder::new(compute_pending_block)
.with_transactions(CustomTxPriority { chain_id }),
)
.network(OpNetworkBuilder { disable_txpool_gossip, disable_discovery_v4: !discovery_v4 })
.executor(OpExecutorBuilder::default())
.consensus(OpConsensusBuilder::default())
}
#[tokio::test]
async fn test_custom_block_priority_config() {
reth_tracing::init_test_tracing();
let genesis: Genesis = serde_json::from_str(include_str!("../assets/genesis.json")).unwrap();
let chain_spec =
Arc::new(OpChainSpecBuilder::base_mainnet().genesis(genesis).ecotone_activated().build());
// This wallet is going to send:
// 1. L1 block info tx
// 2. End-of-block custom tx
let wallet = Arc::new(Mutex::new(Wallet::default().with_chain_id(chain_spec.chain().into())));
// Configure and launch the node.
let config = NodeConfig::new(chain_spec).with_datadir_args(DatadirArgs {
datadir: reth_db::test_utils::tempdir_path().into(),
..Default::default()
});
let db = create_test_rw_db_with_path(
config
.datadir
.datadir
.unwrap_or_chain_default(config.chain.chain(), config.datadir.clone())
.db(),
);
let tasks = TaskManager::current();
let node_handle = NodeBuilder::new(config.clone())
.with_database(db)
.with_types_and_provider::<OpNode, BlockchainProvider2<_>>()
.with_components(build_components(config.chain.chain_id()))
.with_add_ons(OpAddOns::default())
.launch_with_fn(|builder| {
let launcher = EngineNodeLauncher::new(
tasks.executor(),
builder.config.datadir(),
Default::default(),
);
builder.launch_with(launcher)
})
.await
.expect("Failed to launch node");
// Advance the chain with a single block.
let block_payloads = NodeTestContext::new(node_handle.node, optimism_payload_attributes)
.await
.unwrap()
.advance(1, |_| {
let wallet = wallet.clone();
Box::pin(async move {
let mut wallet = wallet.lock().await;
let tx_fut = TransactionTestContext::optimism_l1_block_info_tx(
wallet.chain_id,
wallet.inner.clone(),
// This doesn't matter in the current test (because it's only one block),
// but make sure you're not reusing the nonce from end-of-block tx
// if they have the same signer.
wallet.inner_nonce * 2,
);
wallet.inner_nonce += 1;
tx_fut.await
})
})
.await
.unwrap();
assert_eq!(block_payloads.len(), 1);
let (block_payload, _) = block_payloads.first().unwrap();
let block_payload: SealedBlock = block_payload.block().clone();
assert_eq!(block_payload.body.transactions.len(), 2); // L1 block info tx + end-of-block custom tx
// Check that last transaction in the block looks like a transfer to a random address.
let end_of_block_tx = block_payload.body.transactions.last().unwrap();
let end_of_block_tx = end_of_block_tx.transaction.as_eip1559().unwrap();
assert_eq!(end_of_block_tx.nonce, 1);
assert_eq!(end_of_block_tx.gas_limit, 21_000);
assert!(end_of_block_tx.input.is_empty());
}

View File

@ -2,7 +2,7 @@
use std::{fmt::Display, sync::Arc};
use alloy_consensus::EMPTY_OMMER_ROOT_HASH;
use alloy_consensus::{Transaction, EMPTY_OMMER_ROOT_HASH};
use alloy_eips::merge::BEACON_NONCE;
use alloy_primitives::{Address, Bytes, U256};
use alloy_rpc_types_engine::PayloadId;
@ -23,8 +23,7 @@ use reth_primitives::{
use reth_provider::{ProviderError, StateProviderFactory, StateRootProvider};
use reth_revm::database::StateProviderDatabase;
use reth_transaction_pool::{
noop::NoopTransactionPool, BestTransactions, BestTransactionsAttributes, BestTransactionsFor,
TransactionPool,
noop::NoopTransactionPool, BestTransactionsAttributes, PayloadTransactions, TransactionPool,
};
use reth_trie::HashedPostState;
use revm::{
@ -39,6 +38,7 @@ use crate::{
payload::{OpBuiltPayload, OpPayloadBuilderAttributes},
};
use op_alloy_consensus::DepositTransaction;
use reth_transaction_pool::pool::BestPayloadTransactions;
/// Optimism's payload builder
#[derive(Debug, Clone, PartialEq, Eq)]
@ -390,7 +390,7 @@ where
}
}
/// A type that returns a the [`BestTransactions`] that should be included in the pool.
/// A type that returns a the [`PayloadTransactions`] that should be included in the pool.
pub trait OpPayloadTransactions: Clone + Send + Sync + Unpin + 'static {
/// Returns an iterator that yields the transaction in the order they should get included in the
/// new payload.
@ -398,7 +398,7 @@ pub trait OpPayloadTransactions: Clone + Send + Sync + Unpin + 'static {
&self,
pool: Pool,
attr: BestTransactionsAttributes,
) -> BestTransactionsFor<Pool>;
) -> impl PayloadTransactions;
}
impl OpPayloadTransactions for () {
@ -406,8 +406,8 @@ impl OpPayloadTransactions for () {
&self,
pool: Pool,
attr: BestTransactionsAttributes,
) -> BestTransactionsFor<Pool> {
pool.best_transactions_with_attributes(attr)
) -> impl PayloadTransactions {
BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr))
}
}
@ -730,7 +730,7 @@ where
&self,
info: &mut ExecutionInfo,
db: &mut State<DB>,
mut best_txs: BestTransactionsFor<Pool>,
mut best_txs: impl PayloadTransactions,
) -> Result<Option<BuildOutcomeKind<OpBuiltPayload>>, PayloadBuilderError>
where
DB: Database<Error = ProviderError>,
@ -746,19 +746,19 @@ where
);
let mut evm = self.evm_config.evm_with_env(&mut *db, env);
while let Some(pool_tx) = best_txs.next() {
while let Some(tx) = best_txs.next(()) {
// ensure we still have capacity for this transaction
if info.cumulative_gas_used + pool_tx.gas_limit() > block_gas_limit {
if info.cumulative_gas_used + tx.gas_limit() > block_gas_limit {
// we can't fit this transaction into the block, so we need to mark it as
// invalid which also removes all dependent transaction from
// the iterator before we can continue
best_txs.mark_invalid(&pool_tx);
best_txs.mark_invalid(tx.signer(), tx.nonce());
continue
}
// A sequencer's block should never contain blob or deposit transactions from the pool.
if pool_tx.is_eip4844() || pool_tx.tx_type() == TxType::Deposit as u8 {
best_txs.mark_invalid(&pool_tx);
if tx.is_eip4844() || tx.tx_type() == TxType::Deposit as u8 {
best_txs.mark_invalid(tx.signer(), tx.nonce());
continue
}
@ -767,9 +767,6 @@ where
return Ok(Some(BuildOutcomeKind::Cancelled))
}
// convert tx to a signed transaction
let tx = pool_tx.to_recovered_transaction();
// Configure the environment for the tx.
*evm.tx_mut() = self.evm_config.tx_env(tx.as_signed(), tx.signer());
@ -785,7 +782,7 @@ where
// if the transaction is invalid, we can skip it and all of its
// descendants
trace!(target: "payload_builder", %err, ?tx, "skipping invalid transaction and its descendants");
best_txs.mark_invalid(&pool_tx);
best_txs.mark_invalid(tx.signer(), tx.nonce());
}
continue
@ -819,7 +816,7 @@ where
// update add to total fees
let miner_fee = tx
.effective_tip_per_gas(Some(base_fee))
.effective_tip_per_gas(base_fee)
.expect("fee is always valid; execution succeeded");
info.total_fees += U256::from(miner_fee) * U256::from(gas_used);

View File

@ -1,10 +1,12 @@
use crate::{
identifier::{SenderId, TransactionId},
pool::pending::PendingTransaction,
PoolTransaction, TransactionOrdering, ValidPoolTransaction,
PayloadTransactions, PoolTransaction, TransactionOrdering, ValidPoolTransaction,
};
use alloy_consensus::Transaction;
use alloy_primitives::Address;
use core::fmt;
use reth_primitives::TransactionSignedEcRecovered;
use std::{
collections::{BTreeMap, BTreeSet, HashSet, VecDeque},
sync::Arc,
@ -48,7 +50,7 @@ impl<T: TransactionOrdering> Iterator for BestTransactionsWithFees<T> {
fn next(&mut self) -> Option<Self::Item> {
// find the next transaction that satisfies the base fee
loop {
let best = self.best.next()?;
let best = Iterator::next(&mut self.best)?;
// If both the base fee and blob fee (if applicable for EIP-4844) are satisfied, return
// the transaction
if best.transaction.max_fee_per_gas() >= self.base_fee as u128 &&
@ -205,6 +207,49 @@ impl<T: TransactionOrdering> Iterator for BestTransactions<T> {
}
}
/// Wrapper struct that allows to convert `BestTransactions` (used in tx pool) to
/// `PayloadTransactions` (used in block composition).
#[derive(Debug)]
pub struct BestPayloadTransactions<T, I>
where
T: PoolTransaction<Consensus: Into<TransactionSignedEcRecovered>>,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
invalid: HashSet<Address>,
best: I,
}
impl<T, I> BestPayloadTransactions<T, I>
where
T: PoolTransaction<Consensus: Into<TransactionSignedEcRecovered>>,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
/// Create a new `BestPayloadTransactions` with the given iterator.
pub fn new(best: I) -> Self {
Self { invalid: Default::default(), best }
}
}
impl<T, I> PayloadTransactions for BestPayloadTransactions<T, I>
where
T: PoolTransaction<Consensus: Into<TransactionSignedEcRecovered>>,
I: Iterator<Item = Arc<ValidPoolTransaction<T>>>,
{
fn next(&mut self, _ctx: ()) -> Option<TransactionSignedEcRecovered> {
loop {
let tx = self.best.next()?;
if self.invalid.contains(&tx.sender()) {
continue
}
return Some(tx.to_recovered_transaction())
}
}
fn mark_invalid(&mut self, sender: Address, _nonce: u64) {
self.invalid.insert(sender);
}
}
/// A [`BestTransactions`](crate::traits::BestTransactions) implementation that filters the
/// transactions of iter with predicate.
///
@ -350,6 +395,130 @@ where
}
}
/// An implementation of [`crate::traits::PayloadTransactions`] that yields
/// a pre-defined set of transactions.
///
/// This is useful to put a sequencer-specified set of transactions into the block
/// and compose it with the rest of the transactions.
#[derive(Debug)]
pub struct PayloadTransactionsFixed<T> {
transactions: Vec<T>,
index: usize,
}
impl<T> PayloadTransactionsFixed<T> {
/// Constructs a new [`PayloadTransactionsFixed`].
pub fn new(transactions: Vec<T>) -> Self {
Self { transactions, index: Default::default() }
}
/// Constructs a new [`PayloadTransactionsFixed`] with a single transaction.
pub fn single(transaction: T) -> Self {
Self { transactions: vec![transaction], index: Default::default() }
}
}
impl PayloadTransactions for PayloadTransactionsFixed<TransactionSignedEcRecovered> {
fn next(&mut self, _ctx: ()) -> Option<TransactionSignedEcRecovered> {
(self.index < self.transactions.len()).then(|| {
let tx = self.transactions[self.index].clone();
self.index += 1;
tx
})
}
fn mark_invalid(&mut self, _sender: Address, _nonce: u64) {}
}
/// Wrapper over [`crate::traits::PayloadTransactions`] that combines transactions from multiple
/// `PayloadTransactions` iterators and keeps track of the gas for both of iterators.
///
/// We can't use [`Iterator::chain`], because:
/// (a) we need to propagate the `mark_invalid` and `no_updates`
/// (b) we need to keep track of the gas
///
/// Notes that [`PayloadTransactionsChain`] fully drains the first iterator
/// before moving to the second one.
///
/// If the `before` iterator has transactions that are not fitting into the block,
/// the after iterator will get propagated a `mark_invalid` call for each of them.
#[derive(Debug)]
pub struct PayloadTransactionsChain<B: PayloadTransactions, A: PayloadTransactions> {
/// Iterator that will be used first
before: B,
/// Allowed gas for the transactions from `before` iterator. If `None`, no gas limit is
/// enforced.
before_max_gas: Option<u64>,
/// Gas used by the transactions from `before` iterator
before_gas: u64,
/// Iterator that will be used after `before` iterator
after: A,
/// Allowed gas for the transactions from `after` iterator. If `None`, no gas limit is
/// enforced.
after_max_gas: Option<u64>,
/// Gas used by the transactions from `after` iterator
after_gas: u64,
}
impl<B: PayloadTransactions, A: PayloadTransactions> PayloadTransactionsChain<B, A> {
/// Constructs a new [`PayloadTransactionsChain`].
pub fn new(
before: B,
before_max_gas: Option<u64>,
after: A,
after_max_gas: Option<u64>,
) -> Self {
Self {
before,
before_max_gas,
before_gas: Default::default(),
after,
after_max_gas,
after_gas: Default::default(),
}
}
}
impl<B, A> PayloadTransactions for PayloadTransactionsChain<B, A>
where
B: PayloadTransactions,
A: PayloadTransactions,
{
fn next(&mut self, ctx: ()) -> Option<TransactionSignedEcRecovered> {
while let Some(tx) = self.before.next(ctx) {
if let Some(before_max_gas) = self.before_max_gas {
if self.before_gas + tx.transaction.gas_limit() <= before_max_gas {
self.before_gas += tx.transaction.gas_limit();
return Some(tx);
}
self.before.mark_invalid(tx.signer(), tx.transaction.nonce());
self.after.mark_invalid(tx.signer(), tx.transaction.nonce());
} else {
return Some(tx);
}
}
while let Some(tx) = self.after.next(ctx) {
if let Some(after_max_gas) = self.after_max_gas {
if self.after_gas + tx.transaction.gas_limit() <= after_max_gas {
self.after_gas += tx.transaction.gas_limit();
return Some(tx);
}
self.after.mark_invalid(tx.signer(), tx.transaction.nonce());
} else {
return Some(tx);
}
}
None
}
fn mark_invalid(&mut self, sender: Address, nonce: u64) {
self.before.mark_invalid(sender, nonce);
self.after.mark_invalid(sender, nonce);
}
}
#[cfg(test)]
mod tests {
use super::*;
@ -428,9 +597,9 @@ mod tests {
dyn crate::traits::BestTransactions<Item = Arc<ValidPoolTransaction<MockTransaction>>>,
> = Box::new(pool.best());
let tx = best.next().unwrap();
best.mark_invalid(&tx);
assert!(best.next().is_none());
let tx = Iterator::next(&mut best).unwrap();
crate::traits::BestTransactions::mark_invalid(&mut *best, &tx);
assert!(Iterator::next(&mut best).is_none());
}
#[test]
@ -737,4 +906,119 @@ mod tests {
assert_eq!(tx.nonce() % 2, 0);
}
}
#[test]
fn test_best_transactions_prioritized_senders() {
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
// Add 5 plain transactions from different senders with increasing gas price
for gas_price in 0..5 {
let tx = MockTransaction::eip1559().with_gas_price(gas_price);
let valid_tx = f.validated(tx);
pool.add_transaction(Arc::new(valid_tx), 0);
}
// Add another transaction with 0 gas price that's going to be prioritized by sender
let prioritized_tx = MockTransaction::eip1559().with_gas_price(0);
let valid_prioritized_tx = f.validated(prioritized_tx.clone());
pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
let prioritized_senders = HashSet::from([prioritized_tx.sender()]);
let best =
BestTransactionsWithPrioritizedSenders::new(prioritized_senders, 200, pool.best());
// Verify that the prioritized transaction is returned first
// and the rest are returned in the reverse order of gas price
let mut iter = best.into_iter();
let top_of_block_tx = iter.next().unwrap();
assert_eq!(top_of_block_tx.max_fee_per_gas(), 0);
assert_eq!(top_of_block_tx.sender(), prioritized_tx.sender());
for gas_price in (0..5).rev() {
assert_eq!(iter.next().unwrap().max_fee_per_gas(), gas_price);
}
// TODO: Test that gas limits for prioritized transactions are respected
}
#[test]
fn test_best_transactions_chained_iterators() {
let mut priority_pool = PendingPool::new(MockOrdering::default());
let mut pool = PendingPool::new(MockOrdering::default());
let mut f = MockTransactionFactory::default();
// Block composition
// ===
// (1) up to 100 gas: custom top-of-block transaction
// (2) up to 100 gas: transactions from the priority pool
// (3) up to 200 gas: only transactions from address A
// (4) up to 200 gas: only transactions from address B
// (5) until block gas limit: all transactions from the main pool
// Notes:
// - If prioritized addresses overlap, a single transaction will be prioritized twice and
// therefore use the per-segment gas limit twice.
// - Priority pool and main pool must synchronize between each other to make sure there are
// no conflicts for the same nonce. For example, in this scenario, pools can't reject
// transactions with seemingly incorrect nonces, because previous transactions might be in
// the other pool.
let address_top_of_block = Address::random();
let address_in_priority_pool = Address::random();
let address_a = Address::random();
let address_b = Address::random();
let address_regular = Address::random();
// Add transactions to the main pool
{
let prioritized_tx_a =
MockTransaction::eip1559().with_gas_price(5).with_sender(address_a);
// without our custom logic, B would be prioritized over A due to gas price:
let prioritized_tx_b =
MockTransaction::eip1559().with_gas_price(10).with_sender(address_b);
let regular_tx =
MockTransaction::eip1559().with_gas_price(15).with_sender(address_regular);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_a)), 0);
pool.add_transaction(Arc::new(f.validated(prioritized_tx_b)), 0);
pool.add_transaction(Arc::new(f.validated(regular_tx)), 0);
}
// Add transactions to the priority pool
{
let prioritized_tx =
MockTransaction::eip1559().with_gas_price(0).with_sender(address_in_priority_pool);
let valid_prioritized_tx = f.validated(prioritized_tx);
priority_pool.add_transaction(Arc::new(valid_prioritized_tx), 0);
}
let mut block = PayloadTransactionsChain::new(
PayloadTransactionsFixed::single(
MockTransaction::eip1559().with_sender(address_top_of_block).into(),
),
Some(100),
PayloadTransactionsChain::new(
BestPayloadTransactions::new(priority_pool.best()),
Some(100),
BestPayloadTransactions::new(BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_a]),
200,
BestTransactionsWithPrioritizedSenders::new(
HashSet::from([address_b]),
200,
pool.best(),
),
)),
None,
),
None,
);
assert_eq!(block.next(()).unwrap().signer(), address_top_of_block);
assert_eq!(block.next(()).unwrap().signer(), address_in_priority_pool);
assert_eq!(block.next(()).unwrap().signer(), address_a);
assert_eq!(block.next(()).unwrap().signer(), address_b);
assert_eq!(block.next(()).unwrap().signer(), address_regular);
}
// TODO: Same nonce test
}

View File

@ -106,7 +106,10 @@ use crate::{
traits::{GetPooledTransactionLimit, NewBlobSidecar, TransactionListenerKind},
validate::ValidTransaction,
};
pub use best::{BestTransactionFilter, BestTransactionsWithPrioritizedSenders};
pub use best::{
BestPayloadTransactions, BestTransactionFilter, BestTransactionsWithPrioritizedSenders,
PayloadTransactionsChain, PayloadTransactionsFixed,
};
pub use blob::{blob_tx_priority, fee_delta};
pub use events::{FullTransactionEvent, TransactionEvent};
pub use listener::{AllTransactionsEvents, TransactionEvents};

View File

@ -1501,6 +1501,24 @@ impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {
}
}
/// Iterator that returns transactions for the block building process in the order they should be
/// included in the block.
///
/// Can include transactions from the pool and other sources (alternative pools,
/// sequencer-originated transactions, etc.).
pub trait PayloadTransactions {
/// Returns the next transaction to include in the block.
fn next(
&mut self,
// In the future, `ctx` can include access to state for block building purposes.
ctx: (),
) -> Option<TransactionSignedEcRecovered>;
/// Exclude descendants of the transaction with given sender and nonce from the iterator,
/// because this transaction won't be included in the block.
fn mark_invalid(&mut self, sender: Address, nonce: u64);
}
#[cfg(test)]
mod tests {
use super::*;