feat: add eth state cache (#1561)

This commit is contained in:
Matthias Seitz
2023-02-28 20:46:25 +01:00
committed by GitHub
parent dc2f6047db
commit 7a01e1e231
12 changed files with 364 additions and 22 deletions

13
Cargo.lock generated
View File

@ -4913,6 +4913,7 @@ name = "reth-rpc"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"hex",
"http",
"http-body",
@ -4933,6 +4934,7 @@ dependencies = [
"reth-tasks",
"reth-transaction-pool",
"revm",
"schnellru",
"secp256k1 0.26.0",
"serde",
"serde_json",
@ -5436,6 +5438,17 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "schnellru"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "772575a524feeb803e5b0fcbc6dd9f367e579488197c94c6e4023aad2305774d"
dependencies = [
"ahash 0.8.3",
"cfg-if",
"hashbrown 0.13.2",
]
[[package]]
name = "scopeguard"
version = "1.1.0"

View File

@ -72,7 +72,13 @@ where
/// Initializes the config and block env.
fn init_env(&mut self, header: &Header, total_difficulty: U256) {
fill_cfg_and_block_env(&mut self.evm.env, self.chain_spec, header, total_difficulty);
fill_cfg_and_block_env(
&mut self.evm.env.cfg,
&mut self.evm.env.block,
self.chain_spec,
header,
total_difficulty,
);
}
/// Commit change to database and return change diff that is used to update state and create

View File

@ -22,7 +22,7 @@ reth-eth-wire = { path = "../net/eth-wire" }
# codecs
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
futures = "0.3.25"
futures = "0.3"
tokio-stream = "0.1.11"
rand = "0.8.5"
arbitrary = { version = "1.1.7", features = ["derive"], optional = true }

View File

@ -71,4 +71,7 @@ pub enum ProviderError {
/// Thrown when required header related data was not found but was required.
#[error("requested data not found")]
HeaderNotFound,
/// Thrown when the cache service task was dropped
#[error("cache service task stopped")]
CacheServiceUnavailable,
}

View File

@ -3,18 +3,19 @@ use reth_primitives::{
Address, ChainSpec, Head, Header, Transaction, TransactionKind, TransactionSigned, TxEip1559,
TxEip2930, TxLegacy, U256,
};
use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, Env, SpecId, TransactTo, TxEnv};
use revm::primitives::{AnalysisKind, BlockEnv, CfgEnv, SpecId, TransactTo, TxEnv};
/// Convenience function to call both [fill_cfg_env] and [fill_block_env]
pub fn fill_cfg_and_block_env(
env: &mut Env,
cfg: &mut CfgEnv,
block_env: &mut BlockEnv,
chain_spec: &ChainSpec,
header: &Header,
total_difficulty: U256,
) {
fill_cfg_env(&mut env.cfg, chain_spec, header, total_difficulty);
let after_merge = env.cfg.spec_id >= SpecId::MERGE;
fill_block_env(&mut env.block, header, after_merge);
fill_cfg_env(cfg, chain_spec, header, total_difficulty);
let after_merge = cfg.spec_id >= SpecId::MERGE;
fill_block_env(block_env, header, after_merge);
}
/// Fill [CfgEnv] fields according to the chain spec and given header

View File

@ -50,6 +50,8 @@ thiserror = "1.0"
hex = "0.4"
rand = "0.8.5"
tracing = "0.1"
schnellru = "0.2"
futures = "0.3.26"
[dev-dependencies]
jsonrpsee = { version = "0.16", features = ["client"]}

View File

@ -0,0 +1,285 @@
//! Async caching support for eth RPC
use futures::StreamExt;
use reth_interfaces::{provider::ProviderError, Result};
use reth_primitives::{Block, H256};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_tasks::TaskSpawner;
use revm::primitives::{BlockEnv, CfgEnv};
use schnellru::{ByMemoryUsage, Limiter, LruMap};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
hash::Hash,
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
oneshot,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The type that can send the response to a requested [Block]
///
/// The payload is `Ok(None)` when the block does not exist.
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;
/// The type that can send the response to a requested env, i.e. a `(CfgEnv, BlockEnv)` pair
type EnvResponseSender = oneshot::Sender<Result<(CfgEnv, BlockEnv)>>;
/// LRU cache of [Block]s keyed by block hash, with [BlockResponseSender]s as queued consumers.
///
/// `L` is the limiter that bounds the cache.
type BlockLruCache<L> = MultiConsumerLruCache<H256, Block, L, BlockResponseSender>;
/// LRU cache of `(CfgEnv, BlockEnv)` pairs keyed by block hash, with [EnvResponseSender]s as
/// queued consumers.
///
/// `L` is the limiter that bounds the cache.
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
/// Provides async access to cached eth data
///
/// This is the frontend to the [EthStateCacheService] which manages cached data on a different
/// task.
///
/// The handle only wraps the sending half of the action channel and derives `Clone`, so clones
/// talk to the same service.
#[derive(Debug, Clone)]
pub(crate) struct EthStateCache {
/// Channel used to submit requests to the [EthStateCacheService].
to_service: UnboundedSender<CacheAction>,
}
impl EthStateCache {
/// Creates and returns both [EthStateCache] frontend and the memory bound service.
fn create<Client>(
client: Client,
action_task_spawner: Box<dyn TaskSpawner>,
max_block_bytes: usize,
max_env_bytes: usize,
) -> (Self, EthStateCacheService<Client>) {
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
client,
full_block_cache: BlockLruCache::with_memory_budget(max_block_bytes),
evm_env_cache: EnvLruCache::with_memory_budget(max_env_bytes),
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
};
let cache = EthStateCache { to_service };
(cache, service)
}
/// Creates a new async LRU backed cache service task and spawns it to a new task via the given
/// spawner.
///
/// The cache is memory limited by the given max bytes values.
pub(crate) fn spawn<Client>(
client: Client,
spawner: Box<dyn TaskSpawner>,
max_block_bytes: usize,
max_env_bytes: usize,
) -> Self
where
Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
{
let (this, service) = Self::create(client, spawner.clone(), max_block_bytes, max_env_bytes);
spawner.spawn(Box::pin(service));
this
}
/// Requests the [Block] for the block hash
///
/// Returns `None` if the block does not exist.
pub(crate) async fn get_block(&self, block_hash: H256) -> Result<Option<Block>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetBlock { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
/// Requests the evm env config for the block hash.
///
/// Returns an error if the corresponding header (required for populating the envs) was not
/// found.
pub(crate) async fn get_evm_evn(&self, block_hash: H256) -> Result<(CfgEnv, BlockEnv)> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetEnv { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
}
/// A task that manages caches for data required by the `eth` rpc implementation.
///
/// It provides a caching layer on top of the given [StateProvider](reth_provider::StateProvider)
/// and keeps data fetched via the provider in memory in an LRU cache. If the requested data is
/// missing in the cache it is fetched and inserted into the cache afterwards. While fetching data
/// from disk is sync, this service is async since requests and data are shared via channels.
///
/// This type is an endless future that listens for incoming messages from the user facing
/// [EthStateCache] via a channel. If the requested data is not cached then it spawns a new task
/// that does the IO and sends the result back to it. This way the [EthStateCacheService] only
/// handles messages and does LRU lookups and never blocking IO.
///
/// Caution: The channel for the data is _unbounded_. It is assumed that this is mainly used by the
/// [EthApi](crate::EthApi) which is typically invoked by the RPC server, which already uses permits
/// to limit concurrent requests.
///
/// `LimitBlocks` and `LimitEnvs` are the limiters of the block cache and the env cache; both
/// default to [ByMemoryUsage].
#[must_use = "Type does nothing unless spawned"]
pub(crate) struct EthStateCacheService<
Client,
LimitBlocks = ByMemoryUsage,
LimitEnvs = ByMemoryUsage,
> where
LimitBlocks: Limiter<H256, Block>,
LimitEnvs: Limiter<H256, (CfgEnv, BlockEnv)>,
{
/// The type used to lookup data from disk
client: Client,
/// The LRU cache for full blocks grouped by their hash.
full_block_cache: BlockLruCache<LimitBlocks>,
/// The LRU cache for revm environments, `(CfgEnv, BlockEnv)` pairs keyed by block hash
evm_env_cache: EnvLruCache<LimitEnvs>,
/// Sender half of the action channel.
///
/// Cloned into every spawned fetch task so the task can send its result back to the service.
action_tx: UnboundedSender<CacheAction>,
/// Receiver half of the action channel.
action_rx: UnboundedReceiverStream<CacheAction>,
/// The type that's used to spawn tasks that do the actual work
action_task_spawner: Box<dyn TaskSpawner>,
}
impl<Client> Future for EthStateCacheService<Client>
where
    Client: StateProviderFactory + BlockProvider + EvmEnvProvider + Clone + Unpin + 'static,
{
    type Output = ();

    /// Processes queued [CacheAction]s until the action channel has no more ready messages.
    ///
    /// Requests are answered from the LRU caches when possible. On a miss the requester is
    /// queued and, if it is the first one waiting for that hash, a task is spawned that fetches
    /// the data via the client and reports back with a `*Result` action. Results are fanned out
    /// to all queued requesters and successful ones are stored in the cache.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            let action = match ready!(this.action_rx.poll_next_unpin(cx)) {
                Some(action) => action,
                // the service holds a sender half (`action_tx`) itself
                None => unreachable!("can't close"),
            };

            match action {
                CacheAction::GetBlock { block_hash, response_tx } => {
                    let cached = this.full_block_cache.cache.get(&block_hash).cloned();
                    if let Some(block) = cached {
                        // cache hit, respond right away
                        let _ = response_tx.send(Ok(Some(block)));
                    } else if this.full_block_cache.queue(block_hash, response_tx) {
                        // cache miss and this is the first consumer waiting for the block:
                        // fetch it on a separate task
                        let client = this.client.clone();
                        let result_tx = this.action_tx.clone();
                        this.action_task_spawner.spawn(Box::pin(async move {
                            let res = client.block_by_hash(block_hash);
                            let _ = result_tx.send(CacheAction::BlockResult { block_hash, res });
                        }));
                    }
                }
                CacheAction::GetEnv { block_hash, response_tx } => {
                    let cached = this.evm_env_cache.cache.get(&block_hash).cloned();
                    if let Some(env) = cached {
                        // cache hit, respond right away
                        let _ = response_tx.send(Ok(env));
                    } else if this.evm_env_cache.queue(block_hash, response_tx) {
                        // cache miss and this is the first consumer waiting for the env data:
                        // populate fresh envs on a separate task
                        let client = this.client.clone();
                        let result_tx = this.action_tx.clone();
                        this.action_task_spawner.spawn(Box::pin(async move {
                            let mut cfg = CfgEnv::default();
                            let mut block_env = BlockEnv::default();
                            let res = client
                                .fill_env_at(&mut cfg, &mut block_env, block_hash.into())
                                .map(|_| (cfg, block_env));
                            let _ = result_tx
                                .send(CacheAction::EnvResult { block_hash, res: Box::new(res) });
                        }));
                    }
                }
                CacheAction::BlockResult { block_hash, res } => {
                    // every consumer that queued for this block gets its own copy of the result
                    let waiters = this.full_block_cache.queued.remove(&block_hash);
                    for tx in waiters.into_iter().flatten() {
                        let _ = tx.send(res.clone());
                    }

                    // only blocks that were actually found are cached
                    if let Ok(Some(block)) = res {
                        this.full_block_cache.cache.insert(block_hash, block);
                    }
                }
                CacheAction::EnvResult { block_hash, res } => {
                    let res = *res;

                    // every consumer that queued for this env gets its own copy of the result
                    let waiters = this.evm_env_cache.queued.remove(&block_hash);
                    for tx in waiters.into_iter().flatten() {
                        let _ = tx.send(res.clone());
                    }

                    // only successfully populated env data is cached
                    if let Ok(data) = res {
                        this.evm_env_cache.cache.insert(block_hash, data);
                    }
                }
            }
        }
    }
}
/// An LRU cache that additionally tracks the consumers waiting for a value that is not cached
/// yet.
///
/// `K` is the key type, `V` the cached value type, `L` the limiter of the LRU map and `S` the
/// type of a queued consumer.
struct MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// The LRU cache for the values
cache: LruMap<K, V, L>,
/// All queued consumers, grouped by the key they are waiting for
queued: HashMap<K, Vec<S>>,
}
impl<K, V, L, S> MultiConsumerLruCache<K, V, L, S>
where
    K: Hash + Eq,
    L: Limiter<K, V>,
{
    /// Registers `sender` as a consumer waiting for the value of `key`.
    ///
    /// Returns `true` if no consumer was queued for the key before this call, `false` if the
    /// sender was appended to an already existing queue.
    fn queue(&mut self, key: K, sender: S) -> bool {
        let slot = self.queued.entry(key);
        // whether the key had no queue yet must be determined before the entry is consumed
        let is_first = matches!(slot, Entry::Vacant(_));
        slot.or_default().push(sender);
        is_first
    }
}
impl<K, V, S> MultiConsumerLruCache<K, V, ByMemoryUsage, S>
where
K: Hash + Eq,
{
/// Creates a new empty map with a given `memory_budget`.
///
/// See also [LruMap::with_memory_budget]
fn with_memory_budget(memory_budget: usize) -> Self {
Self { cache: LruMap::with_memory_budget(memory_budget), queued: Default::default() }
}
}
/// All message variants sent through the channel
///
/// The `Get*` variants are requests sent by the [EthStateCache] frontend, the `*Result` variants
/// are sent by the tasks the [EthStateCacheService] spawns on a cache miss.
enum CacheAction {
/// Request for the block with the given hash; the answer is delivered via `response_tx`.
GetBlock { block_hash: H256, response_tx: BlockResponseSender },
/// Request for the `(CfgEnv, BlockEnv)` pair of the block with the given hash; the answer is
/// delivered via `response_tx`.
GetEnv { block_hash: H256, response_tx: EnvResponseSender },
/// Outcome of fetching the block with the given hash from the client.
BlockResult { block_hash: H256, res: Result<Option<Block>> },
/// Outcome of populating the envs for the block with the given hash (boxed).
EnvResult { block_hash: H256, res: Box<Result<(CfgEnv, BlockEnv)>> },
}

View File

@ -1,6 +1,7 @@
//! `eth` namespace handler implementation.
mod api;
mod cache;
pub(crate) mod error;
mod filter;
mod pubsub;

View File

@ -17,7 +17,7 @@ use reth_revm_primitives::{
config::revm_spec,
env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env},
};
use revm_primitives::{BlockEnv, CfgEnv, Env, SpecId};
use revm_primitives::{BlockEnv, CfgEnv, SpecId};
use std::{ops::RangeBounds, sync::Arc};
mod state;
@ -241,16 +241,21 @@ impl<DB: Database> WithdrawalsProvider for ShareableDatabase<DB> {
}
impl<DB: Database> EvmEnvProvider for ShareableDatabase<DB> {
fn fill_env_at(&self, env: &mut Env, at: BlockId) -> Result<()> {
fn fill_env_at(&self, cfg: &mut CfgEnv, block_env: &mut BlockEnv, at: BlockId) -> Result<()> {
let hash = self.block_hash_for_id(at)?.ok_or(ProviderError::HeaderNotFound)?;
let header = self.header(&hash)?.ok_or(ProviderError::HeaderNotFound)?;
self.fill_env_with_header(env, &header)
self.fill_env_with_header(cfg, block_env, &header)
}
fn fill_env_with_header(&self, env: &mut Env, header: &Header) -> Result<()> {
fn fill_env_with_header(
&self,
cfg: &mut CfgEnv,
block_env: &mut BlockEnv,
header: &Header,
) -> Result<()> {
let total_difficulty =
self.header_td_by_number(header.number)?.ok_or(ProviderError::HeaderNotFound)?;
fill_cfg_and_block_env(env, &self.chain_spec, header, total_difficulty);
fill_cfg_and_block_env(cfg, block_env, &self.chain_spec, header, total_difficulty);
Ok(())
}

View File

@ -8,7 +8,7 @@ use reth_primitives::{
keccak256, Account, Address, Block, BlockHash, BlockId, BlockNumber, BlockNumberOrTag, Bytes,
ChainInfo, Header, StorageKey, StorageValue, TransactionSigned, TxHash, H256, U256,
};
use revm_primitives::{BlockEnv, CfgEnv, Env};
use revm_primitives::{BlockEnv, CfgEnv};
use std::{collections::HashMap, ops::RangeBounds, sync::Arc};
/// A mock implementation for Provider interfaces.
@ -240,11 +240,21 @@ impl StateProvider for MockEthProvider {
}
impl EvmEnvProvider for MockEthProvider {
fn fill_env_at(&self, _env: &mut Env, _at: BlockId) -> Result<()> {
fn fill_env_at(
&self,
_cfg: &mut CfgEnv,
_block_env: &mut BlockEnv,
_at: BlockId,
) -> Result<()> {
unimplemented!()
}
fn fill_env_with_header(&self, _env: &mut Env, _header: &Header) -> Result<()> {
fn fill_env_with_header(
&self,
_cfg: &mut CfgEnv,
_block_env: &mut BlockEnv,
_header: &Header,
) -> Result<()> {
unimplemented!()
}

View File

@ -7,7 +7,7 @@ use reth_primitives::{
Account, Address, Block, BlockHash, BlockId, BlockNumber, Bytes, ChainInfo, Header, StorageKey,
StorageValue, TransactionSigned, TxHash, TxNumber, H256, U256,
};
use revm_primitives::{BlockEnv, CfgEnv, Env};
use revm_primitives::{BlockEnv, CfgEnv};
use std::ops::RangeBounds;
/// Supports various api interfaces for testing purposes.
@ -102,11 +102,21 @@ impl StateProvider for NoopProvider {
}
impl EvmEnvProvider for NoopProvider {
fn fill_env_at(&self, _env: &mut Env, _at: BlockId) -> Result<()> {
fn fill_env_at(
&self,
_cfg: &mut CfgEnv,
_block_env: &mut BlockEnv,
_at: BlockId,
) -> Result<()> {
Ok(())
}
fn fill_env_with_header(&self, _env: &mut Env, _header: &Header) -> Result<()> {
fn fill_env_with_header(
&self,
_cfg: &mut CfgEnv,
_block_env: &mut BlockEnv,
_header: &Header,
) -> Result<()> {
Ok(())
}

View File

@ -1,17 +1,23 @@
use reth_interfaces::Result;
use reth_primitives::{BlockId, Header};
use revm_primitives::{BlockEnv, CfgEnv, Env};
use revm_primitives::{BlockEnv, CfgEnv};
/// A provider type that knows chain specific information required to configure an [Env]
/// A provider type that knows chain specific information required to configure an
/// [Env](revm_primitives::Env)
///
/// This type is mainly used to provide required data to configure the EVM environment.
#[auto_impl::auto_impl(&, Arc)]
pub trait EvmEnvProvider: Send + Sync {
/// Fills the [CfgEnv] and [BlockEnv] fields with values specific to the given [BlockId].
fn fill_env_at(&self, env: &mut Env, at: BlockId) -> Result<()>;
fn fill_env_at(&self, cfg: &mut CfgEnv, block_env: &mut BlockEnv, at: BlockId) -> Result<()>;
/// Fills the [CfgEnv] and [BlockEnv] fields with values specific to the given [Header].
fn fill_env_with_header(&self, env: &mut Env, header: &Header) -> Result<()>;
fn fill_env_with_header(
&self,
cfg: &mut CfgEnv,
block_env: &mut BlockEnv,
header: &Header,
) -> Result<()>;
/// Fills the [BlockEnv] fields with values specific to the given [BlockId].
fn fill_block_env_at(&self, block_env: &mut BlockEnv, at: BlockId) -> Result<()>;