From fbf12de4e42b75c6eab2f25fb337b42a522e6053 Mon Sep 17 00:00:00 2001 From: DoTheBestToGetTheBest <146037313+DoTheBestToGetTheBest@users.noreply.github.com> Date: Mon, 23 Oct 2023 09:30:01 -0700 Subject: [PATCH] feat(rpc): Replay of Ethereum Transactions with ReplayTransactionStream (#5134) Co-authored-by: Matthias Seitz --- crates/rpc/rpc-testing-util/src/trace.rs | 65 +++++++++++++++++++++--- 1 file changed, 59 insertions(+), 6 deletions(-) diff --git a/crates/rpc/rpc-testing-util/src/trace.rs b/crates/rpc/rpc-testing-util/src/trace.rs index 47cd424fd..4aa1d3750 100644 --- a/crates/rpc/rpc-testing-util/src/trace.rs +++ b/crates/rpc/rpc-testing-util/src/trace.rs @@ -1,18 +1,19 @@ //! Helpers for testing trace calls. - use futures::{Stream, StreamExt}; -use reth_primitives::BlockId; +use jsonrpsee::core::Error as RpcError; +use reth_primitives::{BlockId, TxHash}; use reth_rpc_api::clients::TraceApiClient; +use reth_rpc_types::trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType}; use std::{ + collections::HashSet, pin::Pin, task::{Context, Poll}, }; - -use jsonrpsee::core::Error as RpcError; -use reth_rpc_types::trace::parity::LocalizedTransactionTrace; - /// A result type for the `trace_block` method that also captures the requested block. pub type TraceBlockResult = Result<(Vec, BlockId), (RpcError, BlockId)>; +/// Type alias representing the result of replaying a transaction. + +pub type ReplayTransactionResult = Result<(TraceResults, TxHash), (RpcError, TxHash)>; /// An extension trait for the Trace API. #[async_trait::async_trait] @@ -35,6 +36,36 @@ pub trait TraceApiExt { where I: IntoIterator, B: Into; + + /// Returns a new stream that replays the transactions for the given transaction hashes. + /// + /// This returns all results in order. + fn replay_transactions( + &self, + tx_hashes: I, + trace_types: HashSet, + ) -> ReplayTransactionStream<'_> + where + I: IntoIterator; +} + +/// A stream that replays the transactions for the requested hashes. +#[must_use = "streams do nothing unless polled"] +pub struct ReplayTransactionStream<'a> { + stream: Pin + 'a>>, +} +impl<'a> Stream for ReplayTransactionStream<'a> { + type Item = ReplayTransactionResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.as_mut().poll_next(cx) + } +} + +impl<'a> std::fmt::Debug for ReplayTransactionStream<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReplayTransactionStream").finish() + } } #[async_trait::async_trait] @@ -72,6 +103,28 @@ impl TraceApiExt for T { .buffer_unordered(n); TraceBlockStream { stream: Box::pin(stream) } } + + fn replay_transactions( + &self, + tx_hashes: I, + trace_types: HashSet, + ) -> ReplayTransactionStream<'_> + where + I: IntoIterator, + { + let hashes = tx_hashes.into_iter().collect::>(); + let stream = futures::stream::iter(hashes.into_iter().map(move |hash| { + let trace_types_clone = trace_types.clone(); // Clone outside of the async block + async move { + match self.replay_transaction(hash, trace_types_clone).await { + Ok(result) => Ok((result, hash)), + Err(err) => Err((err, hash)), + } + } + })) + .buffered(10); + ReplayTransactionStream { stream: Box::pin(stream) } + } } /// A stream that yields the traces for the requested blocks.