refactor: simplify and relax some RPC bounds (#13202)

This commit is contained in:
Arsenii Kulikov
2024-12-07 07:28:50 +04:00
committed by GitHub
parent 552c6237a8
commit 9167e454b5
8 changed files with 279 additions and 350 deletions

View File

@ -1,6 +1,6 @@
use alloy_consensus::Header;
use reth_evm::ConfigureEvm;
use reth_primitives::EthPrimitives;
use reth_primitives::NodePrimitives;
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider, StateProviderFactory};
use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::EthApiTypes;
@ -15,38 +15,35 @@ pub type DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, Eth
/// Handlers for core, filter and pubsub `eth` namespace APIs.
#[derive(Debug, Clone)]
pub struct EthHandlers<Provider: BlockReader, Pool, Network, Events, EthApi: EthApiTypes> {
pub struct EthHandlers<Provider: BlockReader, Events, EthApi: EthApiTypes> {
/// Main `eth_` request handler
pub api: EthApi,
/// The async caching layer used by the eth handlers
pub cache: EthStateCache<Provider::Block, Provider::Receipt>,
/// Polling based filter handler available on all transports
pub filter: EthFilter<Provider, Pool, EthApi>,
pub filter: EthFilter<EthApi>,
/// Handler for subscriptions only available for transports that support it (ws, ipc)
pub pubsub: EthPubSub<Provider, Pool, Events, Network, EthApi::TransactionCompat>,
pub pubsub: EthPubSub<EthApi, Events>,
}
impl<Provider, Pool, Network, Events, EthApi> EthHandlers<Provider, Pool, Network, Events, EthApi>
impl<Provider, Events, EthApi> EthHandlers<Provider, Events, EthApi>
where
Provider: StateProviderFactory
+ BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
> + EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: Send + Sync + Clone + 'static,
Network: Clone + 'static,
Events: CanonStateSubscriptions<Primitives = EthPrimitives> + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
EthApi: EthApiTypes + 'static,
{
/// Returns a new instance with handlers for `eth` namespace.
///
/// This will spawn all necessary tasks for the handlers.
#[allow(clippy::too_many_arguments)]
pub fn bootstrap<EvmConfig, Tasks>(
pub fn bootstrap<EvmConfig, Tasks, Pool, Network>(
provider: Provider,
pool: Pool,
network: Network,
@ -92,22 +89,13 @@ where
let api = eth_api_builder(&ctx);
let filter = EthFilter::new(
ctx.provider.clone(),
ctx.pool.clone(),
ctx.cache.clone(),
ctx.config.filter_config(),
Box::new(ctx.executor.clone()),
api.tx_resp_builder().clone(),
);
let filter =
EthFilter::new(api.clone(), ctx.config.filter_config(), Box::new(ctx.executor.clone()));
let pubsub = EthPubSub::with_spawner(
ctx.provider.clone(),
ctx.pool.clone(),
api.clone(),
ctx.events.clone(),
ctx.network.clone(),
Box::new(ctx.executor.clone()),
api.tx_resp_builder().clone(),
);
Self { api, cache: ctx.cache, filter, pubsub }

View File

@ -204,11 +204,11 @@ use reth_consensus::FullConsensus;
use reth_engine_primitives::EngineTypes;
use reth_evm::{execute::BlockExecutorProvider, ConfigureEvm};
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_primitives::{EthPrimitives, NodePrimitives};
use reth_primitives::NodePrimitives;
use reth_provider::{
AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
EvmEnvProvider, FullRpcProvider, HeaderProvider, ProviderBlock, ProviderHeader,
ProviderReceipt, ReceiptProvider, StateProviderFactory,
EvmEnvProvider, FullRpcProvider, ProviderBlock, ProviderHeader, ProviderReceipt,
ReceiptProvider, StateProviderFactory,
};
use reth_rpc::{
AdminApi, DebugApi, EngineEthApi, EthBundle, MinerApi, NetApi, OtterscanApi, RPCApi, RethApi,
@ -273,7 +273,7 @@ pub async fn launch<Provider, Pool, Network, Tasks, Events, EvmConfig, EthApi, B
evm_config: EvmConfig,
eth: DynEthApiBuilder<Provider, Pool, EvmConfig, Network, Tasks, Events, EthApi>,
block_executor: BlockExecutor,
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<BlockExecutor::Primitives>>,
) -> Result<RpcServerHandle, RpcError>
where
Provider: FullRpcProvider<
@ -285,7 +285,7 @@ where
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction> + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = EthPrimitives> + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<Header = alloy_consensus::Header>,
EthApi: FullEthApiServer<
Provider: BlockReader<
@ -298,6 +298,8 @@ where
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
>,
>,
{
@ -649,15 +651,17 @@ where
Pool: TransactionPool + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = EthPrimitives> + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
>,
>,
Consensus: reth_consensus::FullConsensus + Clone + 'static,
Consensus: reth_consensus::FullConsensus<BlockExecutor::Primitives> + Clone + 'static,
{
/// Configures all [`RpcModule`]s specific to the given [`TransportRpcModuleConfig`] which can
/// be used to start the transport server(s).
@ -687,11 +691,9 @@ where
>,
>,
Provider: BlockReader<
Block = <EthApi::Provider as BlockReader>::Block,
Receipt = <EthApi::Provider as ReceiptProvider>::Receipt,
Header = <EthApi::Provider as HeaderProvider>::Header,
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
>,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction>,
{
let Self {
provider,
@ -815,7 +817,6 @@ where
Provider: BlockReader<
Block = <EthApi::Provider as BlockReader>::Block,
Receipt = <EthApi::Provider as ReceiptProvider>::Receipt,
Header = <EthApi::Provider as HeaderProvider>::Header,
>,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction>,
{
@ -963,7 +964,7 @@ pub struct RpcRegistryInner<
/// Holds the configuration for the RPC modules
config: RpcModuleConfig,
/// Holds a all `eth_` namespace handlers
eth: EthHandlers<Provider, Pool, Network, Events, EthApi>,
eth: EthHandlers<Provider, Events, EthApi>,
/// to put trace calls behind semaphore
blocking_pool_guard: BlockingTaskGuard,
/// Contains the [Methods] of a module
@ -977,16 +978,15 @@ impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
where
Provider: StateProviderFactory
+ BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
> + EvmEnvProvider
+ Clone
+ Unpin
+ 'static,
Pool: Send + Sync + Clone + 'static,
Network: Clone + 'static,
Events: CanonStateSubscriptions<Primitives = EthPrimitives> + Clone + 'static,
Events: CanonStateSubscriptions + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
EthApi: EthApiTypes + 'static,
BlockExecutor: BlockExecutorProvider,
@ -1057,7 +1057,7 @@ where
}
/// Returns a reference to the installed [`EthHandlers`].
pub const fn eth_handlers(&self) -> &EthHandlers<Provider, Pool, Network, Events, EthApi> {
pub const fn eth_handlers(&self) -> &EthHandlers<Provider, Events, EthApi> {
&self.eth
}
@ -1215,7 +1215,6 @@ where
pub fn register_trace(&mut self) -> &mut Self
where
EthApi: TraceExt,
Provider: BlockReader<Block = <EthApi::Provider as BlockReader>::Block>,
{
let trace_api = self.trace_api();
self.modules.insert(RethRpcModule::Trace, trace_api.into_rpc().into());
@ -1276,15 +1275,11 @@ where
/// # Panics
///
/// If called outside of the tokio runtime. See also [`Self::eth_api`]
pub fn trace_api(&self) -> TraceApi<Provider, EthApi>
pub fn trace_api(&self) -> TraceApi<EthApi>
where
EthApi: TraceExt,
{
TraceApi::new(
self.provider.clone(),
self.eth_api().clone(),
self.blocking_pool_guard.clone(),
)
TraceApi::new(self.eth_api().clone(), self.blocking_pool_guard.clone())
}
/// Instantiates [`EthBundle`] Api
@ -1305,14 +1300,13 @@ where
/// # Panics
///
/// If called outside of the tokio runtime. See also [`Self::eth_api`]
pub fn debug_api(&self) -> DebugApi<Provider, EthApi, BlockExecutor>
pub fn debug_api(&self) -> DebugApi<EthApi, BlockExecutor>
where
EthApi: EthApiSpec + EthTransactions + TraceExt,
BlockExecutor:
BlockExecutorProvider<Primitives: NodePrimitives<Block = reth_primitives::Block>>,
{
DebugApi::new(
self.provider.clone(),
self.eth_api().clone(),
self.blocking_pool_guard.clone(),
self.block_executor.clone(),
@ -1340,7 +1334,7 @@ where
/// Instantiates `ValidationApi`
pub fn validation_api(&self) -> ValidationApi<Provider, BlockExecutor>
where
Consensus: reth_consensus::FullConsensus + Clone + 'static,
Consensus: reth_consensus::FullConsensus<BlockExecutor::Primitives> + Clone + 'static,
{
ValidationApi::new(
self.provider.clone(),
@ -1355,30 +1349,27 @@ where
impl<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
where
Provider: FullRpcProvider<
Block = <EthApi::Provider as BlockReader>::Block,
Receipt = <EthApi::Provider as ReceiptProvider>::Receipt,
Header = <EthApi::Provider as HeaderProvider>::Header,
> + AccountReader
+ ChangeSetReader,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction> + 'static,
Provider: FullRpcProvider + AccountReader + ChangeSetReader,
Pool: TransactionPool + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = EthPrimitives> + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
>,
>,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
>,
Consensus: reth_consensus::FullConsensus + Clone + 'static,
Consensus: reth_consensus::FullConsensus<BlockExecutor::Primitives> + Clone + 'static,
{
/// Configures the auth module that includes the
/// * `engine_` namespace
@ -1468,7 +1459,6 @@ where
.into()
}
RethRpcModule::Debug => DebugApi::new(
self.provider.clone(),
eth_api.clone(),
self.blocking_pool_guard.clone(),
self.block_executor.clone(),
@ -1495,16 +1485,14 @@ where
RethRpcModule::Net => {
NetApi::new(self.network.clone(), eth_api.clone()).into_rpc().into()
}
RethRpcModule::Trace => TraceApi::new(
self.provider.clone(),
eth_api.clone(),
self.blocking_pool_guard.clone(),
)
.into_rpc()
.into(),
RethRpcModule::Trace => {
TraceApi::new(eth_api.clone(), self.blocking_pool_guard.clone())
.into_rpc()
.into()
}
RethRpcModule::Web3 => Web3Api::new(self.network.clone()).into_rpc().into(),
RethRpcModule::Txpool => TxPoolApi::new(
self.pool.clone(),
self.eth.api.pool().clone(),
self.eth.api.tx_resp_builder().clone(),
)
.into_rpc()
@ -1524,7 +1512,7 @@ where
.into()
}
RethRpcModule::Flashbots => ValidationApi::new(
self.provider.clone(),
eth_api.provider().clone(),
Arc::new(self.consensus.clone()),
self.block_executor.clone(),
self.config.flashbots.clone(),

View File

@ -19,16 +19,16 @@ use reth_evm::{
execute::{BlockExecutorProvider, Executor},
ConfigureEvmEnv,
};
use reth_primitives::{BlockExt, NodePrimitives, SealedBlockWithSenders};
use reth_primitives::{BlockExt, NodePrimitives, ReceiptWithBloom, SealedBlockWithSenders};
use reth_primitives_traits::{Block as _, BlockBody, SignedTransaction};
use reth_provider::{
BlockReader, BlockReaderIdExt, ChainSpecProvider, HeaderProvider, ProviderBlock,
StateProofProvider, StateProviderFactory, TransactionVariant,
BlockIdReader, BlockReaderIdExt, ChainSpecProvider, HeaderProvider, ProviderBlock,
ReceiptProviderIdExt, StateProofProvider, TransactionVariant,
};
use reth_revm::{database::StateProviderDatabase, witness::ExecutionWitnessRecord};
use reth_rpc_api::DebugApiServer;
use reth_rpc_eth_api::{
helpers::{EthApiSpec, EthTransactions, TraceExt},
helpers::{EthTransactions, TraceExt},
EthApiTypes, FromEthApiError, RpcNodeCore,
};
use reth_rpc_eth_types::{EthApiError, StateCacheDb};
@ -47,22 +47,20 @@ use tokio::sync::{AcquireError, OwnedSemaphorePermit};
/// `debug` API implementation.
///
/// This type provides the functionality for handling `debug` related requests.
pub struct DebugApi<Provider, Eth, BlockExecutor> {
inner: Arc<DebugApiInner<Provider, Eth, BlockExecutor>>,
pub struct DebugApi<Eth, BlockExecutor> {
inner: Arc<DebugApiInner<Eth, BlockExecutor>>,
}
// === impl DebugApi ===
impl<Provider, Eth, BlockExecutor> DebugApi<Provider, Eth, BlockExecutor> {
impl<Eth, BlockExecutor> DebugApi<Eth, BlockExecutor> {
/// Create a new instance of the [`DebugApi`]
pub fn new(
provider: Provider,
eth: Eth,
blocking_task_guard: BlockingTaskGuard,
block_executor: BlockExecutor,
) -> Self {
let inner =
Arc::new(DebugApiInner { provider, eth_api: eth, blocking_task_guard, block_executor });
let inner = Arc::new(DebugApiInner { eth_api: eth, blocking_task_guard, block_executor });
Self { inner }
}
@ -72,15 +70,17 @@ impl<Provider, Eth, BlockExecutor> DebugApi<Provider, Eth, BlockExecutor> {
}
}
impl<Eth: RpcNodeCore, BlockExecutor> DebugApi<Eth, BlockExecutor> {
/// Access the underlying provider.
pub fn provider(&self) -> &Eth::Provider {
self.inner.eth_api.provider()
}
}
// === impl DebugApi ===
impl<Provider, Eth, BlockExecutor> DebugApi<Provider, Eth, BlockExecutor>
impl<Eth, BlockExecutor> DebugApi<Eth, BlockExecutor>
where
Provider: BlockReaderIdExt
+ HeaderProvider
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ 'static,
Eth: EthApiTypes + TraceExt + 'static,
BlockExecutor:
BlockExecutorProvider<Primitives: NodePrimitives<Block = ProviderBlock<Eth::Provider>>>,
@ -164,34 +164,30 @@ where
let (cfg, block_env) = self.eth_api().evm_env_for_raw_block(block.header()).await?;
// Depending on EIP-2 we need to recover the transactions differently
let senders = if self
.inner
.provider
.chain_spec()
.is_homestead_active_at_block(block.header().number())
{
block
.body()
.transactions()
.iter()
.map(|tx| {
tx.recover_signer()
.ok_or(EthApiError::InvalidTransactionSignature)
.map_err(Eth::Error::from_eth_err)
})
.collect::<Result<Vec<_>, Eth::Error>>()?
} else {
block
.body()
.transactions()
.iter()
.map(|tx| {
tx.recover_signer_unchecked()
.ok_or(EthApiError::InvalidTransactionSignature)
.map_err(Eth::Error::from_eth_err)
})
.collect::<Result<Vec<_>, Eth::Error>>()?
};
let senders =
if self.provider().chain_spec().is_homestead_active_at_block(block.header().number()) {
block
.body()
.transactions()
.iter()
.map(|tx| {
tx.recover_signer()
.ok_or(EthApiError::InvalidTransactionSignature)
.map_err(Eth::Error::from_eth_err)
})
.collect::<Result<Vec<_>, Eth::Error>>()?
} else {
block
.body()
.transactions()
.iter()
.map(|tx| {
tx.recover_signer_unchecked()
.ok_or(EthApiError::InvalidTransactionSignature)
.map_err(Eth::Error::from_eth_err)
})
.collect::<Result<Vec<_>, Eth::Error>>()?
};
self.trace_block(
Arc::new(block.with_senders_unchecked(senders).seal_slow()),
@ -209,8 +205,7 @@ where
opts: GethDebugTracingOptions,
) -> Result<Vec<TraceResult>, Eth::Error> {
let block_hash = self
.inner
.provider
.provider()
.block_hash_for_id(block_id)
.map_err(Eth::Error::from_eth_err)?
.ok_or(EthApiError::HeaderNotFound(block_id))?;
@ -813,30 +808,25 @@ where
}
#[async_trait]
impl<Provider, Eth, BlockExecutor> DebugApiServer for DebugApi<Provider, Eth, BlockExecutor>
impl<Eth, BlockExecutor> DebugApiServer for DebugApi<Eth, BlockExecutor>
where
Provider: BlockReaderIdExt<Block: Encodable, Receipt = reth_primitives::Receipt>
+ HeaderProvider
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ 'static,
Eth: EthApiSpec + EthTransactions + TraceExt + 'static,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<Block = <<Eth as RpcNodeCore>::Provider as BlockReader>::Block>,
>,
Eth: EthApiTypes + EthTransactions + TraceExt + 'static,
BlockExecutor:
BlockExecutorProvider<Primitives: NodePrimitives<Block = ProviderBlock<Eth::Provider>>>,
{
/// Handler for `debug_getRawHeader`
async fn raw_header(&self, block_id: BlockId) -> RpcResult<Bytes> {
let header = match block_id {
BlockId::Hash(hash) => self.inner.provider.header(&hash.into()).to_rpc_result()?,
BlockId::Hash(hash) => self.provider().header(&hash.into()).to_rpc_result()?,
BlockId::Number(number_or_tag) => {
let number = self
.inner
.provider
.provider()
.convert_block_number(number_or_tag)
.to_rpc_result()?
.ok_or_else(|| internal_rpc_err("Pending block not supported".to_string()))?;
self.inner.provider.header_by_number(number).to_rpc_result()?
.ok_or_else(|| {
internal_rpc_err("Pending block not supported".to_string())
})?;
self.provider().header_by_number(number).to_rpc_result()?
}
};
@ -851,8 +841,7 @@ where
/// Handler for `debug_getRawBlock`
async fn raw_block(&self, block_id: BlockId) -> RpcResult<Bytes> {
let block = self
.inner
.provider
.provider()
.block_by_id(block_id)
.to_rpc_result()?
.ok_or(EthApiError::HeaderNotFound(block_id))?;
@ -874,8 +863,7 @@ where
/// Returns the bytes of the transaction for the given hash.
async fn raw_transactions(&self, block_id: BlockId) -> RpcResult<Vec<Bytes>> {
let block = self
.inner
.provider
.provider()
.block_with_senders_by_id(block_id, TransactionVariant::NoHash)
.to_rpc_result()?
.unwrap_or_default();
@ -885,13 +873,12 @@ where
/// Handler for `debug_getRawReceipts`
async fn raw_receipts(&self, block_id: BlockId) -> RpcResult<Vec<Bytes>> {
Ok(self
.inner
.provider
.provider()
.receipts_by_block_id(block_id)
.to_rpc_result()?
.unwrap_or_default()
.into_iter()
.map(|receipt| receipt.with_bloom().encoded_2718().into())
.map(|receipt| ReceiptWithBloom::from(receipt).encoded_2718().into())
.collect())
}
@ -1201,21 +1188,19 @@ where
}
}
impl<Provider, Eth, BlockExecutor> std::fmt::Debug for DebugApi<Provider, Eth, BlockExecutor> {
impl<Eth, BlockExecutor> std::fmt::Debug for DebugApi<Eth, BlockExecutor> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DebugApi").finish_non_exhaustive()
}
}
impl<Provider, Eth, BlockExecutor> Clone for DebugApi<Provider, Eth, BlockExecutor> {
impl<Eth, BlockExecutor> Clone for DebugApi<Eth, BlockExecutor> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
struct DebugApiInner<Provider, Eth, BlockExecutor> {
/// The provider that can interact with the chain.
provider: Provider,
struct DebugApiInner<Eth, BlockExecutor> {
/// The implementation of `eth` API
eth_api: Eth,
// restrict the number of concurrent calls to blocking calls

View File

@ -10,9 +10,13 @@ use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_chainspec::ChainInfo;
use reth_primitives::SealedBlockWithSenders;
use reth_provider::{BlockIdReader, BlockReader, ProviderBlock, ProviderError, ProviderReceipt};
use reth_provider::{
BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, HeaderProvider, ProviderBlock,
ProviderError, ProviderReceipt,
};
use reth_rpc_eth_api::{
EthApiTypes, EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat,
EthApiTypes, EthFilterApiServer, FullEthApiTypes, RpcNodeCoreExt, RpcTransaction,
TransactionCompat,
};
use reth_rpc_eth_types::{
logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
@ -40,27 +44,22 @@ use tracing::{error, trace};
const MAX_HEADERS_RANGE: u64 = 1_000; // with ~530bytes per header this is ~500kb
/// `Eth` filter RPC implementation.
pub struct EthFilter<Provider: BlockReader, Pool, Eth: EthApiTypes> {
pub struct EthFilter<Eth: EthApiTypes> {
/// All nested fields bundled together
inner: Arc<EthFilterInner<Provider, Pool, RpcTransaction<Eth::NetworkTypes>>>,
/// Assembles response data w.r.t. network.
tx_resp_builder: Eth::TransactionCompat,
inner: Arc<EthFilterInner<Eth>>,
}
impl<Provider, Pool, Eth> Clone for EthFilter<Provider, Pool, Eth>
impl<Eth> Clone for EthFilter<Eth>
where
Eth: EthApiTypes,
Provider: BlockReader,
{
fn clone(&self) -> Self {
Self { inner: self.inner.clone(), tx_resp_builder: self.tx_resp_builder.clone() }
Self { inner: self.inner.clone() }
}
}
impl<Provider, Pool, Eth> EthFilter<Provider, Pool, Eth>
impl<Eth> EthFilter<Eth>
where
Provider: BlockReader + Send + Sync + 'static,
Pool: Send + Sync + 'static,
Eth: EthApiTypes + 'static,
{
/// Creates a new, shareable instance.
@ -71,22 +70,13 @@ where
/// See also [`EthFilterConfig`].
///
/// This also spawns a task that periodically clears stale filters.
pub fn new(
provider: Provider,
pool: Pool,
eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
config: EthFilterConfig,
task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth::TransactionCompat,
) -> Self {
pub fn new(eth_api: Eth, config: EthFilterConfig, task_spawner: Box<dyn TaskSpawner>) -> Self {
let EthFilterConfig { max_blocks_per_filter, max_logs_per_response, stale_filter_ttl } =
config;
let inner = EthFilterInner {
provider,
eth_api,
active_filters: ActiveFilters::new(),
pool,
id_provider: Arc::new(EthSubscriptionIdProvider::default()),
eth_cache,
max_headers_range: MAX_HEADERS_RANGE,
task_spawner,
stale_filter_ttl,
@ -95,7 +85,7 @@ where
max_logs_per_response: max_logs_per_response.unwrap_or(usize::MAX),
};
let eth_filter = Self { inner: Arc::new(inner), tx_resp_builder };
let eth_filter = Self { inner: Arc::new(inner) };
let this = eth_filter.clone();
eth_filter.inner.task_spawner.spawn_critical(
@ -143,18 +133,26 @@ where
}
}
impl<Provider, Pool, Eth> EthFilter<Provider, Pool, Eth>
impl<Eth> EthFilter<Eth>
where
Provider: BlockReader + BlockIdReader + 'static,
Pool: TransactionPool<Transaction = <Eth::Pool as TransactionPool>::Transaction> + 'static,
Eth: FullEthApiTypes,
Eth: FullEthApiTypes<Provider: BlockReader + BlockIdReader> + RpcNodeCoreExt,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
self.inner.eth_api.provider()
}
/// Access the underlying pool.
fn pool(&self) -> &Eth::Pool {
self.inner.eth_api.pool()
}
/// Returns all the filter changes for the given id, if any
pub async fn filter_changes(
&self,
id: FilterId,
) -> Result<FilterChanges<RpcTransaction<Eth::NetworkTypes>>, EthFilterError> {
let info = self.inner.provider.chain_info()?;
let info = self.provider().chain_info()?;
let best_number = info.best_number;
// start_block is the block from which we should start fetching changes, the next block from
@ -185,7 +183,7 @@ where
// [start_block..best_block]
let end_block = best_number + 1;
let block_hashes =
self.inner.provider.canonical_hashes_range(start_block, end_block).map_err(
self.provider().canonical_hashes_range(start_block, end_block).map_err(
|_| EthApiError::HeaderRangeNotFound(start_block.into(), end_block.into()),
)?;
Ok(FilterChanges::Hashes(block_hashes))
@ -194,11 +192,11 @@ where
let (from_block_number, to_block_number) = match filter.block_option {
FilterBlockOption::Range { from_block, to_block } => {
let from = from_block
.map(|num| self.inner.provider.convert_block_number(num))
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let to = to_block
.map(|num| self.inner.provider.convert_block_number(num))
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
logs_utils::get_filter_block_range(from, to, start_block, info)
@ -242,12 +240,9 @@ where
}
#[async_trait]
impl<Provider, Pool, Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
for EthFilter<Provider, Pool, Eth>
impl<Eth> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> for EthFilter<Eth>
where
Provider: BlockReader + BlockIdReader + 'static,
Pool: TransactionPool<Transaction = <Eth::Pool as TransactionPool>::Transaction> + 'static,
Eth: FullEthApiTypes + 'static,
Eth: FullEthApiTypes + RpcNodeCoreExt<Provider: BlockIdReader> + 'static,
{
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
@ -272,14 +267,16 @@ where
let transaction_kind = match kind.unwrap_or_default() {
PendingTransactionFilterKind::Hashes => {
let receiver = self.inner.pool.pending_transactions_listener();
let receiver = self.pool().pending_transactions_listener();
let pending_txs_receiver = PendingTransactionsReceiver::new(receiver);
FilterKind::PendingTransaction(PendingTransactionKind::Hashes(pending_txs_receiver))
}
PendingTransactionFilterKind::Full => {
let stream = self.inner.pool.new_pending_pool_transactions_listener();
let full_txs_receiver =
FullTransactionsReceiver::new(stream, self.tx_resp_builder.clone());
let stream = self.pool().new_pending_pool_transactions_listener();
let full_txs_receiver = FullTransactionsReceiver::new(
stream,
self.inner.eth_api.tx_resp_builder().clone(),
);
FilterKind::PendingTransaction(PendingTransactionKind::FullTransaction(Arc::new(
full_txs_receiver,
)))
@ -332,10 +329,9 @@ where
}
}
impl<Provider, Pool, Eth> std::fmt::Debug for EthFilter<Provider, Pool, Eth>
impl<Eth> std::fmt::Debug for EthFilter<Eth>
where
Eth: EthApiTypes,
Provider: BlockReader,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthFilter").finish_non_exhaustive()
@ -344,21 +340,17 @@ where
/// Container type `EthFilter`
#[derive(Debug)]
struct EthFilterInner<Provider: BlockReader, Pool, Tx> {
/// The transaction pool.
pool: Pool,
/// The provider that can interact with the chain.
provider: Provider,
struct EthFilterInner<Eth: EthApiTypes> {
/// Inner `eth` API implementation.
eth_api: Eth,
/// All currently installed filters.
active_filters: ActiveFilters<Tx>,
active_filters: ActiveFilters<RpcTransaction<Eth::NetworkTypes>>,
/// Provides ids to identify filters
id_provider: Arc<dyn IdProvider>,
/// Maximum number of blocks that could be scanned per filter
max_blocks_per_filter: u64,
/// Maximum number of logs that can be returned in a response
max_logs_per_response: usize,
/// The async cache frontend for eth related data
eth_cache: EthStateCache<Provider::Block, Provider::Receipt>,
/// maximum number of headers to read at once for range filter
max_headers_range: u64,
/// The type that can spawn tasks.
@ -367,11 +359,22 @@ struct EthFilterInner<Provider: BlockReader, Pool, Tx> {
stale_filter_ttl: Duration,
}
impl<Provider, Pool, Tx> EthFilterInner<Provider, Pool, Tx>
impl<Eth> EthFilterInner<Eth>
where
Provider: BlockReader + BlockIdReader + 'static,
Pool: TransactionPool + 'static,
Eth: RpcNodeCoreExt<Provider: BlockIdReader, Pool: TransactionPool> + EthApiTypes,
{
/// Access the underlying provider.
fn provider(&self) -> &Eth::Provider {
self.eth_api.provider()
}
/// Access the underlying [`EthStateCache`].
fn eth_cache(
&self,
) -> &EthStateCache<ProviderBlock<Eth::Provider>, ProviderReceipt<Eth::Provider>> {
self.eth_api.cache()
}
/// Returns logs matching given filter object.
async fn logs_for_filter(&self, filter: Filter) -> Result<Vec<Log>, EthFilterError> {
match filter.block_option {
@ -379,7 +382,7 @@ where
// for all matching logs in the block
// get the block header with the hash
let header = self
.provider
.provider()
.header_by_hash_or_number(block_hash.into())?
.ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;
@ -390,7 +393,7 @@ where
let (receipts, maybe_block) = self
.receipts_and_maybe_block(
&block_num_hash,
self.provider.chain_info()?.best_number,
self.provider().chain_info()?.best_number,
)
.await?
.ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;
@ -399,8 +402,8 @@ where
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(|b| ProviderOrBlock::Block(b))
.unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)),
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&FilteredParams::new(Some(filter)),
block_num_hash,
&receipts,
@ -412,16 +415,16 @@ where
}
FilterBlockOption::Range { from_block, to_block } => {
// compute the range
let info = self.provider.chain_info()?;
let info = self.provider().chain_info()?;
// we start at the most recent block if unset in filter
let start_block = info.best_number;
let from = from_block
.map(|num| self.provider.convert_block_number(num))
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let to = to_block
.map(|num| self.provider.convert_block_number(num))
.map(|num| self.provider().convert_block_number(num))
.transpose()?
.flatten();
let (from_block_number, to_block_number) =
@ -433,8 +436,11 @@ where
}
/// Installs a new filter and returns the new identifier.
async fn install_filter(&self, kind: FilterKind<Tx>) -> RpcResult<FilterId> {
let last_poll_block_number = self.provider.best_block_number().to_rpc_result()?;
async fn install_filter(
&self,
kind: FilterKind<RpcTransaction<Eth::NetworkTypes>>,
) -> RpcResult<FilterId> {
let last_poll_block_number = self.provider().best_block_number().to_rpc_result()?;
let id = FilterId::from(self.id_provider.next_id());
let mut filters = self.active_filters.inner.lock().await;
filters.insert(
@ -482,7 +488,7 @@ where
for (from, to) in
BlockRangeInclusiveIter::new(from_block..=to_block, self.max_headers_range)
{
let headers = self.provider.headers_range(from..=to)?;
let headers = self.provider().headers_range(from..=to)?;
for (idx, header) in headers.iter().enumerate() {
// only if filter matches
@ -494,7 +500,7 @@ where
let block_hash = match headers.get(idx + 1) {
Some(parent) => parent.parent_hash(),
None => self
.provider
.provider()
.block_hash(header.number())?
.ok_or_else(|| ProviderError::HeaderNotFound(header.number().into()))?,
};
@ -506,8 +512,8 @@ where
append_matching_block_logs(
&mut all_logs,
maybe_block
.map(|block| ProviderOrBlock::Block(block))
.unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)),
.map(ProviderOrBlock::Block)
.unwrap_or_else(|| ProviderOrBlock::Provider(self.provider())),
&filter_params,
num_hash,
&receipts,
@ -540,20 +546,20 @@ where
best_number: u64,
) -> Result<
Option<(
Arc<Vec<ProviderReceipt<Provider>>>,
Option<Arc<SealedBlockWithSenders<ProviderBlock<Provider>>>>,
Arc<Vec<ProviderReceipt<Eth::Provider>>>,
Option<Arc<SealedBlockWithSenders<ProviderBlock<Eth::Provider>>>>,
)>,
EthFilterError,
> {
// The last 4 blocks are most likely cached, so we can just fetch them
let cached_range = best_number.saturating_sub(4)..=best_number;
let receipts_block = if cached_range.contains(&block_num_hash.number) {
self.eth_cache
self.eth_cache()
.get_block_and_receipts(block_num_hash.hash)
.await?
.map(|(b, r)| (r, Some(b)))
} else {
self.eth_cache.get_receipts(block_num_hash.hash).await?.map(|r| (r, None))
self.eth_cache().get_receipts(block_num_hash.hash).await?.map(|r| (r, None))
};
Ok(receipts_block)
}

View File

@ -17,8 +17,10 @@ use jsonrpsee::{
};
use reth_network_api::NetworkInfo;
use reth_primitives::NodePrimitives;
use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider};
use reth_rpc_eth_api::{pubsub::EthPubSubApiServer, TransactionCompat};
use reth_provider::{BlockNumReader, CanonStateSubscriptions};
use reth_rpc_eth_api::{
pubsub::EthPubSubApiServer, EthApiTypes, RpcNodeCore, RpcTransaction, TransactionCompat,
};
use reth_rpc_eth_types::logs_utils;
use reth_rpc_server_types::result::{internal_rpc_err, invalid_params_rpc_err};
use reth_rpc_types_compat::transaction::from_recovered;
@ -35,67 +37,47 @@ use tracing::error;
///
/// This handles `eth_subscribe` RPC calls.
#[derive(Clone)]
pub struct EthPubSub<Provider, Pool, Events, Network, Eth> {
pub struct EthPubSub<Eth, Events> {
/// All nested fields bundled together.
inner: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
inner: Arc<EthPubSubInner<Eth, Events>>,
/// The type that's used to spawn subscription tasks.
subscription_task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth,
}
// === impl EthPubSub ===
impl<Provider, Pool, Events, Network, Eth> EthPubSub<Provider, Pool, Events, Network, Eth> {
impl<Eth, Events> EthPubSub<Eth, Events> {
/// Creates a new, shareable instance.
///
/// Subscription tasks are spawned via [`tokio::task::spawn`]
pub fn new(
provider: Provider,
pool: Pool,
chain_events: Events,
network: Network,
tx_resp_builder: Eth,
) -> Self {
Self::with_spawner(
provider,
pool,
chain_events,
network,
Box::<TokioTaskExecutor>::default(),
tx_resp_builder,
)
pub fn new(eth_api: Eth, chain_events: Events) -> Self {
Self::with_spawner(eth_api, chain_events, Box::<TokioTaskExecutor>::default())
}
/// Creates a new, shareable instance.
pub fn with_spawner(
provider: Provider,
pool: Pool,
eth_api: Eth,
chain_events: Events,
network: Network,
subscription_task_spawner: Box<dyn TaskSpawner>,
tx_resp_builder: Eth,
) -> Self {
let inner = EthPubSubInner { provider, pool, chain_events, network };
Self { inner: Arc::new(inner), subscription_task_spawner, tx_resp_builder }
let inner = EthPubSubInner { eth_api, chain_events };
Self { inner: Arc::new(inner), subscription_task_spawner }
}
}
#[async_trait::async_trait]
impl<Provider, Pool, Events, Network, Eth> EthPubSubApiServer<Eth::Transaction>
for EthPubSub<Provider, Pool, Events, Network, Eth>
impl<Eth, Events> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth, Events>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
SignedTx: Encodable2718,
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
> + Clone
+ 'static,
Network: NetworkInfo + Clone + 'static,
Eth: TransactionCompat<PoolConsensusTx<Pool>> + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
+ 'static,
{
/// Handler for `eth_subscribe`
async fn subscribe(
@ -106,9 +88,8 @@ where
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let pubsub = self.inner.clone();
let resp_builder = self.tx_resp_builder.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
let _ = handle_accepted(pubsub, sink, kind, params, resp_builder).await;
let _ = handle_accepted(pubsub, sink, kind, params).await;
}));
Ok(())
@ -116,16 +97,13 @@ where
}
/// The actual handler for an accepted [`EthPubSub::subscribe`] call.
async fn handle_accepted<Provider, Pool, Events, Network, Eth>(
pubsub: Arc<EthPubSubInner<Provider, Pool, Events, Network>>,
async fn handle_accepted<Eth, Events>(
pubsub: Arc<EthPubSubInner<Eth, Events>>,
accepted_sink: SubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
tx_resp_builder: Eth,
) -> Result<(), ErrorObject<'static>>
where
Provider: BlockReader + EvmEnvProvider + Clone + 'static,
Pool: TransactionPool + 'static,
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
SignedTx: Encodable2718,
@ -134,8 +112,8 @@ where
>,
> + Clone
+ 'static,
Network: NetworkInfo + Clone + 'static,
Eth: TransactionCompat<PoolConsensusTx<Pool>>,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
{
match kind {
SubscriptionKind::NewHeads => {
@ -166,7 +144,7 @@ where
let stream = pubsub.full_pending_transaction_stream().filter_map(|tx| {
let tx_value = match from_recovered(
tx.transaction.to_consensus(),
&tx_resp_builder,
pubsub.eth_api.tx_resp_builder(),
) {
Ok(tx) => {
Some(EthSubscriptionResult::FullTransaction(Box::new(tx)))
@ -204,7 +182,7 @@ where
let mut canon_state =
BroadcastStream::new(pubsub.chain_events.subscribe_to_canonical_state());
// get current sync status
let mut initial_sync_status = pubsub.network.is_syncing();
let mut initial_sync_status = pubsub.eth_api.network().is_syncing();
let current_sub_res = pubsub.sync_status(initial_sync_status);
// send the current status immediately
@ -215,7 +193,7 @@ where
}
while canon_state.next().await.is_some() {
let current_syncing = pubsub.network.is_syncing();
let current_syncing = pubsub.eth_api.network().is_syncing();
// Only send a new response if the sync status has changed
if current_syncing != initial_sync_status {
// Update the sync status on each new block
@ -285,9 +263,7 @@ where
}
}
impl<Provider, Pool, Events, Network, Eth> std::fmt::Debug
for EthPubSub<Provider, Pool, Events, Network, Eth>
{
impl<Eth, Events> std::fmt::Debug for EthPubSub<Eth, Events> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EthPubSub").finish_non_exhaustive()
}
@ -295,28 +271,28 @@ impl<Provider, Pool, Events, Network, Eth> std::fmt::Debug
/// Container type `EthPubSub`
#[derive(Clone)]
struct EthPubSubInner<Provider, Pool, Events, Network> {
/// The transaction pool.
pool: Pool,
/// The provider that can interact with the chain.
provider: Provider,
struct EthPubSubInner<EthApi, Events> {
/// The `eth` API.
eth_api: EthApi,
/// A type that allows to create new event subscriptions.
chain_events: Events,
/// The network.
network: Network,
}
// == impl EthPubSubInner ===
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
impl<Eth, Events> EthPubSubInner<Eth, Events>
where
Provider: BlockReader + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader>,
{
/// Returns the current sync status for the `syncing` subscription
fn sync_status(&self, is_syncing: bool) -> EthSubscriptionResult {
if is_syncing {
let current_block =
self.provider.chain_info().map(|info| info.best_number).unwrap_or_default();
let current_block = self
.eth_api
.provider()
.chain_info()
.map(|info| info.best_number)
.unwrap_or_default();
EthSubscriptionResult::SyncState(PubSubSyncStatus::Detailed(SyncStatusMetadata {
syncing: true,
starting_block: 0,
@ -329,35 +305,31 @@ where
}
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
impl<Eth, Events> EthPubSubInner<Eth, Events>
where
Pool: TransactionPool + 'static,
Eth: RpcNodeCore<Pool: TransactionPool>,
{
/// Returns a stream that yields all transaction hashes emitted by the txpool.
fn pending_transaction_hashes_stream(&self) -> impl Stream<Item = TxHash> {
ReceiverStream::new(self.pool.pending_transactions_listener())
ReceiverStream::new(self.eth_api.pool().pending_transactions_listener())
}
/// Returns a stream that yields all transactions emitted by the txpool.
fn full_pending_transaction_stream(
&self,
) -> impl Stream<Item = NewTransactionEvent<<Pool as TransactionPool>::Transaction>> {
self.pool.new_pending_pool_transactions_listener()
) -> impl Stream<Item = NewTransactionEvent<<Eth::Pool as TransactionPool>::Transaction>> {
self.eth_api.pool().new_pending_pool_transactions_listener()
}
}
impl<Provider, Pool, Events, Network> EthPubSubInner<Provider, Pool, Events, Network>
impl<Eth, Events> EthPubSubInner<Eth, Events>
where
Provider: BlockReader + EvmEnvProvider + 'static,
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
SignedTx: Encodable2718,
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
> + 'static,
Network: NetworkInfo + 'static,
Pool: 'static,
Primitives: NodePrimitives<
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
>,
{
/// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(&self) -> impl Stream<Item = Header> {

View File

@ -20,10 +20,10 @@ use reth_consensus_common::calc::{
};
use reth_evm::ConfigureEvmEnv;
use reth_primitives_traits::{BlockBody, BlockHeader};
use reth_provider::{BlockReader, ChainSpecProvider, EvmEnvProvider, StateProviderFactory};
use reth_provider::{BlockNumReader, BlockReader, ChainSpecProvider, HeaderProvider};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_api::TraceApiServer;
use reth_rpc_eth_api::{helpers::TraceExt, FromEthApiError};
use reth_rpc_eth_api::{helpers::TraceExt, FromEthApiError, RpcNodeCore};
use reth_rpc_eth_types::{error::EthApiError, utils::recover_raw_transaction};
use reth_tasks::pool::BlockingTaskGuard;
use reth_transaction_pool::{PoolPooledTx, PoolTransaction, TransactionPool};
@ -41,21 +41,16 @@ use tokio::sync::{AcquireError, OwnedSemaphorePermit};
/// `trace` API implementation.
///
/// This type provides the functionality for handling `trace` related requests.
pub struct TraceApi<Provider, Eth> {
inner: Arc<TraceApiInner<Provider, Eth>>,
pub struct TraceApi<Eth> {
inner: Arc<TraceApiInner<Eth>>,
}
// === impl TraceApi ===
impl<Provider, Eth> TraceApi<Provider, Eth> {
/// The provider that can interact with the chain.
pub fn provider(&self) -> &Provider {
&self.inner.provider
}
impl<Eth> TraceApi<Eth> {
/// Create a new instance of the [`TraceApi`]
pub fn new(provider: Provider, eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
let inner = Arc::new(TraceApiInner { provider, eth_api, blocking_task_guard });
pub fn new(eth_api: Eth, blocking_task_guard: BlockingTaskGuard) -> Self {
let inner = Arc::new(TraceApiInner { eth_api, blocking_task_guard });
Self { inner }
}
@ -72,15 +67,17 @@ impl<Provider, Eth> TraceApi<Provider, Eth> {
}
}
impl<Eth: RpcNodeCore> TraceApi<Eth> {
/// Access the underlying provider.
pub fn provider(&self) -> &Eth::Provider {
self.inner.eth_api.provider()
}
}
// === impl TraceApi ===
impl<Provider, Eth> TraceApi<Provider, Eth>
impl<Eth> TraceApi<Eth>
where
Provider: BlockReader<Block = <Eth::Provider as BlockReader>::Block>
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ 'static,
Eth: TraceExt + 'static,
{
/// Executes the given call and returns a number of possible traces for it.
@ -576,13 +573,8 @@ where
}
#[async_trait]
impl<Provider, Eth> TraceApiServer for TraceApi<Provider, Eth>
impl<Eth> TraceApiServer for TraceApi<Eth>
where
Provider: BlockReader<Block = <Eth::Provider as BlockReader>::Block>
+ StateProviderFactory
+ EvmEnvProvider
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ 'static,
Eth: TraceExt + 'static,
{
/// Executes the given call and returns a number of possible traces for it.
@ -704,20 +696,18 @@ where
}
}
impl<Provider, Eth> std::fmt::Debug for TraceApi<Provider, Eth> {
impl<Eth> std::fmt::Debug for TraceApi<Eth> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TraceApi").finish_non_exhaustive()
}
}
impl<Provider, Eth> Clone for TraceApi<Provider, Eth> {
impl<Eth> Clone for TraceApi<Eth> {
fn clone(&self) -> Self {
Self { inner: Arc::clone(&self.inner) }
}
}
struct TraceApiInner<Provider, Eth> {
/// The provider that can interact with the chain.
provider: Provider,
struct TraceApiInner<Eth> {
/// Access to commonly used code of the `eth` namespace
eth_api: Eth,
// restrict the number of concurrent calls to `trace_*`

View File

@ -1,4 +1,6 @@
use alloy_consensus::{BlobTransactionValidationError, EnvKzgSettings, Transaction, TxReceipt};
use alloy_consensus::{
BlobTransactionValidationError, BlockHeader, EnvKzgSettings, Transaction, TxReceipt,
};
use alloy_eips::{eip4844::kzg_to_versioned_hash, eip7685::RequestsOrHash};
use alloy_rpc_types_beacon::relay::{
BidTrace, BuilderBlockValidationRequest, BuilderBlockValidationRequestV2,
@ -16,10 +18,10 @@ use reth_errors::{BlockExecutionError, ConsensusError, ProviderError};
use reth_ethereum_consensus::GAS_LIMIT_BOUND_DIVISOR;
use reth_evm::execute::{BlockExecutorProvider, Executor};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{Block, GotExpected, NodePrimitives, SealedBlockWithSenders, SealedHeader};
use reth_primitives::{GotExpected, NodePrimitives, SealedBlockWithSenders, SealedHeader};
use reth_primitives_traits::{Block as _, BlockBody};
use reth_provider::{
AccountReader, BlockExecutionInput, BlockExecutionOutput, BlockReaderIdExt, HeaderProvider,
StateProviderFactory, WithdrawalsProvider,
BlockExecutionInput, BlockExecutionOutput, BlockReaderIdExt, StateProviderFactory,
};
use reth_revm::{cached::CachedReads, database::StateProviderDatabase};
use reth_rpc_api::BlockSubmissionValidationApiServer;
@ -32,7 +34,7 @@ use tokio::sync::{oneshot, RwLock};
/// The type that implements the `validation` rpc namespace trait
#[derive(Clone, Debug, derive_more::Deref)]
pub struct ValidationApi<Provider: ChainSpecProvider, E> {
pub struct ValidationApi<Provider: ChainSpecProvider, E: BlockExecutorProvider> {
#[deref]
inner: Arc<ValidationApiInner<Provider, E>>,
}
@ -40,11 +42,12 @@ pub struct ValidationApi<Provider: ChainSpecProvider, E> {
impl<Provider, E> ValidationApi<Provider, E>
where
Provider: ChainSpecProvider,
E: BlockExecutorProvider,
{
/// Create a new instance of the [`ValidationApi`]
pub fn new(
provider: Provider,
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<E::Primitives>>,
executor_provider: E,
config: ValidationApiConfig,
task_spawner: Box<dyn TaskSpawner>,
@ -91,21 +94,18 @@ where
Provider: BlockReaderIdExt<Header = reth_primitives::Header>
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ HeaderProvider
+ AccountReader
+ WithdrawalsProvider
+ 'static,
E: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = Provider::Header,
BlockBody = reth_primitives::BlockBody,
>,
>,
{
/// Validates the given block and a [`BidTrace`] against it.
pub async fn validate_message_against_block(
&self,
block: SealedBlockWithSenders,
block: SealedBlockWithSenders<<E::Primitives as NodePrimitives>::Block>,
message: BidTrace,
registered_gas_limit: u64,
) -> Result<(), ValidationApiError> {
@ -187,9 +187,9 @@ where
let state_root =
state_provider.state_root(state_provider.hashed_post_state(&output.state))?;
if state_root != block.state_root {
if state_root != block.header().state_root() {
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
GotExpected { got: state_root, expected: block.header().state_root() }.into(),
)
.into())
}
@ -262,7 +262,7 @@ where
/// to checking the latest block transaction.
fn ensure_payment(
&self,
block: &Block,
block: &<E::Primitives as NodePrimitives>::Block,
output: &BlockExecutionOutput<<E::Primitives as NodePrimitives>::Receipt>,
message: &BidTrace,
) -> Result<(), ValidationApiError> {
@ -279,7 +279,7 @@ where
(U256::ZERO, U256::ZERO)
};
if let Some(withdrawals) = &block.body.withdrawals {
if let Some(withdrawals) = block.body().withdrawals() {
for withdrawal in withdrawals {
if withdrawal.address == message.proposer_fee_recipient {
balance_before += withdrawal.amount_wei();
@ -294,7 +294,7 @@ where
let (receipt, tx) = output
.receipts
.last()
.zip(block.body.transactions.last())
.zip(block.body().transactions().last())
.ok_or(ValidationApiError::ProposerPayment)?;
if !receipt.status() {
@ -313,7 +313,7 @@ where
return Err(ValidationApiError::ProposerPayment)
}
if let Some(block_base_fee) = block.base_fee_per_gas {
if let Some(block_base_fee) = block.header().base_fee_per_gas() {
if tx.effective_tip_per_gas(block_base_fee).unwrap_or_default() != 0 {
return Err(ValidationApiError::ProposerPayment)
}
@ -412,15 +412,12 @@ where
Provider: BlockReaderIdExt<Header = reth_primitives::Header>
+ ChainSpecProvider<ChainSpec: EthereumHardforks>
+ StateProviderFactory
+ HeaderProvider
+ AccountReader
+ WithdrawalsProvider
+ Clone
+ 'static,
E: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = Provider::Header,
BlockBody = reth_primitives::BlockBody,
>,
>,
{
@ -476,11 +473,11 @@ where
}
#[derive(Debug)]
pub struct ValidationApiInner<Provider: ChainSpecProvider, E> {
pub struct ValidationApiInner<Provider: ChainSpecProvider, E: BlockExecutorProvider> {
/// The provider that can interact with the chain.
provider: Provider,
/// Consensus implementation.
consensus: Arc<dyn FullConsensus>,
consensus: Arc<dyn FullConsensus<E::Primitives>>,
/// Execution payload validator.
payload_validator: ExecutionPayloadValidator<Provider::ChainSpec>,
/// Block executor factory.