feat: abstract OpPooledTransaction and OpPool over consensus tx (#14256)

This commit is contained in:
Arsenii Kulikov
2025-02-06 15:26:06 +04:00
committed by GitHub
parent 5662508149
commit 823d065071
9 changed files with 123 additions and 184 deletions

View File

@ -45,8 +45,8 @@ use reth_rpc_eth_types::error::FromEvmError;
use reth_rpc_server_types::RethRpcModule;
use reth_tracing::tracing::{debug, info};
use reth_transaction_pool::{
blobstore::DiskFileBlobStore, CoinbaseTipOrdering, PoolTransaction, TransactionPool,
TransactionValidationTaskExecutor,
blobstore::DiskFileBlobStore, CoinbaseTipOrdering, EthPoolTransaction, PoolTransaction,
TransactionPool, TransactionValidationTaskExecutor,
};
use reth_trie_db::MerklePatriciaTrie;
use revm::primitives::TxEnv;
@ -354,25 +354,29 @@ where
///
/// This contains various settings that can be configured and take precedence over the node's
/// config.
#[derive(Debug, Default, Clone)]
pub struct OpPoolBuilder {
#[derive(Debug, Clone)]
pub struct OpPoolBuilder<T = crate::txpool::OpPooledTransaction> {
/// Enforced overrides that are applied to the pool config.
pub pool_config_overrides: PoolBuilderConfigOverrides,
/// Marker for the pooled transaction type.
_pd: core::marker::PhantomData<T>,
}
impl<Node> PoolBuilder<Node> for OpPoolBuilder
impl<T> Default for OpPoolBuilder<T> {
fn default() -> Self {
Self { pool_config_overrides: Default::default(), _pd: Default::default() }
}
}
impl<Node, T> PoolBuilder<Node> for OpPoolBuilder<T>
where
Node: FullNodeTypes<
Types: NodeTypes<
ChainSpec: OpHardforks,
Primitives: NodePrimitives<SignedTx = OpTransactionSigned>,
>,
>,
Node: FullNodeTypes<Types: NodeTypes<ChainSpec: OpHardforks>>,
T: EthPoolTransaction<Consensus = TxTy<Node::Types>>,
{
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore>;
type Pool = OpTransactionPool<Node::Provider, DiskFileBlobStore, T>;
async fn build_pool(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::Pool> {
let Self { pool_config_overrides } = self;
let Self { pool_config_overrides, .. } = self;
let data_dir = ctx.config().datadir();
let blob_store = DiskFileBlobStore::open(data_dir.blobstore(), Default::default())?;

View File

@ -9,10 +9,7 @@ use reth_node_api::{Block, BlockBody};
use reth_optimism_evm::RethL1BlockInfo;
use reth_optimism_forks::OpHardforks;
use reth_optimism_primitives::OpTransactionSigned;
use reth_primitives::{
transaction::TransactionConversionError, GotExpected, InvalidTransactionError, Recovered,
SealedBlock,
};
use reth_primitives::{GotExpected, InvalidTransactionError, Recovered, SealedBlock};
use reth_primitives_traits::{InMemorySize, SignedTransaction};
use reth_provider::{BlockReaderIdExt, ChainSpecProvider, StateProviderFactory};
use reth_revm::L1BlockInfo;
@ -22,15 +19,18 @@ use reth_transaction_pool::{
TransactionValidationOutcome, TransactionValidationTaskExecutor, TransactionValidator,
};
use revm::primitives::{AccessList, KzgSettings};
use std::sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
use std::{
fmt::Debug,
sync::{
atomic::{AtomicU64, Ordering},
Arc, OnceLock,
},
};
/// Type alias for default optimism transaction pool
pub type OpTransactionPool<Client, S> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, OpPooledTransaction>>,
CoinbaseTipOrdering<OpPooledTransaction>,
pub type OpTransactionPool<Client, S, T = OpPooledTransaction> = Pool<
TransactionValidationTaskExecutor<OpTransactionValidator<Client, T>>,
CoinbaseTipOrdering<T>,
S,
>;
@ -40,19 +40,25 @@ pub type OpTransactionPool<Client, S> = Pool<
/// For payload building this lazily tracks values that are required during payload building:
/// - Estimated compressed size of this transaction
#[derive(Debug, Clone, derive_more::Deref)]
pub struct OpPooledTransaction {
pub struct OpPooledTransaction<
Cons = OpTransactionSigned,
Pooled = op_alloy_consensus::OpPooledTransaction,
> {
#[deref]
inner: EthPooledTransaction<OpTransactionSigned>,
inner: EthPooledTransaction<Cons>,
/// The estimated size of this transaction, lazily computed.
estimated_tx_compressed_size: OnceLock<u64>,
/// The pooled transaction type.
_pd: core::marker::PhantomData<Pooled>,
}
impl OpPooledTransaction {
impl<Cons: SignedTransaction, Pooled> OpPooledTransaction<Cons, Pooled> {
/// Create new instance of [Self].
pub fn new(transaction: Recovered<OpTransactionSigned>, encoded_length: usize) -> Self {
pub fn new(transaction: Recovered<Cons>, encoded_length: usize) -> Self {
Self {
inner: EthPooledTransaction::new(transaction, encoded_length),
estimated_tx_compressed_size: Default::default(),
_pd: core::marker::PhantomData,
}
}
@ -66,48 +72,27 @@ impl OpPooledTransaction {
}
}
impl From<Recovered<op_alloy_consensus::OpPooledTransaction>> for OpPooledTransaction {
fn from(tx: Recovered<op_alloy_consensus::OpPooledTransaction>) -> Self {
let encoded_len = tx.encode_2718_len();
let tx = tx.map_transaction(|tx| tx.into());
Self {
inner: EthPooledTransaction::new(tx, encoded_len),
estimated_tx_compressed_size: Default::default(),
}
}
}
impl TryFrom<Recovered<OpTransactionSigned>> for OpPooledTransaction {
type Error = TransactionConversionError;
fn try_from(value: Recovered<OpTransactionSigned>) -> Result<Self, Self::Error> {
let (tx, signer) = value.into_parts();
let pooled: Recovered<op_alloy_consensus::OpPooledTransaction> =
Recovered::new_unchecked(tx.try_into()?, signer);
Ok(pooled.into())
}
}
impl From<OpPooledTransaction> for Recovered<OpTransactionSigned> {
fn from(value: OpPooledTransaction) -> Self {
value.inner.transaction
}
}
impl PoolTransaction for OpPooledTransaction {
type TryFromConsensusError = <Self as TryFrom<Recovered<Self::Consensus>>>::Error;
type Consensus = OpTransactionSigned;
type Pooled = op_alloy_consensus::OpPooledTransaction;
impl<Cons, Pooled> PoolTransaction for OpPooledTransaction<Cons, Pooled>
where
Cons: SignedTransaction + From<Pooled>,
Pooled: SignedTransaction + TryFrom<Cons, Error: core::error::Error>,
{
type TryFromConsensusError = <Pooled as TryFrom<Cons>>::Error;
type Consensus = Cons;
type Pooled = Pooled;
fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
self.inner.transaction().clone()
}
fn try_consensus_into_pooled(
tx: Recovered<Self::Consensus>,
) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
let (tx, signer) = tx.into_parts();
Ok(Recovered::new_unchecked(tx.try_into()?, signer))
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.inner.transaction
}
fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
let encoded_len = tx.encode_2718_len();
let tx = tx.map_transaction(|tx| tx.into());
Self::new(tx, encoded_len)
}
fn hash(&self) -> &TxHash {
@ -131,19 +116,23 @@ impl PoolTransaction for OpPooledTransaction {
}
}
impl Typed2718 for OpPooledTransaction {
impl<Cons: Typed2718, Pooled> Typed2718 for OpPooledTransaction<Cons, Pooled> {
fn ty(&self) -> u8 {
self.inner.ty()
}
}
impl InMemorySize for OpPooledTransaction {
impl<Cons: InMemorySize, Pooled> InMemorySize for OpPooledTransaction<Cons, Pooled> {
fn size(&self) -> usize {
self.inner.size()
}
}
impl alloy_consensus::Transaction for OpPooledTransaction {
impl<Cons, Pooled> alloy_consensus::Transaction for OpPooledTransaction<Cons, Pooled>
where
Cons: alloy_consensus::Transaction,
Pooled: Debug + Send + Sync + 'static,
{
fn chain_id(&self) -> Option<alloy_primitives::ChainId> {
self.inner.chain_id()
}
@ -213,7 +202,12 @@ impl alloy_consensus::Transaction for OpPooledTransaction {
}
}
impl EthPoolTransaction for OpPooledTransaction {
impl<Cons, Pooled> EthPoolTransaction for OpPooledTransaction<Cons, Pooled>
where
Cons: SignedTransaction + From<Pooled>,
Pooled: SignedTransaction + TryFrom<Cons>,
<Pooled as TryFrom<Cons>>::Error: core::error::Error,
{
fn take_blob(&mut self) -> EthBlobTransactionSidecar {
EthBlobTransactionSidecar::None
}
@ -294,7 +288,7 @@ impl<Client, Tx> OpTransactionValidator<Client, Tx> {
impl<Client, Tx> OpTransactionValidator<Client, Tx>
where
Client: ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt,
Tx: EthPoolTransaction<Consensus = OpTransactionSigned>,
Tx: EthPoolTransaction,
{
/// Create a new [`OpTransactionValidator`].
pub fn new(inner: EthTransactionValidator<Client, Tx>) -> Self {
@ -430,7 +424,7 @@ where
impl<Client, Tx> TransactionValidator for OpTransactionValidator<Client, Tx>
where
Client: ChainSpecProvider<ChainSpec: OpHardforks> + StateProviderFactory + BlockReaderIdExt,
Tx: EthPoolTransaction<Consensus = OpTransactionSigned>,
Tx: EthPoolTransaction,
{
type Transaction = Tx;
@ -511,7 +505,7 @@ mod tests {
let signed_tx = OpTransactionSigned::new_unhashed(deposit_tx, signature);
let signed_recovered = Recovered::new_unchecked(signed_tx, signer);
let len = signed_recovered.encode_2718_len();
let pooled_tx = OpPooledTransaction::new(signed_recovered, len);
let pooled_tx: OpPooledTransaction = OpPooledTransaction::new(signed_recovered, len);
let outcome = validator.validate_one(origin, pooled_tx);
let err = match outcome {

View File

@ -35,6 +35,7 @@ use reth_payload_util::{
use reth_primitives::Recovered;
use reth_provider::providers::BlockchainProvider;
use reth_tasks::TaskManager;
use reth_transaction_pool::PoolTransaction;
use std::sync::Arc;
use tokio::sync::Mutex;
@ -68,13 +69,12 @@ impl OpPayloadTransactions<OpPooledTransaction> for CustomTxPriority {
..Default::default()
};
let signature = sender.sign_transaction_sync(&mut end_of_block_tx).unwrap();
let end_of_block_tx = Recovered::new_unchecked(
let end_of_block_tx = OpPooledTransaction::from_pooled(Recovered::new_unchecked(
op_alloy_consensus::OpPooledTransaction::Eip1559(
end_of_block_tx.into_signed(signature),
),
sender.address(),
)
.into();
));
PayloadTransactionsChain::new(
BestPayloadTransactions::new(pool.best_transactions_with_attributes(attr)),

View File

@ -162,7 +162,7 @@ where
while let Some(tx) = transactions.next() {
let signer = tx.signer();
let tx = {
let mut tx: <Eth::Pool as TransactionPool>::Transaction = tx.into();
let mut tx = <Eth::Pool as TransactionPool>::Transaction::from_pooled(tx);
if let EthBlobTransactionSidecar::Present(sidecar) = tx.take_blob() {
tx.validate_blob(&sidecar, EnvKzgSettings::Default.get()).map_err(

View File

@ -720,7 +720,7 @@ mod tests {
let tx_bytes = hex!("02f87201830655c2808505ef61f08482565f94388c818ca8b9251b393131c08a736a67ccb192978801049e39c4b5b1f580c001a01764ace353514e8abdfb92446de356b260e3c1225b73fc4c8876a6258d12a129a04f02294aa61ca7676061cd99f29275491218b4754b46a0248e5e42bc5091f507");
let tx = PooledTransaction::decode_2718(&mut &tx_bytes[..]).unwrap();
let provider = MockEthProvider::default();
let transaction: EthPooledTransaction = tx.try_into_recovered().unwrap().into();
let transaction = EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap());
let tx_to_cmp = transaction.clone();
let sender = hex!("1f9090aaE28b8a3dCeaDf281B0F12828e676c326").into();
provider.add_account(sender, ExtendedAccount::new(42, U256::MAX));

View File

@ -1,4 +1,4 @@
use crate::EthPooledTransaction;
use crate::{EthPooledTransaction, PoolTransaction};
use alloy_consensus::{SignableTransaction, TxEip1559, TxEip4844, TxLegacy};
use alloy_eips::{eip1559::MIN_PROTOCOL_BASE_FEE, eip2718::Encodable2718, eip2930::AccessList};
use alloy_primitives::{Address, Bytes, TxKind, B256, U256};
@ -101,7 +101,8 @@ impl<R: Rng> TransactionGenerator<R> {
/// Generates and returns a pooled EIP-1559 transaction with a random signer.
pub fn gen_eip1559_pooled(&mut self) -> EthPooledTransaction {
self.gen_eip1559().try_into_recovered().unwrap().try_into().unwrap()
EthPooledTransaction::try_from_consensus(self.gen_eip1559().try_into_recovered().unwrap())
.unwrap()
}
/// Generates and returns a pooled EIP-4844 transaction with a random signer.

View File

@ -29,7 +29,10 @@ use rand::{
prelude::Distribution,
};
use reth_primitives::{
transaction::{SignedTransactionIntoRecoveredExt, TryFromRecoveredTransactionError},
transaction::{
SignedTransactionIntoRecoveredExt, TransactionConversionError,
TryFromRecoveredTransactionError,
},
PooledTransaction, Recovered, Transaction, TransactionSigned, TxType,
};
use reth_primitives_traits::{InMemorySize, SignedTransaction};
@ -667,18 +670,12 @@ impl MockTransaction {
}
impl PoolTransaction for MockTransaction {
type TryFromConsensusError = TryFromRecoveredTransactionError;
type TryFromConsensusError = TransactionConversionError;
type Consensus = TransactionSigned;
type Pooled = PooledTransaction;
fn try_from_consensus(
tx: Recovered<Self::Consensus>,
) -> Result<Self, Self::TryFromConsensusError> {
tx.try_into()
}
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.into()
}
@ -687,15 +684,6 @@ impl PoolTransaction for MockTransaction {
pooled.into()
}
fn try_consensus_into_pooled(
tx: Recovered<Self::Consensus>,
) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
let (tx, signer) = tx.into_parts();
Self::Pooled::try_from(tx)
.map(|tx| tx.with_signer(signer))
.map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing)
}
fn hash(&self) -> &TxHash {
self.get_hash()
}

View File

@ -5,10 +5,7 @@ use crate::{
validate::ValidPoolTransaction,
AllTransactionsEvents,
};
use alloy_consensus::{
constants::{EIP1559_TX_TYPE_ID, EIP4844_TX_TYPE_ID, EIP7702_TX_TYPE_ID},
BlockHeader, Signed, Typed2718,
};
use alloy_consensus::{BlockHeader, Signed, Typed2718};
use alloy_eips::{
eip2718::Encodable2718,
eip2930::AccessList,
@ -21,7 +18,7 @@ use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount;
use reth_primitives::{
kzg::KzgSettings,
transaction::{SignedTransactionIntoRecoveredExt, TryFromRecoveredTransactionError},
transaction::{SignedTransactionIntoRecoveredExt, TransactionConversionError},
PooledTransaction, Recovered, SealedBlock, TransactionSigned,
};
use reth_primitives_traits::{Block, InMemorySize, SignedTransaction};
@ -573,17 +570,20 @@ pub struct AllPoolTransactions<T: PoolTransaction> {
impl<T: PoolTransaction> AllPoolTransactions<T> {
/// Returns an iterator over all pending [`Recovered`] transactions.
pub fn pending_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
self.pending.iter().map(|tx| tx.transaction.clone().into())
self.pending.iter().map(|tx| tx.transaction.clone().into_consensus())
}
/// Returns an iterator over all queued [`Recovered`] transactions.
pub fn queued_recovered(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
self.queued.iter().map(|tx| tx.transaction.clone().into())
self.queued.iter().map(|tx| tx.transaction.clone().into_consensus())
}
/// Returns an iterator over all transactions, both pending and queued.
pub fn all(&self) -> impl Iterator<Item = Recovered<T::Consensus>> + '_ {
self.pending.iter().chain(self.queued.iter()).map(|tx| tx.transaction.clone().into())
self.pending
.iter()
.chain(self.queued.iter())
.map(|tx| tx.transaction.clone().into_consensus())
}
}
@ -964,16 +964,12 @@ impl BestTransactionsAttributes {
/// This distinction is necessary for the EIP-4844 blob transactions, which require an additional
/// sidecar when they are gossiped around the network. It is expected that the `Consensus` format is
/// a subset of the `Pooled` format.
///
/// The assumption is that fallible conversion from `Consensus` to `Pooled` will encapsulate
/// handling of all valid `Consensus` transactions that can't be pooled (e.g Deposit transactions or
/// blob-less EIP-4844 transactions).
pub trait PoolTransaction:
alloy_consensus::Transaction
+ InMemorySize
+ fmt::Debug
+ Send
+ Sync
+ Clone
+ TryFrom<Recovered<Self::Consensus>, Error = Self::TryFromConsensusError>
+ Into<Recovered<Self::Consensus>>
+ From<Recovered<Self::Pooled>>
alloy_consensus::Transaction + InMemorySize + fmt::Debug + Send + Sync + Clone
{
/// Associated error type for the `try_from_consensus` method.
type TryFromConsensusError: fmt::Display;
@ -982,13 +978,17 @@ pub trait PoolTransaction:
type Consensus: SignedTransaction + From<Self::Pooled>;
/// Associated type representing the recovered pooled variant of the transaction.
type Pooled: SignedTransaction;
type Pooled: TryFrom<Self::Consensus, Error = Self::TryFromConsensusError> + SignedTransaction;
/// Define a method to convert from the `Consensus` type to `Self`
///
/// Note: this _must_ fail on any transactions that cannot be pooled (e.g OP Deposit
/// transactions).
fn try_from_consensus(
tx: Recovered<Self::Consensus>,
) -> Result<Self, Self::TryFromConsensusError> {
tx.try_into()
let (tx, signer) = tx.into_parts();
Ok(Self::from_pooled(Recovered::new_unchecked(tx.try_into()?, signer)))
}
/// Clone the transaction into a consensus variant.
@ -999,25 +999,18 @@ pub trait PoolTransaction:
}
/// Define a method to convert from the `Self` type to `Consensus`
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.into()
}
fn into_consensus(self) -> Recovered<Self::Consensus>;
/// Define a method to convert from the `Pooled` type to `Self`
fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self {
pooled.into()
}
fn from_pooled(pooled: Recovered<Self::Pooled>) -> Self;
/// Tries to convert the `Consensus` type into the `Pooled` type.
fn try_into_pooled(self) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
Self::try_consensus_into_pooled(self.into_consensus())
let consensus = self.into_consensus();
let (tx, signer) = consensus.into_parts();
Ok(Recovered::new_unchecked(tx.try_into()?, signer))
}
/// Tries to convert the `Consensus` type into the `Pooled` type.
fn try_consensus_into_pooled(
tx: Recovered<Self::Consensus>,
) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError>;
/// Converts the `Pooled` type into the `Consensus` type.
fn pooled_into_consensus(tx: Self::Pooled) -> Self::Consensus {
tx.into()
@ -1157,9 +1150,22 @@ impl<T: SignedTransaction> EthPooledTransaction<T> {
}
}
/// Conversion from the network transaction type to the pool transaction type.
impl From<Recovered<PooledTransaction>> for EthPooledTransaction {
fn from(tx: Recovered<PooledTransaction>) -> Self {
impl PoolTransaction for EthPooledTransaction {
type TryFromConsensusError = TransactionConversionError;
type Consensus = TransactionSigned;
type Pooled = PooledTransaction;
fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
self.transaction().clone()
}
fn into_consensus(self) -> Recovered<Self::Consensus> {
self.transaction
}
fn from_pooled(tx: Recovered<Self::Pooled>) -> Self {
let encoded_length = tx.encode_2718_len();
let (tx, signer) = tx.into_parts();
match tx {
@ -1181,27 +1187,6 @@ impl From<Recovered<PooledTransaction>> for EthPooledTransaction {
}
}
}
}
impl PoolTransaction for EthPooledTransaction {
type TryFromConsensusError = TryFromRecoveredTransactionError;
type Consensus = TransactionSigned;
type Pooled = PooledTransaction;
fn clone_into_consensus(&self) -> Recovered<Self::Consensus> {
self.transaction().clone()
}
fn try_consensus_into_pooled(
tx: Recovered<Self::Consensus>,
) -> Result<Recovered<Self::Pooled>, Self::TryFromConsensusError> {
let (tx, signer) = tx.into_parts();
let pooled =
tx.try_into().map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing)?;
Ok(Recovered::new_unchecked(pooled, signer))
}
/// Returns hash of the transaction.
fn hash(&self) -> &TxHash {
@ -1359,39 +1344,6 @@ impl EthPoolTransaction for EthPooledTransaction {
}
}
impl TryFrom<Recovered<TransactionSigned>> for EthPooledTransaction {
type Error = TryFromRecoveredTransactionError;
fn try_from(tx: Recovered<TransactionSigned>) -> Result<Self, Self::Error> {
// ensure we can handle the transaction type and its format
match tx.ty() {
0..=EIP1559_TX_TYPE_ID | EIP7702_TX_TYPE_ID => {
// supported
}
EIP4844_TX_TYPE_ID => {
// doesn't have a blob sidecar
return Err(TryFromRecoveredTransactionError::BlobSidecarMissing);
}
unsupported => {
// unsupported transaction type
return Err(TryFromRecoveredTransactionError::UnsupportedTransactionType(
unsupported,
))
}
};
let encoded_length = tx.encode_2718_len();
let transaction = Self::new(tx, encoded_length);
Ok(transaction)
}
}
impl From<EthPooledTransaction> for Recovered<TransactionSigned> {
fn from(tx: EthPooledTransaction) -> Self {
tx.transaction
}
}
/// Represents the blob sidecar of the [`EthPooledTransaction`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EthBlobTransactionSidecar {

View File

@ -928,7 +928,7 @@ mod tests {
let data = hex::decode(raw).unwrap();
let tx = PooledTransaction::decode_2718(&mut data.as_ref()).unwrap();
tx.try_into_recovered().unwrap().into()
EthPooledTransaction::from_pooled(tx.try_into_recovered().unwrap())
}
// <https://github.com/paradigmxyz/reth/issues/5178>