fix: Fix block and transaction receipts' gas and transaction index

This commit is contained in:
sprites0
2025-08-22 10:08:48 -04:00
parent 67cc8b8360
commit 095ad0f65d

View File

@ -1,7 +1,6 @@
use alloy_consensus::TxReceipt; use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_json_rpc::RpcObject; use alloy_json_rpc::RpcObject;
use alloy_network::ReceiptResponse;
use alloy_primitives::{B256, U256}; use alloy_primitives::{B256, U256};
use alloy_rpc_types::{ use alloy_rpc_types::{
pubsub::{Params, SubscriptionKind}, pubsub::{Params, SubscriptionKind},
@ -14,17 +13,19 @@ use reth::{
api::FullNodeComponents, builder::rpc::RpcContext, rpc::result::internal_rpc_err, api::FullNodeComponents, builder::rpc::RpcContext, rpc::result::internal_rpc_err,
tasks::TaskSpawner, tasks::TaskSpawner,
}; };
use reth_primitives_traits::BlockBody as _; use reth_primitives_traits::{BlockBody as _, SignedTransaction};
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider}; use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
use reth_rpc::{EthFilter, EthPubSub}; use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::{ use reth_rpc_eth_api::{
helpers::{EthBlocks, EthTransactions, LoadReceipt},
transaction::ConvertReceiptInput,
EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
RpcHeader, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
}; };
use serde::Serialize; use serde::Serialize;
use std::{marker::PhantomData, sync::Arc}; use std::{borrow::Cow, marker::PhantomData, sync::Arc};
use tokio_stream::{Stream, StreamExt}; use tokio_stream::{Stream, StreamExt};
use tracing::{trace, Instrument}; use tracing::{info, trace, Instrument};
use crate::{ use crate::{
node::primitives::{HlPrimitives, TransactionSigned}, node::primitives::{HlPrimitives, TransactionSigned},
@ -40,6 +41,9 @@ pub trait EthWrapper:
RpcHeader<Self::NetworkTypes>, RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<Primitives = HlPrimitives> > + FullEthApiTypes<Primitives = HlPrimitives>
+ RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static + 'static
{ {
} }
@ -52,8 +56,10 @@ impl<
RpcReceipt<Self::NetworkTypes>, RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>, RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<Primitives = HlPrimitives> > + FullEthApiTypes<Primitives = HlPrimitives>
+ RpcNodeCoreExt + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ RpcNodeCore<Provider: BlockReader<Block = HlBlock>> + EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static, + 'static,
> EthWrapper for T > EthWrapper for T
{ {
@ -128,7 +134,7 @@ impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
let logs = EthFilterApiServer::logs(&*self.filter, filter).await?; let logs = EthFilterApiServer::logs(&*self.filter, filter).await?;
let provider = self.provider.clone(); let provider = self.provider.clone();
Ok(logs.into_iter().filter_map(|log| exclude_system_tx::<Eth>(log, &provider)).collect()) Ok(logs.into_iter().filter_map(|log| adjust_log::<Eth>(log, &provider)).collect())
} }
} }
@ -174,9 +180,7 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
}; };
let _ = pipe_from_stream( let _ = pipe_from_stream(
sink, sink,
pubsub pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
.log_stream(filter)
.filter_map(|log| exclude_system_tx::<Eth>(log, &provider)),
) )
.await; .await;
} else { } else {
@ -188,7 +192,7 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
} }
} }
fn exclude_system_tx<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option<Log> { fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option<Log> {
let transaction_index = log.transaction_index?; let transaction_index = log.transaction_index?;
let log_index = log.log_index?; let log_index = log.log_index?;
@ -306,6 +310,9 @@ pub trait EthBlockApi<B: RpcObject, R: RpcObject> {
&self, &self,
number: BlockNumberOrTag, number: BlockNumberOrTag,
) -> RpcResult<Option<U256>>; ) -> RpcResult<Option<U256>>;
#[method(name = "getTransactionReceipt")]
async fn transaction_receipt(&self, hash: B256) -> RpcResult<Option<R>>;
} }
macro_rules! engine_span { macro_rules! engine_span {
@ -318,7 +325,7 @@ fn is_system_tx(tx: &TransactionSigned) -> bool {
tx.is_system_transaction() tx.is_system_transaction()
} }
fn exclude_system_tx_from_block<Eth: EthWrapper>( fn adjust_block<Eth: EthWrapper>(
recovered_block: &RpcBlock<Eth::NetworkTypes>, recovered_block: &RpcBlock<Eth::NetworkTypes>,
eth_api: &Eth, eth_api: &Eth,
) -> RpcBlock<Eth::NetworkTypes> { ) -> RpcBlock<Eth::NetworkTypes> {
@ -339,9 +346,86 @@ fn exclude_system_tx_from_block<Eth: EthWrapper>(
new_block new_block
} }
fn system_tx_count_for_block<Eth: EthWrapper>(eth_api: &Eth, number: BlockId) -> usize { async fn adjust_block_receipts<Eth: EthWrapper>(
block_id: BlockId,
eth_api: &Eth,
) -> Result<Option<(usize, Vec<RpcReceipt<Eth::NetworkTypes>>)>, Eth::Error> {
// Modified from EthBlocks::block_receipt. See `NOTE` comment below.
let system_tx_count = system_tx_count_for_block(eth_api, block_id);
if let Some((block, receipts)) = EthBlocks::load_block_and_receipts(eth_api, block_id).await? {
let block_number = block.number;
let base_fee = block.base_fee_per_gas;
let block_hash = block.hash();
let excess_blob_gas = block.excess_blob_gas;
let timestamp = block.timestamp;
let mut gas_used = 0;
let mut next_log_index = 0;
let inputs = block
.transactions_recovered()
.zip(receipts.iter())
.enumerate()
.filter_map(|(idx, (tx, receipt))| {
if receipt.cumulative_gas_used() == 0 {
// NOTE: modified to exclude system tx
return None;
}
let meta = TransactionMeta {
tx_hash: *tx.tx_hash(),
index: (idx - system_tx_count) as u64,
block_hash,
block_number,
base_fee,
excess_blob_gas,
timestamp,
};
let input = ConvertReceiptInput {
receipt: Cow::Borrowed(receipt),
tx,
gas_used: receipt.cumulative_gas_used() - gas_used,
next_log_index,
meta,
};
gas_used = receipt.cumulative_gas_used();
next_log_index += receipt.logs().len();
Some(input)
})
.collect::<Vec<_>>();
return eth_api
.tx_resp_builder()
.convert_receipts(inputs)
.map(|receipts| Some((system_tx_count, receipts)));
}
Ok(None)
}
async fn adjust_transaction_receipt<Eth: EthWrapper>(
tx_hash: B256,
eth_api: &Eth,
) -> Result<Option<RpcReceipt<Eth::NetworkTypes>>, Eth::Error> {
match eth_api.load_transaction_and_receipt(tx_hash).await? {
Some((_, meta, _)) => {
// LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again doesn't hurt performance much
info!("block hash: {:?}", meta.block_hash);
let Some((system_tx_count, block_receipts)) =
adjust_block_receipts(meta.block_hash.into(), eth_api).await?
else {
unreachable!();
};
Ok(Some(block_receipts.into_iter().nth(meta.index as usize - system_tx_count).unwrap()))
}
None => Ok(None),
}
}
fn system_tx_count_for_block<Eth: EthWrapper>(eth_api: &Eth, block_id: BlockId) -> usize {
let provider = eth_api.provider(); let provider = eth_api.provider();
let block = provider.block_by_id(number).unwrap().unwrap(); let block = provider.block_by_id(block_id).unwrap().unwrap();
let system_tx_count = block.body.transactions().iter().filter(|tx| is_system_tx(tx)).count(); let system_tx_count = block.body.transactions().iter().filter(|tx| is_system_tx(tx)).count();
system_tx_count system_tx_count
} }
@ -351,6 +435,7 @@ impl<Eth: EthWrapper> EthBlockApiServer<RpcBlock<Eth::NetworkTypes>, RpcReceipt<
for HlNodeBlockFilterHttp<Eth> for HlNodeBlockFilterHttp<Eth>
where where
Eth: EthApiTypes + 'static, Eth: EthApiTypes + 'static,
ErrorObject<'static>: From<Eth::Error>,
{ {
/// Handler for: `eth_getBlockByHash` /// Handler for: `eth_getBlockByHash`
async fn block_by_hash( async fn block_by_hash(
@ -359,7 +444,7 @@ where
full: bool, full: bool,
) -> RpcResult<Option<RpcBlock<Eth::NetworkTypes>>> { ) -> RpcResult<Option<RpcBlock<Eth::NetworkTypes>>> {
let res = self.eth_api.block_by_hash(hash, full).instrument(engine_span!()).await?; let res = self.eth_api.block_by_hash(hash, full).instrument(engine_span!()).await?;
Ok(res.map(|block| exclude_system_tx_from_block(&block, &*self.eth_api))) Ok(res.map(|block| adjust_block(&block, &*self.eth_api)))
} }
/// Handler for: `eth_getBlockByNumber` /// Handler for: `eth_getBlockByNumber`
@ -370,7 +455,7 @@ where
) -> RpcResult<Option<RpcBlock<Eth::NetworkTypes>>> { ) -> RpcResult<Option<RpcBlock<Eth::NetworkTypes>>> {
trace!(target: "rpc::eth", ?number, ?full, "Serving eth_getBlockByNumber"); trace!(target: "rpc::eth", ?number, ?full, "Serving eth_getBlockByNumber");
let res = self.eth_api.block_by_number(number, full).instrument(engine_span!()).await?; let res = self.eth_api.block_by_number(number, full).instrument(engine_span!()).await?;
Ok(res.map(|block| exclude_system_tx_from_block(&block, &*self.eth_api))) Ok(res.map(|block| adjust_block(&block, &*self.eth_api)))
} }
/// Handler for: `eth_getBlockTransactionCountByHash` /// Handler for: `eth_getBlockTransactionCountByHash`
@ -379,8 +464,8 @@ where
let res = let res =
self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?; self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?;
Ok(res.map(|count| { Ok(res.map(|count| {
count - count
U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()))) - U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into())))
})) }))
} }
@ -400,16 +485,24 @@ where
})) }))
} }
async fn transaction_receipt(
&self,
hash: B256,
) -> RpcResult<Option<RpcReceipt<Eth::NetworkTypes>>> {
trace!(target: "rpc::eth", ?hash, "Serving eth_getTransactionReceipt");
let eth_api = &*self.eth_api;
Ok(adjust_transaction_receipt(hash, eth_api).instrument(engine_span!()).await?)
}
/// Handler for: `eth_getBlockReceipts` /// Handler for: `eth_getBlockReceipts`
async fn block_receipts( async fn block_receipts(
&self, &self,
block_id: BlockId, block_id: BlockId,
) -> RpcResult<Option<Vec<RpcReceipt<Eth::NetworkTypes>>>> { ) -> RpcResult<Option<Vec<RpcReceipt<Eth::NetworkTypes>>>> {
trace!(target: "rpc::eth", ?block_id, "Serving eth_getBlockReceipts"); trace!(target: "rpc::eth", ?block_id, "Serving eth_getBlockReceipts");
let res = self.eth_api.block_receipts(block_id).instrument(engine_span!()).await?; let result =
Ok(res.map(|receipts| { adjust_block_receipts(block_id, &*self.eth_api).instrument(engine_span!()).await?;
receipts.into_iter().filter(|receipt| receipt.cumulative_gas_used() > 0).collect() Ok(result.map(|(_, receipts)| receipts))
}))
} }
} }
@ -420,6 +513,7 @@ where
Node: FullNodeComponents, Node: FullNodeComponents,
Node::Provider: BlockIdReader + BlockReader<Block = crate::HlBlock>, Node::Provider: BlockIdReader + BlockReader<Block = crate::HlBlock>,
EthApi: EthWrapper, EthApi: EthWrapper,
ErrorObject<'static>: From<EthApi::Error>,
{ {
ctx.modules.replace_configured( ctx.modules.replace_configured(
HlNodeFilterHttp::new( HlNodeFilterHttp::new(