feat: introduce reth benchmark command (#8284)

Co-authored-by: Emilia Hane <elsaemiliaevahane@gmail.com>
Dan Cline
2024-06-12 10:38:42 -04:00
committed by GitHub
parent e9a7691747
commit fcd28f69a8
15 changed files with 1672 additions and 8 deletions

Cargo.lock generated

@@ -140,7 +140,7 @@ dependencies = [
[[package]]
name = "alloy-consensus"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy)",
"alloy-primitives",
@@ -189,7 +189,7 @@ dependencies = [
[[package]]
name = "alloy-eips"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-primitives",
"alloy-rlp",
@@ -214,7 +214,7 @@ dependencies = [
[[package]]
name = "alloy-genesis"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-primitives",
"alloy-serde 0.1.0 (git+https://github.com/alloy-rs/alloy)",
@@ -319,6 +319,7 @@ dependencies = [
"alloy-primitives",
"alloy-pubsub",
"alloy-rpc-client",
"alloy-rpc-types-engine",
"alloy-rpc-types-eth 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=14ed25d)",
"alloy-rpc-types-trace",
"alloy-transport",
@@ -416,7 +417,7 @@ dependencies = [
[[package]]
name = "alloy-rpc-types"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-rpc-types-eth 0.1.0 (git+https://github.com/alloy-rs/alloy)",
"alloy-serde 0.1.0 (git+https://github.com/alloy-rs/alloy)",
@@ -488,7 +489,7 @@ dependencies = [
[[package]]
name = "alloy-rpc-types-eth"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy)",
"alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy)",
@@ -528,7 +529,7 @@ dependencies = [
[[package]]
name = "alloy-serde"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
source = "git+https://github.com/alloy-rs/alloy#7578618d61213ea832c40c7e613f1d644ce08f27"
dependencies = [
"alloy-primitives",
"serde",
@@ -668,6 +669,24 @@ dependencies = [
"url",
]
[[package]]
name = "alloy-transport-ipc"
version = "0.1.0"
source = "git+https://github.com/alloy-rs/alloy?rev=14ed25d#14ed25d8ab485fc0d313fd1e055862c9d20ef273"
dependencies = [
"alloy-json-rpc",
"alloy-pubsub",
"alloy-transport",
"bytes",
"futures",
"interprocess 2.1.1",
"pin-project",
"serde_json",
"tokio",
"tokio-util",
"tracing",
]
[[package]]
name = "alloy-transport-ws"
version = "0.1.0"
@@ -2168,6 +2187,27 @@ dependencies = [
"subtle",
]
[[package]]
name = "csv"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "ctr"
version = "0.7.0"
@@ -2601,6 +2641,12 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "doctest-file"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aac81fa3e28d21450aa4d2ac065992ba96a1d7303efbce51a95f4fd175b67562"
[[package]]
name = "downcast"
version = "0.11.0"
@@ -3079,7 +3125,7 @@ dependencies = [
[[package]]
name = "foundry-blob-explorers"
version = "0.1.0"
source = "git+https://github.com/foundry-rs/block-explorers#d5fdf79cd62f378448907663fc4ba9d085393b35"
source = "git+https://github.com/foundry-rs/block-explorers#af29524f4fc7dc25f59e8ae38652022f47ebee9b"
dependencies = [
"alloy-chains",
"alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy)",
@@ -4060,6 +4106,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "interprocess"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13f2533e1f1a70bec71ea7a85d1c0a4dab141c314035ce76e51a19a2f48be708"
dependencies = [
"doctest-file",
"futures-core",
"libc",
"recvmsg",
"tokio",
"widestring",
"windows-sys 0.52.0",
]
[[package]]
name = "intmap"
version = "0.7.1"
@@ -6067,6 +6128,12 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "recvmsg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3edd4d5d42c92f0a659926464d4cce56b562761267ecf0f469d85b7de384175"
[[package]]
name = "redox_syscall"
version = "0.2.16"
@@ -6383,6 +6450,47 @@ dependencies = [
"tracing",
]
[[package]]
name = "reth-bench"
version = "0.2.0-beta.9"
dependencies = [
"alloy-consensus 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=14ed25d)",
"alloy-eips 0.1.0 (git+https://github.com/alloy-rs/alloy?rev=14ed25d)",
"alloy-json-rpc",
"alloy-provider",
"alloy-pubsub",
"alloy-rpc-client",
"alloy-rpc-types-engine",
"alloy-transport",
"alloy-transport-http",
"alloy-transport-ipc",
"alloy-transport-ws",
"async-trait",
"clap",
"csv",
"eyre",
"futures",
"libc",
"reqwest",
"reth-cli-runner",
"reth-db",
"reth-node-api",
"reth-node-core",
"reth-primitives",
"reth-provider",
"reth-rpc-types",
"reth-rpc-types-compat",
"reth-tracing",
"serde",
"serde_json",
"thiserror",
"tikv-jemallocator",
"tokio",
"tokio-util",
"tower",
"tracing",
]
[[package]]
name = "reth-blockchain-tree"
version = "0.2.0-beta.9"
@@ -7068,7 +7176,7 @@ dependencies = [
"bytes",
"futures",
"futures-util",
"interprocess",
"interprocess 1.2.1",
"jsonrpsee",
"pin-project",
"rand 0.8.5",

Cargo.toml

@@ -9,6 +9,7 @@ exclude = [".github/"]
[workspace]
members = [
"bin/reth-bench/",
"bin/reth/",
"crates/blockchain-tree/",
"crates/blockchain-tree-api/",
@@ -239,6 +240,7 @@ incremental = false
[workspace.dependencies]
# reth
reth = { path = "bin/reth" }
reth-bench = { path = "bin/reth-bench" }
reth-auto-seal-consensus = { path = "crates/consensus/auto-seal" }
reth-basic-payload-builder = { path = "crates/payload/basic" }
reth-beacon-consensus = { path = "crates/consensus/beacon" }
@@ -366,6 +368,13 @@ alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-transport = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-transport-http = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d", features = ["reqwest-rustls-tls"], default-features = false }
alloy-transport-ws = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-transport-ipc = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-pubsub = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-json-rpc = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-client = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
# misc
auto_impl = "1"

bin/reth-bench/Cargo.toml Normal file

@@ -0,0 +1,106 @@
[package]
name = "reth-bench"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Benchmarking for ethereum nodes"
default-run = "reth-bench"
[lints]
workspace = true
[dependencies]
# reth
reth-provider = { workspace = true }
reth-cli-runner.workspace = true
reth-db = { workspace = true, features = ["mdbx"] }
reth-node-core.workspace = true
reth-node-api.workspace = true
reth-rpc-types.workspace = true
reth-rpc-types-compat.workspace = true
reth-primitives = { workspace = true, features = ["clap", "alloy-compat"] }
reth-tracing.workspace = true
# alloy
alloy-provider = { workspace = true, features = ["engine-api", "reqwest-rustls-tls"], default-features = false }
alloy-rpc-types-engine.workspace = true
alloy-transport.workspace = true
alloy-transport-http.workspace = true
alloy-transport-ws.workspace = true
alloy-transport-ipc.workspace = true
alloy-pubsub.workspace = true
alloy-json-rpc.workspace = true
alloy-rpc-client.workspace = true
alloy-consensus.workspace = true
alloy-eips.workspace = true
# reqwest
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
] }
# tower
tower.workspace = true
# tracing
tracing.workspace = true
# io
serde.workspace = true
serde_json.workspace = true
# async
tokio = { workspace = true, features = [
"sync",
"macros",
"time",
"rt-multi-thread",
] }
tokio-util.workspace = true
futures.workspace = true
async-trait.workspace = true
# misc
eyre.workspace = true
thiserror.workspace = true
clap = { workspace = true, features = ["derive", "env"] }
# for writing data
csv = "1.3.0"
[target.'cfg(unix)'.dependencies]
tikv-jemallocator = { version = "0.5.0", optional = true }
libc = "0.2"
[dev-dependencies]
reth-tracing.workspace = true
[features]
default = ["jemalloc"]
asm-keccak = ["reth-primitives/asm-keccak"]
jemalloc = ["dep:tikv-jemallocator", "reth-node-core/jemalloc"]
jemalloc-prof = ["jemalloc", "tikv-jemallocator?/profiling"]
min-error-logs = ["tracing/release_max_level_error"]
min-warn-logs = ["tracing/release_max_level_warn"]
min-info-logs = ["tracing/release_max_level_info"]
min-debug-logs = ["tracing/release_max_level_debug"]
min-trace-logs = ["tracing/release_max_level_trace"]
optimism = [
"reth-primitives/optimism",
"reth-provider/optimism",
"reth-node-core/optimism",
]
# no-op feature flag for switching between the `optimism` and default functionality in CI matrices
ethereum = []
[[bin]]
name = "reth-bench"
path = "src/main.rs"

bin/reth-bench/README.md Normal file

@@ -0,0 +1,66 @@
# Benchmarking reth live sync with `reth-bench`
The binary contained in this directory, `reth-bench`, is a tool that can be used to benchmark the performance of reth live sync. `reth-bench` is a general tool that can be used to benchmark any node which supports the engine API.
### A recap on node synchronization
Reth uses two primary methods for synchronizing the chain:
* Historical sync, which is used to synchronize the chain from genesis to a known finalized block. This involves re-executing the entire chain history.
* Live sync, which is used to synchronize the chain from a finalized block to the current head. This involves processing new blocks as they are produced.
Benchmarking historical sync for reth is fairly easy, because historical sync is a long-running, deterministic process.
Reth specifically contains the `--debug.tip` argument, which allows for running the historical sync pipeline to a specific block.
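For example, a historical sync to a fixed tip can be launched like this, where `<block_hash>` is a placeholder for the hash of the target block:
```bash
reth node --debug.tip <block_hash>
```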
However, reth's historical sync applies optimizations that are not always possible when syncing new blocks.
Live sync, on the other hand, is a more complex process that is harder to benchmark. It is also more sensitive to the network conditions of the CL.
To benchmark live sync, we need to simulate a CL in a controlled manner, so reth uses the same code paths it would when syncing new blocks.
### The `reth-bench` tool
The `reth-bench` tool is designed to benchmark the performance of reth live sync.
It can also be used for debugging client spec implementations, as it replays historical blocks by mocking a CL client.
Performance is measured by the latency and gas used of each block, as well as the computed gas used per second.
As long as the data is representative of real-world load, or closer to a worst-case load test, the gas per second gives a rough sense of how much throughput the node would be able to handle.
## Prerequisites
If you will be collecting CPU profiles, make sure `reth` is compiled with the `debug-fast` profile.
For collecting memory profiles, make sure `reth` is also compiled with the `--features profiling` flag.
Otherwise, running `make maxperf` at the root of the repo should be sufficient for collecting accurate performance metrics.
## Command Usage
`reth-bench` contains different commands to benchmark different patterns of engine API calls.
The `reth-bench new-payload-fcu` command is the most representative of ethereum mainnet live sync, alternating between sending `engine_newPayload` calls and `engine_forkchoiceUpdated` calls.
Below is an overview of how to execute a benchmark:
1. **Setup**: Make sure `reth` is running in the background with the proper configuration. This setup involves ensuring the node is at the correct state, setting up profiling tools, and possibly more depending on the purpose of the benchmark.
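As a rough sketch, the node under test might be started like so, where the datadir and JWT paths are placeholders and `8551` matches the default engine RPC port expected by `reth-bench`:
```bash
reth node --datadir <datadir> --authrpc.port 8551 --authrpc.jwtsecret <jwt_file_path>
```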
2. **Run the Benchmark**:
```bash
reth-bench new-payload-fcu --rpc-url http://<rpc-url>:8545 --from <start_block> --to <end_block> --jwtsecret <jwt_file_path>
```
Replace `<rpc-url>`, `<start_block>`, `<end_block>`, and `<jwt_file_path>` with the appropriate values for your testing environment.
Note that this assumes that the benchmark node's engine API is listening on `http://localhost:8551`, which is the default value in `reth-bench`. To configure this value, use the `--engine-rpc-url` flag.
3. **Observe Outputs**: Upon running the command, `reth-bench` will output benchmark results, showing processing speeds and gas usage, which are crucial for analyzing the node's performance.
Example output:
```
2024-05-30T00:45:20.806691Z INFO Running benchmark using data from RPC URL: http://<rpc-url>:8545
// ... logs per block
2024-05-30T00:45:34.203172Z INFO Total Ggas/s: 0.15 total_duration=5.085704882s total_gas_used=741620668.0
```
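The total Ggas/s figure is simply total gas divided by total elapsed time: in this sample, 741,620,668 gas over 5.085704882 seconds is about 145.8 Mgas/s, i.e. roughly 0.146 Ggas/s, which the log rounds to 0.15.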
4. **Stop and Review**: Once the benchmark completes, terminate the `reth` process and review the logs and performance metrics collected, if any.
5. **Repeat**.
## Additional Considerations
- **RPC Configuration**: The RPC endpoints should be accessible and configured correctly; specifically, the RPC endpoint must support `eth_getBlockByNumber` and fetching full transactions. The benchmark will make one RPC query per block as fast as possible, so ensure the RPC endpoint does not rate limit or block requests after a certain volume.
- **Reproducibility**: Ensure that the node is at the same state before attempting to retry a benchmark. The `new-payload-fcu` command specifically will commit to the database, so the node must be rolled back using `reth stage unwind` to reproducibly retry benchmarks (see the sketch after this list).
- **Profiling tools**: If you are collecting CPU profiles, tools like [`samply`](https://github.com/mstange/samply) and [`perf`](https://perf.wiki.kernel.org/index.php/Main_Page) can be useful for analyzing node performance.
- **Benchmark Data**: `reth-bench` additionally accepts an `--output` flag, which writes gas used benchmarks across the benchmark range to CSV files. This may be useful for further data analysis.
- **Platform Information**: To ensure accurate and reproducible benchmarking, document the platform details, including hardware specifications, OS version, and any other relevant information before publishing any benchmarks.
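As a sketch, a reproducible retry of a closed range might look like the following, where the block numbers, URLs, and paths are placeholders, and the node is assumed to have synced past the range beforehand:
```bash
# benchmark a fixed range of blocks
reth-bench new-payload-fcu \
    --rpc-url http://<rpc-url>:8545 \
    --from 20000000 --to 20000100 \
    --jwtsecret <jwt_file_path>

# roll the node back to just before the range so the run can be repeated from
# the same state
reth stage unwind to-block 19999999
```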

bin/reth-bench/src/authenticated_transport.rs Normal file

@@ -0,0 +1,267 @@
//! This contains an authenticated rpc transport that can be used to send engine API newPayload
//! requests.
use std::sync::Arc;
use alloy_json_rpc::{RequestPacket, ResponsePacket};
use alloy_pubsub::{PubSubConnect, PubSubFrontend};
use alloy_rpc_types_engine::{Claims, JwtSecret};
use alloy_transport::{
utils::guess_local_url, Authorization, Pbf, TransportConnect, TransportError,
TransportErrorKind, TransportFut,
};
use alloy_transport_http::{reqwest::Url, Http, ReqwestTransport};
use alloy_transport_ipc::IpcConnect;
use alloy_transport_ws::WsConnect;
use futures::FutureExt;
use reqwest::header::HeaderValue;
use std::task::{Context, Poll};
use tokio::sync::RwLock;
use tower::Service;
/// An enum representing the different transports that can be used to connect to a node.
/// Only meant to be used internally by [`AuthenticatedTransport`].
#[derive(Clone, Debug)]
pub enum InnerTransport {
/// HTTP transport
Http(ReqwestTransport),
/// `WebSocket` transport
Ws(PubSubFrontend),
/// IPC transport
Ipc(PubSubFrontend),
}
impl InnerTransport {
/// Connects to a transport based on the given URL and JWT. Returns an [`InnerTransport`] and
/// the [`Claims`] generated from the jwt.
async fn connect(
url: Url,
jwt: JwtSecret,
) -> Result<(Self, Claims), AuthenticatedTransportError> {
match url.scheme() {
"http" | "https" => Self::connect_http(url, jwt).await,
"ws" | "wss" => Self::connect_ws(url, jwt).await,
"file" => Ok((Self::connect_ipc(url).await?, Claims::default())),
_ => Err(AuthenticatedTransportError::BadScheme(url.scheme().to_string())),
}
}
/// Connects to an HTTP [`alloy_transport_http::Http`] transport. Returns an [`InnerTransport`]
/// and the [`Claims`] generated from the jwt.
async fn connect_http(
url: Url,
jwt: JwtSecret,
) -> Result<(Self, Claims), AuthenticatedTransportError> {
let mut client_builder =
reqwest::Client::builder().tls_built_in_root_certs(url.scheme() == "https");
let mut headers = reqwest::header::HeaderMap::new();
// Add the JWT to the headers if we can decode it.
let (auth, claims) =
build_auth(jwt).map_err(|e| AuthenticatedTransportError::InvalidJwt(e.to_string()))?;
let mut auth_value: HeaderValue =
HeaderValue::from_str(&auth.to_string()).expect("Header should be valid string");
auth_value.set_sensitive(true);
headers.insert(reqwest::header::AUTHORIZATION, auth_value);
client_builder = client_builder.default_headers(headers);
let client =
client_builder.build().map_err(AuthenticatedTransportError::HttpConstructionError)?;
let inner = Self::Http(Http::with_client(client, url));
Ok((inner, claims))
}
/// Connects to a `WebSocket` [`alloy_transport_ws::WsConnect`] transport. Returns an
/// [`InnerTransport`] and the [`Claims`] generated from the jwt.
async fn connect_ws(
url: Url,
jwt: JwtSecret,
) -> Result<(Self, Claims), AuthenticatedTransportError> {
// Add the JWT to the headers if we can decode it.
let (auth, claims) =
build_auth(jwt).map_err(|e| AuthenticatedTransportError::InvalidJwt(e.to_string()))?;
let inner = WsConnect { url: url.to_string(), auth: Some(auth) }
.into_service()
.await
.map(Self::Ws)
.map_err(|e| AuthenticatedTransportError::TransportError(e, url.to_string()))?;
Ok((inner, claims))
}
/// Connects to an IPC [`alloy_transport_ipc::IpcConnect`] transport. Returns an
/// [`InnerTransport`]. Does not return any [`Claims`] because IPC does not require them.
async fn connect_ipc(url: Url) -> Result<Self, AuthenticatedTransportError> {
// IPC, even for engine, typically does not require auth because it's local
IpcConnect::new(url.to_string())
.into_service()
.await
.map(InnerTransport::Ipc)
.map_err(|e| AuthenticatedTransportError::TransportError(e, url.to_string()))
}
}
/// An authenticated transport that can be used to send requests that contain a jwt bearer token.
#[derive(Debug, Clone)]
pub struct AuthenticatedTransport {
/// The inner actual transport used.
///
/// Also contains the current claims being used. This is used to determine whether or not we
/// should create another client.
inner_and_claims: Arc<RwLock<(InnerTransport, Claims)>>,
/// The current jwt being used. This is so we can recreate claims.
jwt: JwtSecret,
/// The current URL being used. This is so we can recreate the client if needed.
url: Url,
}
/// An error that can occur when creating an authenticated transport.
#[derive(Debug, thiserror::Error)]
pub enum AuthenticatedTransportError {
/// The URL is invalid.
#[error("The URL is invalid")]
InvalidUrl,
/// Failed to lock transport
#[error("Failed to lock transport")]
LockFailed,
/// The JWT is invalid.
#[error("The JWT is invalid: {0}")]
InvalidJwt(String),
/// The transport failed to connect.
#[error("The transport failed to connect to {1}, transport error: {0}")]
TransportError(TransportError, String),
/// The http client could not be built.
#[error("The http client could not be built")]
HttpConstructionError(reqwest::Error),
/// The scheme is invalid.
#[error("The URL scheme is invalid: {0}")]
BadScheme(String),
}
impl AuthenticatedTransport {
/// Connects to the given URL with the given JWT, returning an authenticated transport.
pub async fn connect(url: Url, jwt: JwtSecret) -> Result<Self, AuthenticatedTransportError> {
let (inner, claims) = InnerTransport::connect(url.clone(), jwt).await?;
Ok(Self { inner_and_claims: Arc::new(RwLock::new((inner, claims))), jwt, url })
}
/// Sends a request using the underlying transport.
///
/// For sending the actual request, this action is delegated down to the underlying transport
/// through Tower's [`tower::Service::call`]. See tower's [`tower::Service`] trait for more
/// information.
fn request(&self, req: RequestPacket) -> TransportFut<'static> {
let this = self.clone();
Box::pin(async move {
let mut inner_and_claims = this.inner_and_claims.write().await;
// shift the iat back by one second so the claims are refreshed with some buffer time
// before they expire
let mut shifted_claims = inner_and_claims.1;
shifted_claims.iat -= 1;
// if the claims are out of date, reset the inner transport
if !shifted_claims.is_within_time_window() {
let (new_inner, new_claims) =
InnerTransport::connect(this.url.clone(), this.jwt).await.map_err(|e| {
TransportError::Transport(TransportErrorKind::Custom(Box::new(e)))
})?;
*inner_and_claims = (new_inner, new_claims);
}
match inner_and_claims.0 {
InnerTransport::Http(ref http) => {
let mut http = http;
http.call(req)
}
InnerTransport::Ws(ref ws) => {
let mut ws = ws;
ws.call(req)
}
InnerTransport::Ipc(ref ipc) => {
let mut ipc = ipc;
// we don't need to recreate the client for IPC
ipc.call(req)
}
}
.await
})
}
}
fn build_auth(secret: JwtSecret) -> eyre::Result<(Authorization, Claims)> {
// Generate claims (iat set to the current timestamp); this happens by default via the
// Default impl for Claims.
let claims = Claims::default();
let token = secret.encode(&claims)?;
let auth = Authorization::Bearer(token);
Ok((auth, claims))
}
/// This specifies how to connect to an authenticated transport.
#[derive(Clone, Debug)]
pub struct AuthenticatedTransportConnect {
/// The URL to connect to.
url: Url,
/// The JWT secret used to authenticate the transport.
jwt: JwtSecret,
}
impl AuthenticatedTransportConnect {
/// Create a new connector with the given URL and JWT secret.
pub const fn new(url: Url, jwt: JwtSecret) -> Self {
Self { url, jwt }
}
}
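// Illustrative usage sketch, mirroring how `BenchContext::new` wires this up: build the
// connector, then hand it to alloy's `ClientBuilder` to obtain an authenticated provider.
//
//     let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
//     let client = ClientBuilder::default().connect_boxed(auth_transport).await?;
//     let auth_provider = RootProvider::<_, AnyNetwork>::new(client);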
impl TransportConnect for AuthenticatedTransportConnect {
type Transport = AuthenticatedTransport;
fn is_local(&self) -> bool {
guess_local_url(&self.url)
}
fn get_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, Self::Transport, TransportError> {
AuthenticatedTransport::connect(self.url.clone(), self.jwt)
.map(|res| match res {
Ok(transport) => Ok(transport),
Err(err) => {
Err(TransportError::Transport(TransportErrorKind::Custom(Box::new(err))))
}
})
.boxed()
}
}
impl tower::Service<RequestPacket> for AuthenticatedTransport {
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: RequestPacket) -> Self::Future {
self.request(req)
}
}
impl tower::Service<RequestPacket> for &AuthenticatedTransport {
type Response = ResponsePacket;
type Error = TransportError;
type Future = TransportFut<'static>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: RequestPacket) -> Self::Future {
self.request(req)
}
}

bin/reth-bench/src/bench/context.rs Normal file

@@ -0,0 +1,111 @@
//! This contains the [`BenchContext`], which is information that all replay-based benchmarks need.
//! The initialization code is also the same, so this can be shared across benchmark commands.
use crate::{authenticated_transport::AuthenticatedTransportConnect, bench_mode::BenchMode};
use alloy_eips::BlockNumberOrTag;
use alloy_provider::{
network::{AnyNetwork, Ethereum},
Provider, ProviderBuilder, RootProvider,
};
use alloy_rpc_client::ClientBuilder;
use alloy_rpc_types_engine::JwtSecret;
use alloy_transport::BoxTransport;
use alloy_transport_http::Http;
use reqwest::{Client, Url};
use reth_node_core::args::BenchmarkArgs;
use tracing::info;
/// This is intended to be used by benchmarks that replay blocks from an RPC.
///
/// It contains an authenticated provider for engine API queries, a block provider for block
/// queries, a [`BenchMode`] to determine whether the benchmark should run for a closed or open
/// range of blocks, and the next block to fetch.
pub(crate) struct BenchContext {
/// The auth provider used for engine API queries.
pub(crate) auth_provider: RootProvider<BoxTransport, AnyNetwork>,
/// The block provider used for block queries.
pub(crate) block_provider: RootProvider<Http<Client>, Ethereum>,
/// The benchmark mode, which defines whether the benchmark should run for a closed or open
/// range of blocks.
pub(crate) benchmark_mode: BenchMode,
/// The next block to fetch.
pub(crate) next_block: u64,
}
impl BenchContext {
/// This is the initialization code for most benchmarks, taking in a [`BenchmarkArgs`] and
/// returning the providers needed to run a benchmark.
pub(crate) async fn new(bench_args: &BenchmarkArgs, rpc_url: String) -> eyre::Result<Self> {
info!("Running benchmark using data from RPC URL: {}", rpc_url);
// Ensure that output directory is a directory
if let Some(output) = &bench_args.output {
if output.is_file() {
return Err(eyre::eyre!("Output path must be a directory"));
}
}
// set up alloy client for blocks
let block_provider = ProviderBuilder::new().on_http(rpc_url.parse()?);
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,
// starting at the latest block.
let mut benchmark_mode = BenchMode::new(bench_args.from, bench_args.to)?;
// construct the authenticated provider
let auth_jwt = bench_args.auth_jwtsecret.clone().ok_or_else(|| {
eyre::eyre!("--auth-jwtsecret must be provided for authenticated RPC")
})?;
// fetch jwt from file
//
// the jwt is hex encoded so we will decode it after
let jwt = std::fs::read_to_string(auth_jwt)?;
let jwt = JwtSecret::from_hex(jwt)?;
// get engine url
let auth_url = Url::parse(&bench_args.engine_rpc_url)?;
// construct the authed transport
info!("Connecting to Engine RPC at {} for replay", auth_url);
let auth_transport = AuthenticatedTransportConnect::new(auth_url, jwt);
let client = ClientBuilder::default().connect_boxed(auth_transport).await?;
let auth_provider = RootProvider::<_, AnyNetwork>::new(client);
let first_block = match benchmark_mode {
BenchMode::Continuous => {
// fetch Latest block
block_provider.get_block_by_number(BlockNumberOrTag::Latest, true).await?.unwrap()
}
BenchMode::Range(ref mut range) => {
match range.next() {
Some(block_number) => {
// fetch first block in range
block_provider
.get_block_by_number(block_number.into(), true)
.await?
.unwrap()
}
None => {
return Err(eyre::eyre!(
"Benchmark mode range is empty, please provide a larger range"
));
}
}
}
};
let next_block = match first_block.header.number {
Some(number) => {
// fetch next block
number + 1
}
None => {
// this should never happen
return Err(eyre::eyre!("First block number is None"));
}
};
Ok(Self { auth_provider, block_provider, benchmark_mode, next_block })
}
}

bin/reth-bench/src/bench/mod.rs Normal file

@@ -0,0 +1,53 @@
//! `reth benchmark` command. Collection of various benchmarking routines.
use clap::{Parser, Subcommand};
use reth_cli_runner::CliContext;
use reth_node_core::args::LogArgs;
use reth_tracing::FileWorkerGuard;
mod context;
mod new_payload_fcu;
mod new_payload_only;
mod output;
/// `reth bench` command
#[derive(Debug, Parser)]
pub struct BenchmarkCommand {
#[command(subcommand)]
command: Subcommands,
#[command(flatten)]
logs: LogArgs,
}
/// `reth benchmark` subcommands
#[derive(Subcommand, Debug)]
pub enum Subcommands {
/// Benchmark which calls `newPayload`, then `forkchoiceUpdated`.
NewPayloadFcu(new_payload_fcu::Command),
/// Benchmark which only calls subsequent `newPayload` calls.
NewPayloadOnly(new_payload_only::Command),
}
impl BenchmarkCommand {
/// Execute `benchmark` command
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
// Initialize tracing
let _guard = self.init_tracing()?;
match self.command {
Subcommands::NewPayloadFcu(command) => command.execute(ctx).await,
Subcommands::NewPayloadOnly(command) => command.execute(ctx).await,
}
}
/// Initializes tracing with the configured options.
///
/// If file logging is enabled, this function returns a guard that must be kept alive to ensure
/// that all logs are flushed to disk.
pub fn init_tracing(&self) -> eyre::Result<Option<FileWorkerGuard>> {
let guard = self.logs.init_tracing()?;
Ok(guard)
}
}

bin/reth-bench/src/bench/new_payload_fcu.rs Normal file

@@ -0,0 +1,179 @@
//! Runs the `reth bench` command, first calling newPayload for each block, then calling
//! forkchoiceUpdated.
use crate::{
bench::{
context::BenchContext,
output::{
CombinedResult, NewPayloadResult, TotalGasOutput, TotalGasRow, COMBINED_OUTPUT_SUFFIX,
GAS_OUTPUT_SUFFIX,
},
},
valid_payload::{call_forkchoice_updated, call_new_payload},
};
use alloy_provider::Provider;
use alloy_rpc_types_engine::ForkchoiceState;
use clap::Parser;
use csv::Writer;
use reth_cli_runner::CliContext;
use reth_node_core::args::BenchmarkArgs;
use reth_primitives::{Block, B256};
use reth_rpc_types_compat::engine::payload::block_to_payload;
use std::time::Instant;
use tracing::{debug, info};
/// `reth benchmark new-payload-fcu` command
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC url to use for getting data.
#[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
rpc_url: String,
#[command(flatten)]
benchmark: BenchmarkArgs,
}
impl Command {
/// Execute `benchmark new-payload-fcu` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
let cloned_args = self.benchmark.clone();
let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
BenchContext::new(&cloned_args, self.rpc_url).await?;
let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
tokio::task::spawn(async move {
while benchmark_mode.contains(next_block) {
let block_res = block_provider.get_block_by_number(next_block.into(), true).await;
let block = block_res.unwrap().unwrap();
let block = match block.header.hash {
Some(block_hash) => {
// we can reuse the hash in the response
Block::try_from(block).unwrap().seal(block_hash)
}
None => {
// we don't have the hash, so let's just hash it
Block::try_from(block).unwrap().seal_slow()
}
};
let head_block_hash = block.hash();
let safe_block_hash =
block_provider.get_block_by_number((block.number - 32).into(), false);
let finalized_block_hash =
block_provider.get_block_by_number((block.number - 64).into(), false);
let (safe, finalized) = tokio::join!(safe_block_hash, finalized_block_hash,);
let safe_block_hash = safe
.unwrap()
.expect("safe block exists")
.header
.hash
.expect("safe block has hash");
let finalized_block_hash = finalized
.unwrap()
.expect("finalized block exists")
.header
.hash
.expect("finalized block has hash");
next_block += 1;
sender
.send((block, head_block_hash, safe_block_hash, finalized_block_hash))
.await
.unwrap();
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let total_benchmark_duration = Instant::now();
while let Some((block, head, safe, finalized)) = receiver.recv().await {
// capture the gas used and block number for the results row
let gas_used = block.header.gas_used;
let block_number = block.header.number;
let versioned_hashes: Vec<B256> =
block.blob_versioned_hashes().into_iter().copied().collect();
let (payload, parent_beacon_block_root) = block_to_payload(block);
debug!(?block_number, "Sending payload",);
// construct fcu to call
let forkchoice_state = ForkchoiceState {
head_block_hash: head,
safe_block_hash: safe,
finalized_block_hash: finalized,
};
let start = Instant::now();
let message_version = call_new_payload(
&auth_provider,
payload,
parent_beacon_block_root,
versioned_hashes,
)
.await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
call_forkchoice_updated(&auth_provider, message_version, forkchoice_state, None)
.await?;
// calculate the total duration and the fcu latency, record
let total_latency = start.elapsed();
let fcu_latency = total_latency - new_payload_result.latency;
let combined_result = CombinedResult { new_payload_result, fcu_latency, total_latency };
// current duration since the start of the benchmark
let current_duration = total_benchmark_duration.elapsed();
// log the combined result; its Display impl reports latency and computed gigagas per second
info!(%combined_result);
// record the current result
let gas_row = TotalGasRow { block_number, gas_used, time: current_duration };
results.push((gas_row, combined_result));
}
let (gas_output_results, combined_results): (_, Vec<CombinedResult>) =
results.into_iter().unzip();
// write the csv output to files
if let Some(path) = self.benchmark.output {
// first write the combined results to a file
let output_path = path.join(COMBINED_OUTPUT_SUFFIX);
info!("Writing engine api call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
for result in combined_results {
writer.serialize(result)?;
}
writer.flush()?;
// now write the gas output to a file
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
}
// accumulate the results and calculate the overall Ggas/s
let gas_output = TotalGasOutput::new(gas_output_results);
info!(
total_duration=?gas_output.total_duration,
total_gas_used=?gas_output.total_gas_used,
blocks_processed=?gas_output.blocks_processed,
"Total Ggas/s: {:.4}",
gas_output.total_gigagas_per_second()
);
Ok(())
}
}

bin/reth-bench/src/bench/new_payload_only.rs Normal file

@@ -0,0 +1,136 @@
//! Runs the `reth bench` command, sending only newPayload, without a forkchoiceUpdated call.
use crate::{
bench::{
context::BenchContext,
output::{
NewPayloadResult, TotalGasOutput, TotalGasRow, GAS_OUTPUT_SUFFIX,
NEW_PAYLOAD_OUTPUT_SUFFIX,
},
},
valid_payload::call_new_payload,
};
use alloy_provider::Provider;
use clap::Parser;
use csv::Writer;
use reth_cli_runner::CliContext;
use reth_node_core::args::BenchmarkArgs;
use reth_primitives::{Block, B256};
use reth_rpc_types_compat::engine::payload::block_to_payload;
use std::time::Instant;
use tracing::{debug, info};
/// `reth benchmark new-payload-only` command
#[derive(Debug, Parser)]
pub struct Command {
/// The RPC url to use for getting data.
#[arg(long, value_name = "RPC_URL", verbatim_doc_comment)]
rpc_url: String,
#[command(flatten)]
benchmark: BenchmarkArgs,
}
impl Command {
/// Execute `benchmark new-payload-only` command
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
let cloned_args = self.benchmark.clone();
// TODO: this could be just a function I guess, but destructuring makes the code slightly
// more readable than a 4 element tuple.
let BenchContext { benchmark_mode, block_provider, auth_provider, mut next_block } =
BenchContext::new(&cloned_args, self.rpc_url).await?;
let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
tokio::task::spawn(async move {
while benchmark_mode.contains(next_block) {
let block_res = block_provider.get_block_by_number(next_block.into(), true).await;
let block = block_res.unwrap().unwrap();
let block = match block.header.hash {
Some(block_hash) => {
// we can reuse the hash in the response
Block::try_from(block).unwrap().seal(block_hash)
}
None => {
// we don't have the hash, so let's just hash it
Block::try_from(block).unwrap().seal_slow()
}
};
next_block += 1;
sender.send(block).await.unwrap();
}
});
// put results in a summary vec so they can be printed at the end
let mut results = Vec::new();
let total_benchmark_duration = Instant::now();
while let Some(block) = receiver.recv().await {
// capture the gas used for the results row
let gas_used = block.header.gas_used;
let versioned_hashes: Vec<B256> =
block.blob_versioned_hashes().into_iter().copied().collect();
let (payload, parent_beacon_block_root) = block_to_payload(block);
let block_number = payload.block_number();
debug!(
number=?payload.block_number(),
"Sending payload to engine",
);
let start = Instant::now();
call_new_payload(&auth_provider, payload, parent_beacon_block_root, versioned_hashes)
.await?;
let new_payload_result = NewPayloadResult { gas_used, latency: start.elapsed() };
info!(%new_payload_result);
// current duration since the start of the benchmark
let current_duration = total_benchmark_duration.elapsed();
// record the current result
let row = TotalGasRow { block_number, gas_used, time: current_duration };
results.push((row, new_payload_result));
}
let (gas_output_results, new_payload_results): (_, Vec<NewPayloadResult>) =
results.into_iter().unzip();
// write the csv output to files
if let Some(path) = self.benchmark.output {
// first write the new payload results to a file
let output_path = path.join(NEW_PAYLOAD_OUTPUT_SUFFIX);
info!("Writing newPayload call latency output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
for result in new_payload_results {
writer.serialize(result)?;
}
writer.flush()?;
// now write the gas output to a file
let output_path = path.join(GAS_OUTPUT_SUFFIX);
info!("Writing total gas output to file: {:?}", output_path);
let mut writer = Writer::from_path(output_path)?;
for row in &gas_output_results {
writer.serialize(row)?;
}
writer.flush()?;
info!("Finished writing benchmark output files to {:?}.", path);
}
// accumulate the results and calculate the overall Ggas/s
let gas_output = TotalGasOutput::new(gas_output_results);
info!(
total_duration=?gas_output.total_duration,
total_gas_used=?gas_output.total_gas_used,
blocks_processed=?gas_output.blocks_processed,
"Total Ggas/s: {:.4}",
gas_output.total_gigagas_per_second()
);
Ok(())
}
}

bin/reth-bench/src/bench/output.rs Normal file

@@ -0,0 +1,216 @@
//! Contains various benchmark output formats, either for logging or for
//! serialization to / from files.
//!
//! This also contains common constants for units, for example [GIGAGAS].
use serde::{ser::SerializeStruct, Serialize};
use std::time::Duration;
/// Represents one Kilogas, or `1_000` gas.
const KILOGAS: u64 = 1_000;
/// Represents one Megagas, or `1_000_000` gas.
const MEGAGAS: u64 = KILOGAS * 1_000;
/// Represents one Gigagas, or `1_000_000_000` gas.
const GIGAGAS: u64 = MEGAGAS * 1_000;
/// This is the suffix for gas output csv files.
pub(crate) const GAS_OUTPUT_SUFFIX: &str = "total_gas.csv";
/// This is the suffix for combined output csv files.
pub(crate) const COMBINED_OUTPUT_SUFFIX: &str = "combined_latency.csv";
/// This is the suffix for new payload output csv files.
pub(crate) const NEW_PAYLOAD_OUTPUT_SUFFIX: &str = "new_payload_latency.csv";
/// This represents the results of a single `newPayload` call in the benchmark, containing the gas
/// used and the `newPayload` latency.
#[derive(Debug)]
pub(crate) struct NewPayloadResult {
/// The gas used in the `newPayload` call.
pub(crate) gas_used: u64,
/// The latency of the `newPayload` call.
pub(crate) latency: Duration,
}
impl NewPayloadResult {
/// Returns the gas per second processed in the `newPayload` call.
pub(crate) fn gas_per_second(&self) -> f64 {
self.gas_used as f64 / self.latency.as_secs_f64()
}
}
impl std::fmt::Display for NewPayloadResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"New payload processed at {:.4} Ggas/s, used {} total gas. Latency: {:?}",
self.gas_per_second() / GIGAGAS as f64,
self.gas_used,
self.latency
)
}
}
/// This is a manual [`Serialize`] implementation for the [`NewPayloadResult`] struct, serializing
/// the duration as microseconds because the csv writer would fail to serialize a [`Duration`]
/// otherwise.
impl Serialize for NewPayloadResult {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
// convert the time to microseconds
let time = self.latency.as_micros();
let mut state = serializer.serialize_struct("NewPayloadResult", 2)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("latency", &time)?;
state.end()
}
}
/// This represents the combined results of a `newPayload` call and a `forkchoiceUpdated` call in
/// the benchmark, containing the gas used, the `newPayload` latency, and the `forkchoiceUpdated`
/// latency.
#[derive(Debug)]
pub(crate) struct CombinedResult {
/// The `newPayload` result.
pub(crate) new_payload_result: NewPayloadResult,
/// The latency of the `forkchoiceUpdated` call.
pub(crate) fcu_latency: Duration,
/// The latency of both calls combined.
pub(crate) total_latency: Duration,
}
impl CombinedResult {
/// Returns the gas per second, including the `newPayload` _and_ `forkchoiceUpdated` duration.
pub(crate) fn combined_gas_per_second(&self) -> f64 {
self.new_payload_result.gas_used as f64 / self.total_latency.as_secs_f64()
}
}
impl std::fmt::Display for CombinedResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Payload processed at {:.4} Ggas/s, used {} total gas. Combined gas per second: {:.4} Ggas/s. fcu latency: {:?}, newPayload latency: {:?}",
self.new_payload_result.gas_per_second() / GIGAGAS as f64,
self.new_payload_result.gas_used,
self.combined_gas_per_second() / GIGAGAS as f64,
self.fcu_latency,
self.new_payload_result.latency
)
}
}
/// This is a [`Serialize`] implementation for the [`CombinedResult`] struct, serializing the
/// durations as microseconds because the csv writer would fail otherwise.
impl Serialize for CombinedResult {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
// convert the time to microseconds
let fcu_latency = self.fcu_latency.as_micros();
let new_payload_latency = self.new_payload_result.latency.as_micros();
let total_latency = self.total_latency.as_micros();
let mut state = serializer.serialize_struct("CombinedResult", 4)?;
// flatten the new payload result because this is meant for CSV writing
state.serialize_field("gas_used", &self.new_payload_result.gas_used)?;
state.serialize_field("new_payload_latency", &new_payload_latency)?;
state.serialize_field("fcu_latency", &fcu_latency)?;
state.serialize_field("total_latency", &total_latency)?;
state.end()
}
}
/// This represents a row of total gas data in the benchmark.
#[derive(Debug)]
pub(crate) struct TotalGasRow {
/// The block number of the block being processed.
#[allow(dead_code)]
pub(crate) block_number: u64,
/// The total gas used in the block.
pub(crate) gas_used: u64,
/// Time since the start of the benchmark.
pub(crate) time: Duration,
}
/// This represents the aggregated output of a benchmark run, meant to show gas per second metrics.
#[derive(Debug)]
pub(crate) struct TotalGasOutput {
/// The total gas used in the benchmark.
pub(crate) total_gas_used: u64,
/// The total duration of the benchmark.
pub(crate) total_duration: Duration,
/// The total gas used per second.
pub(crate) total_gas_per_second: f64,
/// The number of blocks processed.
pub(crate) blocks_processed: u64,
}
impl TotalGasOutput {
/// Create a new [`TotalGasOutput`] from a list of [`TotalGasRow`].
pub(crate) fn new(rows: Vec<TotalGasRow>) -> Self {
// the duration is obtained from the last row
let total_duration =
rows.last().map(|row| row.time).expect("rows should have at least one element");
let blocks_processed = rows.len() as u64;
let total_gas_used: u64 = rows.into_iter().map(|row| row.gas_used).sum();
let total_gas_per_second = total_gas_used as f64 / total_duration.as_secs_f64();
Self { total_gas_used, total_duration, total_gas_per_second, blocks_processed }
}
/// Return the total gigagas per second.
pub(crate) fn total_gigagas_per_second(&self) -> f64 {
self.total_gas_per_second / GIGAGAS as f64
}
}
/// This serializes the `time` field of the [`TotalGasRow`] to microseconds.
///
/// This is essentially just for the csv writer, which emits the struct fields as headers and
/// cannot serialize a nested [`Duration`] directly.
impl Serialize for TotalGasRow {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
// convert the time to microseconds
let time = self.time.as_micros();
let mut state = serializer.serialize_struct("TotalGasRow", 3)?;
state.serialize_field("block_number", &self.block_number)?;
state.serialize_field("gas_used", &self.gas_used)?;
state.serialize_field("time", &time)?;
state.end()
}
}
#[cfg(test)]
mod tests {
use super::*;
use csv::Writer;
use std::io::BufRead;
#[test]
fn test_write_total_gas_row_csv() {
let row = TotalGasRow { block_number: 1, gas_used: 1_000, time: Duration::from_secs(1) };
let mut writer = Writer::from_writer(vec![]);
writer.serialize(row).unwrap();
let result = writer.into_inner().unwrap();
// parse into Lines
let mut result = result.as_slice().lines();
// assert header
let expected_first_line = "block_number,gas_used,time";
let first_line = result.next().unwrap().unwrap();
assert_eq!(first_line, expected_first_line);
let expected_second_line = "1,1000,1000000";
let second_line = result.next().unwrap().unwrap();
assert_eq!(second_line, expected_second_line);
}
}

bin/reth-bench/src/bench_mode.rs Normal file

@@ -0,0 +1,37 @@
//! The benchmark mode defines whether the benchmark should run for a closed or open range of
//! blocks.
use std::ops::RangeInclusive;
/// Whether or not the benchmark should run as a continuous stream of payloads.
#[derive(Debug, PartialEq, Eq)]
pub enum BenchMode {
// TODO: just include the start block in `Continuous`
/// Run the benchmark as a continuous stream of payloads, until the benchmark is interrupted.
Continuous,
/// Run the benchmark for a specific range of blocks.
Range(RangeInclusive<u64>),
}
impl BenchMode {
/// Check if the block number is in the range
pub fn contains(&self, block_number: u64) -> bool {
match self {
Self::Continuous => true,
Self::Range(range) => range.contains(&block_number),
}
}
/// Create a [`BenchMode`] from optional `from` and `to` fields.
pub fn new(from: Option<u64>, to: Option<u64>) -> Result<Self, eyre::Error> {
// If neither `--from` nor `--to` are provided, we will run the benchmark continuously,
// starting at the latest block.
match (from, to) {
(Some(from), Some(to)) => Ok(Self::Range(from..=to)),
(None, None) => Ok(Self::Continuous),
_ => {
// both or neither are allowed, everything else is ambiguous
Err(eyre::eyre!("`from` and `to` must be provided together, or not at all."))
}
}
}
}
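// Illustrative sanity checks for the rules above: both bounds give a closed range,
// neither gives continuous mode, and a single bound is rejected as ambiguous.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn bench_mode_from_bounds() {
        assert_eq!(BenchMode::new(Some(1), Some(3)).unwrap(), BenchMode::Range(1..=3));
        assert_eq!(BenchMode::new(None, None).unwrap(), BenchMode::Continuous);
        assert!(BenchMode::new(Some(1), None).is_err());
        assert!(BenchMode::new(None, Some(3)).is_err());
    }
}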

bin/reth-bench/src/main.rs Normal file

@@ -0,0 +1,34 @@
//! # reth-benchmark
//!
//! This is a tool that converts existing blocks into a stream of execution payloads for
//! benchmarking purposes. These payloads are then fed into reth.
// We use jemalloc for performance reasons.
#[cfg(all(feature = "jemalloc", unix))]
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
pub mod authenticated_transport;
pub mod bench;
pub mod bench_mode;
pub mod valid_payload;
use bench::BenchmarkCommand;
use clap::Parser;
use reth_cli_runner::CliRunner;
fn main() {
// Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
if std::env::var_os("RUST_BACKTRACE").is_none() {
std::env::set_var("RUST_BACKTRACE", "1");
}
// Run until either exit or sigint or sigterm
let runner = CliRunner::default();
runner
.run_command_until_exit(|ctx| {
let command = BenchmarkCommand::parse();
command.execute(ctx)
})
.unwrap();
}

bin/reth-bench/src/valid_payload.rs Normal file

@@ -0,0 +1,276 @@
//! This is an extension trait for any provider that implements the engine API, to wait for a VALID
//! response. This is useful for benchmarking, as it allows us to wait for a payload to be valid
//! before sending additional calls.
use alloy_provider::{ext::EngineApi, Network};
use alloy_rpc_types_engine::{
ExecutionPayloadInputV2, ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
PayloadStatusEnum,
};
use alloy_transport::{Transport, TransportResult};
use reth_node_api::EngineApiMessageVersion;
use reth_primitives::B256;
use reth_rpc_types::{ExecutionPayload, ExecutionPayloadV1, ExecutionPayloadV3};
use tracing::error;
/// An extension trait for providers that implement the engine API, to wait for a VALID response.
#[async_trait::async_trait]
pub trait EngineApiValidWaitExt<N, T>: Send + Sync {
/// Calls `engine_newPayloadV1` with the given [`ExecutionPayloadV1`], and waits until the
/// response is VALID.
async fn new_payload_v1_wait(
&self,
payload: ExecutionPayloadV1,
) -> TransportResult<PayloadStatus>;
/// Calls `engine_newPayloadV2` with the given [`ExecutionPayloadInputV2`], and waits until the
/// response is VALID.
async fn new_payload_v2_wait(
&self,
payload: ExecutionPayloadInputV2,
) -> TransportResult<PayloadStatus>;
/// Calls `engine_newPayloadV3` with the given [`ExecutionPayloadV3`], parent beacon block root,
/// and versioned hashes, and waits until the response is VALID.
async fn new_payload_v3_wait(
&self,
payload: ExecutionPayloadV3,
versioned_hashes: Vec<B256>,
parent_beacon_block_root: B256,
) -> TransportResult<PayloadStatus>;
/// Calls `engine_forkChoiceUpdatedV1` with the given [`ForkchoiceState`] and optional
/// [`PayloadAttributes`], and waits until the response is VALID.
async fn fork_choice_updated_v1_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated>;
/// Calls `engine_forkChoiceUpdatedV2` with the given [`ForkchoiceState`] and optional
/// [`PayloadAttributes`], and waits until the response is VALID.
async fn fork_choice_updated_v2_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated>;
/// Calls `engine_forkChoiceUpdatedV3` with the given [`ForkchoiceState`] and optional
/// [`PayloadAttributes`], and waits until the response is VALID.
async fn fork_choice_updated_v3_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated>;
}
#[async_trait::async_trait]
impl<T, N, P> EngineApiValidWaitExt<N, T> for P
where
N: Network,
T: Transport + Clone,
P: EngineApi<N, T>,
{
async fn new_payload_v1_wait(
&self,
payload: ExecutionPayloadV1,
) -> TransportResult<PayloadStatus> {
let mut status = self.new_payload_v1(payload.clone()).await?;
while status.status != PayloadStatusEnum::Valid {
if status.status.is_invalid() {
error!(?status, ?payload, "Invalid newPayloadV1",);
panic!("Invalid newPayloadV1: {status:?}");
}
status = self.new_payload_v1(payload.clone()).await?;
}
Ok(status)
}
async fn new_payload_v2_wait(
&self,
payload: ExecutionPayloadInputV2,
) -> TransportResult<PayloadStatus> {
let mut status = self.new_payload_v2(payload.clone()).await?;
while status.status != PayloadStatusEnum::Valid {
if status.status.is_invalid() {
error!(?status, ?payload, "Invalid newPayloadV2",);
panic!("Invalid newPayloadV2: {status:?}");
}
status = self.new_payload_v2(payload.clone()).await?;
}
Ok(status)
}
async fn new_payload_v3_wait(
&self,
payload: ExecutionPayloadV3,
versioned_hashes: Vec<B256>,
parent_beacon_block_root: B256,
) -> TransportResult<PayloadStatus> {
let mut status = self
.new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
.await?;
while status.status != PayloadStatusEnum::Valid {
if status.status.is_invalid() {
error!(
?status,
?payload,
?versioned_hashes,
?parent_beacon_block_root,
"Invalid newPayloadV3",
);
panic!("Invalid newPayloadV3: {status:?}");
}
status = self
.new_payload_v3(payload.clone(), versioned_hashes.clone(), parent_beacon_block_root)
.await?;
}
Ok(status)
}
async fn fork_choice_updated_v1_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
let mut status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
while status.payload_status.status != PayloadStatusEnum::Valid {
if status.payload_status.status.is_invalid() {
error!(
?status,
?fork_choice_state,
?payload_attributes,
"Invalid forkchoiceUpdatedV1 message",
);
panic!("Invalid forkchoiceUpdatedV1: {status:?}");
}
status =
self.fork_choice_updated_v1(fork_choice_state, payload_attributes.clone()).await?;
}
Ok(status)
}
async fn fork_choice_updated_v2_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
let mut status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
while status.payload_status.status != PayloadStatusEnum::Valid {
if status.payload_status.status.is_invalid() {
error!(
?status,
?fork_choice_state,
?payload_attributes,
"Invalid forkchoiceUpdatedV2 message",
);
panic!("Invalid forkchoiceUpdatedV2: {status:?}");
}
status =
self.fork_choice_updated_v2(fork_choice_state, payload_attributes.clone()).await?;
}
Ok(status)
}
async fn fork_choice_updated_v3_wait(
&self,
fork_choice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
let mut status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
while status.payload_status.status != PayloadStatusEnum::Valid {
if status.payload_status.status.is_invalid() {
error!(
?status,
?fork_choice_state,
?payload_attributes,
"Invalid forkchoiceUpdatedV3 message",
);
panic!("Invalid forkchoiceUpdatedV3: {status:?}");
}
status =
self.fork_choice_updated_v3(fork_choice_state, payload_attributes.clone()).await?;
}
Ok(status)
}
}
/// Calls the correct `engine_newPayload` method depending on the given [`ExecutionPayload`] and its
/// versioned variant. Returns the [`EngineApiMessageVersion`] depending on the payload's version.
///
/// # Panics
/// If the given payload is a V3 payload but no parent beacon block root is provided.
pub(crate) async fn call_new_payload<N, T, P: EngineApiValidWaitExt<N, T>>(
provider: P,
payload: ExecutionPayload,
parent_beacon_block_root: Option<B256>,
versioned_hashes: Vec<B256>,
) -> TransportResult<EngineApiMessageVersion> {
match payload {
ExecutionPayload::V4(_payload) => {
todo!("V4 payloads not supported yet");
// auth_provider
// .new_payload_v4_wait(payload, versioned_hashes, parent_beacon_block_root, ...)
// .await?;
//
// Ok(EngineApiMessageVersion::V4)
}
ExecutionPayload::V3(payload) => {
// We expect the caller to have provided the parent beacon block root for V3 payloads.
let parent_beacon_block_root = parent_beacon_block_root
.expect("parent_beacon_block_root is required for V3 payloads");
provider
.new_payload_v3_wait(payload, versioned_hashes, parent_beacon_block_root)
.await?;
Ok(EngineApiMessageVersion::V3)
}
ExecutionPayload::V2(payload) => {
let input = ExecutionPayloadInputV2 {
execution_payload: payload.payload_inner,
withdrawals: Some(payload.withdrawals),
};
provider.new_payload_v2_wait(input).await?;
Ok(EngineApiMessageVersion::V2)
}
ExecutionPayload::V1(payload) => {
provider.new_payload_v1_wait(payload).await?;
Ok(EngineApiMessageVersion::V1)
}
}
}
/// Calls the correct `engine_forkchoiceUpdated` method depending on the given
/// `EngineApiMessageVersion`, using the provided forkchoice state and payload attributes for the
/// actual engine api message call.
pub(crate) async fn call_forkchoice_updated<N, T, P: EngineApiValidWaitExt<N, T>>(
provider: P,
message_version: EngineApiMessageVersion,
forkchoice_state: ForkchoiceState,
payload_attributes: Option<PayloadAttributes>,
) -> TransportResult<ForkchoiceUpdated> {
match message_version {
EngineApiMessageVersion::V4 => todo!("V4 payloads not supported yet"),
EngineApiMessageVersion::V3 => {
provider.fork_choice_updated_v3_wait(forkchoice_state, payload_attributes).await
}
EngineApiMessageVersion::V2 => {
provider.fork_choice_updated_v2_wait(forkchoice_state, payload_attributes).await
}
EngineApiMessageVersion::V1 => {
provider.fork_choice_updated_v1_wait(forkchoice_state, payload_attributes).await
}
}
}

crates/node-core/src/args/benchmark_args.rs Normal file

@@ -0,0 +1,62 @@
//! clap [Args](clap::Args) for benchmark configuration
use clap::Args;
use std::path::PathBuf;
/// Parameters for benchmark configuration
#[derive(Debug, Args, PartialEq, Eq, Default, Clone)]
#[command(next_help_heading = "Benchmark")]
pub struct BenchmarkArgs {
/// Run the benchmark from a specific block.
#[arg(long, verbatim_doc_comment)]
pub from: Option<u64>,
/// Run the benchmark to a specific block.
#[arg(long, verbatim_doc_comment)]
pub to: Option<u64>,
/// Path to a JWT secret to use for the authenticated engine-API RPC server.
///
/// This will perform JWT authentication for all requests to the given engine RPC url.
///
/// If no path is provided, the benchmark will error on startup, since the engine API
/// requires JWT authentication.
#[arg(long = "jwtsecret", value_name = "PATH", global = true, required = false)]
pub auth_jwtsecret: Option<PathBuf>,
/// The RPC url to use for sending engine requests.
#[arg(
long,
value_name = "ENGINE_RPC_URL",
verbatim_doc_comment,
default_value = "http://localhost:8551"
)]
pub engine_rpc_url: String,
/// The path to the output directory for granular benchmark results.
#[arg(long, short, value_name = "BENCHMARK_OUTPUT", verbatim_doc_comment)]
pub output: Option<PathBuf>,
}
#[cfg(test)]
mod tests {
use super::*;
use clap::Parser;
/// A helper type to parse Args more easily
#[derive(Parser)]
struct CommandParser<T: Args> {
#[command(flatten)]
args: T,
}
#[test]
fn test_parse_benchmark_args() {
let default_args = BenchmarkArgs {
engine_rpc_url: "http://localhost:8551".to_string(),
..Default::default()
};
let args = CommandParser::<BenchmarkArgs>::parse_from(["reth-bench"]).args;
assert_eq!(args, default_args);
}
}

crates/node-core/src/args/mod.rs

@@ -55,6 +55,10 @@ pub use pruning::PruningArgs;
mod datadir_args;
pub use datadir_args::DatadirArgs;
/// BenchmarkArgs struct for configuring the benchmark to run
mod benchmark_args;
pub use benchmark_args::BenchmarkArgs;
pub mod utils;
pub mod types;