feat: use network tx for Pool::Pooled (#13159)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-12-05 22:50:43 +04:00
committed by GitHub
parent 4fe5c2a577
commit 8226fa0cac
17 changed files with 148 additions and 138 deletions

View File

@ -25,7 +25,7 @@ use reth_node_builder::{
BuilderContext, Node, NodeAdapter, NodeComponentsBuilder, PayloadBuilderConfig, PayloadTypes, BuilderContext, Node, NodeAdapter, NodeComponentsBuilder, PayloadBuilderConfig, PayloadTypes,
}; };
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::EthPrimitives; use reth_primitives::{EthPrimitives, PooledTransactionsElement};
use reth_provider::{CanonStateSubscriptions, EthStorage}; use reth_provider::{CanonStateSubscriptions, EthStorage};
use reth_rpc::EthApi; use reth_rpc::EthApi;
use reth_tracing::tracing::{debug, info}; use reth_tracing::tracing::{debug, info};
@ -309,8 +309,12 @@ pub struct EthereumNetworkBuilder {
impl<Node, Pool> NetworkBuilder<Node, Pool> for EthereumNetworkBuilder impl<Node, Pool> NetworkBuilder<Node, Pool> for EthereumNetworkBuilder
where where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>>, Node: FullNodeTypes<Types: NodeTypes<ChainSpec = ChainSpec, Primitives = EthPrimitives>>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>> Pool: TransactionPool<
+ Unpin Transaction: PoolTransaction<
Consensus = TxTy<Node::Types>,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static, + 'static,
{ {
async fn build_network( async fn build_network(

View File

@ -20,7 +20,7 @@ use reth_network_api::{
NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers, NetworkEvent, NetworkEventListenerProvider, NetworkInfo, Peers,
}; };
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_primitives::TransactionSigned; use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_provider::{test_utils::NoopProvider, ChainSpecProvider}; use reth_provider::{test_utils::NoopProvider, ChainSpecProvider};
use reth_storage_api::{BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory}; use reth_storage_api::{BlockReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
use reth_tasks::TokioTaskExecutor; use reth_tasks::TokioTaskExecutor;
@ -206,8 +206,12 @@ where
+ Clone + Clone
+ Unpin + Unpin
+ 'static, + 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>> Pool: TransactionPool<
+ Unpin Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static, + 'static,
{ {
/// Spawns the testnet to a separate task /// Spawns the testnet to a separate task
@ -273,8 +277,12 @@ where
> + HeaderProvider > + HeaderProvider
+ Unpin + Unpin
+ 'static, + 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>> Pool: TransactionPool<
+ Unpin Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static, + 'static,
{ {
type Output = (); type Output = ();
@ -476,8 +484,12 @@ where
> + HeaderProvider > + HeaderProvider
+ Unpin + Unpin
+ 'static, + 'static,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TransactionSigned>> Pool: TransactionPool<
+ Unpin Transaction: PoolTransaction<
Consensus = TransactionSigned,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static, + 'static,
{ {
type Output = (); type Output = ();

View File

@ -49,9 +49,7 @@ use reth_network_p2p::{
}; };
use reth_network_peers::PeerId; use reth_network_peers::PeerId;
use reth_network_types::ReputationChangeKind; use reth_network_types::ReputationChangeKind;
use reth_primitives::{ use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, TransactionSigned};
transaction::SignedTransactionIntoRecoveredExt, RecoveredTx, TransactionSigned,
};
use reth_primitives_traits::{SignedTransaction, TxType}; use reth_primitives_traits::{SignedTransaction, TxType};
use reth_tokio_util::EventStream; use reth_tokio_util::EventStream;
use reth_transaction_pool::{ use reth_transaction_pool::{
@ -703,10 +701,8 @@ where
BroadcastedTransaction: SignedTransaction, BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction, PooledTransaction: SignedTransaction,
>, >,
Pool::Transaction: PoolTransaction< Pool::Transaction:
Consensus = N::BroadcastedTransaction, PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{ {
/// Invoked when transactions in the local mempool are considered __pending__. /// Invoked when transactions in the local mempool are considered __pending__.
/// ///
@ -991,13 +987,12 @@ where
let _ = response.send(Ok(PooledTransactions::default())); let _ = response.send(Ok(PooledTransactions::default()));
return return
} }
let transactions = self.pool.get_pooled_transactions_as::<N::PooledTransaction>( let transactions = self.pool.get_pooled_transaction_elements(
request.0, request.0,
GetPooledTransactionLimit::ResponseSizeSoftLimit( GetPooledTransactionLimit::ResponseSizeSoftLimit(
self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response, self.transaction_fetcher.info.soft_limit_byte_size_pooled_transactions_response,
), ),
); );
trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer"); trace!(target: "net::tx::propagation", sent_txs=?transactions.iter().map(|tx| tx.tx_hash()), "Sending requested transactions to peer");
// we sent a response at which point we assume that the peer is aware of the // we sent a response at which point we assume that the peer is aware of the
@ -1247,7 +1242,7 @@ where
} else { } else {
// this is a new transaction that should be imported into the pool // this is a new transaction that should be imported into the pool
let pool_transaction = Pool::Transaction::from_pooled(tx.into()); let pool_transaction = Pool::Transaction::from_pooled(tx);
new_txs.push(pool_transaction); new_txs.push(pool_transaction);
entry.insert(HashSet::from([peer_id])); entry.insert(HashSet::from([peer_id]));
@ -1338,10 +1333,8 @@ where
BroadcastedTransaction: SignedTransaction, BroadcastedTransaction: SignedTransaction,
PooledTransaction: SignedTransaction, PooledTransaction: SignedTransaction,
>, >,
Pool::Transaction: PoolTransaction< Pool::Transaction:
Consensus = N::BroadcastedTransaction, PoolTransaction<Consensus = N::BroadcastedTransaction, Pooled = N::PooledTransaction>,
Pooled: Into<N::PooledTransaction> + From<RecoveredTx<N::PooledTransaction>>,
>,
{ {
type Output = (); type Output = ();

View File

@ -651,7 +651,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
pub fn start_network<Pool>(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle pub fn start_network<Pool>(&self, builder: NetworkBuilder<(), ()>, pool: Pool) -> NetworkHandle
where where
Pool: TransactionPool< Pool: TransactionPool<
Transaction: PoolTransaction<Consensus = reth_primitives::TransactionSigned>, Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
>,
> + Unpin > + Unpin
+ 'static, + 'static,
Node::Provider: BlockReader< Node::Provider: BlockReader<
@ -677,7 +680,10 @@ impl<Node: FullNodeTypes> BuilderContext<Node> {
) -> NetworkHandle ) -> NetworkHandle
where where
Pool: TransactionPool< Pool: TransactionPool<
Transaction: PoolTransaction<Consensus = reth_primitives::TransactionSigned>, Transaction: PoolTransaction<
Consensus = reth_primitives::TransactionSigned,
Pooled = reth_primitives::PooledTransactionsElement,
>,
> + Unpin > + Unpin
+ 'static, + 'static,
Node::Provider: BlockReader< Node::Provider: BlockReader<

View File

@ -35,7 +35,7 @@ use reth_optimism_rpc::{
OpEthApi, SequencerClient, OpEthApi, SequencerClient,
}; };
use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService};
use reth_primitives::{BlockBody, TransactionSigned}; use reth_primitives::{BlockBody, PooledTransactionsElement, TransactionSigned};
use reth_provider::{ use reth_provider::{
providers::ChainStorage, BlockBodyReader, BlockBodyWriter, CanonStateSubscriptions, providers::ChainStorage, BlockBodyReader, BlockBodyWriter, CanonStateSubscriptions,
ChainSpecProvider, DBProvider, EthStorage, ProviderResult, ReadBodyInput, ChainSpecProvider, DBProvider, EthStorage, ProviderResult, ReadBodyInput,
@ -633,8 +633,12 @@ impl OpNetworkBuilder {
impl<Node, Pool> NetworkBuilder<Node, Pool> for OpNetworkBuilder impl<Node, Pool> NetworkBuilder<Node, Pool> for OpNetworkBuilder
where where
Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>, Node: FullNodeTypes<Types: NodeTypes<ChainSpec = OpChainSpec, Primitives = OpPrimitives>>,
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = TxTy<Node::Types>>> Pool: TransactionPool<
+ Unpin Transaction: PoolTransaction<
Consensus = TxTy<Node::Types>,
Pooled = PooledTransactionsElement,
>,
> + Unpin
+ 'static, + 'static,
{ {
async fn build_network( async fn build_network(

View File

@ -31,8 +31,7 @@ where
/// Returns the hash of the transaction. /// Returns the hash of the transaction.
async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> { async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
let recovered = recover_raw_transaction(tx.clone())?; let recovered = recover_raw_transaction(tx.clone())?;
let pool_transaction = let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered.into());
// On optimism, transactions are forwarded directly to the sequencer to be included in // On optimism, transactions are forwarded directly to the sequencer to be included in
// blocks that it builds. // blocks that it builds.

View File

@ -344,7 +344,7 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
async move { async move {
let recovered = recover_raw_transaction(tx)?; let recovered = recover_raw_transaction(tx)?;
let pool_transaction = let pool_transaction =
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered.into()); <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
// submit the transaction to the pool with a `Local` origin // submit the transaction to the pool with a `Local` origin
let hash = self let hash = self

View File

@ -1,21 +1,21 @@
//! Commonly used code snippets //! Commonly used code snippets
use alloy_eips::eip2718::Decodable2718;
use alloy_primitives::Bytes; use alloy_primitives::Bytes;
use reth_primitives::{PooledTransactionsElement, PooledTransactionsElementEcRecovered}; use reth_primitives::{transaction::SignedTransactionIntoRecoveredExt, RecoveredTx};
use reth_primitives_traits::SignedTransaction;
use std::future::Future; use std::future::Future;
use super::{EthApiError, EthResult}; use super::{EthApiError, EthResult};
/// Recovers a [`PooledTransactionsElementEcRecovered`] from an enveloped encoded byte stream. /// Recovers a [`SignedTransaction`] from an enveloped encoded byte stream.
/// ///
/// See [`Decodable2718::decode_2718`] /// See [`alloy_eips::eip2718::Decodable2718::decode_2718`]
pub fn recover_raw_transaction(data: Bytes) -> EthResult<PooledTransactionsElementEcRecovered> { pub fn recover_raw_transaction<T: SignedTransaction>(data: Bytes) -> EthResult<RecoveredTx<T>> {
if data.is_empty() { if data.is_empty() {
return Err(EthApiError::EmptyRawTransactionData) return Err(EthApiError::EmptyRawTransactionData)
} }
let transaction = PooledTransactionsElement::decode_2718(&mut data.as_ref()) let transaction = T::decode_2718(&mut data.as_ref())
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction)?; .map_err(|_| EthApiError::FailedToDecodeSignedTransaction)?;
transaction.try_into_ecrecovered().or(Err(EthApiError::InvalidTransactionSignature)) transaction.try_into_ecrecovered().or(Err(EthApiError::InvalidTransactionSignature))

View File

@ -79,7 +79,7 @@ where
let transactions = txs let transactions = txs
.into_iter() .into_iter()
.map(recover_raw_transaction) .map(recover_raw_transaction::<PooledTransactionsElement>)
.collect::<Result<Vec<_>, _>>()? .collect::<Result<Vec<_>, _>>()?
.into_iter() .into_iter()
.map(|tx| tx.to_components()) .map(|tx| tx.to_components())

View File

@ -10,7 +10,7 @@ use alloy_rpc_types_mev::{
use jsonrpsee::core::RpcResult; use jsonrpsee::core::RpcResult;
use reth_chainspec::EthChainSpec; use reth_chainspec::EthChainSpec;
use reth_evm::{ConfigureEvm, ConfigureEvmEnv}; use reth_evm::{ConfigureEvm, ConfigureEvmEnv};
use reth_primitives::TransactionSigned; use reth_primitives::{PooledTransactionsElement, TransactionSigned};
use reth_provider::{ChainSpecProvider, HeaderProvider}; use reth_provider::{ChainSpecProvider, HeaderProvider};
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::MevSimApiServer; use reth_rpc_api::MevSimApiServer;
@ -171,7 +171,8 @@ where
match &body[idx] { match &body[idx] {
BundleItem::Tx { tx, can_revert } => { BundleItem::Tx { tx, can_revert } => {
let recovered_tx = let recovered_tx =
recover_raw_transaction(tx.clone()).map_err(EthApiError::from)?; recover_raw_transaction::<PooledTransactionsElement>(tx.clone())
.map_err(EthApiError::from)?;
let (tx, signer) = recovered_tx.to_components(); let (tx, signer) = recovered_tx.to_components();
let tx = tx.into_transaction(); let tx = tx.into_transaction();

View File

@ -19,6 +19,7 @@ use reth_consensus_common::calc::{
base_block_reward, base_block_reward_pre_merge, block_reward, ommer_reward, base_block_reward, base_block_reward_pre_merge, block_reward, ommer_reward,
}; };
use reth_evm::ConfigureEvmEnv; use reth_evm::ConfigureEvmEnv;
use reth_primitives::PooledTransactionsElement;
use reth_provider::{BlockReader, ChainSpecProvider, EvmEnvProvider, StateProviderFactory}; use reth_provider::{BlockReader, ChainSpecProvider, EvmEnvProvider, StateProviderFactory};
use reth_revm::database::StateProviderDatabase; use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::TraceApiServer; use reth_rpc_api::TraceApiServer;
@ -115,7 +116,8 @@ where
trace_types: HashSet<TraceType>, trace_types: HashSet<TraceType>,
block_id: Option<BlockId>, block_id: Option<BlockId>,
) -> Result<TraceResults, Eth::Error> { ) -> Result<TraceResults, Eth::Error> {
let tx = recover_raw_transaction(tx)?.into_ecrecovered_transaction(); let tx = recover_raw_transaction::<PooledTransactionsElement>(tx)?
.into_ecrecovered_transaction();
let (cfg, block, at) = self.eth_api().evm_env_at(block_id.unwrap_or_default()).await?; let (cfg, block, at) = self.eth_api().evm_env_at(block_id.unwrap_or_default()).await?;

View File

@ -156,6 +156,7 @@ use alloy_primitives::{Address, TxHash, B256, U256};
use aquamarine as _; use aquamarine as _;
use reth_eth_wire_types::HandleMempoolData; use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount; use reth_execution_types::ChangedAccount;
use reth_primitives::RecoveredTx;
use reth_storage_api::StateProviderFactory; use reth_storage_api::StateProviderFactory;
use std::{collections::HashSet, sync::Arc}; use std::{collections::HashSet, sync::Arc};
use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Receiver;
@ -419,21 +420,11 @@ where
self.pool.get_pooled_transaction_elements(tx_hashes, limit) self.pool.get_pooled_transaction_elements(tx_hashes, limit)
} }
fn get_pooled_transactions_as<P>(
&self,
tx_hashes: Vec<TxHash>,
limit: GetPooledTransactionLimit,
) -> Vec<P>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<P>,
{
self.pool.get_pooled_transactions_as(tx_hashes, limit)
}
fn get_pooled_transaction_element( fn get_pooled_transaction_element(
&self, &self,
tx_hash: TxHash, tx_hash: TxHash,
) -> Option<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> { ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
{
self.pool.get_pooled_transaction_element(tx_hash) self.pool.get_pooled_transaction_element(tx_hash)
} }

View File

@ -4,7 +4,7 @@ use crate::{
blobstore::{BlobStoreCanonTracker, BlobStoreUpdates}, blobstore::{BlobStoreCanonTracker, BlobStoreUpdates},
error::PoolError, error::PoolError,
metrics::MaintainPoolMetrics, metrics::MaintainPoolMetrics,
traits::{CanonicalStateUpdate, TransactionPool, TransactionPoolExt}, traits::{CanonicalStateUpdate, EthPoolTransaction, TransactionPool, TransactionPoolExt},
BlockInfo, PoolTransaction, PoolUpdateKind, BlockInfo, PoolTransaction, PoolUpdateKind,
}; };
use alloy_consensus::BlockHeader; use alloy_consensus::BlockHeader;
@ -20,8 +20,7 @@ use reth_chainspec::{ChainSpecProvider, EthChainSpec};
use reth_execution_types::ChangedAccount; use reth_execution_types::ChangedAccount;
use reth_fs_util::FsPathError; use reth_fs_util::FsPathError;
use reth_primitives::{ use reth_primitives::{
transaction::SignedTransactionIntoRecoveredExt, PooledTransactionsElementEcRecovered, transaction::SignedTransactionIntoRecoveredExt, SealedHeader, TransactionSigned,
SealedHeader, TransactionSigned,
}; };
use reth_primitives_traits::SignedTransaction; use reth_primitives_traits::SignedTransaction;
use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory}; use reth_storage_api::{errors::provider::ProviderError, BlockReaderIdExt, StateProviderFactory};
@ -335,13 +334,9 @@ pub async fn maintain_transaction_pool<Client, P, St, Tasks>(
.flatten() .flatten()
.map(Arc::unwrap_or_clone) .map(Arc::unwrap_or_clone)
.and_then(|sidecar| { .and_then(|sidecar| {
PooledTransactionsElementEcRecovered::try_from_blob_transaction( <P as TransactionPool>::Transaction::try_from_eip4844(
tx, sidecar, tx, sidecar,
) )
.ok()
})
.map(|tx| {
<P as TransactionPool>::Transaction::from_pooled(tx.into())
}) })
} else { } else {
<P as TransactionPool>::Transaction::try_from_consensus(tx).ok() <P as TransactionPool>::Transaction::try_from_consensus(tx).ok()

View File

@ -22,6 +22,7 @@ use alloy_eips::{
}; };
use alloy_primitives::{Address, TxHash, B256, U256}; use alloy_primitives::{Address, TxHash, B256, U256};
use reth_eth_wire_types::HandleMempoolData; use reth_eth_wire_types::HandleMempoolData;
use reth_primitives::RecoveredTx;
use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use std::{collections::HashSet, marker::PhantomData, sync::Arc};
use tokio::sync::{mpsc, mpsc::Receiver}; use tokio::sync::{mpsc, mpsc::Receiver};
@ -139,21 +140,10 @@ impl TransactionPool for NoopTransactionPool {
vec![] vec![]
} }
fn get_pooled_transactions_as<T>(
&self,
_tx_hashes: Vec<TxHash>,
_limit: GetPooledTransactionLimit,
) -> Vec<T>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<T>,
{
vec![]
}
fn get_pooled_transaction_element( fn get_pooled_transaction_element(
&self, &self,
_tx_hash: TxHash, _tx_hash: TxHash,
) -> Option<<Self::Transaction as PoolTransaction>::Pooled> { ) -> Option<RecoveredTx<<Self::Transaction as PoolTransaction>::Pooled>> {
None None
} }

View File

@ -88,6 +88,7 @@ use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount; use reth_execution_types::ChangedAccount;
use alloy_eips::eip4844::BlobTransactionSidecar; use alloy_eips::eip4844::BlobTransactionSidecar;
use reth_primitives::RecoveredTx;
use std::{ use std::{
collections::{HashMap, HashSet}, collections::{HashMap, HashSet},
fmt, fmt,
@ -312,7 +313,7 @@ where
fn to_pooled_transaction( fn to_pooled_transaction(
&self, &self,
transaction: Arc<ValidPoolTransaction<T::Transaction>>, transaction: Arc<ValidPoolTransaction<T::Transaction>>,
) -> Option<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
where where
<V as TransactionValidator>::Transaction: EthPoolTransaction, <V as TransactionValidator>::Transaction: EthPoolTransaction,
{ {
@ -342,19 +343,6 @@ where
) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> ) -> Vec<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>
where where
<V as TransactionValidator>::Transaction: EthPoolTransaction, <V as TransactionValidator>::Transaction: EthPoolTransaction,
{
self.get_pooled_transactions_as(tx_hashes, limit)
}
/// Returns pooled transactions for the given transaction hashes as the requested type.
pub fn get_pooled_transactions_as<P>(
&self,
tx_hashes: Vec<TxHash>,
limit: GetPooledTransactionLimit,
) -> Vec<P>
where
<V as TransactionValidator>::Transaction: EthPoolTransaction,
<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled: Into<P>,
{ {
let transactions = self.get_all(tx_hashes); let transactions = self.get_all(tx_hashes);
let mut elements = Vec::with_capacity(transactions.len()); let mut elements = Vec::with_capacity(transactions.len());
@ -366,7 +354,7 @@ where
}; };
size += encoded_len; size += encoded_len;
elements.push(pooled.into()); elements.push(pooled.into_signed());
if limit.exceeds(size) { if limit.exceeds(size) {
break break
@ -380,7 +368,7 @@ where
pub fn get_pooled_transaction_element( pub fn get_pooled_transaction_element(
&self, &self,
tx_hash: TxHash, tx_hash: TxHash,
) -> Option<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled> ) -> Option<RecoveredTx<<<V as TransactionValidator>::Transaction as PoolTransaction>::Pooled>>
where where
<V as TransactionValidator>::Transaction: EthPoolTransaction, <V as TransactionValidator>::Transaction: EthPoolTransaction,
{ {

View File

@ -25,8 +25,9 @@ use rand::{
prelude::Distribution, prelude::Distribution,
}; };
use reth_primitives::{ use reth_primitives::{
transaction::TryFromRecoveredTransactionError, PooledTransactionsElementEcRecovered, transaction::{SignedTransactionIntoRecoveredExt, TryFromRecoveredTransactionError},
RecoveredTx, Transaction, TransactionSigned, TxType, PooledTransactionsElement, PooledTransactionsElementEcRecovered, RecoveredTx, Transaction,
TransactionSigned, TxType,
}; };
use reth_primitives_traits::InMemorySize; use reth_primitives_traits::InMemorySize;
use std::{ops::Range, sync::Arc, time::Instant, vec::IntoIter}; use std::{ops::Range, sync::Arc, time::Instant, vec::IntoIter};
@ -594,7 +595,7 @@ impl PoolTransaction for MockTransaction {
type Consensus = TransactionSigned; type Consensus = TransactionSigned;
type Pooled = PooledTransactionsElementEcRecovered; type Pooled = PooledTransactionsElement;
fn try_from_consensus( fn try_from_consensus(
tx: RecoveredTx<Self::Consensus>, tx: RecoveredTx<Self::Consensus>,
@ -606,14 +607,17 @@ impl PoolTransaction for MockTransaction {
self.into() self.into()
} }
fn from_pooled(pooled: Self::Pooled) -> Self { fn from_pooled(pooled: RecoveredTx<Self::Pooled>) -> Self {
pooled.into() pooled.into()
} }
fn try_consensus_into_pooled( fn try_consensus_into_pooled(
tx: RecoveredTx<Self::Consensus>, tx: RecoveredTx<Self::Consensus>,
) -> Result<Self::Pooled, Self::TryFromConsensusError> { ) -> Result<RecoveredTx<Self::Pooled>, Self::TryFromConsensusError> {
Self::Pooled::try_from(tx).map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing) let (tx, signer) = tx.to_components();
Self::Pooled::try_from(tx)
.map(|tx| tx.with_signer(signer))
.map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing)
} }
fn hash(&self) -> &TxHash { fn hash(&self) -> &TxHash {
@ -786,14 +790,27 @@ impl EthPoolTransaction for MockTransaction {
} }
} }
fn try_into_pooled_eip4844(self, sidecar: Arc<BlobTransactionSidecar>) -> Option<Self::Pooled> { fn try_into_pooled_eip4844(
Self::Pooled::try_from_blob_transaction( self,
self.into_consensus(), sidecar: Arc<BlobTransactionSidecar>,
Arc::unwrap_or_clone(sidecar), ) -> Option<RecoveredTx<Self::Pooled>> {
) let (tx, signer) = self.into_consensus().to_components();
Self::Pooled::try_from_blob_transaction(tx, Arc::unwrap_or_clone(sidecar))
.map(|tx| tx.with_signer(signer))
.ok() .ok()
} }
fn try_from_eip4844(
tx: RecoveredTx<Self::Consensus>,
sidecar: BlobTransactionSidecar,
) -> Option<Self> {
let (tx, signer) = tx.to_components();
Self::Pooled::try_from_blob_transaction(tx, sidecar)
.map(|tx| tx.with_signer(signer))
.ok()
.map(Self::from_pooled)
}
fn validate_blob( fn validate_blob(
&self, &self,
_blob: &BlobTransactionSidecar, _blob: &BlobTransactionSidecar,

View File

@ -19,8 +19,10 @@ use futures_util::{ready, Stream};
use reth_eth_wire_types::HandleMempoolData; use reth_eth_wire_types::HandleMempoolData;
use reth_execution_types::ChangedAccount; use reth_execution_types::ChangedAccount;
use reth_primitives::{ use reth_primitives::{
kzg::KzgSettings, transaction::TryFromRecoveredTransactionError, PooledTransactionsElement, kzg::KzgSettings,
PooledTransactionsElementEcRecovered, RecoveredTx, SealedBlock, Transaction, TransactionSigned, transaction::{SignedTransactionIntoRecoveredExt, TryFromRecoveredTransactionError},
PooledTransactionsElement, PooledTransactionsElementEcRecovered, RecoveredTx, SealedBlock,
Transaction, TransactionSigned,
}; };
use reth_primitives_traits::SignedTransaction; use reth_primitives_traits::SignedTransaction;
#[cfg(feature = "serde")] #[cfg(feature = "serde")]
@ -236,15 +238,6 @@ pub trait TransactionPool: Send + Sync + Clone {
limit: GetPooledTransactionLimit, limit: GetPooledTransactionLimit,
) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>; ) -> Vec<<Self::Transaction as PoolTransaction>::Pooled>;
/// Returns the pooled transaction variant for the given transaction hash as the requested type.
fn get_pooled_transactions_as<T>(
&self,
tx_hashes: Vec<TxHash>,
limit: GetPooledTransactionLimit,
) -> Vec<T>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<T>;
/// Returns the pooled transaction variant for the given transaction hash. /// Returns the pooled transaction variant for the given transaction hash.
/// ///
/// This adheres to the expected behavior of /// This adheres to the expected behavior of
@ -260,15 +253,7 @@ pub trait TransactionPool: Send + Sync + Clone {
fn get_pooled_transaction_element( fn get_pooled_transaction_element(
&self, &self,
tx_hash: TxHash, tx_hash: TxHash,
) -> Option<<Self::Transaction as PoolTransaction>::Pooled>; ) -> Option<RecoveredTx<<Self::Transaction as PoolTransaction>::Pooled>>;
/// Returns the pooled transaction variant for the given transaction hash as the requested type.
fn get_pooled_transaction_as<T>(&self, tx_hash: TxHash) -> Option<T>
where
<Self::Transaction as PoolTransaction>::Pooled: Into<T>,
{
self.get_pooled_transaction_element(tx_hash).map(Into::into)
}
/// Returns an iterator that yields transactions that are ready for block production. /// Returns an iterator that yields transactions that are ready for block production.
/// ///
@ -973,6 +958,7 @@ pub trait PoolTransaction:
+ Clone + Clone
+ TryFrom<RecoveredTx<Self::Consensus>, Error = Self::TryFromConsensusError> + TryFrom<RecoveredTx<Self::Consensus>, Error = Self::TryFromConsensusError>
+ Into<RecoveredTx<Self::Consensus>> + Into<RecoveredTx<Self::Consensus>>
+ From<RecoveredTx<Self::Pooled>>
{ {
/// Associated error type for the `try_from_consensus` method. /// Associated error type for the `try_from_consensus` method.
type TryFromConsensusError: fmt::Display; type TryFromConsensusError: fmt::Display;
@ -981,7 +967,7 @@ pub trait PoolTransaction:
type Consensus; type Consensus;
/// Associated type representing the recovered pooled variant of the transaction. /// Associated type representing the recovered pooled variant of the transaction.
type Pooled: Encodable2718 + Into<Self>; type Pooled: SignedTransaction;
/// Define a method to convert from the `Consensus` type to `Self` /// Define a method to convert from the `Consensus` type to `Self`
fn try_from_consensus( fn try_from_consensus(
@ -1003,19 +989,19 @@ pub trait PoolTransaction:
} }
/// Define a method to convert from the `Pooled` type to `Self` /// Define a method to convert from the `Pooled` type to `Self`
fn from_pooled(pooled: Self::Pooled) -> Self { fn from_pooled(pooled: RecoveredTx<Self::Pooled>) -> Self {
pooled.into() pooled.into()
} }
/// Tries to convert the `Consensus` type into the `Pooled` type. /// Tries to convert the `Consensus` type into the `Pooled` type.
fn try_into_pooled(self) -> Result<Self::Pooled, Self::TryFromConsensusError> { fn try_into_pooled(self) -> Result<RecoveredTx<Self::Pooled>, Self::TryFromConsensusError> {
Self::try_consensus_into_pooled(self.into_consensus()) Self::try_consensus_into_pooled(self.into_consensus())
} }
/// Tries to convert the `Consensus` type into the `Pooled` type. /// Tries to convert the `Consensus` type into the `Pooled` type.
fn try_consensus_into_pooled( fn try_consensus_into_pooled(
tx: RecoveredTx<Self::Consensus>, tx: RecoveredTx<Self::Consensus>,
) -> Result<Self::Pooled, Self::TryFromConsensusError>; ) -> Result<RecoveredTx<Self::Pooled>, Self::TryFromConsensusError>;
/// Hash of the transaction. /// Hash of the transaction.
fn hash(&self) -> &TxHash; fn hash(&self) -> &TxHash;
@ -1144,13 +1130,7 @@ pub trait PoolTransaction:
/// ///
/// This extends the [`PoolTransaction`] trait with additional methods that are specific to the /// This extends the [`PoolTransaction`] trait with additional methods that are specific to the
/// Ethereum pool. /// Ethereum pool.
pub trait EthPoolTransaction: pub trait EthPoolTransaction: PoolTransaction {
PoolTransaction<
Pooled: From<PooledTransactionsElementEcRecovered>
+ Into<PooledTransactionsElementEcRecovered>
+ Into<PooledTransactionsElement>,
>
{
/// Extracts the blob sidecar from the transaction. /// Extracts the blob sidecar from the transaction.
fn take_blob(&mut self) -> EthBlobTransactionSidecar; fn take_blob(&mut self) -> EthBlobTransactionSidecar;
@ -1162,7 +1142,18 @@ pub trait EthPoolTransaction:
/// ///
/// This returns an option, but callers should ensure that the transaction is an EIP-4844 /// This returns an option, but callers should ensure that the transaction is an EIP-4844
/// transaction: [`PoolTransaction::is_eip4844`]. /// transaction: [`PoolTransaction::is_eip4844`].
fn try_into_pooled_eip4844(self, sidecar: Arc<BlobTransactionSidecar>) -> Option<Self::Pooled>; fn try_into_pooled_eip4844(
self,
sidecar: Arc<BlobTransactionSidecar>,
) -> Option<RecoveredTx<Self::Pooled>>;
/// Tries to convert the `Consensus` type with a blob sidecar into the `Pooled` type.
///
/// Returns `None` if passed transaction is not a blob transaction.
fn try_from_eip4844(
tx: RecoveredTx<Self::Consensus>,
sidecar: BlobTransactionSidecar,
) -> Option<Self>;
/// Validates the blob sidecar of the transaction with the given settings. /// Validates the blob sidecar of the transaction with the given settings.
fn validate_blob( fn validate_blob(
@ -1258,7 +1249,7 @@ impl PoolTransaction for EthPooledTransaction {
type Consensus = TransactionSigned; type Consensus = TransactionSigned;
type Pooled = PooledTransactionsElementEcRecovered; type Pooled = PooledTransactionsElement;
fn clone_into_consensus(&self) -> RecoveredTx<Self::Consensus> { fn clone_into_consensus(&self) -> RecoveredTx<Self::Consensus> {
self.transaction().clone() self.transaction().clone()
@ -1266,8 +1257,11 @@ impl PoolTransaction for EthPooledTransaction {
fn try_consensus_into_pooled( fn try_consensus_into_pooled(
tx: RecoveredTx<Self::Consensus>, tx: RecoveredTx<Self::Consensus>,
) -> Result<Self::Pooled, Self::TryFromConsensusError> { ) -> Result<RecoveredTx<Self::Pooled>, Self::TryFromConsensusError> {
Self::Pooled::try_from(tx).map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing) let (tx, signer) = tx.to_components();
let pooled = PooledTransactionsElement::try_from_broadcast(tx)
.map_err(|_| TryFromRecoveredTransactionError::BlobSidecarMissing)?;
Ok(RecoveredTx::from_signed_transaction(pooled, signer))
} }
/// Returns hash of the transaction. /// Returns hash of the transaction.
@ -1395,7 +1389,10 @@ impl EthPoolTransaction for EthPooledTransaction {
} }
} }
fn try_into_pooled_eip4844(self, sidecar: Arc<BlobTransactionSidecar>) -> Option<Self::Pooled> { fn try_into_pooled_eip4844(
self,
sidecar: Arc<BlobTransactionSidecar>,
) -> Option<RecoveredTx<Self::Pooled>> {
PooledTransactionsElementEcRecovered::try_from_blob_transaction( PooledTransactionsElementEcRecovered::try_from_blob_transaction(
self.into_consensus(), self.into_consensus(),
Arc::unwrap_or_clone(sidecar), Arc::unwrap_or_clone(sidecar),
@ -1403,6 +1400,17 @@ impl EthPoolTransaction for EthPooledTransaction {
.ok() .ok()
} }
fn try_from_eip4844(
tx: RecoveredTx<Self::Consensus>,
sidecar: BlobTransactionSidecar,
) -> Option<Self> {
let (tx, signer) = tx.to_components();
PooledTransactionsElement::try_from_blob_transaction(tx, sidecar)
.ok()
.map(|tx| tx.with_signer(signer))
.map(Self::from_pooled)
}
fn validate_blob( fn validate_blob(
&self, &self,
sidecar: &BlobTransactionSidecar, sidecar: &BlobTransactionSidecar,