feat(rpc): implement log subscription (#1719)

This commit is contained in:
Matthias Seitz
2023-03-14 04:00:58 +01:00
committed by GitHub
parent 54aab533c2
commit c5cd236e1a
2 changed files with 83 additions and 11 deletions

View File

@ -1,7 +1,7 @@
//! Ethereum types for pub-sub
use crate::{Log, RichHeader};
use reth_primitives::{rpc::Filter, H256};
use reth_primitives::{filter::Filter, H256};
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
/// Subscription result.

View File

@ -1,8 +1,9 @@
//! `eth_` PubSub RPC handler implementation
use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider};
use reth_primitives::{rpc::FilteredParams, TxHash};
use reth_primitives::{filter::FilteredParams, BlockId, TxHash, H256, U256};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
@ -10,13 +11,13 @@ use reth_rpc_types::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata,
},
Header,
Header, Log,
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use tokio_stream::{
wrappers::{ReceiverStream, UnboundedReceiverStream},
Stream, StreamExt,
Stream,
};
/// `Eth` pubsub RPC implementation.
@ -91,12 +92,6 @@ async fn handle_accepted<Client, Pool, Events, Network>(
Events: ChainEventSubscriptions + Clone + 'static,
Network: SyncStateProvider + Clone + 'static,
{
// if no params are provided, used default filter params
let _params = match params {
Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
_ => FilteredParams::default(),
};
match kind {
SubscriptionKind::NewHeads => {
let stream = pubsub
@ -105,7 +100,14 @@ async fn handle_accepted<Client, Pool, Events, Network>(
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::Logs => {
// TODO subscribe new blocks -> fetch logs via bloom
// if no params are provided, used default filter params
let filter = match params {
Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
_ => FilteredParams::default(),
};
let stream =
pubsub.into_log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log)));
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::NewPendingTransactions => {
let stream = pubsub
@ -206,4 +208,74 @@ where
Header::from_primitive_with_hash(new_block.header.as_ref().clone(), new_block.hash)
})
}
/// Returns a stream that yields all logs that match the given filter.
fn into_log_stream(self, filter: FilteredParams) -> impl Stream<Item = Log> {
UnboundedReceiverStream::new(self.chain_events.subscribe_new_blocks())
.filter_map(move |new_block| {
let block_id: BlockId = new_block.hash.into();
let txs = self.client.transactions_by_block(block_id).ok().flatten();
let receipts = self.client.receipts_by_block(block_id).ok().flatten();
match (txs, receipts) {
(Some(txs), Some(receipts)) => {
futures::future::ready(Some((new_block, txs, receipts)))
}
_ => futures::future::ready(None),
}
})
.flat_map(move |(new_block, transactions, receipts)| {
let block_hash = new_block.hash;
let block_number = new_block.header.number;
let mut all_logs: Vec<Log> = Vec::new();
// tracks the index of a log in the entire block
let mut log_index: u32 = 0;
for (transaction_idx, (tx, receipt)) in
transactions.into_iter().zip(receipts).enumerate()
{
let logs = receipt.logs;
// tracks the index of the log in the transaction
let transaction_hash = tx.hash;
for (transaction_log_idx, log) in logs.into_iter().enumerate() {
if matches_filter(block_hash, block_number, &log, &filter) {
let log = Log {
address: log.address,
topics: log.topics,
data: log.data,
block_hash: Some(block_hash),
block_number: Some(U256::from(block_number)),
transaction_hash: Some(transaction_hash),
transaction_index: Some(U256::from(transaction_idx)),
log_index: Some(U256::from(log_index)),
transaction_log_index: Some(U256::from(transaction_log_idx)),
removed: false,
};
all_logs.push(log);
}
log_index += 1;
}
}
futures::stream::iter(all_logs)
})
}
}
/// Returns true if the log matches the filter and should be included
fn matches_filter(
block_hash: H256,
block_number: u64,
log: &reth_primitives::Log,
params: &FilteredParams,
) -> bool {
if params.filter.is_some() &&
(!params.filter_block_range(block_number) ||
!params.filter_block_hash(block_hash) ||
!params.filter_address(log) ||
!params.filter_topics(log))
{
return false
}
true
}