chore: refactor header stream (#3880)

This commit is contained in:
Matthias Seitz
2023-07-24 18:57:45 +02:00
committed by GitHub
parent 1ca7f3ae40
commit b69a18dc47
2 changed files with 21 additions and 23 deletions

View File

@ -1,25 +1,22 @@
//! `eth_` PubSub RPC handler implementation //! `eth_` PubSub RPC handler implementation
use crate::eth::logs_utils; use crate::{eth::logs_utils, result::invalid_params_rpc_err};
use futures::StreamExt; use futures::StreamExt;
use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink}; use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink};
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{IntoRecoveredTransaction, TxHash}; use reth_primitives::{IntoRecoveredTransaction, TxHash};
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider}; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer; use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::FilteredParams;
use std::sync::Arc;
use crate::result::invalid_params_rpc_err;
use reth_rpc_types::{ use reth_rpc_types::{
pubsub::{ pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult, Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata, SyncStatusMetadata,
}, },
Header, Log, Transaction, FilteredParams, Header, Log, Transaction,
}; };
use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::{NewTransactionEvent, TransactionPool}; use reth_transaction_pool::{NewTransactionEvent, TransactionPool};
use serde::Serialize; use serde::Serialize;
use std::sync::Arc;
use tokio_stream::{ use tokio_stream::{
wrappers::{BroadcastStream, ReceiverStream}, wrappers::{BroadcastStream, ReceiverStream},
Stream, Stream,
@ -292,22 +289,13 @@ where
{ {
/// Returns a stream that yields all new RPC blocks. /// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(&self) -> impl Stream<Item = Header> { fn new_headers_stream(&self) -> impl Stream<Item = Header> {
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) self.chain_events.canonical_state_stream().flat_map(|new_chain| {
.map(|new_block| { let headers = new_chain
let new_chain = new_block.expect("new block subscription never ends; qed"); .committed()
new_chain .map(|chain| chain.headers().collect::<Vec<_>>())
.committed() .unwrap_or_default();
.map(|c| { futures::stream::iter(headers.into_iter().map(Header::from_primitive_with_hash))
c.blocks() })
.iter()
.map(|(_, block)| {
Header::from_primitive_with_hash(block.header.clone())
})
.collect::<Vec<_>>()
})
.unwrap_or_default()
})
.flat_map(futures::stream::iter)
} }
/// Returns a stream that yields all logs that match the given filter. /// Returns a stream that yields all logs that match the given filter.

View File

@ -4,7 +4,7 @@ use crate::PostState;
use reth_interfaces::{executor::BlockExecutionError, Error}; use reth_interfaces::{executor::BlockExecutionError, Error};
use reth_primitives::{ use reth_primitives::{
BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders, BlockHash, BlockNumHash, BlockNumber, ForkBlock, Receipt, SealedBlock, SealedBlockWithSenders,
TransactionSigned, TxHash, SealedHeader, TransactionSigned, TxHash,
}; };
use std::{borrow::Cow, collections::BTreeMap, fmt}; use std::{borrow::Cow, collections::BTreeMap, fmt};
@ -31,6 +31,16 @@ impl Chain {
&self.blocks &self.blocks
} }
/// Consumes the type and only returns the blocks in this chain.
pub fn into_blocks(self) -> BTreeMap<BlockNumber, SealedBlockWithSenders> {
self.blocks
}
/// Returns an iterator over all headers in the block with increasing block numbers.
pub fn headers(&self) -> impl Iterator<Item = SealedHeader> + '_ {
self.blocks.values().map(|block| block.header.clone())
}
/// Get post state of this chain /// Get post state of this chain
pub fn state(&self) -> &PostState { pub fn state(&self) -> &PostState {
&self.state &self.state