diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 33e37c329..635aa026a 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -20,7 +20,6 @@ pub type EthApiBuilderCtx = reth_rpc_eth_types::EthApiBuilderCtx< ::Evm, ::Network, TaskExecutor, - ::Provider, >; /// A general purpose trait that launches a new node of any kind. diff --git a/crates/node/builder/src/rpc.rs b/crates/node/builder/src/rpc.rs index 1f6803bd4..e42193913 100644 --- a/crates/node/builder/src/rpc.rs +++ b/crates/node/builder/src/rpc.rs @@ -197,7 +197,6 @@ pub struct RpcRegistry { Node::Pool, Node::Network, TaskExecutor, - Node::Provider, EthApi, Node::Executor, Node::Consensus, @@ -214,7 +213,6 @@ where Node::Pool, Node::Network, TaskExecutor, - Node::Provider, EthApi, Node::Executor, Node::Consensus, @@ -453,7 +451,6 @@ where .with_provider(node.provider().clone()) .with_pool(node.pool().clone()) .with_network(node.network().clone()) - .with_events(node.provider().clone()) .with_executor(node.task_executor().clone()) .with_evm_config(node.evm_config().clone()) .with_block_executor(node.block_executor().clone()) diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 10071c26e..62578affa 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -9,12 +9,12 @@ use reth_rpc_eth_types::{ use reth_tasks::TaskSpawner; /// Alias for `eth` namespace API builder. -pub type DynEthApiBuilder = - Box) -> EthApi>; +pub type DynEthApiBuilder = + Box) -> EthApi>; /// Handlers for core, filter and pubsub `eth` namespace APIs. #[derive(Debug, Clone)] -pub struct EthHandlers { +pub struct EthHandlers { /// Main `eth_` request handler pub api: EthApi, /// The async caching layer used by the eth handlers @@ -22,19 +22,18 @@ pub struct EthHandlers { /// Polling based filter handler available on all transports pub filter: EthFilter, /// Handler for subscriptions only available for transports that support it (ws, ipc) - pub pubsub: EthPubSub, + pub pubsub: EthPubSub, } -impl EthHandlers +impl EthHandlers where + N: NodePrimitives, Provider: StateProviderFactory - + BlockReader< - Block = ::Block, - Receipt = ::Receipt, - > + Clone + + BlockReader + + Clone + + CanonStateSubscriptions + Unpin + 'static, - Events: CanonStateSubscriptions + Clone + 'static, EthApi: EthApiTypes + 'static, { /// Returns a new instance with handlers for `eth` namespace. @@ -48,16 +47,7 @@ where evm_config: EvmConfig, config: EthConfig, executor: Tasks, - events: Events, - eth_api_builder: DynEthApiBuilder< - Provider, - Pool, - EvmConfig, - Network, - Tasks, - Events, - EthApi, - >, + eth_api_builder: DynEthApiBuilder, ) -> Self where EvmConfig: ConfigureEvm
, @@ -65,7 +55,7 @@ where { let cache = EthStateCache::spawn_with(provider.clone(), config.cache, executor.clone()); - let new_canonical_blocks = events.canonical_state_stream(); + let new_canonical_blocks = provider.canonical_state_stream(); let c = cache.clone(); executor.spawn_critical( "cache canonical blocks task", @@ -74,27 +64,14 @@ where }), ); - let ctx = EthApiBuilderCtx { - provider, - pool, - network, - evm_config, - config, - executor, - events, - cache, - }; + let ctx = EthApiBuilderCtx { provider, pool, network, evm_config, config, executor, cache }; let api = eth_api_builder(&ctx); let filter = EthFilter::new(api.clone(), ctx.config.filter_config(), Box::new(ctx.executor.clone())); - let pubsub = EthPubSub::with_spawner( - api.clone(), - ctx.events.clone(), - Box::new(ctx.executor.clone()), - ); + let pubsub = EthPubSub::with_spawner(api.clone(), Box::new(ctx.executor.clone())); Self { api, cache: ctx.cache, filter, pubsub } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 354f4c766..3a90859c4 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -31,11 +31,10 @@ //! use reth_transaction_pool::{PoolTransaction, TransactionPool}; //! use std::sync::Arc; //! -//! pub async fn launch( +//! pub async fn launch( //! provider: Provider, //! pool: Pool, //! network: Network, -//! events: Events, //! evm_config: EthEvmConfig, //! block_executor: BlockExecutor, //! consensus: Consensus, @@ -47,7 +46,8 @@ //! Receipt = reth_primitives::Receipt, //! Header = reth_primitives::Header, //! > + AccountReader -//! + ChangeSetReader, +//! + ChangeSetReader +//! + CanonStateSubscriptions, //! Pool: TransactionPool< //! Transaction: PoolTransaction< //! Consensus = TransactionSigned, @@ -56,9 +56,7 @@ //! > + Unpin //! + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, -//! Events: -//! CanonStateSubscriptions + Clone + 'static, -//! BlockExecutor: BlockExecutorProvider, +//! BlockExecutor: BlockExecutorProvider, //! Consensus: FullConsensus + Clone + 'static, //! Validator: PayloadValidator, //! { @@ -74,7 +72,6 @@ //! pool, //! network, //! TokioTaskExecutor::default(), -//! events, //! evm_config, //! block_executor, //! consensus, @@ -114,7 +111,6 @@ //! Provider, //! Pool, //! Network, -//! Events, //! EngineApi, //! EngineT, //! BlockExecutor, @@ -124,7 +120,6 @@ //! provider: Provider, //! pool: Pool, //! network: Network, -//! events: Events, //! engine_api: EngineApi, //! evm_config: EthEvmConfig, //! block_executor: BlockExecutor, @@ -137,7 +132,8 @@ //! Receipt = reth_primitives::Receipt, //! Header = reth_primitives::Header, //! > + AccountReader -//! + ChangeSetReader, +//! + ChangeSetReader +//! + CanonStateSubscriptions, //! Pool: TransactionPool< //! Transaction: PoolTransaction< //! Consensus = TransactionSigned, @@ -146,11 +142,9 @@ //! > + Unpin //! + 'static, //! Network: NetworkInfo + Peers + Clone + 'static, -//! Events: -//! CanonStateSubscriptions + Clone + 'static, //! EngineApi: EngineApiServer, //! EngineT: EngineTypes, -//! BlockExecutor: BlockExecutorProvider, +//! BlockExecutor: BlockExecutorProvider, //! Consensus: FullConsensus + Clone + 'static, //! Validator: PayloadValidator, //! { @@ -166,7 +160,6 @@ //! pool, //! network, //! TokioTaskExecutor::default(), -//! events, //! evm_config, //! block_executor, //! consensus, @@ -279,16 +272,15 @@ pub mod rate_limiter; /// Convenience function for starting a server in one step. #[allow(clippy::too_many_arguments)] -pub async fn launch( +pub async fn launch( provider: Provider, pool: Pool, network: Network, module_config: impl Into, server_config: impl Into, executor: Tasks, - events: Events, evm_config: EvmConfig, - eth: DynEthApiBuilder, + eth: DynEthApiBuilder, block_executor: BlockExecutor, consensus: Arc>, payload_validator: Arc>, @@ -298,12 +290,12 @@ where Block = ProviderBlock, Receipt = ProviderReceipt, Header = ProviderHeader, - > + AccountReader + > + CanonStateSubscriptions + + AccountReader + ChangeSetReader, Pool: TransactionPool::Transaction> + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, EvmConfig: ConfigureEvm< Header = ::BlockHeader, Transaction = ::SignedTx, @@ -313,7 +305,7 @@ where Block = ::Block, Receipt = ::Receipt, Header = ::BlockHeader, - >, + > + CanonStateSubscriptions, >, BlockExecutor: BlockExecutorProvider, { @@ -326,7 +318,6 @@ where pool, network, executor, - events, evm_config, block_executor, consensus, @@ -340,16 +331,7 @@ where /// /// This is the main entrypoint and the easiest way to configure an RPC server. #[derive(Debug, Clone)] -pub struct RpcModuleBuilder< - Provider, - Pool, - Network, - Tasks, - Events, - EvmConfig, - BlockExecutor, - Consensus, -> { +pub struct RpcModuleBuilder { /// The Provider type to when creating all rpc handlers provider: Provider, /// The Pool type to when creating all rpc handlers @@ -358,8 +340,6 @@ pub struct RpcModuleBuilder< network: Network, /// How additional tasks are spawned, for example in the eth pubsub namespace executor: Tasks, - /// Provides access to chain events, such as new blocks, required by pubsub. - events: Events, /// Defines how the EVM should be configured before execution. evm_config: EvmConfig, /// The provider for getting a block executor that executes blocks @@ -370,8 +350,8 @@ pub struct RpcModuleBuilder< // === impl RpcBuilder === -impl - RpcModuleBuilder +impl + RpcModuleBuilder { /// Create a new instance of the builder #[allow(clippy::too_many_arguments)] @@ -380,30 +360,27 @@ impl Self { - Self { provider, pool, network, executor, events, evm_config, block_executor, consensus } + Self { provider, pool, network, executor, evm_config, block_executor, consensus } } /// Configure the provider instance. pub fn with_provider

( self, provider: P, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where P: BlockReader + StateProviderFactory + 'static, { - let Self { pool, network, executor, events, evm_config, block_executor, consensus, .. } = - self; + let Self { pool, network, executor, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -414,19 +391,16 @@ impl( self, pool: P, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where P: TransactionPool + 'static, { - let Self { - provider, network, executor, events, evm_config, block_executor, consensus, .. - } = self; + let Self { provider, network, executor, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -445,18 +419,14 @@ impl { - let Self { - provider, executor, events, network, evm_config, block_executor, consensus, .. - } = self; + let Self { provider, executor, network, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, executor, - events, network, evm_config, block_executor, @@ -469,19 +439,16 @@ impl( self, network: N, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where N: NetworkInfo + Peers + 'static, { - let Self { - provider, pool, executor, events, evm_config, block_executor, consensus, .. - } = self; + let Self { provider, pool, executor, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -495,24 +462,13 @@ impl RpcModuleBuilder< - Provider, - Pool, - NoopNetwork, - Tasks, - Events, - EvmConfig, - BlockExecutor, - Consensus, - > { - let Self { - provider, pool, executor, events, evm_config, block_executor, consensus, .. - } = self; + ) -> RpcModuleBuilder + { + let Self { provider, pool, executor, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, pool, executor, - events, network: NoopNetwork::default(), evm_config, block_executor, @@ -524,18 +480,16 @@ impl( self, executor: T, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where T: TaskSpawner + 'static, { - let Self { pool, network, provider, events, evm_config, block_executor, consensus, .. } = - self; + let Self { pool, network, provider, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -553,18 +507,15 @@ impl { - let Self { pool, network, provider, events, evm_config, block_executor, consensus, .. } = - self; + let Self { pool, network, provider, evm_config, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, - events, executor: TokioTaskExecutor::default(), evm_config, block_executor, @@ -572,45 +523,20 @@ impl( - self, - events: E, - ) -> RpcModuleBuilder - where - E: CanonStateSubscriptions + 'static, - { - let Self { - provider, pool, executor, network, evm_config, block_executor, consensus, .. - } = self; - RpcModuleBuilder { - provider, - network, - pool, - executor, - events, - evm_config, - block_executor, - consensus, - } - } - /// Configure the evm configuration type pub fn with_evm_config( self, evm_config: E, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where E: ConfigureEvm + 'static, { - let Self { provider, pool, executor, network, events, block_executor, consensus, .. } = - self; + let Self { provider, pool, executor, network, block_executor, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -621,17 +547,16 @@ impl( self, block_executor: BE, - ) -> RpcModuleBuilder + ) -> RpcModuleBuilder where BE: BlockExecutorProvider, { - let Self { provider, network, pool, executor, events, evm_config, consensus, .. } = self; + let Self { provider, network, pool, executor, evm_config, consensus, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -642,15 +567,13 @@ impl( self, consensus: C, - ) -> RpcModuleBuilder { - let Self { provider, network, pool, executor, events, evm_config, block_executor, .. } = - self; + ) -> RpcModuleBuilder { + let Self { provider, network, pool, executor, evm_config, block_executor, .. } = self; RpcModuleBuilder { provider, network, pool, executor, - events, evm_config, block_executor, consensus, @@ -658,19 +581,19 @@ impl - RpcModuleBuilder +impl + RpcModuleBuilder where Provider: FullRpcProvider< - Block = ::Block, - Receipt = ::Receipt, - Header = ::BlockHeader, - > + AccountReader + Block = ::Block, + Receipt = ::Receipt, + Header = ::BlockHeader, + > + CanonStateSubscriptions + + AccountReader + ChangeSetReader, Pool: TransactionPool + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, EvmConfig: ConfigureEvm< Header = ::BlockHeader, Transaction = ::SignedTx, @@ -689,34 +612,26 @@ where self, module_config: TransportRpcModuleConfig, engine: EngineApi, - eth: DynEthApiBuilder, + eth: DynEthApiBuilder, payload_validator: Arc>, ) -> ( TransportRpcModules, AuthRpcModule, - RpcRegistryInner, + RpcRegistryInner, ) where EngineT: EngineTypes, EngineApi: EngineApiServer, EthApi: FullEthApiServer< Provider: BlockReader< - Block = ::Block, - Receipt = ::Receipt, - Header = ::BlockHeader, - >, + Block = ::Block, + Receipt = ::Receipt, + Header = ::BlockHeader, + > + CanonStateSubscriptions, >, { - let Self { - provider, - pool, - network, - executor, - events, - evm_config, - block_executor, - consensus, - } = self; + let Self { provider, pool, network, executor, evm_config, block_executor, consensus } = + self; let config = module_config.config.clone().unwrap_or_default(); @@ -725,7 +640,6 @@ where pool, network, executor, - events, consensus, config, evm_config, @@ -772,7 +686,6 @@ where /// .with_pool(NoopTransactionPool::default()) /// .with_network(NoopNetwork::default()) /// .with_executor(TokioTaskExecutor::default()) - /// .with_events(TestCanonStateSubscriptions::default()) /// .with_evm_config(evm) /// .with_block_executor(EthExecutorProvider::mainnet()) /// .with_consensus(NoopConsensus::default()) @@ -784,28 +697,19 @@ where pub fn into_registry( self, config: RpcModuleConfig, - eth: DynEthApiBuilder, + eth: DynEthApiBuilder, payload_validator: Arc>, - ) -> RpcRegistryInner + ) -> RpcRegistryInner where EthApi: EthApiTypes + 'static, { - let Self { - provider, - pool, - network, - executor, - events, - evm_config, - block_executor, - consensus, - } = self; + let Self { provider, pool, network, executor, evm_config, block_executor, consensus } = + self; RpcRegistryInner::new( provider, pool, network, executor, - events, consensus, config, evm_config, @@ -820,31 +724,23 @@ where pub fn build( self, module_config: TransportRpcModuleConfig, - eth: DynEthApiBuilder, + eth: DynEthApiBuilder, payload_validator: Arc>, ) -> TransportRpcModules<()> where EthApi: FullEthApiServer< Provider: BlockReader< - Receipt = ::Receipt, - Block = ::Block, - Header = ::BlockHeader, - >, + Receipt = ::Receipt, + Block = ::Block, + Header = ::BlockHeader, + > + CanonStateSubscriptions, >, Pool: TransactionPool::Transaction>, { let mut modules = TransportRpcModules::default(); - let Self { - provider, - pool, - network, - executor, - events, - evm_config, - block_executor, - consensus, - } = self; + let Self { provider, pool, network, executor, evm_config, block_executor, consensus } = + self; if !module_config.is_empty() { let TransportRpcModuleConfig { http, ws, ipc, config } = module_config.clone(); @@ -854,7 +750,6 @@ where pool, network, executor, - events, consensus, config.unwrap_or_default(), evm_config, @@ -873,9 +768,9 @@ where } } -impl Default for RpcModuleBuilder<(), (), (), (), (), (), (), ()> { +impl Default for RpcModuleBuilder<(), (), (), (), (), (), ()> { fn default() -> Self { - Self::new((), (), (), (), (), (), (), ()) + Self::new((), (), (), (), (), (), ()) } } @@ -963,7 +858,6 @@ pub struct RpcRegistryInner< Pool, Network, Tasks, - Events, EthApi: EthApiTypes, BlockExecutor, Consensus, @@ -972,14 +866,13 @@ pub struct RpcRegistryInner< pool: Pool, network: Network, executor: Tasks, - events: Events, block_executor: BlockExecutor, consensus: Consensus, payload_validator: Arc>, /// Holds the configuration for the RPC modules config: RpcModuleConfig, /// Holds a all `eth_` namespace handlers - eth: EthHandlers, + eth: EthHandlers, /// to put trace calls behind semaphore blocking_pool_guard: BlockingTaskGuard, /// Contains the [Methods] of a module @@ -988,19 +881,18 @@ pub struct RpcRegistryInner< // === impl RpcRegistryInner === -impl - RpcRegistryInner +impl + RpcRegistryInner where + N: NodePrimitives, Provider: StateProviderFactory - + BlockReader< - Block = ::Block, - Receipt = ::Receipt, - > + Clone + + CanonStateSubscriptions + + BlockReader + + Clone + Unpin + 'static, Pool: Send + Sync + Clone + 'static, Network: Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, EthApi: EthApiTypes + 'static, BlockExecutor: BlockExecutorProvider, @@ -1012,19 +904,10 @@ where pool: Pool, network: Network, executor: Tasks, - events: Events, consensus: Consensus, config: RpcModuleConfig, evm_config: EvmConfig, - eth_api_builder: DynEthApiBuilder< - Provider, - Pool, - EvmConfig, - Network, - Tasks, - Events, - EthApi, - >, + eth_api_builder: DynEthApiBuilder, block_executor: BlockExecutor, payload_validator: Arc>, ) -> Self @@ -1040,7 +923,6 @@ where evm_config, config.eth, executor.clone(), - events.clone(), eth_api_builder, ); @@ -1054,15 +936,14 @@ where config, modules: Default::default(), blocking_pool_guard, - events, block_executor, payload_validator, } } } -impl - RpcRegistryInner +impl + RpcRegistryInner where Provider: BlockReader, EthApi: EthApiTypes, @@ -1073,7 +954,7 @@ where } /// Returns a reference to the installed [`EthHandlers`]. - pub const fn eth_handlers(&self) -> &EthHandlers { + pub const fn eth_handlers(&self) -> &EthHandlers { &self.eth } @@ -1090,11 +971,6 @@ where &self.pool } - /// Returns a reference to the events type - pub const fn events(&self) -> &Events { - &self.events - } - /// Returns a reference to the tasks type pub const fn tasks(&self) -> &Tasks { &self.executor @@ -1120,8 +996,8 @@ where } } -impl - RpcRegistryInner +impl + RpcRegistryInner where Network: NetworkInfo + Clone + 'static, EthApi: EthApiTypes, @@ -1159,8 +1035,8 @@ where } } -impl - RpcRegistryInner +impl + RpcRegistryInner where Provider: FullRpcProvider + AccountReader + ChangeSetReader, Network: NetworkInfo + Peers + Clone + 'static, @@ -1267,8 +1143,8 @@ where } } -impl - RpcRegistryInner +impl + RpcRegistryInner where Provider: FullRpcProvider + AccountReader + ChangeSetReader, Network: NetworkInfo + Peers + Clone + 'static, @@ -1354,22 +1230,22 @@ where } } -impl - RpcRegistryInner +impl + RpcRegistryInner where Provider: FullRpcProvider::Block> + + CanonStateSubscriptions + AccountReader + ChangeSetReader, Pool: TransactionPool + 'static, Network: NetworkInfo + Peers + Clone + 'static, Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions + Clone + 'static, EthApi: FullEthApiServer< Provider: BlockReader< Block = ::Block, Receipt = ::Receipt, Header = ::BlockHeader, - >, + > + CanonStateSubscriptions, >, BlockExecutor: BlockExecutorProvider, Consensus: FullConsensus + Clone + 'static, diff --git a/crates/rpc/rpc-builder/tests/it/utils.rs b/crates/rpc/rpc-builder/tests/it/utils.rs index dab771503..21251628e 100644 --- a/crates/rpc/rpc-builder/tests/it/utils.rs +++ b/crates/rpc/rpc-builder/tests/it/utils.rs @@ -12,7 +12,7 @@ use reth_evm::execute::BasicBlockExecutorProvider; use reth_evm_ethereum::{execute::EthExecutionStrategyFactory, EthEvmConfig}; use reth_network_api::noop::NoopNetwork; use reth_payload_builder::test_utils::spawn_test_payload_service; -use reth_provider::test_utils::{NoopProvider, TestCanonStateSubscriptions}; +use reth_provider::test_utils::NoopProvider; use reth_rpc::EthApi; use reth_rpc_builder::{ auth::{AuthRpcModule, AuthServerConfig, AuthServerHandle}, @@ -134,7 +134,6 @@ pub fn test_rpc_builder() -> RpcModuleBuilder< TestPool, NoopNetwork, TokioTaskExecutor, - TestCanonStateSubscriptions, EthEvmConfig, BasicBlockExecutorProvider, NoopConsensus, @@ -144,7 +143,6 @@ pub fn test_rpc_builder() -> RpcModuleBuilder< .with_pool(TestPoolBuilder::default().into()) .with_network(NoopNetwork::default()) .with_executor(TokioTaskExecutor::default()) - .with_events(TestCanonStateSubscriptions::default()) .with_evm_config(EthEvmConfig::new(MAINNET.clone())) .with_block_executor( BasicBlockExecutorProvider::new(EthExecutionStrategyFactory::mainnet()), diff --git a/crates/rpc/rpc-eth-types/src/builder/ctx.rs b/crates/rpc/rpc-eth-types/src/builder/ctx.rs index f9710882f..9ba771aed 100644 --- a/crates/rpc/rpc-eth-types/src/builder/ctx.rs +++ b/crates/rpc/rpc-eth-types/src/builder/ctx.rs @@ -13,7 +13,7 @@ use crate::{ /// Context for building the `eth` namespace API. #[derive(Debug, Clone)] -pub struct EthApiBuilderCtx +pub struct EthApiBuilderCtx where Provider: BlockReader, { @@ -29,14 +29,12 @@ where pub config: EthConfig, /// Runtime handle. pub executor: Tasks, - /// Events handle. - pub events: Events, /// RPC cache handle. pub cache: EthStateCache, } -impl - EthApiBuilderCtx +impl + EthApiBuilderCtx where Provider: BlockReaderIdExt + Clone, { @@ -45,13 +43,14 @@ where where N: NodePrimitives, Tasks: TaskSpawner, - Events: CanonStateSubscriptions, - Provider: - BlockReaderIdExt + ChainSpecProvider + 'static, + Provider: BlockReaderIdExt + + CanonStateSubscriptions + + ChainSpecProvider + + 'static, { let fee_history_cache = FeeHistoryCache::new(self.config.fee_history_cache); - let new_canonical_blocks = self.events.canonical_state_stream(); + let new_canonical_blocks = self.provider.canonical_state_stream(); let fhc = fee_history_cache.clone(); let provider = self.provider.clone(); let cache = self.cache.clone(); diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index cf5753a34..58c06a2df 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -99,25 +99,24 @@ where } } -impl EthApi +impl EthApi where - Provider: ChainSpecProvider + BlockReaderIdExt + Clone + 'static, + N: NodePrimitives, + Provider: ChainSpecProvider + + BlockReaderIdExt + + CanonStateSubscriptions + + Clone + + 'static, Pool: Clone, EvmConfig: Clone, Network: Clone, { /// Creates a new, shareable instance. - pub fn with_spawner( - ctx: &EthApiBuilderCtx, + pub fn with_spawner( + ctx: &EthApiBuilderCtx, ) -> Self where Tasks: TaskSpawner + Clone + 'static, - Events: CanonStateSubscriptions< - Primitives: NodePrimitives< - Block = ProviderBlock, - Receipt = ProviderReceipt, - >, - >, { let blocking_task_pool = BlockingTaskPool::build().expect("failed to build blocking task pool"); diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index c38028a33..da39092b8 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -32,40 +32,38 @@ use tracing::error; /// /// This handles `eth_subscribe` RPC calls. #[derive(Clone)] -pub struct EthPubSub { +pub struct EthPubSub { /// All nested fields bundled together. - inner: Arc>, + inner: Arc>, /// The type that's used to spawn subscription tasks. subscription_task_spawner: Box, } // === impl EthPubSub === -impl EthPubSub { +impl EthPubSub { /// Creates a new, shareable instance. /// /// Subscription tasks are spawned via [`tokio::task::spawn`] - pub fn new(eth_api: Eth, chain_events: Events) -> Self { - Self::with_spawner(eth_api, chain_events, Box::::default()) + pub fn new(eth_api: Eth) -> Self { + Self::with_spawner(eth_api, Box::::default()) } /// Creates a new, shareable instance. - pub fn with_spawner( - eth_api: Eth, - chain_events: Events, - subscription_task_spawner: Box, - ) -> Self { - let inner = EthPubSubInner { eth_api, chain_events }; + pub fn with_spawner(eth_api: Eth, subscription_task_spawner: Box) -> Self { + let inner = EthPubSubInner { eth_api }; Self { inner: Arc::new(inner), subscription_task_spawner } } } #[async_trait::async_trait] -impl EthPubSubApiServer> for EthPubSub +impl EthPubSubApiServer> for EthPubSub where - Events: CanonStateSubscriptions + 'static, - Eth: RpcNodeCore - + EthApiTypes>> + Eth: RpcNodeCore< + Provider: BlockNumReader + CanonStateSubscriptions, + Pool: TransactionPool, + Network: NetworkInfo, + > + EthApiTypes>> + 'static, { /// Handler for `eth_subscribe` @@ -86,16 +84,18 @@ where } /// The actual handler for an accepted [`EthPubSub::subscribe`] call. -async fn handle_accepted( - pubsub: Arc>, +async fn handle_accepted( + pubsub: Arc>, accepted_sink: SubscriptionSink, kind: SubscriptionKind, params: Option, ) -> Result<(), ErrorObject<'static>> where - Events: CanonStateSubscriptions + 'static, - Eth: RpcNodeCore - + EthApiTypes>>, + Eth: RpcNodeCore< + Provider: BlockNumReader + CanonStateSubscriptions, + Pool: TransactionPool, + Network: NetworkInfo, + > + EthApiTypes>>, { match kind { SubscriptionKind::NewHeads => { @@ -152,7 +152,7 @@ where SubscriptionKind::Syncing => { // get new block subscription let mut canon_state = - BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state()); + BroadcastStream::new(pubsub.eth_api.provider().subscribe_to_canonical_state()); // get current sync status let mut initial_sync_status = pubsub.eth_api.network().is_syncing(); let current_sub_res = pubsub.sync_status(initial_sync_status); @@ -235,7 +235,7 @@ where } } -impl std::fmt::Debug for EthPubSub { +impl std::fmt::Debug for EthPubSub { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("EthPubSub").finish_non_exhaustive() } @@ -243,16 +243,14 @@ impl std::fmt::Debug for EthPubSub { /// Container type `EthPubSub` #[derive(Clone)] -struct EthPubSubInner { +struct EthPubSubInner { /// The `eth` API. eth_api: EthApi, - /// A type that allows to create new event subscriptions. - chain_events: Events, } // == impl EthPubSubInner === -impl EthPubSubInner +impl EthPubSubInner where Eth: RpcNodeCore, { @@ -277,7 +275,7 @@ where } } -impl EthPubSubInner +impl EthPubSubInner where Eth: RpcNodeCore, { @@ -294,15 +292,13 @@ where } } -impl EthPubSubInner +impl EthPubSubInner where - Events: CanonStateSubscriptions, + Eth: RpcNodeCore>, { /// Returns a stream that yields all new RPC blocks. - fn new_headers_stream( - &self, - ) -> impl Stream::BlockHeader>> { - self.chain_events.canonical_state_stream().flat_map(|new_chain| { + fn new_headers_stream(&self) -> impl Stream> { + self.eth_api.provider().canonical_state_stream().flat_map(|new_chain| { let headers = new_chain.committed().headers().collect::>(); futures::stream::iter( headers.into_iter().map(|h| Header::from_consensus(h.into(), None, None)), @@ -312,7 +308,7 @@ where /// Returns a stream that yields all logs that match the given filter. fn log_stream(&self, filter: FilteredParams) -> impl Stream { - BroadcastStream::new(self.chain_events.subscribe_to_canonical_state()) + BroadcastStream::new(self.eth_api.provider().subscribe_to_canonical_state()) .map(move |canon_state| { canon_state.expect("new block subscription never ends").block_receipts() }) diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 19c108e5e..1bf98c8c0 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -37,7 +37,7 @@ use reth::tasks::TokioTaskExecutor; use reth_node_ethereum::{ node::EthereumEngineValidator, EthEvmConfig, EthExecutorProvider, EthereumNode, }; -use reth_provider::{test_utils::TestCanonStateSubscriptions, ChainSpecProvider}; +use reth_provider::ChainSpecProvider; // Custom rpc extension pub mod myrpc_ext; @@ -70,7 +70,6 @@ async fn main() -> eyre::Result<()> { .with_noop_network() .with_executor(TokioTaskExecutor::default()) .with_evm_config(EthEvmConfig::new(spec.clone())) - .with_events(TestCanonStateSubscriptions::default()) .with_block_executor(EthExecutorProvider::ethereum(provider.chain_spec())) .with_consensus(EthBeaconConsensus::new(spec.clone()));