feat(rpc): work on subscriptions (#1290)

This commit is contained in:
Matthias Seitz
2023-02-15 06:37:26 +01:00
committed by GitHub
parent 32a1966ebf
commit 78d06fd769
5 changed files with 110 additions and 23 deletions

View File

@ -1,14 +1,14 @@
use jsonrpsee::proc_macros::rpc;
use reth_rpc_types::pubsub::{Kind, Params};
use reth_rpc_types::pubsub::{Params, SubscriptionKind};
/// Ethereum pub-sub rpc interface.
#[rpc(server)]
pub trait EthPubSubApi {
/// Create an ethereum subscription.
/// Create an ethereum subscription for the given params
#[subscription(
name = "eth_subscribe",
unsubscribe = "eth_unsubscribe",
item = reth_rpc_types::pubsub::SubscriptionResult
)]
fn subscribe(&self, kind: Kind, params: Option<Params>);
fn subscribe(&self, kind: SubscriptionKind, params: Option<Params>);
}

View File

@ -58,14 +58,34 @@ impl Serialize for SubscriptionResult {
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Hash, Clone)]
#[serde(deny_unknown_fields)]
#[serde(rename_all = "camelCase")]
pub enum Kind {
pub enum SubscriptionKind {
/// New block headers subscription.
///
/// Fires a notification each time a new header is appended to the chain, including chain
/// reorganizations. In case of a chain reorganization the subscription will emit all new
/// headers for the new chain. Therefore the subscription can emit multiple headers on the same
/// height.
NewHeads,
/// Logs subscription.
///
/// Returns logs that are included in new imported blocks and match the given filter criteria.
/// In case of a chain reorganization previous sent logs that are on the old chain will be
/// resent with the removed property set to true. Logs from transactions that ended up in the
/// new chain are emitted. Therefore, a subscription can emit logs for the same transaction
/// multiple times.
Logs,
/// New Pending Transactions subscription.
///
/// Returns the hash for all transactions that are added to the pending state and are signed
/// with a key that is available in the node. When a transaction that was previously part of
/// the canonical chain isn't part of the new canonical chain after a reogranization its again
/// emitted.
NewPendingTransactions,
/// Node syncing status subscription.
///
/// Indicates when the node starts or stops synchronizing. The result can either be a boolean
/// indicating that the synchronization has started (true), finished (false) or an object with
/// various progress indicators.
Syncing,
}

View File

@ -19,6 +19,7 @@ reth-provider = { path = "../../storage/provider", features = ["test-utils"] }
reth-transaction-pool = { path = "../../transaction-pool", features=["test-utils"]}
reth-network-api = { path = "../../net/network-api" }
reth-rpc-engine-api = { path = "../rpc-engine-api" }
reth-tasks = { path = "../../tasks" }
# rpc
jsonrpsee = { version = "0.16" }

View File

@ -1,63 +1,128 @@
//! `eth_` PubSub RPC handler implementation
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_primitives::{rpc::FilteredParams, TxHash};
use reth_provider::BlockProvider;
use reth_rpc_api::EthPubSubApiServer;
use reth_rpc_types::pubsub::{Kind, Params};
use reth_rpc_types::pubsub::{
Params, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use std::sync::Arc;
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt};
/// `Eth` pubsub RPC implementation.
///
/// This handles
#[derive(Debug, Clone)]
pub struct EthPubSub<Client, Pool> {
/// All nested fields bundled together.
inner: Arc<EthPubSubInner<Client, Pool>>,
inner: EthPubSubInner<Client, Pool>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
}
// === impl EthPubSub ===
impl<Client, Pool> EthPubSub<Client, Pool> {
/// Creates a new, shareable instance.
pub fn new(client: Arc<Client>, pool: Pool) -> Self {
///
/// Subscription tasks are spawned via [tokio::task::spawn]
pub fn new(client: Client, pool: Pool) -> Self {
Self::with_spawner(client, pool, Box::<TokioTaskExecutor>::default())
}
/// Creates a new, shareable instance.
pub fn with_spawner(
client: Client,
pool: Pool,
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { client, pool };
Self { inner: Arc::new(inner) }
Self { inner, subscription_task_spawner }
}
}
impl<Client, Pool> EthPubSubApiServer for EthPubSub<Client, Pool>
where
Client: BlockProvider + 'static,
Client: BlockProvider + Clone + 'static,
Pool: TransactionPool + 'static,
{
fn subscribe(
&self,
mut sink: SubscriptionSink,
_kind: Kind,
_params: Option<Params>,
kind: SubscriptionKind,
params: Option<Params>,
) -> SubscriptionResult {
sink.accept()?;
todo!()
let pubsub = self.inner.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
handle_accepted(pubsub, sink, kind, params).await;
}));
Ok(())
}
}
/// The actual handler for and accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Client, Pool>(
_pool: Pool,
_client: Arc<Client>,
_accepted_sink: SubscriptionSink,
_kind: Kind,
_params: Option<Params>,
) {
pubsub: EthPubSubInner<Client, Pool>,
mut accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
{
// if no params are provided, used default filter params
let _params = match params {
Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)),
_ => FilteredParams::default(),
};
match kind {
SubscriptionKind::NewHeads => {
// TODO subscribe new blocks -> convert
}
SubscriptionKind::Logs => {
// TODO subscribe new blocks -> fetch logs via bloom
}
SubscriptionKind::NewPendingTransactions => {
let stream = pubsub
.into_pending_transaction_stream()
.map(EthSubscriptionResult::TransactionHash);
accepted_sink.pipe_from_stream(stream).await;
}
SubscriptionKind::Syncing => {
// TODO subscribe new blocks -> read is_syncing from network
}
}
}
impl<Client, Pool> std::fmt::Debug for EthPubSub<Client, Pool> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
}
/// Container type `EthPubSub`
#[derive(Debug)]
#[derive(Clone)]
struct EthPubSubInner<Client, Pool> {
/// The transaction pool.
pool: Pool,
/// The client that can interact with the chain.
client: Arc<Client>,
// TODO needs spawn access
client: Client,
}
// == impl EthPubSubInner ===
impl<Client, Pool> EthPubSubInner<Client, Pool>
where
Client: BlockProvider + 'static,
Pool: TransactionPool + 'static,
{
/// Returns a stream that yields all transactions emitted by the txpool.
fn into_pending_transaction_stream(self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
}
}