diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index 5b823ea40..d5c0231ef 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,25 +1,22 @@ //! `eth_` PubSub RPC handler implementation -use crate::eth::logs_utils; +use crate::{eth::logs_utils, result::invalid_params_rpc_err}; use futures::StreamExt; use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink}; use reth_network_api::NetworkInfo; use reth_primitives::{IntoRecoveredTransaction, TxHash}; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; -use reth_rpc_types::FilteredParams; -use std::sync::Arc; - -use crate::result::invalid_params_rpc_err; use reth_rpc_types::{ pubsub::{ Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, SyncStatusMetadata, }, - Header, Log, Transaction, + FilteredParams, Header, Log, Transaction, }; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_transaction_pool::{NewTransactionEvent, TransactionPool}; use serde::Serialize; +use std::sync::Arc; use tokio_stream::{ wrappers::{BroadcastStream, ReceiverStream}, Stream, @@ -292,22 +289,13 @@ where { /// Returns a stream that yields all new RPC blocks. fn new_headers_stream(&self) -> impl Stream { - BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) - .map(|new_block| { - let new_chain = new_block.expect("new block subscription never ends; qed"); - new_chain - .committed() - .map(|c| { - c.blocks() - .iter() - .map(|(_, block)| { - Header::from_primitive_with_hash(block.header.clone()) - }) - .collect::>() - }) - .unwrap_or_default() - }) - .flat_map(futures::stream::iter) + self.chain_events.canonical_state_stream().flat_map(|new_chain| { + let headers = new_chain + .committed() + .map(|chain| chain.headers().collect::>()) + .unwrap_or_default(); + futures::stream::iter(headers.into_iter().map(Header::from_primitive_with_hash)) + }) } /// Returns a stream that yields all logs that match the given filter. diff --git a/crates/storage/provider/src/chain.rs b/crates/storage/provider/src/chain.rs index 13ba859d0..b5d596c01 100644 --- a/crates/storage/provider/src/chain.rs +++ b/crates/storage/provider/src/chain.rs @@ -4,7 +4,7 @@ use crate::PostState; use reth_interfaces::{executor::BlockExecutionError, Error}; use reth_primitives::{ BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders, - TransactionSigned, TxHash, + SealedHeader, TransactionSigned, TxHash, }; use std::{borrow::Cow, collections::BTreeMap, fmt}; @@ -31,6 +31,16 @@ impl Chain { &self.blocks } + /// Consumes the type and only returns the blocks in this chain. + pub fn into_blocks(self) -> BTreeMap { + self.blocks + } + + /// Returns an iterator over all headers in the block with increasing block numbers. + pub fn headers(&self) -> impl Iterator + '_ { + self.blocks.values().map(|block| block.header.clone()) + } + /// Get post state of this chain pub fn state(&self) -> &PostState { &self.state