perf(tree): add cross-block caching (#13769)

Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
Co-authored-by: Federico Gimenez <federico.gimenez@gmail.com>
Dan Cline
2025-02-06 12:38:03 -05:00
committed by GitHub
parent 98266f87da
commit c9169705e2
5 changed files with 515 additions and 31 deletions

View File

@@ -48,7 +48,7 @@ revm-primitives.workspace = true
futures.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "sync"] }
moka = { workspace = true, features = ["sync"] }
mini-moka = { workspace = true, features = ["sync"] }
# metrics
metrics.workspace = true

View File

@@ -1,7 +1,7 @@
//! Implements a state provider that has a shared cache in front of it.
use alloy_primitives::{map::B256HashMap, Address, StorageKey, StorageValue, B256};
use metrics::Gauge;
use moka::sync::CacheBuilder;
use mini_moka::sync::CacheBuilder;
use reth_errors::ProviderResult;
use reth_metrics::Metrics;
use reth_primitives_traits::{Account, Bytecode};
@@ -9,13 +9,17 @@ use reth_provider::{
AccountReader, BlockHashReader, HashedPostStateProvider, StateProofProvider, StateProvider,
StateRootProvider, StorageRootProvider,
};
use reth_revm::db::BundleState;
use reth_trie::{
updates::TrieUpdates, AccountProof, HashedPostState, HashedStorage, MultiProof,
MultiProofTargets, StorageMultiProof, StorageProof, TrieInput,
};
use revm_primitives::map::DefaultHashBuilder;
use std::time::{Duration, Instant};
use tracing::{debug, trace};
pub(crate) type Cache<K, V> = moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
pub(crate) type Cache<K, V> =
mini_moka::sync::Cache<K, V, alloy_primitives::map::DefaultHashBuilder>;
/// A wrapper around a state provider and a shared cache.
pub(crate) struct CachedStateProvider<S> {
@@ -44,6 +48,73 @@ where
}
}
impl<S> CachedStateProvider<S> {
/// Creates a new [`SavedCache`] from the given state updates and executed block hash.
///
/// This does not update the code cache, because no changes are required to the code cache on
/// state change.
///
/// NOTE: Consumers should ensure that these caches are not in use by a state provider for a
/// previous block - otherwise, this update will cause that state provider to contain future
/// state, which would be incorrect.
pub(crate) fn save_cache(
self,
executed_block_hash: B256,
state_updates: &BundleState,
) -> Result<SavedCache, ()> {
let Self { caches, metrics, state_provider: _ } = self;
let start = Instant::now();
for (addr, account) in &state_updates.state {
// If the account was not modified (i.e., neither changed nor destroyed), then we have
// nothing to do for this account and can move on
if account.status.is_not_modified() {
continue
}
// if the account was destroyed, invalidate from the account / storage caches
if account.was_destroyed() {
// invalidate the account cache entry if destroyed
caches.account_cache.invalidate(addr);
caches.invalidate_account_storage(addr);
continue
}
// if we have an account that was modified but has `None` account info, something has
// gone badly wrong, because this state should be unrepresentable: an account whose
// current info is `None` should have been destroyed.
let Some(ref account_info) = account.info else {
trace!(target: "engine::caching", ?account, "Account with None account info found in state updates");
return Err(())
};
// insert will update if present, so we just use the new account info as the new value
// for the account cache
caches.account_cache.insert(*addr, Some(Account::from(account_info)));
// now we iterate over all storage and make updates to the cached storage values
for (storage_key, slot) in &account.storage {
// we convert the storage key from U256 to B256 because that is how it's represented
// in the cache
caches.insert_storage(*addr, (*storage_key).into(), Some(slot.present_value));
}
}
// set metrics
metrics.storage_cache_size.set(caches.total_storage_slots() as f64);
metrics.account_cache_size.set(caches.account_cache.entry_count() as f64);
metrics.code_cache_size.set(caches.code_cache.entry_count() as f64);
debug!(target: "engine::caching", update_latency=?start.elapsed(), "Updated state caches");
// create a saved cache with the executed block hash, same metrics, and updated caches
let saved_cache = SavedCache { hash: executed_block_hash, caches, metrics };
Ok(saved_cache)
}
}
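To make the branching above easier to follow, here is a standalone sketch of the same decision flow, with plain HashMaps standing in for the mini-moka caches and `u64`/`&str` standing in for account info and addresses (the names are illustrative, not reth APIs):

use std::collections::HashMap;

#[derive(Clone, Copy)]
enum AccountStatus {
    NotModified,
    Destroyed,
    Changed,
}

struct Update {
    status: AccountStatus,
    info: Option<u64>,
    storage: Vec<(u64, u64)>,
}

fn apply_updates(
    accounts: &mut HashMap<&'static str, Option<u64>>,
    storage: &mut HashMap<&'static str, HashMap<u64, u64>>,
    updates: Vec<(&'static str, Update)>,
) -> Result<(), ()> {
    for (addr, update) in updates {
        match update.status {
            // untouched accounts leave both caches alone
            AccountStatus::NotModified => continue,
            // destroyed accounts are evicted from both caches
            AccountStatus::Destroyed => {
                accounts.remove(addr);
                storage.remove(addr);
            }
            AccountStatus::Changed => {
                // a changed account must carry new info; `None` here is the
                // unrepresentable state rejected with `Err(())` above
                let Some(info) = update.info else { return Err(()) };
                accounts.insert(addr, Some(info));
                let slots = storage.entry(addr).or_default();
                for (key, value) in update.storage {
                    slots.insert(key, value);
                }
            }
        }
    }
    Ok(())
}

fn main() {
    let (mut accounts, mut storage) = (HashMap::new(), HashMap::new());
    let updates = vec![
        ("alice", Update { status: AccountStatus::Changed, info: Some(1), storage: vec![(0, 42)] }),
        ("bob", Update { status: AccountStatus::Destroyed, info: None, storage: vec![] }),
        ("carol", Update { status: AccountStatus::NotModified, info: None, storage: vec![] }),
    ];
    assert!(apply_updates(&mut accounts, &mut storage, updates).is_ok());
    assert_eq!(accounts["alice"], Some(1));
    assert_eq!(storage["alice"][&0], 42);
}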
/// Metrics for the cached state provider, showing hits / misses for each cache
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.caching")]
@@ -54,17 +125,35 @@ pub(crate) struct CachedStateMetrics {
/// Code cache misses
code_cache_misses: Gauge,
/// Code cache size
///
/// NOTE: this uses the moka caches' `entry_count`, NOT the `weighted_size` method to calculate
/// size.
code_cache_size: Gauge,
/// Storage cache hits
storage_cache_hits: Gauge,
/// Storage cache misses
storage_cache_misses: Gauge,
/// Storage cache size
///
/// NOTE: this uses the moka caches' `entry_count`, NOT the `weighted_size` method to calculate
/// size.
storage_cache_size: Gauge,
/// Account cache hits
account_cache_hits: Gauge,
/// Account cache misses
account_cache_misses: Gauge,
/// Account cache size
///
/// NOTE: this uses the moka caches' `entry_count`, NOT the `weighted_size` method to calculate
/// size.
account_cache_size: Gauge,
}
impl CachedStateMetrics {
@@ -112,7 +201,7 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
account: Address,
storage_key: StorageKey,
) -> ProviderResult<Option<StorageValue>> {
if let Some(res) = self.caches.storage_cache.get(&(account, storage_key)) {
if let Some(res) = self.caches.get_storage(&account, &storage_key) {
self.metrics.storage_cache_hits.increment(1);
return Ok(res)
}
@@ -120,7 +209,7 @@ impl<S: StateProvider> StateProvider for CachedStateProvider<S> {
self.metrics.storage_cache_misses.increment(1);
let final_res = self.state_provider.storage(account, storage_key)?;
self.caches.storage_cache.insert((account, storage_key), final_res);
self.caches.insert_storage(account, storage_key, final_res);
Ok(final_res)
}
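The method above is a straightforward read-through cache: consult the cache first, fall back to the underlying provider on a miss, and cache whatever the provider returned. A minimal self-contained sketch of the pattern with toy types (not the reth provider traits, and with the metrics counters elided); note that a `None` result is cached too, so repeated reads of an empty slot never hit the provider twice:

use std::{cell::RefCell, collections::HashMap};

struct ReadThrough<F: Fn(u64) -> Option<u64>> {
    cache: RefCell<HashMap<u64, Option<u64>>>,
    fallback: F,
}

impl<F: Fn(u64) -> Option<u64>> ReadThrough<F> {
    fn get(&self, key: u64) -> Option<u64> {
        // cache hit: a cached `None` ("slot is empty") counts as a hit too
        if let Some(hit) = self.cache.borrow().get(&key) {
            return *hit;
        }
        // cache miss: ask the provider and cache whatever it said
        let value = (self.fallback)(key);
        self.cache.borrow_mut().insert(key, value);
        value
    }
}

fn main() {
    let rt = ReadThrough {
        cache: RefCell::new(HashMap::new()),
        fallback: |k| (k % 2 == 0).then_some(k),
    };
    assert_eq!(rt.get(4), Some(4)); // miss, then cached
    assert_eq!(rt.get(4), Some(4)); // hit
    assert_eq!(rt.get(3), None);    // the negative result is cached as well
}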
@@ -243,13 +332,48 @@ pub(crate) struct ProviderCaches {
/// The cache for bytecode
code_cache: Cache<B256, Option<Bytecode>>,
/// The cache for storage
storage_cache: Cache<(Address, StorageKey), Option<StorageValue>>,
/// The cache for storage, organized hierarchically by account
storage_cache: Cache<Address, AccountStorageCache>,
/// The cache for basic accounts
account_cache: Cache<Address, Option<Account>>,
}
impl ProviderCaches {
/// Get storage value from hierarchical cache
pub(crate) fn get_storage(
&self,
address: &Address,
key: &StorageKey,
) -> Option<Option<StorageValue>> {
self.storage_cache.get(address).and_then(|account_cache| account_cache.get_storage(key))
}
/// Insert storage value into hierarchical cache
pub(crate) fn insert_storage(
&self,
address: Address,
key: StorageKey,
value: Option<StorageValue>,
) {
let account_cache = self.storage_cache.get(&address).unwrap_or_default();
account_cache.insert_storage(key, value);
self.storage_cache.insert(address, account_cache);
}
/// Invalidate storage for specific account
pub(crate) fn invalidate_account_storage(&self, address: &Address) {
self.storage_cache.invalidate(address);
}
/// Returns the total number of storage slots cached across all accounts
pub(crate) fn total_storage_slots(&self) -> usize {
self.storage_cache.iter().map(|entry| entry.len()).sum()
}
}
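A rough standalone model of this two-level layout, assuming only that the inner cache is a cheaply clonable handle (as a mini-moka `Cache` is): the payoff is that invalidating an account drops a single handle instead of scanning every `(Address, StorageKey)` pair, which the old flat cache could not do cheaply.

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

// inner level: one shared handle per account, mapping slot -> value;
// cloning copies the Arc, not the slot map
#[derive(Clone, Default)]
struct SlotCache(Arc<Mutex<HashMap<u64, u64>>>);

// outer level: account -> handle to that account's slots
struct TwoLevel {
    by_account: Mutex<HashMap<&'static str, SlotCache>>,
}

impl TwoLevel {
    fn insert(&self, account: &'static str, slot: u64, value: u64) {
        // get-or-create mirrors `storage_cache.get(&address).unwrap_or_default()`
        let slots = self.by_account.lock().unwrap().entry(account).or_default().clone();
        slots.0.lock().unwrap().insert(slot, value);
    }

    fn get(&self, account: &str, slot: u64) -> Option<u64> {
        let slots = self.by_account.lock().unwrap().get(account)?.clone();
        let value = slots.0.lock().unwrap().get(&slot).copied();
        value
    }

    fn invalidate_account(&self, account: &str) {
        // dropping the handle discards every cached slot for the account at once
        self.by_account.lock().unwrap().remove(account);
    }
}

fn main() {
    let cache = TwoLevel { by_account: Mutex::new(HashMap::new()) };
    cache.insert("alice", 1, 99);
    assert_eq!(cache.get("alice", 1), Some(99));
    cache.invalidate_account("alice");
    assert_eq!(cache.get("alice", 1), None);
}

One difference worth noting: the sketch's `entry(...).or_default()` makes get-or-create atomic, while `insert_storage` above does a `get(...).unwrap_or_default()` followed by a separate outer `insert`, leaving a small window where two writers racing on a brand-new account each create a fresh inner cache and the later outer insert wins.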
/// A builder for [`ProviderCaches`].
#[derive(Debug)]
pub(crate) struct ProviderCacheBuilder {
@@ -266,23 +390,252 @@ pub(crate) struct ProviderCacheBuilder {
impl ProviderCacheBuilder {
/// Build a [`ProviderCaches`] struct, so that provider caches can be easily cloned.
pub(crate) fn build_caches(self) -> ProviderCaches {
ProviderCaches {
code_cache: CacheBuilder::new(self.code_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
storage_cache: CacheBuilder::new(self.storage_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
account_cache: CacheBuilder::new(self.account_cache_size)
.build_with_hasher(DefaultHashBuilder::default()),
}
// TODO: the total cache size could be a CLI configuration parameter.
const TOTAL_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024; // 4GB
let storage_cache_size = (TOTAL_CACHE_SIZE * 8888) / 10000; // 88.88% of total
let account_cache_size = (TOTAL_CACHE_SIZE * 556) / 10000; // 5.56% of total
let code_cache_size = (TOTAL_CACHE_SIZE * 556) / 10000; // 5.56% of total
const EXPIRY_TIME: Duration = Duration::from_secs(7200); // 2 hours
const TIME_TO_IDLE: Duration = Duration::from_secs(3600); // 1 hour
let storage_cache = CacheBuilder::new(self.storage_cache_size)
.weigher(|_key: &Address, value: &AccountStorageCache| -> u32 {
// values based on results from measure_storage_cache_overhead test
let base_weight = 39_000;
let slots_weight = value.len() * 218;
(base_weight + slots_weight) as u32
})
.max_capacity(storage_cache_size)
.time_to_live(EXPIRY_TIME)
.time_to_idle(TIME_TO_IDLE)
.build_with_hasher(DefaultHashBuilder::default());
let account_cache = CacheBuilder::new(self.account_cache_size)
.weigher(|_key: &Address, value: &Option<Account>| -> u32 {
match value {
Some(account) => {
let mut weight = 40;
if account.nonce != 0 {
weight += 32;
}
if !account.balance.is_zero() {
weight += 32;
}
if account.bytecode_hash.is_some() {
weight += 33; // size of Option<B256>
} else {
weight += 8; // size of None variant
}
weight as u32
}
None => 8, // size of None variant
}
})
.max_capacity(account_cache_size)
.time_to_live(EXPIRY_TIME)
.time_to_idle(TIME_TO_IDLE)
.build_with_hasher(DefaultHashBuilder::default());
let code_cache = CacheBuilder::new(self.code_cache_size)
.weigher(|_key: &B256, value: &Option<Bytecode>| -> u32 {
match value {
Some(bytecode) => {
// base weight + actual bytecode size
(40 + bytecode.len()) as u32
}
None => 8, // size of None variant
}
})
.max_capacity(code_cache_size)
.time_to_live(EXPIRY_TIME)
.time_to_idle(TIME_TO_IDLE)
.build_with_hasher(DefaultHashBuilder::default());
ProviderCaches { code_cache, storage_cache, account_cache }
}
}
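As a sanity check on the basis-point split above (8888 + 556 + 556 = 10000), a tiny standalone program; integer division rounds each share down, so the three shares can undershoot the total by a few bytes:

fn main() {
    const TOTAL_CACHE_SIZE: u64 = 4 * 1024 * 1024 * 1024; // 4GB, as above

    let storage_cache_size = (TOTAL_CACHE_SIZE * 8888) / 10000; // 88.88% ~= 3.6GB
    let account_cache_size = (TOTAL_CACHE_SIZE * 556) / 10000; // 5.56% ~= 0.2GB
    let code_cache_size = (TOTAL_CACHE_SIZE * 556) / 10000; // 5.56% ~= 0.2GB

    let sum = storage_cache_size + account_cache_size + code_cache_size;
    assert!(sum <= TOTAL_CACHE_SIZE); // never exceeds the byte budget
    println!(
        "storage={storage_cache_size} account={account_cache_size} \
         code={code_cache_size} slack={}",
        TOTAL_CACHE_SIZE - sum // a couple of bytes lost to rounding
    );
}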
impl Default for ProviderCacheBuilder {
fn default() -> Self {
// moka caches have been benchmarked up to 800k entries, so we just use 1M, optimizing for
// hit rate over memory consumption.
// With the weigher and max_capacity in place, these numbers represent
// the maximum number of entries that can be stored, not the actual
// memory usage, which is controlled by max_capacity.
//
// See: https://github.com/moka-rs/moka/wiki#admission-and-eviction-policies
Self { code_cache_size: 1000000, storage_cache_size: 1000000, account_cache_size: 1000000 }
// Code cache: up to 10M entries but limited to ~0.2GB by its share of the byte budget
// Storage cache: up to 10M accounts but limited to ~3.6GB
// Account cache: up to 10M accounts but limited to ~0.2GB
Self {
code_cache_size: 10_000_000,
storage_cache_size: 10_000_000,
account_cache_size: 10_000_000,
}
}
}
/// A saved cache that was used while executing a specific block, and has since been updated
/// with the state changes from that execution.
#[derive(Debug)]
pub(crate) struct SavedCache {
/// The hash of the block these caches were used to execute.
hash: B256,
/// The caches used for the provider.
caches: ProviderCaches,
/// Metrics for the cached state provider
metrics: CachedStateMetrics,
}
impl SavedCache {
/// Returns the hash for this cache
pub(crate) const fn executed_block_hash(&self) -> B256 {
self.hash
}
/// Splits the cache into its caches and metrics, consuming it.
pub(crate) fn split(self) -> (ProviderCaches, CachedStateMetrics) {
(self.caches, self.metrics)
}
}
/// Cache for an account's storage slots
#[derive(Debug, Clone)]
pub(crate) struct AccountStorageCache {
/// The storage slots for this account
slots: Cache<StorageKey, Option<StorageValue>>,
}
impl AccountStorageCache {
/// Create a new [`AccountStorageCache`]
pub(crate) fn new(max_slots: u64) -> Self {
Self {
slots: CacheBuilder::new(max_slots).build_with_hasher(DefaultHashBuilder::default()),
}
}
/// Get a storage value
pub(crate) fn get_storage(&self, key: &StorageKey) -> Option<Option<StorageValue>> {
self.slots.get(key)
}
/// Insert a storage value
pub(crate) fn insert_storage(&self, key: StorageKey, value: Option<StorageValue>) {
self.slots.insert(key, value);
}
/// Returns the number of slots in the cache
pub(crate) fn len(&self) -> usize {
self.slots.entry_count() as usize
}
}
impl Default for AccountStorageCache {
fn default() -> Self {
// With the weigher and max_capacity in place, this number represents
// the maximum number of entries that can be stored, not the actual
// memory usage, which is controlled by the storage cache's max_capacity.
Self::new(1_000_000)
}
}
#[cfg(test)]
mod tests {
use super::*;
use rand::Rng;
use std::mem::size_of;
mod tracking_allocator {
use std::{
alloc::{GlobalAlloc, Layout, System},
sync::atomic::{AtomicUsize, Ordering},
};
#[derive(Debug)]
pub(crate) struct TrackingAllocator {
allocated: AtomicUsize,
total_allocated: AtomicUsize,
inner: System,
}
impl TrackingAllocator {
pub(crate) const fn new() -> Self {
Self {
allocated: AtomicUsize::new(0),
total_allocated: AtomicUsize::new(0),
inner: System,
}
}
pub(crate) fn reset(&self) {
self.allocated.store(0, Ordering::SeqCst);
self.total_allocated.store(0, Ordering::SeqCst);
}
pub(crate) fn total_allocated(&self) -> usize {
self.total_allocated.load(Ordering::SeqCst)
}
}
unsafe impl GlobalAlloc for TrackingAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let ret = self.inner.alloc(layout);
if !ret.is_null() {
self.allocated.fetch_add(layout.size(), Ordering::SeqCst);
self.total_allocated.fetch_add(layout.size(), Ordering::SeqCst);
}
ret
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.allocated.fetch_sub(layout.size(), Ordering::SeqCst);
self.inner.dealloc(ptr, layout)
}
}
}
use tracking_allocator::TrackingAllocator;
#[global_allocator]
static ALLOCATOR: TrackingAllocator = TrackingAllocator::new();
fn measure_allocation<T, F>(f: F) -> (usize, T)
where
F: FnOnce() -> T,
{
ALLOCATOR.reset();
let result = f();
let total = ALLOCATOR.total_allocated();
(total, result)
}
#[test]
fn measure_storage_cache_overhead() {
let (base_overhead, cache) = measure_allocation(|| AccountStorageCache::new(1000));
println!("Base AccountStorageCache overhead: {} bytes", base_overhead);
let mut rng = rand::thread_rng();
let key = StorageKey::random();
let value = StorageValue::from(rng.gen::<u128>());
let (first_slot, _) = measure_allocation(|| {
cache.insert_storage(key, Some(value));
});
println!("First slot insertion overhead: {} bytes", first_slot);
const TOTAL_SLOTS: usize = 10_000;
let (test_slots, _) = measure_allocation(|| {
for _ in 0..TOTAL_SLOTS {
let key = StorageKey::random();
let value = StorageValue::from(rng.gen::<u128>());
cache.insert_storage(key, Some(value));
}
});
println!("Average overhead over {} slots: {} bytes", TOTAL_SLOTS, test_slots / TOTAL_SLOTS);
println!("\nTheoretical sizes:");
println!("StorageKey size: {} bytes", size_of::<StorageKey>());
println!("StorageValue size: {} bytes", size_of::<StorageValue>());
println!("Option<StorageValue> size: {} bytes", size_of::<Option<StorageValue>>());
println!("Option<B256> size: {} bytes", size_of::<Option<B256>>());
}
}
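For reference, the constants in the storage weigher above (the 39_000 base weight and 218 bytes per slot) come from this test's printed output. Since the test prints its measurements rather than asserting on them, it is normally run with output visible, e.g. `cargo test measure_storage_cache_overhead -- --nocapture` (exact package-selection flags depend on the workspace).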

View File

@@ -19,7 +19,7 @@ use alloy_rpc_types_engine::{
ForkchoiceState, PayloadStatus, PayloadStatusEnum, PayloadValidationError,
};
use block_buffer::BlockBuffer;
use cached_state::ProviderCaches;
use cached_state::{ProviderCaches, SavedCache};
use error::{InsertBlockError, InsertBlockErrorKind, InsertBlockFatalError};
use persistence_state::CurrentPersistenceAction;
use reth_chain_state::{
@@ -70,7 +70,7 @@ use std::{
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Arc,
Arc, RwLock,
},
time::{Duration, Instant},
};
@@ -571,6 +571,8 @@ where
invalid_block_hook: Box<dyn InvalidBlockHook<N>>,
/// The engine API variant of this handler
engine_kind: EngineApiKind,
/// The most recent cache used for execution.
most_recent_cache: Option<SavedCache>,
/// Thread pool used for the state root task and prewarming
thread_pool: Arc<rayon::ThreadPool>,
}
@@ -668,6 +670,7 @@ where
incoming_tx,
invalid_block_hook: Box::new(NoopInvalidBlockHook),
engine_kind,
most_recent_cache: None,
thread_pool,
}
}
@@ -2294,6 +2297,19 @@ where
Ok(None)
}
/// This fetches the most recent saved cache, using the hash of the block we are trying to
/// execute on top of.
///
/// If the hash does not match the saved cache's hash, then the saved cache does not contain
/// state useful for this block's execution, and we return `None`.
///
/// If there is no cache saved, this returns `None`.
///
/// This `take`s the cache to avoid cloning the entire cache.
fn take_latest_cache(&mut self, parent_hash: B256) -> Option<SavedCache> {
self.most_recent_cache.take_if(|cache| cache.executed_block_hash() == parent_hash)
}
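`Option::take_if` (stable since Rust 1.80) does the compare-and-move in one step. An illustrative standalone loop showing how this pairs with saving the cache across blocks, with `u64` hashes and stand-in types rather than reth APIs:

#[derive(Default)]
struct Caches; // stand-in for ProviderCaches

struct Saved {
    hash: u64, // hash of the block that produced these caches
    caches: Caches,
}

fn run_blocks(blocks: &[(u64, u64)]) {
    // (parent_hash, block_hash) pairs
    let mut most_recent_cache: Option<Saved> = None;
    for &(parent_hash, block_hash) in blocks {
        // reuse the caches only if they were produced by executing the parent
        let caches = match most_recent_cache.take_if(|s| s.hash == parent_hash) {
            Some(saved) => saved.caches,
            None => Caches::default(), // fork or cold start: rebuild from scratch
        };
        // ... execute the block against `caches` here ...
        // save the updated caches keyed by the hash we just executed
        most_recent_cache = Some(Saved { hash: block_hash, caches });
    }
}

fn main() {
    // genesis(0) -> 1 -> 2 reuses caches; the sibling (0, 3) forces a rebuild
    run_blocks(&[(0, 1), (1, 2), (0, 3)]);
}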
fn insert_block_without_senders(
&mut self,
block: SealedBlock<N::Block>,
@@ -2418,16 +2434,24 @@ where
(None, None, None, Box::new(NoopHook::default()) as Box<dyn OnStateHook>)
};
let (caches, cache_metrics) =
if let Some(cache) = self.take_latest_cache(block.parent_hash()) {
cache.split()
} else {
(ProviderCacheBuilder::default().build_caches(), CachedStateMetrics::zeroed())
};
// Use the cached state provider before executing; it is used during execution after the
// prewarming threads complete
let caches = ProviderCacheBuilder::default().build_caches();
let cache_metrics = CachedStateMetrics::zeroed();
let state_provider = CachedStateProvider::new_with_caches(
state_provider,
caches.clone(),
cache_metrics.clone(),
);
// This lock prevents the caches from being saved before all prewarm execution tasks have completed
let prewarm_task_lock = Arc::new(RwLock::new(()));
if self.config.use_caching_and_prewarming() {
debug!(target: "engine::tree", "Spawning prewarm threads");
let prewarm_start = Instant::now();
@@ -2444,6 +2468,7 @@ where
cache_metrics.clone(),
state_root_sender,
cancel_execution.clone(),
prewarm_task_lock.clone(),
)?;
let elapsed = start.elapsed();
debug!(target: "engine::tree", ?tx_idx, elapsed = ?elapsed, "Spawned transaction prewarm");
@@ -2545,6 +2570,19 @@ where
self.metrics.block_validation.record_state_root(&trie_output, root_elapsed.as_secs_f64());
debug!(target: "engine::tree", ?root_elapsed, block=?block_num_hash, "Calculated state root");
if self.config.use_caching_and_prewarming() {
// this is the only place / thread where a write lock is acquired, so the rwlock could
// only be poisoned by an earlier panic here, in which case we would have already crashed
//
// we take the lock here and in the prewarm tasks so that we do not save the cache while
// a prewarm task is still running, since that task would update the cache with stale
// data. it is unlikely that any prewarm tasks are still running at this point, however
drop(prewarm_task_lock.write().unwrap());
// apply state updates to cache and save it (if saving was successful)
self.most_recent_cache =
state_provider.save_cache(sealed_block.hash(), &output.state).ok();
}
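A minimal standalone model of the lock protocol, using the same `std::sync::RwLock<()>` idea: each prewarm task holds a read guard for its entire run, readers do not block one another, and the lone writer blocks until every read guard is gone. In the real flow the write happens only after state root computation, typically long after the tasks finish; the short sleeps below just stand in for that ordering:

use std::{
    sync::{Arc, RwLock},
    thread,
    time::Duration,
};

fn main() {
    let gate = Arc::new(RwLock::new(()));

    let workers: Vec<_> = (0..4u64)
        .map(|i| {
            let gate = Arc::clone(&gate);
            thread::spawn(move || {
                // mirrors `let in_progress = task_finished.read().unwrap()`
                let _in_progress = gate.read().unwrap();
                // ... transaction prewarm execution would happen here ...
                thread::sleep(Duration::from_millis(10 * i));
                // read guard dropped here, mirroring `drop(in_progress)`
            })
        })
        .collect();

    // give the workers a moment to acquire their read guards
    thread::sleep(Duration::from_millis(5));

    // mirrors `drop(prewarm_task_lock.write().unwrap())`: blocks until every
    // read guard is released, then releases the write guard immediately,
    // after which it is safe to save the caches
    drop(gate.write().unwrap());
    println!("all prewarm tasks finished; caches can be saved");

    for worker in workers {
        worker.join().unwrap();
    }
}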
let executed: ExecutedBlockWithTrieUpdates<N> = ExecutedBlockWithTrieUpdates {
block: ExecutedBlock {
recovered_block: Arc::new(block),
@@ -2637,6 +2675,7 @@ where
cache_metrics: CachedStateMetrics,
state_root_sender: Option<Sender<StateRootMessage>>,
cancel_execution: ManualCancel,
task_finished: Arc<RwLock<()>>,
) -> Result<(), InsertBlockErrorKind> {
let Some(state_provider) = self.state_provider(block.parent_hash())? else {
trace!(target: "engine::tree", parent=%block.parent_hash(), "Could not get state provider for prewarm");
@ -2652,6 +2691,7 @@ where
// spawn task executing the individual tx
self.thread_pool.spawn(move || {
let in_progress = task_finished.read().unwrap();
let state_provider = StateProviderDatabase::new(&state_provider);
// create a new executor and disable nonce checks in the env
@@ -2674,6 +2714,9 @@ where
}
};
// execution no longer in progress, so we can drop the lock
drop(in_progress);
// if execution is finished there is no point to sending proof targets
if cancel_execution.is_cancelled() {
return