chore: spawn more eth calls and add docs (#2715)

Co-authored-by: Bjerg <onbjerg@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2023-05-17 13:22:37 +02:00
committed by GitHub
parent cadf702e5d
commit 755438d61c
6 changed files with 83 additions and 41 deletions

View File

@ -50,7 +50,8 @@ pub enum BlockExecutionError {
MissingTotalDifficulty { hash: H256 }, MissingTotalDifficulty { hash: H256 },
/// Only used for TestExecutor /// Only used for TestExecutor
#[cfg(feature = "test-utils")] ///
/// Note: this is not feature gated for convenience.
#[error("Execution unavailable for tests")] #[error("Execution unavailable for tests")]
UnavailableForTest, UnavailableForTest,
} }

View File

@ -2,7 +2,7 @@
use crate::{ use crate::{
eth::{ eth::{
error::{EthApiError, EthResult, InvalidTransactionError, RevertError}, error::{ensure_success, EthApiError, EthResult, InvalidTransactionError, RevertError},
revm_utils::{ revm_utils::{
build_call_evm_env, cap_tx_gas_limit_with_caller_allowance, get_precompiles, inspect, build_call_evm_env, cap_tx_gas_limit_with_caller_allowance, get_precompiles, inspect,
transact, transact,
@ -13,13 +13,13 @@ use crate::{
}; };
use ethers_core::utils::get_contract_address; use ethers_core::utils::get_contract_address;
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{AccessList, BlockId, BlockNumberOrTag, U256}; use reth_primitives::{AccessList, BlockId, BlockNumberOrTag, Bytes, U256};
use reth_provider::{BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory}; use reth_provider::{BlockProviderIdExt, EvmEnvProvider, StateProvider, StateProviderFactory};
use reth_revm::{ use reth_revm::{
access_list::AccessListInspector, access_list::AccessListInspector,
database::{State, SubState}, database::{State, SubState},
}; };
use reth_rpc_types::CallRequest; use reth_rpc_types::{state::StateOverride, CallRequest};
use reth_transaction_pool::TransactionPool; use reth_transaction_pool::TransactionPool;
use revm::{ use revm::{
db::{CacheDB, DatabaseRef}, db::{CacheDB, DatabaseRef},
@ -48,6 +48,24 @@ where
self.estimate_gas_with(cfg, block_env, request, state) self.estimate_gas_with(cfg, block_env, request, state)
} }
/// Executes the call request (`eth_call`) and returns the output
pub(crate) async fn call(
&self,
request: CallRequest,
block_number: Option<BlockId>,
state_overrides: Option<StateOverride>,
) -> EthResult<Bytes> {
let (res, _env) = self
.transact_call_at(
request,
block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)),
state_overrides,
)
.await?;
ensure_success(res.result)
}
/// Estimates the gas usage of the `request` with the state. /// Estimates the gas usage of the `request` with the state.
/// ///
/// This will execute the [CallRequest] and find the best gas limit via binary search /// This will execute the [CallRequest] and find the best gas limit via binary search

View File

@ -126,6 +126,9 @@ where
} }
/// Executes the future on a new blocking task. /// Executes the future on a new blocking task.
///
/// This accepts a closure that creates a new future using a clone of this type and spawns the
/// future onto a new task that is allowed to block.
pub(crate) async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R> pub(crate) async fn on_blocking_task<C, F, R>(&self, c: C) -> EthResult<R>
where where
C: FnOnce(Self) -> F, C: FnOnce(Self) -> F,

View File

@ -3,10 +3,7 @@
use super::EthApiSpec; use super::EthApiSpec;
use crate::{ use crate::{
eth::{ eth::api::{EthApi, EthTransactions},
api::{EthApi, EthTransactions},
error::ensure_success,
},
result::{internal_rpc_err, ToRpcResult}, result::{internal_rpc_err, ToRpcResult},
}; };
use jsonrpsee::core::RpcResult as Result; use jsonrpsee::core::RpcResult as Result;
@ -177,7 +174,7 @@ where
/// Handler for: `eth_getBalance` /// Handler for: `eth_getBalance`
async fn balance(&self, address: Address, block_number: Option<BlockId>) -> Result<U256> { async fn balance(&self, address: Address, block_number: Option<BlockId>) -> Result<U256> {
trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getBalance"); trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getBalance");
Ok(EthApi::balance(self, address, block_number)?) Ok(self.on_blocking_task(|this| async move { this.balance(address, block_number) }).await?)
} }
/// Handler for: `eth_getStorageAt` /// Handler for: `eth_getStorageAt`
@ -188,7 +185,9 @@ where
block_number: Option<BlockId>, block_number: Option<BlockId>,
) -> Result<H256> { ) -> Result<H256> {
trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getStorageAt"); trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getStorageAt");
Ok(EthApi::storage_at(self, address, index, block_number).await?) Ok(self
.on_blocking_task(|this| async move { this.storage_at(address, index, block_number) })
.await?)
} }
/// Handler for: `eth_getTransactionCount` /// Handler for: `eth_getTransactionCount`
@ -198,13 +197,19 @@ where
block_number: Option<BlockId>, block_number: Option<BlockId>,
) -> Result<U256> { ) -> Result<U256> {
trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getTransactionCount"); trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getTransactionCount");
Ok(EthApi::get_transaction_count(self, address, block_number)?) Ok(self
.on_blocking_task(
|this| async move { this.get_transaction_count(address, block_number) },
)
.await?)
} }
/// Handler for: `eth_getCode` /// Handler for: `eth_getCode`
async fn get_code(&self, address: Address, block_number: Option<BlockId>) -> Result<Bytes> { async fn get_code(&self, address: Address, block_number: Option<BlockId>) -> Result<Bytes> {
trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getCode"); trace!(target: "rpc::eth", ?address, ?block_number, "Serving eth_getCode");
Ok(EthApi::get_code(self, address, block_number)?) Ok(self
.on_blocking_task(|this| async move { this.get_code(address, block_number) })
.await?)
} }
/// Handler for: `eth_call` /// Handler for: `eth_call`
@ -215,15 +220,11 @@ where
state_overrides: Option<StateOverride>, state_overrides: Option<StateOverride>,
) -> Result<Bytes> { ) -> Result<Bytes> {
trace!(target: "rpc::eth", ?request, ?block_number, ?state_overrides, "Serving eth_call"); trace!(target: "rpc::eth", ?request, ?block_number, ?state_overrides, "Serving eth_call");
let (res, _env) = self Ok(self
.transact_call_at( .on_blocking_task(|this| async move {
request, this.call(request, block_number, state_overrides).await
block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), })
state_overrides, .await?)
)
.await?;
Ok(ensure_success(res.result)?)
} }
/// Handler for: `eth_createAccessList` /// Handler for: `eth_createAccessList`
@ -233,11 +234,15 @@ where
block_number: Option<BlockId>, block_number: Option<BlockId>,
) -> Result<AccessListWithGasUsed> { ) -> Result<AccessListWithGasUsed> {
trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_createAccessList"); trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_createAccessList");
let block_id = block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)); Ok(self
let access_list = self.create_access_list_at(request.clone(), block_number).await?; .on_blocking_task(|this| async move {
request.access_list = Some(access_list.clone()); let block_id = block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest));
let gas_used = self.estimate_gas_at(request, block_id).await?; let access_list = this.create_access_list_at(request.clone(), block_number).await?;
Ok(AccessListWithGasUsed { access_list, gas_used }) request.access_list = Some(access_list.clone());
let gas_used = this.estimate_gas_at(request, block_id).await?;
Ok(AccessListWithGasUsed { access_list, gas_used })
})
.await?)
} }
/// Handler for: `eth_estimateGas` /// Handler for: `eth_estimateGas`
@ -247,12 +252,15 @@ where
block_number: Option<BlockId>, block_number: Option<BlockId>,
) -> Result<U256> { ) -> Result<U256> {
trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_estimateGas"); trace!(target: "rpc::eth", ?request, ?block_number, "Serving eth_estimateGas");
Ok(EthApi::estimate_gas_at( Ok(self
self, .on_blocking_task(|this| async move {
request, this.estimate_gas_at(
block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)), request,
) block_number.unwrap_or(BlockId::Number(BlockNumberOrTag::Latest)),
.await?) )
.await
})
.await?)
} }
/// Handler for: `eth_gasPrice` /// Handler for: `eth_gasPrice`

View File

@ -71,18 +71,15 @@ where
Ok(U256::from(state.account_nonce(address)?.unwrap_or_default())) Ok(U256::from(state.account_nonce(address)?.unwrap_or_default()))
} }
pub(crate) async fn storage_at( pub(crate) fn storage_at(
&self, &self,
address: Address, address: Address,
index: JsonStorageKey, index: JsonStorageKey,
block_id: Option<BlockId>, block_id: Option<BlockId>,
) -> EthResult<H256> { ) -> EthResult<H256> {
self.on_blocking_task(|this| async move { let state = self.state_at_block_id_or_latest(block_id)?;
let state = this.state_at_block_id_or_latest(block_id)?; let value = state.storage(address, index.0)?.unwrap_or_default();
let value = state.storage(address, index.0)?.unwrap_or_default(); Ok(H256(value.to_be_bytes()))
Ok(H256(value.to_be_bytes()))
})
.await
} }
#[allow(unused)] #[allow(unused)]
@ -169,7 +166,7 @@ mod tests {
GasPriceOracle::new(NoopProvider::default(), Default::default(), cache), GasPriceOracle::new(NoopProvider::default(), Default::default(), cache),
); );
let address = Address::random(); let address = Address::random();
let storage = eth_api.storage_at(address, U256::ZERO.into(), None).await.unwrap(); let storage = eth_api.storage_at(address, U256::ZERO.into(), None).unwrap();
assert_eq!(storage, U256::ZERO.into()); assert_eq!(storage, U256::ZERO.into());
// === Mock === // === Mock ===
@ -190,7 +187,7 @@ mod tests {
); );
let storage_key: U256 = storage_key.into(); let storage_key: U256 = storage_key.into();
let storage = eth_api.storage_at(address, storage_key.into(), None).await.unwrap(); let storage = eth_api.storage_at(address, storage_key.into(), None).unwrap();
assert_eq!(storage, storage_value.into()); assert_eq!(storage, storage_value.into());
} }
} }

View File

@ -8,6 +8,21 @@
//! Reth RPC implementation //! Reth RPC implementation
//! //!
//! Provides the implementation of all RPC interfaces. //! Provides the implementation of all RPC interfaces.
//!
//!
//! ## Note on blocking behaviour
//!
//! All async RPC handlers must non-blocking, see also [What is blocking](https://ryhl.io/blog/async-what-is-blocking/).
//!
//! A lot of the RPC are using a mix of async and direct calls to the database, which are blocking
//! and can reduce overall performance of all concurrent requests handled via the jsonrpsee server.
//!
//! To avoid this, all blocking or CPU intensive handlers must be spawned to a separate task. See
//! the [EthApi] handler implementations for examples. The rpc-api traits make no use of the
//! available jsonrpsee `blocking` attribute to give implementors more freedom because the
//! `blocking` attribute and async handlers are mutually exclusive. However, as mentioned above, a
//! lot of handlers make use of async functions, caching for example, but are also using blocking
//! disk-io, hence these calls are spawned as futures to a blocking task manually.
mod admin; mod admin;
mod call_guard; mod call_guard;