From e4c1789880d675d930604a19f6269452706d82ba Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Thu, 8 Jun 2023 23:26:43 +0200 Subject: [PATCH] feat: add rpc-testing crate (#3069) --- Cargo.lock | 25 +++- Cargo.toml | 1 + crates/rpc/rpc-testing-util/Cargo.toml | 28 ++++ crates/rpc/rpc-testing-util/src/lib.rs | 12 ++ crates/rpc/rpc-testing-util/src/trace.rs | 124 ++++++++++++++++++ crates/rpc/rpc-testing-util/src/utils.rs | 13 ++ crates/rpc/rpc-testing-util/tests/it/main.rs | 3 + crates/rpc/rpc-testing-util/tests/it/trace.rs | 23 ++++ 8 files changed, 223 insertions(+), 6 deletions(-) create mode 100644 crates/rpc/rpc-testing-util/Cargo.toml create mode 100644 crates/rpc/rpc-testing-util/src/lib.rs create mode 100644 crates/rpc/rpc-testing-util/src/trace.rs create mode 100644 crates/rpc/rpc-testing-util/src/utils.rs create mode 100644 crates/rpc/rpc-testing-util/tests/it/main.rs create mode 100644 crates/rpc/rpc-testing-util/tests/it/trace.rs diff --git a/Cargo.lock b/Cargo.lock index 02b9ddf6b..93eb30716 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2158,9 +2158,9 @@ checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" [[package]] name = "futures" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" dependencies = [ "futures-channel", "futures-core", @@ -2189,9 +2189,9 @@ checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" [[package]] name = "futures-executor" -version = "0.3.26" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" dependencies = [ "futures-core", "futures-task", @@ -5329,6 +5329,19 @@ dependencies = [ "serde_json", ] +[[package]] +name = "reth-rpc-api-testing-util" +version = "0.1.0" +dependencies = [ + "async-trait", + "futures", + "jsonrpsee", + "reth-primitives", + "reth-rpc-api", + "reth-rpc-types", + "tokio", +] + [[package]] name = "reth-rpc-builder" version = "0.1.0" @@ -6734,9 +6747,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.28.1" +version = "1.28.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0aa32867d44e6f2ce3385e89dceb990188b8bb0fb25b0cf576647a6f98ac5105" +checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", "bytes", diff --git a/Cargo.toml b/Cargo.toml index 0a26c11eb..f11dcb6d8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ members = [ "crates/rpc/rpc-builder", "crates/rpc/rpc-engine-api", "crates/rpc/rpc-types", + "crates/rpc/rpc-testing-util", "crates/staged-sync", "crates/stages", "crates/storage/codecs", diff --git a/crates/rpc/rpc-testing-util/Cargo.toml b/crates/rpc/rpc-testing-util/Cargo.toml new file mode 100644 index 000000000..dd785dace --- /dev/null +++ b/crates/rpc/rpc-testing-util/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "reth-rpc-api-testing-util" +version = "0.1.0" +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = """ +Reth RPC testing helpers +""" + +[dependencies] +# reth +reth-primitives = { path = "../../primitives" } +reth-rpc-types = { path = "../rpc-types" } +reth-rpc-api = { path = "../rpc-api", default-features = false, features = ["client"] } + +# async +async-trait = "0.1" +futures = "0.3" + +# misc +jsonrpsee = { version = "0.18", features = ["client", "async-client"] } + +[dev-dependencies] +tokio = { version = "1", features = ["rt-multi-thread", "macros", "rt"] } + diff --git a/crates/rpc/rpc-testing-util/src/lib.rs b/crates/rpc/rpc-testing-util/src/lib.rs new file mode 100644 index 000000000..a3e2b6af1 --- /dev/null +++ b/crates/rpc/rpc-testing-util/src/lib.rs @@ -0,0 +1,12 @@ +#![warn(missing_debug_implementations, missing_docs, unreachable_pub, unused_crate_dependencies)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Reth RPC testing utilities. + +pub mod trace; + +pub mod utils; diff --git a/crates/rpc/rpc-testing-util/src/trace.rs b/crates/rpc/rpc-testing-util/src/trace.rs new file mode 100644 index 000000000..cdf6ad81a --- /dev/null +++ b/crates/rpc/rpc-testing-util/src/trace.rs @@ -0,0 +1,124 @@ +//! Helpers for testing trace calls. + +use futures::{Stream, StreamExt}; +use reth_primitives::BlockId; +use reth_rpc_api::clients::TraceApiClient; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use jsonrpsee::core::Error as RpcError; +use reth_rpc_types::trace::parity::LocalizedTransactionTrace; + +/// A result type for the `trace_block` method that also +pub type TraceBlockResult = Result<(Vec, BlockId), (RpcError, BlockId)>; + +/// An extension trait for the Trace API. +#[async_trait::async_trait] +pub trait TraceApiExt { + /// The client type that is used to make the requests. + type Client; + + /// Returns a new stream that yields the traces for the given blocks. + /// + /// See also [StreamExt::buffered]. + fn trace_block_buffered(&self, params: I, n: usize) -> TraceBlockStream<'_> + where + I: IntoIterator, + B: Into; + + /// Returns a new stream that yields the traces for the given blocks. + /// + /// See also [StreamExt::buffer_unordered]. + fn trace_block_buffered_unordered(&self, params: I, n: usize) -> TraceBlockStream<'_> + where + I: IntoIterator, + B: Into; +} + +#[async_trait::async_trait] +impl TraceApiExt for T { + type Client = T; + + fn trace_block_buffered(&self, params: I, n: usize) -> TraceBlockStream<'_> + where + I: IntoIterator, + B: Into, + { + let blocks = params.into_iter().map(|b| b.into()).collect::>(); + let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move { + match self.trace_block(block).await { + Ok(result) => Ok((result.unwrap_or_default(), block)), + Err(err) => Err((err, block)), + } + })) + .buffered(n); + TraceBlockStream { stream: Box::pin(stream) } + } + + fn trace_block_buffered_unordered(&self, params: I, n: usize) -> TraceBlockStream<'_> + where + I: IntoIterator, + B: Into, + { + let blocks = params.into_iter().map(|b| b.into()).collect::>(); + let stream = futures::stream::iter(blocks.into_iter().map(move |block| async move { + match self.trace_block(block).await { + Ok(result) => Ok((result.unwrap_or_default(), block)), + Err(err) => Err((err, block)), + } + })) + .buffer_unordered(n); + TraceBlockStream { stream: Box::pin(stream) } + } +} + +/// A stream that yields the traces for the requested blocks. +#[must_use = "streams do nothing unless polled"] +pub struct TraceBlockStream<'a> { + stream: Pin + 'a>>, +} + +impl<'a> TraceBlockStream<'a> { + /// Returns the next error result of the stream. + pub async fn next_err(&mut self) -> Option<(RpcError, BlockId)> { + loop { + match self.next().await? { + Ok(_) => continue, + Err(err) => return Some(err), + } + } + } +} + +impl<'a> Stream for TraceBlockStream<'a> { + type Item = TraceBlockResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.stream.as_mut().poll_next(cx) + } +} + +impl<'a> std::fmt::Debug for TraceBlockStream<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TraceBlockStream").finish_non_exhaustive() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use jsonrpsee::http_client::HttpClientBuilder; + use reth_primitives::BlockNumberOrTag; + + fn assert_is_stream(_: &St) {} + + #[tokio::test] + async fn can_create_block_stream() { + let client = HttpClientBuilder::default().build("http://localhost:8545").unwrap(); + let block = vec![BlockId::Number(5u64.into()), BlockNumberOrTag::Latest.into()]; + let stream = client.trace_block_buffered(block, 2); + assert_is_stream(&stream); + } +} diff --git a/crates/rpc/rpc-testing-util/src/utils.rs b/crates/rpc/rpc-testing-util/src/utils.rs new file mode 100644 index 000000000..8a74f90c9 --- /dev/null +++ b/crates/rpc/rpc-testing-util/src/utils.rs @@ -0,0 +1,13 @@ +//! Utils for testing RPC. + +/// This will read the value of the given environment variable and parse it as a URL. +/// +/// If the value has no http(s) scheme, it will be appended: `http://{var}`. +pub fn parse_env_url(var: &str) -> Result { + let var = std::env::var(var)?; + if var.starts_with("http") { + Ok(var) + } else { + Ok(format!("http://{}", var)) + } +} diff --git a/crates/rpc/rpc-testing-util/tests/it/main.rs b/crates/rpc/rpc-testing-util/tests/it/main.rs new file mode 100644 index 000000000..76f73cd8f --- /dev/null +++ b/crates/rpc/rpc-testing-util/tests/it/main.rs @@ -0,0 +1,3 @@ +mod trace; + +fn main() {} diff --git a/crates/rpc/rpc-testing-util/tests/it/trace.rs b/crates/rpc/rpc-testing-util/tests/it/trace.rs new file mode 100644 index 000000000..0e3d6559f --- /dev/null +++ b/crates/rpc/rpc-testing-util/tests/it/trace.rs @@ -0,0 +1,23 @@ +use jsonrpsee::http_client::HttpClientBuilder; +use reth_rpc_api_testing_util::{trace::TraceApiExt, utils::parse_env_url}; +use std::time::Instant; + +/// This is intended to be run locally against a running node. +/// +/// This is a noop of env var `RETH_RPC_TEST_NODE_URL` is not set. +#[tokio::test(flavor = "multi_thread")] +async fn trace_many_blocks() { + let url = parse_env_url("RETH_RPC_TEST_NODE_URL"); + if url.is_err() { + return + } + let url = url.unwrap(); + + let client = HttpClientBuilder::default().build(url).unwrap(); + let mut stream = client.trace_block_buffered_unordered(15_000_000..=16_000_100, 20); + let now = Instant::now(); + while let Some((err, block)) = stream.next_err().await { + eprintln!("Error tracing block {block:?}: {err:?}"); + } + println!("Traced all blocks in {:?}", now.elapsed()); +}