feat: relax bounds for EthPubSub (#13203)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Arsenii Kulikov
2024-12-07 09:30:56 +04:00
committed by GitHub
parent 4d2c5767ec
commit 6b35b05993
8 changed files with 74 additions and 139 deletions

View File

@ -208,7 +208,7 @@ use reth_primitives::NodePrimitives;
use reth_provider::{
AccountReader, BlockReader, CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader,
EvmEnvProvider, FullRpcProvider, ProviderBlock, ProviderHeader, ProviderReceipt,
ReceiptProvider, StateProviderFactory,
StateProviderFactory,
};
use reth_rpc::{
AdminApi, DebugApi, EngineEthApi, EthBundle, MinerApi, NetApi, OtterscanApi, RPCApi, RethApi,
@ -286,18 +286,19 @@ where
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<Header = alloy_consensus::Header>,
EvmConfig: ConfigureEvm<
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
Transaction = <BlockExecutor::Primitives as NodePrimitives>::SignedTx,
>,
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = <BlockExecutor::Primitives as NodePrimitives>::Block,
Receipt = <BlockExecutor::Primitives as NodePrimitives>::Receipt,
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
>,
>,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
>,
@ -647,16 +648,21 @@ impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus
impl<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
RpcModuleBuilder<Provider, Pool, Network, Tasks, Events, EvmConfig, BlockExecutor, Consensus>
where
Provider: FullRpcProvider + AccountReader + ChangeSetReader,
Provider: FullRpcProvider<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
> + AccountReader
+ ChangeSetReader,
Pool: TransactionPool + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
Tasks: TaskSpawner + Clone + 'static,
Events: CanonStateSubscriptions<Primitives = BlockExecutor::Primitives> + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
EvmConfig: ConfigureEvm<
Header = <BlockExecutor::Primitives as NodePrimitives>::BlockHeader,
Transaction = <BlockExecutor::Primitives as NodePrimitives>::SignedTx,
>,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
>,
@ -685,15 +691,11 @@ where
EngineApi: EngineApiServer<EngineT>,
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
Header = <Events::Primitives as NodePrimitives>::BlockHeader,
>,
>,
Provider: BlockReader<
Block = <Events::Primitives as NodePrimitives>::Block,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
>,
{
let Self {
provider,
@ -741,13 +743,16 @@ where
/// use reth_evm::ConfigureEvm;
/// use reth_evm_ethereum::execute::EthExecutorProvider;
/// use reth_network_api::noop::NoopNetwork;
/// use reth_primitives::TransactionSigned;
/// use reth_provider::test_utils::{NoopProvider, TestCanonStateSubscriptions};
/// use reth_rpc::EthApi;
/// use reth_rpc_builder::RpcModuleBuilder;
/// use reth_tasks::TokioTaskExecutor;
/// use reth_transaction_pool::noop::NoopTransactionPool;
///
/// fn init<Evm: ConfigureEvm<Header = Header> + 'static>(evm: Evm) {
/// fn init<Evm: ConfigureEvm<Header = Header, Transaction = TransactionSigned> + 'static>(
/// evm: Evm,
/// ) {
/// let mut registry = RpcModuleBuilder::default()
/// .with_provider(NoopProvider::default())
/// .with_pool(NoopTransactionPool::default())
@ -769,11 +774,6 @@ where
) -> RpcRegistryInner<Provider, Pool, Network, Tasks, Events, EthApi, BlockExecutor, Consensus>
where
EthApi: EthApiTypes + 'static,
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
>,
{
let Self {
provider,
@ -809,15 +809,11 @@ where
where
EthApi: FullEthApiServer<
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
Header = reth_primitives::Header,
Receipt = <Events::Primitives as NodePrimitives>::Receipt,
Block = <Events::Primitives as NodePrimitives>::Block,
Header = <Events::Primitives as NodePrimitives>::BlockHeader,
>,
>,
Provider: BlockReader<
Block = <EthApi::Provider as BlockReader>::Block,
Receipt = <EthApi::Provider as ReceiptProvider>::Receipt,
>,
Pool: TransactionPool<Transaction = <EthApi::Pool as TransactionPool>::Transaction>,
{
let mut modules = TransportRpcModules::default();
@ -1155,8 +1151,7 @@ where
RpcReceipt<EthApi::NetworkTypes>,
RpcHeader<EthApi::NetworkTypes>,
> + EthApiTypes,
BlockExecutor:
BlockExecutorProvider<Primitives: NodePrimitives<Block = reth_primitives::Block>>,
BlockExecutor: BlockExecutorProvider,
{
/// Register Eth Namespace
///
@ -1190,17 +1185,8 @@ where
/// If called outside of the tokio runtime. See also [`Self::eth_api`]
pub fn register_debug(&mut self) -> &mut Self
where
EthApi: EthApiSpec
+ EthTransactions<
Provider: BlockReader<
Block = reth_primitives::Block,
Receipt = reth_primitives::Receipt,
>,
> + TraceExt,
Provider: BlockReader<
Block = <EthApi::Provider as BlockReader>::Block,
Receipt = <EthApi::Provider as ReceiptProvider>::Receipt,
>,
EthApi: EthApiSpec + EthTransactions + TraceExt,
BlockExecutor::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
{
let debug_api = self.debug_api();
self.modules.insert(RethRpcModule::Debug, debug_api.into_rpc().into());
@ -1303,8 +1289,7 @@ where
pub fn debug_api(&self) -> DebugApi<EthApi, BlockExecutor>
where
EthApi: EthApiSpec + EthTransactions + TraceExt,
BlockExecutor:
BlockExecutorProvider<Primitives: NodePrimitives<Block = reth_primitives::Block>>,
BlockExecutor::Primitives: NodePrimitives<Block = ProviderBlock<EthApi::Provider>>,
{
DebugApi::new(
self.eth_api().clone(),
@ -1363,10 +1348,8 @@ where
>,
BlockExecutor: BlockExecutorProvider<
Primitives: NodePrimitives<
Block = reth_primitives::Block,
BlockHeader = reth_primitives::Header,
BlockBody = reth_primitives::BlockBody,
Receipt = reth_primitives::Receipt,
>,
>,
Consensus: reth_consensus::FullConsensus<BlockExecutor::Primitives> + Clone + 'static,

View File

@ -8,27 +8,28 @@ use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{FilteredParams, Log};
use reth_chainspec::ChainInfo;
use reth_errors::ProviderError;
use reth_primitives::{Receipt, SealedBlockWithSenders};
use reth_primitives::SealedBlockWithSenders;
use reth_primitives_traits::{BlockBody, SignedTransaction};
use reth_storage_api::{BlockReader, ProviderBlock};
use std::sync::Arc;
/// Returns all matching of a block's receipts when the transaction hashes are known.
pub fn matching_block_logs_with_tx_hashes<'a, I>(
pub fn matching_block_logs_with_tx_hashes<'a, I, R>(
filter: &FilteredParams,
block_num_hash: BlockNumHash,
tx_hashes_and_receipts: I,
removed: bool,
) -> Vec<Log>
where
I: IntoIterator<Item = (TxHash, &'a Receipt)>,
I: IntoIterator<Item = (TxHash, &'a R)>,
R: TxReceipt<Log = alloy_primitives::Log> + 'a,
{
let mut all_logs = Vec::new();
// Tracks the index of a log in the entire block.
let mut log_index: u64 = 0;
// Iterate over transaction hashes and receipts and append matching logs.
for (receipt_idx, (tx_hash, receipt)) in tx_hashes_and_receipts.into_iter().enumerate() {
for log in &receipt.logs {
for log in receipt.logs() {
if log_matches_filter(block_num_hash, log, filter) {
let log = Log {
inner: log.clone(),

View File

@ -2,13 +2,9 @@
use std::sync::Arc;
use alloy_eips::eip2718::Encodable2718;
use alloy_primitives::TxHash;
use alloy_rpc_types_eth::{
pubsub::{
Params, PubSubSyncStatus, SubscriptionKind, SubscriptionResult as EthSubscriptionResult,
SyncStatusMetadata,
},
pubsub::{Params, PubSubSyncStatus, SubscriptionKind, SyncStatusMetadata},
FilteredParams, Header, Log,
};
use futures::StreamExt;
@ -68,13 +64,7 @@ impl<Eth, Events> EthPubSub<Eth, Events> {
#[async_trait::async_trait]
impl<Eth, Events> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for EthPubSub<Eth, Events>
where
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
> + Clone
+ 'static,
Events: CanonStateSubscriptions + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>
+ 'static,
@ -104,23 +94,13 @@ async fn handle_accepted<Eth, Events>(
params: Option<Params>,
) -> Result<(), ErrorObject<'static>>
where
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
SignedTx: Encodable2718,
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
> + Clone
+ 'static,
Events: CanonStateSubscriptions + 'static,
Eth: RpcNodeCore<Provider: BlockNumReader, Pool: TransactionPool, Network: NetworkInfo>
+ EthApiTypes<TransactionCompat: TransactionCompat<PoolConsensusTx<Eth::Pool>>>,
{
match kind {
SubscriptionKind::NewHeads => {
let stream = pubsub
.new_headers_stream()
.map(|header| EthSubscriptionResult::<()>::Header(Box::new(header.into())));
pipe_from_stream(accepted_sink, stream).await
pipe_from_stream(accepted_sink, pubsub.new_headers_stream()).await
}
SubscriptionKind::Logs => {
// if no params are provided, used default filter params
@ -131,10 +111,7 @@ where
}
_ => FilteredParams::default(),
};
let stream = pubsub
.log_stream(filter)
.map(|log| EthSubscriptionResult::<()>::Log(Box::new(log)));
pipe_from_stream(accepted_sink, stream).await
pipe_from_stream(accepted_sink, pubsub.log_stream(filter)).await
}
SubscriptionKind::NewPendingTransactions => {
if let Some(params) = params {
@ -146,9 +123,7 @@ where
tx.transaction.to_consensus(),
pubsub.eth_api.tx_resp_builder(),
) {
Ok(tx) => {
Some(EthSubscriptionResult::FullTransaction(Box::new(tx)))
}
Ok(tx) => Some(tx),
Err(err) => {
error!(target = "rpc",
%err,
@ -172,10 +147,7 @@ where
}
}
let stream = pubsub
.pending_transaction_hashes_stream()
.map(EthSubscriptionResult::<()>::TransactionHash);
pipe_from_stream(accepted_sink, stream).await
pipe_from_stream(accepted_sink, pubsub.pending_transaction_hashes_stream()).await
}
SubscriptionKind::Syncing => {
// get new block subscription
@ -285,7 +257,7 @@ where
Eth: RpcNodeCore<Provider: BlockNumReader>,
{
/// Returns the current sync status for the `syncing` subscription
fn sync_status(&self, is_syncing: bool) -> EthSubscriptionResult {
fn sync_status(&self, is_syncing: bool) -> PubSubSyncStatus {
if is_syncing {
let current_block = self
.eth_api
@ -293,14 +265,14 @@ where
.chain_info()
.map(|info| info.best_number)
.unwrap_or_default();
EthSubscriptionResult::SyncState(PubSubSyncStatus::Detailed(SyncStatusMetadata {
PubSubSyncStatus::Detailed(SyncStatusMetadata {
syncing: true,
starting_block: 0,
current_block,
highest_block: Some(current_block),
}))
})
} else {
EthSubscriptionResult::SyncState(PubSubSyncStatus::Simple(false))
PubSubSyncStatus::Simple(false)
}
}
}
@ -324,15 +296,12 @@ where
impl<Eth, Events> EthPubSubInner<Eth, Events>
where
Events: CanonStateSubscriptions<
Primitives: NodePrimitives<
BlockHeader = reth_primitives::Header,
Receipt = reth_primitives::Receipt,
>,
>,
Events: CanonStateSubscriptions,
{
/// Returns a stream that yields all new RPC blocks.
fn new_headers_stream(&self) -> impl Stream<Item = Header> {
fn new_headers_stream(
&self,
) -> impl Stream<Item = Header<<Events::Primitives as NodePrimitives>::BlockHeader>> {
self.chain_events.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain.committed().headers().collect::<Vec<_>>();
futures::stream::iter(