diff --git a/book/cli/reth/node.md b/book/cli/reth/node.md index fe96358e5..61759a694 100644 --- a/book/cli/reth/node.md +++ b/book/cli/reth/node.md @@ -318,6 +318,11 @@ RPC: [default: 0] + --rpc.proof-permits + Maximum number of concurrent getproof requests + + [default: 25] + RPC State Cache: --rpc-cache.max-blocks Max number of blocks in cache diff --git a/crates/node/core/src/args/rpc_server.rs b/crates/node/core/src/args/rpc_server.rs index 9b562329b..761f0c3f7 100644 --- a/crates/node/core/src/args/rpc_server.rs +++ b/crates/node/core/src/args/rpc_server.rs @@ -166,6 +166,10 @@ pub struct RpcServerArgs { )] pub rpc_eth_proof_window: u64, + /// Maximum number of concurrent getproof requests. + #[arg(long = "rpc.proof-permits", alias = "rpc-proof-permits", value_name = "COUNT", default_value_t = constants::DEFAULT_PROOF_PERMITS)] + pub rpc_proof_permits: usize, + /// State cache configuration. #[command(flatten)] pub rpc_state_cache: RpcStateCacheArgs, @@ -299,6 +303,7 @@ impl Default for RpcServerArgs { rpc_eth_proof_window: constants::DEFAULT_ETH_PROOF_WINDOW, gas_price_oracle: GasPriceOracleArgs::default(), rpc_state_cache: RpcStateCacheArgs::default(), + rpc_proof_permits: constants::DEFAULT_PROOF_PERMITS, } } } diff --git a/crates/optimism/rpc/src/eth/mod.rs b/crates/optimism/rpc/src/eth/mod.rs index b73311cb7..d580a30e9 100644 --- a/crates/optimism/rpc/src/eth/mod.rs +++ b/crates/optimism/rpc/src/eth/mod.rs @@ -20,7 +20,7 @@ use reth_rpc_types::SyncStatus; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner}; use reth_transaction_pool::TransactionPool; use std::future::Future; -use tokio::sync::Mutex; +use tokio::sync::{AcquireError, Mutex, OwnedSemaphorePermit}; /// OP-Reth `Eth` API implementation. /// @@ -108,6 +108,19 @@ impl SpawnBlocking for OpEthApi { fn tracing_task_pool(&self) -> &BlockingTaskPool { self.inner.tracing_task_pool() } + + fn acquire_owned( + &self, + ) -> impl Future> + Send { + self.inner.acquire_owned() + } + + fn acquire_many_owned( + &self, + n: u32, + ) -> impl Future> + Send { + self.inner.acquire_many_owned(n) + } } impl LoadReceipt for OpEthApi { diff --git a/crates/rpc/rpc-builder/src/config.rs b/crates/rpc/rpc-builder/src/config.rs index 837b80a99..1f61f5791 100644 --- a/crates/rpc/rpc-builder/src/config.rs +++ b/crates/rpc/rpc-builder/src/config.rs @@ -95,6 +95,7 @@ impl RethRpcServerConfig for RpcServerArgs { .rpc_gas_cap(self.rpc_gas_cap) .state_cache(self.state_cache_config()) .gpo_config(self.gas_price_oracle_config()) + .proof_permits(self.rpc_proof_permits) } fn state_cache_config(&self) -> EthStateCacheConfig { diff --git a/crates/rpc/rpc-builder/src/eth.rs b/crates/rpc/rpc-builder/src/eth.rs index 8e897b771..b9b2d63ef 100644 --- a/crates/rpc/rpc-builder/src/eth.rs +++ b/crates/rpc/rpc-builder/src/eth.rs @@ -14,7 +14,7 @@ use reth_rpc_eth_types::{ }; use reth_rpc_server_types::constants::{ default_max_tracing_requests, DEFAULT_ETH_PROOF_WINDOW, DEFAULT_MAX_BLOCKS_PER_FILTER, - DEFAULT_MAX_LOGS_PER_RESPONSE, + DEFAULT_MAX_LOGS_PER_RESPONSE, DEFAULT_PROOF_PERMITS, }; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner}; use reth_transaction_pool::TransactionPool; @@ -159,6 +159,8 @@ pub struct EthConfig { pub stale_filter_ttl: Duration, /// Settings for the fee history cache pub fee_history_cache: FeeHistoryCacheConfig, + /// The maximum number of getproof calls that can be executed concurrently. + pub proof_permits: usize, } impl EthConfig { @@ -183,6 +185,7 @@ impl Default for EthConfig { rpc_gas_cap: RPC_DEFAULT_GAS_CAP.into(), stale_filter_ttl: DEFAULT_STALE_FILTER_TTL, fee_history_cache: FeeHistoryCacheConfig::default(), + proof_permits: DEFAULT_PROOF_PERMITS, } } } @@ -229,6 +232,12 @@ impl EthConfig { self.eth_proof_window = window; self } + + /// Configures the number of getproof requests + pub const fn proof_permits(mut self, permits: usize) -> Self { + self.proof_permits = permits; + self + } } /// Context for building the `eth` namespace API. @@ -285,6 +294,7 @@ impl EthApiBuild { fee_history_cache, ctx.evm_config.clone(), None, + ctx.config.proof_permits, ) } } diff --git a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs index c199d4de6..4a2c81b0f 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/blocking_task.rs @@ -4,7 +4,7 @@ use futures::Future; use reth_rpc_eth_types::{EthApiError, EthResult}; use reth_tasks::{pool::BlockingTaskPool, TaskSpawner}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, AcquireError, OwnedSemaphorePermit}; /// Executes code on a blocking thread. pub trait SpawnBlocking: Clone + Send + Sync + 'static { @@ -18,6 +18,17 @@ pub trait SpawnBlocking: Clone + Send + Sync + 'static { /// Thread pool access in default trait method implementations. fn tracing_task_pool(&self) -> &BlockingTaskPool; + /// See also [`Semaphore::acquire_owned`](`tokio::sync::Semaphore::acquire_owned`). + fn acquire_owned( + &self, + ) -> impl Future> + Send; + + /// See also [`Semaphore::acquire_many_owned`](`tokio::sync::Semaphore::acquire_many_owned`). + fn acquire_many_owned( + &self, + n: u32, + ) -> impl Future> + Send; + /// Executes the future on a new blocking task. /// /// Note: This is expected for futures that are dominated by blocking IO operations, for tracing diff --git a/crates/rpc/rpc-eth-api/src/helpers/state.rs b/crates/rpc/rpc-eth-api/src/helpers/state.rs index 05af8f547..4b0d62925 100644 --- a/crates/rpc/rpc-eth-api/src/helpers/state.rs +++ b/crates/rpc/rpc-eth-api/src/helpers/state.rs @@ -2,6 +2,7 @@ //! RPC methods. use futures::Future; +use reth_errors::RethError; use reth_evm::ConfigureEvmEnv; use reth_primitives::{Address, BlockId, Bytes, Header, B256, U256}; use reth_provider::{BlockIdReader, StateProvider, StateProviderBox, StateProviderFactory}; @@ -102,12 +103,19 @@ pub trait EthState: LoadState + SpawnBlocking { return Err(EthApiError::ExceedsMaxProofWindow) } - Ok(self.spawn_blocking_io(move |this| { - let state = this.state_at_block_id(block_id)?; - let storage_keys = keys.iter().map(|key| key.0).collect::>(); - let proof = state.proof(&BundleState::default(), address, &storage_keys)?; - Ok(from_primitive_account_proof(proof)) - })) + Ok(async move { + let _permit = self + .acquire_owned() + .await + .map_err(|err| EthApiError::Internal(RethError::other(err)))?; + self.spawn_blocking_io(move |this| { + let state = this.state_at_block_id(block_id)?; + let storage_keys = keys.iter().map(|key| key.0).collect::>(); + let proof = state.proof(&BundleState::default(), address, &storage_keys)?; + Ok(from_primitive_account_proof(proof)) + }) + .await + }) } } diff --git a/crates/rpc/rpc-server-types/src/constants.rs b/crates/rpc/rpc-server-types/src/constants.rs index f1af5fb26..e3c129bf6 100644 --- a/crates/rpc/rpc-server-types/src/constants.rs +++ b/crates/rpc/rpc-server-types/src/constants.rs @@ -26,6 +26,9 @@ pub fn default_max_tracing_requests() -> usize { .map_or(25, |cpus| max(cpus.get().saturating_sub(RESERVED), RESERVED)) } +/// The default number of getproof calls we are allowing to run concurrently. +pub const DEFAULT_PROOF_PERMITS: usize = 25; + /// The default IPC endpoint #[cfg(windows)] pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc"; diff --git a/crates/rpc/rpc/src/eth/core.rs b/crates/rpc/rpc/src/eth/core.rs index c82e30625..36030741f 100644 --- a/crates/rpc/rpc/src/eth/core.rs +++ b/crates/rpc/rpc/src/eth/core.rs @@ -1,6 +1,7 @@ //! Implementation of the [`jsonrpsee`] generated [`EthApiServer`](crate::EthApi) trait //! Handles RPC requests for the `eth_` namespace. +use futures::Future; use std::sync::Arc; use derive_more::Deref; @@ -11,8 +12,11 @@ use reth_rpc_eth_api::{ RawTransactionForwarder, }; use reth_rpc_eth_types::{EthStateCache, FeeHistoryCache, GasCap, GasPriceOracle, PendingBlock}; -use reth_tasks::{pool::BlockingTaskPool, TaskSpawner, TokioTaskExecutor}; -use tokio::sync::Mutex; +use reth_tasks::{ + pool::{BlockingTaskGuard, BlockingTaskPool}, + TaskSpawner, TokioTaskExecutor, +}; +use tokio::sync::{AcquireError, Mutex, OwnedSemaphorePermit}; use crate::eth::DevSigner; @@ -49,6 +53,7 @@ where fee_history_cache: FeeHistoryCache, evm_config: EvmConfig, raw_transaction_forwarder: Option>, + proof_permits: usize, ) -> Self { Self::with_spawner( provider, @@ -63,6 +68,7 @@ where fee_history_cache, evm_config, raw_transaction_forwarder, + proof_permits, ) } @@ -81,6 +87,7 @@ where fee_history_cache: FeeHistoryCache, evm_config: EvmConfig, raw_transaction_forwarder: Option>, + proof_permits: usize, ) -> Self { // get the block number of the latest block let latest_block = provider @@ -106,6 +113,7 @@ where fee_history_cache, evm_config, raw_transaction_forwarder: parking_lot::RwLock::new(raw_transaction_forwarder), + blocking_task_guard: BlockingTaskGuard::new(proof_permits), }; Self { inner: Arc::new(inner) } @@ -140,6 +148,19 @@ where fn tracing_task_pool(&self) -> &reth_tasks::pool::BlockingTaskPool { self.inner.blocking_task_pool() } + + fn acquire_owned( + &self, + ) -> impl Future> + Send { + self.blocking_task_guard.clone().acquire_owned() + } + + fn acquire_many_owned( + &self, + n: u32, + ) -> impl Future> + Send { + self.blocking_task_guard.clone().acquire_many_owned(n) + } } impl EthApi { @@ -184,6 +205,8 @@ pub struct EthApiInner { evm_config: EvmConfig, /// Allows forwarding received raw transactions raw_transaction_forwarder: parking_lot::RwLock>>, + /// Guard for getproof calls + blocking_task_guard: BlockingTaskGuard, } impl EthApiInner { @@ -304,7 +327,7 @@ mod tests { use reth_rpc_eth_types::{ EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle, }; - use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW; + use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS}; use reth_rpc_types::FeeHistory; use reth_tasks::pool::BlockingTaskPool; use reth_testing_utils::{generators, generators::Rng}; @@ -341,6 +364,7 @@ mod tests { fee_history_cache, evm_config, None, + DEFAULT_PROOF_PERMITS, ) } diff --git a/crates/rpc/rpc/src/eth/helpers/state.rs b/crates/rpc/rpc/src/eth/helpers/state.rs index 369bd6ba7..d3a99d2f8 100644 --- a/crates/rpc/rpc/src/eth/helpers/state.rs +++ b/crates/rpc/rpc/src/eth/helpers/state.rs @@ -51,7 +51,7 @@ mod tests { use reth_rpc_eth_types::{ EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle, }; - use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW; + use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS}; use reth_tasks::pool::BlockingTaskPool; use reth_transaction_pool::test_utils::testing_pool; @@ -76,6 +76,7 @@ mod tests { FeeHistoryCache::new(cache, FeeHistoryCacheConfig::default()), evm_config, None, + DEFAULT_PROOF_PERMITS, ); let address = Address::random(); let storage = eth_api.storage_at(address, U256::ZERO.into(), None).await.unwrap(); @@ -102,6 +103,7 @@ mod tests { FeeHistoryCache::new(cache, FeeHistoryCacheConfig::default()), evm_config, None, + DEFAULT_PROOF_PERMITS, ); let storage_key: U256 = storage_key.into(); diff --git a/crates/rpc/rpc/src/eth/helpers/transaction.rs b/crates/rpc/rpc/src/eth/helpers/transaction.rs index 13e3dbd5c..872af0cee 100644 --- a/crates/rpc/rpc/src/eth/helpers/transaction.rs +++ b/crates/rpc/rpc/src/eth/helpers/transaction.rs @@ -68,7 +68,7 @@ mod tests { use reth_rpc_eth_types::{ EthStateCache, FeeHistoryCache, FeeHistoryCacheConfig, GasPriceOracle, }; - use reth_rpc_server_types::constants::DEFAULT_ETH_PROOF_WINDOW; + use reth_rpc_server_types::constants::{DEFAULT_ETH_PROOF_WINDOW, DEFAULT_PROOF_PERMITS}; use reth_tasks::pool::BlockingTaskPool; use reth_transaction_pool::{test_utils::testing_pool, TransactionPool}; @@ -97,6 +97,7 @@ mod tests { fee_history_cache, evm_config, None, + DEFAULT_PROOF_PERMITS, ); // https://etherscan.io/tx/0xa694b71e6c128a2ed8e2e0f6770bddbe52e3bb8f10e8472f9a79ab81497a8b5d