feat(rpc): integrate eth filter and eth pubsub (#1930)

This commit is contained in:
Matthias Seitz
2023-03-23 18:37:53 +01:00
committed by GitHub
parent 1d3cf5a86c
commit 84af91737d
8 changed files with 122 additions and 51 deletions

View File

@ -9,6 +9,7 @@ description = "Helpers for configuring RPC"
[dependencies]
# reth
reth-ipc = { path = "../ipc" }
reth-interfaces = { path = "../../interfaces" }
reth-network-api = { path = "../../net/network-api" }
reth-provider = { path = "../../storage/provider" }
reth-rpc = { path = "../rpc" }
@ -37,6 +38,7 @@ reth-rpc-api = { path = "../rpc-api", features = ["client"] }
reth-transaction-pool = { path = "../../transaction-pool", features = ["test-utils"] }
reth-provider = { path = "../../storage/provider", features = ["test-utils"] }
reth-network-api = { path = "../../net/network-api", features = ["test-utils"] }
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread"] }
serde_json = "1.0.94"

View File

@ -10,11 +10,11 @@ pub struct EthHandlers<Client, Pool, Network, Events> {
/// Main `eth_` request handler
pub api: EthApi<Client, Pool, Network>,
/// The async caching layer used by the eth handlers
pub eth_cache: EthStateCache,
pub cache: EthStateCache,
/// Polling based filter handler available on all transports
pub filter: EthFilter<Client, Pool>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: Option<EthPubSub<Client, Pool, Events, Network>>,
pub pubsub: EthPubSub<Client, Pool, Events, Network>,
}
/// Additional config values for the eth namespace

View File

@ -24,16 +24,18 @@
//! Configure only a http server with a selection of [RethRpcModule]s
//!
//! ```
//! use reth_interfaces::events::ChainEventSubscriptions;
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockProvider, StateProviderFactory, EvmEnvProvider};
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::TransactionPool;
//! pub async fn launch<Client, Pool, Network>(client: Client, pool: Pool, network: Network)
//! pub async fn launch<Client, Pool, Network, Events>(client: Client, pool: Pool, network: Network, events: Events)
//! where
//! Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events: ChainEventSubscriptions + Clone + 'static,
//! {
//! // configure the rpc module per transport
//! let transports = TransportRpcModuleConfig::default().with_http(vec![
@ -42,7 +44,7 @@
//! RethRpcModule::Eth,
//! RethRpcModule::Web3,
//! ]);
//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default()).build(transports);
//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default(), events).build(transports);
//! let handle = RpcServerConfig::default()
//! .with_http(ServerBuilder::default())
//! .start(transport_modules)
@ -60,7 +62,8 @@ use reth_ipc::server::IpcServer;
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_rpc::{
AdminApi, DebugApi, EthApi, EthFilter, EthSubscriptionIdProvider, NetApi, TraceApi, Web3Api,
AdminApi, DebugApi, EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, NetApi, TraceApi,
Web3Api,
};
use reth_rpc_api::servers::*;
use reth_transaction_pool::TransactionPool;
@ -90,6 +93,7 @@ mod eth;
pub mod constants;
pub use crate::eth::{EthConfig, EthHandlers};
use constants::*;
use reth_interfaces::events::ChainEventSubscriptions;
use reth_rpc::eth::cache::EthStateCache;
use reth_tasks::TaskSpawner;
@ -97,23 +101,25 @@ use reth_tasks::TaskSpawner;
mod cors;
/// Convenience function for starting a server in one step.
pub async fn launch<Client, Pool, Network, Tasks>(
pub async fn launch<Client, Pool, Network, Tasks, Events>(
client: Client,
pool: Pool,
network: Network,
module_config: impl Into<TransportRpcModuleConfig>,
server_config: impl Into<RpcServerConfig>,
executor: Tasks,
events: Events,
) -> Result<RpcServerHandle, RpcError>
where
Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
{
let module_config = module_config.into();
let server_config = server_config.into();
RpcModuleBuilder::new(client, pool, network, executor)
RpcModuleBuilder::new(client, pool, network, executor, events)
.build(module_config)
.start_server(server_config)
.await
@ -123,7 +129,7 @@ where
///
/// This is the main entrypoint for up RPC servers.
#[derive(Debug)]
pub struct RpcModuleBuilder<Client, Pool, Network, Tasks> {
pub struct RpcModuleBuilder<Client, Pool, Network, Tasks, Events> {
/// The Client type to when creating all rpc handlers
client: Client,
/// The Pool type to when creating all rpc handlers
@ -132,59 +138,77 @@ pub struct RpcModuleBuilder<Client, Pool, Network, Tasks> {
network: Network,
/// How additional tasks are spawned, for example in the eth pubsub namespace
executor: Tasks,
/// Provides access to chain events, such as new blocks, required by pubsub.
events: Events,
}
// === impl RpcBuilder ===
impl<Client, Pool, Network, Tasks> RpcModuleBuilder<Client, Pool, Network, Tasks> {
impl<Client, Pool, Network, Tasks, Events> RpcModuleBuilder<Client, Pool, Network, Tasks, Events> {
/// Create a new instance of the builder
pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self {
Self { client, pool, network, executor }
pub fn new(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
) -> Self {
Self { client, pool, network, executor, events }
}
/// Configure the client instance.
pub fn with_client<C>(self, client: C) -> RpcModuleBuilder<C, Pool, Network, Tasks>
pub fn with_client<C>(self, client: C) -> RpcModuleBuilder<C, Pool, Network, Tasks, Events>
where
C: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
{
let Self { pool, network, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
let Self { pool, network, executor, events, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
}
/// Configure the transaction pool instance.
pub fn with_pool<P>(self, pool: P) -> RpcModuleBuilder<Client, P, Network, Tasks>
pub fn with_pool<P>(self, pool: P) -> RpcModuleBuilder<Client, P, Network, Tasks, Events>
where
P: TransactionPool + 'static,
{
let Self { client, network, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
let Self { client, network, executor, events, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
}
/// Configure the network instance.
pub fn with_network<N>(self, network: N) -> RpcModuleBuilder<Client, Pool, N, Tasks>
pub fn with_network<N>(self, network: N) -> RpcModuleBuilder<Client, Pool, N, Tasks, Events>
where
N: NetworkInfo + Peers + 'static,
{
let Self { client, pool, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
let Self { client, pool, executor, events, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
}
/// Configure the task executor to use for additional tasks.
pub fn with_executor<T>(self, executor: T) -> RpcModuleBuilder<Client, Pool, Network, T>
pub fn with_executor<T>(self, executor: T) -> RpcModuleBuilder<Client, Pool, Network, T, Events>
where
T: TaskSpawner + 'static,
{
let Self { pool, network, client, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
let Self { pool, network, client, events, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
}
/// Configure the event subscriber instance
pub fn with_events<E>(self, events: E) -> RpcModuleBuilder<Client, Pool, Network, Tasks, E>
where
E: ChainEventSubscriptions + 'static,
{
let Self { client, pool, executor, network, .. } = self;
RpcModuleBuilder { client, network, pool, executor, events }
}
}
impl<Client, Pool, Network, Tasks> RpcModuleBuilder<Client, Pool, Network, Tasks>
impl<Client, Pool, Network, Tasks, Events> RpcModuleBuilder<Client, Pool, Network, Tasks, Events>
where
Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
{
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
/// used to start the transport server(s).
@ -193,7 +217,7 @@ where
pub fn build(self, module_config: TransportRpcModuleConfig) -> TransportRpcModules<()> {
let mut modules = TransportRpcModules::default();
let Self { client, pool, network, executor } = self;
let Self { client, pool, network, executor, events } = self;
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config;
@ -203,6 +227,7 @@ where
pool,
network,
executor,
events,
config.unwrap_or_default(),
);
@ -215,9 +240,9 @@ where
}
}
impl Default for RpcModuleBuilder<(), (), (), ()> {
impl Default for RpcModuleBuilder<(), (), (), (), ()> {
fn default() -> Self {
RpcModuleBuilder::new((), (), (), ())
RpcModuleBuilder::new((), (), (), (), ())
}
}
@ -321,12 +346,13 @@ impl RpcModuleSelection {
/// Note: This will always create new instance of the module handlers and is therefor only
/// recommended for launching standalone transports. If multiple transports need to be
/// configured it's recommended to use the [RpcModuleBuilder].
pub fn standalone_module<Client, Pool, Network, Tasks>(
pub fn standalone_module<Client, Pool, Network, Tasks, Events>(
&self,
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
config: RpcModuleConfig,
) -> RpcModule<()>
where
@ -334,8 +360,9 @@ impl RpcModuleSelection {
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
{
let mut registry = RethModuleRegistry::new(client, pool, network, executor, config);
let mut registry = RethModuleRegistry::new(client, pool, network, executor, events, config);
registry.module_for(self)
}
@ -415,31 +442,44 @@ impl Serialize for RethRpcModule {
}
/// A Helper type the holds instances of the configured modules.
pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {
pub struct RethModuleRegistry<Client, Pool, Network, Tasks, Events> {
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
/// Additional settings for handlers.
config: RpcModuleConfig,
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Client, Pool, Network, ()>>,
eth: Option<EthHandlers<Client, Pool, Network, Events>>,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
// === impl RethModuleRegistry ===
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks> {
impl<Client, Pool, Network, Tasks, Events>
RethModuleRegistry<Client, Pool, Network, Tasks, Events>
{
/// Creates a new, empty instance.
pub fn new(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
config: RpcModuleConfig,
) -> Self {
Self { client, pool, network, eth: None, executor, modules: Default::default(), config }
Self {
client,
pool,
network,
eth: None,
executor,
modules: Default::default(),
config,
events,
}
}
/// Returns all installed methods
@ -457,7 +497,7 @@ impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tas
}
}
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks>
impl<Client, Pool, Network, Tasks, Events> RethModuleRegistry<Client, Pool, Network, Tasks, Events>
where
Network: NetworkInfo + Peers + Clone + 'static,
{
@ -476,12 +516,13 @@ where
}
}
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks>
impl<Client, Pool, Network, Tasks, Events> RethModuleRegistry<Client, Pool, Network, Tasks, Events>
where
Client: BlockProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
{
/// Register Eth Namespace
pub fn register_eth(&mut self) -> &mut Self {
@ -536,8 +577,8 @@ where
&mut self,
namespaces: impl Iterator<Item = RethRpcModule>,
) -> Vec<Methods> {
let eth_api = self.eth_api();
let eth_cache = self.eth_cache();
let EthHandlers { api: eth_api, cache: eth_cache, filter: eth_filter, pubsub: eth_pubsub } =
self.with_eth(|eth| eth.clone());
namespaces
.map(|namespace| {
self.modules
@ -549,7 +590,14 @@ where
RethRpcModule::Debug => {
DebugApi::new(self.client.clone(), eth_api.clone()).into_rpc().into()
}
RethRpcModule::Eth => eth_api.clone().into_rpc().into(),
RethRpcModule::Eth => {
// merge all eth handlers
let mut module = eth_api.clone().into_rpc();
module.merge(eth_filter.clone().into_rpc()).expect("No conflicts");
module.merge(eth_pubsub.clone().into_rpc()).expect("No conflicts");
module.into()
}
RethRpcModule::Net => {
NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
}
@ -570,16 +618,16 @@ where
/// This will spawn exactly one [EthStateCache] service if this is the first time the cache is
/// requested.
pub fn eth_cache(&mut self) -> EthStateCache {
self.with_eth(|handlers| handlers.eth_cache.clone())
self.with_eth(|handlers| handlers.cache.clone())
}
/// Creates the [EthHandlers] type the first time this is called.
fn with_eth<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&EthHandlers<Client, Pool, Network, ()>) -> R,
F: FnOnce(&EthHandlers<Client, Pool, Network, Events>) -> R,
{
if self.eth.is_none() {
let eth_cache = EthStateCache::spawn_with(
let cache = EthStateCache::spawn_with(
self.client.clone(),
self.config.eth.cache.clone(),
self.executor.clone(),
@ -588,13 +636,18 @@ where
self.client.clone(),
self.pool.clone(),
self.network.clone(),
eth_cache.clone(),
cache.clone(),
);
let filter = EthFilter::new(self.client.clone(), self.pool.clone());
// TODO: install pubsub
let pubsub = EthPubSub::new(
self.client.clone(),
self.pool.clone(),
self.events.clone(),
self.network.clone(),
);
let eth = EthHandlers { api, eth_cache, filter, pubsub: None };
let eth = EthHandlers { api, cache, filter, pubsub };
self.eth = Some(eth);
}
f(self.eth.as_ref().expect("exists; qed"))

View File

@ -1,3 +1,4 @@
use reth_interfaces::test_utils::TestChainEventSubscriptions;
use reth_network_api::test_utils::NoopNetwork;
use reth_provider::test_utils::NoopProvider;
use reth_rpc_builder::{
@ -51,11 +52,17 @@ pub async fn launch_http_ws(modules: impl Into<RpcModuleSelection>) -> RpcServer
}
/// Returns an [RpcModuleBuilder] with testing components.
pub fn test_rpc_builder() -> RpcModuleBuilder<NoopProvider, TestPool, NoopNetwork, TokioTaskExecutor>
{
pub fn test_rpc_builder() -> RpcModuleBuilder<
NoopProvider,
TestPool,
NoopNetwork,
TokioTaskExecutor,
TestChainEventSubscriptions,
> {
RpcModuleBuilder::default()
.with_client(NoopProvider::default())
.with_pool(testing_pool())
.with_network(NoopNetwork::default())
.with_executor(TokioTaskExecutor::default())
.with_events(TestChainEventSubscriptions::default())
}

View File

@ -3,7 +3,8 @@
use crate::eth::logs_utils;
use futures::StreamExt;
use jsonrpsee::{types::SubscriptionResult, SubscriptionSink};
use reth_interfaces::{events::ChainEventSubscriptions, sync::SyncStateProvider};
use reth_interfaces::events::ChainEventSubscriptions;
use reth_network_api::NetworkInfo;
use reth_primitives::{filter::FilteredParams, BlockId, TxHash};
use reth_provider::{BlockProvider, EvmEnvProvider};
use reth_rpc_api::EthPubSubApiServer;
@ -60,7 +61,7 @@ where
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Network: SyncStateProvider + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
{
/// Handler for `eth_subscribe`
fn subscribe(
@ -91,7 +92,7 @@ async fn handle_accepted<Client, Pool, Events, Network>(
Client: BlockProvider + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: ChainEventSubscriptions + Clone + 'static,
Network: SyncStateProvider + Clone + 'static,
Network: NetworkInfo + Clone + 'static,
{
match kind {
SubscriptionKind::NewHeads => {
@ -201,7 +202,7 @@ impl<Client, Pool, Events, Network> EthPubSubInner<Client, Pool, Events, Network
where
Client: BlockProvider + EvmEnvProvider + 'static,
Events: ChainEventSubscriptions + 'static,
Network: SyncStateProvider + 'static,
Network: NetworkInfo + 'static,
{
/// Returns a stream that yields all new RPC blocks.
fn into_new_headers_stream(self) -> impl Stream<Item = Header> {