feat(rpc): complete log filter (#1741)

This commit is contained in:
Matthias Seitz
2023-03-14 17:03:36 +01:00
committed by GitHub
parent a0ff24b691
commit 8e5644dac0
4 changed files with 115 additions and 78 deletions

View File

@ -1,5 +1,5 @@
use crate::{
eth::error::EthApiError,
eth::{error::EthApiError, logs_utils},
result::{internal_rpc_err, rpc_error_with_code, ToRpcResult},
EthSubscriptionIdProvider,
};
@ -7,7 +7,7 @@ use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_primitives::{
filter::{Filter, FilterBlockOption, FilteredParams},
Block, U256,
U256,
};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthFilterApiServer;
@ -150,7 +150,7 @@ where
trace!(target: "rpc::eth::filter", ?id, "uninstalled filter");
Ok(true)
} else {
Err(internal_rpc_err(format!("Filter id {id:?} does not exist.")))
Ok(false)
}
}
@ -201,9 +201,8 @@ where
/// Returns an error if:
/// - underlying database error
/// - amount of matches exceeds configured limit
#[allow(dead_code)]
fn filter_logs(&self, filter: &Filter, from_block: u64, to_block: u64) -> RpcResult<Vec<Log>> {
let mut logs = Vec::new();
let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone()));
let topics =
@ -213,38 +212,41 @@ where
let address_filter = FilteredParams::address_filter(&filter.address);
let topics_filter = FilteredParams::topics_filter(&topics);
// loop over the range of new blocks and check logs if the filter matches the log's bloom
// filter
for block_number in from_block..=to_block {
if let Some(block) = self.client.block_by_number(block_number).to_rpc_result()? {
// only if filter matches
if FilteredParams::matches_address(block.header.logs_bloom, &address_filter) &&
FilteredParams::matches_topics(block.header.logs_bloom, &topics_filter)
{
self.append_matching_block_logs(&mut logs, &filter_params, block);
// get receipts for the block
if let Some(receipts) =
self.client.receipts_by_block(block.number.into()).to_rpc_result()?
{
let block_hash = block.hash_slow();
// TODO size check
logs_utils::append_matching_block_logs(
&mut all_logs,
&filter_params,
block_hash,
block_number,
block.body.into_iter().map(|tx| tx.hash).zip(receipts),
);
// size check
if all_logs.len() > self.max_logs_in_response {
return Err(FilterError::QueryExceedsMaxResults(
self.max_logs_in_response,
)
.into())
}
}
}
}
}
Ok(logs)
}
/// Appends all logs emitted in the `block` that match the `filter` to the `logs` vector.
#[allow(clippy::ptr_arg)]
fn append_matching_block_logs(
&self,
_logs: &mut Vec<Log>,
_filter: &FilteredParams,
block: Block,
) {
let _block_log_index: u32 = 0;
let _block_hash = block.hash_slow();
// loop over all transactions in the block
for tx in block.body {
let _transaction_log_index: u32 = 0;
let _transaction_hash = tx.hash;
}
Ok(all_logs)
}
}
@ -266,7 +268,6 @@ struct ActiveFilter {
}
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
enum FilterKind {
Log(Box<Filter>),
Block,
@ -278,6 +279,8 @@ enum FilterKind {
pub enum FilterError {
#[error("filter not found")]
FilterNotFound(FilterId),
#[error("Query exceeds max results {0}")]
QueryExceedsMaxResults(usize),
}
// convert the error
@ -285,9 +288,12 @@ impl From<FilterError> for jsonrpsee::core::Error {
fn from(err: FilterError) -> Self {
match err {
FilterError::FilterNotFound(_) => rpc_error_with_code(
jsonrpsee::types::error::CALL_EXECUTION_FAILED_CODE,
jsonrpsee::types::error::INVALID_PARAMS_CODE,
"filter not found",
),
err @ FilterError::QueryExceedsMaxResults(_) => {
rpc_error_with_code(jsonrpsee::types::error::INVALID_PARAMS_CODE, err.to_string())
}
}
}
}

View File

@ -0,0 +1,72 @@
use reth_primitives::{filter::FilteredParams, Receipt, TxHash, U256};
use reth_rpc_types::Log;
use revm::primitives::B256 as H256;
/// Returns all matching logs of a block's receipts grouped with the hash of their transaction.
pub(crate) fn matching_block_logs<I>(
filter: &FilteredParams,
block_hash: H256,
block_number: u64,
tx_and_receipts: I,
) -> Vec<Log>
where
I: IntoIterator<Item = (TxHash, Receipt)>,
{
let mut all_logs = Vec::new();
append_matching_block_logs(&mut all_logs, filter, block_hash, block_number, tx_and_receipts);
all_logs
}
/// Appends all matching logs of a block's receipts grouped with the hash of their transaction
pub(crate) fn append_matching_block_logs<I>(
all_logs: &mut Vec<Log>,
filter: &FilteredParams,
block_hash: H256,
block_number: u64,
tx_and_receipts: I,
) where
I: IntoIterator<Item = (TxHash, Receipt)>,
{
let block_number_u256 = U256::from(block_number);
// tracks the index of a log in the entire block
let mut log_index: u32 = 0;
for (transaction_idx, (transaction_hash, receipt)) in tx_and_receipts.into_iter().enumerate() {
let logs = receipt.logs;
for (transaction_log_idx, log) in logs.into_iter().enumerate() {
if log_matches_filter(block_hash, block_number, &log, filter) {
let log = Log {
address: log.address,
topics: log.topics,
data: log.data,
block_hash: Some(block_hash),
block_number: Some(block_number_u256),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(transaction_idx)),
log_index: Some(U256::from(log_index)),
transaction_log_index: Some(U256::from(transaction_log_idx)),
removed: false,
};
all_logs.push(log);
}
log_index += 1;
}
}
}
/// Returns true if the log matches the filter and should be included
pub(crate) fn log_matches_filter(
block_hash: H256,
block_number: u64,
log: &reth_primitives::Log,
params: &FilteredParams,
) -> bool {
if params.filter.is_some() &&
(!params.filter_block_range(block_number) ||
!params.filter_block_hash(block_hash) ||
!params.filter_address(log) ||
!params.filter_topics(log))
{
return false
}
true
}

View File

@ -5,6 +5,7 @@ pub mod cache;
pub mod error;
mod filter;
mod id_provider;
mod logs_utils;
mod pubsub;
pub(crate) mod revm_utils;
mod signer;

View File

@ -1,9 +1,10 @@
//! `eth_` PubSub RPC handler implementation
use crate::eth::logs_utils;
use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider};
use reth_primitives::{filter::FilteredParams, BlockId, TxHash, H256, U256};
use reth_primitives::{filter::FilteredParams, BlockId, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
@ -226,56 +227,13 @@ where
.flat_map(move |(new_block, transactions, receipts)| {
let block_hash = new_block.hash;
let block_number = new_block.header.number;
let mut all_logs: Vec<Log> = Vec::new();
// tracks the index of a log in the entire block
let mut log_index: u32 = 0;
for (transaction_idx, (tx, receipt)) in
transactions.into_iter().zip(receipts).enumerate()
{
let logs = receipt.logs;
// tracks the index of the log in the transaction
let transaction_hash = tx.hash;
for (transaction_log_idx, log) in logs.into_iter().enumerate() {
if matches_filter(block_hash, block_number, &log, &filter) {
let log = Log {
address: log.address,
topics: log.topics,
data: log.data,
block_hash: Some(block_hash),
block_number: Some(U256::from(block_number)),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(transaction_idx)),
log_index: Some(U256::from(log_index)),
transaction_log_index: Some(U256::from(transaction_log_idx)),
removed: false,
};
all_logs.push(log);
}
log_index += 1;
}
}
let all_logs = logs_utils::matching_block_logs(
&filter,
block_hash,
block_number,
transactions.into_iter().map(|tx| tx.hash).zip(receipts),
);
futures::stream::iter(all_logs)
})
}
}
/// Returns true if the log matches the filter and should be included
fn matches_filter(
block_hash: H256,
block_number: u64,
log: &reth_primitives::Log,
params: &FilteredParams,
) -> bool {
if params.filter.is_some() &&
(!params.filter_block_range(block_number) ||
!params.filter_block_hash(block_hash) ||
!params.filter_address(log) ||
!params.filter_topics(log))
{
return false
}
true
}