feat(rpc-testing-utils): implement trace_call streaming (#5347)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
DoTheBestToGetTheBest
2023-11-10 03:49:14 -08:00
committed by GitHub
parent ee351fc493
commit 13fd00c117

View File

@ -7,6 +7,7 @@ use reth_rpc_types::{
trace::{
filter::TraceFilter,
parity::{LocalizedTransactionTrace, TraceResults, TraceType},
tracerequest::TraceCallRequest,
},
CallRequest, Index,
};
@ -37,6 +38,8 @@ pub type TraceGetResult =
/// Represents a result type for the `trace_filter` stream extension.
pub type TraceFilterResult =
Result<(Vec<LocalizedTransactionTrace>, TraceFilter), (RpcError, TraceFilter)>;
/// Represents the result of a single trace call.
pub type TraceCallResult = Result<TraceResults, (RpcError, TraceCallRequest)>;
/// An extension trait for the Trace API.
#[async_trait::async_trait]
@ -97,6 +100,27 @@ pub trait TraceApiExt {
fn trace_filter_stream<I>(&self, filters: I) -> TraceFilterStream<'_>
where
I: IntoIterator<Item = TraceFilter>;
/// Returns a new stream that yields the trace results for the given call requests.
fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_>;
}
/// `TraceCallStream` provides an asynchronous stream of tracing results.
#[must_use = "streams do nothing unless polled"]
pub struct TraceCallStream<'a> {
stream: Pin<Box<dyn Stream<Item = TraceCallResult> + 'a>>,
}
impl<'a> Stream for TraceCallStream<'a> {
type Item = TraceCallResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
impl<'a> std::fmt::Debug for TraceCallStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TraceCallStream").finish()
}
}
/// Represents a stream that asynchronously yields the results of the `trace_filter` method.
@ -322,6 +346,25 @@ impl<T: TraceApiClient + Sync> TraceApiExt for T {
.buffered(10);
TraceFilterStream { stream: Box::pin(stream) }
}
fn trace_call_stream(&self, request: TraceCallRequest) -> TraceCallStream<'_> {
let stream = futures::stream::once(async move {
match self
.trace_call(
request.call.clone(),
request.trace_types.clone(),
request.block_id,
request.state_overrides.clone(),
request.block_overrides.clone(),
)
.await
{
Ok(result) => Ok(result),
Err(err) => Err((err, request)),
}
});
TraceCallStream { stream: Box::pin(stream) }
}
}
/// A stream that yields the traces for the requested blocks.