feat(rpc) : support for eth_newPendingTransactionFilter full rpc function (#5206)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
DoTheBestToGetTheBest
2023-10-30 11:00:30 -07:00
committed by GitHub
parent 64d50643c8
commit ed9b9a7d82
6 changed files with 184 additions and 25 deletions

View File

@ -1,6 +1,5 @@
use jsonrpsee::{core::RpcResult, proc_macros::rpc}; use jsonrpsee::{core::RpcResult, proc_macros::rpc};
use reth_rpc_types::{Filter, FilterChanges, FilterId, Log}; use reth_rpc_types::{Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind};
/// Rpc Interface for poll-based ethereum filter API. /// Rpc Interface for poll-based ethereum filter API.
#[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))] #[cfg_attr(not(feature = "client"), rpc(server, namespace = "eth"))]
#[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))] #[cfg_attr(feature = "client", rpc(server, client, namespace = "eth"))]
@ -15,7 +14,10 @@ pub trait EthFilterApi {
/// Creates a pending transaction filter and returns its id. /// Creates a pending transaction filter and returns its id.
#[method(name = "newPendingTransactionFilter")] #[method(name = "newPendingTransactionFilter")]
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId>; async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
) -> RpcResult<FilterId>;
/// Returns all filter changes since last poll. /// Returns all filter changes since last poll.
#[method(name = "getFilterChanges")] #[method(name = "getFilterChanges")]

View File

@ -18,7 +18,10 @@ use reth_rpc_api::{
Web3ApiClient, Web3ApiClient,
}; };
use reth_rpc_builder::RethRpcModule; use reth_rpc_builder::RethRpcModule;
use reth_rpc_types::{trace::filter::TraceFilter, CallRequest, Filter, Index, TransactionRequest}; use reth_rpc_types::{
trace::filter::TraceFilter, CallRequest, Filter, Index, PendingTransactionFilterKind,
TransactionRequest,
};
use std::collections::HashSet; use std::collections::HashSet;
fn is_unimplemented(err: Error) -> bool { fn is_unimplemented(err: Error) -> bool {
@ -36,7 +39,13 @@ where
C: ClientT + SubscriptionClientT + Sync, C: ClientT + SubscriptionClientT + Sync,
{ {
EthFilterApiClient::new_filter(client, Filter::default()).await.unwrap(); EthFilterApiClient::new_filter(client, Filter::default()).await.unwrap();
EthFilterApiClient::new_pending_transaction_filter(client).await.unwrap(); EthFilterApiClient::new_pending_transaction_filter(client, None).await.unwrap();
EthFilterApiClient::new_pending_transaction_filter(
client,
Some(PendingTransactionFilterKind::Full),
)
.await
.unwrap();
let id = EthFilterApiClient::new_block_filter(client).await.unwrap(); let id = EthFilterApiClient::new_block_filter(client).await.unwrap();
EthFilterApiClient::filter_changes(client, id.clone()).await.unwrap(); EthFilterApiClient::filter_changes(client, id.clone()).await.unwrap();
EthFilterApiClient::logs(client, Filter::default()).await.unwrap(); EthFilterApiClient::logs(client, Filter::default()).await.unwrap();

View File

@ -1,4 +1,4 @@
use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log}; use crate::{eth::log::Log as RpcLog, BlockNumberOrTag, Log, Transaction};
use alloy_primitives::{keccak256, Address, Bloom, BloomInput, B256, U256, U64}; use alloy_primitives::{keccak256, Address, Bloom, BloomInput, B256, U256, U64};
use itertools::{EitherOrBoth::*, Itertools}; use itertools::{EitherOrBoth::*, Itertools};
use serde::{ use serde::{
@ -820,7 +820,6 @@ impl FilteredParams {
true true
} }
} }
/// Response of the `eth_getFilterChanges` RPC. /// Response of the `eth_getFilterChanges` RPC.
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq)]
pub enum FilterChanges { pub enum FilterChanges {
@ -828,6 +827,8 @@ pub enum FilterChanges {
Logs(Vec<RpcLog>), Logs(Vec<RpcLog>),
/// New hashes (block or transactions) /// New hashes (block or transactions)
Hashes(Vec<B256>), Hashes(Vec<B256>),
/// New transactions.
Transactions(Vec<Transaction>),
/// Empty result, /// Empty result,
Empty, Empty,
} }
@ -840,6 +841,7 @@ impl Serialize for FilterChanges {
match self { match self {
FilterChanges::Logs(logs) => logs.serialize(s), FilterChanges::Logs(logs) => logs.serialize(s),
FilterChanges::Hashes(hashes) => hashes.serialize(s), FilterChanges::Hashes(hashes) => hashes.serialize(s),
FilterChanges::Transactions(transactions) => transactions.serialize(s),
FilterChanges::Empty => (&[] as &[serde_json::Value]).serialize(s), FilterChanges::Empty => (&[] as &[serde_json::Value]).serialize(s),
} }
} }
@ -908,6 +910,51 @@ impl From<jsonrpsee_types::SubscriptionId<'_>> for FilterId {
} }
} }
} }
/// Specifies the kind of information you wish to receive from the `eth_newPendingTransactionFilter`
/// RPC endpoint.
///
/// When this type is used in a request, it determines whether the client wishes to receive:
/// - Only the transaction hashes (`Hashes` variant), or
/// - Full transaction details (`Full` variant).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PendingTransactionFilterKind {
/// Receive only the hashes of the transactions.
#[default]
Hashes,
/// Receive full details of the transactions.
Full,
}
impl Serialize for PendingTransactionFilterKind {
/// Serializes the `PendingTransactionFilterKind` into a boolean value:
/// - `false` for `Hashes`
/// - `true` for `Full`
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match self {
PendingTransactionFilterKind::Hashes => false.serialize(serializer),
PendingTransactionFilterKind::Full => true.serialize(serializer),
}
}
}
impl<'a> Deserialize<'a> for PendingTransactionFilterKind {
/// Deserializes a boolean value into `PendingTransactionFilterKind`:
/// - `false` becomes `Hashes`
/// - `true` becomes `Full`
fn deserialize<D>(deserializer: D) -> Result<PendingTransactionFilterKind, D::Error>
where
D: Deserializer<'a>,
{
let val = Option::<bool>::deserialize(deserializer)?;
match val {
Some(true) => Ok(PendingTransactionFilterKind::Full),
_ => Ok(PendingTransactionFilterKind::Hashes),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {

View File

@ -4,7 +4,6 @@ use crate::{
eth::{Filter, Transaction}, eth::{Filter, Transaction},
Log, RichHeader, Log, RichHeader,
}; };
use alloy_primitives::B256; use alloy_primitives::B256;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};

View File

@ -7,16 +7,20 @@ use crate::{
result::{rpc_error_with_code, ToRpcResult}, result::{rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider, EthSubscriptionIdProvider,
}; };
use alloy_primitives::B256; use core::fmt;
use async_trait::async_trait; use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider}; use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_interfaces::RethError; use reth_interfaces::RethError;
use reth_primitives::{BlockHashOrNumber, Receipt, SealedBlock, TxHash}; use reth_primitives::{BlockHashOrNumber, IntoRecoveredTransaction, Receipt, SealedBlock, TxHash};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider}; use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer; use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log}; use reth_rpc_types::{
Filter, FilterBlockOption, FilterChanges, FilterId, FilteredParams, Log,
PendingTransactionFilterKind,
};
use reth_tasks::TaskSpawner; use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool; use reth_transaction_pool::{NewSubpoolTransactionStream, PoolTransaction, TransactionPool};
use std::{ use std::{
collections::HashMap, collections::HashMap,
iter::StepBy, iter::StepBy,
@ -35,7 +39,7 @@ const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500k
/// `Eth` filter RPC implementation. /// `Eth` filter RPC implementation.
pub struct EthFilter<Provider, Pool> { pub struct EthFilter<Provider, Pool> {
/// All nested fields bundled together. /// All nested fields bundled together
inner: Arc<EthFilterInner<Provider, Pool>>, inner: Arc<EthFilterInner<Provider, Pool>>,
} }
@ -120,6 +124,7 @@ impl<Provider, Pool> EthFilter<Provider, Pool>
where where
Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static, Provider: BlockReader + BlockIdReader + EvmEnvProvider + 'static,
Pool: TransactionPool + 'static, Pool: TransactionPool + 'static,
<Pool as TransactionPool>::Transaction: 'static,
{ {
/// Returns all the filter changes for the given id, if any /// Returns all the filter changes for the given id, if any
pub async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges, FilterError> { pub async fn filter_changes(&self, id: FilterId) -> Result<FilterChanges, FilterError> {
@ -148,10 +153,7 @@ where
}; };
match kind { match kind {
FilterKind::PendingTransaction(receiver) => { FilterKind::PendingTransaction(filter) => Ok(filter.drain().await),
let pending_txs = receiver.drain().await;
Ok(FilterChanges::Hashes(pending_txs))
}
FilterKind::Block => { FilterKind::Block => {
// Note: we need to fetch the block hashes from inclusive range // Note: we need to fetch the block hashes from inclusive range
// [start_block..best_block] // [start_block..best_block]
@ -235,13 +237,31 @@ where
} }
/// Handler for `eth_newPendingTransactionFilter` /// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter(&self) -> RpcResult<FilterId> { async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter"); trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
let receiver = self.inner.pool.pending_transactions_listener();
let pending_txs_receiver = PendingTransactionsReceiver::new(receiver); let transaction_kind = match kind.unwrap_or_default() {
PendingTransactionFilterKind::Hashes => {
let receiver = self.inner.pool.pending_transactions_listener();
let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
}
PendingTransactionFilterKind::Full => {
let stream = self.inner.pool.new_pending_pool_transactions_listener();
let full_txs_receiver = FullTransactionsReceiver::new(stream);
FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
full_txs_receiver,
)))
}
};
self.inner.install_filter(FilterKind::PendingTransaction(pending_txs_receiver)).await //let filter = FilterKind::PendingTransaction(transaction_kind);
// Install the filter and propagate any errors
self.inner.install_filter(transaction_kind).await
} }
/// Handler for `eth_getFilterChanges` /// Handler for `eth_getFilterChanges`
@ -490,14 +510,81 @@ impl PendingTransactionsReceiver {
} }
/// Returns all new pending transactions received since the last poll. /// Returns all new pending transactions received since the last poll.
async fn drain(&self) -> Vec<B256> { async fn drain(&self) -> FilterChanges {
let mut pending_txs = Vec::new(); let mut pending_txs = Vec::new();
let mut prepared_stream = self.txs_receiver.lock().await; let mut prepared_stream = self.txs_receiver.lock().await;
while let Ok(tx_hash) = prepared_stream.try_recv() { while let Ok(tx_hash) = prepared_stream.try_recv() {
pending_txs.push(tx_hash); pending_txs.push(tx_hash);
} }
pending_txs
// Convert the vector of hashes into FilterChanges::Hashes
FilterChanges::Hashes(pending_txs)
}
}
/// A structure to manage and provide access to a stream of full transaction details.
#[derive(Debug, Clone)]
struct FullTransactionsReceiver<T: PoolTransaction> {
txs_stream: Arc<Mutex<NewSubpoolTransactionStream<T>>>,
}
impl<T> FullTransactionsReceiver<T>
where
T: PoolTransaction + 'static,
{
/// Creates a new `FullTransactionsReceiver` encapsulating the provided transaction stream.
fn new(stream: NewSubpoolTransactionStream<T>) -> Self {
FullTransactionsReceiver { txs_stream: Arc::new(Mutex::new(stream)) }
}
/// Returns all new pending transactions received since the last poll.
async fn drain(&self) -> FilterChanges {
let mut pending_txs = Vec::new();
let mut prepared_stream = self.txs_stream.lock().await;
while let Ok(tx) = prepared_stream.try_recv() {
pending_txs.push(reth_rpc_types_compat::transaction::from_recovered(
tx.transaction.to_recovered_transaction(),
))
}
FilterChanges::Transactions(pending_txs)
}
}
/// Helper trait for [FullTransactionsReceiver] to erase the `Transaction` type.
#[async_trait]
trait FullTransactionsFilter: fmt::Debug + Send + Sync + Unpin + 'static {
async fn drain(&self) -> FilterChanges;
}
#[async_trait]
impl<T> FullTransactionsFilter for FullTransactionsReceiver<T>
where
T: PoolTransaction + 'static,
{
async fn drain(&self) -> FilterChanges {
FullTransactionsReceiver::drain(self).await
}
}
/// Represents the kind of pending transaction data that can be retrieved.
///
/// This enum differentiates between two kinds of pending transaction data:
/// - Just the transaction hashes.
/// - Full transaction details.
#[derive(Debug, Clone)]
enum PendingTransactionKind {
Hashes(PendingTransactionsReceiver),
FullTransaction(Arc<dyn FullTransactionsFilter>),
}
impl PendingTransactionKind {
async fn drain(&self) -> FilterChanges {
match self {
PendingTransactionKind::Hashes(receiver) => receiver.drain().await,
PendingTransactionKind::FullTransaction(receiver) => receiver.drain().await,
}
} }
} }
@ -505,9 +592,8 @@ impl PendingTransactionsReceiver {
enum FilterKind { enum FilterKind {
Log(Box<Filter>), Log(Box<Filter>),
Block, Block,
PendingTransaction(PendingTransactionsReceiver), PendingTransaction(PendingTransactionKind),
} }
/// Errors that can occur in the handler implementation /// Errors that can occur in the handler implementation
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum FilterError { pub enum FilterError {

View File

@ -1113,6 +1113,22 @@ impl<Tx: PoolTransaction> NewSubpoolTransactionStream<Tx> {
pub fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self { pub fn new(st: Receiver<NewTransactionEvent<Tx>>, subpool: SubPool) -> Self {
Self { st, subpool } Self { st, subpool }
} }
/// Tries to receive the next value for this stream.
pub fn try_recv(
&mut self,
) -> Result<NewTransactionEvent<Tx>, tokio::sync::mpsc::error::TryRecvError> {
loop {
match self.st.try_recv() {
Ok(event) => {
if event.subpool == self.subpool {
return Ok(event)
}
}
Err(e) => return Err(e),
}
}
}
} }
impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> { impl<Tx: PoolTransaction> Stream for NewSubpoolTransactionStream<Tx> {