feat: Implement is_syncing subscription handler (#1562)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
chirag-bgh
2023-03-12 14:46:47 +05:30
committed by GitHub
parent a4a80a713e
commit 1846f2d73c
2 changed files with 74 additions and 20 deletions

View File

@ -14,7 +14,7 @@ pub struct EthHandlers<Client, Pool, Network, Events> {
/// Polling based filter handler available on all transports
pub filter: EthFilter<Client, Pool>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: Option<EthPubSub<Client, Pool, Events>>,
pub pubsub: Option<EthPubSub<Client, Pool, Events, Network>>,
}
/// Additional config values for the eth namespace

View File

@ -1,12 +1,15 @@
//! `eth_` PubSub RPC handler implementation
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider};
use reth_primitives::{rpc::FilteredParams, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::{
pubsub::{Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult},
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata,
},
Header,
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
@ -20,21 +23,21 @@ use tokio_stream::{
///
/// This handles `eth_subscribe` RPC calls.
#[derive(Clone)]
pub struct EthPubSub<Client, Pool, Events> {
pub struct EthPubSub<Client, Pool, Events, Network> {
/// All nested fields bundled together.
inner: EthPubSubInner<Client, Pool, Events>,
inner: EthPubSubInner<Client, Pool, Events, Network>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
}
// === impl EthPubSub ===
impl<Client, Pool, Events> EthPubSub<Client, Pool, Events> {
impl<Client, Pool, Events, Network> EthPubSub<Client, Pool, Events, Network> {
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [tokio::task::spawn]
pub fn new(client: Client, pool: Pool, chain_events: Events) -> Self {
Self::with_spawner(client, pool, chain_events, Box::<TokioTaskExecutor>::default())
pub fn new(client: Client, pool: Pool, chain_events: Events, network: Network) -> Self {
Self::with_spawner(client, pool, chain_events, network, Box::<TokioTaskExecutor>::default())
}
/// Creates a new, shareable instance.
@ -42,18 +45,20 @@ impl<Client, Pool, Events> EthPubSub<Client, Pool, Events> {
client: Client,
pool: Pool,
chain_events: Events,
network: Network,
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { client, pool, chain_events };
let inner = EthPubSubInner { client, pool, chain_events, network };
Self { inner, subscription_task_spawner }
}
}
impl<Client, Pool, Events> EthPubSubApiServer for EthPubSub<Client, Pool, Events>
impl<Client, Pool, Events, Network> EthPubSubApiServer for EthPubSub<Client, Pool, Events, Network>
where
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Network: SyncStateProvider + Clone + 'static,
{
fn subscribe(
&self,
@ -65,7 +70,7 @@ where
let pubsub = self.inner.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
handle_accepted(pubsub, sink, kind, params).await;
handle_accepted(pubsub, sink, kind, params, Box::<TokioTaskExecutor>::default()).await;
}));
Ok(())
@ -73,15 +78,17 @@ where
}
/// The actual handler for and accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Client, Pool, Events>(
pubsub: EthPubSubInner<Client, Pool, Events>,
async fn handle_accepted<Client, Pool, Events, Network>(
pubsub: EthPubSubInner<Client, Pool, Events, Network>,
mut accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
subscription_task_spawner: Box<dyn TaskSpawner>,
) where
Client: BlockProvider + EvmEnvProvider + 'static,
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Network: SyncStateProvider + Clone + 'static,
{
// if no params are provided, used default filter params
let _params = match params {
@ -106,12 +113,35 @@ async fn handle_accepted<Client, Pool, Events>(
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::Syncing => {
// TODO subscribe new blocks -> read is_syncing from network
subscription_task_spawner.spawn(Box::pin(async move {
// get new block subscription
let mut new_blocks =
UnboundedReceiverStream::new(pubsub.chain_events.subscribe_new_blocks());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status).await;
// send the current status immediately
let _ = accepted_sink.send(&current_sub_res);
while (new_blocks.next().await).is_some() {
let current_syncing = pubsub.network.is_syncing();
// Only send a new response if the sync status has changed
if current_syncing != initial_sync_status {
// Update the sync status on each new block
initial_sync_status = current_syncing;
// send a new message now that the status changed
let sync_status = pubsub.sync_status(current_syncing).await;
let _ = accepted_sink.send(&sync_status);
}
}
}));
}
}
}
impl<Client, Pool, Events> std::fmt::Debug for EthPubSub<Client, Pool, Events> {
impl<Client, Pool, Events, Network> std::fmt::Debug for EthPubSub<Client, Pool, Events, Network> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
@ -119,18 +149,41 @@ impl<Client, Pool, Events> std::fmt::Debug for EthPubSub<Client, Pool, Events> {
/// Container type `EthPubSub`
#[derive(Clone)]
struct EthPubSubInner<Client, Pool, Events> {
struct EthPubSubInner<Client, Pool, Events, Network> {
/// The transaction pool.
pool: Pool,
/// The client that can interact with the chain.
client: Client,
/// A type that allows to create new event subscriptions,
chain_events: Events,
/// The network.
network: Network,
}
// == impl EthPubSubInner ===
impl<Client, Pool, Events> EthPubSubInner<Client, Pool, Events>
impl<Client, Pool, Events, Network> EthPubSubInner<Client, Pool, Events, Network>
where
Client: BlockProvider + 'static,
{
/// Returns the current sync status for the `syncing` subscription
async fn sync_status(&self, is_syncing: bool) -> EthSubscriptionResult {
let current_block =
self.client.chain_info().map(|info| info.best_number).unwrap_or_default();
if is_syncing {
EthSubscriptionResult::SyncState(PubSubSyncStatus::Detailed(SyncStatusMetadata {
syncing: true,
starting_block: 0,
current_block,
highest_block: Some(current_block),
}))
} else {
EthSubscriptionResult::SyncState(PubSubSyncStatus::Simple(false))
}
}
}
impl<Client, Pool, Events, Network> EthPubSubInner<Client, Pool, Events, Network>
where
Pool: TransactionPool + 'static,
{
@ -140,10 +193,11 @@ where
}
}
impl<Client, Pool, Events> EthPubSubInner<Client, Pool, Events>
impl<Client, Pool, Events, Network> EthPubSubInner<Client, Pool, Events, Network>
where
Client: BlockProvider + EvmEnvProvider + 'static,
Events: ChainEventSubscriptions + 'static,
Network: SyncStateProvider + 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {