From 755438d61cfed05d97f2dd04e762a1be99c79b1d Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 17 May 2023 13:22:37 +0200 Subject: [PATCH] chore: spawn more eth calls and add docs (#2715) Co-authored-by: Bjerg --- crates/interfaces/src/executor.rs | 3 +- crates/rpc/rpc/src/eth/api/call.rs | 24 +++++++++-- crates/rpc/rpc/src/eth/api/mod.rs | 3 ++ crates/rpc/rpc/src/eth/api/server.rs | 64 ++++++++++++++++------------ crates/rpc/rpc/src/eth/api/state.rs | 15 +++---- crates/rpc/rpc/src/lib.rs | 15 +++++++ 6 files changed, 83 insertions(+), 41 deletions(-) diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index faa43e638..fd2c34ce2 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -50,7 +50,8 @@ pub enum BlockExecutionError { MissingTotalDifficulty { hash: H256 }, /// Only used for TestExecutor - #[cfg(feature = "test-utils")] + /// + /// Note: this is not feature gated for convenience. #[error("Execution unavailable for tests")] UnavailableForTest, } diff --git a/crates/rpc/rpc/src/eth/api/call.rs b/crates/rpc/rpc/src/eth/api/call.rs index b1ff1a296..7daca66c6 100644 --- a/crates/rpc/rpc/src/eth/api/call.rs +++ b/crates/rpc/rpc/src/eth/api/call.rs @@ -2,7 +2,7 @@ use crate::{ eth::{ - error::{EthApiError, EthResult, InvalidTransactionError, RevertError}, + error::{ensure_success, EthApiError, EthResult, InvalidTransactionError, RevertError}, revm_utils::{ build_call_evm_env, cap_tx_gas_limit_with_caller_allowance, get_precompiles, inspect, transact, @@ -13,13 +13,13 @@ use crate::{ }; use ethers_core::utils::get_contract_address; use reth_network_api::NetworkInfo; -use reth_primitives::{AccessList, BlockId, BlockNumberOrTag, U256}; +use reth_primitives::{AccessList, BlockId, BlockNumberOrTag, Bytes, U256}; use reth_provider::{BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory}; use reth_revm::{ access_list::AccessListInspector, database::{State, SubState}, }; -use reth_rpc_types::CallRequest; +use reth_rpc_types::{state::StateOverride, CallRequest}; use reth_transaction_pool::TransactionPool; use revm::{ db::{CacheDB, DatabaseRef}, @@ -48,6 +48,24 @@ where self.estimate_gas_with(cfg, block_env, request, state) } + /// Executes the call request (`eth_call`) and returns the output + pub(crate) async fn call( + &self, + request: CallRequest, + block_number: Option, + state_overrides: Option, + ) -> EthResult { + let (res, _env) = self + .transact_call_at( + request, + block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), + state_overrides, + ) + .await?; + + ensure_success(res.result) + } + /// Estimates the gas usage of the `request` with the state. /// /// This will execute the [CallRequest] and find the best gas limit via binary search diff --git a/crates/rpc/rpc/src/eth/api/mod.rs b/crates/rpc/rpc/src/eth/api/mod.rs index 9e32ceea9..9c1609998 100644 --- a/crates/rpc/rpc/src/eth/api/mod.rs +++ b/crates/rpc/rpc/src/eth/api/mod.rs @@ -126,6 +126,9 @@ where } /// Executes the future on a new blocking task. + /// + /// This accepts a closure that creates a new future using a clone of this type and spawns the + /// future onto a new task that is allowed to block. pub(crate) async fn on_blocking_task(&self, c: C) -> EthResult where C: FnOnce(Self) -> F, diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index dccaac27b..77db4ec96 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -3,10 +3,7 @@ use super::EthApiSpec; use crate::{ - eth::{ - api::{EthApi, EthTransactions}, - error::ensure_success, - }, + eth::api::{EthApi, EthTransactions}, result::{internal_rpc_err, ToRpcResult}, }; use jsonrpsee::core::RpcResult as Result; @@ -177,7 +174,7 @@ where /// Handler for: `eth_getBalance` async fn balance(&self, address: Address, block_number: Option) -> Result { trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getBalance"); - Ok(EthApi::balance(self, address, block_number)?) + Ok(self.on_blocking_task(|this| async move { this.balance(address, block_number) }).await?) } /// Handler for: `eth_getStorageAt` @@ -188,7 +185,9 @@ where block_number: Option, ) -> Result { trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getStorageAt"); - Ok(EthApi::storage_at(self, address, index, block_number).await?) + Ok(self + .on_blocking_task(|this| async move { this.storage_at(address, index, block_number) }) + .await?) } /// Handler for: `eth_getTransactionCount` @@ -198,13 +197,19 @@ where block_number: Option, ) -> Result { trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getTransactionCount"); - Ok(EthApi::get_transaction_count(self, address, block_number)?) + Ok(self + .on_blocking_task( + |this| async move { this.get_transaction_count(address, block_number) }, + ) + .await?) } /// Handler for: `eth_getCode` async fn get_code(&self, address: Address, block_number: Option) -> Result { trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getCode"); - Ok(EthApi::get_code(self, address, block_number)?) + Ok(self + .on_blocking_task(|this| async move { this.get_code(address, block_number) }) + .await?) } /// Handler for: `eth_call` @@ -215,15 +220,11 @@ where state_overrides: Option, ) -> Result { trace!(target: "rpc::eth", ?request, ?block_number, ?state_overrides, "Serving eth_call"); - let (res, _env) = self - .transact_call_at( - request, - block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), - state_overrides, - ) - .await?; - - Ok(ensure_success(res.result)?) + Ok(self + .on_blocking_task(|this| async move { + this.call(request, block_number, state_overrides).await + }) + .await?) } /// Handler for: `eth_createAccessList` @@ -233,11 +234,15 @@ where block_number: Option, ) -> Result { trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_createAccessList"); - let block_id = block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); - let access_list = self.create_access_list_at(request.clone(), block_number).await?; - request.access_list = Some(access_list.clone()); - let gas_used = self.estimate_gas_at(request, block_id).await?; - Ok(AccessListWithGasUsed { access_list, gas_used }) + Ok(self + .on_blocking_task(|this| async move { + let block_id = block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); + let access_list = this.create_access_list_at(request.clone(), block_number).await?; + request.access_list = Some(access_list.clone()); + let gas_used = this.estimate_gas_at(request, block_id).await?; + Ok(AccessListWithGasUsed { access_list, gas_used }) + }) + .await?) } /// Handler for: `eth_estimateGas` @@ -247,12 +252,15 @@ where block_number: Option, ) -> Result { trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_estimateGas"); - Ok(EthApi::estimate_gas_at( - self, - request, - block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), - ) - .await?) + Ok(self + .on_blocking_task(|this| async move { + this.estimate_gas_at( + request, + block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), + ) + .await + }) + .await?) } /// Handler for: `eth_gasPrice` diff --git a/crates/rpc/rpc/src/eth/api/state.rs b/crates/rpc/rpc/src/eth/api/state.rs index bb5284d28..42d9a285a 100644 --- a/crates/rpc/rpc/src/eth/api/state.rs +++ b/crates/rpc/rpc/src/eth/api/state.rs @@ -71,18 +71,15 @@ where Ok(U256::from(state.account_nonce(address)?.unwrap_or_default())) } - pub(crate) async fn storage_at( + pub(crate) fn storage_at( &self, address: Address, index: JsonStorageKey, block_id: Option, ) -> EthResult { - self.on_blocking_task(|this| async move { - let state = this.state_at_block_id_or_latest(block_id)?; - let value = state.storage(address, index.0)?.unwrap_or_default(); - Ok(H256(value.to_be_bytes())) - }) - .await + let state = self.state_at_block_id_or_latest(block_id)?; + let value = state.storage(address, index.0)?.unwrap_or_default(); + Ok(H256(value.to_be_bytes())) } #[allow(unused)] @@ -169,7 +166,7 @@ mod tests { GasPriceOracle::new(NoopProvider::default(), Default::default(), cache), ); let address = Address::random(); - let storage = eth_api.storage_at(address, U256::ZERO.into(), None).await.unwrap(); + let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap(); assert_eq!(storage, U256::ZERO.into()); // === Mock === @@ -190,7 +187,7 @@ mod tests { ); let storage_key: U256 = storage_key.into(); - let storage = eth_api.storage_at(address, storage_key.into(), None).await.unwrap(); + let storage = eth_api.storage_at(address, storage_key.into(), None).unwrap(); assert_eq!(storage, storage_value.into()); } } diff --git a/crates/rpc/rpc/src/lib.rs b/crates/rpc/rpc/src/lib.rs index 0c4d47e57..11bd60f59 100644 --- a/crates/rpc/rpc/src/lib.rs +++ b/crates/rpc/rpc/src/lib.rs @@ -8,6 +8,21 @@ //! Reth RPC implementation //! //! Provides the implementation of all RPC interfaces. +//! +//! +//! ## Note on blocking behaviour +//! +//! All async RPC handlers must non-blocking, see also [What is blocking](https://ryhl.io/blog/async-what-is-blocking/). +//! +//! A lot of the RPC are using a mix of async and direct calls to the database, which are blocking +//! and can reduce overall performance of all concurrent requests handled via the jsonrpsee server. +//! +//! To avoid this, all blocking or CPU intensive handlers must be spawned to a separate task. See +//! the [EthApi] handler implementations for examples. The rpc-api traits make no use of the +//! available jsonrpsee `blocking` attribute to give implementors more freedom because the +//! `blocking` attribute and async handlers are mutually exclusive. However, as mentioned above, a +//! lot of handlers make use of async functions, caching for example, but are also using blocking +//! disk-io, hence these calls are spawned as futures to a blocking task manually. mod admin; mod call_guard;