chore: put EthPubSubInner in Arc (#3463)

This commit is contained in:
Matthias Seitz
2023-06-30 00:07:31 +02:00
committed by GitHub
parent 40f2a51008
commit 793838975b

View File

@ -7,6 +7,7 @@ use reth_primitives::TxHash;
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::FilteredParams;
use std::sync::Arc;
use reth_rpc_types::{
pubsub::{
@ -29,7 +30,7 @@ use tokio_stream::{
#[derive(Clone)]
pub struct EthPubSub<Provider, Pool, Events, Network> {
/// All nested fields bundled together.
inner: EthPubSubInner<Provider, Pool, Events, Network>,
inner: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
}
@ -59,7 +60,7 @@ impl<Provider, Pool, Events, Network> EthPubSub<Provider, Pool, Events, Network>
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { provider, pool, chain_events, network };
Self { inner, subscription_task_spawner }
Self { inner: Arc::new(inner), subscription_task_spawner }
}
}
@ -91,7 +92,7 @@ where
/// The actual handler for and accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Provider, Pool, Events, Network>(
pubsub: EthPubSubInner<Provider, Pool, Events, Network>,
pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
@ -105,7 +106,7 @@ where
match kind {
SubscriptionKind::NewHeads => {
let stream = pubsub
.into_new_headers_stream()
.new_headers_stream()
.map(|block| EthSubscriptionResult::Header(Box::new(block.into())));
pipe_from_stream(accepted_sink, stream).await
}
@ -116,13 +117,12 @@ where
_ => FilteredParams::default(),
};
let stream =
pubsub.into_log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log)));
pubsub.log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log)));
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::NewPendingTransactions => {
let stream = pubsub
.into_pending_transaction_stream()
.map(EthSubscriptionResult::TransactionHash);
let stream =
pubsub.pending_transaction_stream().map(EthSubscriptionResult::TransactionHash);
pipe_from_stream(accepted_sink, stream).await
}
SubscriptionKind::Syncing => {
@ -241,7 +241,7 @@ where
Pool: TransactionPool + 'static,
{
/// Returns a stream that yields all transactions emitted by the txpool.
fn into_pending_transaction_stream(self) -> impl Stream<Item = TxHash> {
fn pending_transaction_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
}
}
@ -254,7 +254,7 @@ where
Pool: 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {
fn new_headers_stream(&self) -> impl Stream<Item = Header> {
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
.map(|new_block| {
let new_chain = new_block.expect("new block subscription never ends; qed");
@ -274,7 +274,7 @@ where
}
/// Returns a stream that yields all logs that match the given filter.
fn into_log_stream(self, filter: FilteredParams) -> impl Stream<Item = Log> {
fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends; qed").block_receipts()