refactor: remove Events generic from RPC types (#14033)

This commit is contained in:
Arsenii Kulikov
2025-01-28 17:29:27 +04:00
committed by GitHub
parent 42dc1ed04b
commit 0f2e2faeda
9 changed files with 146 additions and 306 deletions

View File

@ -9,12 +9,12 @@ use reth_rpc_eth_types::{
use reth_tasks::TaskSpawner;
/// Alias for `eth` namespace API builder.
pub type DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi> =
Box<dyn FnOnce(&EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>) -> EthApi>;
pub type DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi> =
Box<dyn FnOnce(&EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks>) -> EthApi>;
/// Handlers for core, filter and pubsub `eth` namespace APIs.
#[derive(Debug, Clone)]
pub struct EthHandlers<Provider: BlockReader, Events, EthApi: EthApiTypes> {
pub struct EthHandlers<Provider: BlockReader, EthApi: EthApiTypes> {
/// Main `eth_` request handler
pub api: EthApi,
/// The async caching layer used by the eth handlers
@ -22,19 +22,18 @@ pub struct EthHandlers<Provider: BlockReader, Events, EthApi: EthApiTypes> {
/// Polling based filter handler available on all transports
pub filter: EthFilter<EthApi>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<EthApi, Events>,
pub pubsub: EthPubSub<EthApi>,
}
impl<Provider, Events, EthApi> EthHandlers<Provider, Events, EthApi>
impl<N, Provider, EthApi> EthHandlers<Provider, EthApi>
where
N: NodePrimitives,
Provider: StateProviderFactory
+ BlockReader<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
> + Clone
+ BlockReader<Block = N::Block, Receipt = N::Receipt>
+ Clone
+ CanonStateSubscriptions<Primitives = N>
+ Unpin
+ 'static,
Events: CanonStateSubscriptions + Clone + 'static,
EthApi: EthApiTypes + 'static,
{
/// Returns a new instance with handlers for `eth` namespace.
@ -48,16 +47,7 @@ where
evm_config: EvmConfig,
config: EthConfig,
executor: Tasks,
events: Events,
eth_api_builder: DynEthApiBuilder<
Provider,
Pool,
EvmConfig,
Network,
Tasks,
Events,
EthApi,
>,
eth_api_builder: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
) -> Self
where
EvmConfig: ConfigureEvm<Header = Provider::Header>,
@ -65,7 +55,7 @@ where
{
let cache = EthStateCache::spawn_with(provider.clone(), config.cache, executor.clone());
let new_canonical_blocks = events.canonical_state_stream();
let new_canonical_blocks = provider.canonical_state_stream();
let c = cache.clone();
executor.spawn_critical(
"cache canonical blocks task",
@ -74,27 +64,14 @@ where
}),
);
let ctx = EthApiBuilderCtx {
provider,
pool,
network,
evm_config,
config,
executor,
events,
cache,
};
let ctx = EthApiBuilderCtx { provider, pool, network, evm_config, config, executor, cache };
let api = eth_api_builder(&ctx);
let filter =
EthFilter::new(api.clone(), ctx.config.filter_config(), Box::new(ctx.executor.clone()));
let pubsub = EthPubSub::with_spawner(
api.clone(),
ctx.events.clone(),
Box::new(ctx.executor.clone()),
);
let pubsub = EthPubSub::with_spawner(api.clone(), Box::new(ctx.executor.clone()));
Self { api, cache: ctx.cache, filter, pubsub }
}

View File

@ -31,11 +31,10 @@
//! use reth_transaction_pool::{PoolTransaction, TransactionPool};
//! use std::sync::Arc;
//!
//! pub async fn launch<Provider, Pool, Network, Events, BlockExecutor, Consensus, Validator>(
//! pub async fn launch<Provider, Pool, Network, BlockExecutor, Consensus, Validator>(
//! provider: Provider,
//! pool: Pool,
//! network: Network,
//! events: Events,
//! evm_config: EthEvmConfig,
//! block_executor: BlockExecutor,
//! consensus: Consensus,
@ -47,7 +46,8 @@
//! Receipt = reth_primitives::Receipt,
//! Header = reth_primitives::Header,
//! > + AccountReader
//! + ChangeSetReader,
//! + ChangeSetReader
//! + CanonStateSubscriptions<Primitives = reth_primitives::EthPrimitives>,
//! Pool: TransactionPool<
//! Transaction: PoolTransaction<
//! Consensus = TransactionSigned,
@ -56,9 +56,7 @@
//! > + Unpin
//! + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events:
//! CanonStateSubscriptions<Primitives = reth_primitives::EthPrimitives> + Clone + 'static,
//! BlockExecutor: BlockExecutorProvider<Primitives = Events::Primitives>,
//! BlockExecutor: BlockExecutorProvider<Primitives = Provider::Primitives>,
//! Consensus: FullConsensus<Error = ConsensusError> + Clone + 'static,
//! Validator: PayloadValidator<Block = reth_primitives::Block>,
//! {
@ -74,7 +72,6 @@
//! pool,
//! network,
//! TokioTaskExecutor::default(),
//! events,
//! evm_config,
//! block_executor,
//! consensus,
@ -114,7 +111,6 @@
//! Provider,
//! Pool,
//! Network,
//! Events,
//! EngineApi,
//! EngineT,
//! BlockExecutor,
@ -124,7 +120,6 @@
//! provider: Provider,
//! pool: Pool,
//! network: Network,
//! events: Events,
//! engine_api: EngineApi,
//! evm_config: EthEvmConfig,
//! block_executor: BlockExecutor,
@ -137,7 +132,8 @@
//! Receipt = reth_primitives::Receipt,
//! Header = reth_primitives::Header,
//! > + AccountReader
//! + ChangeSetReader,
//! + ChangeSetReader
//! + CanonStateSubscriptions<Primitives = reth_primitives::EthPrimitives>,
//! Pool: TransactionPool<
//! Transaction: PoolTransaction<
//! Consensus = TransactionSigned,
@ -146,11 +142,9 @@
//! > + Unpin
//! + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! Events:
//! CanonStateSubscriptions<Primitives = reth_primitives::EthPrimitives> + Clone + 'static,
//! EngineApi: EngineApiServer<EngineT>,
//! EngineT: EngineTypes,
//! BlockExecutor: BlockExecutorProvider<Primitives = Events::Primitives>,
//! BlockExecutor: BlockExecutorProvider<Primitives = Provider::Primitives>,
//! Consensus: FullConsensus<Error = ConsensusError> + Clone + 'static,
//! Validator: PayloadValidator<Block = reth_primitives::Block>,
//! {
@ -166,7 +160,6 @@
//! pool,
//! network,
//! TokioTaskExecutor::default(),
//! events,
//! evm_config,
//! block_executor,
//! consensus,
@ -279,16 +272,15 @@ pub mod rate_limiter;
/// Convenience function for starting a server in one step.
#[allow(clippy::too_many_arguments)]
pub async fn launch<Provider, Pool, Network, Tasks, Events, EvmConfig, EthApi, BlockExecutor>(
pub async fn launch<Provider, Pool, Network, Tasks, EvmConfig, EthApi, BlockExecutor>(
provider: Provider,
pool: Pool,
network: Network,
module_config: impl Into<TransportRpcModuleConfig>,
server_config: impl Into<RpcServerConfig>,
executor: Tasks,
events: Events,
evm_config: EvmConfig,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi>,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
block_executor: BlockExecutor,
consensus: Arc<dyn FullConsensus<BlockExecutor::Primitives, Error = ConsensusError>>,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
@ -298,12 +290,12 @@ where
Block = ProviderBlock<EthApi::Provider>,
Receipt = ProviderReceipt<EthApi::Provider>,
Header = ProviderHeader<EthApi::Provider>,
> + AccountReader
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>
+ AccountReader
+ ChangeSetReader,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction> + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
Transaction = <BlockExecutor::Primitives as NodePrimitives>::SignedTx,
@ -313,7 +305,7 @@ where
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
>,
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>,
>,
BlockExecutor: BlockExecutorProvider,
{
@ -326,7 +318,6 @@ where
pool,
network,
executor,
events,
evm_config,
block_executor,
consensus,
@ -340,16 +331,7 @@ where
///
/// This is the main entrypoint and the easiest way to configure an RPC server.
#[derive(Debug, Clone)]
pub struct RpcModuleBuilder<
Provider,
Pool,
Network,
Tasks,
Events,
EvmConfig,
BlockExecutor,
Consensus,
> {
pub struct RpcModuleBuilder<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus> {
/// The Provider type to when creating all rpc handlers
provider: Provider,
/// The Pool type to when creating all rpc handlers
@ -358,8 +340,6 @@ pub struct RpcModuleBuilder<
network: Network,
/// How additional tasks are spawned, for example in the eth pubsub namespace
executor: Tasks,
/// Provides access to chain events, such as new blocks, required by pubsub.
events: Events,
/// Defines how the EVM should be configured before execution.
evm_config: EvmConfig,
/// The provider for getting a block executor that executes blocks
@ -370,8 +350,8 @@ pub struct RpcModuleBuilder<
// === impl RpcBuilder ===
impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
RpcModuleBuilder<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
{
/// Create a new instance of the builder
#[allow(clippy::too_many_arguments)]
@ -380,30 +360,27 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
evm_config: EvmConfig,
block_executor: BlockExecutor,
consensus: Consensus,
) -> Self {
Self { provider, pool, network, executor, events, evm_config, block_executor, consensus }
Self { provider, pool, network, executor, evm_config, block_executor, consensus }
}
/// Configure the provider instance.
pub fn with_provider<P>(
self,
provider: P,
) -> RpcModuleBuilder<P, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
) -> RpcModuleBuilder<P, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
where
P: BlockReader + StateProviderFactory + 'static,
{
let Self { pool, network, executor, events, evm_config, block_executor, consensus, .. } =
self;
let Self { pool, network, executor, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -414,19 +391,16 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pub fn with_pool<P>(
self,
pool: P,
) -> RpcModuleBuilder<Provider, P, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
) -> RpcModuleBuilder<Provider, P, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
where
P: TransactionPool + 'static,
{
let Self {
provider, network, executor, events, evm_config, block_executor, consensus, ..
} = self;
let Self { provider, network, executor, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -445,18 +419,14 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
NoopTransactionPool,
Network,
Tasks,
Events,
EvmConfig,
BlockExecutor,
Consensus,
> {
let Self {
provider, executor, events, network, evm_config, block_executor, consensus, ..
} = self;
let Self { provider, executor, network, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
executor,
events,
network,
evm_config,
block_executor,
@ -469,19 +439,16 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pub fn with_network<N>(
self,
network: N,
) -> RpcModuleBuilder<Provider, Pool, N, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
) -> RpcModuleBuilder<Provider, Pool, N, Tasks, EvmConfig, BlockExecutor, Consensus>
where
N: NetworkInfo + Peers + 'static,
{
let Self {
provider, pool, executor, events, evm_config, block_executor, consensus, ..
} = self;
let Self { provider, pool, executor, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -495,24 +462,13 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
/// [`EthApi`](reth_rpc::eth::EthApi) which requires a [`NetworkInfo`] implementation.
pub fn with_noop_network(
self,
) -> RpcModuleBuilder<
Provider,
Pool,
NoopNetwork,
Tasks,
Events,
EvmConfig,
BlockExecutor,
Consensus,
> {
let Self {
provider, pool, executor, events, evm_config, block_executor, consensus, ..
} = self;
) -> RpcModuleBuilder<Provider, Pool, NoopNetwork, Tasks, EvmConfig, BlockExecutor, Consensus>
{
let Self { provider, pool, executor, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
pool,
executor,
events,
network: NoopNetwork::default(),
evm_config,
block_executor,
@ -524,18 +480,16 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pub fn with_executor<T>(
self,
executor: T,
) -> RpcModuleBuilder<Provider, Pool, Network, T, Events, EvmConfig, BlockExecutor, Consensus>
) -> RpcModuleBuilder<Provider, Pool, Network, T, EvmConfig, BlockExecutor, Consensus>
where
T: TaskSpawner + 'static,
{
let Self { pool, network, provider, events, evm_config, block_executor, consensus, .. } =
self;
let Self { pool, network, provider, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -553,18 +507,15 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
Pool,
Network,
TokioTaskExecutor,
Events,
EvmConfig,
BlockExecutor,
Consensus,
> {
let Self { pool, network, provider, events, evm_config, block_executor, consensus, .. } =
self;
let Self { pool, network, provider, evm_config, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
events,
executor: TokioTaskExecutor::default(),
evm_config,
block_executor,
@ -572,45 +523,20 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
}
}
/// Configure the event subscriber instance
pub fn with_events<E>(
self,
events: E,
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, E, EvmConfig, BlockExecutor, Consensus>
where
E: CanonStateSubscriptions + 'static,
{
let Self {
provider, pool, executor, network, evm_config, block_executor, consensus, ..
} = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
}
}
/// Configure the evm configuration type
pub fn with_evm_config<E>(
self,
evm_config: E,
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, E, BlockExecutor, Consensus>
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, E, BlockExecutor, Consensus>
where
E: ConfigureEvm + 'static,
{
let Self { provider, pool, executor, network, events, block_executor, consensus, .. } =
self;
let Self { provider, pool, executor, network, block_executor, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -621,17 +547,16 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pub fn with_block_executor<BE>(
self,
block_executor: BE,
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig, BE, Consensus>
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, EvmConfig, BE, Consensus>
where
BE: BlockExecutorProvider,
{
let Self { provider, network, pool, executor, events, evm_config, consensus, .. } = self;
let Self { provider, network, pool, executor, evm_config, consensus, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -642,15 +567,13 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
pub fn with_consensus<C>(
self,
consensus: C,
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, C> {
let Self { provider, network, pool, executor, events, evm_config, block_executor, .. } =
self;
) -> RpcModuleBuilder<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, C> {
let Self { provider, network, pool, executor, evm_config, block_executor, .. } = self;
RpcModuleBuilder {
provider,
network,
pool,
executor,
events,
evm_config,
block_executor,
consensus,
@ -658,19 +581,19 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
}
}
impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
RpcModuleBuilder<Provider, Pool, Network, Tasks, EvmConfig, BlockExecutor, Consensus>
where
Provider: FullRpcProvider<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
Header = <Events::Primitives as NodePrimitives>::BlockHeader,
> + AccountReader
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>
+ AccountReader
+ ChangeSetReader,
Pool: TransactionPool + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
Transaction = <BlockExecutor::Primitives as NodePrimitives>::SignedTx,
@ -689,34 +612,26 @@ where
self,
module_config: TransportRpcModuleConfig,
engine: EngineApi,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi>,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
) -> (
TransportRpcModules,
AuthRpcModule,
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>,
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>,
)
where
EngineT: EngineTypes,
EngineApi: EngineApiServer<EngineT>,
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
Header = <Events::Primitives as NodePrimitives>::BlockHeader,
>,
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>,
>,
{
let Self {
provider,
pool,
network,
executor,
events,
evm_config,
block_executor,
consensus,
} = self;
let Self { provider, pool, network, executor, evm_config, block_executor, consensus } =
self;
let config = module_config.config.clone().unwrap_or_default();
@ -725,7 +640,6 @@ where
pool,
network,
executor,
events,
consensus,
config,
evm_config,
@ -772,7 +686,6 @@ where
/// .with_pool(NoopTransactionPool::default())
/// .with_network(NoopNetwork::default())
/// .with_executor(TokioTaskExecutor::default())
/// .with_events(TestCanonStateSubscriptions::default())
/// .with_evm_config(evm)
/// .with_block_executor(EthExecutorProvider::mainnet())
/// .with_consensus(NoopConsensus::default())
@ -784,28 +697,19 @@ where
pub fn into_registry<EthApi>(
self,
config: RpcModuleConfig,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi>,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
) -> RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
) -> RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
EthApi: EthApiTypes + 'static,
{
let Self {
provider,
pool,
network,
executor,
events,
evm_config,
block_executor,
consensus,
} = self;
let Self { provider, pool, network, executor, evm_config, block_executor, consensus } =
self;
RpcRegistryInner::new(
provider,
pool,
network,
executor,
events,
consensus,
config,
evm_config,
@ -820,31 +724,23 @@ where
pub fn build<EthApi>(
self,
module_config: TransportRpcModuleConfig,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi>,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
) -> TransportRpcModules<()>
where
EthApi: FullEthApiServer<
Provider: BlockReader<
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
Block = <Events::Primitives as NodePrimitives>::Block,
Header = <Events::Primitives as NodePrimitives>::BlockHeader,
>,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>,
>,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction>,
{
let mut modules = TransportRpcModules::default();
let Self {
provider,
pool,
network,
executor,
events,
evm_config,
block_executor,
consensus,
} = self;
let Self { provider, pool, network, executor, evm_config, block_executor, consensus } =
self;
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone();
@ -854,7 +750,6 @@ where
pool,
network,
executor,
events,
consensus,
config.unwrap_or_default(),
evm_config,
@ -873,9 +768,9 @@ where
}
}
impl Default for RpcModuleBuilder<(), (), (), (), (), (), (), ()> {
impl Default for RpcModuleBuilder<(), (), (), (), (), (), ()> {
fn default() -> Self {
Self::new((), (), (), (), (), (), (), ())
Self::new((), (), (), (), (), (), ())
}
}
@ -963,7 +858,6 @@ pub struct RpcRegistryInner<
Pool,
Network,
Tasks,
Events,
EthApi: EthApiTypes,
BlockExecutor,
Consensus,
@ -972,14 +866,13 @@ pub struct RpcRegistryInner<
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
block_executor: BlockExecutor,
consensus: Consensus,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
/// Holds the configuration for the RPC modules
config: RpcModuleConfig,
/// Holds a all `eth_` namespace handlers
eth: EthHandlers<Provider, Events, EthApi>,
eth: EthHandlers<Provider, EthApi>,
/// to put trace calls behind semaphore
blocking_pool_guard: BlockingTaskGuard,
/// Contains the [Methods] of a module
@ -988,19 +881,18 @@ pub struct RpcRegistryInner<
// === impl RpcRegistryInner ===
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<N, Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
N: NodePrimitives,
Provider: StateProviderFactory
+ BlockReader<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
> + Clone
+ CanonStateSubscriptions<Primitives = N>
+ BlockReader<Block = N::Block, Receipt = N::Receipt>
+ Clone
+ Unpin
+ 'static,
Pool: Send + Sync + Clone + 'static,
Network: Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
EthApi: EthApiTypes + 'static,
BlockExecutor: BlockExecutorProvider,
@ -1012,19 +904,10 @@ where
pool: Pool,
network: Network,
executor: Tasks,
events: Events,
consensus: Consensus,
config: RpcModuleConfig,
evm_config: EvmConfig,
eth_api_builder: DynEthApiBuilder<
Provider,
Pool,
EvmConfig,
Network,
Tasks,
Events,
EthApi,
>,
eth_api_builder: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, EthApi>,
block_executor: BlockExecutor,
payload_validator: Arc<dyn PayloadValidator<Block = Provider::Block>>,
) -> Self
@ -1040,7 +923,6 @@ where
evm_config,
config.eth,
executor.clone(),
events.clone(),
eth_api_builder,
);
@ -1054,15 +936,14 @@ where
config,
modules: Default::default(),
blocking_pool_guard,
events,
block_executor,
payload_validator,
}
}
}
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
Provider: BlockReader,
EthApi: EthApiTypes,
@ -1073,7 +954,7 @@ where
}
/// Returns a reference to the installed [`EthHandlers`].
pub const fn eth_handlers(&self) -> &EthHandlers<Provider, Events, EthApi> {
pub const fn eth_handlers(&self) -> &EthHandlers<Provider, EthApi> {
&self.eth
}
@ -1090,11 +971,6 @@ where
&self.pool
}
/// Returns a reference to the events type
pub const fn events(&self) -> &Events {
&self.events
}
/// Returns a reference to the tasks type
pub const fn tasks(&self) -> &Tasks {
&self.executor
@ -1120,8 +996,8 @@ where
}
}
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
Network: NetworkInfo + Clone + 'static,
EthApi: EthApiTypes,
@ -1159,8 +1035,8 @@ where
}
}
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
Provider: FullRpcProvider + AccountReader + ChangeSetReader,
Network: NetworkInfo + Peers + Clone + 'static,
@ -1267,8 +1143,8 @@ where
}
}
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
Provider: FullRpcProvider + AccountReader + ChangeSetReader,
Network: NetworkInfo + Peers + Clone + 'static,
@ -1354,22 +1230,22 @@ where
}
}
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
impl<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, EthApi, BlockExecutor, Consensus>
where
Provider: FullRpcProvider<Block = <BlockExecutor::Primitives as NodePrimitives>::Block>
+ CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>
+ AccountReader
+ ChangeSetReader,
Pool: TransactionPool + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
>,
> + CanonStateSubscriptions<Primitives = BlockExecutor::Primitives>,
>,
BlockExecutor: BlockExecutorProvider,
Consensus: FullConsensus<BlockExecutor::Primitives, Error = ConsensusError> + Clone + 'static,

View File

@ -12,7 +12,7 @@ use reth_evm::execute::BasicBlockExecutorProvider;
use reth_evm_ethereum::{execute::EthExecutionStrategyFactory, EthEvmConfig};
use reth_network_api::noop::NoopNetwork;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_provider::test_utils::{NoopProvider, TestCanonStateSubscriptions};
use reth_provider::test_utils::NoopProvider;
use reth_rpc::EthApi;
use reth_rpc_builder::{
auth::{AuthRpcModule, AuthServerConfig, AuthServerHandle},
@ -134,7 +134,6 @@ pub fn test_rpc_builder() -> RpcModuleBuilder<
TestPool,
NoopNetwork,
TokioTaskExecutor,
TestCanonStateSubscriptions,
EthEvmConfig,
BasicBlockExecutorProvider<EthExecutionStrategyFactory>,
NoopConsensus,
@ -144,7 +143,6 @@ pub fn test_rpc_builder() -> RpcModuleBuilder<
.with_pool(TestPoolBuilder::default().into())
.with_network(NoopNetwork::default())
.with_executor(TokioTaskExecutor::default())
.with_events(TestCanonStateSubscriptions::default())
.with_evm_config(EthEvmConfig::new(MAINNET.clone()))
.with_block_executor(
BasicBlockExecutorProvider::new(EthExecutionStrategyFactory::mainnet()),

View File

@ -13,7 +13,7 @@ use crate::{
/// Context for building the `eth` namespace API.
#[derive(Debug, Clone)]
pub struct EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>
pub struct EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks>
where
Provider: BlockReader,
{
@ -29,14 +29,12 @@ where
pub config: EthConfig,
/// Runtime handle.
pub executor: Tasks,
/// Events handle.
pub events: Events,
/// RPC cache handle.
pub cache: EthStateCache<Provider::Block, Provider::Receipt>,
}
impl<Provider, Pool, EvmConfig, Network, Tasks, Events>
EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>
impl<Provider, Pool, EvmConfig, Network, Tasks>
EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks>
where
Provider: BlockReaderIdExt + Clone,
{
@ -45,13 +43,14 @@ where
where
N: NodePrimitives,
Tasks: TaskSpawner,
Events: CanonStateSubscriptions<Primitives = N>,
Provider:
BlockReaderIdExt<Block = N::Block, Receipt = N::Receipt> + ChainSpecProvider + 'static,
Provider: BlockReaderIdExt<Block = N::Block, Receipt = N::Receipt>
+ CanonStateSubscriptions<Primitives = N>
+ ChainSpecProvider
+ 'static,
{
let fee_history_cache = FeeHistoryCache::new(self.config.fee_history_cache);
let new_canonical_blocks = self.events.canonical_state_stream();
let new_canonical_blocks = self.provider.canonical_state_stream();
let fhc = fee_history_cache.clone();
let provider = self.provider.clone();
let cache = self.cache.clone();

View File

@ -99,25 +99,24 @@ where
}
}
impl<Provider, Pool, EvmConfig, Network> EthApi<Provider, Pool, Network, EvmConfig>
impl<N, Provider, Pool, EvmConfig, Network> EthApi<Provider, Pool, Network, EvmConfig>
where
Provider: ChainSpecProvider + BlockReaderIdExt + Clone + 'static,
N: NodePrimitives,
Provider: ChainSpecProvider
+ BlockReaderIdExt<Block = N::Block, Receipt = N::Receipt>
+ CanonStateSubscriptions<Primitives = N>
+ Clone
+ 'static,
Pool: Clone,
EvmConfig: Clone,
Network: Clone,
{
/// Creates a new, shareable instance.
pub fn with_spawner<Tasks, Events>(
ctx: &EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks, Events>,
pub fn with_spawner<Tasks>(
ctx: &EthApiBuilderCtx<Provider, Pool, EvmConfig, Network, Tasks>,
) -> Self
where
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
Block = ProviderBlock<Provider>,
Receipt = ProviderReceipt<Provider>,
>,
>,
{
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build blocking task pool");

View File

@ -32,40 +32,38 @@ use tracing::error;
///
/// This handles `eth_subscribe` RPC calls.
#[derive(Clone)]
pub struct EthPubSub<Eth, Events> {
pub struct EthPubSub<Eth> {
/// All nested fields bundled together.
inner: Arc<EthPubSubInner<Eth, Events>>,
inner: Arc<EthPubSubInner<Eth>>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
}
// === impl EthPubSub ===
impl<Eth, Events> EthPubSub<Eth, Events> {
impl<Eth> EthPubSub<Eth> {
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [`tokio::task::spawn`]
pub fn new(eth_api: Eth, chain_events: Events) -> Self {
Self::with_spawner(eth_api, chain_events, Box::<TokioTaskExecutor>::default())
pub fn new(eth_api: Eth) -> Self {
Self::with_spawner(eth_api, Box::<TokioTaskExecutor>::default())
}
/// Creates a new, shareable instance.
pub fn with_spawner(
eth_api: Eth,
chain_events: Events,
subscription_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
let inner = EthPubSubInner { eth_api, chain_events };
pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box<dyn TaskSpawner>) -> Self {
let inner = EthPubSubInner { eth_api };
Self { inner: Arc::new(inner), subscription_task_spawner }
}
}
#[async_trait::async_trait]
impl<Eth, Events> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth, Events>
impl<Eth> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth>
where
Events: CanonStateSubscriptions + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
Eth: RpcNodeCore<
Provider: BlockNumReader + CanonStateSubscriptions,
Pool: TransactionPool,
Network: NetworkInfo,
> + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
+ 'static,
{
/// Handler for `eth_subscribe`
@ -86,16 +84,18 @@ where
}
/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Eth, Events>(
pubsub: Arc<EthPubSubInner<Eth, Events>>,
async fn handle_accepted<Eth>(
pubsub: Arc<EthPubSubInner<Eth>>,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> Result<(), ErrorObject<'static>>
where
Events: CanonStateSubscriptions + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
Eth: RpcNodeCore<
Provider: BlockNumReader + CanonStateSubscriptions,
Pool: TransactionPool,
Network: NetworkInfo,
> + EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
{
match kind {
SubscriptionKind::NewHeads => {
@ -152,7 +152,7 @@ where
SubscriptionKind::Syncing => {
// get new block subscription
let mut canon_state =
BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state());
BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state());
// get current sync status
let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status);
@ -235,7 +235,7 @@ where
}
}
impl<Eth, Events> std::fmt::Debug for EthPubSub<Eth, Events> {
impl<Eth> std::fmt::Debug for EthPubSub<Eth> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
@ -243,16 +243,14 @@ impl<Eth, Events> std::fmt::Debug for EthPubSub<Eth, Events> {
/// Container type `EthPubSub`
#[derive(Clone)]
struct EthPubSubInner<EthApi, Events> {
struct EthPubSubInner<EthApi> {
/// The `eth` API.
eth_api: EthApi,
/// A type that allows to create new event subscriptions.
chain_events: Events,
}
// == impl EthPubSubInner ===
impl<Eth, Events> EthPubSubInner<Eth, Events>
impl<Eth> EthPubSubInner<Eth>
where
Eth: RpcNodeCore<Provider: BlockNumReader>,
{
@ -277,7 +275,7 @@ where
}
}
impl<Eth, Events> EthPubSubInner<Eth, Events>
impl<Eth> EthPubSubInner<Eth>
where
Eth: RpcNodeCore<Pool: TransactionPool>,
{
@ -294,15 +292,13 @@ where
}
}
impl<Eth, Events> EthPubSubInner<Eth, Events>
impl<N: NodePrimitives, Eth> EthPubSubInner<Eth>
where
Events: CanonStateSubscriptions,
Eth: RpcNodeCore<Provider: CanonStateSubscriptions<Primitives = N>>,
{
/// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(
&self,
) -> impl Stream<Item = Header<<Events::Primitives as NodePrimitives>::BlockHeader>> {
self.chain_events.canonical_state_stream().flat_map(|new_chain| {
fn new_headers_stream(&self) -> impl Stream<Item = Header<N::BlockHeader>> {
self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain.committed().headers().collect::<Vec<_>>();
futures::stream::iter(
headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)),
@ -312,7 +308,7 @@ where
/// Returns a stream that yields all logs that match the given filter.
fn log_stream(&self, filter: FilteredParams) -> impl Stream<Item = Log> {
BroadcastStream::new(self.chain_events.subscribe_to_canonical_state())
BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state())
.map(move |canon_state| {
canon_state.expect("new block subscription never ends").block_receipts()
})