Putting RPC tracing calls behind semaphore (#1914)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Aditya Pandey
2023-03-25 15:57:52 +05:30
committed by GitHub
parent 9fc3e6f594
commit cb318192ba
6 changed files with 86 additions and 18 deletions

View File

@ -18,8 +18,17 @@ pub struct EthHandlers<Client, Pool, Network, Events> {
}
/// Additional config values for the eth namespace
#[derive(Debug, Clone, Eq, PartialEq, Default, Serialize, Deserialize)]
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct EthConfig {
/// Settings for the caching layer
pub cache: EthStateCacheConfig,
/// The maximum number of tracing calls that can be executed in concurrently.
pub max_tracing_requests: usize,
}
impl Default for EthConfig {
fn default() -> Self {
Self { cache: EthStateCacheConfig::default(), max_tracing_requests: 10 }
}
}

View File

@ -65,7 +65,7 @@ use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, EvmEnvProvider, StateProviderFactory};
use reth_rpc::{
eth::cache::EthStateCache, AdminApi, DebugApi, EthApi, EthFilter, EthPubSub,
EthSubscriptionIdProvider, NetApi, TraceApi, Web3Api,
EthSubscriptionIdProvider, NetApi, TraceApi, TracingCallGuard, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_tasks::TaskSpawner;
@ -78,6 +78,7 @@ use std::{
str::FromStr,
};
use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames};
use tower::layer::util::{Identity, Stack};
use tower_http::cors::CorsLayer;
use tracing::{instrument, trace};
@ -451,6 +452,8 @@ pub struct RethModuleRegistry<Client, Pool, Network, Tasks, Events> {
config: RpcModuleConfig,
/// Holds a clone of all the eth namespace handlers
eth: Option<EthHandlers<Client, Pool, Network, Events>>,
/// to put trace calls behind semaphore
tracing_call_guard: TracingCallGuard,
/// Contains the [Methods] of a module
modules: HashMap<RethRpcModule, Methods>,
}
@ -476,6 +479,7 @@ impl<Client, Pool, Network, Tasks, Events>
eth: None,
executor,
modules: Default::default(),
tracing_call_guard: TracingCallGuard::new(config.eth.max_tracing_requests),
config,
events,
}
@ -535,7 +539,9 @@ where
let eth_api = self.eth_api();
self.modules.insert(
RethRpcModule::Debug,
DebugApi::new(self.client.clone(), eth_api).into_rpc().into(),
DebugApi::new(self.client.clone(), eth_api, self.tracing_call_guard.clone())
.into_rpc()
.into(),
);
self
}
@ -586,9 +592,13 @@ where
RethRpcModule::Admin => {
AdminApi::new(self.network.clone()).into_rpc().into()
}
RethRpcModule::Debug => {
DebugApi::new(self.client.clone(), eth_api.clone()).into_rpc().into()
}
RethRpcModule::Debug => DebugApi::new(
self.client.clone(),
eth_api.clone(),
self.tracing_call_guard.clone(),
)
.into_rpc()
.into(),
RethRpcModule::Eth => {
// merge all eth handlers
let mut module = eth_api.clone().into_rpc();
@ -600,11 +610,14 @@ where
RethRpcModule::Net => {
NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
}
RethRpcModule::Trace => {
TraceApi::new(self.client.clone(), eth_api.clone(), eth_cache.clone())
.into_rpc()
.into()
}
RethRpcModule::Trace => TraceApi::new(
self.client.clone(),
eth_api.clone(),
eth_cache.clone(),
self.tracing_call_guard.clone(),
)
.into_rpc()
.into(),
RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
})
.clone()

View File

@ -0,0 +1,21 @@
use std::sync::Arc;
use tokio::sync::{AcquireError, OwnedSemaphorePermit, Semaphore};
/// RPC Tracing call guard semaphore.
///
/// This is used to restrict the number of concurrent RPC requests to tracing methods like
/// `debug_traceTransaction` because they can consume a lot of memory.
#[derive(Clone, Debug)]
pub struct TracingCallGuard(Arc<Semaphore>);
impl TracingCallGuard {
/// Create a new `TracingCallGuard` with the given maximum number of tracing calls in parallel.
pub fn new(max_tracing_requests: usize) -> Self {
Self(Arc::new(Semaphore::new(max_tracing_requests)))
}
/// See also [Semaphore::acquire_owned]
pub async fn acquire_owned(self) -> Result<OwnedSemaphorePermit, AcquireError> {
self.0.acquire_owned().await
}
}

View File

@ -5,7 +5,7 @@ use crate::{
EthTransactions, TransactionSource,
},
result::{internal_rpc_err, ToRpcResult},
EthApiSpec,
EthApiSpec, TracingCallGuard,
};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult;
@ -35,14 +35,17 @@ pub struct DebugApi<Client, Eth> {
client: Client,
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to `debug_traceTransaction`
tracing_call_guard: TracingCallGuard,
}
// === impl DebugApi ===
impl<Client, Eth> DebugApi<Client, Eth> {
/// Create a new instance of the [DebugApi]
pub fn new(client: Client, eth: Eth) -> Self {
Self { client, eth_api: eth }
pub fn new(client: Client, eth: Eth, tracing_call_guard: TracingCallGuard) -> Self {
Self { client, eth_api: eth, tracing_call_guard }
}
}

View File

@ -12,6 +12,7 @@
//! Provides the implementation of all RPC interfaces.
mod admin;
mod call_guard;
mod debug;
mod engine;
pub mod eth;
@ -21,6 +22,7 @@ mod trace;
mod web3;
pub use admin::AdminApi;
pub use call_guard::TracingCallGuard;
pub use debug::DebugApi;
pub use engine::EngineApi;
pub use eth::{EthApi, EthApiSpec, EthFilter, EthPubSub, EthSubscriptionIdProvider};

View File

@ -4,6 +4,7 @@ use crate::{
utils::recover_raw_transaction, EthTransactions,
},
result::internal_rpc_err,
TracingCallGuard,
};
use async_trait::async_trait;
use jsonrpsee::core::RpcResult as Result;
@ -14,7 +15,6 @@ use reth_revm::{
env::tx_env_with_recovered,
tracing::{TracingInspector, TracingInspectorConfig},
};
use reth_rpc_api::TraceApiServer;
use reth_rpc_types::{
trace::{filter::TraceFilter, parity::*},
@ -22,6 +22,7 @@ use reth_rpc_types::{
};
use revm::primitives::{Env, ExecutionResult, ResultAndState};
use std::collections::HashSet;
use tokio::sync::{AcquireError, OwnedSemaphorePermit};
/// `trace` API implementation.
///
@ -34,14 +35,29 @@ pub struct TraceApi<Client, Eth> {
eth_api: Eth,
/// The async cache frontend for eth related data
eth_cache: EthStateCache,
// restrict the number of concurrent calls to `trace_*`
tracing_call_guard: TracingCallGuard,
}
// === impl TraceApi ===
impl<Client, Eth> TraceApi<Client, Eth> {
/// Create a new instance of the [TraceApi]
pub fn new(client: Client, eth_api: Eth, eth_cache: EthStateCache) -> Self {
Self { client, eth_api, eth_cache }
pub fn new(
client: Client,
eth_api: Eth,
eth_cache: EthStateCache,
tracing_call_guard: TracingCallGuard,
) -> Self {
Self { client, eth_api, eth_cache, tracing_call_guard }
}
/// Acquires a permit to execute a tracing call.
async fn acquire_trace_permit(
&self,
) -> std::result::Result<OwnedSemaphorePermit, AcquireError> {
self.tracing_call_guard.clone().acquire_owned().await
}
}
@ -90,6 +106,7 @@ where
trace_types: HashSet<TraceType>,
block_id: Option<BlockId>,
) -> EthResult<TraceResults> {
let _permit = self.acquire_trace_permit().await;
let tx = recover_raw_transaction(tx)?;
let (cfg, block, at) = self
@ -123,6 +140,8 @@ where
hash: H256,
trace_address: Vec<usize>,
) -> EthResult<Option<LocalizedTransactionTrace>> {
let _permit = self.acquire_trace_permit().await;
match self.trace_transaction(hash).await? {
None => Ok(None),
Some(traces) => {
@ -138,6 +157,8 @@ where
&self,
hash: H256,
) -> EthResult<Option<Vec<LocalizedTransactionTrace>>> {
let _permit = self.acquire_trace_permit().await;
let (transaction, at) = match self.eth_api.transaction_by_hash_at(hash).await? {
None => return Ok(None),
Some(res) => res,
@ -152,7 +173,6 @@ where
// execute the trace
self.trace_at(env, TracingInspectorConfig::default_parity(), at, |inspector, _| {
let traces = inspector.into_parity_builder().into_localized_transaction_traces(tx_info);
Ok(Some(traces))
})
}