Feat/improve fee history performance (#5182)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Nil Medvedev
2023-11-27 10:59:27 +00:00
committed by GitHub
parent 57653b1a58
commit 563a683a62
18 changed files with 490 additions and 124 deletions

View File

@ -92,7 +92,7 @@ impl BodiesClient for TestBodiesClient {
Box::pin(async move {
if should_respond_empty {
return Ok((PeerId::default(), vec![]).into());
return Ok((PeerId::default(), vec![]).into())
}
if should_delay {

View File

@ -252,7 +252,7 @@ impl SharedCapability {
/// Returns an error if the offset is equal or less than [`MAX_RESERVED_MESSAGE_ID`].
pub(crate) fn new(name: &str, version: u8, offset: u8) -> Result<Self, SharedCapabilityError> {
if offset <= MAX_RESERVED_MESSAGE_ID {
return Err(SharedCapabilityError::ReservedMessageIdOffset(offset));
return Err(SharedCapabilityError::ReservedMessageIdOffset(offset))
}
match name {

View File

@ -117,7 +117,7 @@ impl SnapshotSegment {
) -> Option<(Self, RangeInclusive<BlockNumber>, RangeInclusive<TxNumber>)> {
let mut parts = name.to_str()?.split('_');
if parts.next() != Some("snapshot") {
return None;
return None
}
let segment = Self::from_str(parts.next()?).ok()?;
@ -125,7 +125,7 @@ impl SnapshotSegment {
let (tx_start, tx_end) = (parts.next()?.parse().ok()?, parts.next()?.parse().ok()?);
if block_start >= block_end || tx_start > tx_end {
return None;
return None
}
Some((segment, block_start..=block_end, tx_start..=tx_end))

View File

@ -16,7 +16,10 @@ use reth_provider::{
StateProviderFactory,
};
use reth_rpc::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig},
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, EthFilterConfig, FeeHistoryCache,
FeeHistoryCacheConfig,
},
AuthLayer, BlockingTaskPool, Claims, EngineEthApi, EthApi, EthFilter,
EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret,
};
@ -57,7 +60,11 @@ where
// spawn a new cache task
let eth_cache =
EthStateCache::spawn_with(provider.clone(), Default::default(), executor.clone());
let gas_oracle = GasPriceOracle::new(provider.clone(), Default::default(), eth_cache.clone());
let fee_history_cache =
FeeHistoryCache::new(eth_cache.clone(), FeeHistoryCacheConfig::default());
let eth_api = EthApi::with_spawner(
provider.clone(),
pool.clone(),
@ -67,6 +74,7 @@ where
EthConfig::default().rpc_gas_cap,
Box::new(executor.clone()),
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
);
let config = EthFilterConfig::default()
.max_logs_per_response(DEFAULT_MAX_LOGS_PER_RESPONSE)

View File

@ -5,7 +5,7 @@ use reth_rpc::{
eth::{
cache::{EthStateCache, EthStateCacheConfig},
gas_oracle::GasPriceOracleConfig,
EthFilterConfig, RPC_DEFAULT_GAS_CAP,
EthFilterConfig, FeeHistoryCacheConfig, RPC_DEFAULT_GAS_CAP,
},
BlockingTaskPool, EthApi, EthFilter, EthPubSub,
};
@ -46,6 +46,8 @@ pub struct EthConfig {
///
/// Sets TTL for stale filters
pub stale_filter_ttl: std::time::Duration,
/// Settings for the fee history cache
pub fee_history_cache: FeeHistoryCacheConfig,
}
impl EthConfig {
@ -71,6 +73,7 @@ impl Default for EthConfig {
max_logs_per_response: DEFAULT_MAX_LOGS_PER_RESPONSE,
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
fee_history_cache: FeeHistoryCacheConfig::default(),
}
}
}

View File

@ -168,8 +168,9 @@ use reth_provider::{
use reth_rpc::{
eth::{
cache::{cache_new_blocks_task, EthStateCache},
fee_history_cache_new_blocks_task,
gas_oracle::GasPriceOracle,
EthBundle,
EthBundle, FeeHistoryCache,
},
AdminApi, AuthLayer, BlockingTaskGuard, BlockingTaskPool, Claims, DebugApi, EngineEthApi,
EthApi, EthFilter, EthPubSub, EthSubscriptionIdProvider, JwtAuthValidator, JwtSecret, NetApi,
@ -1153,6 +1154,10 @@ where
}
/// Creates the [EthHandlers] type the first time this is called.
///
/// This will spawn the required service tasks for [EthApi] for:
/// - [EthStateCache]
/// - [FeeHistoryCache]
fn with_eth<F, R>(&mut self, f: F) -> R
where
F: FnOnce(&EthHandlers<Provider, Pool, Network, Events>) -> R,
@ -1170,6 +1175,7 @@ where
);
let new_canonical_blocks = self.events.canonical_state_stream();
let c = cache.clone();
self.executor.spawn_critical(
"cache canonical blocks task",
Box::pin(async move {
@ -1177,6 +1183,19 @@ where
}),
);
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), self.config.eth.fee_history_cache.clone());
let new_canonical_blocks = self.events.canonical_state_stream();
let fhc = fee_history_cache.clone();
let provider_clone = self.provider.clone();
self.executor.spawn_critical(
"cache canonical blocks for fee history task",
Box::pin(async move {
fee_history_cache_new_blocks_task(fhc, new_canonical_blocks, provider_clone)
.await;
}),
);
let executor = Box::new(self.executor.clone());
let blocking_task_pool =
BlockingTaskPool::build().expect("failed to build tracing pool");
@ -1189,6 +1208,7 @@ where
self.config.eth.rpc_gas_cap,
executor.clone(),
blocking_task_pool.clone(),
fee_history_cache,
);
let filter = EthFilter::new(
self.provider.clone(),

View File

@ -162,11 +162,11 @@ where
} else {
// We could try to convert to a u128 here but there would probably be loss of
// precision, so we just return an error.
return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported"));
return Err(Error::custom("Deserializing a large non-mainnet TTD is not supported"))
}
} else {
// must be i64 - negative numbers are not supported
return Err(Error::custom("Negative TTD values are invalid and will not be deserialized"));
return Err(Error::custom("Negative TTD values are invalid and will not be deserialized"))
};
Ok(num)

View File

@ -0,0 +1,280 @@
//! Consist of types adjacent to the fee history cache and its configs
use crate::eth::{cache::EthStateCache, error::EthApiError, gas_oracle::MAX_HEADER_HISTORY};
use futures::{Stream, StreamExt};
use metrics::atomics::AtomicU64;
use reth_primitives::{Receipt, SealedBlock, TransactionSigned, B256, U256};
use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider};
use reth_rpc_types::TxGasAndReward;
use serde::{Deserialize, Serialize};
use std::{
collections::BTreeMap,
fmt::Debug,
sync::{atomic::Ordering::SeqCst, Arc},
};
/// Settings for the [FeeHistoryCache].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FeeHistoryCacheConfig {
/// Max number of blocks in cache.
///
/// Default is [MAX_HEADER_HISTORY] plus some change to also serve slightly older blocks from
/// cache, since fee_history supports the entire range
pub max_blocks: u64,
/// Percentile approximation resolution
///
/// Default is 4 which means 0.25
pub resolution: u64,
}
impl Default for FeeHistoryCacheConfig {
fn default() -> Self {
FeeHistoryCacheConfig { max_blocks: MAX_HEADER_HISTORY + 100, resolution: 4 }
}
}
/// Wrapper struct for BTreeMap
#[derive(Debug, Clone)]
pub struct FeeHistoryCache {
/// Stores the lower bound of the cache
lower_bound: Arc<AtomicU64>,
upper_bound: Arc<AtomicU64>,
/// Config for FeeHistoryCache, consists of resolution for percentile approximation
/// and max number of blocks
config: FeeHistoryCacheConfig,
/// Stores the entries of the cache
entries: Arc<tokio::sync::RwLock<BTreeMap<u64, FeeHistoryEntry>>>,
#[allow(unused)]
eth_cache: EthStateCache,
}
impl FeeHistoryCache {
/// Creates new FeeHistoryCache instance, initialize it with the mose recent data, set bounds
pub fn new(eth_cache: EthStateCache, config: FeeHistoryCacheConfig) -> Self {
let init_tree_map = BTreeMap::new();
let entries = Arc::new(tokio::sync::RwLock::new(init_tree_map));
let upper_bound = Arc::new(AtomicU64::new(0));
let lower_bound = Arc::new(AtomicU64::new(0));
FeeHistoryCache { config, entries, upper_bound, lower_bound, eth_cache }
}
/// How the cache is configured.
pub fn config(&self) -> &FeeHistoryCacheConfig {
&self.config
}
/// Returns the configured resolution for percentile approximation.
#[inline]
pub fn resolution(&self) -> u64 {
self.config.resolution
}
/// Processing of the arriving blocks
async fn insert_blocks<I>(&self, blocks: I)
where
I: Iterator<Item = (SealedBlock, Vec<Receipt>)>,
{
let mut entries = self.entries.write().await;
let percentiles = self.predefined_percentiles();
// Insert all new blocks and calculate approximated rewards
for (block, receipts) in blocks {
let mut fee_history_entry = FeeHistoryEntry::new(&block);
fee_history_entry.rewards = calculate_reward_percentiles_for_block(
&percentiles,
fee_history_entry.gas_used,
fee_history_entry.base_fee_per_gas,
&block.body,
&receipts,
)
.unwrap_or_default();
entries.insert(block.number, fee_history_entry);
}
// enforce bounds by popping the oldest entries
while entries.len() > self.config.max_blocks as usize {
entries.pop_first();
}
if entries.len() == 0 {
self.upper_bound.store(0, SeqCst);
self.lower_bound.store(0, SeqCst);
return
}
let upper_bound = *entries.last_entry().expect("Contains at least one entry").key();
let lower_bound = *entries.first_entry().expect("Contains at least one entry").key();
self.upper_bound.store(upper_bound, SeqCst);
self.lower_bound.store(lower_bound, SeqCst);
}
/// Get UpperBound value for FeeHistoryCache
pub fn upper_bound(&self) -> u64 {
self.upper_bound.load(SeqCst)
}
/// Get LowerBound value for FeeHistoryCache
pub fn lower_bound(&self) -> u64 {
self.lower_bound.load(SeqCst)
}
/// Collect fee history for given range.
///
/// This function retrieves fee history entries from the cache for the specified range.
/// If the requested range (start_block to end_block) is within the cache bounds,
/// it returns the corresponding entries.
/// Otherwise it returns None.
pub async fn get_history(
&self,
start_block: u64,
end_block: u64,
) -> Option<Vec<FeeHistoryEntry>> {
let lower_bound = self.lower_bound();
let upper_bound = self.upper_bound();
if start_block >= lower_bound && end_block <= upper_bound {
let entries = self.entries.read().await;
let result = entries
.range(start_block..=end_block + 1)
.map(|(_, fee_entry)| fee_entry.clone())
.collect::<Vec<_>>();
if result.is_empty() {
return None
}
Some(result)
} else {
None
}
}
/// Generates predefined set of percentiles
///
/// This returns 100 * resolution points
pub fn predefined_percentiles(&self) -> Vec<f64> {
let res = self.resolution() as f64;
(0..=100 * self.resolution()).map(|p| p as f64 / res).collect()
}
}
/// Awaits for new chain events and directly inserts them into the cache so they're available
/// immediately before they need to be fetched from disk.
pub async fn fee_history_cache_new_blocks_task<St, Provider>(
fee_history_cache: FeeHistoryCache,
mut events: St,
_provider: Provider,
) where
St: Stream<Item = CanonStateNotification> + Unpin + 'static,
Provider: BlockReaderIdExt + ChainSpecProvider + 'static,
{
// TODO: keep track of gaps in the chain and fill them from disk
while let Some(event) = events.next().await {
if let Some(committed) = event.committed() {
let (blocks, receipts): (Vec<_>, Vec<_>) = committed
.blocks_and_receipts()
.map(|(block, receipts)| {
(block.block.clone(), receipts.iter().flatten().cloned().collect::<Vec<_>>())
})
.unzip();
fee_history_cache.insert_blocks(blocks.into_iter().zip(receipts)).await;
}
}
}
/// Calculates reward percentiles for transactions in a block header.
/// Given a list of percentiles and a sealed block header, this function computes
/// the corresponding rewards for the transactions at each percentile.
///
/// The results are returned as a vector of U256 values.
pub(crate) fn calculate_reward_percentiles_for_block(
percentiles: &[f64],
gas_used: u64,
base_fee_per_gas: u64,
transactions: &[TransactionSigned],
receipts: &[Receipt],
) -> Result<Vec<U256>, EthApiError> {
let mut transactions = transactions
.iter()
.zip(receipts)
.scan(0, |previous_gas, (tx, receipt)| {
// Convert the cumulative gas used in the receipts
// to the gas usage by the transaction
//
// While we will sum up the gas again later, it is worth
// noting that the order of the transactions will be different,
// so the sum will also be different for each receipt.
let gas_used = receipt.cumulative_gas_used - *previous_gas;
*previous_gas = receipt.cumulative_gas_used;
Some(TxGasAndReward {
gas_used,
reward: tx.effective_tip_per_gas(Some(base_fee_per_gas)).unwrap_or_default(),
})
})
.collect::<Vec<_>>();
// Sort the transactions by their rewards in ascending order
transactions.sort_by_key(|tx| tx.reward);
// Find the transaction that corresponds to the given percentile
//
// We use a `tx_index` here that is shared across all percentiles, since we know
// the percentiles are monotonically increasing.
let mut tx_index = 0;
let mut cumulative_gas_used = transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
let mut rewards_in_block = Vec::new();
for percentile in percentiles {
// Empty blocks should return in a zero row
if transactions.is_empty() {
rewards_in_block.push(U256::ZERO);
continue
}
let threshold = (gas_used as f64 * percentile / 100.) as u64;
while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
tx_index += 1;
cumulative_gas_used += transactions[tx_index].gas_used;
}
rewards_in_block.push(U256::from(transactions[tx_index].reward));
}
Ok(rewards_in_block)
}
/// A cached entry for a block's fee history.
#[derive(Debug, Clone)]
pub struct FeeHistoryEntry {
/// The base fee per gas for this block.
pub base_fee_per_gas: u64,
/// Gas used ratio this block.
pub gas_used_ratio: f64,
/// Gas used by this block.
pub gas_used: u64,
/// Gas limit by this block.
pub gas_limit: u64,
/// Hash of the block.
pub header_hash: B256,
/// Approximated rewards for the configured percentiles.
pub rewards: Vec<U256>,
}
impl FeeHistoryEntry {
/// Creates a new entry from a sealed block.
///
/// Note: This does not calculate the rewards for the block.
pub fn new(block: &SealedBlock) -> Self {
FeeHistoryEntry {
base_fee_per_gas: block.base_fee_per_gas.unwrap_or_default(),
gas_used_ratio: block.gas_used as f64 / block.gas_limit as f64,
gas_used: block.gas_used,
header_hash: block.hash,
gas_limit: block.gas_limit,
rewards: Vec::new(),
}
}
}

View File

@ -1,17 +1,17 @@
//! Contains RPC handler implementations for fee history.
use crate::{
eth::error::{EthApiError, EthResult},
eth::{
api::fee_history::{calculate_reward_percentiles_for_block, FeeHistoryEntry},
error::{EthApiError, EthResult},
},
EthApi,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{
basefee::calculate_next_block_base_fee, BlockNumberOrTag, SealedHeader, U256,
};
use reth_primitives::{basefee::calculate_next_block_base_fee, BlockNumberOrTag, U256};
use reth_provider::{BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderFactory};
use reth_rpc_types::{FeeHistory, TxGasAndReward};
use reth_rpc_types::FeeHistory;
use reth_transaction_pool::TransactionPool;
use tracing::debug;
impl<Provider, Pool, Network> EthApi<Provider, Pool, Network>
@ -46,8 +46,10 @@ where
self.gas_oracle().suggest_tip_cap().await
}
/// Reports the fee history, for the given amount of blocks, up until the newest block
/// provided.
/// Reports the fee history, for the given amount of blocks, up until the given newest block.
///
/// If `reward_percentiles` are provided the [FeeHistory] will include the _approximated_
/// rewards for the requested range.
pub(crate) async fn fee_history(
&self,
mut block_count: u64,
@ -85,9 +87,9 @@ where
block_count = end_block_plus;
}
// If reward percentiles were specified, we need to validate that they are monotonically
// If reward percentiles were specified, we
// need to validate that they are monotonically
// increasing and 0 <= p <= 100
//
// Note: The types used ensure that the percentiles are never < 0
if let Some(percentiles) = &reward_percentiles {
if percentiles.windows(2).any(|w| w[0] > w[1] || w[0] > 100.) {
@ -101,38 +103,89 @@ where
// otherwise `newest_block - 2
// SAFETY: We ensured that block count is capped
let start_block = end_block_plus - block_count;
let headers = self.provider().sealed_headers_range(start_block..=end_block)?;
if headers.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange)
}
// Collect base fees, gas usage ratios and (optionally) reward percentile data
let mut base_fee_per_gas: Vec<U256> = Vec::new();
let mut gas_used_ratio: Vec<f64> = Vec::new();
let mut rewards: Vec<Vec<U256>> = Vec::new();
// Check if the requested range is within the cache bounds
let fee_entries = self.fee_history_cache().get_history(start_block, end_block).await;
if let Some(fee_entries) = fee_entries {
if fee_entries.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange)
}
for entry in &fee_entries {
base_fee_per_gas.push(U256::from(entry.base_fee_per_gas));
gas_used_ratio.push(entry.gas_used_ratio);
if let Some(percentiles) = &reward_percentiles {
let mut block_rewards = Vec::with_capacity(percentiles.len());
for &percentile in percentiles.iter() {
block_rewards.push(self.approximate_percentile(entry, percentile));
}
rewards.push(block_rewards);
}
}
let last_entry = fee_entries.last().expect("is not empty");
base_fee_per_gas.push(U256::from(calculate_next_block_base_fee(
last_entry.gas_used,
last_entry.gas_limit,
last_entry.base_fee_per_gas,
self.provider().chain_spec().base_fee_params,
)));
} else {
// read the requested header range
let headers = self.provider().sealed_headers_range(start_block..=end_block)?;
if headers.len() != block_count as usize {
return Err(EthApiError::InvalidBlockRange)
}
for header in &headers {
base_fee_per_gas
.push(U256::try_from(header.base_fee_per_gas.unwrap_or_default()).unwrap());
base_fee_per_gas.push(U256::from(header.base_fee_per_gas.unwrap_or_default()));
gas_used_ratio.push(header.gas_used as f64 / header.gas_limit as f64);
// Percentiles were specified, so we need to collect reward percentile ino
if let Some(percentiles) = &reward_percentiles {
rewards.push(self.calculate_reward_percentiles(percentiles, header).await?);
let (transactions, receipts) = self
.cache()
.get_transactions_and_receipts(header.hash)
.await?
.ok_or(EthApiError::InvalidBlockRange)?;
rewards.push(
calculate_reward_percentiles_for_block(
percentiles,
header.gas_used,
header.base_fee_per_gas.unwrap_or_default(),
&transactions,
&receipts,
)
.unwrap_or_default(),
);
}
}
// The spec states that `base_fee_per_gas` "[..] includes the next block after the newest of
// the returned range, because this value can be derived from the newest block"
// The spec states that `base_fee_per_gas` "[..] includes the next block after the
// newest of the returned range, because this value can be derived from the
// newest block"
//
// The unwrap is safe since we checked earlier that we got at least 1 header.
let last_header = headers.last().unwrap();
let chain_spec = self.provider().chain_spec();
// The spec states that `base_fee_per_gas` "[..] includes the next block after the
// newest of the returned range, because this value can be derived from the
// newest block"
//
// The unwrap is safe since we checked earlier that we got at least 1 header.
let last_header = headers.last().expect("is present");
base_fee_per_gas.push(U256::from(calculate_next_block_base_fee(
last_header.gas_used,
last_header.gas_limit,
last_header.base_fee_per_gas.unwrap_or_default(),
chain_spec.base_fee_params,
self.provider().chain_spec().base_fee_params,
)));
};
Ok(FeeHistory {
base_fee_per_gas,
@ -142,68 +195,17 @@ where
})
}
/// Calculates reward percentiles for transactions in a block header.
/// Given a list of percentiles and a sealed block header, this function computes
/// the corresponding rewards for the transactions at each percentile.
///
/// The results are returned as a vector of U256 values.
async fn calculate_reward_percentiles(
&self,
percentiles: &[f64],
header: &SealedHeader,
) -> Result<Vec<U256>, EthApiError> {
let (transactions, receipts) = self
.cache()
.get_transactions_and_receipts(header.hash)
.await?
.ok_or(EthApiError::InvalidBlockRange)?;
/// Approximates reward at a given percentile for a specific block
/// Based on the configured resolution
fn approximate_percentile(&self, entry: &FeeHistoryEntry, requested_percentile: f64) -> U256 {
let resolution = self.fee_history_cache().resolution();
let rounded_percentile =
(requested_percentile * resolution as f64).round() / resolution as f64;
let clamped_percentile = rounded_percentile.clamp(0.0, 100.0);
let mut transactions = transactions
.into_iter()
.zip(receipts)
.scan(0, |previous_gas, (tx, receipt)| {
// Convert the cumulative gas used in the receipts
// to the gas usage by the transaction
//
// While we will sum up the gas again later, it is worth
// noting that the order of the transactions will be different,
// so the sum will also be different for each receipt.
let gas_used = receipt.cumulative_gas_used - *previous_gas;
*previous_gas = receipt.cumulative_gas_used;
Some(TxGasAndReward {
gas_used,
reward: tx.effective_tip_per_gas(header.base_fee_per_gas).unwrap_or_default(),
})
})
.collect::<Vec<_>>();
// Sort the transactions by their rewards in ascending order
transactions.sort_by_key(|tx| tx.reward);
// Find the transaction that corresponds to the given percentile
//
// We use a `tx_index` here that is shared across all percentiles, since we know
// the percentiles are monotonically increasing.
let mut tx_index = 0;
let mut cumulative_gas_used =
transactions.first().map(|tx| tx.gas_used).unwrap_or_default();
let mut rewards_in_block = Vec::new();
for percentile in percentiles {
// Empty blocks should return in a zero row
if transactions.is_empty() {
rewards_in_block.push(U256::ZERO);
continue
}
let threshold = (header.gas_used as f64 * percentile / 100.) as u64;
while cumulative_gas_used < threshold && tx_index < transactions.len() - 1 {
tx_index += 1;
cumulative_gas_used += transactions[tx_index].gas_used;
}
rewards_in_block.push(U256::from(transactions[tx_index].reward));
}
Ok(rewards_in_block)
// Calculate the index in the precomputed rewards array
let index = (clamped_percentile / (1.0 / resolution as f64)).round() as usize;
// Fetch the reward from the FeeHistoryEntry
entry.rewards.get(index).cloned().unwrap_or(U256::ZERO)
}
}

View File

@ -1,15 +1,17 @@
//! Provides everything related to `eth_` namespace
//!
//! The entire implementation of the namespace is quite large, hence it is divided across several
//! files.
use crate::eth::{
api::pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin},
api::{
fee_history::FeeHistoryCache,
pending_block::{PendingBlock, PendingBlockEnv, PendingBlockEnvOrigin},
},
cache::EthStateCache,
error::{EthApiError, EthResult},
gas_oracle::GasPriceOracle,
signer::EthSigner,
};
use async_trait::async_trait;
use reth_interfaces::RethResult;
use reth_network_api::NetworkInfo;
@ -17,6 +19,7 @@ use reth_primitives::{
revm_primitives::{BlockEnv, CfgEnv},
Address, BlockId, BlockNumberOrTag, ChainInfo, SealedBlockWithSenders, B256, U256, U64,
};
use reth_provider::{
BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderBox, StateProviderFactory,
};
@ -24,14 +27,17 @@ use reth_rpc_types::{SyncInfo, SyncStatus};
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::TransactionPool;
use std::{
fmt::Debug,
future::Future,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{oneshot, Mutex};
mod block;
mod call;
pub(crate) mod fee_history;
mod fees;
#[cfg(feature = "optimism")]
mod optimism;
@ -86,6 +92,7 @@ where
Provider: BlockReaderIdExt + ChainSpecProvider,
{
/// Creates a new, shareable instance using the default tokio task spawner.
#[allow(clippy::too_many_arguments)]
pub fn new(
provider: Provider,
pool: Pool,
@ -94,6 +101,7 @@ where
gas_oracle: GasPriceOracle<Provider>,
gas_cap: impl Into<GasCap>,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
) -> Self {
Self::with_spawner(
provider,
@ -104,6 +112,7 @@ where
gas_cap.into().into(),
Box::<TokioTaskExecutor>::default(),
blocking_task_pool,
fee_history_cache,
)
}
@ -118,6 +127,7 @@ where
gas_cap: u64,
task_spawner: Box<dyn TaskSpawner>,
blocking_task_pool: BlockingTaskPool,
fee_history_cache: FeeHistoryCache,
) -> Self {
// get the block number of the latest block
let latest_block = provider
@ -139,9 +149,11 @@ where
task_spawner,
pending_block: Default::default(),
blocking_task_pool,
fee_history_cache,
#[cfg(feature = "optimism")]
http_client: reqwest::Client::new(),
};
Self { inner: Arc::new(inner) }
}
@ -194,6 +206,11 @@ where
pub fn pool(&self) -> &Pool {
&self.inner.pool
}
/// Returns fee history cache
pub fn fee_history_cache(&self) -> &FeeHistoryCache {
&self.inner.fee_history_cache
}
}
// === State access helpers ===
@ -448,6 +465,8 @@ struct EthApiInner<Provider, Pool, Network> {
pending_block: Mutex<Option<PendingBlock>>,
/// A pool dedicated to blocking tasks.
blocking_task_pool: BlockingTaskPool,
/// Cache for block fees history
fee_history_cache: FeeHistoryCache,
/// An http client for communicating with sequencers.
#[cfg(feature = "optimism")]
http_client: reqwest::Client,

View File

@ -391,7 +391,10 @@ where
#[cfg(test)]
mod tests {
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache,
FeeHistoryCacheConfig,
},
BlockingTaskPool, EthApi,
};
use jsonrpsee::types::error::INVALID_PARAMS_CODE;
@ -422,14 +425,19 @@ mod tests {
provider: P,
) -> EthApi<P, TestPool, NoopNetwork> {
let cache = EthStateCache::spawn(provider.clone(), Default::default());
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default());
EthApi::new(
provider.clone(),
testing_pool(),
NoopNetwork::default(),
cache.clone(),
GasPriceOracle::new(provider, Default::default(), cache),
GasPriceOracle::new(provider.clone(), Default::default(), cache.clone()),
ETHEREUM_BLOCK_GAS_LIMIT,
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
)
}
@ -446,6 +454,7 @@ mod tests {
let mut gas_used_ratios = Vec::new();
let mut base_fees_per_gas = Vec::new();
let mut last_header = None;
let mut parent_hash = B256::default();
for i in (0..block_count).rev() {
let hash = rng.gen();
@ -459,9 +468,11 @@ mod tests {
gas_limit,
gas_used,
base_fee_per_gas,
parent_hash,
..Default::default()
};
last_header = Some(header.clone());
parent_hash = hash;
let mut transactions = vec![];
for _ in 0..100 {

View File

@ -125,7 +125,10 @@ where
mod tests {
use super::*;
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache,
FeeHistoryCacheConfig,
},
BlockingTaskPool,
};
use reth_primitives::{constants::ETHEREUM_BLOCK_GAS_LIMIT, StorageKey, StorageValue};
@ -144,9 +147,10 @@ mod tests {
pool.clone(),
(),
cache.clone(),
GasPriceOracle::new(NoopProvider::default(), Default::default(), cache),
GasPriceOracle::new(NoopProvider::default(), Default::default(), cache.clone()),
ETHEREUM_BLOCK_GAS_LIMIT,
BlockingTaskPool::build().expect("failed to build tracing pool"),
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()),
);
let address = Address::random();
let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap();
@ -166,9 +170,10 @@ mod tests {
pool,
(),
cache.clone(),
GasPriceOracle::new(mock_provider, Default::default(), cache),
GasPriceOracle::new(mock_provider.clone(), Default::default(), cache.clone()),
ETHEREUM_BLOCK_GAS_LIMIT,
BlockingTaskPool::build().expect("failed to build tracing pool"),
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default()),
);
let storage_key: U256 = storage_key.into();

View File

@ -1254,7 +1254,10 @@ pub(crate) fn build_transaction_receipt_with_block_receipts(
mod tests {
use super::*;
use crate::{
eth::{cache::EthStateCache, gas_oracle::GasPriceOracle},
eth::{
cache::EthStateCache, gas_oracle::GasPriceOracle, FeeHistoryCache,
FeeHistoryCacheConfig,
},
BlockingTaskPool, EthApi,
};
use reth_network_api::noop::NoopNetwork;
@ -1270,14 +1273,17 @@ mod tests {
let pool = testing_pool();
let cache = EthStateCache::spawn(noop_provider, Default::default());
let fee_history_cache =
FeeHistoryCache::new(cache.clone(), FeeHistoryCacheConfig::default());
let eth_api = EthApi::new(
noop_provider,
pool.clone(),
noop_network_provider,
cache.clone(),
GasPriceOracle::new(noop_provider, Default::default(), cache),
GasPriceOracle::new(noop_provider, Default::default(), cache.clone()),
ETHEREUM_BLOCK_GAS_LIMIT,
BlockingTaskPool::build().expect("failed to build tracing pool"),
fee_history_cache,
);
// https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d

View File

@ -16,6 +16,9 @@ use tracing::warn;
/// The number of transactions sampled in a block
pub const SAMPLE_NUMBER: usize = 3_usize;
/// The default maximum number of blocks to use for the gas price oracle.
pub const MAX_HEADER_HISTORY: u64 = 1024;
/// The default maximum gas price to use for the estimate
pub const DEFAULT_MAX_PRICE: U256 = U256::from_limbs([500_000_000_000u64, 0, 0, 0]);
@ -53,8 +56,8 @@ impl Default for GasPriceOracleConfig {
GasPriceOracleConfig {
blocks: 20,
percentile: 60,
max_header_history: 1024,
max_block_history: 1024,
max_header_history: MAX_HEADER_HISTORY,
max_block_history: MAX_HEADER_HISTORY,
default: None,
max_price: Some(DEFAULT_MAX_PRICE),
ignore_price: Some(DEFAULT_IGNORE_PRICE),
@ -73,8 +76,8 @@ impl GasPriceOracleConfig {
Self {
blocks: blocks.unwrap_or(20),
percentile: percentile.unwrap_or(60),
max_header_history: 1024,
max_block_history: 1024,
max_header_history: MAX_HEADER_HISTORY,
max_block_history: MAX_HEADER_HISTORY,
default: None,
max_price: max_price.map(U256::from).or(Some(DEFAULT_MAX_PRICE)),
ignore_price: ignore_price.map(U256::from).or(Some(DEFAULT_IGNORE_PRICE)),

View File

@ -13,7 +13,11 @@ pub mod revm_utils;
mod signer;
pub(crate) mod utils;
pub use api::{EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP};
pub use api::{
fee_history::{fee_history_cache_new_blocks_task, FeeHistoryCache, FeeHistoryCacheConfig},
EthApi, EthApiSpec, EthTransactions, TransactionSource, RPC_DEFAULT_GAS_CAP,
};
pub use bundle::EthBundle;
pub use filter::{EthFilter, EthFilterConfig};
pub use id_provider::EthSubscriptionIdProvider;

View File

@ -1114,7 +1114,6 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
transaction_kind: TransactionVariant,
) -> ProviderResult<Option<BlockWithSenders>> {
let Some(block_number) = self.convert_hash_or_number(id)? else { return Ok(None) };
let Some(header) = self.header_by_number(block_number)? else { return Ok(None) };
let ommers = self.ommers(block_number.into())?.unwrap_or_default();

View File

@ -462,8 +462,14 @@ impl BlockReader for MockEthProvider {
Ok(None)
}
fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
Ok(vec![])
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
let lock = self.blocks.lock();
let mut blocks: Vec<_> =
lock.values().filter(|block| range.contains(&block.number)).cloned().collect();
blocks.sort_by_key(|block| block.number);
Ok(blocks)
}
}

View File

@ -290,7 +290,7 @@ const LOG_2_1_125: f64 = 0.16992500144231237;
pub fn fee_delta(max_tx_fee: u128, current_fee: u128) -> i64 {
if max_tx_fee == current_fee {
// if these are equal, then there's no fee jump
return 0;
return 0
}
let max_tx_fee_jumps = if max_tx_fee == 0 {