diff --git a/crates/primitives/src/chain/info.rs b/crates/primitives/src/chain/info.rs index 33e04c0f0..c9603eea1 100644 --- a/crates/primitives/src/chain/info.rs +++ b/crates/primitives/src/chain/info.rs @@ -1,4 +1,4 @@ -use crate::{BlockNumber, H256}; +use crate::{rpc::BlockNumber as RpcBlockNumber, BlockNumber, H256}; /// Current status of the blockchain's head. #[derive(Default, Debug, Eq, PartialEq)] @@ -12,3 +12,17 @@ pub struct ChainInfo { /// Safe block pub safe_finalized: Option, } + +impl ChainInfo { + /// Attempts to convert a [BlockNumber](crate::rpc::BlockNumber) enum to a numeric value + pub fn convert_block_number(&self, number: RpcBlockNumber) -> Option { + match number { + RpcBlockNumber::Finalized => self.last_finalized, + RpcBlockNumber::Safe => self.safe_finalized, + RpcBlockNumber::Earliest => Some(0), + RpcBlockNumber::Number(num) => Some(num.as_u64()), + RpcBlockNumber::Pending => None, + RpcBlockNumber::Latest => Some(self.best_number), + } + } +} diff --git a/crates/rpc/rpc/src/eth/filter.rs b/crates/rpc/rpc/src/eth/filter.rs index 1a10b785c..acc816fdf 100644 --- a/crates/rpc/rpc/src/eth/filter.rs +++ b/crates/rpc/rpc/src/eth/filter.rs @@ -1,10 +1,16 @@ -use crate::result::{internal_rpc_err, ToRpcResult}; +use crate::{ + eth::error::EthApiError, + result::{internal_rpc_err, rpc_error_with_code, ToRpcResult}, +}; use async_trait::async_trait; use jsonrpsee::{ core::RpcResult, server::{IdProvider, RandomIntegerIdProvider}, }; -use reth_primitives::rpc::Filter; +use reth_primitives::{ + rpc::{Filter, FilterBlockOption, FilteredParams}, + U256, +}; use reth_provider::BlockProvider; use reth_rpc_api::EthFilterApiServer; use reth_rpc_types::{FilterChanges, FilterId, Log}; @@ -13,6 +19,9 @@ use std::{collections::HashMap, sync::Arc, time::Instant}; use tokio::sync::Mutex; use tracing::trace; +/// The default maximum of logs in a single response. +const DEFAULT_MAX_LOGS_IN_RESPONSE: usize = 2_000; + /// `Eth` filter RPC implementation. #[derive(Debug, Clone)] pub struct EthFilter { @@ -28,6 +37,7 @@ impl EthFilter { active_filters: Default::default(), pool, id_provider: Arc::new(RandomIntegerIdProvider), + max_logs_in_response: DEFAULT_MAX_LOGS_IN_RESPONSE, }; Self { inner: Arc::new(inner) } } @@ -45,7 +55,7 @@ where Pool: TransactionPool + 'static, { async fn new_filter(&self, filter: Filter) -> RpcResult { - self.inner.install_filter(FilterKind::Log(filter)).await + self.inner.install_filter(FilterKind::Log(Box::new(filter))).await } async fn new_block_filter(&self) -> RpcResult { @@ -56,8 +66,74 @@ where self.inner.install_filter(FilterKind::PendingTransaction).await } - async fn filter_changes(&self, _id: FilterId) -> RpcResult { - todo!() + async fn filter_changes(&self, id: FilterId) -> RpcResult { + let info = self.inner.client.chain_info().to_rpc_result()?; + let best_number = info.best_number; + + let (start_block, kind) = { + let mut filters = self.inner.active_filters.inner.lock().await; + let mut filter = filters.get_mut(&id).ok_or(FilterError::FilterNotFound(id))?; + + // update filter + // we fetch all changes from [filter.block..best_block], so we advance the filter's + // block to `best_block +1` + let mut block = best_number + 1; + std::mem::swap(&mut filter.block, &mut block); + filter.last_poll_timestamp = Instant::now(); + + (block, filter.kind.clone()) + }; + + match kind { + FilterKind::PendingTransaction => { + return Err(internal_rpc_err("method not implemented")) + } + FilterKind::Block => { + let mut block_hashes = Vec::new(); + for block_num in start_block..best_number { + let block_hash = self + .inner + .client + .block_hash(U256::from(block_num)) + .to_rpc_result()? + .ok_or(EthApiError::UnknownBlockNumber)?; + block_hashes.push(block_hash); + } + Ok(FilterChanges::Hashes(block_hashes)) + } + FilterKind::Log(filter) => { + let mut from_block_number = start_block; + let mut to_block_number = best_number; + match filter.block_option { + FilterBlockOption::Range { from_block, to_block } => { + // from block is maximum of block from last poll or `from_block` of filter + if let Some(filter_from_block) = + from_block.and_then(|num| info.convert_block_number(num)) + { + from_block_number = start_block.max(filter_from_block) + } + + // to block is max the best number + if let Some(filter_to_block) = + to_block.and_then(|num| info.convert_block_number(num)) + { + to_block_number = filter_to_block; + if to_block_number > best_number { + to_block_number = best_number; + } + } + } + FilterBlockOption::AtBlockHash(_) => { + // blockHash is equivalent to fromBlock = toBlock = the block number with + // hash blockHash + } + } + + self.inner + .filter_logs(&filter, from_block_number, to_block_number) + .map(FilterChanges::Logs) + } + } } async fn filter_logs(&self, _id: FilterId) -> RpcResult> { @@ -88,7 +164,10 @@ struct EthFilterInner { client: Client, /// All currently installed filters. active_filters: ActiveFilters, + /// Provides ids to identify filters id_provider: Arc, + /// Maximum number of logs that can be returned in a response + max_logs_in_response: usize, } impl EthFilterInner @@ -103,10 +182,59 @@ where let mut filters = self.active_filters.inner.lock().await; filters.insert( id.clone(), - ActiveFilter { last_poll_block_number, last_poll_timestamp: Instant::now(), kind }, + ActiveFilter { + block: last_poll_block_number, + last_poll_timestamp: Instant::now(), + kind, + }, ); Ok(id) } + + /// Returns all logs in the given range that match the filter + /// + /// Returns an error if: + /// - underlying database error + /// - amount of matches exceeds configured limit + #[allow(dead_code)] + fn filter_logs( + &self, + filter: &Filter, + _from_block: u64, + _to_block: u64, + ) -> RpcResult> { + let logs = Vec::new(); + let topics = if filter.has_topics() { + let params = FilteredParams::new(Some(filter.clone())); + Some(params.flat_topics) + } else { + None + }; + + let _address_filter = FilteredParams::address_filter(&filter.address); + let _topics_filter = FilteredParams::topics_filter(&topics); + + // TODO blocked by + // let block_number = from_block; + // + // while block_number <= to_block { + // let _block = self + // .client + // .block_by_number(BlockNumber::Number(block_number.into())) + // .to_rpc_result()? + // .ok_or(EthApiError::UnknownBlockNumber)?; + // + // + // + // if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) && + // FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter) + // { + // // TODO filter logs from transactions + // } + // } + + Ok(logs) + } } /// All active filters @@ -119,7 +247,7 @@ pub struct ActiveFilters { #[derive(Debug)] struct ActiveFilter { /// At which block the filter was polled last. - last_poll_block_number: u64, + block: u64, /// Last time this filter was polled. last_poll_timestamp: Instant, /// What kind of filter it is. @@ -127,9 +255,28 @@ struct ActiveFilter { } #[derive(Clone, Debug)] -#[allow(clippy::large_enum_variant)] // stored on heap +#[allow(clippy::large_enum_variant)] enum FilterKind { - Log(Filter), + Log(Box), Block, PendingTransaction, } + +/// Errors that can occur in the handler implementation +#[derive(Debug, Clone, thiserror::Error)] +pub enum FilterError { + #[error("filter not found")] + FilterNotFound(FilterId), +} + +// convert the error +impl From for jsonrpsee::core::Error { + fn from(err: FilterError) -> Self { + match err { + FilterError::FilterNotFound(_) => rpc_error_with_code( + jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE, + "filter not found", + ), + } + } +} diff --git a/crates/rpc/rpc/src/result.rs b/crates/rpc/rpc/src/result.rs index 52d705080..5a28e3bea 100644 --- a/crates/rpc/rpc/src/result.rs +++ b/crates/rpc/rpc/src/result.rs @@ -145,6 +145,11 @@ pub(crate) fn internal_rpc_err_with_data( rpc_err(jsonrpsee::types::error::INTERNAL_ERROR_CODE, msg, Some(data)) } +/// Constructs an internal JSON-RPC error with code and message +pub(crate) fn rpc_error_with_code(code: i32, msg: impl Into) -> jsonrpsee::core::Error { + rpc_err(code, msg, None) +} + /// Constructs a JSON-RPC error, consisting of `code`, `message` and optional `data`. pub(crate) fn rpc_err(code: i32, msg: impl Into, data: Option<&[u8]>) -> RpcError { RpcError::Call(jsonrpsee::types::error::CallError::Custom(