feat(rpc): Replay of Ethereum Transactions with ReplayTransactionStream (#5134)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
DoTheBestToGetTheBest
2023-10-23 09:30:01 -07:00
committed by GitHub
parent d05b32449b
commit fbf12de4e4

View File

@ -1,18 +1,19 @@
//! Helpers for testing trace calls.
use futures::{Stream, StreamExt};
use reth_primitives::BlockId;
use jsonrpsee::core::Error as RpcError;
use reth_primitives::{BlockId, TxHash};
use reth_rpc_api::clients::TraceApiClient;
use reth_rpc_types::trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType};
use std::{
collections::HashSet,
pin::Pin,
task::{Context, Poll},
};
use jsonrpsee::core::Error as RpcError;
use reth_rpc_types::trace::parity::LocalizedTransactionTrace;
/// A result type for the `trace_block` method that also captures the requested block.
pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
/// Type alias representing the result of replaying a transaction.
pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>;
/// An extension trait for the Trace API.
#[async_trait::async_trait]
@ -35,6 +36,36 @@ pub trait TraceApiExt {
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a new stream that replays the transactions for the given transaction hashes.
///
/// This returns all results in order.
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>;
}
/// A stream that replays the transactions for the requested hashes.
#[must_use = "streams do nothing unless polled"]
pub struct ReplayTransactionStream<'a> {
stream: Pin<Box<dyn Stream<Item = ReplayTransactionResult> + 'a>>,
}
impl<'a> Stream for ReplayTransactionStream<'a> {
type Item = ReplayTransactionResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
impl<'a> std::fmt::Debug for ReplayTransactionStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ReplayTransactionStream").finish()
}
}
#[async_trait::async_trait]
@ -72,6 +103,28 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
.buffer_unordered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
fn replay_transactions<I>(
&self,
tx_hashes: I,
trace_types: HashSet<TraceType>,
) -> ReplayTransactionStream<'_>
where
I: IntoIterator<Item = TxHash>,
{
let hashes = tx_hashes.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(hashes.into_iter().map(move |hash| {
let trace_types_clone = trace_types.clone(); // Clone outside of the async block
async move {
match self.replay_transaction(hash, trace_types_clone).await {
Ok(result) => Ok((result, hash)),
Err(err) => Err((err, hash)),
}
}
}))
.buffered(10);
ReplayTransactionStream { stream: Box::pin(stream) }
}
}
/// A stream that yields the traces for the requested blocks.