fix: fetch missing cached blocks (#5612)

Matthias Seitz
2023-11-30 16:12:17 +01:00
committed by GitHub
parent 02a07f6480
commit 12d9559333


@@ -1,40 +1,25 @@
//! Consists of types adjacent to the fee history cache and its configs
use crate::eth::{cache::EthStateCache, error::EthApiError, gas_oracle::MAX_HEADER_HISTORY};
use futures::{Stream, StreamExt};
use futures::{
future::{Fuse, FusedFuture},
FutureExt, Stream, StreamExt,
};
use metrics::atomics::AtomicU64;
use reth_primitives::{Receipt, SealedBlock, TransactionSigned, B256, U256};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider};
use reth_rpc_types::TxGasAndReward;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
collections::{BTreeMap, VecDeque},
fmt::Debug,
sync::{atomic::Ordering::SeqCst, Arc},
};
use tracing::trace;
/// Settings for the [FeeHistoryCache].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistoryCacheConfig {
/// Max number of blocks in cache.
///
/// Default is [MAX_HEADER_HISTORY] plus some headroom to also serve slightly older blocks
/// from cache, since `eth_feeHistory` supports the entire range.
pub max_blocks: u64,
/// Percentile approximation resolution
///
/// Default is 4, which means a percentile granularity of 0.25.
pub resolution: u64,
}
impl Default for FeeHistoryCacheConfig {
fn default() -> Self {
FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
}
}
/// Wrapper struct for BTreeMap
/// Contains cached fee history entries for blocks.
///
/// Its purpose is to provide cached data for `eth_feeHistory`.
#[derive(Debug, Clone)]
pub struct FeeHistoryCache {
inner: Arc<FeeHistoryCacheInner>,
@@ -65,7 +50,24 @@ impl FeeHistoryCache {
self.config().resolution
}
/// Processing of the arriving blocks
/// Returns all blocks that are missing in the cache in the [lower_bound, upper_bound] range.
///
/// This function is used to populate the cache with missing blocks, which can happen if the
/// node switched to stage sync.
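///
/// Missing block numbers are collected newest first (descending), so the most recent gaps
/// are backfilled before older ones.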
async fn missing_consecutive_blocks(&self) -> VecDeque<u64> {
let mut missing_blocks = VecDeque::new();
let lower_bound = self.lower_bound();
let upper_bound = self.upper_bound();
let entries = self.inner.entries.read().await;
for block_number in (lower_bound..upper_bound).rev() {
if !entries.contains_key(&block_number) {
missing_blocks.push_back(block_number);
}
}
missing_blocks
}
/// Insert block data into the cache.
async fn insert_blocks<I>(&self, blocks: I)
where
I: Iterator<Item = (SealedBlock, Vec<Receipt>)>,
@@ -99,6 +101,13 @@ impl FeeHistoryCache {
}
let upper_bound = *entries.last_entry().expect("Contains at least one entry").key();
// also enforce proper lower bound in case we have gaps
let target_lower = upper_bound.saturating_sub(self.inner.config.max_blocks);
while entries.len() > 1 && *entries.first_key_value().unwrap().0 < target_lower {
entries.pop_first();
}
let lower_bound = *entries.first_entry().expect("Contains at least one entry").key();
self.inner.upper_bound.store(upper_bound, SeqCst);
self.inner.lower_bound.store(lower_bound, SeqCst);
@@ -130,7 +139,7 @@ impl FeeHistoryCache {
if start_block >= lower_bound && end_block <= upper_bound {
let entries = self.inner.entries.read().await;
let result = entries
.range(start_block..=end_block + 1)
.range(start_block..=end_block)
.map(|(_, fee_entry)| fee_entry.clone())
.collect::<Vec<_>>();
@@ -153,18 +162,39 @@ impl FeeHistoryCache {
}
}
/// Settings for the [FeeHistoryCache].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistoryCacheConfig {
/// Max number of blocks in cache.
///
/// Default is [MAX_HEADER_HISTORY] plus some headroom to also serve slightly older blocks
/// from cache, since `eth_feeHistory` supports the entire range.
pub max_blocks: u64,
/// Percentile approximation resolution
///
/// Default is 4, which means a percentile granularity of 0.25.
pub resolution: u64,
}
impl Default for FeeHistoryCacheConfig {
fn default() -> Self {
FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
}
}
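// Illustrative sketch, not part of this diff: with the default `resolution` of 4, reward
// percentiles are approximated on a grid with a step of 1 / 4 = 0.25. A hypothetical helper
// for snapping a requested percentile onto that grid could look like the following; the
// crate's actual rounding may differ.
fn approximate_percentile(requested: f64, resolution: u64) -> f64 {
    (requested * resolution as f64).round() / resolution as f64
}
// e.g. `approximate_percentile(33.3, 4)` returns 33.25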
/// Container type for shared state in [FeeHistoryCache]
#[derive(Debug)]
struct FeeHistoryCacheInner {
/// Stores the lower bound of the cache
lower_bound: AtomicU64,
/// Stores the upper bound of the cache
upper_bound: AtomicU64,
/// Config for the [FeeHistoryCache]: resolution for percentile approximation and the max
/// number of blocks
config: FeeHistoryCacheConfig,
/// Stores the entries of the cache
entries: tokio::sync::RwLock<BTreeMap<u64, FeeHistoryEntry>>,
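/// Handle to the node's [EthStateCache], used to look up block and receipt data for blocks
/// that are missing from this cache.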
#[allow(unused)]
eth_cache: EthStateCache,
}
@@ -173,22 +203,56 @@ struct FeeHistoryCacheInner {
pub async fn fee_history_cache_new_blocks_task<St, Provider>(
fee_history_cache: FeeHistoryCache,
mut events: St,
_provider: Provider,
provider: Provider,
) where
St: Stream<Item = CanonStateNotification> + Unpin + 'static,
Provider: BlockReaderIdExt + ChainSpecProvider + 'static,
{
// TODO: keep track of gaps in the chain and fill them from disk
// We're listening for new blocks emitted when the node is in live sync.
// If the node transitions to stage sync, we need to fetch the missing blocks
let mut missing_blocks = VecDeque::new();
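// Fused future for the in-flight "fetch missing block" request. `Fuse::terminated()` starts
// out already completed, so `is_terminated()` is true on the first iteration and a fetch can
// be scheduled as soon as a missing block is known.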
let mut fetch_missing_block = Fuse::terminated();
while let Some(event) = events.next().await {
if let Some(committed) = event.committed() {
let (blocks, receipts): (Vec<_>, Vec<_>) = committed
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), receipts.iter().flatten().cloned().collect::<Vec<_>>())
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;
loop {
if fetch_missing_block.is_terminated() {
if let Some(block_number) = missing_blocks.pop_front() {
trace!(target: "rpc::fee", ?block_number, "Fetching missing block for fee history cache");
if let Ok(Some(hash)) = provider.block_hash(block_number) {
// fetch missing block
fetch_missing_block = fee_history_cache
.inner
.eth_cache
.get_block_and_receipts(hash)
.boxed()
.fuse();
}
}
}
tokio::select! {
res = &mut fetch_missing_block => {
if let Ok(res) = res {
fee_history_cache.insert_blocks(res.into_iter()).await;
}
}
event = events.next() => {
let Some(event) = event else {
// the stream ended, we are done
break;
};
if let Some(committed) = event.committed() {
let (blocks, receipts): (Vec<_>, Vec<_>) = committed
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), receipts.iter().flatten().cloned().collect::<Vec<_>>())
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;
// keep track of missing blocks
missing_blocks = fee_history_cache.missing_consecutive_blocks().await;
}
}
}
}
}
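For context, a minimal sketch of how this task might be wired up when the RPC handlers are built; `eth_cache`, `provider`, and `canon_state_stream` are placeholder names for the node's actual handles, and the `FeeHistoryCache::new(eth_cache, config)` constructor is assumed rather than shown in this diff.

// Hypothetical wiring sketch (placeholder names, assumed constructor).
let fee_history_cache =
    FeeHistoryCache::new(eth_cache.clone(), FeeHistoryCacheConfig::default());
tokio::spawn(fee_history_cache_new_blocks_task(
    fee_history_cache.clone(),
    canon_state_stream,
    provider.clone(),
));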