feat(rpc): basic eth filter support (#1320)

This commit is contained in:
Matthias Seitz
2023-02-14 15:55:04 +01:00
committed by GitHub
parent 0aaf51d462
commit 7b1624d592
2 changed files with 103 additions and 34 deletions

View File

@ -1,34 +1,34 @@
use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc, types::SubscriptionId};
use reth_primitives::{rpc::Filter, U256}; use reth_primitives::rpc::Filter;
use reth_rpc_types::{FilterChanges, Index, Log}; use reth_rpc_types::{FilterChanges, Log};
/// Rpc Interface for poll-based ethereum filter API. /// Rpc Interface for poll-based ethereum filter API.
#[cfg_attr(not(feature = "client"), rpc(server))] #[cfg_attr(not(feature = "client"), rpc(server))]
#[cfg_attr(feature = "client", rpc(server, client))] #[cfg_attr(feature = "client", rpc(server))] // TODO(mattsse) make it work with SubscriptionId lifetime
pub trait EthFilterApi { pub trait EthFilterApi {
/// Creates anew filter and returns its id. /// Creates anew filter and returns its id.
#[method(name = "eth_newFilter")] #[method(name = "eth_newFilter")]
fn new_filter(&self, filter: Filter) -> Result<U256>; async fn new_filter(&self, filter: Filter) -> Result<SubscriptionId<'static>>;
/// Creates a new block filter and returns its id. /// Creates a new block filter and returns its id.
#[method(name = "eth_newBlockFilter")] #[method(name = "eth_newBlockFilter")]
fn new_block_filter(&self) -> Result<U256>; async fn new_block_filter(&self) -> Result<SubscriptionId<'static>>;
/// Creates a pending transaction filter and returns its id. /// Creates a pending transaction filter and returns its id.
#[method(name = "eth_newPendingTransactionFilter")] #[method(name = "eth_newPendingTransactionFilter")]
fn new_pending_transaction_filter(&self) -> Result<U256>; async fn new_pending_transaction_filter(&self) -> Result<SubscriptionId<'static>>;
/// Returns all filter changes since last poll. /// Returns all filter changes since last poll.
#[method(name = "eth_getFilterChanges")] #[method(name = "eth_getFilterChanges")]
async fn filter_changes(&self, index: Index) -> Result<FilterChanges>; async fn filter_changes(&self, id: SubscriptionId<'_>) -> Result<FilterChanges>;
/// Returns all logs matching given filter (in a range 'from' - 'to'). /// Returns all logs matching given filter (in a range 'from' - 'to').
#[method(name = "eth_getFilterLogs")] #[method(name = "eth_getFilterLogs")]
async fn filter_logs(&self, index: Index) -> Result<Vec<Log>>; async fn filter_logs(&self, id: SubscriptionId<'_>) -> Result<Vec<Log>>;
/// Uninstalls filter. /// Uninstalls filter.
#[method(name = "eth_uninstallFilter")] #[method(name = "eth_uninstallFilter")]
fn uninstall_filter(&self, index: Index) -> Result<bool>; async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> Result<bool>;
/// Returns logs matching given filter object. /// Returns logs matching given filter object.
#[method(name = "eth_getLogs")] #[method(name = "eth_getLogs")]

View File

@ -1,11 +1,18 @@
use crate::result::{internal_rpc_err, ToRpcResult};
use async_trait::async_trait; use async_trait::async_trait;
use jsonrpsee::core::RpcResult; use jsonrpsee::{
use reth_primitives::{rpc::Filter, U256}; core::RpcResult,
server::{IdProvider, RandomIntegerIdProvider},
types::SubscriptionId,
};
use reth_primitives::rpc::Filter;
use reth_provider::BlockProvider; use reth_provider::BlockProvider;
use reth_rpc_api::EthFilterApiServer; use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{FilterChanges, Index, Log}; use reth_rpc_types::{FilterChanges, Log};
use reth_transaction_pool::TransactionPool; use reth_transaction_pool::TransactionPool;
use std::sync::Arc; use std::{collections::HashMap, sync::Arc, time::Instant};
use tokio::sync::Mutex;
use tracing::trace;
/// `Eth` filter RPC implementation. /// `Eth` filter RPC implementation.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -16,10 +23,20 @@ pub struct EthFilter<Client, Pool> {
impl<Client, Pool> EthFilter<Client, Pool> { impl<Client, Pool> EthFilter<Client, Pool> {
/// Creates a new, shareable instance. /// Creates a new, shareable instance.
pub fn new(client: Arc<Client>, pool: Pool) -> Self { pub fn new(client: Client, pool: Pool) -> Self {
let inner = EthFilterInner { client, pool }; let inner = EthFilterInner {
client,
active_filters: Default::default(),
pool,
id_provider: Arc::new(RandomIntegerIdProvider),
};
Self { inner: Arc::new(inner) } Self { inner: Arc::new(inner) }
} }
/// Returns all currently active filters
pub fn active_filters(&self) -> &ActiveFilters {
&self.inner.active_filters
}
} }
#[async_trait] #[async_trait]
@ -28,28 +45,35 @@ where
Client: BlockProvider + 'static, Client: BlockProvider + 'static,
Pool: TransactionPool + 'static, Pool: TransactionPool + 'static,
{ {
fn new_filter(&self, _filter: Filter) -> RpcResult<U256> { async fn new_filter(&self, filter: Filter) -> RpcResult<SubscriptionId<'static>> {
self.inner.install_filter(FilterKind::Log(filter)).await
}
async fn new_block_filter(&self) -> RpcResult<SubscriptionId<'static>> {
self.inner.install_filter(FilterKind::Block).await
}
async fn new_pending_transaction_filter(&self) -> RpcResult<SubscriptionId<'static>> {
self.inner.install_filter(FilterKind::PendingTransaction).await
}
async fn filter_changes(&self, _id: SubscriptionId<'_>) -> RpcResult<FilterChanges> {
todo!() todo!()
} }
fn new_block_filter(&self) -> RpcResult<U256> { async fn filter_logs(&self, _id: SubscriptionId<'_>) -> RpcResult<Vec<Log>> {
todo!() todo!()
} }
fn new_pending_transaction_filter(&self) -> RpcResult<U256> { async fn uninstall_filter(&self, id: SubscriptionId<'_>) -> RpcResult<bool> {
todo!() let mut filters = self.inner.active_filters.inner.lock().await;
} let id = id.into_owned();
if filters.remove(&id).is_some() {
async fn filter_changes(&self, _index: Index) -> RpcResult<FilterChanges> { trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
todo!() Ok(true)
} } else {
Err(internal_rpc_err(format!("Filter id {id:?} does not exist.")))
async fn filter_logs(&self, _index: Index) -> RpcResult<Vec<Log>> { }
todo!()
}
fn uninstall_filter(&self, _index: Index) -> RpcResult<bool> {
todo!()
} }
async fn logs(&self, _filter: Filter) -> RpcResult<Vec<Log>> { async fn logs(&self, _filter: Filter) -> RpcResult<Vec<Log>> {
@ -63,6 +87,51 @@ struct EthFilterInner<Client, Pool> {
/// The transaction pool. /// The transaction pool.
pool: Pool, pool: Pool,
/// The client that can interact with the chain. /// The client that can interact with the chain.
client: Arc<Client>, client: Client,
// TODO needs spawn access /// All currently installed filters.
active_filters: ActiveFilters,
id_provider: Arc<dyn IdProvider>,
}
impl<Client, Pool> EthFilterInner<Client, Pool>
where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
{
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind) -> RpcResult<SubscriptionId<'static>> {
let last_poll_block_number = self.client.chain_info().to_rpc_result()?.best_number;
let id = self.id_provider.next_id();
let mut filters = self.active_filters.inner.lock().await;
filters.insert(
id.clone(),
ActiveFilter { last_poll_block_number, last_poll_timestamp: Instant::now(), kind },
);
Ok(id)
}
}
/// All active filters
#[derive(Debug, Clone, Default)]
pub struct ActiveFilters {
inner: Arc<Mutex<HashMap<SubscriptionId<'static>, ActiveFilter>>>,
}
/// An installed filter
#[derive(Debug)]
struct ActiveFilter {
/// At which block the filter was polled last.
last_poll_block_number: u64,
/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
kind: FilterKind,
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)] // stored on heap
enum FilterKind {
Log(Filter),
Block,
PendingTransaction,
} }