chore(rpc): split cache into multiple files (#3519)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Roman Krasiuk
2023-07-01 14:16:22 +03:00
committed by GitHub
parent 56674ade06
commit de1323921d
4 changed files with 183 additions and 140 deletions

50
crates/rpc/rpc/src/eth/cache/config.rs vendored Normal file
View File

@ -0,0 +1,50 @@
use serde::{Deserialize, Serialize};
// TODO: memory based limiter is currently disabled pending <https://github.com/paradigmxyz/reth/issues/3503>
/// Default cache size for the block cache: 500MB
///
/// With an average block size of ~100kb this should be able to cache ~5000 blocks.
pub const DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB: usize = 500;
/// Default cache size for the receipts cache: 500MB
pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500;
/// Default cache size for the env cache: 1MB
pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1;
/// Default cache size for the block cache: 5000 blocks.
pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000;
/// Default cache size for the receipts cache: 2000 receipts.
pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000;
/// Default cache size for the env cache: 1000 envs.
pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000;
/// Settings for the [EthStateCache](crate::eth::cache::EthStateCache).
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EthStateCacheConfig {
/// Max number of blocks in cache.
///
/// Default is 5000.
pub max_blocks: u32,
/// Max number receipts in cache.
///
/// Default is 2000.
pub max_receipts: u32,
/// Max number of bytes for cached env data.
///
/// Default is 1000.
pub max_envs: u32,
}
impl Default for EthStateCacheConfig {
fn default() -> Self {
Self {
max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN,
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_envs: DEFAULT_ENV_CACHE_MAX_LEN,
}
}
}

13
crates/rpc/rpc/src/eth/cache/metrics.rs vendored Normal file
View File

@ -0,0 +1,13 @@
use reth_metrics::{
metrics::{self, Gauge},
Metrics,
};
#[derive(Metrics)]
#[metrics(scope = "rpc.eth_cache")]
pub(crate) struct CacheMetrics {
/// The number of entities in the cache.
pub(crate) cached_count: Gauge,
/// The number of queued consumers.
pub(crate) queued_consumers_count: Gauge,
}

View File

@ -2,20 +2,13 @@
use futures::{future::Either, Stream, StreamExt};
use reth_interfaces::{provider::ProviderError, Result};
use reth_metrics::{
metrics::{self, Gauge},
Metrics,
};
use reth_primitives::{Block, Receipt, SealedBlock, TransactionSigned, H256};
use reth_provider::{BlockReader, CanonStateNotification, EvmEnvProvider, StateProviderFactory};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use revm::primitives::{BlockEnv, CfgEnv};
use schnellru::{ByLength, Limiter, LruMap};
use serde::{Deserialize, Serialize};
use schnellru::{ByLength, Limiter};
use std::{
collections::{hash_map::Entry, HashMap},
future::Future,
hash::Hash,
pin::Pin,
task::{ready, Context, Poll},
};
@ -25,26 +18,13 @@ use tokio::sync::{
};
use tokio_stream::wrappers::UnboundedReceiverStream;
// TODO: memory based limiter is currently disabled pending <https://github.com/paradigmxyz/reth/issues/3503>
/// Default cache size for the block cache: 500MB
///
/// With an average block size of ~100kb this should be able to cache ~5000 blocks.
pub const DEFAULT_BLOCK_CACHE_SIZE_BYTES_MB: usize = 500;
mod config;
pub use config::*;
/// Default cache size for the receipts cache: 500MB
pub const DEFAULT_RECEIPT_CACHE_SIZE_BYTES_MB: usize = 500;
mod metrics;
/// Default cache size for the env cache: 1MB
pub const DEFAULT_ENV_CACHE_SIZE_BYTES_MB: usize = 1;
/// Default cache size for the block cache: 5000 blocks.
pub const DEFAULT_BLOCK_CACHE_MAX_LEN: u32 = 5000;
/// Default cache size for the receipts cache: 2000 receipts.
pub const DEFAULT_RECEIPT_CACHE_MAX_LEN: u32 = 2000;
/// Default cache size for the env cache: 1000 envs.
pub const DEFAULT_ENV_CACHE_MAX_LEN: u32 = 1000;
mod multi_consumer;
pub use multi_consumer::MultiConsumerLruCache;
/// The type that can send the response to a requested [Block]
type BlockResponseSender = oneshot::Sender<Result<Option<Block>>>;
@ -69,34 +49,6 @@ type ReceiptsLruCache<L> = MultiConsumerLruCache<H256, Vec<Receipt>, L, Receipts
type EnvLruCache<L> = MultiConsumerLruCache<H256, (CfgEnv, BlockEnv), L, EnvResponseSender>;
/// Settings for the [EthStateCache]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EthStateCacheConfig {
/// Max number of blocks in cache.
///
/// Default is 5000.
pub max_blocks: u32,
/// Max number receipts in cache.
///
/// Default is 2000.
pub max_receipts: u32,
/// Max number of bytes for cached env data.
///
/// Default is 1000.
pub max_envs: u32,
}
impl Default for EthStateCacheConfig {
fn default() -> Self {
Self {
max_blocks: DEFAULT_BLOCK_CACHE_MAX_LEN,
max_receipts: DEFAULT_RECEIPT_CACHE_MAX_LEN,
max_envs: DEFAULT_ENV_CACHE_MAX_LEN,
}
}
}
/// Provides async access to cached eth data
///
/// This is the frontend for the async caching service which manages cached data on a different
@ -301,7 +253,7 @@ where
// cache good block
if let Ok(Some(block)) = res {
self.full_block_cache.cache.insert(block_hash, block);
self.full_block_cache.insert(block_hash, block);
}
}
@ -315,7 +267,7 @@ where
// cache good receipts
if let Ok(Some(receipts)) = res {
self.receipts_cache.cache.insert(block_hash, receipts);
self.receipts_cache.insert(block_hash, receipts);
}
}
@ -345,9 +297,7 @@ where
match action {
CacheAction::GetBlock { block_hash, response_tx } => {
// check if block is cached
if let Some(block) =
this.full_block_cache.cache.get(&block_hash).cloned()
{
if let Some(block) = this.full_block_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(Some(block)));
continue
}
@ -365,7 +315,7 @@ where
}
CacheAction::GetBlockTransactions { block_hash, response_tx } => {
// check if block is cached
if let Some(block) = this.full_block_cache.cache.get(&block_hash) {
if let Some(block) = this.full_block_cache.get(&block_hash) {
let _ = response_tx.send(Ok(Some(block.body.clone())));
continue
}
@ -383,9 +333,7 @@ where
}
CacheAction::GetReceipts { block_hash, response_tx } => {
// check if block is cached
if let Some(receipts) =
this.receipts_cache.cache.get(&block_hash).cloned()
{
if let Some(receipts) = this.receipts_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(Some(receipts)));
continue
}
@ -403,7 +351,7 @@ where
}
CacheAction::GetEnv { block_hash, response_tx } => {
// check if env data is cached
if let Some(env) = this.evm_env_cache.cache.get(&block_hash).cloned() {
if let Some(env) = this.evm_env_cache.get(&block_hash).cloned() {
let _ = response_tx.send(Ok(env));
continue
}
@ -443,7 +391,7 @@ where
// cache good env data
if let Ok(data) = res {
this.evm_env_cache.cache.insert(block_hash, data);
this.evm_env_cache.insert(block_hash, data);
}
}
CacheAction::CacheNewCanonicalChain { blocks, receipts } => {
@ -466,72 +414,6 @@ where
}
}
struct MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// The LRU cache for the
cache: LruMap<K, V, L>,
/// All queued consumers
queued: HashMap<K, Vec<S>>,
/// Cache metrics
metrics: CacheMetrics,
}
impl<K, V, L, S> MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// Adds the sender to the queue for the given key.
///
/// Returns true if this is the first queued sender for the key
fn queue(&mut self, key: K, sender: S) -> bool {
self.metrics.queued_consumers_count.increment(1.0);
match self.queued.entry(key) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(sender);
false
}
Entry::Vacant(entry) => {
entry.insert(vec![sender]);
true
}
}
}
/// Remove consumers for a given key.
fn remove(&mut self, key: &K) -> Option<Vec<S>> {
match self.queued.remove(key) {
Some(removed) => {
self.metrics.queued_consumers_count.decrement(removed.len() as f64);
Some(removed)
}
None => None,
}
}
#[inline]
fn update_cached_metrics(&self) {
self.metrics.cached_count.set(self.cache.len() as f64);
}
}
impl<K, V, S> MultiConsumerLruCache<K, V, ByLength, S>
where
K: Hash + Eq,
{
/// Creates a new empty map with a given `memory_budget` and metric label.
fn new(max_len: u32, cache_id: &str) -> Self {
Self {
cache: LruMap::new(ByLength::new(max_len)),
queued: Default::default(),
metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]),
}
}
}
/// All message variants sent through the channel
enum CacheAction {
GetBlock { block_hash: H256, response_tx: BlockResponseSender },
@ -578,12 +460,3 @@ where
}
}
}
#[derive(Metrics)]
#[metrics(scope = "rpc.eth_cache")]
struct CacheMetrics {
/// The number of entities in the cache.
cached_count: Gauge,
/// The number of queued consumers.
queued_consumers_count: Gauge,
}

View File

@ -0,0 +1,107 @@
use super::metrics::CacheMetrics;
use schnellru::{ByLength, Limiter, LruMap};
use std::{
collections::{hash_map::Entry, HashMap},
fmt::{self, Debug, Formatter},
hash::Hash,
};
/// A multi-consumer LRU cache.
pub struct MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
/// The LRU cache for the
cache: LruMap<K, V, L>,
/// All queued consumers
queued: HashMap<K, Vec<S>>,
/// Cache metrics
metrics: CacheMetrics,
}
impl<K, V, L, S> Debug for MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq,
L: Limiter<K, V>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("MultiConsumerLruCache")
.field("cache_length", &self.cache.len())
.field("cache_memory_usage", &self.cache.memory_usage())
.field("queued_length", &self.queued.len())
.finish()
}
}
impl<K, V, L, S> MultiConsumerLruCache<K, V, L, S>
where
K: Hash + Eq + Debug,
L: Limiter<K, V>,
{
/// Adds the sender to the queue for the given key.
///
/// Returns true if this is the first queued sender for the key
pub fn queue(&mut self, key: K, sender: S) -> bool {
self.metrics.queued_consumers_count.increment(1.0);
match self.queued.entry(key) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(sender);
false
}
Entry::Vacant(entry) => {
entry.insert(vec![sender]);
true
}
}
}
/// Remove consumers for a given key.
pub fn remove(&mut self, key: &K) -> Option<Vec<S>> {
match self.queued.remove(key) {
Some(removed) => {
self.metrics.queued_consumers_count.decrement(removed.len() as f64);
Some(removed)
}
None => None,
}
}
/// Returns a reference to the value for a given key and promotes that element to be the most
/// recently used.
pub fn get(&mut self, key: &K) -> Option<&mut V> {
self.cache.get(key)
}
/// Inserts a new element into the map.
///
/// Can fail if the element is rejected by the limiter or if we fail to grow an empty map.
///
/// See [Schnellru::insert](LruMap::insert) for more info.
pub fn insert<'a>(&mut self, key: L::KeyToInsert<'a>, value: V) -> bool
where
L::KeyToInsert<'a>: Hash + PartialEq<K>,
{
self.cache.insert(key, value)
}
/// Update metrics for the inner cache.
#[inline]
pub fn update_cached_metrics(&self) {
self.metrics.cached_count.set(self.cache.len() as f64);
}
}
impl<K, V, S> MultiConsumerLruCache<K, V, ByLength, S>
where
K: Hash + Eq,
{
/// Creates a new empty map with a given `max_len` and metric label.
pub fn new(max_len: u32, cache_id: &str) -> Self {
Self {
cache: LruMap::new(ByLength::new(max_len)),
queued: Default::default(),
metrics: CacheMetrics::new_with_labels(&[("cache", cache_id.to_string())]),
}
}
}