feat(rpc): create revm env on demand (#13017)

Co-authored-by: dkathiriya <lakshya-sky@users.noreply.github.com>
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Darshan Kathiriya
2024-11-30 12:27:20 -05:00
committed by GitHub
parent 3dc6f506b0
commit 890f082453
14 changed files with 91 additions and 123 deletions

View File

@ -113,7 +113,7 @@ impl RethRpcServerConfig for RpcServerArgs {
EthStateCacheConfig {
max_blocks: self.rpc_state_cache.max_blocks,
max_receipts: self.rpc_state_cache.max_receipts,
max_envs: self.rpc_state_cache.max_envs,
max_headers: self.rpc_state_cache.max_headers,
max_concurrent_db_requests: self.rpc_state_cache.max_concurrent_db_requests,
}
}

View File

@ -65,12 +65,7 @@ where
EvmConfig: ConfigureEvm<Header = Header>,
Tasks: TaskSpawner + Clone + 'static,
{
let cache = EthStateCache::spawn_with(
provider.clone(),
config.cache,
executor.clone(),
evm_config.clone(),
);
let cache = EthStateCache::spawn_with(provider.clone(), config.cache, executor.clone());
let new_canonical_blocks = events.canonical_state_stream();
let c = cache.clone();

View File

@ -259,7 +259,7 @@ where
Some(block_number.into()),
)
.await
.unwrap();
.unwrap_err();
EthApiClient::<Transaction, Block, Receipt>::estimate_gas(
client,
call_request.clone(),
@ -267,7 +267,7 @@ where
None,
)
.await
.unwrap();
.unwrap_err();
EthApiClient::<Transaction, Block, Receipt>::call(
client,
call_request.clone(),
@ -276,7 +276,7 @@ where
None,
)
.await
.unwrap();
.unwrap_err();
EthApiClient::<Transaction, Block, Receipt>::syncing(client).await.unwrap();
EthApiClient::<Transaction, Block, Receipt>::send_transaction(
client,
@ -368,13 +368,15 @@ where
.unwrap_err();
TraceApiClient::trace_call_many(client, vec![], Some(BlockNumberOrTag::Latest.into()))
.await
.unwrap();
.unwrap_err();
TraceApiClient::replay_transaction(client, B256::default(), HashSet::default())
.await
.err()
.unwrap();
TraceApiClient::trace_block(client, block_id).await.unwrap();
TraceApiClient::replay_block_transactions(client, block_id, HashSet::default()).await.unwrap();
TraceApiClient::trace_block(client, block_id).await.unwrap_err();
TraceApiClient::replay_block_transactions(client, block_id, HashSet::default())
.await
.unwrap_err();
TraceApiClient::trace_filter(client, trace_filter).await.unwrap();
}

View File

@ -12,8 +12,8 @@ use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_errors::RethError;
use reth_evm::ConfigureEvmEnv;
use reth_provider::{
BlockIdReader, BlockNumReader, ChainSpecProvider, StateProvider, StateProviderBox,
StateProviderFactory,
BlockIdReader, BlockNumReader, ChainSpecProvider, EvmEnvProvider as _, StateProvider,
StateProviderBox, StateProviderFactory,
};
use reth_rpc_eth_types::{EthApiError, PendingBlockEnv, RpcInvalidTransactionError};
use reth_transaction_pool::TransactionPool;
@ -229,12 +229,15 @@ pub trait LoadState:
.block_hash_for_id(at)
.map_err(Self::Error::from_eth_err)?
.ok_or(EthApiError::HeaderNotFound(at))?;
let (cfg, env) = self
.cache()
.get_evm_env(block_hash)
.await
let header =
self.cache().get_header(block_hash).await.map_err(Self::Error::from_eth_err)?;
let evm_config = self.evm_config().clone();
let (cfg, block_env) = self
.provider()
.env_with_header(&header, evm_config)
.map_err(Self::Error::from_eth_err)?;
Ok((cfg, env, block_hash.into()))
Ok((cfg, block_env, block_hash.into()))
}
}
}

View File

@ -15,7 +15,6 @@ workspace = true
reth-chainspec.workspace = true
reth-chain-state.workspace = true
reth-errors.workspace = true
reth-evm.workspace = true
reth-execution-types.workspace = true
reth-metrics.workspace = true
reth-primitives = { workspace = true, features = ["secp256k1"] }

View File

@ -3,7 +3,7 @@
use serde::{Deserialize, Serialize};
use reth_rpc_server_types::constants::cache::{
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_ENV_CACHE_MAX_LEN,
DEFAULT_BLOCK_CACHE_MAX_LEN, DEFAULT_CONCURRENT_DB_REQUESTS, DEFAULT_HEADER_CACHE_MAX_LEN,
DEFAULT_RECEIPT_CACHE_MAX_LEN,
};
@ -19,10 +19,10 @@ pub struct EthStateCacheConfig {
///
/// Default is 2000.
pub max_receipts: u32,
/// Max number of bytes for cached env data.
/// Max number of headers in cache.
///
/// Default is 1000.
pub max_envs: u32,
pub max_headers: u32,
/// Max number of concurrent database requests.
///
/// Default is 512.
@ -34,7 +34,7 @@ impl Default for EthStateCacheConfig {
Self {
max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN,
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_envs: DEFAULT_ENV_CACHE_MAX_LEN,
max_headers: DEFAULT_HEADER_CACHE_MAX_LEN,
max_concurrent_db_requests: DEFAULT_CONCURRENT_DB_REQUESTS,
}
}

View File

@ -1,17 +1,16 @@
//! Async caching support for eth RPC
use super::{EthStateCacheConfig, MultiConsumerLruCache};
use alloy_consensus::Header;
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::B256;
use futures::{future::Either, Stream, StreamExt};
use reth_chain_state::CanonStateNotification;
use reth_errors::{ProviderError, ProviderResult};
use reth_evm::{provider::EvmEnvProvider, ConfigureEvm};
use reth_execution_types::Chain;
use reth_primitives::{Receipt, SealedBlockWithSenders, TransactionSigned};
use reth_storage_api::{BlockReader, StateProviderFactory, TransactionVariant};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv, CfgEnvWithHandlerCfg, SpecId};
use schnellru::{ByLength, Limiter};
use std::{
future::Future,
@ -25,8 +24,6 @@ use tokio::sync::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use super::{EthStateCacheConfig, MultiConsumerLruCache};
pub mod config;
pub mod db;
pub mod metrics;
@ -43,8 +40,8 @@ type BlockWithSendersResponseSender =
/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Arc<Vec<Receipt>>>>>;
/// The type that can send the response to a requested env
type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>>;
/// The type that can send the response to a requested header
type HeaderResponseSender = oneshot::Sender<ProviderResult<Header>>;
type BlockLruCache<L> = MultiConsumerLruCache<
B256,
@ -56,8 +53,7 @@ type BlockLruCache<L> = MultiConsumerLruCache<
type ReceiptsLruCache<L> =
MultiConsumerLruCache<B256, Arc<Vec<Receipt>>, L, ReceiptsResponseSender>;
type EnvLruCache<L> =
MultiConsumerLruCache<B256, (CfgEnvWithHandlerCfg, BlockEnv), L, EnvResponseSender>;
type HeaderLruCache<L> = MultiConsumerLruCache<B256, Header, L, HeaderResponseSender>;
/// Provides async access to cached eth data
///
@ -70,26 +66,24 @@ pub struct EthStateCache {
impl EthStateCache {
/// Creates and returns both [`EthStateCache`] frontend and the memory bound service.
fn create<Provider, Tasks, EvmConfig>(
fn create<Provider, Tasks>(
provider: Provider,
action_task_spawner: Tasks,
evm_config: EvmConfig,
max_blocks: u32,
max_receipts: u32,
max_envs: u32,
max_headers: u32,
max_concurrent_db_operations: usize,
) -> (Self, EthStateCacheService<Provider, Tasks, EvmConfig>) {
) -> (Self, EthStateCacheService<Provider, Tasks>) {
let (to_service, rx) = unbounded_channel();
let service = EthStateCacheService {
provider,
full_block_cache: BlockLruCache::new(max_blocks, "blocks"),
receipts_cache: ReceiptsLruCache::new(max_receipts, "receipts"),
evm_env_cache: EnvLruCache::new(max_envs, "evm_env"),
headers_cache: HeaderLruCache::new(max_headers, "headers"),
action_tx: to_service.clone(),
action_rx: UnboundedReceiverStream::new(rx),
action_task_spawner,
rate_limiter: Arc::new(Semaphore::new(max_concurrent_db_operations)),
evm_config,
};
let cache = Self { to_service };
(cache, service)
@ -99,52 +93,46 @@ impl EthStateCache {
/// [`tokio::spawn`].
///
/// See also [`Self::spawn_with`]
pub fn spawn<Provider, EvmConfig>(
provider: Provider,
config: EthStateCacheConfig,
evm_config: EvmConfig,
) -> Self
pub fn spawn<Provider>(provider: Provider, config: EthStateCacheConfig) -> Self
where
Provider: StateProviderFactory
+ BlockReader<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
Self::spawn_with(provider, config, TokioTaskExecutor::default(), evm_config)
Self::spawn_with(provider, config, TokioTaskExecutor::default())
}
/// Creates a new async LRU backed cache service task and spawns it to a new task via the given
/// spawner.
///
/// The cache is memory limited by the given max bytes values.
pub fn spawn_with<Provider, Tasks, EvmConfig>(
pub fn spawn_with<Provider, Tasks>(
provider: Provider,
config: EthStateCacheConfig,
executor: Tasks,
evm_config: EvmConfig,
) -> Self
where
Provider: StateProviderFactory
+ BlockReader<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Tasks: TaskSpawner + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
let EthStateCacheConfig { max_blocks, max_receipts, max_envs, max_concurrent_db_requests } =
config;
let EthStateCacheConfig {
max_blocks,
max_receipts,
max_headers,
max_concurrent_db_requests,
} = config;
let (this, service) = Self::create(
provider,
executor.clone(),
evm_config,
max_blocks,
max_receipts,
max_envs,
max_headers,
max_concurrent_db_requests,
);
executor.spawn_critical("eth state cache", Box::pin(service));
@ -188,16 +176,12 @@ impl EthStateCache {
Ok(block.zip(receipts))
}
/// Requests the evm env config for the block hash.
/// Requests the header for the given hash.
///
/// Returns an error if the corresponding header (required for populating the envs) was not
/// found.
pub async fn get_evm_env(
&self,
block_hash: B256,
) -> ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)> {
/// Returns an error if the header is not found.
pub async fn get_header(&self, block_hash: B256) -> ProviderResult<Header> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetEnv { block_hash, response_tx });
let _ = self.to_service.send(CacheAction::GetHeader { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
}
}
@ -222,14 +206,13 @@ impl EthStateCache {
pub(crate) struct EthStateCacheService<
Provider,
Tasks,
EvmConfig,
LimitBlocks = ByLength,
LimitReceipts = ByLength,
LimitEnvs = ByLength,
LimitHeaders = ByLength,
> where
LimitBlocks: Limiter<B256, Arc<SealedBlockWithSenders>>,
LimitReceipts: Limiter<B256, Arc<Vec<Receipt>>>,
LimitEnvs: Limiter<B256, (CfgEnvWithHandlerCfg, BlockEnv)>,
LimitHeaders: Limiter<B256, Header>,
{
/// The type used to lookup data from disk
provider: Provider,
@ -237,8 +220,11 @@ pub(crate) struct EthStateCacheService<
full_block_cache: BlockLruCache<LimitBlocks>,
/// The LRU cache for full blocks grouped by their hash.
receipts_cache: ReceiptsLruCache<LimitReceipts>,
/// The LRU cache for revm environments
evm_env_cache: EnvLruCache<LimitEnvs>,
/// The LRU cache for headers.
///
/// Headers are cached because they are required to populate the environment for execution
/// (evm).
headers_cache: HeaderLruCache<LimitHeaders>,
/// Sender half of the action channel.
action_tx: UnboundedSender<CacheAction>,
/// Receiver half of the action channel.
@ -247,15 +233,12 @@ pub(crate) struct EthStateCacheService<
action_task_spawner: Tasks,
/// Rate limiter
rate_limiter: Arc<Semaphore>,
/// The type that determines how to configure the EVM.
evm_config: EvmConfig,
}
impl<Provider, Tasks, EvmConfig> EthStateCacheService<Provider, Tasks, EvmConfig>
impl<Provider, Tasks> EthStateCacheService<Provider, Tasks>
where
Provider: StateProviderFactory + BlockReader + EvmEnvProvider + Clone + Unpin + 'static,
Provider: StateProviderFactory + BlockReader + Clone + Unpin + 'static,
Tasks: TaskSpawner + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
fn on_new_block(
&mut self,
@ -341,20 +324,18 @@ where
fn update_cached_metrics(&self) {
self.full_block_cache.update_cached_metrics();
self.receipts_cache.update_cached_metrics();
self.evm_env_cache.update_cached_metrics();
self.headers_cache.update_cached_metrics();
}
}
impl<Provider, Tasks, EvmConfig> Future for EthStateCacheService<Provider, Tasks, EvmConfig>
impl<Provider, Tasks> Future for EthStateCacheService<Provider, Tasks>
where
Provider: StateProviderFactory
+ BlockReader<Block = reth_primitives::Block, Receipt = reth_primitives::Receipt>
+ EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Tasks: TaskSpawner + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
type Output = ();
@ -421,39 +402,30 @@ where
}));
}
}
CacheAction::GetEnv { block_hash, response_tx } => {
// check if env data is cached
if let Some(env) = this.evm_env_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(env));
CacheAction::GetHeader { block_hash, response_tx } => {
// check if the header is cached
if let Some(header) = this.headers_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(header));
continue
}
// env data is not in the cache, request it if this is the first
// header is not in the cache, request it if this is the first
// consumer
if this.evm_env_cache.queue(block_hash, response_tx) {
if this.headers_cache.queue(block_hash, response_tx) {
let provider = this.provider.clone();
let action_tx = this.action_tx.clone();
let rate_limiter = this.rate_limiter.clone();
let evm_config = this.evm_config.clone();
this.action_task_spawner.spawn_blocking(Box::pin(async move {
// Acquire permit
let _permit = rate_limiter.acquire().await;
let mut cfg = CfgEnvWithHandlerCfg::new_with_spec_id(
CfgEnv::default(),
SpecId::LATEST,
);
let mut block_env = BlockEnv::default();
let res = provider
.fill_env_at(
&mut cfg,
&mut block_env,
block_hash.into(),
evm_config,
)
.map(|_| (cfg, block_env));
let _ = action_tx.send(CacheAction::EnvResult {
let header = provider.header(&block_hash).and_then(|header| {
header.ok_or_else(|| {
ProviderError::HeaderNotFound(block_hash.into())
})
});
let _ = action_tx.send(CacheAction::HeaderResult {
block_hash,
res: Box::new(res),
res: Box::new(header),
});
}));
}
@ -472,18 +444,18 @@ where
this.on_new_block(block_hash, Err(e));
}
},
CacheAction::EnvResult { block_hash, res } => {
CacheAction::HeaderResult { block_hash, res } => {
let res = *res;
if let Some(queued) = this.evm_env_cache.remove(&block_hash) {
if let Some(queued) = this.headers_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
let _ = tx.send(res.clone());
}
}
// cache good env data
// cache good header
if let Ok(data) = res {
this.evm_env_cache.insert(block_hash, data);
this.headers_cache.insert(block_hash, data);
}
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
@ -528,9 +500,9 @@ enum CacheAction {
block_hash: B256,
response_tx: BlockWithSendersResponseSender,
},
GetEnv {
GetHeader {
block_hash: B256,
response_tx: EnvResponseSender,
response_tx: HeaderResponseSender,
},
GetReceipts {
block_hash: B256,
@ -544,9 +516,9 @@ enum CacheAction {
block_hash: B256,
res: ProviderResult<Option<Arc<Vec<Receipt>>>>,
},
EnvResult {
HeaderResult {
block_hash: B256,
res: Box<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>>,
res: Box<ProviderResult<Header>>,
},
CacheNewCanonicalChain {
chain_change: ChainChange,

View File

@ -113,8 +113,8 @@ pub mod cache {
/// Default cache size for the receipts cache: 2000 receipts.
pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000;
/// Default cache size for the env cache: 1000 envs.
pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000;
/// Default cache size for the header cache: 1000 headers.
pub const DEFAULT_HEADER_CACHE_MAX_LEN: u32 = 1000;
/// Default number of concurrent database requests.
pub const DEFAULT_CONCURRENT_DB_REQUESTS: usize = 512;

View File

@ -450,7 +450,7 @@ mod tests {
provider: P,
) -> EthApi<P, TestPool, NoopNetwork, EthEvmConfig> {
let evm_config = EthEvmConfig::new(provider.chain_spec());
let cache = EthStateCache::spawn(provider.clone(), Default::default(), evm_config.clone());
let cache = EthStateCache::spawn(provider.clone(), Default::default());
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default());

View File

@ -52,8 +52,7 @@ mod tests {
let pool = testing_pool();
let evm_config = EthEvmConfig::new(MAINNET.clone());
let cache =
EthStateCache::spawn(NoopProvider::default(), Default::default(), evm_config.clone());
let cache = EthStateCache::spawn(NoopProvider::default(), Default::default());
EthApi::new(
NoopProvider::default(),
pool,
@ -79,8 +78,7 @@ mod tests {
let evm_config = EthEvmConfig::new(mock_provider.chain_spec());
mock_provider.extend_accounts(accounts);
let cache =
EthStateCache::spawn(mock_provider.clone(), Default::default(), evm_config.clone());
let cache = EthStateCache::spawn(mock_provider.clone(), Default::default());
EthApi::new(
mock_provider.clone(),
pool,

View File

@ -57,7 +57,7 @@ mod tests {
let pool = testing_pool();
let evm_config = EthEvmConfig::new(noop_provider.chain_spec());
let cache = EthStateCache::spawn(noop_provider, Default::default(), evm_config.clone());
let cache = EthStateCache::spawn(noop_provider, Default::default());
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default());
let eth_api = EthApi::new(