feat(rpc): impl filter changes (#1373)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Matthias Seitz
2023-02-17 18:02:36 +01:00
committed by GitHub
parent a0410e46b7
commit 26b606fda5
3 changed files with 176 additions and 10 deletions

View File

@ -1,10 +1,16 @@
use crate::result::{internal_rpc_err, ToRpcResult};
use crate::{
eth::error::EthApiError,
result::{internal_rpc_err, rpc_error_with_code, ToRpcResult},
};
use async_trait::async_trait;
use jsonrpsee::{
core::RpcResult,
server::{IdProvider, RandomIntegerIdProvider},
};
use reth_primitives::rpc::Filter;
use reth_primitives::{
rpc::{Filter, FilterBlockOption, FilteredParams},
U256,
};
use reth_provider::BlockProvider;
use reth_rpc_api::EthFilterApiServer;
use reth_rpc_types::{FilterChanges, FilterId, Log};
@ -13,6 +19,9 @@ use std::{collections::HashMap, sync::Arc, time::Instant};
use tokio::sync::Mutex;
use tracing::trace;
/// The default maximum of logs in a single response.
const DEFAULT_MAX_LOGS_IN_RESPONSE: usize = 2_000;
/// `Eth` filter RPC implementation.
#[derive(Debug, Clone)]
pub struct EthFilter<Client, Pool> {
@ -28,6 +37,7 @@ impl<Client, Pool> EthFilter<Client, Pool> {
active_filters: Default::default(),
pool,
id_provider: Arc::new(RandomIntegerIdProvider),
max_logs_in_response: DEFAULT_MAX_LOGS_IN_RESPONSE,
};
Self { inner: Arc::new(inner) }
}
@ -45,7 +55,7 @@ where
Pool: TransactionPool + 'static,
{
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
self.inner.install_filter(FilterKind::Log(filter)).await
self.inner.install_filter(FilterKind::Log(Box::new(filter))).await
}
async fn new_block_filter(&self) -> RpcResult<FilterId> {
@ -56,8 +66,74 @@ where
self.inner.install_filter(FilterKind::PendingTransaction).await
}
async fn filter_changes(&self, _id: FilterId) -> RpcResult<FilterChanges> {
todo!()
async fn filter_changes(&self, id: FilterId) -> RpcResult<FilterChanges> {
let info = self.inner.client.chain_info().to_rpc_result()?;
let best_number = info.best_number;
let (start_block, kind) = {
let mut filters = self.inner.active_filters.inner.lock().await;
let mut filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?;
// update filter
// we fetch all changes from [filter.block..best_block], so we advance the filter's
// block to `best_block +1`
let mut block = best_number + 1;
std::mem::swap(&mut filter.block, &mut block);
filter.last_poll_timestamp = Instant::now();
(block, filter.kind.clone())
};
match kind {
FilterKind::PendingTransaction => {
return Err(internal_rpc_err("method not implemented"))
}
FilterKind::Block => {
let mut block_hashes = Vec::new();
for block_num in start_block..best_number {
let block_hash = self
.inner
.client
.block_hash(U256::from(block_num))
.to_rpc_result()?
.ok_or(EthApiError::UnknownBlockNumber)?;
block_hashes.push(block_hash);
}
Ok(FilterChanges::Hashes(block_hashes))
}
FilterKind::Log(filter) => {
let mut from_block_number = start_block;
let mut to_block_number = best_number;
match filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
// from block is maximum of block from last poll or `from_block` of filter
if let Some(filter_from_block) =
from_block.and_then(|num| info.convert_block_number(num))
{
from_block_number = start_block.max(filter_from_block)
}
// to block is max the best number
if let Some(filter_to_block) =
to_block.and_then(|num| info.convert_block_number(num))
{
to_block_number = filter_to_block;
if to_block_number > best_number {
to_block_number = best_number;
}
}
}
FilterBlockOption::AtBlockHash(_) => {
// blockHash is equivalent to fromBlock = toBlock = the block number with
// hash blockHash
}
}
self.inner
.filter_logs(&filter, from_block_number, to_block_number)
.map(FilterChanges::Logs)
}
}
}
async fn filter_logs(&self, _id: FilterId) -> RpcResult<Vec<Log>> {
@ -88,7 +164,10 @@ struct EthFilterInner<Client, Pool> {
client: Client,
/// All currently installed filters.
active_filters: ActiveFilters,
/// Provides ids to identify filters
id_provider: Arc<dyn IdProvider>,
/// Maximum number of logs that can be returned in a response
max_logs_in_response: usize,
}
impl<Client, Pool> EthFilterInner<Client, Pool>
@ -103,10 +182,59 @@ where
let mut filters = self.active_filters.inner.lock().await;
filters.insert(
id.clone(),
ActiveFilter { last_poll_block_number, last_poll_timestamp: Instant::now(), kind },
ActiveFilter {
block: last_poll_block_number,
last_poll_timestamp: Instant::now(),
kind,
},
);
Ok(id)
}
/// Returns all logs in the given range that match the filter
///
/// Returns an error if:
/// - underlying database error
/// - amount of matches exceeds configured limit
#[allow(dead_code)]
fn filter_logs(
&self,
filter: &Filter,
_from_block: u64,
_to_block: u64,
) -> RpcResult<Vec<Log>> {
let logs = Vec::new();
let topics = if filter.has_topics() {
let params = FilteredParams::new(Some(filter.clone()));
Some(params.flat_topics)
} else {
None
};
let _address_filter = FilteredParams::address_filter(&filter.address);
let _topics_filter = FilteredParams::topics_filter(&topics);
// TODO blocked by <https://github.com/paradigmxyz/reth/issues/1371>
// let block_number = from_block;
//
// while block_number <= to_block {
// let _block = self
// .client
// .block_by_number(BlockNumber::Number(block_number.into()))
// .to_rpc_result()?
// .ok_or(EthApiError::UnknownBlockNumber)?;
//
//
//
// if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) &&
// FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter)
// {
// // TODO filter logs from transactions
// }
// }
Ok(logs)
}
}
/// All active filters
@ -119,7 +247,7 @@ pub struct ActiveFilters {
#[derive(Debug)]
struct ActiveFilter {
/// At which block the filter was polled last.
last_poll_block_number: u64,
block: u64,
/// Last time this filter was polled.
last_poll_timestamp: Instant,
/// What kind of filter it is.
@ -127,9 +255,28 @@ struct ActiveFilter {
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)] // stored on heap
#[allow(clippy::large_enum_variant)]
enum FilterKind {
Log(Filter),
Log(Box<Filter>),
Block,
PendingTransaction,
}
/// Errors that can occur in the handler implementation
#[derive(Debug, Clone, thiserror::Error)]
pub enum FilterError {
#[error("filter not found")]
FilterNotFound(FilterId),
}
// convert the error
impl From<FilterError> for jsonrpsee::core::Error {
fn from(err: FilterError) -> Self {
match err {
FilterError::FilterNotFound(_) => rpc_error_with_code(
jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE,
"filter not found",
),
}
}
}

View File

@ -145,6 +145,11 @@ pub(crate) fn internal_rpc_err_with_data(
rpc_err(jsonrpsee::types::error::INTERNAL_ERROR_CODE, msg, Some(data))
}
/// Constructs an internal JSON-RPC error with code and message
pub(crate) fn rpc_error_with_code(code: i32, msg: impl Into<String>) -> jsonrpsee::core::Error {
rpc_err(code, msg, None)
}
/// Constructs a JSON-RPC error, consisting of `code`, `message` and optional `data`.
pub(crate) fn rpc_err(code: i32, msg: impl Into<String>, data: Option<&[u8]>) -> RpcError {
RpcError::Call(jsonrpsee::types::error::CallError::Custom(