feat: add rpc-testing crate (#3069)

This commit is contained in:
Matthias Seitz
2023-06-08 23:26:43 +02:00
committed by GitHub
parent ab6fff92af
commit e4c1789880
8 changed files with 223 additions and 6 deletions

25
Cargo.lock generated
View File

@ -2158,9 +2158,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.26" version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -2189,9 +2189,9 @@ checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
version = "0.3.26" version = "0.3.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-task", "futures-task",
@ -5329,6 +5329,19 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "reth-rpc-api-testing-util"
version = "0.1.0"
dependencies = [
"async-trait",
"futures",
"jsonrpsee",
"reth-primitives",
"reth-rpc-api",
"reth-rpc-types",
"tokio",
]
[[package]] [[package]]
name = "reth-rpc-builder" name = "reth-rpc-builder"
version = "0.1.0" version = "0.1.0"
@ -6734,9 +6747,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.28.1" version = "1.28.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",

View File

@ -32,6 +32,7 @@ members = [
"crates/rpc/rpc-builder", "crates/rpc/rpc-builder",
"crates/rpc/rpc-engine-api", "crates/rpc/rpc-engine-api",
"crates/rpc/rpc-types", "crates/rpc/rpc-types",
"crates/rpc/rpc-testing-util",
"crates/staged-sync", "crates/staged-sync",
"crates/stages", "crates/stages",
"crates/storage/codecs", "crates/storage/codecs",

View File

@ -0,0 +1,28 @@
[package]
name = "reth-rpc-api-testing-util"
version = "0.1.0"
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = """
Reth RPC testing helpers
"""
[dependencies]
# reth
reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
reth-rpc-api = { path = "../rpc-api", default-features = false, features = ["client"] }
# async
async-trait = "0.1"
futures = "0.3"
# misc
jsonrpsee = { version = "0.18", features = ["client", "async-client"] }
[dev-dependencies]
tokio = { version = "1", features = ["rt-multi-thread", "macros", "rt"] }

View File

@ -0,0 +1,12 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub, unused_crate_dependencies)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Reth RPC testing utilities.
pub mod trace;
pub mod utils;

View File

@ -0,0 +1,124 @@
//! Helpers for testing trace calls.
use futures::{Stream, StreamExt};
use reth_primitives::BlockId;
use reth_rpc_api::clients::TraceApiClient;
use std::{
pin::Pin,
task::{Context, Poll},
};
use jsonrpsee::core::Error as RpcError;
use reth_rpc_types::trace::parity::LocalizedTransactionTrace;
/// A result type for the `trace_block` method that also
pub type TraceBlockResult = Result<(Vec<LocalizedTransactionTrace>, BlockId), (RpcError, BlockId)>;
/// An extension trait for the Trace API.
#[async_trait::async_trait]
pub trait TraceApiExt {
/// The client type that is used to make the requests.
type Client;
/// Returns a new stream that yields the traces for the given blocks.
///
/// See also [StreamExt::buffered].
fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
/// Returns a new stream that yields the traces for the given blocks.
///
/// See also [StreamExt::buffer_unordered].
fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>;
}
#[async_trait::async_trait]
impl<T: TraceApiClient + Sync> TraceApiExt for T {
type Client = T;
fn trace_block_buffered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>,
{
let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
match self.trace_block(block).await {
Ok(result) => Ok((result.unwrap_or_default(), block)),
Err(err) => Err((err, block)),
}
}))
.buffered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
fn trace_block_buffered_unordered<I, B>(&self, params: I, n: usize) -> TraceBlockStream<'_>
where
I: IntoIterator<Item = B>,
B: Into<BlockId>,
{
let blocks = params.into_iter().map(|b| b.into()).collect::<Vec<_>>();
let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move {
match self.trace_block(block).await {
Ok(result) => Ok((result.unwrap_or_default(), block)),
Err(err) => Err((err, block)),
}
}))
.buffer_unordered(n);
TraceBlockStream { stream: Box::pin(stream) }
}
}
/// A stream that yields the traces for the requested blocks.
#[must_use = "streams do nothing unless polled"]
pub struct TraceBlockStream<'a> {
stream: Pin<Box<dyn Stream<Item = TraceBlockResult> + 'a>>,
}
impl<'a> TraceBlockStream<'a> {
/// Returns the next error result of the stream.
pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> {
loop {
match self.next().await? {
Ok(_) => continue,
Err(err) => return Some(err),
}
}
}
}
impl<'a> Stream for TraceBlockStream<'a> {
type Item = TraceBlockResult;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.stream.as_mut().poll_next(cx)
}
}
impl<'a> std::fmt::Debug for TraceBlockStream<'a> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TraceBlockStream").finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use jsonrpsee::http_client::HttpClientBuilder;
use reth_primitives::BlockNumberOrTag;
fn assert_is_stream<St: Stream>(_: &St) {}
#[tokio::test]
async fn can_create_block_stream() {
let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap();
let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()];
let stream = client.trace_block_buffered(block, 2);
assert_is_stream(&stream);
}
}

View File

@ -0,0 +1,13 @@
//! Utils for testing RPC.
/// This will read the value of the given environment variable and parse it as a URL.
///
/// If the value has no http(s) scheme, it will be appended: `http://{var}`.
pub fn parse_env_url(var: &str) -> Result<String, std::env::VarError> {
let var = std::env::var(var)?;
if var.starts_with("http") {
Ok(var)
} else {
Ok(format!("http://{}", var))
}
}

View File

@ -0,0 +1,3 @@
mod trace;
fn main() {}

View File

@ -0,0 +1,23 @@
use jsonrpsee::http_client::HttpClientBuilder;
use reth_rpc_api_testing_util::{trace::TraceApiExt, utils::parse_env_url};
use std::time::Instant;
/// This is intended to be run locally against a running node.
///
/// This is a noop of env var `RETH_RPC_TEST_NODE_URL` is not set.
#[tokio::test(flavor = "multi_thread")]
async fn trace_many_blocks() {
let url = parse_env_url("RETH_RPC_TEST_NODE_URL");
if url.is_err() {
return
}
let url = url.unwrap();
let client = HttpClientBuilder::default().build(url).unwrap();
let mut stream = client.trace_block_buffered_unordered(15_000_000..=16_000_100, 20);
let now = Instant::now();
while let Some((err, block)) = stream.next_err().await {
eprintln!("Error tracing block {block:?}: {err:?}");
}
println!("Traced all blocks in {:?}", now.elapsed());
}