chore(rpc): make TransactionCompat::fill stateful (#11732)

This commit is contained in:
Emilia Hane
2024-10-24 06:46:07 +02:00
committed by GitHub
parent 082f2cd235
commit 7a06298cf7
22 changed files with 167 additions and 75 deletions

1
Cargo.lock generated
View File

@ -8877,6 +8877,7 @@ dependencies = [
"alloy-serde",
"reth-primitives",
"reth-trie-common",
"serde",
"serde_json",
]

View File

@ -56,7 +56,7 @@ serde_json.workspace = true
# misc
thiserror.workspace = true
tracing.workspace = true
derive_more.workspace = true
derive_more = { workspace = true, features = ["constructor", "deref"] }
[dev-dependencies]
reth-optimism-chainspec.workspace = true

View File

@ -39,7 +39,7 @@ use reth_tasks::{
};
use reth_transaction_pool::TransactionPool;
use crate::{OpEthApiError, OpTxBuilder, SequencerClient};
use crate::{OpEthApiError, SequencerClient};
/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
pub type EthApiNodeBackend<N> = EthApiInner<
@ -59,7 +59,7 @@ pub type EthApiNodeBackend<N> = EthApiInner<
///
/// This type implements the [`FullEthApi`](reth_rpc_eth_api::helpers::FullEthApi) by implemented
/// all the `Eth` helper traits and prerequisite traits.
#[derive(Clone, Deref)]
#[derive(Deref)]
pub struct OpEthApi<N: FullNodeComponents> {
/// Gateway to node's core components.
#[deref]
@ -102,7 +102,11 @@ where
{
type Error = OpEthApiError;
type NetworkTypes = Optimism;
type TransactionCompat = OpTxBuilder;
type TransactionCompat = Self;
fn tx_resp_builder(&self) -> &Self::TransactionCompat {
self
}
}
impl<N> EthApiSpec for OpEthApi<N>
@ -249,3 +253,12 @@ impl<N: FullNodeComponents> fmt::Debug for OpEthApi<N> {
f.debug_struct("OpEthApi").finish_non_exhaustive()
}
}
impl<N> Clone for OpEthApi<N>
where
N: FullNodeComponents,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), sequencer_client: self.sequencer_client.clone() }
}
}

View File

@ -6,7 +6,7 @@ use alloy_rpc_types::TransactionInfo;
use op_alloy_rpc_types::Transaction;
use reth_node_api::FullNodeComponents;
use reth_primitives::TransactionSignedEcRecovered;
use reth_provider::{BlockReaderIdExt, TransactionsProvider};
use reth_provider::{BlockReaderIdExt, ReceiptProvider, TransactionsProvider};
use reth_rpc::eth::EthTxBuilder;
use reth_rpc_eth_api::{
helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking},
@ -88,22 +88,34 @@ where
}
}
/// Builds OP transaction response type.
#[derive(Clone, Debug, Copy)]
pub struct OpTxBuilder;
impl TransactionCompat for OpTxBuilder {
impl<N> TransactionCompat for OpEthApi<N>
where
N: FullNodeComponents,
{
type Transaction = Transaction;
fn fill(tx: TransactionSignedEcRecovered, tx_info: TransactionInfo) -> Self::Transaction {
fn fill(
&self,
tx: TransactionSignedEcRecovered,
tx_info: TransactionInfo,
) -> Self::Transaction {
let signed_tx = tx.clone().into_signed();
let hash = tx.hash;
let mut inner = EthTxBuilder::fill(tx, tx_info).inner;
let mut inner = EthTxBuilder.fill(tx, tx_info).inner;
if signed_tx.is_deposit() {
inner.gas_price = Some(signed_tx.max_fee_per_gas())
}
let deposit_receipt_version = self
.inner
.provider()
.receipt_by_hash(hash)
.ok() // todo: change sig to return result
.flatten()
.and_then(|receipt| receipt.deposit_receipt_version);
Transaction {
inner,
source_hash: signed_tx.source_hash(),
@ -111,7 +123,7 @@ impl TransactionCompat for OpTxBuilder {
// only include is_system_tx if true: <https://github.com/ethereum-optimism/op-geth/blob/641e996a2dcf1f81bac9416cb6124f86a69f1de7/internal/ethapi/api.go#L1518-L1518>
is_system_tx: (signed_tx.is_deposit() && signed_tx.is_system_transaction())
.then_some(true),
deposit_receipt_version: None, // todo: how to fill this field?
deposit_receipt_version,
}
}

View File

@ -15,5 +15,5 @@ pub mod eth;
pub mod sequencer;
pub use error::{OpEthApiError, OptimismInvalidTransactionError, SequencerClientError};
pub use eth::{transaction::OpTxBuilder, OpEthApi, OpReceiptBuilder};
pub use eth::{OpEthApi, OpReceiptBuilder};
pub use sequencer::SequencerClient;

View File

@ -22,7 +22,7 @@ pub struct EthHandlers<Provider, Pool, Network, Events, EthApi: EthApiTypes> {
/// Polling based filter handler available on all transports
pub filter: EthFilter<Provider, Pool, EthApi>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<Provider, Pool, Events, Network, EthApi>,
pub pubsub: EthPubSub<Provider, Pool, Events, Network, EthApi::TransactionCompat>,
}
impl<Provider, Pool, Network, Events, EthApi> EthHandlers<Provider, Pool, Network, Events, EthApi>
@ -94,6 +94,7 @@ where
ctx.cache.clone(),
ctx.config.filter_config(),
Box::new(ctx.executor.clone()),
api.tx_resp_builder().clone(),
);
let pubsub = EthPubSub::with_spawner(
@ -102,6 +103,7 @@ where
ctx.events.clone(),
ctx.network.clone(),
Box::new(ctx.executor.clone()),
api.tx_resp_builder().clone(),
);
Self { api, cache: ctx.cache, filter, pubsub }

View File

@ -1199,9 +1199,12 @@ where
.into_rpc()
.into(),
RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
RethRpcModule::Txpool => {
TxPoolApi::<_, EthApi>::new(self.pool.clone()).into_rpc().into()
}
RethRpcModule::Txpool => TxPoolApi::new(
self.pool.clone(),
self.eth.api.tx_resp_builder().clone(),
)
.into_rpc()
.into(),
RethRpcModule::Rpc => RPCApi::new(
namespaces
.iter()

View File

@ -502,7 +502,7 @@ where
trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionByHash");
Ok(EthTransactions::transaction_by_hash(self, hash)
.await?
.map(|tx| tx.into_transaction::<T::TransactionCompat>()))
.map(|tx| tx.into_transaction(self.tx_resp_builder())))
}
/// Handler for: `eth_getRawTransactionByBlockHashAndIndex`

View File

@ -63,11 +63,12 @@ pub trait EthBlocks: LoadBlock {
.map_err(Self::Error::from_eth_err)?;
}
let block = from_block::<Self::TransactionCompat>(
let block = from_block(
(*block).clone().unseal(),
total_difficulty.unwrap_or_default(),
full.into(),
Some(block_hash),
self.tx_resp_builder(),
)
.map_err(Self::Error::from_eth_err)?;
Ok(Some(block))

View File

@ -191,7 +191,7 @@ pub trait EthCall: Call + LoadPendingBlock {
results.push((env.tx.caller, res.result));
}
let block = simulate::build_block::<Self::TransactionCompat>(
let block = simulate::build_block(
results,
transactions,
&block_env,
@ -199,6 +199,7 @@ pub trait EthCall: Call + LoadPendingBlock {
total_difficulty,
return_full_transactions,
&db,
this.tx_resp_builder(),
)?;
parent_hash = block.inner.header.hash;

View File

@ -209,9 +209,10 @@ pub trait EthTransactions: LoadTransaction {
index: Some(index as u64),
};
return Ok(Some(from_recovered_with_block_context::<Self::TransactionCompat>(
return Ok(Some(from_recovered_with_block_context(
tx.clone().with_signer(*signer),
tx_info,
self.tx_resp_builder(),
)))
}
}
@ -237,7 +238,7 @@ pub trait EthTransactions: LoadTransaction {
LoadState::pool(self).get_transaction_by_sender_and_nonce(sender, nonce)
{
let transaction = tx.transaction.clone().into_consensus();
return Ok(Some(from_recovered::<Self::TransactionCompat>(transaction.into())));
return Ok(Some(from_recovered(transaction.into(), self.tx_resp_builder())));
}
}
@ -288,9 +289,10 @@ pub trait EthTransactions: LoadTransaction {
base_fee: base_fee_per_gas.map(u128::from),
index: Some(index as u64),
};
from_recovered_with_block_context::<Self::TransactionCompat>(
from_recovered_with_block_context(
tx.clone().with_signer(*signer),
tx_info,
self.tx_resp_builder(),
)
})
})

View File

@ -23,12 +23,19 @@ pub trait EthApiTypes: Send + Sync + Clone {
type NetworkTypes: Network<HeaderResponse = alloy_rpc_types::Header>;
/// Conversion methods for transaction RPC type.
type TransactionCompat: Send + Sync + Clone + fmt::Debug;
/// Returns reference to transaction response builder.
fn tx_resp_builder(&self) -> &Self::TransactionCompat;
}
impl EthApiTypes for () {
type Error = EthApiError;
type NetworkTypes = AnyNetwork;
type TransactionCompat = ();
fn tx_resp_builder(&self) -> &Self::TransactionCompat {
self
}
}
/// Adapter for network specific transaction type.

View File

@ -172,6 +172,7 @@ where
}
/// Handles outputs of the calls execution and builds a [`SimulatedBlock`].
#[expect(clippy::too_many_arguments)]
pub fn build_block<T: TransactionCompat>(
results: Vec<(Address, ExecutionResult)>,
transactions: Vec<TransactionSigned>,
@ -180,6 +181,7 @@ pub fn build_block<T: TransactionCompat>(
total_difficulty: U256,
full_transactions: bool,
db: &CacheDB<StateProviderDatabase<StateProviderTraitObjWrapper<'_>>>,
tx_resp_builder: &T,
) -> Result<SimulatedBlock<Block<T::Transaction>>, EthApiError> {
let mut calls: Vec<SimCallResult> = Vec::with_capacity(results.len());
let mut senders = Vec::with_capacity(results.len());
@ -304,6 +306,6 @@ pub fn build_block<T: TransactionCompat>(
let txs_kind =
if full_transactions { BlockTransactionsKind::Full } else { BlockTransactionsKind::Hashes };
let block = from_block::<T>(block, total_difficulty, txs_kind, None)?;
let block = from_block(block, total_difficulty, txs_kind, None, tx_resp_builder)?;
Ok(SimulatedBlock { inner: block, calls })
}

View File

@ -41,9 +41,9 @@ impl TransactionSource {
}
/// Conversion into network specific transaction type.
pub fn into_transaction<T: TransactionCompat>(self) -> T::Transaction {
pub fn into_transaction<T: TransactionCompat>(self, resp_builder: &T) -> T::Transaction {
match self {
Self::Pool(tx) => from_recovered::<T>(tx),
Self::Pool(tx) => from_recovered(tx, resp_builder),
Self::Block { transaction, index, block_hash, block_number, base_fee } => {
let tx_info = TransactionInfo {
hash: Some(transaction.hash()),
@ -53,7 +53,7 @@ impl TransactionSource {
base_fee: base_fee.map(u128::from),
};
from_recovered_with_block_context::<T>(transaction, tx_info)
from_recovered_with_block_context(transaction, tx_info, resp_builder)
}
}
}

View File

@ -26,5 +26,8 @@ alloy-serde.workspace = true
alloy-rpc-types-engine.workspace = true
alloy-consensus.workspace = true
# io
serde.workspace = true
[dev-dependencies]
serde_json.workspace = true

View File

@ -20,12 +20,15 @@ pub fn from_block<T: TransactionCompat>(
total_difficulty: U256,
kind: BlockTransactionsKind,
block_hash: Option<B256>,
tx_resp_builder: &T,
) -> Result<Block<T::Transaction>, BlockError> {
match kind {
BlockTransactionsKind::Hashes => {
Ok(from_block_with_tx_hashes::<T::Transaction>(block, total_difficulty, block_hash))
}
BlockTransactionsKind::Full => from_block_full::<T>(block, total_difficulty, block_hash),
BlockTransactionsKind::Full => {
from_block_full::<T>(block, total_difficulty, block_hash, tx_resp_builder)
}
}
}
@ -60,6 +63,7 @@ pub fn from_block_full<T: TransactionCompat>(
mut block: BlockWithSenders,
total_difficulty: U256,
block_hash: Option<B256>,
tx_resp_builder: &T,
) -> Result<Block<T::Transaction>, BlockError> {
let block_hash = block_hash.unwrap_or_else(|| block.block.header.hash_slow());
let block_number = block.block.number;
@ -83,7 +87,7 @@ pub fn from_block_full<T: TransactionCompat>(
index: Some(idx as u64),
};
from_recovered_with_block_context::<T>(signed_tx_ec_recovered, tx_info)
from_recovered_with_block_context::<T>(signed_tx_ec_recovered, tx_info, tx_resp_builder)
})
.collect::<Vec<_>>();

View File

@ -2,6 +2,7 @@
mod signature;
pub use signature::*;
use std::fmt;
use alloy_consensus::Transaction as _;
@ -11,6 +12,7 @@ use alloy_rpc_types::{
};
use alloy_serde::WithOtherFields;
use reth_primitives::{TransactionSigned, TransactionSignedEcRecovered, TxType};
use serde::{Deserialize, Serialize};
/// Create a new rpc transaction result for a mined transaction, using the given block hash,
/// number, and tx index fields to populate the corresponding fields in the rpc result.
@ -20,21 +22,33 @@ use reth_primitives::{TransactionSigned, TransactionSignedEcRecovered, TxType};
pub fn from_recovered_with_block_context<T: TransactionCompat>(
tx: TransactionSignedEcRecovered,
tx_info: TransactionInfo,
resp_builder: &T,
) -> T::Transaction {
T::fill(tx, tx_info)
resp_builder.fill(tx, tx_info)
}
/// Create a new rpc transaction result for a _pending_ signed transaction, setting block
/// environment related fields to `None`.
pub fn from_recovered<T: TransactionCompat>(tx: TransactionSignedEcRecovered) -> T::Transaction {
T::fill(tx, TransactionInfo::default())
pub fn from_recovered<T: TransactionCompat>(
tx: TransactionSignedEcRecovered,
resp_builder: &T,
) -> T::Transaction {
resp_builder.fill(tx, TransactionInfo::default())
}
/// Builds RPC transaction w.r.t. network.
pub trait TransactionCompat: Send + Sync + Unpin + Clone + fmt::Debug {
/// RPC transaction response type.
type Transaction: Send + Clone + Default + fmt::Debug;
type Transaction: Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ Unpin
+ Clone
+ Default
+ fmt::Debug;
///
/// Formats gas price and max fee per gas for RPC transaction response w.r.t. network specific
/// transaction type.
fn gas_price(signed_tx: &TransactionSigned, base_fee: Option<u64>) -> GasPrice {
@ -63,7 +77,7 @@ pub trait TransactionCompat: Send + Sync + Unpin + Clone + fmt::Debug {
/// Create a new rpc transaction result for a _pending_ signed transaction, setting block
/// environment related fields to `None`.
fn fill(tx: TransactionSignedEcRecovered, tx_inf: TransactionInfo) -> Self::Transaction;
fn fill(&self, tx: TransactionSignedEcRecovered, tx_inf: TransactionInfo) -> Self::Transaction;
/// Truncates the input of a transaction to only the first 4 bytes.
// todo: remove in favour of using constructor on `TransactionResponse` or similar
@ -80,7 +94,11 @@ impl TransactionCompat for () {
// `alloy_network::AnyNetwork`
type Transaction = WithOtherFields<Transaction>;
fn fill(_tx: TransactionSignedEcRecovered, _tx_info: TransactionInfo) -> Self::Transaction {
fn fill(
&self,
_tx: TransactionSignedEcRecovered,
_tx_info: TransactionInfo,
) -> Self::Transaction {
WithOtherFields::default()
}

View File

@ -36,12 +36,15 @@ use crate::eth::EthTxBuilder;
#[derive(Deref)]
pub struct EthApi<Provider, Pool, Network, EvmConfig> {
/// All nested fields bundled together.
#[deref]
pub(super) inner: Arc<EthApiInner<Provider, Pool, Network, EvmConfig>>,
/// Transaction RPC response builder.
pub tx_resp_builder: EthTxBuilder,
}
impl<Provider, Pool, Network, EvmConfig> Clone for EthApi<Provider, Pool, Network, EvmConfig> {
fn clone(&self) -> Self {
Self { inner: self.inner.clone() }
Self { inner: self.inner.clone(), tx_resp_builder: EthTxBuilder }
}
}
@ -81,7 +84,7 @@ where
proof_permits,
);
Self { inner: Arc::new(inner) }
Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
}
}
@ -119,7 +122,7 @@ where
ctx.config.proof_permits,
);
Self { inner: Arc::new(inner) }
Self { inner: Arc::new(inner), tx_resp_builder: EthTxBuilder }
}
}
@ -131,6 +134,10 @@ where
// todo: replace with alloy_network::Ethereum
type NetworkTypes = AnyNetwork;
type TransactionCompat = EthTxBuilder;
fn tx_resp_builder(&self) -> &Self::TransactionCompat {
&self.tx_resp_builder
}
}
impl<Provider, Pool, Network, EvmConfig> std::fmt::Debug

View File

@ -4,7 +4,6 @@ use std::{
collections::HashMap,
fmt,
iter::StepBy,
marker::PhantomData,
ops::RangeInclusive,
sync::Arc,
time::{Duration, Instant},
@ -44,7 +43,7 @@ pub struct EthFilter<Provider, Pool, Eth: EthApiTypes> {
/// All nested fields bundled together
inner: Arc<EthFilterInner<Provider, Pool, RpcTransaction<Eth::NetworkTypes>>>,
/// Assembles response data w.r.t. network.
_tx_resp_builder: PhantomData<Eth>,
tx_resp_builder: Eth::TransactionCompat,
}
impl<Provider, Pool, Eth> Clone for EthFilter<Provider, Pool, Eth>
@ -52,7 +51,7 @@ where
Eth: EthApiTypes,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), _tx_resp_builder: PhantomData }
Self { inner: self.inner.clone(), tx_resp_builder: self.tx_resp_builder.clone() }
}
}
@ -76,6 +75,7 @@ where
eth_cache: EthStateCache,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth::TransactionCompat,
) -> Self {
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
@ -93,7 +93,7 @@ where
max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX),
};
let eth_filter = Self { inner: Arc::new(inner), _tx_resp_builder: PhantomData };
let eth_filter = Self { inner: Arc::new(inner), tx_resp_builder };
let this = eth_filter.clone();
eth_filter.inner.task_spawner.spawn_critical(
@ -278,7 +278,7 @@ where
PendingTransactionFilterKind::Full => {
let stream = self.inner.pool.new_pending_pool_transactions_listener();
let full_txs_receiver =
FullTransactionsReceiver::<_, Eth::TransactionCompat>::new(stream);
FullTransactionsReceiver::new(stream, self.tx_resp_builder.clone());
FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
full_txs_receiver,
)))
@ -603,7 +603,7 @@ impl PendingTransactionsReceiver {
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction, TxCompat> {
txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
_tx_resp_builder: PhantomData<TxCompat>,
tx_resp_builder: TxCompat,
}
impl<T, TxCompat> FullTransactionsReceiver<T, TxCompat>
@ -612,8 +612,8 @@ where
TxCompat: TransactionCompat,
{
/// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
fn new(stream: NewSubpoolTransactionStream<T>) -> Self {
Self { txs_stream: Arc::new(Mutex::new(stream)), _tx_resp_builder: PhantomData }
fn new(stream: NewSubpoolTransactionStream<T>, tx_resp_builder: TxCompat) -> Self {
Self { txs_stream: Arc::new(Mutex::new(stream)), tx_resp_builder }
}
/// Returns all new pending transactions received since the last poll.
@ -625,7 +625,10 @@ where
let mut prepared_stream = self.txs_stream.lock().await;
while let Ok(tx) = prepared_stream.try_recv() {
pending_txs.push(from_recovered::<TxCompat>(tx.transaction.to_recovered_transaction()))
pending_txs.push(from_recovered(
tx.transaction.to_recovered_transaction(),
&self.tx_resp_builder,
))
}
FilterChanges::Transactions(pending_txs)
}

View File

@ -21,7 +21,11 @@ where
{
type Transaction = <AnyNetwork as Network>::TransactionResponse;
fn fill(tx: TransactionSignedEcRecovered, tx_info: TransactionInfo) -> Self::Transaction {
fn fill(
&self,
tx: TransactionSignedEcRecovered,
tx_info: TransactionInfo,
) -> Self::Transaction {
let signer = tx.signer();
let signed_tx = tx.into_signed();

View File

@ -1,6 +1,6 @@
//! `eth_` `PubSub` RPC handler implementation
use std::{marker::PhantomData, sync::Arc};
use std::sync::Arc;
use alloy_primitives::TxHash;
use alloy_rpc_types::{
@ -17,7 +17,7 @@ use jsonrpsee::{
};
use reth_network_api::NetworkInfo;
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_eth_api::{pubsub::EthPubSubApiServer, FullEthApiTypes, RpcTransaction};
use reth_rpc_eth_api::{pubsub::EthPubSubApiServer, TransactionCompat};
use reth_rpc_eth_types::logs_utils;
use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
use reth_rpc_types_compat::transaction::from_recovered;
@ -38,7 +38,7 @@ pub struct EthPubSub<Provider, Pool, Events, Network, Eth> {
inner: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
_tx_resp_builder: PhantomData<Eth>,
tx_resp_builder: Eth,
}
// === impl EthPubSub ===
@ -47,13 +47,20 @@ impl<Provider, Pool, Events, Network, Eth> EthPubSub<Provider, Pool, Events, Net
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [`tokio::task::spawn`]
pub fn new(provider: Provider, pool: Pool, chain_events: Events, network: Network) -> Self {
pub fn new(
provider: Provider,
pool: Pool,
chain_events: Events,
network: Network,
tx_resp_builder: Eth,
) -> Self {
Self::with_spawner(
provider,
pool,
chain_events,
network,
Box::<TokioTaskExecutor>::default(),
tx_resp_builder,
)
}
@ -64,21 +71,22 @@ impl<Provider, Pool, Events, Network, Eth> EthPubSub<Provider, Pool, Events, Net
chain_events: Events,
network: Network,
subscription_task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth,
) -> Self {
let inner = EthPubSubInner { provider, pool, chain_events, network };
Self { inner: Arc::new(inner), subscription_task_spawner, _tx_resp_builder: PhantomData }
Self { inner: Arc::new(inner), subscription_task_spawner, tx_resp_builder }
}
}
#[async_trait::async_trait]
impl<Provider, Pool, Events, Network, Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
impl<Provider, Pool, Events, Network, Eth> EthPubSubApiServer<Eth::Transaction>
for EthPubSub<Provider, Pool, Events, Network, Eth>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
Eth: FullEthApiTypes + 'static,
Eth: TransactionCompat + 'static,
{
/// Handler for `eth_subscribe`
async fn subscribe(
@ -89,8 +97,9 @@ where
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let pubsub = self.inner.clone();
let resp_builder = self.tx_resp_builder.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
let _ = handle_accepted::<_, _, _, _, Eth>(pubsub, sink, kind, params).await;
let _ = handle_accepted(pubsub, sink, kind, params, resp_builder).await;
}));
Ok(())
@ -103,13 +112,14 @@ async fn handle_accepted<Provider, Pool, Events, Network, Eth>(
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
tx_resp_builder: Eth,
) -> Result<(), ErrorObject<'static>>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
Eth: FullEthApiTypes,
Eth: TransactionCompat,
{
match kind {
SubscriptionKind::NewHeads => {
@ -140,10 +150,9 @@ where
Params::Bool(true) => {
// full transaction objects requested
let stream = pubsub.full_pending_transaction_stream().map(|tx| {
EthSubscriptionResult::FullTransaction(Box::new(from_recovered::<
Eth::TransactionCompat,
>(
EthSubscriptionResult::FullTransaction(Box::new(from_recovered(
tx.transaction.to_recovered_transaction(),
&tx_resp_builder,
)))
});
return pipe_from_stream(accepted_sink, stream).await

View File

@ -1,4 +1,4 @@
use std::{collections::BTreeMap, marker::PhantomData};
use std::collections::BTreeMap;
use alloy_consensus::Transaction;
use alloy_primitives::Address;
@ -9,7 +9,6 @@ use async_trait::async_trait;
use jsonrpsee::core::RpcResult as Result;
use reth_primitives::TransactionSignedEcRecovered;
use reth_rpc_api::TxPoolApiServer;
use reth_rpc_eth_api::{FullEthApiTypes, RpcTransaction};
use reth_rpc_types_compat::{transaction::from_recovered, TransactionCompat};
use reth_transaction_pool::{AllPoolTransactions, PoolTransaction, TransactionPool};
use tracing::trace;
@ -21,33 +20,34 @@ use tracing::trace;
pub struct TxPoolApi<Pool, Eth> {
/// An interface to interact with the pool
pool: Pool,
_tx_resp_builder: PhantomData<Eth>,
tx_resp_builder: Eth,
}
impl<Pool, Eth> TxPoolApi<Pool, Eth> {
/// Creates a new instance of `TxpoolApi`.
pub const fn new(pool: Pool) -> Self {
Self { pool, _tx_resp_builder: PhantomData }
pub const fn new(pool: Pool, tx_resp_builder: Eth) -> Self {
Self { pool, tx_resp_builder }
}
}
impl<Pool, Eth> TxPoolApi<Pool, Eth>
where
Pool: TransactionPool + 'static,
Eth: FullEthApiTypes,
Eth: TransactionCompat,
{
fn content(&self) -> TxpoolContent<RpcTransaction<Eth::NetworkTypes>> {
fn content(&self) -> TxpoolContent<Eth::Transaction> {
#[inline]
fn insert<Tx, RpcTxB>(
tx: &Tx,
content: &mut BTreeMap<Address, BTreeMap<String, RpcTxB::Transaction>>,
resp_builder: &RpcTxB,
) where
Tx: PoolTransaction<Consensus: Into<TransactionSignedEcRecovered>>,
RpcTxB: TransactionCompat,
{
content.entry(tx.sender()).or_default().insert(
tx.nonce().to_string(),
from_recovered::<RpcTxB>(tx.clone().into_consensus().into()),
from_recovered(tx.clone().into_consensus().into(), resp_builder),
);
}
@ -55,10 +55,10 @@ where
let mut content = TxpoolContent { pending: BTreeMap::new(), queued: BTreeMap::new() };
for pending in pending {
insert::<_, Eth::TransactionCompat>(&pending.transaction, &mut content.pending);
insert::<_, Eth>(&pending.transaction, &mut content.pending, &self.tx_resp_builder);
}
for queued in queued {
insert::<_, Eth::TransactionCompat>(&queued.transaction, &mut content.queued);
insert::<_, Eth>(&queued.transaction, &mut content.queued, &self.tx_resp_builder);
}
content
@ -66,10 +66,10 @@ where
}
#[async_trait]
impl<Pool, Eth> TxPoolApiServer<RpcTransaction<Eth::NetworkTypes>> for TxPoolApi<Pool, Eth>
impl<Pool, Eth> TxPoolApiServer<Eth::Transaction> for TxPoolApi<Pool, Eth>
where
Pool: TransactionPool + 'static,
Eth: FullEthApiTypes + 'static,
Eth: TransactionCompat + 'static,
{
/// Returns the number of transactions currently pending for inclusion in the next block(s), as
/// well as the ones that are being scheduled for future execution only.
@ -131,7 +131,7 @@ where
async fn txpool_content_from(
&self,
from: Address,
) -> Result<TxpoolContentFrom<RpcTransaction<Eth::NetworkTypes>>> {
) -> Result<TxpoolContentFrom<Eth::Transaction>> {
trace!(target: "rpc::eth", ?from, "Serving txpool_contentFrom");
Ok(self.content().remove_from(&from))
}
@ -141,7 +141,7 @@ where
///
/// See [here](https://geth.ethereum.org/docs/rpc/ns-txpool#txpool_content) for more details
/// Handler for `txpool_content`
async fn txpool_content(&self) -> Result<TxpoolContent<RpcTransaction<Eth::NetworkTypes>>> {
async fn txpool_content(&self) -> Result<TxpoolContent<Eth::Transaction>> {
trace!(target: "rpc::eth", "Serving txpool_content");
Ok(self.content())
}