From c5cd236e1a664d2e66db18e2c6642faf91ec13ac Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 14 Mar 2023 04:00:58 +0100 Subject: [PATCH] feat(rpc): implement log subscription (#1719) --- crates/rpc/rpc-types/src/eth/pubsub.rs | 2 +- crates/rpc/rpc/src/eth/pubsub.rs | 92 +++++++++++++++++++++++--- 2 files changed, 83 insertions(+), 11 deletions(-) diff --git a/crates/rpc/rpc-types/src/eth/pubsub.rs b/crates/rpc/rpc-types/src/eth/pubsub.rs index 7fbd1e46b..5166e6436 100644 --- a/crates/rpc/rpc-types/src/eth/pubsub.rs +++ b/crates/rpc/rpc-types/src/eth/pubsub.rs @@ -1,7 +1,7 @@ //! Ethereum types for pub-sub use crate::{Log, RichHeader}; -use reth_primitives::{rpc::Filter, H256}; +use reth_primitives::{filter::Filter, H256}; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; /// Subscription result. diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index fa08bf95e..5a2ee798e 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,8 +1,9 @@ //! `eth_` PubSub RPC handler implementation +use futures::StreamExt; use jsonrpsee::{types::SubscriptionResult, SubscriptionSink}; use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider}; -use reth_primitives::{rpc::FilteredParams, TxHash}; +use reth_primitives::{filter::FilteredParams, BlockId, TxHash, H256, U256}; use reth_provider::{BlockProvider, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::{ @@ -10,13 +11,13 @@ use reth_rpc_types::{ Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, SyncStatusMetadata, }, - Header, + Header, Log, }; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::TransactionPool; use tokio_stream::{ wrappers::{ReceiverStream, UnboundedReceiverStream}, - Stream, StreamExt, + Stream, }; /// `Eth` pubsub RPC implementation. @@ -91,12 +92,6 @@ async fn handle_accepted( Events: ChainEventSubscriptions + Clone + 'static, Network: SyncStateProvider + Clone + 'static, { - // if no params are provided, used default filter params - let _params = match params { - Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)), - _ => FilteredParams::default(), - }; - match kind { SubscriptionKind::NewHeads => { let stream = pubsub @@ -105,7 +100,14 @@ async fn handle_accepted( accepted_sink.pipe_from_stream(stream).await; } SubscriptionKind::Logs => { - // TODO subscribe new blocks -> fetch logs via bloom + // if no params are provided, used default filter params + let filter = match params { + Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)), + _ => FilteredParams::default(), + }; + let stream = + pubsub.into_log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log))); + accepted_sink.pipe_from_stream(stream).await; } SubscriptionKind::NewPendingTransactions => { let stream = pubsub @@ -206,4 +208,74 @@ where Header::from_primitive_with_hash(new_block.header.as_ref().clone(), new_block.hash) }) } + + /// Returns a stream that yields all logs that match the given filter. + fn into_log_stream(self, filter: FilteredParams) -> impl Stream { + UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks()) + .filter_map(move |new_block| { + let block_id: BlockId = new_block.hash.into(); + let txs = self.client.transactions_by_block(block_id).ok().flatten(); + let receipts = self.client.receipts_by_block(block_id).ok().flatten(); + match (txs, receipts) { + (Some(txs), Some(receipts)) => { + futures::future::ready(Some((new_block, txs, receipts))) + } + _ => futures::future::ready(None), + } + }) + .flat_map(move |(new_block, transactions, receipts)| { + let block_hash = new_block.hash; + let block_number = new_block.header.number; + let mut all_logs: Vec = Vec::new(); + + // tracks the index of a log in the entire block + let mut log_index: u32 = 0; + for (transaction_idx, (tx, receipt)) in + transactions.into_iter().zip(receipts).enumerate() + { + let logs = receipt.logs; + + // tracks the index of the log in the transaction + let transaction_hash = tx.hash; + + for (transaction_log_idx, log) in logs.into_iter().enumerate() { + if matches_filter(block_hash, block_number, &log, &filter) { + let log = Log { + address: log.address, + topics: log.topics, + data: log.data, + block_hash: Some(block_hash), + block_number: Some(U256::from(block_number)), + transaction_hash: Some(transaction_hash), + transaction_index: Some(U256::from(transaction_idx)), + log_index: Some(U256::from(log_index)), + transaction_log_index: Some(U256::from(transaction_log_idx)), + removed: false, + }; + all_logs.push(log); + } + log_index += 1; + } + } + futures::stream::iter(all_logs) + }) + } +} + +/// Returns true if the log matches the filter and should be included +fn matches_filter( + block_hash: H256, + block_number: u64, + log: &reth_primitives::Log, + params: &FilteredParams, +) -> bool { + if params.filter.is_some() && + (!params.filter_block_range(block_number) || + !params.filter_block_hash(block_hash) || + !params.filter_address(log) || + !params.filter_topics(log)) + { + return false + } + true }