feat(rpc): integrate eth cache and task executor in pipeline (#1596)

This commit is contained in:
Matthias Seitz
2023-03-01 23:46:57 +01:00
committed by GitHub
parent ad0ce8cf1a
commit 626c076c85
14 changed files with 239 additions and 76 deletions

1
Cargo.lock generated
View File

@ -5032,6 +5032,7 @@ dependencies = [
"reth-rpc-api",
"reth-rpc-engine-api",
"reth-rpc-types",
"reth-tasks",
"reth-tracing",
"reth-transaction-pool",
"serde",

View File

@ -11,6 +11,7 @@ use reth_rpc_builder::{
RpcServerHandle, ServerBuilder, TransportRpcModuleConfig,
};
use reth_rpc_engine_api::EngineApiHandle;
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
@ -103,11 +104,12 @@ impl RpcServerArgs {
}
/// Convenience function for starting a rpc server with configs which extracted from cli args.
pub(crate) async fn start_rpc_server<Client, Pool, Network>(
pub(crate) async fn start_rpc_server<Client, Pool, Network, Tasks>(
&self,
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
) -> Result<RpcServerHandle, RpcError>
where
Client: BlockProvider
@ -115,9 +117,11 @@ impl RpcServerArgs {
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
reth_rpc_builder::launch(
client,
@ -125,16 +129,18 @@ impl RpcServerArgs {
network,
self.transport_rpc_module_config(),
self.rpc_server_config(),
executor,
)
.await
}
/// Create Engine API server.
pub(crate) async fn start_auth_server<Client, Pool, Network>(
pub(crate) async fn start_auth_server<Client, Pool, Network, Tasks>(
&self,
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
handle: EngineApiHandle,
) -> Result<ServerHandle, RpcError>
where
@ -143,16 +149,27 @@ impl RpcServerArgs {
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let socket_address = SocketAddr::new(
self.auth_addr.unwrap_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED)),
self.auth_port.unwrap_or(constants::DEFAULT_AUTH_PORT),
);
let secret = self.jwt_secret().map_err(|err| RpcError::Custom(err.to_string()))?;
reth_rpc_builder::auth::launch(client, pool, network, handle, socket_address, secret).await
reth_rpc_builder::auth::launch(
client,
pool,
network,
executor,
handle,
socket_address,
secret,
)
.await
}
/// Creates the [TransportRpcModuleConfig] from cli args.

View File

@ -160,7 +160,12 @@ impl Command {
let _rpc_server = self
.rpc
.start_rpc_server(shareable_db.clone(), test_transaction_pool.clone(), network.clone())
.start_rpc_server(
shareable_db.clone(),
test_transaction_pool.clone(),
network.clone(),
ctx.task_executor.clone(),
)
.await?;
info!(target: "reth::cli", "Started RPC server");
@ -174,6 +179,7 @@ impl Command {
shareable_db,
test_transaction_pool,
network.clone(),
ctx.task_executor.clone(),
engine_api_handle,
)
.await?;

View File

@ -15,6 +15,7 @@ reth-rpc = { path = "../rpc" }
reth-rpc-api = { path = "../rpc-api" }
reth-rpc-engine-api = { path = "../rpc-engine-api" }
reth-rpc-types = { path = "../rpc-types" }
reth-tasks = { path = "../../tasks" }
reth-transaction-pool = { path = "../../transaction-pool" }
jsonrpsee = { version = "0.16", features = ["server"] }

View File

@ -14,11 +14,12 @@ pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{
AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, JwtSecret, NetApi,
TraceApi, Web3Api,
eth::cache::EthStateCache, AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator,
JwtSecret, NetApi, TraceApi, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_engine_api::EngineApiHandle;
use reth_tasks::TaskSpawner;
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize, Serializer};
use std::{
@ -31,22 +32,32 @@ use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames};
use tower::layer::util::{Identity, Stack};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
/// Configure and launch an auth server with `engine` and `eth` namespaces.
pub async fn launch<Client, Pool, Network>(
/// Configure and launch an auth server with `engine` and a _new_ `eth` namespace.
pub async fn launch<Client, Pool, Network, Tasks>(
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
handle: EngineApiHandle,
socket_addr: SocketAddr,
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
where
Client:
BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
launch_with_eth_api(EthApi::new(client, pool, network), handle, socket_addr, secret).await
// spawn a new cache task
let eth_cache = EthStateCache::spawn_with(client.clone(), Default::default(), executor);
launch_with_eth_api(EthApi::new(client, pool, network, eth_cache), handle, socket_addr, secret)
.await
}
/// Configure and launch an auth server with existing EthApi implementation.
@ -57,8 +68,13 @@ pub async fn launch_with_eth_api<Client, Pool, Network>(
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
where
Client:
BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
{
@ -71,7 +87,7 @@ where
let middleware =
tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
// By default both http and ws are enabled.
// By default, both http and ws are enabled.
let server = ServerBuilder::new().set_middleware(middleware).build(socket_addr).await?;
server.start(module)

View File

@ -28,10 +28,11 @@
//! use reth_network_api::{NetworkInfo, Peers};
//! use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory, EvmEnvProvider};
//! use reth_rpc_builder::{RethRpcModule, RpcModuleBuilder, RpcServerConfig, ServerBuilder, TransportRpcModuleConfig};
//! use reth_tasks::TokioTaskExecutor;
//! use reth_transaction_pool::TransactionPool;
//! pub async fn launch<Client, Pool, Network>(client: Client, pool: Pool, network: Network)
//! where
//! Client: BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
//! Client: BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + Unpin + 'static,
//! Pool: TransactionPool + Clone + 'static,
//! Network: NetworkInfo + Peers + Clone + 'static,
//! {
@ -42,7 +43,7 @@
//! RethRpcModule::Eth,
//! RethRpcModule::Web3,
//! ]);
//! let transport_modules = RpcModuleBuilder::new(client, pool, network).build(transports);
//! let transport_modules = RpcModuleBuilder::new(client, pool, network, TokioTaskExecutor::default()).build(transports);
//! let handle = RpcServerConfig::default()
//! .with_http(ServerBuilder::default())
//! .start(transport_modules)
@ -86,27 +87,36 @@ pub mod auth;
/// Common RPC constants.
pub mod constants;
use constants::*;
use reth_rpc::eth::cache::EthStateCache;
use reth_tasks::TaskSpawner;
/// Cors utilities.
mod cors;
/// Convenience function for starting a server in one step.
pub async fn launch<Client, Pool, Network>(
pub async fn launch<Client, Pool, Network, Tasks>(
client: Client,
pool: Pool,
network: Network,
module_config: impl Into<TransportRpcModuleConfig>,
server_config: impl Into<RpcServerConfig>,
executor: Tasks,
) -> Result<RpcServerHandle, RpcError>
where
Client:
BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let module_config = module_config.into();
let server_config = server_config.into();
RpcModuleBuilder::new(client, pool, network)
RpcModuleBuilder::new(client, pool, network, executor)
.build(module_config)
.start_server(server_config)
.await
@ -116,57 +126,74 @@ where
///
/// This is the main entrypoint for up RPC servers.
#[derive(Debug)]
pub struct RpcModuleBuilder<Client, Pool, Network> {
pub struct RpcModuleBuilder<Client, Pool, Network, Tasks> {
/// The Client type to when creating all rpc handlers
client: Client,
/// The Pool type to when creating all rpc handlers
pool: Pool,
/// The Network type to when creating all rpc handlers
network: Network,
/// How additional tasks are spawned, for example in the eth pubsub namespace
executor: Tasks,
}
// === impl RpcBuilder ===
impl<Client, Pool, Network> RpcModuleBuilder<Client, Pool, Network> {
impl<Client, Pool, Network, Tasks> RpcModuleBuilder<Client, Pool, Network, Tasks> {
/// Create a new instance of the builder
pub fn new(client: Client, pool: Pool, network: Network) -> Self {
Self { client, pool, network }
pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self {
Self { client, pool, network, executor }
}
/// Configure the client instance.
pub fn with_client<C>(self, client: C) -> RpcModuleBuilder<C, Pool, Network>
pub fn with_client<C>(self, client: C) -> RpcModuleBuilder<C, Pool, Network, Tasks>
where
C: BlockProvider + StateProviderFactory + EvmEnvProvider + 'static,
{
let Self { pool, network, .. } = self;
RpcModuleBuilder { client, network, pool }
let Self { pool, network, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
}
/// Configure the transaction pool instance.
pub fn with_pool<P>(self, pool: P) -> RpcModuleBuilder<Client, P, Network>
pub fn with_pool<P>(self, pool: P) -> RpcModuleBuilder<Client, P, Network, Tasks>
where
P: TransactionPool + 'static,
{
let Self { client, network, .. } = self;
RpcModuleBuilder { client, network, pool }
let Self { client, network, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
}
/// Configure the network instance.
pub fn with_network<N>(self, network: N) -> RpcModuleBuilder<Client, Pool, N>
pub fn with_network<N>(self, network: N) -> RpcModuleBuilder<Client, Pool, N, Tasks>
where
N: NetworkInfo + Peers + 'static,
{
let Self { client, pool, .. } = self;
RpcModuleBuilder { client, network, pool }
let Self { client, pool, executor, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
}
/// Configure the task executor to use for additional tasks.
pub fn with_executor<T>(self, executor: T) -> RpcModuleBuilder<Client, Pool, Network, T>
where
T: TaskSpawner + 'static,
{
let Self { pool, network, client, .. } = self;
RpcModuleBuilder { client, network, pool, executor }
}
}
impl<Client, Pool, Network> RpcModuleBuilder<Client, Pool, Network>
impl<Client, Pool, Network, Tasks> RpcModuleBuilder<Client, Pool, Network, Tasks>
where
Client:
BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Configures all [RpcModule]s specific to the given [TransportRpcModuleConfig] which can be
/// used to start the transport server(s).
@ -175,9 +202,9 @@ where
pub fn build(self, module_config: TransportRpcModuleConfig) -> TransportRpcModules<()> {
let mut modules = TransportRpcModules::default();
let Self { client, pool, network } = self;
let Self { client, pool, network, executor } = self;
let mut registry = RethModuleRegistry::new(client, pool, network);
let mut registry = RethModuleRegistry::new(client, pool, network, executor);
if !module_config.is_empty() {
let TransportRpcModuleConfig { http, ws, ipc } = module_config;
@ -190,9 +217,9 @@ where
}
}
impl Default for RpcModuleBuilder<(), (), ()> {
impl Default for RpcModuleBuilder<(), (), (), ()> {
fn default() -> Self {
RpcModuleBuilder::new((), (), ())
RpcModuleBuilder::new((), (), (), ())
}
}
@ -258,11 +285,12 @@ impl RpcModuleSelection {
/// Note: This will always create new instance of the module handlers and is therefor only
/// recommended for launching standalone transports. If multiple transports need to be
/// configured it's recommended to use the [RpcModuleBuilder].
pub fn standalone_module<Client, Pool, Network>(
pub fn standalone_module<Client, Pool, Network, Tasks>(
&self,
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
) -> RpcModule<()>
where
Client: BlockProvider
@ -270,11 +298,13 @@ impl RpcModuleSelection {
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let mut registry = RethModuleRegistry::new(client, pool, network);
let mut registry = RethModuleRegistry::new(client, pool, network, executor);
registry.module_for(self)
}
@ -354,10 +384,13 @@ impl Serialize for RethRpcModule {
}
/// A Helper type the holds instances of the configured modules.
pub struct RethModuleRegistry<Client, Pool, Network> {
pub struct RethModuleRegistry<Client, Pool, Network, Tasks> {
client: Client,
pool: Pool,
network: Network,
executor: Tasks,
/// Holds a clone of the async [EthStateCache] channel.
eth_cache: Option<EthStateCache>,
/// Holds a clone of the actual [EthApi] namespace impl since this can be required by other
/// namespaces
eth_api: Option<EthApi<Client, Pool, Network>>,
@ -367,10 +400,18 @@ pub struct RethModuleRegistry<Client, Pool, Network> {
// === impl RethModuleRegistry ===
impl<Client, Pool, Network> RethModuleRegistry<Client, Pool, Network> {
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks> {
/// Creates a new, empty instance.
pub fn new(client: Client, pool: Pool, network: Network) -> Self {
Self { client, pool, network, eth_api: None, modules: Default::default() }
pub fn new(client: Client, pool: Pool, network: Network, executor: Tasks) -> Self {
Self {
client,
pool,
network,
eth_api: None,
executor,
modules: Default::default(),
eth_cache: None,
}
}
/// Returns all installed methods
@ -388,7 +429,7 @@ impl<Client, Pool, Network> RethModuleRegistry<Client, Pool, Network> {
}
}
impl<Client, Pool, Network> RethModuleRegistry<Client, Pool, Network>
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks>
where
Network: NetworkInfo + Peers + Clone + 'static,
{
@ -407,12 +448,18 @@ where
}
}
impl<Client, Pool, Network> RethModuleRegistry<Client, Pool, Network>
impl<Client, Pool, Network, Tasks> RethModuleRegistry<Client, Pool, Network, Tasks>
where
Client:
BlockProvider + HeaderProvider + StateProviderFactory + EvmEnvProvider + Clone + 'static,
Client: BlockProvider
+ HeaderProvider
+ StateProviderFactory
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
/// Register Eth Namespace
pub fn register_eth(&mut self) -> &mut Self {
@ -486,11 +533,28 @@ where
.collect::<Vec<_>>()
}
/// Returns the [EthStateCache] frontend
///
/// This will spawn exactly one [EthStateCache] service if this is the first time the cache is
/// requested.
pub fn eth_cache(&mut self) -> EthStateCache {
self.eth_cache
.get_or_insert_with(|| {
EthStateCache::spawn_with(
self.client.clone(),
Default::default(),
self.executor.clone(),
)
})
.clone()
}
/// Returns the configured [EthApi] or creates it if it does not exist yet
fn eth_api(&mut self) -> EthApi<Client, Pool, Network> {
let cache = self.eth_cache();
self.eth_api
.get_or_insert_with(|| {
EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone())
EthApi::new(self.client.clone(), self.pool.clone(), self.network.clone(), cache)
})
.clone()
}

View File

@ -4,6 +4,7 @@ use reth_rpc_builder::{
RpcModuleBuilder, RpcModuleSelection, RpcServerConfig, RpcServerHandle,
TransportRpcModuleConfig,
};
use reth_tasks::TokioTaskExecutor;
use reth_transaction_pool::test_utils::{testing_pool, TestPool};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
@ -50,9 +51,11 @@ pub async fn launch_http_ws(modules: impl Into<RpcModuleSelection>) -> RpcServer
}
/// Returns an [RpcModuleBuilder] with testing components.
pub fn test_rpc_builder() -> RpcModuleBuilder<NoopProvider, TestPool, NoopNetwork> {
pub fn test_rpc_builder() -> RpcModuleBuilder<NoopProvider, TestPool, NoopNetwork, TokioTaskExecutor>
{
RpcModuleBuilder::default()
.with_client(NoopProvider::default())
.with_pool(testing_pool())
.with_network(NoopNetwork::default())
.with_executor(TokioTaskExecutor::default())
}

View File

@ -13,7 +13,7 @@ use reth_primitives::{
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use std::num::NonZeroUsize;
use crate::eth::error::EthResult;
use crate::eth::{cache::EthStateCache, error::EthResult};
use reth_provider::providers::ChainState;
use reth_rpc_types::FeeHistoryCache;
use reth_transaction_pool::TransactionPool;
@ -67,8 +67,8 @@ pub struct EthApi<Client, Pool, Network> {
impl<Client, Pool, Network> EthApi<Client, Pool, Network> {
/// Creates a new, shareable instance.
pub fn new(client: Client, pool: Pool, network: Network) -> Self {
let inner = EthApiInner { client, pool, network, signers: Default::default() };
pub fn new(client: Client, pool: Pool, network: Network, eth_cache: EthStateCache) -> Self {
let inner = EthApiInner { client, pool, network, signers: Default::default(), eth_cache };
Self {
inner: Arc::new(inner),
fee_history_cache: FeeHistoryCache::new(
@ -216,4 +216,6 @@ struct EthApiInner<Client, Pool, Network> {
network: Network,
/// All configured Signers
signers: Vec<Box<dyn EthSigner>>,
/// The async cache frontend for eth related data
eth_cache: EthStateCache,
}

View File

@ -378,6 +378,7 @@ where
#[cfg(test)]
mod tests {
use crate::eth::cache::EthStateCache;
use jsonrpsee::{
core::{error::Error as RpcError, RpcResult},
types::error::{CallError, INVALID_PARAMS_CODE},
@ -394,7 +395,12 @@ mod tests {
#[tokio::test]
/// Handler for: `eth_test_fee_history`
async fn test_fee_history() {
let eth_api = EthApi::new(NoopProvider::default(), testing_pool(), NoopNetwork::default());
let eth_api = EthApi::new(
NoopProvider::default(),
testing_pool(),
NoopNetwork::default(),
EthStateCache::spawn(NoopProvider::default(), Default::default()),
);
let response = eth_api.fee_history(1.into(), BlockNumberOrTag::Latest.into(), None).await;
assert!(matches!(response, RpcResult::Err(RpcError::Call(CallError::Custom(_)))));
@ -434,7 +440,12 @@ mod tests {
.push(base_fee_per_gas.map(|fee| U256::try_from(fee).unwrap()).unwrap_or_default());
}
let eth_api = EthApi::new(mock_provider, testing_pool(), NoopNetwork::default());
let eth_api = EthApi::new(
mock_provider,
testing_pool(),
NoopNetwork::default(),
EthStateCache::spawn(NoopProvider::default(), Default::default()),
);
let response =
eth_api.fee_history((newest_block + 1).into(), newest_block.into(), None).await;

View File

@ -46,6 +46,7 @@ where
#[cfg(test)]
mod tests {
use crate::eth::cache::EthStateCache;
use reth_primitives::{hex_literal::hex, Bytes};
use reth_provider::test_utils::NoopProvider;
use reth_transaction_pool::{test_utils::testing_pool, TransactionPool};
@ -58,7 +59,12 @@ mod tests {
let pool = testing_pool();
let eth_api = EthApi::new(noop_provider, pool.clone(), ());
let eth_api = EthApi::new(
noop_provider,
pool.clone(),
(),
EthStateCache::spawn(NoopProvider::default(), Default::default()),
);
// https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d
let tx_1 = Bytes::from(hex!("02f871018303579880850555633d1b82520894eee27662c2b8eba3cd936a23f039f3189633e4c887ad591c62bdaeb180c080a07ea72c68abfb8fca1bd964f0f99132ed9280261bdca3e549546c0205e800f7d0a05b4ef3039e9c9b9babc179a1878fb825b5aaf5aed2fa8744854150157b08d6f3"));

View File

@ -4,9 +4,10 @@ use futures::StreamExt;
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Block, H256};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv};
use schnellru::{ByMemoryUsage, Limiter, LruMap};
use serde::{Deserialize, Serialize};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
@ -30,23 +31,43 @@ type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSende
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
/// Settings for the [EthStateCache]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EthStateCacheConfig {
/// Max number of bytes for cached block data.
///
/// Default is 50MB
pub max_block_bytes: usize,
/// Max number of bytes for cached env data.
///
/// Default is 500kb (env configs are very small)
pub max_env_bytes: usize,
}
impl Default for EthStateCacheConfig {
fn default() -> Self {
Self { max_block_bytes: 50 * 1024 * 1024, max_env_bytes: 500 * 1024 }
}
}
/// Provides async access to cached eth data
///
/// This is the frontend to the [EthStateCacheService] which manages cached data on a different
/// This is the frontend for the async caching service which manages cached data on a different
/// task.
#[derive(Debug, Clone)]
pub(crate) struct EthStateCache {
pub struct EthStateCache {
to_service: UnboundedSender<CacheAction>,
}
impl EthStateCache {
/// Creates and returns both [EthStateCache] frontend and the memory bound service.
fn create<Client>(
fn create<Client, Tasks>(
client: Client,
action_task_spawner: Box<dyn TaskSpawner>,
action_task_spawner: Tasks,
max_block_bytes: usize,
max_env_bytes: usize,
) -> (Self, EthStateCacheService<Client>) {
) -> (Self, EthStateCacheService<Client, Tasks>) {
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
client,
@ -60,21 +81,34 @@ impl EthStateCache {
(cache, service)
}
/// Creates a new async LRU backed cache service task and spawns it to a new task via
/// [tokio::spawn].
///
/// See also [Self::spawn_with]
pub fn spawn<Client>(client: Client, config: EthStateCacheConfig) -> Self
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
{
Self::spawn_with(client, config, TokioTaskExecutor::default())
}
/// Creates a new async LRU backed cache service task and spawns it to a new task via the given
/// spawner.
///
/// The cache is memory limited by the given max bytes values.
pub(crate) fn spawn<Client>(
pub fn spawn_with<Client, Tasks>(
client: Client,
spawner: Box<dyn TaskSpawner>,
max_block_bytes: usize,
max_env_bytes: usize,
config: EthStateCacheConfig,
executor: Tasks,
) -> Self
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
let (this, service) = Self::create(client, spawner.clone(), max_block_bytes, max_env_bytes);
spawner.spawn(Box::pin(service));
let EthStateCacheConfig { max_block_bytes, max_env_bytes } = config;
let (this, service) =
Self::create(client, executor.clone(), max_block_bytes, max_env_bytes);
executor.spawn(Box::pin(service));
this
}
@ -107,7 +141,7 @@ impl EthStateCache {
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [EthStateCache] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the [EthStateCacheService] only
/// that does the IO and sends the result back to it. This way the caching service only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_ it is assumed that this is mainly used by the
@ -116,6 +150,7 @@ impl EthStateCache {
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
Client,
Tasks,
LimitBlocks = ByMemoryUsage,
LimitEnvs = ByMemoryUsage,
> where
@ -133,12 +168,13 @@ pub(crate) struct EthStateCacheService<
/// Receiver half of the action channel.
action_rx: UnboundedReceiverStream<CacheAction>,
/// The type that's used to spawn tasks that do the actual work
action_task_spawner: Box<dyn TaskSpawner>,
action_task_spawner: Tasks,
}
impl<Client> Future for EthStateCacheService<Client>
impl<Client, Tasks> Future for EthStateCacheService<Client, Tasks>
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
{
type Output = ();

View File

@ -1,7 +1,7 @@
//! `eth` namespace handler implementation.
mod api;
mod cache;
pub mod cache;
pub(crate) mod error;
mod filter;
mod pubsub;

View File

@ -14,7 +14,7 @@
mod admin;
mod debug;
mod engine;
mod eth;
pub mod eth;
mod layers;
mod net;
mod trace;

View File

@ -66,7 +66,7 @@ pub mod shutdown;
/// ```
///
/// The [TaskSpawner] trait is [DynClone] so `Box<dyn TaskSpawner>` are also `Clone`.
pub trait TaskSpawner: Send + Sync + std::fmt::Debug + DynClone {
pub trait TaskSpawner: Send + Sync + Unpin + std::fmt::Debug + DynClone {
/// Spawns the task onto the runtime.
/// See also [`Handle::spawn`].
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;