diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index ac76d4fab..a3b5e0a03 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -7,6 +7,7 @@ use reth_primitives::TxHash; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider}; use reth_rpc_api::EthPubSubApiServer; use reth_rpc_types::FilteredParams; +use std::sync::Arc; use reth_rpc_types::{ pubsub::{ @@ -29,7 +30,7 @@ use tokio_stream::{ #[derive(Clone)] pub struct EthPubSub { /// All nested fields bundled together. - inner: EthPubSubInner, + inner: Arc>, /// The type that's used to spawn subscription tasks. subscription_task_spawner: Box, } @@ -59,7 +60,7 @@ impl EthPubSub subscription_task_spawner: Box, ) -> Self { let inner = EthPubSubInner { provider, pool, chain_events, network }; - Self { inner, subscription_task_spawner } + Self { inner: Arc::new(inner), subscription_task_spawner } } } @@ -91,7 +92,7 @@ where /// The actual handler for and accepted [`EthPubSub::subscribe`] call. async fn handle_accepted( - pubsub: EthPubSubInner, + pubsub: Arc>, accepted_sink: SubscriptionSink, kind: SubscriptionKind, params: Option, @@ -105,7 +106,7 @@ where match kind { SubscriptionKind::NewHeads => { let stream = pubsub - .into_new_headers_stream() + .new_headers_stream() .map(|block| EthSubscriptionResult::Header(Box::new(block.into()))); pipe_from_stream(accepted_sink, stream).await } @@ -116,13 +117,12 @@ where _ => FilteredParams::default(), }; let stream = - pubsub.into_log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log))); + pubsub.log_stream(filter).map(|log| EthSubscriptionResult::Log(Box::new(log))); pipe_from_stream(accepted_sink, stream).await } SubscriptionKind::NewPendingTransactions => { - let stream = pubsub - .into_pending_transaction_stream() - .map(EthSubscriptionResult::TransactionHash); + let stream = + pubsub.pending_transaction_stream().map(EthSubscriptionResult::TransactionHash); pipe_from_stream(accepted_sink, stream).await } SubscriptionKind::Syncing => { @@ -241,7 +241,7 @@ where Pool: TransactionPool + 'static, { /// Returns a stream that yields all transactions emitted by the txpool. - fn into_pending_transaction_stream(self) -> impl Stream { + fn pending_transaction_stream(&self) -> impl Stream { ReceiverStream::new(self.pool.pending_transactions_listener()) } } @@ -254,7 +254,7 @@ where Pool: 'static, { /// Returns a stream that yields all new RPC blocks. - fn into_new_headers_stream(self) -> impl Stream { + fn new_headers_stream(&self) -> impl Stream { BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) .map(|new_block| { let new_chain = new_block.expect("new block subscription never ends; qed"); @@ -274,7 +274,7 @@ where } /// Returns a stream that yields all logs that match the given filter. - fn into_log_stream(self, filter: FilteredParams) -> impl Stream { + fn log_stream(&self, filter: FilteredParams) -> impl Stream { BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) .map(move |canon_state| { canon_state.expect("new block subscription never ends; qed").block_receipts()