feat(EthApi): Add broadcast stream for incoming raw transactions (#13165)

This commit is contained in:
Ayodeji Akinola
2024-12-11 14:54:51 +01:00
committed by GitHub
parent 1602baef6d
commit 17d38c9152
3 changed files with 57 additions and 33 deletions

View File

@ -17,10 +17,7 @@ use reth_provider::{
BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider, BlockNumReader, BlockReaderIdExt, ProviderBlock, ProviderReceipt, ProviderTx, ReceiptProvider,
TransactionsProvider, TransactionsProvider,
}; };
use reth_rpc_eth_types::{ use reth_rpc_eth_types::{utils::binary_search, EthApiError, SignError, TransactionSource};
utils::{binary_search, recover_raw_transaction},
EthApiError, SignError, TransactionSource,
};
use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context}; use reth_rpc_types_compat::transaction::{from_recovered, from_recovered_with_block_context};
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool}; use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use std::sync::Arc; use std::sync::Arc;
@ -61,6 +58,14 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
#[expect(clippy::type_complexity)] #[expect(clippy::type_complexity)]
fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>>; fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>>;
/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
fn send_raw_transaction(
&self,
tx: Bytes,
) -> impl Future<Output = Result<B256, Self::Error>> + Send;
/// Returns the transaction by hash. /// Returns the transaction by hash.
/// ///
/// Checks the pool and state. /// Checks the pool and state.
@ -333,29 +338,6 @@ pub trait EthTransactions: LoadTransaction<Provider: BlockReaderIdExt> {
} }
} }
/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
fn send_raw_transaction(
&self,
tx: Bytes,
) -> impl Future<Output = Result<B256, Self::Error>> + Send {
async move {
let recovered = recover_raw_transaction(&tx)?;
let pool_transaction =
<Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
// submit the transaction to the pool with a `Local` origin
let hash = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await
.map_err(Self::Error::from_eth_err)?;
Ok(hash)
}
}
/// Signs transaction with a matching signer, if any and submits the transaction to the pool. /// Signs transaction with a matching signer, if any and submits the transaction to the pool.
/// Returns the hash of the signed transaction. /// Returns the hash of the signed transaction.
fn send_transaction( fn send_transaction(

View File

@ -6,7 +6,7 @@ use std::sync::Arc;
use alloy_consensus::BlockHeader; use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumberOrTag; use alloy_eips::BlockNumberOrTag;
use alloy_network::Ethereum; use alloy_network::Ethereum;
use alloy_primitives::U256; use alloy_primitives::{Bytes, U256};
use derive_more::Deref; use derive_more::Deref;
use reth_primitives::NodePrimitives; use reth_primitives::NodePrimitives;
use reth_provider::{ use reth_provider::{
@ -26,10 +26,12 @@ use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool}, pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor, TaskSpawner, TokioTaskExecutor,
}; };
use tokio::sync::Mutex; use tokio::sync::{broadcast, Mutex};
use crate::eth::EthTxBuilder; use crate::eth::EthTxBuilder;
const DEFAULT_BROADCAST_CAPACITY: usize = 2000;
/// `Eth` API implementation. /// `Eth` API implementation.
/// ///
/// This type provides the functionality for handling `eth_` related requests. /// This type provides the functionality for handling `eth_` related requests.
@ -270,6 +272,9 @@ pub struct EthApiInner<Provider: BlockReader, Pool, Network, EvmConfig> {
/// Guard for getproof calls /// Guard for getproof calls
blocking_task_guard: BlockingTaskGuard, blocking_task_guard: BlockingTaskGuard,
/// Transaction broadcast channel
raw_tx_sender: broadcast::Sender<Bytes>,
} }
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig> impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig>
@ -304,6 +309,8 @@ where
.unwrap_or_default(), .unwrap_or_default(),
); );
let (raw_tx_sender, _) = broadcast::channel(DEFAULT_BROADCAST_CAPACITY);
Self { Self {
provider, provider,
pool, pool,
@ -321,6 +328,7 @@ where
fee_history_cache, fee_history_cache,
evm_config, evm_config,
blocking_task_guard: BlockingTaskGuard::new(proof_permits), blocking_task_guard: BlockingTaskGuard::new(proof_permits),
raw_tx_sender,
} }
} }
} }
@ -428,6 +436,18 @@ where
pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard { pub const fn blocking_task_guard(&self) -> &BlockingTaskGuard {
&self.blocking_task_guard &self.blocking_task_guard
} }
/// Returns [`broadcast::Receiver`] of new raw transactions
#[inline]
pub fn subscribe_to_raw_transactions(&self) -> broadcast::Receiver<Bytes> {
self.raw_tx_sender.subscribe()
}
/// Broadcasts raw transaction if there are active subscribers.
#[inline]
pub fn broadcast_raw_transaction(&self, raw_tx: Bytes) {
let _ = self.raw_tx_sender.send(raw_tx);
}
} }
#[cfg(test)] #[cfg(test)]

View File

@ -1,13 +1,14 @@
//! Contains RPC handler implementations specific to transactions //! Contains RPC handler implementations specific to transactions
use crate::EthApi;
use alloy_primitives::{Bytes, B256};
use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider}; use reth_provider::{BlockReader, BlockReaderIdExt, ProviderTx, TransactionsProvider};
use reth_rpc_eth_api::{ use reth_rpc_eth_api::{
helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking}, helpers::{EthSigner, EthTransactions, LoadTransaction, SpawnBlocking},
FullEthApiTypes, RpcNodeCoreExt, FromEthApiError, FullEthApiTypes, RpcNodeCore, RpcNodeCoreExt,
}; };
use reth_transaction_pool::TransactionPool; use reth_rpc_eth_types::utils::recover_raw_transaction;
use reth_transaction_pool::{PoolTransaction, TransactionOrigin, TransactionPool};
use crate::EthApi;
impl<Provider, Pool, Network, EvmConfig> EthTransactions impl<Provider, Pool, Network, EvmConfig> EthTransactions
for EthApi<Provider, Pool, Network, EvmConfig> for EthApi<Provider, Pool, Network, EvmConfig>
@ -19,6 +20,27 @@ where
fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>> { fn signers(&self) -> &parking_lot::RwLock<Vec<Box<dyn EthSigner<ProviderTx<Self::Provider>>>>> {
self.inner.signers() self.inner.signers()
} }
/// Decodes and recovers the transaction and submits it to the pool.
///
/// Returns the hash of the transaction.
async fn send_raw_transaction(&self, tx: Bytes) -> Result<B256, Self::Error> {
let recovered = recover_raw_transaction(&tx)?;
// broadcast raw transaction to subscribers if there is any.
self.broadcast_raw_transaction(tx);
let pool_transaction = <Self::Pool as TransactionPool>::Transaction::from_pooled(recovered);
// submit the transaction to the pool with a `Local` origin
let hash = self
.pool()
.add_transaction(TransactionOrigin::Local, pool_transaction)
.await
.map_err(Self::Error::from_eth_err)?;
Ok(hash)
}
} }
impl<Provider, Pool, Network, EvmConfig> LoadTransaction impl<Provider, Pool, Network, EvmConfig> LoadTransaction