feat(rpc-testing-utils) : stream support for trace_get functionality (#5265)

This commit is contained in:
DoTheBestToGetTheBest
2023-11-02 03:35:52 -07:00
committed by GitHub
parent 7cc64024d9
commit b78b8b8990

View File

@ -1,11 +1,11 @@
//! Helpers for testing trace calls.
use futures::{Stream, StreamExt};
use jsonrpsee::core::Error as RpcError;
use reth_primitives::{BlockId, Bytes, TxHash};
use reth_primitives::{BlockId, Bytes, TxHash, B256};
use reth_rpc_api::clients::TraceApiClient;
use reth_rpc_types::{
trace::parity::{LocalizedTransactionTrace, TraceResults, TraceType},
CallRequest,
CallRequest, Index,
};
use std::{
collections::HashSet,
@ -27,6 +27,10 @@ pub type CallManyTraceResult = Result<
(Vec<TraceResults>, Vec<(CallRequest, HashSet<TraceType>)>),
(RpcError, Vec<(CallRequest, HashSet<TraceType>)>),
>;
/// Result type for the `trace_get` method that also captures the requested transaction hash and
/// index.
pub type TraceGetResult =
Result<(Option<LocalizedTransactionTrace>, B256, Vec<Index>), (RpcError, B256, Vec<Index>)>;
/// An extension trait for the Trace API.
#[async_trait::async_trait]
@ -78,7 +82,31 @@ pub trait TraceApiExt {
) -> CallManyTraceStream<'_>
where
I: IntoIterator<Item = (CallRequest, HashSet<TraceType>)>;
/// Returns a new stream that yields the traces for the given transaction hash and indices.
fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
where
I: IntoIterator<Item = Index>;
}
/// A stream that asynchronously yields the results of the `trace_get` method for a given
/// transaction hash and a series of indices.
#[must_use = "streams do nothing unless polled"]
pub struct TraceGetStream<'a> {
stream: Pin<Box<dyn Stream<Item = TraceGetResult> + 'a>>,
}
impl<'a> Stream for TraceGetStream<'a> {
type Item = TraceGetResult;
/// Attempts to pull out the next item of the stream
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
impl<'a> std::fmt::Debug for TraceGetStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TraceGetStream").finish_non_exhaustive()
}
}
/// A stream that provides asynchronous iteration over results from the `trace_call_many` function.
///
/// The stream yields items of type `CallManyTraceResult`.
@ -231,6 +259,21 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
});
CallManyTraceStream { stream: Box::pin(stream) }
}
fn trace_get_stream<I>(&self, hash: B256, indices: I) -> TraceGetStream<'_>
where
I: IntoIterator<Item = Index>,
{
let index_list = indices.into_iter().collect::<Vec<_>>();
let stream = futures::stream::iter(index_list.into_iter().map(move |index| async move {
match self.trace_get(hash, vec![index]).await {
Ok(result) => Ok((result, hash, vec![index])),
Err(err) => Err((err, hash, vec![index])),
}
}))
.buffered(10);
TraceGetStream { stream: Box::pin(stream) }
}
}
/// A stream that yields the traces for the requested blocks.