diff --git a/Cargo.lock b/Cargo.lock index 3df3be67b..bfca56c1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4913,6 +4913,7 @@ name = "reth-rpc" version = "0.1.0" dependencies = [ "async-trait", + "futures", "hex", "http", "http-body", @@ -4933,6 +4934,7 @@ dependencies = [ "reth-tasks", "reth-transaction-pool", "revm", + "schnellru", "secp256k1 0.26.0", "serde", "serde_json", @@ -5436,6 +5438,17 @@ dependencies = [ "windows-sys 0.42.0", ] +[[package]] +name = "schnellru" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d" +dependencies = [ + "ahash 0.8.3", + "cfg-if", + "hashbrown 0.13.2", +] + [[package]] name = "scopeguard" version = "1.1.0" diff --git a/crates/executor/src/executor.rs b/crates/executor/src/executor.rs index 5a7cd6977..f383d84e0 100644 --- a/crates/executor/src/executor.rs +++ b/crates/executor/src/executor.rs @@ -72,7 +72,13 @@ where /// Initializes the config and block env. 
fn init_env(&mut self, header: &Header, total_difficulty: U256) { - fill_cfg_and_block_env(&mut self.evm.env, self.chain_spec, header, total_difficulty); + fill_cfg_and_block_env( + &mut self.evm.env.cfg, + &mut self.evm.env.block, + self.chain_spec, + header, + total_difficulty, + ); } /// Commit change to database and return change diff that is used to update state and create diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index aeb1b909c..9448840bf 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -22,7 +22,7 @@ reth-eth-wire = { path = "../net/eth-wire" } # codecs parity-scale-codec = { version = "3.2.1", features = ["bytes"] } -futures = "0.3.25" +futures = "0.3" tokio-stream = "0.1.11" rand = "0.8.5" arbitrary = { version = "1.1.7", features = ["derive"], optional = true } diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index be00f6512..ac24be67a 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -71,4 +71,7 @@ pub enum ProviderError { /// Thrown when required header related data was not found but was required. 
#[error("requested data not found")] HeaderNotFound, + /// Thrown when the cache service task was dropped + #[error("cache service task stopped")] + CacheServiceUnavailable, } diff --git a/crates/revm/revm-primitives/src/env.rs b/crates/revm/revm-primitives/src/env.rs index c6bbf1802..470684a76 100644 --- a/crates/revm/revm-primitives/src/env.rs +++ b/crates/revm/revm-primitives/src/env.rs @@ -3,18 +3,19 @@ use reth_primitives::{ Address, ChainSpec, Head, Header, Transaction, TransactionKind, TransactionSigned, TxEip1559, TxEip2930, TxLegacy, U256, }; -use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, Env, SpecId, TransactTo, TxEnv}; +use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, SpecId, TransactTo, TxEnv}; /// Convenience function to call both [fill_cfg_env] and [fill_block_env] pub fn fill_cfg_and_block_env( - env: &mut Env, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, chain_spec: &ChainSpec, header: &Header, total_difficulty: U256, ) { - fill_cfg_env(&mut env.cfg, chain_spec, header, total_difficulty); - let after_merge = env.cfg.spec_id >= SpecId::MERGE; - fill_block_env(&mut env.block, header, after_merge); + fill_cfg_env(cfg, chain_spec, header, total_difficulty); + let after_merge = cfg.spec_id >= SpecId::MERGE; + fill_block_env(block_env, header, after_merge); } /// Fill [CfgEnv] fields according to the chain spec and given header diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index f86cb8a25..9e1a9a612 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -50,6 +50,8 @@ thiserror = "1.0" hex = "0.4" rand = "0.8.5" tracing = "0.1" +schnellru = "0.2" +futures = "0.3.26" [dev-dependencies] jsonrpsee = { version = "0.16", features = ["client"]} diff --git a/crates/rpc/rpc/src/eth/cache.rs b/crates/rpc/rpc/src/eth/cache.rs new file mode 100644 index 000000000..edc7ed060 --- /dev/null +++ b/crates/rpc/rpc/src/eth/cache.rs @@ -0,0 +1,285 @@ +//! 
Async caching support for eth RPC + +use futures::StreamExt; +use reth_interfaces::{provider::ProviderError, Result}; +use reth_primitives::{Block, H256}; +use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory}; +use reth_tasks::TaskSpawner; +use revm::primitives::{BlockEnv, CfgEnv}; +use schnellru::{ByMemoryUsage, Limiter, LruMap}; +use std::{ + collections::{hash_map::Entry, HashMap}, + future::Future, + hash::Hash, + pin::Pin, + task::{ready, Context, Poll}, +}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +/// The type that can send the response to a requested [Block] +type BlockResponseSender = oneshot::Sender>>; + +/// The type that can send the response to a requested env +type EnvResponseSender = oneshot::Sender>; + +type BlockLruCache = MultiConsumerLruCache; + +type EnvLruCache = MultiConsumerLruCache; + +/// Provides async access to cached eth data +/// +/// This is the frontend to the [EthStateCacheService] which manages cached data on a different +/// task. +#[derive(Debug, Clone)] +pub(crate) struct EthStateCache { + to_service: UnboundedSender, +} + +impl EthStateCache { + /// Creates and returns both [EthStateCache] frontend and the memory bound service. + fn create( + client: Client, + action_task_spawner: Box, + max_block_bytes: usize, + max_env_bytes: usize, + ) -> (Self, EthStateCacheService) { + let (to_service, rx) = unbounded_channel(); + let service = EthStateCacheService { + client, + full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes), + evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes), + action_tx: to_service.clone(), + action_rx: UnboundedReceiverStream::new(rx), + action_task_spawner, + }; + let cache = EthStateCache { to_service }; + (cache, service) + } + + /// Creates a new async LRU backed cache service task and spawns it to a new task via the given + /// spawner. 
+ /// + /// The cache is memory limited by the given max bytes values. + pub(crate) fn spawn( + client: Client, + spawner: Box, + max_block_bytes: usize, + max_env_bytes: usize, + ) -> Self + where + Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static, + { + let (this, service) = Self::create(client, spawner.clone(), max_block_bytes, max_env_bytes); + spawner.spawn(Box::pin(service)); + this + } + + /// Requests the [Block] for the block hash + /// + /// Returns `None` if the block does not exist. + pub(crate) async fn get_block(&self, block_hash: H256) -> Result> { + let (response_tx, rx) = oneshot::channel(); + let _ = self.to_service.send(CacheAction::GetBlock { block_hash, response_tx }); + rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? + } + + /// Requests the evm env config for the block hash. + /// + /// Returns an error if the corresponding header (required for populating the envs) was not + /// found. + pub(crate) async fn get_evm_evn(&self, block_hash: H256) -> Result<(CfgEnv, BlockEnv)> { + let (response_tx, rx) = oneshot::channel(); + let _ = self.to_service.send(CacheAction::GetEnv { block_hash, response_tx }); + rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? + } +} + +/// A task that manages caches for data required by the `eth` rpc implementation. +/// +/// It provides a caching layer on top of the given [StateProvider](reth_provider::StateProvider) +/// and keeps data fetched via the provider in memory in an LRU cache. If the requested data is +/// missing in the cache it is fetched and inserted into the cache afterwards. While fetching data +/// from disk is sync, this service is async since requests and data are shared via channels. +/// +/// This type is an endless future that listens for incoming messages from the user facing +/// [EthStateCache] via a channel. If the requested data is not cached then it spawns a new task +/// that does the IO and sends the result back to it. 
This way the [EthStateCacheService] only +/// handles messages and does LRU lookups and never blocking IO. +/// +/// Caution: The channel for the data is _unbounded_; it is assumed that this is mainly used by the +/// [EthApi](crate::EthApi) which is typically invoked by the RPC server, which already uses permits +/// to limit concurrent requests. +#[must_use = "Type does nothing unless spawned"] +pub(crate) struct EthStateCacheService< + Client, + LimitBlocks = ByMemoryUsage, + LimitEnvs = ByMemoryUsage, +> where + LimitBlocks: Limiter, + LimitEnvs: Limiter, +{ + /// The type used to look up data from disk + client: Client, + /// The LRU cache for full blocks grouped by their hash. + full_block_cache: BlockLruCache, + /// The LRU cache for revm environments + evm_env_cache: EnvLruCache, + /// Sender half of the action channel. + action_tx: UnboundedSender, + /// Receiver half of the action channel. + action_rx: UnboundedReceiverStream, + /// The type that's used to spawn tasks that do the actual work + action_task_spawner: Box, +} + +impl Future for EthStateCacheService +where + Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + match ready!(this.action_rx.poll_next_unpin(cx)) { + None => { + unreachable!("can't close") + } + Some(action) => { + match action { + CacheAction::GetBlock { block_hash, response_tx } => { + // check if block is cached + if let Some(block) = + this.full_block_cache.cache.get(&block_hash).cloned() + { + let _ = response_tx.send(Ok(Some(block))); + continue + } + + // block is not in the cache, request it if this is the first consumer + if this.full_block_cache.queue(block_hash, response_tx) { + let client = this.client.clone(); + let action_tx = this.action_tx.clone(); + this.action_task_spawner.spawn(Box::pin(async move { + let res = client.block_by_hash(block_hash); + let 
_ = action_tx + .send(CacheAction::BlockResult { block_hash, res }); + })); + } + } + CacheAction::GetEnv { block_hash, response_tx } => { + // check if env data is cached + if let Some(env) = this.evm_env_cache.cache.get(&block_hash).cloned() { + let _ = response_tx.send(Ok(env)); + continue + } + + // env data is not in the cache, request it if this is the first + // consumer + if this.evm_env_cache.queue(block_hash, response_tx) { + let client = this.client.clone(); + let action_tx = this.action_tx.clone(); + this.action_task_spawner.spawn(Box::pin(async move { + let mut cfg = CfgEnv::default(); + let mut block_env = BlockEnv::default(); + let res = client + .fill_env_at(&mut cfg, &mut block_env, block_hash.into()) + .map(|_| (cfg, block_env)); + let _ = action_tx.send(CacheAction::EnvResult { + block_hash, + res: Box::new(res), + }); + })); + } + } + CacheAction::BlockResult { block_hash, res } => { + if let Some(queued) = this.full_block_cache.queued.remove(&block_hash) { + // send the response to queued senders + for tx in queued { + let _ = tx.send(res.clone()); + } + } + + // cache good block + if let Ok(Some(block)) = res { + this.full_block_cache.cache.insert(block_hash, block); + } + } + CacheAction::EnvResult { block_hash, res } => { + let res = *res; + if let Some(queued) = this.evm_env_cache.queued.remove(&block_hash) { + // send the response to queued senders + for tx in queued { + let _ = tx.send(res.clone()); + } + } + + // cache good env data + if let Ok(data) = res { + this.evm_env_cache.cache.insert(block_hash, data); + } + } + } + } + } + } + } +} + +struct MultiConsumerLruCache +where + K: Hash + Eq, + L: Limiter, +{ + /// The LRU cache for the successfully fetched values + cache: LruMap, + /// All queued consumers + queued: HashMap>, +} + +impl MultiConsumerLruCache +where + K: Hash + Eq, + L: Limiter, +{ + /// Adds the sender to the queue for the given key. 
+ /// + /// Returns true if this is the first queued sender for the key + fn queue(&mut self, key: K, sender: S) -> bool { + match self.queued.entry(key) { + Entry::Occupied(mut entry) => { + entry.get_mut().push(sender); + false + } + Entry::Vacant(entry) => { + entry.insert(vec![sender]); + true + } + } + } +} + +impl MultiConsumerLruCache +where + K: Hash + Eq, +{ + /// Creates a new empty map with a given `memory_budget`. + /// + /// See also [LruMap::with_memory_budget] + fn with_memory_budget(memory_budget: usize) -> Self { + Self { cache: LruMap::with_memory_budget(memory_budget), queued: Default::default() } + } +} + +/// All message variants sent through the channel +enum CacheAction { + GetBlock { block_hash: H256, response_tx: BlockResponseSender }, + GetEnv { block_hash: H256, response_tx: EnvResponseSender }, + BlockResult { block_hash: H256, res: Result> }, + EnvResult { block_hash: H256, res: Box> }, +} diff --git a/crates/rpc/rpc/src/eth/mod.rs b/crates/rpc/rpc/src/eth/mod.rs index 598e3721a..0a79cc763 100644 --- a/crates/rpc/rpc/src/eth/mod.rs +++ b/crates/rpc/rpc/src/eth/mod.rs @@ -1,6 +1,7 @@ //! `eth` namespace handler implementation. 
mod api; +mod cache; pub(crate) mod error; mod filter; mod pubsub; diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index c67de018f..8ea781b06 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -17,7 +17,7 @@ use reth_revm_primitives::{ config::revm_spec, env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env}, }; -use revm_primitives::{BlockEnv, CfgEnv, Env, SpecId}; +use revm_primitives::{BlockEnv, CfgEnv, SpecId}; use std::{ops::RangeBounds, sync::Arc}; mod state; @@ -241,16 +241,21 @@ impl WithdrawalsProvider for ShareableDatabase { } impl EvmEnvProvider for ShareableDatabase { - fn fill_env_at(&self, env: &mut Env, at: BlockId) -> Result<()> { + fn fill_env_at(&self, cfg: &mut CfgEnv, block_env: &mut BlockEnv, at: BlockId) -> Result<()> { let hash = self.block_hash_for_id(at)?.ok_or(ProviderError::HeaderNotFound)?; let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound)?; - self.fill_env_with_header(env, &header) + self.fill_env_with_header(cfg, block_env, &header) } - fn fill_env_with_header(&self, env: &mut Env, header: &Header) -> Result<()> { + fn fill_env_with_header( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + header: &Header, + ) -> Result<()> { let total_difficulty = self.header_td_by_number(header.number)?.ok_or(ProviderError::HeaderNotFound)?; - fill_cfg_and_block_env(env, &self.chain_spec, header, total_difficulty); + fill_cfg_and_block_env(cfg, block_env, &self.chain_spec, header, total_difficulty); Ok(()) } diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs index a95bc0b7a..f6946f95e 100644 --- a/crates/storage/provider/src/test_utils/mock.rs +++ b/crates/storage/provider/src/test_utils/mock.rs @@ -8,7 +8,7 @@ use reth_primitives::{ keccak256, Account, Address, Block, BlockHash, BlockId, BlockNumber, BlockNumberOrTag, Bytes, ChainInfo, Header, 
StorageKey, StorageValue, TransactionSigned, TxHash, H256, U256, }; -use revm_primitives::{BlockEnv, CfgEnv, Env}; +use revm_primitives::{BlockEnv, CfgEnv}; use std::{collections::HashMap, ops::RangeBounds, sync::Arc}; /// A mock implementation for Provider interfaces. @@ -240,11 +240,21 @@ impl StateProvider for MockEthProvider { } impl EvmEnvProvider for MockEthProvider { - fn fill_env_at(&self, _env: &mut Env, _at: BlockId) -> Result<()> { + fn fill_env_at( + &self, + _cfg: &mut CfgEnv, + _block_env: &mut BlockEnv, + _at: BlockId, + ) -> Result<()> { unimplemented!() } - fn fill_env_with_header(&self, _env: &mut Env, _header: &Header) -> Result<()> { + fn fill_env_with_header( + &self, + _cfg: &mut CfgEnv, + _block_env: &mut BlockEnv, + _header: &Header, + ) -> Result<()> { unimplemented!() } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 8f24ea6fb..28eba6613 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -7,7 +7,7 @@ use reth_primitives::{ Account, Address, Block, BlockHash, BlockId, BlockNumber, Bytes, ChainInfo, Header, StorageKey, StorageValue, TransactionSigned, TxHash, TxNumber, H256, U256, }; -use revm_primitives::{BlockEnv, CfgEnv, Env}; +use revm_primitives::{BlockEnv, CfgEnv}; use std::ops::RangeBounds; /// Supports various api interfaces for testing purposes. 
@@ -102,11 +102,21 @@ impl StateProvider for NoopProvider { } impl EvmEnvProvider for NoopProvider { - fn fill_env_at(&self, _env: &mut Env, _at: BlockId) -> Result<()> { + fn fill_env_at( + &self, + _cfg: &mut CfgEnv, + _block_env: &mut BlockEnv, + _at: BlockId, + ) -> Result<()> { Ok(()) } - fn fill_env_with_header(&self, _env: &mut Env, _header: &Header) -> Result<()> { + fn fill_env_with_header( + &self, + _cfg: &mut CfgEnv, + _block_env: &mut BlockEnv, + _header: &Header, + ) -> Result<()> { Ok(()) } diff --git a/crates/storage/provider/src/traits/evm_env.rs b/crates/storage/provider/src/traits/evm_env.rs index 6f422e298..9afe1ce47 100644 --- a/crates/storage/provider/src/traits/evm_env.rs +++ b/crates/storage/provider/src/traits/evm_env.rs @@ -1,17 +1,23 @@ use reth_interfaces::Result; use reth_primitives::{BlockId, Header}; -use revm_primitives::{BlockEnv, CfgEnv, Env}; +use revm_primitives::{BlockEnv, CfgEnv}; -/// A provider type that knows chain specific information required to configure an [Env] +/// A provider type that knows chain specific information required to configure an +/// [Env](revm_primitives::Env) /// /// This type is mainly used to provide required data to configure the EVM environment. #[auto_impl::auto_impl(&, Arc)] pub trait EvmEnvProvider: Send + Sync { /// Fills the [CfgEnv] and [BlockEnv] fields with values specific to the given [BlockId]. - fn fill_env_at(&self, env: &mut Env, at: BlockId) -> Result<()>; + fn fill_env_at(&self, cfg: &mut CfgEnv, block_env: &mut BlockEnv, at: BlockId) -> Result<()>; /// Fills the [CfgEnv] and [BlockEnv] fields with values specific to the given [Header]. - fn fill_env_with_header(&self, env: &mut Env, header: &Header) -> Result<()>; + fn fill_env_with_header( + &self, + cfg: &mut CfgEnv, + block_env: &mut BlockEnv, + header: &Header, + ) -> Result<()>; /// Fills the [BlockEnv] fields with values specific to the given [BlockId]. 
fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockId) -> Result<()>;