diff --git a/src/hl_node_compliance.rs b/src/hl_node_compliance.rs index 3cb91b7de..9b6ab707e 100644 --- a/src/hl_node_compliance.rs +++ b/src/hl_node_compliance.rs @@ -1,7 +1,6 @@ -use alloy_consensus::TxReceipt; +use alloy_consensus::{transaction::TransactionMeta, TxReceipt}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_json_rpc::RpcObject; -use alloy_network::ReceiptResponse; use alloy_primitives::{B256, U256}; use alloy_rpc_types::{ pubsub::{Params, SubscriptionKind}, @@ -14,17 +13,19 @@ use reth::{ api::FullNodeComponents, builder::rpc::RpcContext, rpc::result::internal_rpc_err, tasks::TaskSpawner, }; -use reth_primitives_traits::BlockBody as _; +use reth_primitives_traits::{BlockBody as _, SignedTransaction}; use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider}; use reth_rpc::{EthFilter, EthPubSub}; use reth_rpc_eth_api::{ + helpers::{EthBlocks, EthTransactions, LoadReceipt}, + transaction::ConvertReceiptInput, EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, - RpcHeader, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, + RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, }; use serde::Serialize; -use std::{marker::PhantomData, sync::Arc}; +use std::{borrow::Cow, marker::PhantomData, sync::Arc}; use tokio_stream::{Stream, StreamExt}; -use tracing::{trace, Instrument}; +use tracing::{info, trace, Instrument}; use crate::{ node::primitives::{HlPrimitives, TransactionSigned}, @@ -40,6 +41,9 @@ pub trait EthWrapper: RpcHeader, > + FullEthApiTypes + RpcNodeCoreExt> + + EthBlocks + + EthTransactions + + LoadReceipt + 'static { } @@ -52,8 +56,10 @@ impl< RpcReceipt, RpcHeader, > + FullEthApiTypes - + RpcNodeCoreExt - + RpcNodeCore> + + RpcNodeCoreExt> + + EthBlocks + + EthTransactions + + LoadReceipt + 'static, > EthWrapper for T { @@ -128,7 +134,7 @@ impl EthFilterApiServer> let logs = EthFilterApiServer::logs(&*self.filter, filter).await?; let provider = self.provider.clone(); - Ok(logs.into_iter().filter_map(|log| exclude_system_tx::(log, &provider)).collect()) + Ok(logs.into_iter().filter_map(|log| adjust_log::(log, &provider)).collect()) } } @@ -174,9 +180,7 @@ impl EthPubSubApiServer> }; let _ = pipe_from_stream( sink, - pubsub - .log_stream(filter) - .filter_map(|log| exclude_system_tx::(log, &provider)), + pubsub.log_stream(filter).filter_map(|log| adjust_log::(log, &provider)), ) .await; } else { @@ -188,7 +192,7 @@ impl EthPubSubApiServer> } } -fn exclude_system_tx(mut log: Log, provider: &Eth::Provider) -> Option { +fn adjust_log(mut log: Log, provider: &Eth::Provider) -> Option { let transaction_index = log.transaction_index?; let log_index = log.log_index?; @@ -306,6 +310,9 @@ pub trait EthBlockApi { &self, number: BlockNumberOrTag, ) -> RpcResult>; + + #[method(name = "getTransactionReceipt")] + async fn transaction_receipt(&self, hash: B256) -> RpcResult>; } macro_rules! engine_span { @@ -318,7 +325,7 @@ fn is_system_tx(tx: &TransactionSigned) -> bool { tx.is_system_transaction() } -fn exclude_system_tx_from_block( +fn adjust_block( recovered_block: &RpcBlock, eth_api: &Eth, ) -> RpcBlock { @@ -339,9 +346,86 @@ fn exclude_system_tx_from_block( new_block } -fn system_tx_count_for_block(eth_api: &Eth, number: BlockId) -> usize { +async fn adjust_block_receipts( + block_id: BlockId, + eth_api: &Eth, +) -> Result>)>, Eth::Error> { + // Modified from EthBlocks::block_receipt. See `NOTE` comment below. + let system_tx_count = system_tx_count_for_block(eth_api, block_id); + if let Some((block, receipts)) = EthBlocks::load_block_and_receipts(eth_api, block_id).await? { + let block_number = block.number; + let base_fee = block.base_fee_per_gas; + let block_hash = block.hash(); + let excess_blob_gas = block.excess_blob_gas; + let timestamp = block.timestamp; + let mut gas_used = 0; + let mut next_log_index = 0; + + let inputs = block + .transactions_recovered() + .zip(receipts.iter()) + .enumerate() + .filter_map(|(idx, (tx, receipt))| { + if receipt.cumulative_gas_used() == 0 { + // NOTE: modified to exclude system tx + return None; + } + let meta = TransactionMeta { + tx_hash: *tx.tx_hash(), + index: (idx - system_tx_count) as u64, + block_hash, + block_number, + base_fee, + excess_blob_gas, + timestamp, + }; + + let input = ConvertReceiptInput { + receipt: Cow::Borrowed(receipt), + tx, + gas_used: receipt.cumulative_gas_used() - gas_used, + next_log_index, + meta, + }; + + gas_used = receipt.cumulative_gas_used(); + next_log_index += receipt.logs().len(); + + Some(input) + }) + .collect::>(); + + return eth_api + .tx_resp_builder() + .convert_receipts(inputs) + .map(|receipts| Some((system_tx_count, receipts))); + } + + Ok(None) +} + +async fn adjust_transaction_receipt( + tx_hash: B256, + eth_api: &Eth, +) -> Result>, Eth::Error> { + match eth_api.load_transaction_and_receipt(tx_hash).await? { + Some((_, meta, _)) => { + // LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again doesn't hurt performance much + info!("block hash: {:?}", meta.block_hash); + let Some((system_tx_count, block_receipts)) = + adjust_block_receipts(meta.block_hash.into(), eth_api).await? + else { + unreachable!(); + }; + Ok(Some(block_receipts.into_iter().nth(meta.index as usize - system_tx_count).unwrap())) + } + None => Ok(None), + } +} + +fn system_tx_count_for_block(eth_api: &Eth, block_id: BlockId) -> usize { let provider = eth_api.provider(); - let block = provider.block_by_id(number).unwrap().unwrap(); + let block = provider.block_by_id(block_id).unwrap().unwrap(); let system_tx_count = block.body.transactions().iter().filter(|tx| is_system_tx(tx)).count(); system_tx_count } @@ -351,6 +435,7 @@ impl EthBlockApiServer, RpcReceipt< for HlNodeBlockFilterHttp where Eth: EthApiTypes + 'static, + ErrorObject<'static>: From, { /// Handler for: `eth_getBlockByHash` async fn block_by_hash( @@ -359,7 +444,7 @@ where full: bool, ) -> RpcResult>> { let res = self.eth_api.block_by_hash(hash, full).instrument(engine_span!()).await?; - Ok(res.map(|block| exclude_system_tx_from_block(&block, &*self.eth_api))) + Ok(res.map(|block| adjust_block(&block, &*self.eth_api))) } /// Handler for: `eth_getBlockByNumber` @@ -370,7 +455,7 @@ where ) -> RpcResult>> { trace!(target: "rpc::eth", ?number, ?full, "Serving eth_getBlockByNumber"); let res = self.eth_api.block_by_number(number, full).instrument(engine_span!()).await?; - Ok(res.map(|block| exclude_system_tx_from_block(&block, &*self.eth_api))) + Ok(res.map(|block| adjust_block(&block, &*self.eth_api))) } /// Handler for: `eth_getBlockTransactionCountByHash` @@ -379,8 +464,8 @@ where let res = self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?; Ok(res.map(|count| { - count - - U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()))) + count + - U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()))) })) } @@ -400,16 +485,24 @@ where })) } + async fn transaction_receipt( + &self, + hash: B256, + ) -> RpcResult>> { + trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionReceipt"); + let eth_api = &*self.eth_api; + Ok(adjust_transaction_receipt(hash, eth_api).instrument(engine_span!()).await?) + } + /// Handler for: `eth_getBlockReceipts` async fn block_receipts( &self, block_id: BlockId, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?block_id, "Serving eth_getBlockReceipts"); - let res = self.eth_api.block_receipts(block_id).instrument(engine_span!()).await?; - Ok(res.map(|receipts| { - receipts.into_iter().filter(|receipt| receipt.cumulative_gas_used() > 0).collect() - })) + let result = + adjust_block_receipts(block_id, &*self.eth_api).instrument(engine_span!()).await?; + Ok(result.map(|(_, receipts)| receipts)) } } @@ -420,6 +513,7 @@ where Node: FullNodeComponents, Node::Provider: BlockIdReader + BlockReader, EthApi: EthWrapper, + ErrorObject<'static>: From, { ctx.modules.replace_configured( HlNodeFilterHttp::new(