//! Overrides for RPC methods to post-filter system transactions and logs. //! //! System transactions are always at the beginning of the block, //! so we can use the transaction index to determine if the log is from a system transaction, //! and if it is, we can exclude it. //! //! For non-system transactions, we can just return the log as is, and the client will //! adjust the transaction index accordingly. use alloy_consensus::{transaction::TransactionMeta, BlockHeader, TxReceipt}; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_json_rpc::RpcObject; use alloy_primitives::{B256, U256}; use alloy_rpc_types::{ pubsub::{Params, SubscriptionKind}, BlockTransactions, Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind, TransactionInfo, }; use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink}; use jsonrpsee_core::{async_trait, RpcResult}; use jsonrpsee_types::{error::INTERNAL_ERROR_CODE, ErrorObject}; use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner}; use reth_primitives_traits::{BlockBody as _, SignedTransaction}; use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider}; use reth_rpc::{eth::pubsub::SubscriptionSerializeError, EthFilter, EthPubSub, RpcTypes}; use reth_rpc_eth_api::{ helpers::{EthBlocks, EthTransactions, LoadReceipt}, transaction::ConvertReceiptInput, EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, }; use serde::Serialize; use std::{borrow::Cow, marker::PhantomData, sync::Arc}; use tokio_stream::{Stream, StreamExt}; use tracing::{trace, Instrument}; use crate::{node::primitives::HlPrimitives, HlBlock}; pub trait EthWrapper: EthApiServer< RpcTxReq, RpcTransaction, RpcBlock, RpcReceipt, RpcHeader, > + FullEthApiTypes< Primitives = HlPrimitives, NetworkTypes: RpcTypes, > + RpcNodeCoreExt> + EthBlocks + EthTransactions + LoadReceipt + 'static { } impl EthWrapper for T where T: EthApiServer< RpcTxReq, RpcTransaction, RpcBlock, RpcReceipt, RpcHeader, > + FullEthApiTypes< Primitives = HlPrimitives, NetworkTypes: RpcTypes, > + RpcNodeCoreExt> + EthBlocks + EthTransactions + LoadReceipt + 'static { } #[rpc(server, namespace = "eth")] #[async_trait] pub trait EthSystemTransactionApi { #[method(name = "getEvmSystemTxsByBlockHash")] async fn get_evm_system_txs_by_block_hash(&self, hash: B256) -> RpcResult>>; #[method(name = "getEvmSystemTxsByBlockNumber")] async fn get_evm_system_txs_by_block_number( &self, block_id: Option, ) -> RpcResult>>; #[method(name = "getEvmSystemTxsReceiptsByBlockHash")] async fn get_evm_system_txs_receipts_by_block_hash( &self, hash: B256, ) -> RpcResult>>; #[method(name = "getEvmSystemTxsReceiptsByBlockNumber")] async fn get_evm_system_txs_receipts_by_block_number( &self, block_id: Option, ) -> RpcResult>>; } pub struct HlSystemTransactionExt { eth_api: Eth, _marker: PhantomData, } impl HlSystemTransactionExt { pub fn new(eth_api: Eth) -> Self { Self { eth_api, _marker: PhantomData } } async fn get_system_txs_by_block_id( &self, block_id: BlockId, ) -> RpcResult>>> where jsonrpsee_types::ErrorObject<'static>: From<::Error>, { if let Some(block) = self.eth_api.recovered_block(block_id).await? { let block_hash = block.hash(); let block_number = block.number(); let base_fee_per_gas = block.base_fee_per_gas(); let system_txs = block .transactions_with_sender() .enumerate() .filter_map(|(index, (signer, tx))| { if tx.is_system_transaction() { let tx_info = TransactionInfo { hash: Some(*tx.tx_hash()), block_hash: Some(block_hash), block_number: Some(block_number), base_fee: base_fee_per_gas, index: Some(index as u64), }; self.eth_api .tx_resp_builder() .fill(tx.clone().with_signer(*signer), tx_info) .ok() } else { None } }) .collect(); Ok(Some(system_txs)) } else { Ok(None) } } async fn get_system_txs_receipts_by_block_id( &self, block_id: BlockId, ) -> RpcResult>>> where jsonrpsee_types::ErrorObject<'static>: From<::Error>, { if let Some((block, receipts)) = EthBlocks::load_block_and_receipts(&self.eth_api, block_id).await? { let block_number = block.number; let base_fee = block.base_fee_per_gas; let block_hash = block.hash(); let excess_blob_gas = block.excess_blob_gas; let timestamp = block.timestamp; let mut gas_used = 0; let mut next_log_index = 0; let mut inputs = Vec::new(); for (idx, (tx, receipt)) in block.transactions_recovered().zip(receipts.iter()).enumerate() { if receipt.cumulative_gas_used() != 0 { break; } let meta = TransactionMeta { tx_hash: *tx.tx_hash(), index: idx as u64, block_hash, block_number, base_fee, excess_blob_gas, timestamp, }; let input = ConvertReceiptInput { receipt: Cow::Borrowed(receipt), tx, gas_used: receipt.cumulative_gas_used() - gas_used, next_log_index, meta, }; gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); inputs.push(input); } let receipts = self.eth_api.tx_resp_builder().convert_receipts(inputs)?; Ok(Some(receipts)) } else { Ok(None) } } } #[async_trait] impl EthSystemTransactionApiServer, RpcReceipt> for HlSystemTransactionExt where jsonrpsee_types::ErrorObject<'static>: From<::Error>, { /// Returns the system transactions for a given block hash. /// Semi-compliance with the `eth_getSystemTxsByBlockHash` RPC method introduced by hl-node. /// https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/hyperevm/json-rpc /// /// NOTE: Method name differs from hl-node because we retrieve transaction data from EVM /// (signature recovery for 'from' address, EVM hash calculation) rather than HyperCore. async fn get_evm_system_txs_by_block_hash( &self, hash: B256, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?hash, "Serving eth_getEvmSystemTxsByBlockHash"); match self.get_system_txs_by_block_id(BlockId::Hash(hash.into())).await { Ok(txs) => Ok(txs), // hl-node returns none if the block is not found Err(_) => Ok(None), } } /// Returns the system transactions for a given block number, or the latest block if no block /// number is provided. Semi-compliance with the `eth_getSystemTxsByBlockNumber` RPC method /// introduced by hl-node. https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/hyperevm/json-rpc /// /// NOTE: Method name differs from hl-node because we retrieve transaction data from EVM /// (signature recovery for 'from' address, EVM hash calculation) rather than HyperCore. async fn get_evm_system_txs_by_block_number( &self, id: Option, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?id, "Serving eth_getEvmSystemTxsByBlockNumber"); match self.get_system_txs_by_block_id(id.unwrap_or_default()).await? { Some(txs) => Ok(Some(txs)), None => { // hl-node returns an error if the block is not found Err(ErrorObject::owned( INTERNAL_ERROR_CODE, format!("invalid block height: {id:?}"), Some(()), )) } } } /// Returns the receipts for the system transactions for a given block hash. async fn get_evm_system_txs_receipts_by_block_hash( &self, hash: B256, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?hash, "Serving eth_getEvmSystemTxsReceiptsByBlockHash"); match self.get_system_txs_receipts_by_block_id(BlockId::Hash(hash.into())).await { Ok(receipts) => Ok(receipts), // hl-node returns none if the block is not found Err(_) => Ok(None), } } /// Returns the receipts for the system transactions for a given block number, or the latest /// block if no block async fn get_evm_system_txs_receipts_by_block_number( &self, block_id: Option, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?block_id, "Serving eth_getEvmSystemTxsReceiptsByBlockNumber"); match self.get_system_txs_receipts_by_block_id(block_id.unwrap_or_default()).await? { Some(receipts) => Ok(Some(receipts)), None => Err(ErrorObject::owned( INTERNAL_ERROR_CODE, format!("invalid block height: {block_id:?}"), Some(()), )), } } } pub struct HlNodeFilterHttp { filter: Arc>, provider: Arc, } impl HlNodeFilterHttp { pub fn new(filter: Arc>, provider: Arc) -> Self { Self { filter, provider } } } #[async_trait] impl EthFilterApiServer> for HlNodeFilterHttp { async fn new_filter(&self, filter: Filter) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_newFilter"); self.filter.new_filter(filter).await } async fn new_block_filter(&self) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_newBlockFilter"); self.filter.new_block_filter().await } async fn new_pending_transaction_filter( &self, kind: Option, ) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter"); self.filter.new_pending_transaction_filter(kind).await } async fn filter_changes( &self, id: FilterId, ) -> RpcResult>> { trace!(target: "rpc::eth", "Serving eth_getFilterChanges"); self.filter.filter_changes(id).await.map_err(ErrorObject::from) } async fn filter_logs(&self, id: FilterId) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getFilterLogs"); self.filter.filter_logs(id).await.map_err(ErrorObject::from) } async fn uninstall_filter(&self, id: FilterId) -> RpcResult { trace!(target: "rpc::eth", "Serving eth_uninstallFilter"); self.filter.uninstall_filter(id).await } async fn logs(&self, filter: Filter) -> RpcResult> { trace!(target: "rpc::eth", "Serving eth_getLogs"); let logs = EthFilterApiServer::logs(&*self.filter, filter).await?; Ok(logs.into_iter().filter_map(|log| adjust_log::(log, &self.provider)).collect()) } } pub struct HlNodeFilterWs { pubsub: Arc>, provider: Arc, subscription_task_spawner: Box, } impl HlNodeFilterWs { pub fn new( pubsub: Arc>, provider: Arc, subscription_task_spawner: Box, ) -> Self { Self { pubsub, provider, subscription_task_spawner } } } #[async_trait] impl EthPubSubApiServer> for HlNodeFilterWs where jsonrpsee_types::error::ErrorObject<'static>: From<::Error>, { async fn subscribe( &self, pending: PendingSubscriptionSink, kind: SubscriptionKind, params: Option, ) -> jsonrpsee::core::SubscriptionResult { let sink = pending.accept().await?; let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone()); self.subscription_task_spawner.spawn(Box::pin(async move { if kind == SubscriptionKind::Logs { let filter = match params { Some(Params::Logs(f)) => *f, Some(Params::Bool(_)) => return, _ => Default::default(), }; let _ = pipe_from_stream( sink, pubsub.log_stream(filter).filter_map(|log| adjust_log::(log, &provider)), ) .await; } else { let _ = pubsub.handle_accepted(sink, kind, params).await; } })); Ok(()) } } fn adjust_log(mut log: Log, provider: &Eth::Provider) -> Option { let (tx_idx, log_idx) = (log.transaction_index?, log.log_index?); let receipts = provider.receipts_by_block(log.block_number?.into()).unwrap()?; let (mut sys_tx_count, mut sys_log_count) = (0u64, 0u64); for receipt in receipts { if receipt.cumulative_gas_used() == 0 { sys_tx_count += 1; sys_log_count += receipt.logs().len() as u64; } } if sys_tx_count > tx_idx { return None; } log.transaction_index = Some(tx_idx - sys_tx_count); log.log_index = Some(log_idx - sys_log_count); Some(log) } async fn pipe_from_stream + Unpin>( sink: SubscriptionSink, mut stream: St, ) -> Result<(), ErrorObject<'static>> { loop { tokio::select! { _ = sink.closed() => break Ok(()), maybe_item = stream.next() => { let Some(item) = maybe_item else { break Ok(()) }; let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) .map_err(SubscriptionSerializeError::from)?; if sink.send(msg).await.is_err() { break Ok(()); } } } } } pub struct HlNodeBlockFilterHttp { eth_api: Arc, _marker: PhantomData, } impl HlNodeBlockFilterHttp { pub fn new(eth_api: Arc) -> Self { Self { eth_api, _marker: PhantomData } } } #[rpc(server, namespace = "eth")] pub trait EthBlockApi { /// Returns information about a block by hash. #[method(name = "getBlockByHash")] async fn block_by_hash(&self, hash: B256, full: bool) -> RpcResult>; /// Returns information about a block by number. #[method(name = "getBlockByNumber")] async fn block_by_number(&self, number: BlockNumberOrTag, full: bool) -> RpcResult>; /// Returns all transaction receipts for a given block. #[method(name = "getBlockReceipts")] async fn block_receipts(&self, block_id: BlockId) -> RpcResult>>; #[method(name = "getBlockTransactionCountByHash")] async fn block_transaction_count_by_hash(&self, hash: B256) -> RpcResult>; #[method(name = "getBlockTransactionCountByNumber")] async fn block_transaction_count_by_number( &self, number: BlockNumberOrTag, ) -> RpcResult>; #[method(name = "getTransactionReceipt")] async fn transaction_receipt(&self, hash: B256) -> RpcResult>; } macro_rules! engine_span { () => { tracing::trace_span!(target: "rpc", "engine") }; } fn adjust_block( recovered_block: &RpcBlock, eth_api: &Eth, ) -> RpcBlock { let system_tx_count = system_tx_count_for_block(eth_api, recovered_block.number().into()); let mut new_block = recovered_block.clone(); new_block.transactions = match new_block.transactions { BlockTransactions::Full(mut transactions) => { transactions.drain(..system_tx_count); transactions.iter_mut().for_each(|tx| { if let Some(idx) = &mut tx.transaction_index { *idx -= system_tx_count as u64; } }); BlockTransactions::Full(transactions) } BlockTransactions::Hashes(mut hashes) => { hashes.drain(..system_tx_count); BlockTransactions::Hashes(hashes) } BlockTransactions::Uncle => BlockTransactions::Uncle, }; new_block } async fn adjust_block_receipts( block_id: BlockId, eth_api: &Eth, ) -> Result>)>, Eth::Error> { // Modified from EthBlocks::block_receipt. See `NOTE` comment below. let system_tx_count = system_tx_count_for_block(eth_api, block_id); if let Some((block, receipts)) = EthBlocks::load_block_and_receipts(eth_api, block_id).await? { let block_number = block.number; let base_fee = block.base_fee_per_gas; let block_hash = block.hash(); let excess_blob_gas = block.excess_blob_gas; let timestamp = block.timestamp; let mut gas_used = 0; let mut next_log_index = 0; let inputs = block .transactions_recovered() .zip(receipts.iter()) .enumerate() .filter_map(|(idx, (tx, receipt))| { if receipt.cumulative_gas_used() == 0 { // NOTE: modified to exclude system tx return None; } let meta = TransactionMeta { tx_hash: *tx.tx_hash(), index: (idx - system_tx_count) as u64, block_hash, block_number, base_fee, excess_blob_gas, timestamp, }; let input = ConvertReceiptInput { receipt: Cow::Borrowed(receipt), tx, gas_used: receipt.cumulative_gas_used() - gas_used, next_log_index, meta, }; gas_used = receipt.cumulative_gas_used(); next_log_index += receipt.logs().len(); Some(input) }) .collect::>(); return eth_api .tx_resp_builder() .convert_receipts(inputs) .map(|receipts| Some((system_tx_count, receipts))); } Ok(None) } async fn adjust_transaction_receipt( tx_hash: B256, eth_api: &Eth, ) -> Result>, Eth::Error> { match eth_api.load_transaction_and_receipt(tx_hash).await? { Some((_, meta, _)) => { // LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again // doesn't hurt performance much let Some((system_tx_count, block_receipts)) = adjust_block_receipts(meta.block_hash.into(), eth_api).await? else { unreachable!(); }; Ok(Some(block_receipts.into_iter().nth(meta.index as usize - system_tx_count).unwrap())) } None => Ok(None), } } // This function assumes that `block_id` is already validated by the caller. fn system_tx_count_for_block(eth_api: &Eth, block_id: BlockId) -> usize { let provider = eth_api.provider(); let block = provider.block_by_id(block_id).unwrap().unwrap(); let system_tx_count = block.body.transactions().iter().filter(|tx| tx.is_system_transaction()).count(); system_tx_count } #[async_trait] impl EthBlockApiServer, RpcReceipt> for HlNodeBlockFilterHttp where Eth: EthApiTypes + 'static, ErrorObject<'static>: From, { /// Handler for: `eth_getBlockByHash` async fn block_by_hash( &self, hash: B256, full: bool, ) -> RpcResult>> { let res = self.eth_api.block_by_hash(hash, full).instrument(engine_span!()).await?; Ok(res.map(|block| adjust_block(&block, &*self.eth_api))) } /// Handler for: `eth_getBlockByNumber` async fn block_by_number( &self, number: BlockNumberOrTag, full: bool, ) -> RpcResult>> { trace!(target: "rpc::eth", ?number, ?full, "Serving eth_getBlockByNumber"); let res = self.eth_api.block_by_number(number, full).instrument(engine_span!()).await?; Ok(res.map(|block| adjust_block(&block, &*self.eth_api))) } /// Handler for: `eth_getBlockTransactionCountByHash` async fn block_transaction_count_by_hash(&self, hash: B256) -> RpcResult> { trace!(target: "rpc::eth", ?hash, "Serving eth_getBlockTransactionCountByHash"); let res = self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?; Ok(res.map(|count| { let sys_tx_count = system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into())); count - U256::from(sys_tx_count) })) } /// Handler for: `eth_getBlockTransactionCountByNumber` async fn block_transaction_count_by_number( &self, number: BlockNumberOrTag, ) -> RpcResult> { trace!(target: "rpc::eth", ?number, "Serving eth_getBlockTransactionCountByNumber"); let res = self .eth_api .block_transaction_count_by_number(number) .instrument(engine_span!()) .await?; Ok(res.map(|count| { count - U256::from(system_tx_count_for_block(&*self.eth_api, number.into())) })) } async fn transaction_receipt( &self, hash: B256, ) -> RpcResult>> { trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionReceipt"); let eth_api = &*self.eth_api; Ok(adjust_transaction_receipt(hash, eth_api).instrument(engine_span!()).await?) } /// Handler for: `eth_getBlockReceipts` async fn block_receipts( &self, block_id: BlockId, ) -> RpcResult>>> { trace!(target: "rpc::eth", ?block_id, "Serving eth_getBlockReceipts"); let result = adjust_block_receipts(block_id, &*self.eth_api).instrument(engine_span!()).await?; Ok(result.map(|(_, receipts)| receipts)) } } pub fn install_hl_node_compliance( ctx: &mut RpcContext, ) -> Result<(), eyre::Error> where Node: FullNodeComponents, Node::Provider: BlockIdReader + BlockReader, EthApi: EthWrapper, ErrorObject<'static>: From, { ctx.modules.replace_configured( HlNodeFilterHttp::new( Arc::new(ctx.registry.eth_handlers().filter.clone()), Arc::new(ctx.registry.eth_api().provider().clone()), ) .into_rpc(), )?; ctx.modules.replace_configured( HlNodeFilterWs::new( Arc::new(ctx.registry.eth_handlers().pubsub.clone()), Arc::new(ctx.registry.eth_api().provider().clone()), Box::new(ctx.node().task_executor().clone()), ) .into_rpc(), )?; ctx.modules.replace_configured( HlNodeBlockFilterHttp::new(Arc::new(ctx.registry.eth_api().clone())).into_rpc(), )?; ctx.modules .merge_configured(HlSystemTransactionExt::new(ctx.registry.eth_api().clone()).into_rpc())?; Ok(()) }