perf: introduce moka cached state provider (#12214)

This commit is contained in:
Dan Cline
2025-01-08 11:11:20 -05:00
committed by GitHub
parent 73ed3ea440
commit d336ceb27e
5 changed files with 444 additions and 130 deletions

View File

@ -49,6 +49,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
moka = { workspace = true, features = ["sync"] }
# metrics
metrics.workspace = true

View File

@ -0,0 +1,288 @@
//! Implements a state provider that has a shared cache in front of it.
use alloy_primitives::{map::B256HashMap, Address, StorageKey, StorageValue, B256};
use metrics::Gauge;
use moka::sync::CacheBuilder;
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives::{Account, Bytecode};
use reth_provider::{
AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
StateRootProvider, StorageRootProvider,
};
use reth_trie::{
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use revm_primitives::map::DefaultHashBuilder;
type Cache<K, V> = moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
/// A wrapper of a state provider and a shared cache.
///
/// Account, storage-slot, and bytecode reads are first served from the shared
/// [`ProviderCaches`]; on a miss the inner provider is queried and the result
/// (including `None`) is inserted into the cache. All trie-, proof-, and
/// hash-related calls are forwarded to the inner provider unchanged.
pub(crate) struct CachedStateProvider<S> {
    /// The underlying state provider, queried on a cache miss
    state_provider: S,
    /// The shared caches (code / storage / account) used for the provider
    caches: ProviderCaches,
    /// Hit / miss metrics for each of the caches
    metrics: CachedStateMetrics,
}
impl<S> CachedStateProvider<S>
where
    S: StateProvider,
{
    /// Creates a new [`CachedStateProvider`] from a state provider, a set of
    /// [`ProviderCaches`], and [`CachedStateMetrics`].
    pub(crate) const fn new_with_caches(
        provider: S,
        caches: ProviderCaches,
        metrics: CachedStateMetrics,
    ) -> Self {
        Self { state_provider: provider, caches, metrics }
    }
}
/// Metrics for the cached state provider, showing hits / misses for each cache
///
/// All counters are gauges (rather than monotonic counters) so they can be
/// reset to zero when a new block begins executing; see [`Self::reset`].
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
pub(crate) struct CachedStateMetrics {
    /// Code cache hits
    code_cache_hits: Gauge,
    /// Code cache misses
    code_cache_misses: Gauge,
    /// Storage cache hits
    storage_cache_hits: Gauge,
    /// Storage cache misses
    storage_cache_misses: Gauge,
    /// Account cache hits
    account_cache_hits: Gauge,
    /// Account cache misses
    account_cache_misses: Gauge,
}
impl CachedStateMetrics {
    /// Sets all values to zero, indicating that a new block is being executed.
    pub(crate) fn reset(&self) {
        // Zero every hit / miss gauge in one pass.
        for gauge in [
            &self.code_cache_hits,
            &self.code_cache_misses,
            &self.storage_cache_hits,
            &self.storage_cache_misses,
            &self.account_cache_hits,
            &self.account_cache_misses,
        ] {
            gauge.set(0);
        }
    }

    /// Returns a new zeroed-out instance of [`CachedStateMetrics`].
    pub(crate) fn zeroed() -> Self {
        let metrics = Self::default();
        metrics.reset();
        metrics
    }
}
impl<S: AccountReader> AccountReader for CachedStateProvider<S> {
    /// Returns the account for the given address, consulting the shared
    /// account cache first and falling back to the inner provider on a miss.
    fn basic_account(&self, address: &Address) -> ProviderResult<Option<Account>> {
        match self.caches.account_cache.get(address) {
            Some(cached) => {
                self.metrics.account_cache_hits.increment(1);
                Ok(cached)
            }
            None => {
                self.metrics.account_cache_misses.increment(1);
                // Fetch from the inner provider and cache the result, including `None`
                // so repeated lookups of missing accounts also hit the cache.
                let account = self.state_provider.basic_account(address)?;
                self.caches.account_cache.insert(*address, account);
                Ok(account)
            }
        }
    }
}
impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
    /// Returns the storage value for the given (account, key) pair, consulting
    /// the shared storage cache before the inner provider.
    fn storage(
        &self,
        account: Address,
        storage_key: StorageKey,
    ) -> ProviderResult<Option<StorageValue>> {
        match self.caches.storage_cache.get(&(account, storage_key)) {
            Some(cached) => {
                self.metrics.storage_cache_hits.increment(1);
                Ok(cached)
            }
            None => {
                self.metrics.storage_cache_misses.increment(1);
                // Cache the fetched value, including `None` for empty slots.
                let value = self.state_provider.storage(account, storage_key)?;
                self.caches.storage_cache.insert((account, storage_key), value);
                Ok(value)
            }
        }
    }

    /// Returns the bytecode for the given code hash, consulting the shared
    /// code cache before the inner provider.
    fn bytecode_by_hash(&self, code_hash: &B256) -> ProviderResult<Option<Bytecode>> {
        match self.caches.code_cache.get(code_hash) {
            Some(cached) => {
                self.metrics.code_cache_hits.increment(1);
                Ok(cached)
            }
            None => {
                self.metrics.code_cache_misses.increment(1);
                let code = self.state_provider.bytecode_by_hash(code_hash)?;
                // Clone only on the miss path so the cache keeps its own copy.
                self.caches.code_cache.insert(*code_hash, code.clone());
                Ok(code)
            }
        }
    }
}
/// State-root computations are not cached; every call is forwarded directly
/// to the inner provider.
impl<S: StateRootProvider> StateRootProvider for CachedStateProvider<S> {
    fn state_root(&self, hashed_state: HashedPostState) -> ProviderResult<B256> {
        self.state_provider.state_root(hashed_state)
    }

    fn state_root_with_updates(
        &self,
        hashed_state: HashedPostState,
    ) -> ProviderResult<(B256, TrieUpdates)> {
        self.state_provider.state_root_with_updates(hashed_state)
    }

    fn state_root_from_nodes(&self, input: TrieInput) -> ProviderResult<B256> {
        self.state_provider.state_root_from_nodes(input)
    }

    fn state_root_from_nodes_with_updates(
        &self,
        input: TrieInput,
    ) -> ProviderResult<(B256, TrieUpdates)> {
        self.state_provider.state_root_from_nodes_with_updates(input)
    }
}
/// Proof generation is not cached; every call is forwarded directly to the
/// inner provider.
impl<S: StateProofProvider> StateProofProvider for CachedStateProvider<S> {
    fn proof(
        &self,
        input: TrieInput,
        address: Address,
        slots: &[B256],
    ) -> ProviderResult<AccountProof> {
        self.state_provider.proof(input, address, slots)
    }

    fn multiproof(
        &self,
        input: TrieInput,
        targets: MultiProofTargets,
    ) -> ProviderResult<MultiProof> {
        self.state_provider.multiproof(input, targets)
    }

    fn witness(
        &self,
        input: TrieInput,
        target: HashedPostState,
    ) -> ProviderResult<B256HashMap<alloy_primitives::Bytes>> {
        self.state_provider.witness(input, target)
    }
}
/// Storage-root and storage-proof computations are not cached; every call is
/// forwarded directly to the inner provider.
impl<S: StorageRootProvider> StorageRootProvider for CachedStateProvider<S> {
    fn storage_root(
        &self,
        address: Address,
        hashed_storage: HashedStorage,
    ) -> ProviderResult<B256> {
        self.state_provider.storage_root(address, hashed_storage)
    }

    fn storage_multiproof(
        &self,
        address: Address,
        slots: &[B256],
        hashed_storage: HashedStorage,
    ) -> ProviderResult<StorageMultiProof> {
        self.state_provider.storage_multiproof(address, slots, hashed_storage)
    }

    fn storage_proof(
        &self,
        address: Address,
        slot: B256,
        hashed_storage: HashedStorage,
    ) -> ProviderResult<StorageProof> {
        self.state_provider.storage_proof(address, slot, hashed_storage)
    }
}
/// Block-hash lookups are not cached here; every call is forwarded directly
/// to the inner provider.
impl<S: BlockHashReader> BlockHashReader for CachedStateProvider<S> {
    fn canonical_hashes_range(
        &self,
        start: alloy_primitives::BlockNumber,
        end: alloy_primitives::BlockNumber,
    ) -> ProviderResult<Vec<B256>> {
        self.state_provider.canonical_hashes_range(start, end)
    }

    fn block_hash(&self, number: alloy_primitives::BlockNumber) -> ProviderResult<Option<B256>> {
        self.state_provider.block_hash(number)
    }
}
/// Hashed post-state construction is not cached; the call is forwarded
/// directly to the inner provider.
impl<S: HashedPostStateProvider> HashedPostStateProvider for CachedStateProvider<S> {
    fn hashed_post_state(&self, bundle_state: &reth_revm::db::BundleState) -> HashedPostState {
        self.state_provider.hashed_post_state(bundle_state)
    }
}
/// The set of caches that are used in the [`CachedStateProvider`].
///
/// Cloning is cheap: the underlying `moka` caches are internally shared, so
/// clones of this struct all point at the same cached entries.
#[derive(Debug, Clone)]
pub(crate) struct ProviderCaches {
    /// The cache for bytecode, keyed by code hash
    code_cache: Cache<B256, Option<Bytecode>>,
    /// The cache for storage values, keyed by (account address, storage key)
    storage_cache: Cache<(Address, StorageKey), Option<StorageValue>>,
    /// The cache for basic accounts, keyed by address
    account_cache: Cache<Address, Option<Account>>,
}
/// A builder for [`ProviderCaches`].
///
/// The sizes are maximum entry counts (moka `max_capacity`), not byte sizes.
/// Use [`Default`] for the tuned defaults and [`Self::build_caches`] to
/// construct the shared caches.
#[derive(Debug)]
pub(crate) struct ProviderCacheBuilder {
    /// Code cache size (maximum number of entries)
    code_cache_size: u64,
    /// Storage cache size (maximum number of entries)
    storage_cache_size: u64,
    /// Account cache size (maximum number of entries)
    account_cache_size: u64,
}
impl ProviderCacheBuilder {
    /// Build a [`ProviderCaches`] struct, so that provider caches can be easily cloned.
    pub(crate) fn build_caches(self) -> ProviderCaches {
        // Destructure once instead of reading each field through `self`.
        let Self { code_cache_size, storage_cache_size, account_cache_size } = self;
        ProviderCaches {
            code_cache: CacheBuilder::new(code_cache_size)
                .build_with_hasher(DefaultHashBuilder::default()),
            storage_cache: CacheBuilder::new(storage_cache_size)
                .build_with_hasher(DefaultHashBuilder::default()),
            account_cache: CacheBuilder::new(account_cache_size)
                .build_with_hasher(DefaultHashBuilder::default()),
        }
    }
}
impl Default for ProviderCacheBuilder {
    /// Returns a builder with all three caches sized to one million entries.
    fn default() -> Self {
        // moka caches have been benchmarked up to 800k entries, so we just use 1M, optimizing for
        // hitrate over memory consumption.
        //
        // See: https://github.com/moka-rs/moka/wiki#admission-and-eviction-policies
        //
        // Named constant with digit separators instead of three bare `1000000`
        // literals (clippy::unreadable_literal), so all caches stay in sync if
        // the default is ever tuned.
        const DEFAULT_CACHE_SIZE: u64 = 1_000_000;
        Self {
            code_cache_size: DEFAULT_CACHE_SIZE,
            storage_cache_size: DEFAULT_CACHE_SIZE,
            account_cache_size: DEFAULT_CACHE_SIZE,
        }
    }
}

View File

@ -3,7 +3,10 @@ use crate::{
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, EngineApiKind, EngineApiRequest, FromEngine},
persistence::PersistenceHandle,
tree::metrics::EngineApiMetrics,
tree::{
cached_state::{CachedStateMetrics, CachedStateProvider, ProviderCacheBuilder},
metrics::EngineApiMetrics,
},
};
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
@ -74,6 +77,7 @@ use tokio::sync::{
use tracing::*;
mod block_buffer;
mod cached_state;
pub mod config;
pub mod error;
mod invalid_block_hook;
@ -2249,6 +2253,13 @@ where
return Err(e.into())
}
// Use the cached state provider before executing; this currently adds nothing on its own,
// but it will be used for prewarming
let caches = ProviderCacheBuilder::default().build_caches();
let cache_metrics = CachedStateMetrics::zeroed();
let state_provider =
CachedStateProvider::new_with_caches(state_provider, caches, cache_metrics);
trace!(target: "engine::tree", block=?block.num_hash(), "Executing block");
let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));