Integrate permits for getproof (#9363)

This commit is contained in:
kostekIV
2024-07-08 19:51:08 +02:00
committed by GitHub
parent 2f8a860bb8
commit 3e8a2a29c5
11 changed files with 97 additions and 14 deletions

View File

@ -318,6 +318,11 @@ RPC:
[default: 0]
--rpc.proof-permits <COUNT>
Maximum number of concurrent getproof requests
[default: 25]
RPC State Cache:
--rpc-cache.max-blocks <MAX_BLOCKS>
Max number of blocks in cache

View File

@ -166,6 +166,10 @@ pub struct RpcServerArgs {
)]
pub rpc_eth_proof_window: u64,
/// Maximum number of concurrent getproof requests.
#[arg(long = "rpc.proof-permits", alias = "rpc-proof-permits", value_name = "COUNT", default_value_t = constants::DEFAULT_PROOF_PERMITS)]
pub rpc_proof_permits: usize,
/// State cache configuration.
#[command(flatten)]
pub rpc_state_cache: RpcStateCacheArgs,
@ -299,6 +303,7 @@ impl Default for RpcServerArgs {
rpc_eth_proof_window: constants::DEFAULT_ETH_PROOF_WINDOW,
gas_price_oracle: GasPriceOracleArgs::default(),
rpc_state_cache: RpcStateCacheArgs::default(),
rpc_proof_permits: constants::DEFAULT_PROOF_PERMITS,
}
}
}

View File

@ -20,7 +20,7 @@ use reth_rpc_types::SyncStatus;
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
use reth_transaction_pool::TransactionPool;
use std::future::Future;
use tokio::sync::Mutex;
use tokio::sync::{AcquireError, Mutex, OwnedSemaphorePermit};
/// OP-Reth `Eth` API implementation.
///
@ -108,6 +108,19 @@ impl<Eth: SpawnBlocking> SpawnBlocking for OpEthApi<Eth> {
fn tracing_task_pool(&self) -> &BlockingTaskPool {
self.inner.tracing_task_pool()
}
fn acquire_owned(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.inner.acquire_owned()
}
fn acquire_many_owned(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.inner.acquire_many_owned(n)
}
}
impl<Eth: LoadReceipt> LoadReceipt for OpEthApi<Eth> {

View File

@ -95,6 +95,7 @@ impl RethRpcServerConfig for RpcServerArgs {
.rpc_gas_cap(self.rpc_gas_cap)
.state_cache(self.state_cache_config())
.gpo_config(self.gas_price_oracle_config())
.proof_permits(self.rpc_proof_permits)
}
fn state_cache_config(&self) -> EthStateCacheConfig {

View File

@ -14,7 +14,7 @@ use reth_rpc_eth_types::{
};
use reth_rpc_server_types::constants::{
default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER,
DEFAULT_MAX_LOGS_PER_RESPONSE,
DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_PROOF_PERMITS,
};
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
use reth_transaction_pool::TransactionPool;
@ -159,6 +159,8 @@ pub struct EthConfig {
pub stale_filter_ttl: Duration,
/// Settings for the fee history cache
pub fee_history_cache: FeeHistoryCacheConfig,
/// The maximum number of getproof calls that can be executed concurrently.
pub proof_permits: usize,
}
impl EthConfig {
@ -183,6 +185,7 @@ impl Default for EthConfig {
rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(),
stale_filter_ttl: DEFAULT_STALE_FILTER_TTL,
fee_history_cache: FeeHistoryCacheConfig::default(),
proof_permits: DEFAULT_PROOF_PERMITS,
}
}
}
@ -229,6 +232,12 @@ impl EthConfig {
self.eth_proof_window = window;
self
}
/// Configures the number of getproof requests
pub const fn proof_permits(mut self, permits: usize) -> Self {
self.proof_permits = permits;
self
}
}
/// Context for building the `eth` namespace API.
@ -285,6 +294,7 @@ impl EthApiBuild {
fee_history_cache,
ctx.evm_config.clone(),
None,
ctx.config.proof_permits,
)
}
}

View File

@ -4,7 +4,7 @@
use futures::Future;
use reth_rpc_eth_types::{EthApiError, EthResult};
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit};
/// Executes code on a blocking thread.
pub trait SpawnBlocking: Clone + Send + Sync + 'static {
@ -18,6 +18,17 @@ pub trait SpawnBlocking: Clone + Send + Sync + 'static {
/// Thread pool access in default trait method implementations.
fn tracing_task_pool(&self) -> &BlockingTaskPool;
/// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`).
fn acquire_owned(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send;
/// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`).
fn acquire_many_owned(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send;
/// Executes the future on a new blocking task.
///
/// Note: This is expected for futures that are dominated by blocking IO operations, for tracing

View File

@ -2,6 +2,7 @@
//! RPC methods.
use futures::Future;
use reth_errors::RethError;
use reth_evm::ConfigureEvmEnv;
use reth_primitives::{Address, BlockId, Bytes, Header, B256, U256};
use reth_provider::{BlockIdReader, StateProvider, StateProviderBox, StateProviderFactory};
@ -102,12 +103,19 @@ pub trait EthState: LoadState + SpawnBlocking {
return Err(EthApiError::ExceedsMaxProofWindow)
}
Ok(self.spawn_blocking_io(move |this| {
Ok(async move {
let _permit = self
.acquire_owned()
.await
.map_err(|err| EthApiError::Internal(RethError::other(err)))?;
self.spawn_blocking_io(move |this| {
let state = this.state_at_block_id(block_id)?;
let storage_keys = keys.iter().map(|key| key.0).collect::<Vec<_>>();
let proof = state.proof(&BundleState::default(), address, &storage_keys)?;
Ok(from_primitive_account_proof(proof))
}))
})
.await
})
}
}

View File

@ -26,6 +26,9 @@ pub fn default_max_tracing_requests() -> usize {
.map_or(25, |cpus| max(cpus.get().saturating_sub(RESERVED), RESERVED))
}
/// The default number of getproof calls we are allowing to run concurrently.
pub const DEFAULT_PROOF_PERMITS: usize = 25;
/// The default IPC endpoint
#[cfg(windows)]
pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc";

View File

@ -1,6 +1,7 @@
//! Implementation of the [`jsonrpsee`] generated [`EthApiServer`](crate::EthApi) trait
//! Handles RPC requests for the `eth_` namespace.
use futures::Future;
use std::sync::Arc;
use derive_more::Deref;
@ -11,8 +12,11 @@ use reth_rpc_eth_api::{
RawTransactionForwarder,
};
use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle, PendingBlock};
use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor};
use tokio::sync::Mutex;
use reth_tasks::{
pool::{BlockingTaskGuard, BlockingTaskPool},
TaskSpawner, TokioTaskExecutor,
};
use tokio::sync::{AcquireError, Mutex, OwnedSemaphorePermit};
use crate::eth::DevSigner;
@ -49,6 +53,7 @@ where
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
proof_permits: usize,
) -> Self {
Self::with_spawner(
provider,
@ -63,6 +68,7 @@ where
fee_history_cache,
evm_config,
raw_transaction_forwarder,
proof_permits,
)
}
@ -81,6 +87,7 @@ where
fee_history_cache: FeeHistoryCache,
evm_config: EvmConfig,
raw_transaction_forwarder: Option<Arc<dyn RawTransactionForwarder>>,
proof_permits: usize,
) -> Self {
// get the block number of the latest block
let latest_block = provider
@ -106,6 +113,7 @@ where
fee_history_cache,
evm_config,
raw_transaction_forwarder: parking_lot::RwLock::new(raw_transaction_forwarder),
blocking_task_guard: BlockingTaskGuard::new(proof_permits),
};
Self { inner: Arc::new(inner) }
@ -140,6 +148,19 @@ where
fn tracing_task_pool(&self) -> &reth_tasks::pool::BlockingTaskPool {
self.inner.blocking_task_pool()
}
fn acquire_owned(
&self,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_task_guard.clone().acquire_owned()
}
fn acquire_many_owned(
&self,
n: u32,
) -> impl Future<Output = Result<OwnedSemaphorePermit, AcquireError>> + Send {
self.blocking_task_guard.clone().acquire_many_owned(n)
}
}
impl<Provider, Pool, Network, EvmConfig> EthApi<Provider, Pool, Network, EvmConfig> {
@ -184,6 +205,8 @@ pub struct EthApiInner<Provider, Pool, Network, EvmConfig> {
evm_config: EvmConfig,
/// Allows forwarding received raw transactions
raw_transaction_forwarder: parking_lot::RwLock<Option<Arc<dyn RawTransactionForwarder>>>,
/// Guard for getproof calls
blocking_task_guard: BlockingTaskGuard,
}
impl<Provider, Pool, Network, EvmConfig> EthApiInner<Provider, Pool, Network, EvmConfig> {
@ -304,7 +327,7 @@ mod tests {
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle,
};
use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW;
use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS};
use reth_rpc_types::FeeHistory;
use reth_tasks::pool::BlockingTaskPool;
use reth_testing_utils::{generators, generators::Rng};
@ -341,6 +364,7 @@ mod tests {
fee_history_cache,
evm_config,
None,
DEFAULT_PROOF_PERMITS,
)
}

View File

@ -51,7 +51,7 @@ mod tests {
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle,
};
use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW;
use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS};
use reth_tasks::pool::BlockingTaskPool;
use reth_transaction_pool::test_utils::testing_pool;
@ -76,6 +76,7 @@ mod tests {
FeeHistoryCache::new(cache, FeeHistoryCacheConfig::default()),
evm_config,
None,
DEFAULT_PROOF_PERMITS,
);
let address = Address::random();
let storage = eth_api.storage_at(address, U256::ZERO.into(), None).await.unwrap();
@ -102,6 +103,7 @@ mod tests {
FeeHistoryCache::new(cache, FeeHistoryCacheConfig::default()),
evm_config,
None,
DEFAULT_PROOF_PERMITS,
);
let storage_key: U256 = storage_key.into();

View File

@ -68,7 +68,7 @@ mod tests {
use reth_rpc_eth_types::{
EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle,
};
use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW;
use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS};
use reth_tasks::pool::BlockingTaskPool;
use reth_transaction_pool::{test_utils::testing_pool, TransactionPool};
@ -97,6 +97,7 @@ mod tests {
fee_history_cache,
evm_config,
None,
DEFAULT_PROOF_PERMITS,
);
// https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d