From 67cfb06fbbfb06e5d74db3e56cea9abd898a0730 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Fri, 5 Apr 2024 18:02:05 +0200 Subject: [PATCH] chore(deps): migrate to jsonrpsee 0.22 (#5894) Co-authored-by: DaniPopes <57450786+DaniPopes@users.noreply.github.com> --- Cargo.lock | 212 ++++++++++---- Cargo.toml | 20 +- crates/node-core/src/args/rpc_server_args.rs | 4 +- crates/node-core/src/cli/config.rs | 6 +- crates/rpc/ipc/src/server/ipc.rs | 129 +++------ crates/rpc/ipc/src/server/mod.rs | 133 +++++---- crates/rpc/rpc-builder/Cargo.toml | 1 + crates/rpc/rpc-builder/src/auth.rs | 37 ++- crates/rpc/rpc-builder/src/error.rs | 42 ++- crates/rpc/rpc-builder/src/lib.rs | 109 ++++---- crates/rpc/rpc-builder/src/metrics.rs | 262 ++++++++++++------ crates/rpc/rpc-builder/tests/it/http.rs | 5 +- crates/rpc/rpc-builder/tests/it/serde.rs | 4 +- crates/rpc/rpc-engine-api/src/error.rs | 6 - crates/rpc/rpc-testing-util/src/debug.rs | 14 +- crates/rpc/rpc-testing-util/src/trace.rs | 3 +- crates/rpc/rpc-types-compat/src/block.rs | 2 +- .../rpc-types-compat/src/transaction/mod.rs | 47 ++-- .../rpc-types-compat/src/transaction/typed.rs | 8 +- .../rpc-types/src/eth/transaction/typed.rs | 10 +- crates/rpc/rpc/src/eth/api/call.rs | 2 +- crates/rpc/rpc/src/eth/api/fee_history.rs | 10 +- crates/rpc/rpc/src/eth/api/fees.rs | 32 +-- crates/rpc/rpc/src/eth/api/server.rs | 23 +- crates/rpc/rpc/src/eth/api/transactions.rs | 76 ++--- crates/rpc/rpc/src/eth/error.rs | 10 +- crates/rpc/rpc/src/eth/pubsub.rs | 43 ++- crates/rpc/rpc/src/eth/revm_utils.rs | 10 +- crates/rpc/rpc/src/layers/auth_layer.rs | 4 +- 29 files changed, 715 insertions(+), 549 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a91daa53a..53dcb1bc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,12 +146,13 @@ dependencies = [ [[package]] name = "alloy-consensus" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-eips", "alloy-primitives", "alloy-rlp", "alloy-serde", + "c-kzg", "serde", "sha2 0.10.8", ] @@ -177,22 +178,25 @@ dependencies = [ [[package]] name = "alloy-eips" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-primitives", "alloy-rlp", "alloy-serde", + "arbitrary", "c-kzg", "ethereum_ssz", "ethereum_ssz_derive", "once_cell", + "proptest", + "proptest-derive", "serde", ] [[package]] name = "alloy-genesis" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-primitives", "alloy-serde", @@ -214,7 +218,7 @@ dependencies = [ [[package]] name = "alloy-node-bindings" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-genesis", "alloy-primitives", @@ -278,7 +282,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-consensus", "alloy-eips", @@ -300,7 +304,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-engine" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-consensus", "alloy-eips", @@ -318,7 +322,7 @@ dependencies = [ [[package]] name = "alloy-rpc-types-trace" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-primitives", "alloy-rpc-types", @@ -330,7 +334,7 @@ dependencies = [ [[package]] name = "alloy-serde" version = "0.1.0" -source = "git+https://github.com/alloy-rs/alloy?rev=8c9dd0a#8c9dd0ae0a0f12eb81b5afe75a9b55ea4ad3abf4" +source = "git+https://github.com/alloy-rs/alloy?rev=17633df#17633df04920e07cd7afbd6ee825fcde677fa1d1" dependencies = [ "alloy-primitives", "serde", @@ -664,7 +668,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener", + "event-listener 2.5.3", "futures-core", ] @@ -686,11 +690,13 @@ dependencies = [ [[package]] name = "async-lock" -version = "2.8.0" +version = "3.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" dependencies = [ - "event-listener", + "event-listener 4.0.3", + "event-listener-strategy", + "pin-project-lite", ] [[package]] @@ -2618,6 +2624,27 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + [[package]] name = "examples" version = "0.0.0" @@ -2969,9 +2996,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "gloo-net" -version = "0.4.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ac9e8288ae2c632fa9f8657ac70bfe38a1530f345282d7ba66a1f70b72b7dc4" +checksum = "43aaa242d1239a8822c15c645f02166398da4f8b5c4bae795c1f5b44e9eee173" dependencies = [ "futures-channel", "futures-core", @@ -3026,9 +3053,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.25" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3295,7 +3322,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.6", + "socket2 0.4.10", "tokio", "tower-service", "tracing", @@ -3312,10 +3339,10 @@ dependencies = [ "http", "hyper", "log", - "rustls", - "rustls-native-certs", + "rustls 0.21.10", + "rustls-native-certs 0.6.3", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", ] [[package]] @@ -3761,9 +3788,9 @@ dependencies = [ [[package]] name = "jsonrpsee" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "affdc52f7596ccb2d7645231fc6163bb314630c989b64998f3699a28b4d5d4dc" +checksum = "3cdbb7cb6f3ba28f5b212dd250ab4483105efc3e381f5c8bb90340f14f0a2cc3" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -3779,9 +3806,9 @@ dependencies = [ [[package]] name = "jsonrpsee-client-transport" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b005c793122d03217da09af68ba9383363caa950b90d3436106df8cabce935" +checksum = "9ab2e14e727d2faf388c99d9ca5210566ed3b044f07d92c29c3611718d178380" dependencies = [ "futures-channel", "futures-util", @@ -3789,22 +3816,23 @@ dependencies = [ "http", "jsonrpsee-core", "pin-project", - "rustls-native-certs", + "rustls-native-certs 0.7.0", + "rustls-pki-types", "soketto", "thiserror", "tokio", - "tokio-rustls", + "tokio-rustls 0.25.0", "tokio-util", "tracing", "url", - "webpki-roots", + "webpki-roots 0.26.1", ] [[package]] name = "jsonrpsee-core" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da2327ba8df2fdbd5e897e2b5ed25ce7f299d345b9736b6828814c3dbd1fd47b" +checksum = "71962a1c49af43adf81d337e4ebc93f3c915faf6eccaa14d74e255107dfd7723" dependencies = [ "anyhow", "async-lock", @@ -3815,22 +3843,23 @@ dependencies = [ "hyper", "jsonrpsee-types", "parking_lot 0.12.1", + "pin-project", "rand 0.8.5", "rustc-hash", "serde", "serde_json", - "soketto", "thiserror", "tokio", + "tokio-stream", "tracing", "wasm-bindgen-futures", ] [[package]] name = "jsonrpsee-http-client" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f80c17f62c7653ce767e3d7288b793dfec920f97067ceb189ebdd3570f2bc20" +checksum = "8c13987da51270bda2c1c9b40c19be0fe9b225c7a0553963d8f17e683a50ce84" dependencies = [ "async-trait", "hyper", @@ -3848,28 +3877,29 @@ dependencies = [ [[package]] name = "jsonrpsee-proc-macros" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29110019693a4fa2dbda04876499d098fa16d70eba06b1e6e2b3f1b251419515" +checksum = "1d7c2416c400c94b2e864603c51a5bbd5b103386da1f5e58cbf01e7bb3ef0833" dependencies = [ "heck 0.4.1", - "proc-macro-crate 1.3.1", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.57", ] [[package]] name = "jsonrpsee-server" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "82c39a00449c9ef3f50b84fc00fc4acba20ef8f559f07902244abf4c15c5ab9c" +checksum = "4882e640e70c2553e3d9487e6f4dddd5fd11918f25e40fa45218f9fe29ed2152" dependencies = [ "futures-util", "http", "hyper", "jsonrpsee-core", "jsonrpsee-types", + "pin-project", "route-recognizer", "serde", "serde_json", @@ -3884,23 +3914,22 @@ dependencies = [ [[package]] name = "jsonrpsee-types" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be0be325642e850ed0bdff426674d2e66b2b7117c9be23a7caef68a2902b7d9" +checksum = "1e53c72de6cd2ad6ac1aa6e848206ef8b736f92ed02354959130373dfa5b3cbd" dependencies = [ "anyhow", "beef", "serde", "serde_json", "thiserror", - "tracing", ] [[package]] name = "jsonrpsee-wasm-client" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c7cbb3447cf14fd4d2f407c3cc96e6c9634d5440aa1fbed868a31f3c02b27f0" +checksum = "8ae2c3f2411052b4a831cb7a34cd1498e0d8b9309bd49fca67567634ff64023d" dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", @@ -3909,9 +3938,9 @@ dependencies = [ [[package]] name = "jsonrpsee-ws-client" -version = "0.20.3" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bca9cb3933ccae417eb6b08c3448eb1cb46e39834e5b503e395e5e5bd08546c0" +checksum = "c8a07ab8da9a283b906f6735ddd17d3680158bb72259e853441d1dd0167079ec" dependencies = [ "http", "jsonrpsee-client-transport", @@ -4775,7 +4804,7 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "681030a937600a36906c185595136d26abfebb4aa9c65701cefcaf8578bb982b" dependencies = [ - "proc-macro-crate 3.1.0", + "proc-macro-crate 1.3.1", "proc-macro2", "quote", "syn 2.0.57", @@ -5737,15 +5766,15 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.10", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", "sync_wrapper", "system-configuration", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -5753,7 +5782,7 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots", + "webpki-roots 0.25.4", "winreg", ] @@ -6912,6 +6941,7 @@ dependencies = [ "hyper", "jsonrpsee", "metrics", + "pin-project", "reth-beacon-consensus", "reth-interfaces", "reth-ipc", @@ -7215,7 +7245,7 @@ dependencies = [ [[package]] name = "revm-inspectors" version = "0.1.0" -source = "git+https://github.com/paradigmxyz/evm-inspectors?rev=0ef5814#0ef5814ed4df2a1fab17921afb69eccb05f6ef16" +source = "git+https://github.com/paradigmxyz/evm-inspectors?rev=c34b770#c34b77089b508bea7221aaa4bfeb33d444a62ea3" dependencies = [ "alloy-primitives", "alloy-rpc-types", @@ -7494,10 +7524,24 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" dependencies = [ "log", "ring 0.17.8", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99008d7ad0bbbea527ec27bddbc0e432c5b87d8175178cee68d2eec9c4a1813c" +dependencies = [ + "log", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.2", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" version = "0.6.3" @@ -7505,7 +7549,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + +[[package]] +name = "rustls-native-certs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.1.1", + "rustls-pki-types", "schannel", "security-framework", ] @@ -7519,6 +7576,22 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f48172685e6ff52a556baa527774f61fcaa884f59daf3375c62a3f1cd2549dab" +dependencies = [ + "base64 0.21.7", + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecd36cc4259e3e4514335c4a138c6b43171a8d61d8f5c9348f9fc7529416f247" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -7529,6 +7602,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -8635,7 +8719,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.10", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f" +dependencies = [ + "rustls 0.22.3", + "rustls-pki-types", "tokio", ] @@ -9413,6 +9508,15 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "webpki-roots" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3de34ae270483955a94f4b21bdaaeb83d508bb84a01435f393818edb0012009" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "widestring" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 5755c5274..c0a568964 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -254,7 +254,7 @@ reth-trie-parallel = { path = "crates/trie-parallel" } # revm revm = { version = "8.0.0", features = ["std", "secp256k1"], default-features = false } revm-primitives = { version = "3.1.0", features = ["std"], default-features = false } -revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "0ef5814" } +revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "c34b770" } # eth alloy-chains = { version = "0.1", feature = ["serde", "rlp", "arbitrary"] } @@ -263,12 +263,12 @@ alloy-dyn-abi = "0.7.0" alloy-sol-types = "0.7.0" alloy-rlp = "0.3.4" alloy-trie = "0.3" -alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } -alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } -alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } -alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } -alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } -alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "8c9dd0a" } +alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } +alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } +alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } +alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } +alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } +alloy-eips = { git = "https://github.com/alloy-rs/alloy", rev = "17633df" } # TODO: Remove ethers-core = { version = "2.0.14", default-features = false } @@ -328,9 +328,9 @@ discv5 = { git = "https://github.com/sigp/discv5", rev = "04ac004" } igd-next = "0.14.3" # rpc -jsonrpsee = "0.20" -jsonrpsee-core = "0.20" -jsonrpsee-types = "0.20" +jsonrpsee = "0.22" +jsonrpsee-core = "0.22" +jsonrpsee-types = "0.22" # crypto secp256k1 = { version = "0.27.0", default-features = false, features = [ diff --git a/crates/node-core/src/args/rpc_server_args.rs b/crates/node-core/src/args/rpc_server_args.rs index fd8374301..f659c5279 100644 --- a/crates/node-core/src/args/rpc_server_args.rs +++ b/crates/node-core/src/args/rpc_server_args.rs @@ -27,7 +27,7 @@ use reth_rpc_builder::{ auth::{AuthServerConfig, AuthServerHandle}, constants, error::RpcError, - EthConfig, IpcServerBuilder, RethRpcModule, RpcModuleConfig, RpcModuleSelection, + EthConfig, Identity, IpcServerBuilder, RethRpcModule, RpcModuleConfig, RpcModuleSelection, RpcServerConfig, RpcServerHandle, ServerBuilder, TransportRpcModuleConfig, }; use reth_rpc_engine_api::EngineApi; @@ -414,7 +414,7 @@ impl RethRpcConfig for RpcServerArgs { config } - fn http_ws_server_builder(&self) -> ServerBuilder { + fn http_ws_server_builder(&self) -> ServerBuilder { ServerBuilder::new() .max_connections(self.rpc_max_connections.get()) .max_request_body_size(self.rpc_max_request_size_bytes()) diff --git a/crates/node-core/src/cli/config.rs b/crates/node-core/src/cli/config.rs index e1dd34a46..988ef34d5 100644 --- a/crates/node-core/src/cli/config.rs +++ b/crates/node-core/src/cli/config.rs @@ -7,8 +7,8 @@ use reth_rpc::{ JwtError, JwtSecret, }; use reth_rpc_builder::{ - auth::AuthServerConfig, error::RpcError, EthConfig, IpcServerBuilder, RpcServerConfig, - ServerBuilder, TransportRpcModuleConfig, + auth::AuthServerConfig, error::RpcError, EthConfig, Identity, IpcServerBuilder, + RpcServerConfig, ServerBuilder, TransportRpcModuleConfig, }; use reth_transaction_pool::PoolConfig; use std::{borrow::Cow, path::PathBuf, time::Duration}; @@ -46,7 +46,7 @@ pub trait RethRpcConfig { fn transport_rpc_module_config(&self) -> TransportRpcModuleConfig; /// Returns the default server builder for http/ws - fn http_ws_server_builder(&self) -> ServerBuilder; + fn http_ws_server_builder(&self) -> ServerBuilder; /// Returns the default ipc server builder fn ipc_server_builder(&self) -> IpcServerBuilder; diff --git a/crates/rpc/ipc/src/server/ipc.rs b/crates/rpc/ipc/src/server/ipc.rs index 3bdeded98..8ce4502a2 100644 --- a/crates/rpc/ipc/src/server/ipc.rs +++ b/crates/rpc/ipc/src/server/ipc.rs @@ -2,25 +2,21 @@ use futures::{stream::FuturesOrdered, StreamExt}; use jsonrpsee::{ + batch_response_error, core::{ - server::helpers::{prepare_error, BatchResponseBuilder, MethodResponse}, - tracing::{rx_log_from_json, tx_log_from_str}, + server::helpers::prepare_error, + tracing::server::{rx_log_from_json, tx_log_from_str}, JsonRawValue, }, - helpers::{batch_response_error, MethodResponseResult}, - server::{ - logger, - logger::{Logger, TransportProtocol}, - IdProvider, - }, + server::IdProvider, types::{ error::{reject_too_many_subscriptions, ErrorCode}, ErrorObject, Id, InvalidRequest, Notification, Params, Request, }, - BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodSink, Methods, - SubscriptionState, + BatchResponseBuilder, BoundedSubscriptions, CallOrSubscription, MethodCallback, MethodResponse, + MethodSink, Methods, ResponsePayload, SubscriptionState, }; -use std::sync::Arc; +use std::{sync::Arc, time::Instant}; use tokio::sync::OwnedSemaphorePermit; use tokio_util::either::Either; use tracing::instrument; @@ -28,21 +24,20 @@ use tracing::instrument; type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>; #[derive(Debug, Clone)] -pub(crate) struct Batch<'a, L: Logger> { +pub(crate) struct Batch<'a> { data: Vec, - call: CallData<'a, L>, + call: CallData<'a>, } #[derive(Debug, Clone)] -pub(crate) struct CallData<'a, L: Logger> { +pub(crate) struct CallData<'a> { conn_id: usize, - logger: &'a L, methods: &'a Methods, id_provider: &'a dyn IdProvider, sink: &'a MethodSink, max_response_body_size: u32, max_log_length: u32, - request_start: L::Instant, + request_start: Instant, bounded_subscriptions: BoundedSubscriptions, } @@ -50,10 +45,7 @@ pub(crate) struct CallData<'a, L: Logger> { // request in the batch and read the results off of a new channel, `rx_batch`, and then send the // complete batch response back to the client over `tx`. #[instrument(name = "batch", skip(b), level = "TRACE")] -pub(crate) async fn process_batch_request(b: Batch<'_, L>) -> Option -where - L: Logger, -{ +pub(crate) async fn process_batch_request(b: Batch<'_>) -> Option { let Batch { data, call } = b; if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -88,23 +80,24 @@ where while let Some(response) = pending_calls.next().await { if let Err(too_large) = batch_response.append(&response) { - return Some(too_large) + return Some(too_large.to_result()) } } if got_notif && batch_response.is_empty() { None } else { - Some(batch_response.finish()) + let batch_resp = batch_response.finish(); + Some(MethodResponse::from_batch(batch_resp).to_result()) } } else { Some(batch_response_error(Id::Null, ErrorObject::from(ErrorCode::ParseError))) } } -pub(crate) async fn process_single_request( +pub(crate) async fn process_single_request( data: Vec, - call: CallData<'_, L>, + call: CallData<'_>, ) -> Option { if let Ok(req) = serde_json::from_slice::>(&data) { Some(execute_call_with_tracing(req, call).await) @@ -117,17 +110,14 @@ pub(crate) async fn process_single_request( } #[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")] -pub(crate) async fn execute_call_with_tracing<'a, L: Logger>( +pub(crate) async fn execute_call_with_tracing<'a>( req: Request<'a>, - call: CallData<'_, L>, + call: CallData<'_>, ) -> CallOrSubscription { execute_call(req, call).await } -pub(crate) async fn execute_call( - req: Request<'_>, - call: CallData<'_, L>, -) -> CallOrSubscription { +pub(crate) async fn execute_call(req: Request<'_>, call: CallData<'_>) -> CallOrSubscription { let CallData { methods, max_response_body_size, @@ -135,66 +125,42 @@ pub(crate) async fn execute_call( conn_id, id_provider, sink, - logger, request_start, bounded_subscriptions, } = call; rx_log_from_json(&req, call.max_log_length); - let params = Params::new(req.params.map(|params| params.get())); + let params = Params::new(req.params.as_ref().map(|params| params.get())); let name = &req.method; let id = req.id; let response = match methods.method_with_name(name) { None => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unknown, - TransportProtocol::Http, - ); let response = MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound)); CallOrSubscription::Call(response) } - Some((name, method)) => match method { + Some((_name, method)) => match method { MethodCallback::Sync(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); let response = (callback)(id, params, max_response_body_size as usize); CallOrSubscription::Call(response) } MethodCallback::Async(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::MethodCall, - TransportProtocol::Http, - ); let id = id.into_owned(); let params = params.into_owned(); let response = (callback)(id, params, conn_id, max_response_body_size as usize).await; CallOrSubscription::Call(response) } + MethodCallback::AsyncWithDetails(_callback) => { + unimplemented!() + } MethodCallback::Subscription(callback) => { if let Some(p) = bounded_subscriptions.acquire() { let conn_state = SubscriptionState { conn_id, id_provider, subscription_permit: p }; - match callback(id, params, sink.clone(), conn_state).await { - Ok(r) => CallOrSubscription::Subscription(r), - Err(id) => { - let response = MethodResponse::error( - id, - ErrorObject::from(ErrorCode::InternalError), - ); - CallOrSubscription::Call(response) - } - } + let response = callback(id, params, sink.clone(), conn_state).await; + CallOrSubscription::Subscription(response) } else { let response = MethodResponse::error( id, @@ -204,13 +170,6 @@ pub(crate) async fn execute_call( } } MethodCallback::Unsubscription(callback) => { - logger.on_call( - name, - params.clone(), - logger::MethodKind::Unsubscription, - TransportProtocol::WebSocket, - ); - // Don't adhere to any resource or subscription limits; always let unsubscribing // happen! let result = callback(id, params, conn_id, max_response_body_size as usize); @@ -219,48 +178,38 @@ pub(crate) async fn execute_call( }, }; - tx_log_from_str(&response.as_response().result, max_log_length); - logger.on_result( - name, - response.as_response().success_or_error, - request_start, - TransportProtocol::Http, - ); + tx_log_from_str(response.as_response().as_result(), max_log_length); + let _ = request_start; response } #[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")] -fn execute_notification(notif: Notif<'_>, max_log_length: u32) -> MethodResponse { - rx_log_from_json(¬if, max_log_length); +fn execute_notification(notif: &Notif<'_>, max_log_length: u32) -> MethodResponse { + rx_log_from_json(notif, max_log_length); let response = - MethodResponse { result: String::new(), success_or_error: MethodResponseResult::Success }; - tx_log_from_str(&response.result, max_log_length); + MethodResponse::response(Id::Null, ResponsePayload::success(String::new()), usize::MAX); + tx_log_from_str(response.as_result(), max_log_length); response } #[allow(dead_code)] -pub(crate) struct HandleRequest { +pub(crate) struct HandleRequest { pub(crate) methods: Methods, pub(crate) max_request_body_size: u32, pub(crate) max_response_body_size: u32, pub(crate) max_log_length: u32, pub(crate) batch_requests_supported: bool, - pub(crate) logger: L, pub(crate) conn: Arc, pub(crate) bounded_subscriptions: BoundedSubscriptions, pub(crate) method_sink: MethodSink, pub(crate) id_provider: Arc, } -pub(crate) async fn handle_request( - request: String, - input: HandleRequest, -) -> Option { +pub(crate) async fn handle_request(request: String, input: HandleRequest) -> Option { let HandleRequest { methods, max_response_body_size, max_log_length, - logger, conn, bounded_subscriptions, method_sink, @@ -282,24 +231,22 @@ pub(crate) async fn handle_request( }) .unwrap_or(Kind::Single); - let request_start = logger.on_request(TransportProtocol::Http); - let call = CallData { conn_id: 0, - logger: &logger, methods: &methods, id_provider: &*id_provider, sink: &method_sink, max_response_body_size, max_log_length, - request_start, + request_start: Instant::now(), bounded_subscriptions, }; + // Single request or notification let res = if matches!(request_kind, Kind::Single) { let response = process_single_request(request.into_bytes(), call).await; match response { - Some(CallOrSubscription::Call(response)) => Some(response.result), + Some(CallOrSubscription::Call(response)) => Some(response.to_result()), Some(CallOrSubscription::Subscription(_)) => { // subscription responses are sent directly over the sink, return a response here // would lead to duplicate responses for the subscription response diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index 332899ad9..e6d1a6051 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -6,8 +6,8 @@ use crate::server::{ }; use futures::{FutureExt, Stream, StreamExt}; use jsonrpsee::{ - core::{Error, TEN_MB_SIZE_BYTES}, - server::{logger::Logger, IdProvider, RandomIntegerIdProvider, ServerHandle}, + core::TEN_MB_SIZE_BYTES, + server::{AlreadyStoppedError, IdProvider, RandomIntegerIdProvider}, BoundedSubscriptions, MethodSink, Methods, }; use std::{ @@ -37,19 +37,15 @@ mod ipc; /// Ipc Server implementation // This is an adapted `jsonrpsee` Server, but for `Ipc` connections. -pub struct IpcServer { +pub struct IpcServer { /// The endpoint we listen for incoming transactions endpoint: Endpoint, - logger: L, id_provider: Arc, cfg: Settings, service_builder: tower::ServiceBuilder, } -impl IpcServer -where - L: Logger, -{ +impl IpcServer { /// Returns the configured [Endpoint] pub fn endpoint(&self) -> &Endpoint { &self.endpoint @@ -64,7 +60,7 @@ where /// use jsonrpsee::RpcModule; /// use reth_ipc::server::Builder; /// async fn run_server() -> Result<(), Box> { - /// let server = Builder::default().build("/tmp/my-uds")?; + /// let server = Builder::default().build("/tmp/my-uds"); /// let mut module = RpcModule::new(()); /// module.register_method("say_hello", |_, _| "lo")?; /// let handle = server.start(module).await?; @@ -76,7 +72,10 @@ where /// Ok(()) /// } /// ``` - pub async fn start(mut self, methods: impl Into) -> Result { + pub async fn start( + mut self, + methods: impl Into, + ) -> Result { let methods = methods.into(); let (stop_tx, stop_rx) = watch::channel(()); @@ -89,7 +88,7 @@ where Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)), None => tokio::spawn(self.start_inner(methods, stop_handle, tx)), }; - rx.await.expect("channel is open").map_err(Error::Custom)?; + rx.await.expect("channel is open")?; Ok(ServerHandle::new(stop_tx)) } @@ -98,8 +97,8 @@ where self, methods: Methods, stop_handle: StopHandle, - on_ready: oneshot::Sender>, - ) -> io::Result<()> { + on_ready: oneshot::Sender>, + ) { trace!(endpoint = ?self.endpoint.path(), "starting ipc server"); if cfg!(unix) { @@ -115,7 +114,6 @@ where let max_log_length = self.cfg.max_log_length; let id_provider = self.id_provider; let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection; - let logger = self.logger; let mut id: u32 = 0; let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize); @@ -129,9 +127,10 @@ where Incoming::new(connections) } Err(err) => { - let msg = format!("failed to listen on ipc endpoint `{endpoint_path}`: {err}"); - on_ready.send(Err(msg)).ok(); - return Err(err) + on_ready + .send(Err(IpcServerStartError { endpoint: endpoint_path, source: err })) + .ok(); + return } }; // signal that we're ready to accept connections @@ -154,8 +153,7 @@ where }; let (tx, rx) = mpsc::channel::(message_buffer_capacity as usize); - let method_sink = - MethodSink::new_with_limit(tx, max_response_body_size, max_log_length); + let method_sink = MethodSink::new_with_limit(tx, max_response_body_size); let tower_service = TowerService { inner: ServiceData { methods: methods.clone(), @@ -166,7 +164,6 @@ where stop_handle: stop_handle.clone(), max_subscriptions_per_connection, conn_id: id, - logger: logger.clone(), conn: Arc::new(conn), bounded_subscriptions: BoundedSubscriptions::new( max_subscriptions_per_connection, @@ -193,7 +190,6 @@ where } connections.await; - Ok(()) } } @@ -207,10 +203,19 @@ impl std::fmt::Debug for IpcServer { } } +/// Error thrown when server couldn't be started. +#[derive(Debug, thiserror::Error)] +#[error("failed to listen on ipc endpoint `{endpoint}`: {source}")] +pub struct IpcServerStartError { + endpoint: String, + #[source] + source: io::Error, +} + /// Data required by the server to handle requests received via an IPC connection #[derive(Debug, Clone)] #[allow(dead_code)] -pub(crate) struct ServiceData { +pub(crate) struct ServiceData { /// Registered server methods. pub(crate) methods: Methods, /// Max request body size. @@ -229,8 +234,6 @@ pub(crate) struct ServiceData { pub(crate) max_subscriptions_per_connection: u32, /// Connection ID pub(crate) conn_id: u32, - /// Logger. - pub(crate) logger: L, /// Handle to hold a `connection permit`. pub(crate) conn: Arc, /// Limits the number of subscriptions for this connection @@ -246,11 +249,11 @@ pub(crate) struct ServiceData { /// # Note /// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html). #[derive(Debug)] -pub struct TowerService { - inner: ServiceData, +pub struct TowerService { + inner: ServiceData, } -impl Service for TowerService { +impl Service for TowerService { /// The response of a handled RPC call /// /// This is an `Option` because subscriptions and call responses are handled differently. @@ -277,7 +280,6 @@ impl Service for TowerService { max_response_body_size: self.inner.max_response_body_size, max_log_length: self.inner.max_log_length, batch_requests_supported: true, - logger: self.inner.logger.clone(), conn: self.inner.conn.clone(), bounded_subscriptions: self.inner.bounded_subscriptions.clone(), method_sink: self.inner.method_sink.clone(), @@ -411,9 +413,8 @@ impl Default for Settings { /// Builder to configure and create a JSON-RPC server #[derive(Debug)] -pub struct Builder { +pub struct Builder { settings: Settings, - logger: L, /// Subscription ID provider. id_provider: Arc, service_builder: tower::ServiceBuilder, @@ -423,14 +424,13 @@ impl Default for Builder { fn default() -> Self { Builder { settings: Settings::default(), - logger: (), id_provider: Arc::new(RandomIntegerIdProvider), service_builder: tower::ServiceBuilder::new(), } } } -impl Builder { +impl Builder { /// Set the maximum size of a request body in bytes. Default is 10 MiB. pub fn max_request_body_size(mut self, size: u32) -> Self { self.settings.max_request_body_size = size; @@ -483,16 +483,6 @@ impl Builder { self } - /// Add a logger to the builder [`Logger`]. - pub fn set_logger(self, logger: T) -> Builder { - Builder { - settings: self.settings, - logger, - id_provider: self.id_provider, - service_builder: self.service_builder, - } - } - /// Configure a custom [`tokio::runtime::Handle`] to run the server on. /// /// Default: [`tokio::spawn`] @@ -538,36 +528,57 @@ impl Builder { /// async fn main() { /// let builder = tower::ServiceBuilder::new(); /// - /// let server = reth_ipc::server::Builder::default() - /// .set_middleware(builder) - /// .build("/tmp/my-uds") - /// .unwrap(); + /// let server = + /// reth_ipc::server::Builder::default().set_middleware(builder).build("/tmp/my-uds"); /// } /// ``` - pub fn set_middleware(self, service_builder: tower::ServiceBuilder) -> Builder { - Builder { - settings: self.settings, - logger: self.logger, - id_provider: self.id_provider, - service_builder, - } + pub fn set_middleware(self, service_builder: tower::ServiceBuilder) -> Builder { + Builder { settings: self.settings, id_provider: self.id_provider, service_builder } } /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build(self, endpoint: impl AsRef) -> Result, Error> { + pub fn build(self, endpoint: impl AsRef) -> IpcServer { let endpoint = Endpoint::new(endpoint.as_ref().to_string()); self.build_with_endpoint(endpoint) } /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build_with_endpoint(self, endpoint: Endpoint) -> Result, Error> { - Ok(IpcServer { + pub fn build_with_endpoint(self, endpoint: Endpoint) -> IpcServer { + IpcServer { endpoint, cfg: self.settings, - logger: self.logger, id_provider: self.id_provider, service_builder: self.service_builder, - }) + } + } +} + +/// Server handle. +/// +/// When all [`jsonrpsee::server::StopHandle`]'s have been `dropped` or `stop` has been called +/// the server will be stopped. +#[derive(Debug, Clone)] +pub struct ServerHandle(Arc>); + +impl ServerHandle { + /// Create a new server handle. + pub(crate) fn new(tx: watch::Sender<()>) -> Self { + Self(Arc::new(tx)) + } + + /// Tell the server to stop without waiting for the server to stop. + pub fn stop(&self) -> Result<(), AlreadyStoppedError> { + self.0.send(()).map_err(|_| AlreadyStoppedError) + } + + /// Wait for the server to stop. + pub async fn stopped(self) { + self.0.closed().await + } + + /// Check if the server has been stopped. + pub fn is_stopped(&self) -> bool { + self.0.is_closed() } } @@ -632,7 +643,7 @@ mod tests { #[tokio::test] async fn test_rpc_request() { let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint).unwrap(); + let server = Builder::default().build(&endpoint); let mut module = RpcModule::new(()); let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#; module.register_method("eth_chainId", move |_, _| msg).unwrap(); @@ -647,7 +658,7 @@ mod tests { #[tokio::test] async fn test_ipc_modules() { let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint).unwrap(); + let server = Builder::default().build(&endpoint); let mut module = RpcModule::new(()); let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#; module.register_method("rpc_modules", move |_, _| msg).unwrap(); @@ -662,7 +673,7 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_rpc_subscription() { let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint).unwrap(); + let server = Builder::default().build(&endpoint); let (tx, _rx) = broadcast::channel::(16); let mut module = RpcModule::new(tx.clone()); diff --git a/crates/rpc/rpc-builder/Cargo.toml b/crates/rpc/rpc-builder/Cargo.toml index 2c494d99a..1769e7860 100644 --- a/crates/rpc/rpc-builder/Cargo.toml +++ b/crates/rpc/rpc-builder/Cargo.toml @@ -27,6 +27,7 @@ jsonrpsee = { workspace = true, features = ["server"] } tower-http = { workspace = true, features = ["full"] } tower = { workspace = true, features = ["full"] } hyper.workspace = true +pin-project.workspace = true # metrics reth-metrics = { workspace = true, features = ["common"] } diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index bdb70d7ad..aafce417b 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -7,8 +7,9 @@ use crate::{ use hyper::header::AUTHORIZATION; pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::{ + core::RegisterMethodError, http_client::HeaderMap, - server::{RpcModule, ServerHandle}, + server::{AlreadyStoppedError, RpcModule, ServerHandle}, Methods, }; use reth_network_api::{NetworkInfo, Peers}; @@ -32,6 +33,7 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, time::{Duration, SystemTime, UNIX_EPOCH}, }; +use tower::layer::util::Identity; /// Configure and launch a _standalone_ auth server with `engine` and a _new_ `eth` namespace. #[allow(clippy::too_many_arguments)] @@ -129,12 +131,14 @@ where // By default, both http and ws are enabled. let server = ServerBuilder::new() - .set_middleware(middleware) + .set_http_middleware(middleware) .build(socket_addr) .await - .map_err(|err| RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)))?; + .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; - let local_addr = server.local_addr()?; + let local_addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; let handle = server.start(module); Ok(AuthServerHandle { handle, local_addr, secret }) @@ -148,7 +152,7 @@ pub struct AuthServerConfig { /// The secret for the auth layer of the server. pub(crate) secret: JwtSecret, /// Configs for JSON-RPC Http. - pub(crate) server_config: ServerBuilder, + pub(crate) server_config: ServerBuilder, } // === impl AuthServerConfig === @@ -173,12 +177,15 @@ impl AuthServerConfig { .layer(AuthLayer::new(JwtAuthValidator::new(secret.clone()))); // By default, both http and ws are enabled. - let server = - server_config.set_middleware(middleware).build(socket_addr).await.map_err(|err| { - RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)) - })?; + let server = server_config + .set_http_middleware(middleware) + .build(socket_addr) + .await + .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; - let local_addr = server.local_addr()?; + let local_addr = server + .local_addr() + .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; let handle = server.start(module.inner); Ok(AuthServerHandle { handle, local_addr, secret }) @@ -190,7 +197,7 @@ impl AuthServerConfig { pub struct AuthServerConfigBuilder { socket_addr: Option, secret: JwtSecret, - server_config: Option, + server_config: Option>, } // === impl AuthServerConfigBuilder === @@ -223,7 +230,7 @@ impl AuthServerConfigBuilder { /// /// Note: this always configures an [EthSubscriptionIdProvider] /// [IdProvider](jsonrpsee::server::IdProvider) for convenience. - pub fn with_server_config(mut self, config: ServerBuilder) -> Self { + pub fn with_server_config(mut self, config: ServerBuilder) -> Self { self.server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default())); self } @@ -286,7 +293,7 @@ impl AuthRpcModule { pub fn merge_auth_methods( &mut self, other: impl Into, - ) -> Result { + ) -> Result { self.module_mut().merge(other.into()).map(|_| true) } @@ -320,8 +327,8 @@ impl AuthServerHandle { } /// Tell the server to stop without waiting for the server to stop. - pub fn stop(self) -> Result<(), RpcError> { - Ok(self.handle.stop()?) + pub fn stop(self) -> Result<(), AlreadyStoppedError> { + self.handle.stop() } /// Returns the url to the http server diff --git a/crates/rpc/rpc-builder/src/error.rs b/crates/rpc/rpc-builder/src/error.rs index a1d6994f8..fd59536f7 100644 --- a/crates/rpc/rpc-builder/src/error.rs +++ b/crates/rpc/rpc-builder/src/error.rs @@ -1,8 +1,6 @@ -use std::net::SocketAddr; - use crate::RethRpcModule; -use jsonrpsee::core::Error as JsonRpseeError; -use std::{io, io::ErrorKind}; +use reth_ipc::server::IpcServerStartError; +use std::{io, io::ErrorKind, net::SocketAddr}; /// Rpc server kind. #[derive(Debug, PartialEq, Eq, Copy, Clone)] @@ -40,12 +38,17 @@ impl std::fmt::Display for ServerKind { } } -/// Rpc Errors. +/// Rpc Server related errors #[derive(Debug, thiserror::Error)] pub enum RpcError { - /// Wrapper for `jsonrpsee::core::Error`. - #[error(transparent)] - RpcError(#[from] JsonRpseeError), + /// Thrown during server start. + #[error("Failed to start {kind} server: {error}")] + ServerError { + /// Server kind. + kind: ServerKind, + /// IO error. + error: io::Error, + }, /// Address already in use. #[error("address {kind} is already in use (os error 98). Choose a different port using {}", kind.flags())] AddressAlreadyInUse { @@ -57,28 +60,21 @@ pub enum RpcError { /// Http and WS server configured on the same port but with conflicting settings. #[error(transparent)] WsHttpSamePortError(#[from] WsHttpSamePortError), + /// Thrown when IPC server fails to start. + #[error(transparent)] + IpcServerError(#[from] IpcServerStartError), /// Custom error. #[error("{0}")] Custom(String), } impl RpcError { - /// Converts a `jsonrpsee::core::Error` to a more descriptive `RpcError`. - pub fn from_jsonrpsee_error(err: JsonRpseeError, kind: ServerKind) -> RpcError { - match err { - JsonRpseeError::Transport(err) => { - if let Some(io_error) = err.downcast_ref::() { - if io_error.kind() == ErrorKind::AddrInUse { - return RpcError::AddressAlreadyInUse { - kind, - error: io::Error::from(io_error.kind()), - } - } - } - RpcError::RpcError(JsonRpseeError::Transport(err)) - } - _ => err.into(), + /// Converts an [io::Error] to a more descriptive `RpcError`. + pub fn server_error(io_error: io::Error, kind: ServerKind) -> RpcError { + if io_error.kind() == ErrorKind::AddrInUse { + return RpcError::AddressAlreadyInUse { kind, error: io_error } } + RpcError::ServerError { kind, error: io_error } } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 4d56da16d..a19cd57e9 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -155,7 +155,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] use crate::{ - auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcServerMetrics, + auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics, RpcModuleSelection::Selection, }; use constants::*; @@ -163,7 +163,8 @@ use error::{RpcError, ServerKind}; use hyper::{header::AUTHORIZATION, HeaderMap}; pub use jsonrpsee::server::ServerBuilder; use jsonrpsee::{ - server::{IdProvider, Server, ServerHandle}, + core::RegisterMethodError, + server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle}, Methods, RpcModule, }; use reth_ipc::server::IpcServer; @@ -200,7 +201,7 @@ use std::{ time::{Duration, SystemTime, UNIX_EPOCH}, }; use strum::{AsRefStr, EnumIter, IntoStaticStr, ParseError, VariantArray, VariantNames}; -use tower::layer::util::{Identity, Stack}; +pub use tower::layer::util::{Identity, Stack}; use tower_http::cors::CorsLayer; use tracing::{instrument, trace}; @@ -1440,13 +1441,13 @@ where #[derive(Default)] pub struct RpcServerConfig { /// Configs for JSON-RPC Http. - http_server_config: Option, + http_server_config: Option>, /// Allowed CORS Domains for http http_cors_domains: Option, /// Address where to bind the http server to http_addr: Option, /// Configs for WS server - ws_server_config: Option, + ws_server_config: Option>, /// Allowed CORS Domains for ws. ws_cors_domains: Option, /// Address where to bind the ws server to @@ -1478,12 +1479,12 @@ impl fmt::Debug for RpcServerConfig { impl RpcServerConfig { /// Creates a new config with only http set - pub fn http(config: ServerBuilder) -> Self { + pub fn http(config: ServerBuilder) -> Self { Self::default().with_http(config) } /// Creates a new config with only ws set - pub fn ws(config: ServerBuilder) -> Self { + pub fn ws(config: ServerBuilder) -> Self { Self::default().with_ws(config) } @@ -1496,7 +1497,7 @@ impl RpcServerConfig { /// /// Note: this always configures an [EthSubscriptionIdProvider] [IdProvider] for convenience. /// To set a custom [IdProvider], please use [Self::with_id_provider]. - pub fn with_http(mut self, config: ServerBuilder) -> Self { + pub fn with_http(mut self, config: ServerBuilder) -> Self { self.http_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default())); self @@ -1523,7 +1524,7 @@ impl RpcServerConfig { /// /// Note: this always configures an [EthSubscriptionIdProvider] [IdProvider] for convenience. /// To set a custom [IdProvider], please use [Self::with_id_provider]. - pub fn with_ws(mut self, config: ServerBuilder) -> Self { + pub fn with_ws(mut self, config: ServerBuilder) -> Self { self.ws_server_config = Some(config.set_id_provider(EthSubscriptionIdProvider::default())); self } @@ -1671,7 +1672,7 @@ impl RpcServerConfig { .http .as_ref() .or(modules.ws.as_ref()) - .map(RpcServerMetrics::new) + .map(RpcRequestMetrics::same_port) .unwrap_or_default(), ) .await?; @@ -1696,7 +1697,7 @@ impl RpcServerConfig { self.ws_cors_domains.take(), self.jwt_secret.clone(), ServerKind::WS(ws_socket_addr), - modules.ws.as_ref().map(RpcServerMetrics::new).unwrap_or_default(), + modules.ws.as_ref().map(RpcRequestMetrics::ws).unwrap_or_default(), ) .await?; ws_local_addr = Some(addr); @@ -1711,7 +1712,7 @@ impl RpcServerConfig { self.http_cors_domains.take(), self.jwt_secret.clone(), ServerKind::Http(http_socket_addr), - modules.http.as_ref().map(RpcServerMetrics::new).unwrap_or_default(), + modules.http.as_ref().map(RpcRequestMetrics::http).unwrap_or_default(), ) .await?; http_local_addr = Some(addr); @@ -1736,11 +1737,14 @@ impl RpcServerConfig { server.ws_http = self.build_ws_http(modules).await?; if let Some(builder) = self.ipc_server_config { - let metrics = modules.ipc.as_ref().map(RpcServerMetrics::new).unwrap_or_default(); + // let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::new).unwrap_or_default(); let ipc_path = self .ipc_endpoint .unwrap_or_else(|| Endpoint::new(DEFAULT_IPC_ENDPOINT.to_string())); - let ipc = builder.set_logger(metrics).build(ipc_path.path())?; + let ipc = builder + // TODO(mattsse): add metrics middleware for IPC + // .set_middleware(metrics) + .build(ipc_path.path()); server.ipc = Some(ipc); } @@ -1874,10 +1878,7 @@ impl TransportRpcModules { /// Fails if any of the methods in other is present already. /// /// Returns [Ok(false)] if no http transport is configured. - pub fn merge_http( - &mut self, - other: impl Into, - ) -> Result { + pub fn merge_http(&mut self, other: impl Into) -> Result { if let Some(ref mut http) = self.http { return http.merge(other.into()).map(|_| true) } @@ -1889,10 +1890,7 @@ impl TransportRpcModules { /// Fails if any of the methods in other is present already. /// /// Returns [Ok(false)] if no ws transport is configured. - pub fn merge_ws( - &mut self, - other: impl Into, - ) -> Result { + pub fn merge_ws(&mut self, other: impl Into) -> Result { if let Some(ref mut ws) = self.ws { return ws.merge(other.into()).map(|_| true) } @@ -1904,10 +1902,7 @@ impl TransportRpcModules { /// Fails if any of the methods in other is present already. /// /// Returns [Ok(false)] if no ipc transport is configured. - pub fn merge_ipc( - &mut self, - other: impl Into, - ) -> Result { + pub fn merge_ipc(&mut self, other: impl Into) -> Result { if let Some(ref mut ipc) = self.ipc { return ipc.merge(other.into()).map(|_| true) } @@ -1920,7 +1915,7 @@ impl TransportRpcModules { pub fn merge_configured( &mut self, other: impl Into, - ) -> Result<(), jsonrpsee::core::error::Error> { + ) -> Result<(), RegisterMethodError> { let other = other.into(); self.merge_http(other.clone())?; self.merge_ws(other.clone())?; @@ -2004,16 +1999,22 @@ impl Default for WsHttpServers { } /// Http Servers Enum +#[allow(clippy::type_complexity)] enum WsHttpServerKind { /// Http server - Plain(Server), + Plain(Server>), /// Http server with cors - WithCors(Server, RpcServerMetrics>), + WithCors(Server, Stack>), /// Http server with auth - WithAuth(Server, Identity>, RpcServerMetrics>), + WithAuth( + Server, Identity>, Stack>, + ), /// Http server with cors and auth WithCorsAuth( - Server, Stack>, RpcServerMetrics>, + Server< + Stack, Stack>, + Stack, + >, ), } @@ -2034,12 +2035,12 @@ impl WsHttpServerKind { /// /// Returns the address of the started server. async fn build( - builder: ServerBuilder, + builder: ServerBuilder, socket_addr: SocketAddr, cors_domains: Option, jwt_secret: Option, server_kind: ServerKind, - metrics: RpcServerMetrics, + metrics: RpcRequestMetrics, ) -> Result<(Self, SocketAddr), RpcError> { if let Some(cors) = cors_domains.as_deref().map(cors::create_cors_layer) { let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?; @@ -2051,23 +2052,25 @@ impl WsHttpServerKind { .layer(AuthLayer::new(JwtAuthValidator::new(secret.clone()))); let server = builder - .set_middleware(middleware) - .set_logger(metrics) + .set_http_middleware(middleware) + .set_rpc_middleware(RpcServiceBuilder::new().layer(metrics)) .build(socket_addr) .await - .map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?; - let local_addr = server.local_addr()?; + .map_err(|err| RpcError::server_error(err, server_kind))?; + let local_addr = + server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?; let server = WsHttpServerKind::WithCorsAuth(server); Ok((server, local_addr)) } else { let middleware = tower::ServiceBuilder::new().layer(cors); let server = builder - .set_middleware(middleware) - .set_logger(metrics) + .set_http_middleware(middleware) + .set_rpc_middleware(RpcServiceBuilder::new().layer(metrics)) .build(socket_addr) .await - .map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?; - let local_addr = server.local_addr()?; + .map_err(|err| RpcError::server_error(err, server_kind))?; + let local_addr = + server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?; let server = WsHttpServerKind::WithCors(server); Ok((server, local_addr)) } @@ -2076,24 +2079,24 @@ impl WsHttpServerKind { let middleware = tower::ServiceBuilder::new() .layer(AuthLayer::new(JwtAuthValidator::new(secret.clone()))); let server = builder - .set_middleware(middleware) - .set_logger(metrics) + .set_http_middleware(middleware) + .set_rpc_middleware(RpcServiceBuilder::new().layer(metrics)) .build(socket_addr) .await - .map_err(|err| { - RpcError::from_jsonrpsee_error(err, ServerKind::Auth(socket_addr)) - })?; - let local_addr = server.local_addr()?; + .map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?; + let local_addr = + server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?; let server = WsHttpServerKind::WithAuth(server); Ok((server, local_addr)) } else { // plain server without any middleware let server = builder - .set_logger(metrics) + .set_rpc_middleware(RpcServiceBuilder::new().layer(metrics)) .build(socket_addr) .await - .map_err(|err| RpcError::from_jsonrpsee_error(err, server_kind))?; - let local_addr = server.local_addr()?; + .map_err(|err| RpcError::server_error(err, server_kind))?; + let local_addr = + server.local_addr().map_err(|err| RpcError::server_error(err, server_kind))?; let server = WsHttpServerKind::Plain(server); Ok((server, local_addr)) } @@ -2105,7 +2108,7 @@ pub struct RpcServer { /// Configured ws,http servers ws_http: WsHttpServer, /// ipc server - ipc: Option>, + ipc: Option, } // === impl RpcServer === @@ -2190,7 +2193,7 @@ pub struct RpcServerHandle { http: Option, ws: Option, ipc_endpoint: Option, - ipc: Option, + ipc: Option, jwt_secret: Option, } @@ -2224,7 +2227,7 @@ impl RpcServerHandle { } /// Tell the server to stop without waiting for the server to stop. - pub fn stop(self) -> Result<(), RpcError> { + pub fn stop(self) -> Result<(), AlreadyStoppedError> { if let Some(handle) = self.http { handle.stop()? } diff --git a/crates/rpc/rpc-builder/src/metrics.rs b/crates/rpc/rpc-builder/src/metrics.rs index 64a005037..a0285622b 100644 --- a/crates/rpc/rpc-builder/src/metrics.rs +++ b/crates/rpc/rpc-builder/src/metrics.rs @@ -1,63 +1,199 @@ -use jsonrpsee::{ - helpers::MethodResponseResult, - server::logger::{HttpRequest, Logger, MethodKind, Params, TransportProtocol}, - RpcModule, -}; +use jsonrpsee::{server::middleware::rpc::RpcServiceT, types::Request, MethodResponse, RpcModule}; use reth_metrics::{ metrics::{Counter, Histogram}, Metrics, }; -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Instant}; +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Instant, +}; +use tower::Layer; -/// Metrics for the RPC server -#[derive(Default, Clone)] -pub(crate) struct RpcServerMetrics { +/// Metrics for the RPC server. +/// +/// Metrics are divided into two categories: +/// - Connection metrics: metrics for the connection (e.g. number of connections opened, relevant +/// for WS and IPC) +/// - Request metrics: metrics for each RPC method (e.g. number of calls started, time taken to +/// process a call) +#[derive(Default, Debug, Clone)] +pub(crate) struct RpcRequestMetrics { inner: Arc, } -/// Metrics for the RPC server -#[derive(Default, Clone)] -struct RpcServerMetricsInner { - /// Connection metrics per transport type - connection_metrics: ConnectionMetrics, - /// Call metrics per RPC method - call_metrics: HashMap<&'static str, RpcServerCallMetrics>, -} - -impl RpcServerMetrics { - pub(crate) fn new(module: &RpcModule<()>) -> Self { +impl RpcRequestMetrics { + pub(crate) fn new(module: &RpcModule<()>, transport: RpcTransport) -> Self { Self { inner: Arc::new(RpcServerMetricsInner { - connection_metrics: ConnectionMetrics::default(), + connection_metrics: transport.connection_metrics(), call_metrics: HashMap::from_iter(module.method_names().map(|method| { (method, RpcServerCallMetrics::new_with_labels(&[("method", method)])) })), }), } } + + /// Creates a new instance of the metrics layer for HTTP. + pub(crate) fn http(module: &RpcModule<()>) -> Self { + Self::new(module, RpcTransport::Http) + } + + /// Creates a new instance of the metrics layer for same port. + /// + /// Note: currently it's not possible to track transport specific metrics for a server that runs http and ws on the same port: until we have this feature we will use the http metrics for this case. + pub(crate) fn same_port(module: &RpcModule<()>) -> Self { + Self::http(module) + } + + /// Creates a new instance of the metrics layer for Ws. + pub(crate) fn ws(module: &RpcModule<()>) -> Self { + Self::new(module, RpcTransport::WebSocket) + } + + /// Creates a new instance of the metrics layer for Ws. + #[allow(unused)] + pub(crate) fn ipc(module: &RpcModule<()>) -> Self { + Self::new(module, RpcTransport::Ipc) + } } +impl Layer for RpcRequestMetrics { + type Service = RpcRequestMetricsService; + + fn layer(&self, inner: S) -> Self::Service { + RpcRequestMetricsService::new(inner, self.clone()) + } +} + +/// Metrics for the RPC server +#[derive(Default, Clone, Debug)] +struct RpcServerMetricsInner { + /// Connection metrics per transport type + connection_metrics: RpcServerConnectionMetrics, + /// Call metrics per RPC method + call_metrics: HashMap<&'static str, RpcServerCallMetrics>, +} + +/// A [RpcServiceT] middleware that captures RPC metrics for the server. +/// +/// This is created per connection and captures metrics for each request. #[derive(Clone)] -struct ConnectionMetrics { - http: RpcServerConnectionMetrics, - ws: RpcServerConnectionMetrics, +pub(crate) struct RpcRequestMetricsService { + metrics: RpcRequestMetrics, + inner: S, } -impl ConnectionMetrics { - fn get_metrics(&self, transport: TransportProtocol) -> &RpcServerConnectionMetrics { - match transport { - TransportProtocol::Http => &self.http, - TransportProtocol::WebSocket => &self.ws, +impl RpcRequestMetricsService { + pub(crate) fn new(service: S, metrics: RpcRequestMetrics) -> Self { + // this instance is kept alive for the duration of the connection + metrics.inner.connection_metrics.connections_opened.increment(1); + Self { inner: service, metrics } + } +} + +impl<'a, S> RpcServiceT<'a> for RpcRequestMetricsService +where + S: RpcServiceT<'a> + Send + Sync + Clone + 'static, +{ + type Future = MeteredRequestFuture; + + fn call(&self, req: Request<'a>) -> Self::Future { + self.metrics.inner.connection_metrics.requests_started.increment(1); + let call_metrics = self.metrics.inner.call_metrics.get_key_value(req.method.as_ref()); + if let Some((_, call_metrics)) = &call_metrics { + call_metrics.started.increment(1); + } + MeteredRequestFuture { + fut: self.inner.call(req), + started_at: Instant::now(), + metrics: self.metrics.clone(), + method: call_metrics.map(|(method, _)| *method), } } } -impl Default for ConnectionMetrics { - fn default() -> Self { - Self { - http: RpcServerConnectionMetrics::new_with_labels(&[("transport", "http")]), - ws: RpcServerConnectionMetrics::new_with_labels(&[("transport", "ws")]), +impl Drop for RpcRequestMetricsService { + fn drop(&mut self) { + // update connection metrics, connection closed + self.metrics.inner.connection_metrics.connections_closed.increment(1); + } +} + +/// Response future to update the metrics for a single request/response pair. +#[pin_project::pin_project] +pub(crate) struct MeteredRequestFuture { + #[pin] + fut: F, + /// time when the request started + started_at: Instant, + /// metrics for the method call + metrics: RpcRequestMetrics, + /// the method name if known + method: Option<&'static str>, +} + +impl std::fmt::Debug for MeteredRequestFuture { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("MeteredRequestFuture") + } +} + +impl> Future for MeteredRequestFuture { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + let res = this.fut.poll(cx); + if let Poll::Ready(resp) = &res { + let elapsed = this.started_at.elapsed().as_secs_f64(); + + // update transport metrics + this.metrics.inner.connection_metrics.requests_finished.increment(1); + this.metrics.inner.connection_metrics.request_time_seconds.record(elapsed); + + // update call metrics + if let Some(call_metrics) = + this.method.and_then(|method| this.metrics.inner.call_metrics.get(method)) + { + call_metrics.time_seconds.record(elapsed); + if resp.is_success() { + call_metrics.successful.increment(1); + } else { + call_metrics.failed.increment(1); + } + } } + res + } +} + +/// The transport protocol used for the RPC connection. +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub(crate) enum RpcTransport { + Http, + WebSocket, + #[allow(unused)] + Ipc, +} + +impl RpcTransport { + /// Returns the string representation of the transport protocol. + pub(crate) const fn as_str(&self) -> &'static str { + match self { + RpcTransport::Http => "http", + RpcTransport::WebSocket => "ws", + RpcTransport::Ipc => "ipc", + } + } + + /// Returns the connection metrics for the transport protocol. + fn connection_metrics(&self) -> RpcServerConnectionMetrics { + RpcServerConnectionMetrics::new_with_labels(&[("transport", self.as_str())]) } } @@ -90,61 +226,3 @@ struct RpcServerCallMetrics { /// Response for a single call time_seconds: Histogram, } - -impl Logger for RpcServerMetrics { - type Instant = Instant; - - fn on_connect( - &self, - _remote_addr: SocketAddr, - _request: &HttpRequest, - transport: TransportProtocol, - ) { - self.inner.connection_metrics.get_metrics(transport).connections_opened.increment(1) - } - - fn on_request(&self, transport: TransportProtocol) -> Self::Instant { - self.inner.connection_metrics.get_metrics(transport).requests_started.increment(1); - Instant::now() - } - - fn on_call( - &self, - method_name: &str, - _params: Params<'_>, - _kind: MethodKind, - _transport: TransportProtocol, - ) { - let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return }; - call_metrics.started.increment(1); - } - - fn on_result( - &self, - method_name: &str, - success: MethodResponseResult, - started_at: Self::Instant, - _transport: TransportProtocol, - ) { - let Some(call_metrics) = self.inner.call_metrics.get(method_name) else { return }; - - // capture call latency - call_metrics.time_seconds.record(started_at.elapsed().as_secs_f64()); - if success.is_success() { - call_metrics.successful.increment(1); - } else { - call_metrics.failed.increment(1); - } - } - - fn on_response(&self, _result: &str, started_at: Self::Instant, transport: TransportProtocol) { - let metrics = self.inner.connection_metrics.get_metrics(transport); - // capture request latency for this request/response pair - metrics.request_time_seconds.record(started_at.elapsed().as_secs_f64()); - metrics.requests_finished.increment(1); - } - - fn on_disconnect(&self, _remote_addr: SocketAddr, transport: TransportProtocol) { - self.inner.connection_metrics.get_metrics(transport).connections_closed.increment(1) - } -} diff --git a/crates/rpc/rpc-builder/tests/it/http.rs b/crates/rpc/rpc-builder/tests/it/http.rs index c2740d880..7fc714a2d 100644 --- a/crates/rpc/rpc-builder/tests/it/http.rs +++ b/crates/rpc/rpc-builder/tests/it/http.rs @@ -5,7 +5,6 @@ use crate::utils::{launch_http, launch_http_ws, launch_ws}; use jsonrpsee::{ core::{ client::{ClientT, SubscriptionClientT}, - error::Error, params::ArrayParams, }, http_client::HttpClient, @@ -30,9 +29,9 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value; use std::collections::HashSet; -fn is_unimplemented(err: Error) -> bool { +fn is_unimplemented(err: jsonrpsee::core::client::Error) -> bool { match err { - Error::Call(error_obj) => { + jsonrpsee::core::client::Error::Call(error_obj) => { error_obj.code() == ErrorCode::InternalError.code() && error_obj.message() == "unimplemented" } diff --git a/crates/rpc/rpc-builder/tests/it/serde.rs b/crates/rpc/rpc-builder/tests/it/serde.rs index a781e1d47..6a51c4f8d 100644 --- a/crates/rpc/rpc-builder/tests/it/serde.rs +++ b/crates/rpc/rpc-builder/tests/it/serde.rs @@ -2,7 +2,7 @@ use crate::utils::launch_http; use jsonrpsee::{ - core::{client::ClientT, traits::ToRpcParams, Error}, + core::{client::ClientT, traits::ToRpcParams}, types::Request, }; use reth_primitives::U256; @@ -12,7 +12,7 @@ use serde_json::value::RawValue; struct RawRpcParams(Box); impl ToRpcParams for RawRpcParams { - fn to_rpc_params(self) -> Result>, Error> { + fn to_rpc_params(self) -> Result>, serde_json::Error> { Ok(Some(self.0)) } } diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 28a5dfbed..8a9a00a3b 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -210,12 +210,6 @@ impl From for jsonrpsee_types::error::ErrorObject<'static> { } } -impl From for jsonrpsee_core::error::Error { - fn from(error: EngineApiError) -> Self { - jsonrpsee_core::error::Error::Call(error.into()) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/crates/rpc/rpc-testing-util/src/debug.rs b/crates/rpc/rpc-testing-util/src/debug.rs index 8ec26deae..bdb549c30 100644 --- a/crates/rpc/rpc-testing-util/src/debug.rs +++ b/crates/rpc/rpc-testing-util/src/debug.rs @@ -1,7 +1,7 @@ //! Helpers for testing debug trace calls. use futures::{Stream, StreamExt}; -use jsonrpsee::core::Error as RpcError; +use jsonrpsee::core::client::Error as RpcError; use reth_primitives::{BlockId, TxHash, B256}; use reth_rpc_api::{clients::DebugApiClient, EthApiClient}; use reth_rpc_types::{ @@ -37,14 +37,14 @@ pub trait DebugApiExt { &self, hash: B256, opts: GethDebugTracingOptions, - ) -> impl Future> + Send; + ) -> impl Future> + Send; /// Trace all transactions in a block individually with the given tracing opts. fn debug_trace_transactions_in_block( &self, block: B, opts: GethDebugTracingOptions, - ) -> impl Future, jsonrpsee::core::Error>> + Send + ) -> impl Future, RpcError>> + Send where B: Into + Send; @@ -64,7 +64,7 @@ pub trait DebugApiExt { &self, request: TransactionRequest, opts: GethDebugTracingOptions, - ) -> impl Future> + Send; + ) -> impl Future> + Send; /// method for debug_traceCall using raw JSON strings for the request and options. fn debug_trace_call_raw_json( @@ -84,7 +84,7 @@ where &self, hash: B256, opts: GethDebugTracingOptions, - ) -> Result { + ) -> Result { let mut params = jsonrpsee::core::params::ArrayParams::new(); params.insert(hash).unwrap(); params.insert(opts).unwrap(); @@ -95,7 +95,7 @@ where &self, block: B, opts: GethDebugTracingOptions, - ) -> Result, jsonrpsee::core::Error> + ) -> Result, RpcError> where B: Into + Send, { @@ -150,7 +150,7 @@ where &self, request: TransactionRequest, opts: GethDebugTracingOptions, - ) -> Result { + ) -> Result { let mut params = jsonrpsee::core::params::ArrayParams::new(); params.insert(request).unwrap(); params.insert(opts).unwrap(); diff --git a/crates/rpc/rpc-testing-util/src/trace.rs b/crates/rpc/rpc-testing-util/src/trace.rs index 5ab544c84..df90e813b 100644 --- a/crates/rpc/rpc-testing-util/src/trace.rs +++ b/crates/rpc/rpc-testing-util/src/trace.rs @@ -1,7 +1,7 @@ //! Helpers for testing trace calls. use futures::{Stream, StreamExt}; -use jsonrpsee::core::Error as RpcError; +use jsonrpsee::core::client::Error as RpcError; use reth_primitives::{BlockId, Bytes, TxHash, B256}; use reth_rpc_api::clients::TraceApiClient; use reth_rpc_types::{ @@ -17,6 +17,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; + /// A type alias that represents the result of a raw transaction trace stream. type RawTransactionTraceResult<'a> = Pin> + 'a>>; diff --git a/crates/rpc/rpc-types-compat/src/block.rs b/crates/rpc/rpc-types-compat/src/block.rs index e2c3bb0dd..f7257f90c 100644 --- a/crates/rpc/rpc-types-compat/src/block.rs +++ b/crates/rpc/rpc-types-compat/src/block.rs @@ -76,7 +76,7 @@ pub fn from_block_full( block_hash, block_number, base_fee_per_gas, - U256::from(idx), + idx, ) }) .collect::>(); diff --git a/crates/rpc/rpc-types-compat/src/transaction/mod.rs b/crates/rpc/rpc-types-compat/src/transaction/mod.rs index 518c83bf6..745d32e34 100644 --- a/crates/rpc/rpc-types-compat/src/transaction/mod.rs +++ b/crates/rpc/rpc-types-compat/src/transaction/mod.rs @@ -1,16 +1,19 @@ //! Compatibility functions for rpc `Transaction` type. -mod signature; -mod typed; use alloy_rpc_types::request::{TransactionInput, TransactionRequest}; + use reth_primitives::{ BlockNumber, Transaction as PrimitiveTransaction, TransactionKind as PrimitiveTransactionKind, - TransactionSignedEcRecovered, TxType, B256, U128, U256, U8, + TransactionSignedEcRecovered, TxType, B256, }; #[cfg(feature = "optimism")] use reth_rpc_types::optimism::OptimismTransactionFields; use reth_rpc_types::{AccessList, AccessListItem, Transaction}; use signature::from_primitive_signature; pub use typed::*; + +mod signature; +mod typed; + /// Create a new rpc transaction result for a mined transaction, using the given block hash, /// number, and tx index fields to populate the corresponding fields in the rpc result. /// @@ -21,7 +24,7 @@ pub fn from_recovered_with_block_context( block_hash: B256, block_number: BlockNumber, base_fee: Option, - tx_index: U256, + tx_index: usize, ) -> Transaction { fill(tx, Some(block_hash), Some(block_number), base_fee, Some(tx_index)) } @@ -39,7 +42,7 @@ fn fill( block_hash: Option, block_number: Option, base_fee: Option, - transaction_index: Option, + transaction_index: Option, ) -> Transaction { let signer = tx.signer(); let mut signed_tx = tx.into_signed(); @@ -51,8 +54,8 @@ fn fill( #[allow(unreachable_patterns)] let (gas_price, max_fee_per_gas) = match signed_tx.tx_type() { - TxType::Legacy => (Some(U128::from(signed_tx.max_fee_per_gas())), None), - TxType::Eip2930 => (Some(U128::from(signed_tx.max_fee_per_gas())), None), + TxType::Legacy => (Some(signed_tx.max_fee_per_gas()), None), + TxType::Eip2930 => (Some(signed_tx.max_fee_per_gas()), None), TxType::Eip1559 | TxType::Eip4844 => { // the gas price field for EIP1559 is set to `min(tip, gasFeeCap - baseFee) + // baseFee` @@ -64,7 +67,7 @@ fn fill( }) .unwrap_or_else(|| signed_tx.max_fee_per_gas()); - (Some(U128::from(gas_price)), Some(U128::from(signed_tx.max_fee_per_gas()))) + (Some(gas_price), Some(signed_tx.max_fee_per_gas())) } _ => { // OP-deposit @@ -129,28 +132,28 @@ fn fill( from: signer, to, value: signed_tx.value(), - gas_price: gas_price.map(U256::from), - max_fee_per_gas: max_fee_per_gas.map(U256::from), - max_priority_fee_per_gas: signed_tx.max_priority_fee_per_gas().map(U256::from), + gas_price, + max_fee_per_gas, + max_priority_fee_per_gas: signed_tx.max_priority_fee_per_gas(), signature: Some(signature), - gas: U256::from(signed_tx.gas_limit()), + gas: signed_tx.gas_limit() as u128, input: signed_tx.input().clone(), chain_id, access_list, - transaction_type: Some(U8::from(signed_tx.tx_type() as u8)), + transaction_type: Some(signed_tx.tx_type() as u8), // These fields are set to None because they are not stored as part of the transaction block_hash, - block_number: block_number.map(U256::from), - transaction_index, + block_number, + transaction_index: transaction_index.map(|idx| idx as u64), // EIP-4844 fields - max_fee_per_blob_gas: signed_tx.max_fee_per_blob_gas().map(U256::from), + max_fee_per_blob_gas: signed_tx.max_fee_per_blob_gas(), blob_versioned_hashes, // Optimism fields #[cfg(feature = "optimism")] other: OptimismTransactionFields { source_hash: signed_tx.source_hash(), - mint: signed_tx.mint().map(U128::from), + mint: signed_tx.mint().map(reth_primitives::U128::from), is_system_tx: signed_tx.is_deposit().then_some(signed_tx.is_system_transaction()), } .into(), @@ -200,16 +203,16 @@ pub fn transaction_to_call_request(tx: TransactionSignedEcRecovered) -> Transact TransactionRequest { from: Some(from), to, - gas_price: gas_price.map(U256::from), - max_fee_per_gas: max_fee_per_gas.map(U256::from), - max_priority_fee_per_gas: max_priority_fee_per_gas.map(U256::from), - gas: Some(U256::from(gas)), + gas_price, + max_fee_per_gas, + max_priority_fee_per_gas, + gas: Some(gas as u128), value: Some(value), input: TransactionInput::new(input), nonce: Some(nonce), chain_id, access_list, - max_fee_per_blob_gas: max_fee_per_blob_gas.map(U256::from), + max_fee_per_blob_gas, blob_versioned_hashes, transaction_type: Some(tx_type.into()), sidecar: None, diff --git a/crates/rpc/rpc-types-compat/src/transaction/typed.rs b/crates/rpc/rpc-types-compat/src/transaction/typed.rs index 2b9b32471..a14898195 100644 --- a/crates/rpc/rpc-types-compat/src/transaction/typed.rs +++ b/crates/rpc/rpc-types-compat/src/transaction/typed.rs @@ -13,7 +13,7 @@ pub fn to_primitive_transaction( Some(match tx_request { TypedTransactionRequest::Legacy(tx) => Transaction::Legacy(TxLegacy { chain_id: tx.chain_id, - nonce: tx.nonce.to(), + nonce: tx.nonce, gas_price: tx.gas_price.to(), gas_limit: tx.gas_limit.try_into().ok()?, to: to_primitive_transaction_kind(tx.kind), @@ -22,7 +22,7 @@ pub fn to_primitive_transaction( }), TypedTransactionRequest::EIP2930(tx) => Transaction::Eip2930(TxEip2930 { chain_id: tx.chain_id, - nonce: tx.nonce.to(), + nonce: tx.nonce, gas_price: tx.gas_price.to(), gas_limit: tx.gas_limit.try_into().ok()?, to: to_primitive_transaction_kind(tx.kind), @@ -32,7 +32,7 @@ pub fn to_primitive_transaction( }), TypedTransactionRequest::EIP1559(tx) => Transaction::Eip1559(TxEip1559 { chain_id: tx.chain_id, - nonce: tx.nonce.to(), + nonce: tx.nonce, max_fee_per_gas: tx.max_fee_per_gas.to(), gas_limit: tx.gas_limit.try_into().ok()?, to: to_primitive_transaction_kind(tx.kind), @@ -43,7 +43,7 @@ pub fn to_primitive_transaction( }), TypedTransactionRequest::EIP4844(tx) => Transaction::Eip4844(TxEip4844 { chain_id: tx.chain_id, - nonce: tx.nonce.to(), + nonce: tx.nonce, gas_limit: tx.gas_limit.to(), max_fee_per_gas: tx.max_fee_per_gas.to(), max_priority_fee_per_gas: tx.max_priority_fee_per_gas.to(), diff --git a/crates/rpc/rpc-types/src/eth/transaction/typed.rs b/crates/rpc/rpc-types/src/eth/transaction/typed.rs index cf0abc9c1..bf995c353 100644 --- a/crates/rpc/rpc-types/src/eth/transaction/typed.rs +++ b/crates/rpc/rpc-types/src/eth/transaction/typed.rs @@ -2,7 +2,7 @@ //! transaction deserialized from the json input of an RPC call. Depending on what fields are set, //! it can be converted into the container type [`TypedTransactionRequest`]. -use alloy_primitives::{Address, Bytes, B256, U256, U64}; +use alloy_primitives::{Address, Bytes, B256, U256}; use alloy_rlp::{Buf, BufMut, Decodable, Encodable, Error as RlpError, EMPTY_STRING_CODE}; use alloy_rpc_types::{AccessList, BlobTransactionSidecar}; use serde::{Deserialize, Serialize}; @@ -30,7 +30,7 @@ pub enum TypedTransactionRequest { #[derive(Debug, Clone, PartialEq, Eq)] pub struct LegacyTransactionRequest { /// The nonce of the transaction - pub nonce: U64, + pub nonce: u64, /// The gas price for the transaction pub gas_price: U256, /// The gas limit for the transaction @@ -51,7 +51,7 @@ pub struct EIP2930TransactionRequest { /// The chain ID for the transaction pub chain_id: u64, /// The nonce of the transaction - pub nonce: U64, + pub nonce: u64, /// The gas price for the transaction pub gas_price: U256, /// The gas limit for the transaction @@ -72,7 +72,7 @@ pub struct EIP1559TransactionRequest { /// The chain ID for the transaction pub chain_id: u64, /// The nonce of the transaction - pub nonce: U64, + pub nonce: u64, /// The maximum priority fee per gas for the transaction pub max_priority_fee_per_gas: U256, /// The maximum fee per gas for the transaction @@ -95,7 +95,7 @@ pub struct EIP4844TransactionRequest { /// The chain ID for the transaction pub chain_id: u64, /// The nonce of the transaction - pub nonce: U64, + pub nonce: u64, /// The maximum priority fee per gas for the transaction pub max_priority_fee_per_gas: U256, /// The maximum fee per gas for the transaction diff --git a/crates/rpc/rpc/src/eth/api/call.rs b/crates/rpc/rpc/src/eth/api/call.rs index dee060942..b00f82a78 100644 --- a/crates/rpc/rpc/src/eth/api/call.rs +++ b/crates/rpc/rpc/src/eth/api/call.rs @@ -209,7 +209,7 @@ where // get the highest possible gas limit, either the request's set value or the currently // configured gas limit - let mut highest_gas_limit = request.gas.unwrap_or(block.gas_limit); + let mut highest_gas_limit = request.gas.map(U256::from).unwrap_or(block.gas_limit); // Configure the evm env let mut env = build_call_evm_env(cfg, block, request)?; diff --git a/crates/rpc/rpc/src/eth/api/fee_history.rs b/crates/rpc/rpc/src/eth/api/fee_history.rs index 69eaded6c..f0cb62dda 100644 --- a/crates/rpc/rpc/src/eth/api/fee_history.rs +++ b/crates/rpc/rpc/src/eth/api/fee_history.rs @@ -9,7 +9,7 @@ use metrics::atomics::AtomicU64; use reth_primitives::{ basefee::calculate_next_block_base_fee, eip4844::{calc_blob_gasprice, calculate_excess_blob_gas}, - ChainSpec, Receipt, SealedBlock, TransactionSigned, B256, U256, + ChainSpec, Receipt, SealedBlock, TransactionSigned, B256, }; use reth_provider::{BlockReaderIdExt, CanonStateNotification, ChainSpecProvider}; use reth_rpc_types::TxGasAndReward; @@ -267,7 +267,7 @@ pub(crate) fn calculate_reward_percentiles_for_block( base_fee_per_gas: u64, transactions: &[TransactionSigned], receipts: &[Receipt], -) -> Result, EthApiError> { +) -> Result, EthApiError> { let mut transactions = transactions .iter() .zip(receipts) @@ -301,7 +301,7 @@ pub(crate) fn calculate_reward_percentiles_for_block( for percentile in percentiles { // Empty blocks should return in a zero row if transactions.is_empty() { - rewards_in_block.push(U256::ZERO); + rewards_in_block.push(0); continue } @@ -310,7 +310,7 @@ pub(crate) fn calculate_reward_percentiles_for_block( tx_index += 1; cumulative_gas_used += transactions[tx_index].gas_used; } - rewards_in_block.push(U256::from(transactions[tx_index].reward)); + rewards_in_block.push(transactions[tx_index].reward); } Ok(rewards_in_block) @@ -343,7 +343,7 @@ pub struct FeeHistoryEntry { /// Hash of the block. pub header_hash: B256, /// Approximated rewards for the configured percentiles. - pub rewards: Vec, + pub rewards: Vec, /// The timestamp of the block. pub timestamp: u64, } diff --git a/crates/rpc/rpc/src/eth/api/fees.rs b/crates/rpc/rpc/src/eth/api/fees.rs index 83e8ddcb7..9b7445a2f 100644 --- a/crates/rpc/rpc/src/eth/api/fees.rs +++ b/crates/rpc/rpc/src/eth/api/fees.rs @@ -107,13 +107,13 @@ where let start_block = end_block_plus - block_count; // Collect base fees, gas usage ratios and (optionally) reward percentile data - let mut base_fee_per_gas: Vec = Vec::new(); + let mut base_fee_per_gas: Vec = Vec::new(); let mut gas_used_ratio: Vec = Vec::new(); - let mut base_fee_per_blob_gas: Vec = Vec::new(); + let mut base_fee_per_blob_gas: Vec = Vec::new(); let mut blob_gas_used_ratio: Vec = Vec::new(); - let mut rewards: Vec> = Vec::new(); + let mut rewards: Vec> = Vec::new(); // Check if the requested range is within the cache bounds let fee_entries = self.fee_history_cache().get_history(start_block, end_block).await; @@ -124,10 +124,9 @@ where } for entry in &fee_entries { - base_fee_per_gas.push(U256::from(entry.base_fee_per_gas)); + base_fee_per_gas.push(entry.base_fee_per_gas as u128); gas_used_ratio.push(entry.gas_used_ratio); - base_fee_per_blob_gas - .push(U256::from(entry.base_fee_per_blob_gas.unwrap_or_default())); + base_fee_per_blob_gas.push(entry.base_fee_per_blob_gas.unwrap_or_default()); blob_gas_used_ratio.push(entry.blob_gas_used_ratio); if let Some(percentiles) = &reward_percentiles { @@ -143,10 +142,9 @@ where // Also need to include the `base_fee_per_gas` and `base_fee_per_blob_gas` for the next // block base_fee_per_gas - .push(U256::from(last_entry.next_block_base_fee(&self.provider().chain_spec()))); + .push(last_entry.next_block_base_fee(&self.provider().chain_spec()) as u128); - base_fee_per_blob_gas - .push(U256::from(last_entry.next_block_blob_fee().unwrap_or_default())); + base_fee_per_blob_gas.push(last_entry.next_block_blob_fee().unwrap_or_default()); } else { // read the requested header range let headers = self.provider().sealed_headers_range(start_block..=end_block)?; @@ -155,9 +153,9 @@ where } for header in &headers { - base_fee_per_gas.push(U256::from(header.base_fee_per_gas.unwrap_or_default())); + base_fee_per_gas.push(header.base_fee_per_gas.unwrap_or_default() as u128); gas_used_ratio.push(header.gas_used as f64 / header.gas_limit as f64); - base_fee_per_blob_gas.push(U256::from(header.blob_fee().unwrap_or_default())); + base_fee_per_blob_gas.push(header.blob_fee().unwrap_or_default()); blob_gas_used_ratio.push( header.blob_gas_used.unwrap_or_default() as f64 / reth_primitives::constants::eip4844::MAX_DATA_GAS_PER_BLOCK as f64, @@ -189,17 +187,17 @@ where // // The unwrap is safe since we checked earlier that we got at least 1 header. let last_header = headers.last().expect("is present"); - base_fee_per_gas.push(U256::from(calculate_next_block_base_fee( + base_fee_per_gas.push(calculate_next_block_base_fee( last_header.gas_used, last_header.gas_limit, last_header.base_fee_per_gas.unwrap_or_default(), self.provider().chain_spec().base_fee_params(last_header.timestamp), - ))); + ) as u128); // Same goes for the `base_fee_per_blob_gas`: // > "[..] includes the next block after the newest of the returned range, because this value can be derived from the newest block. base_fee_per_blob_gas - .push(U256::from(last_header.next_block_blob_fee().unwrap_or_default())); + .push(last_header.next_block_blob_fee().unwrap_or_default()); }; Ok(FeeHistory { @@ -207,14 +205,14 @@ where gas_used_ratio, base_fee_per_blob_gas, blob_gas_used_ratio, - oldest_block: U256::from(start_block), + oldest_block: start_block, reward: reward_percentiles.map(|_| rewards), }) } /// Approximates reward at a given percentile for a specific block /// Based on the configured resolution - fn approximate_percentile(&self, entry: &FeeHistoryEntry, requested_percentile: f64) -> U256 { + fn approximate_percentile(&self, entry: &FeeHistoryEntry, requested_percentile: f64) -> u128 { let resolution = self.fee_history_cache().resolution(); let rounded_percentile = (requested_percentile * resolution as f64).round() / resolution as f64; @@ -223,6 +221,6 @@ where // Calculate the index in the precomputed rewards array let index = (clamped_percentile / (1.0 / resolution as f64)).round() as usize; // Fetch the reward from the FeeHistoryEntry - entry.rewards.get(index).cloned().unwrap_or(U256::ZERO) + entry.rewards.get(index).cloned().unwrap_or_default() } } diff --git a/crates/rpc/rpc/src/eth/api/server.rs b/crates/rpc/rpc/src/eth/api/server.rs index 0d31339f5..19d8e587c 100644 --- a/crates/rpc/rpc/src/eth/api/server.rs +++ b/crates/rpc/rpc/src/eth/api/server.rs @@ -448,7 +448,7 @@ mod tests { use reth_network_api::noop::NoopNetwork; use reth_primitives::{ basefee::calculate_next_block_base_fee, constants::ETHEREUM_BLOCK_GAS_LIMIT, BaseFeeParams, - Block, BlockNumberOrTag, Header, TransactionSigned, B256, U256, + Block, BlockNumberOrTag, Header, TransactionSigned, B256, }; use reth_provider::{ test_utils::{MockEthProvider, NoopProvider}, @@ -495,7 +495,7 @@ mod tests { mut oldest_block: Option, block_count: u64, mock_provider: MockEthProvider, - ) -> (EthApi, Vec, Vec) { + ) -> (EthApi, Vec, Vec) { let mut rng = generators::rng(); // Build mock data @@ -559,18 +559,17 @@ mod tests { oldest_block.get_or_insert(hash); gas_used_ratios.push(gas_used as f64 / gas_limit as f64); - base_fees_per_gas - .push(base_fee_per_gas.map(|fee| U256::try_from(fee).unwrap()).unwrap_or_default()); + base_fees_per_gas.push(base_fee_per_gas.map(|fee| fee as u128).unwrap_or_default()); } // Add final base fee (for the next block outside of the request) let last_header = last_header.unwrap(); - base_fees_per_gas.push(U256::from(calculate_next_block_base_fee( + base_fees_per_gas.push(calculate_next_block_base_fee( last_header.gas_used, last_header.gas_limit, last_header.base_fee_per_gas.unwrap_or_default(), BaseFeeParams::ethereum(), - ))); + ) as u128); let eth_api = build_test_eth_api(mock_provider); @@ -675,7 +674,7 @@ mod tests { let fee_history = eth_api.fee_history(1, newest_block.into(), None).await.unwrap(); assert_eq!( - &fee_history.base_fee_per_gas, + fee_history.base_fee_per_gas, &base_fees_per_gas[base_fees_per_gas.len() - 2..], "one: base fee per gas is incorrect" ); @@ -689,19 +688,15 @@ mod tests { &gas_used_ratios[gas_used_ratios.len() - 1..], "one: gas used ratio is incorrect" ); - assert_eq!( - fee_history.oldest_block, - U256::from(newest_block), - "one: oldest block is incorrect" - ); + assert_eq!(fee_history.oldest_block, newest_block, "one: oldest block is incorrect"); assert!( fee_history.reward.is_none(), "one: no percentiles were requested, so there should be no rewards result" ); } - #[tokio::test] /// Requesting all blocks should be ok + #[tokio::test] async fn test_fee_history_all_blocks() { let block_count = 10; let newest_block = 1337; @@ -728,7 +723,7 @@ mod tests { ); assert_eq!( fee_history.oldest_block, - U256::from(newest_block - block_count + 1), + newest_block - block_count + 1, "all: oldest block is incorrect" ); assert!( diff --git a/crates/rpc/rpc/src/eth/api/transactions.rs b/crates/rpc/rpc/src/eth/api/transactions.rs index 82b9dca19..917ebc1f1 100644 --- a/crates/rpc/rpc/src/eth/api/transactions.rs +++ b/crates/rpc/rpc/src/eth/api/transactions.rs @@ -17,7 +17,7 @@ use reth_primitives::{ Address, BlockId, BlockNumberOrTag, Bytes, FromRecoveredPooledTransaction, Header, IntoRecoveredTransaction, Receipt, SealedBlock, SealedBlockWithSenders, TransactionKind::{Call, Create}, - TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, B256, U256, U64, + TransactionMeta, TransactionSigned, TransactionSignedEcRecovered, B256, U256, }; use reth_provider::{ BlockReaderIdExt, ChainSpecProvider, EvmEnvProvider, StateProviderBox, StateProviderFactory, @@ -910,9 +910,9 @@ where // gas price required (Some(_), None, None, None, None, None) => { Some(TypedTransactionRequest::Legacy(LegacyTransactionRequest { - nonce: U64::from(nonce.unwrap_or_default()), - gas_price: gas_price.unwrap_or_default(), - gas_limit: gas.unwrap_or_default(), + nonce: nonce.unwrap_or_default(), + gas_price: U256::from(gas_price.unwrap_or_default()), + gas_limit: U256::from(gas.unwrap_or_default()), value: value.unwrap_or_default(), input: data.into_input().unwrap_or_default(), kind: match to { @@ -926,9 +926,9 @@ where // if only accesslist is set, and no eip1599 fees (_, None, Some(access_list), None, None, None) => { Some(TypedTransactionRequest::EIP2930(EIP2930TransactionRequest { - nonce: U64::from(nonce.unwrap_or_default()), - gas_price: gas_price.unwrap_or_default(), - gas_limit: gas.unwrap_or_default(), + nonce: nonce.unwrap_or_default(), + gas_price: U256::from(gas_price.unwrap_or_default()), + gas_limit: U256::from(gas.unwrap_or_default()), value: value.unwrap_or_default(), input: data.into_input().unwrap_or_default(), kind: match to { @@ -946,10 +946,12 @@ where (None, _, _, None, None, None) => { // Empty fields fall back to the canonical transaction schema. Some(TypedTransactionRequest::EIP1559(EIP1559TransactionRequest { - nonce: U64::from(nonce.unwrap_or_default()), - max_fee_per_gas: max_fee_per_gas.unwrap_or_default(), - max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_default(), - gas_limit: gas.unwrap_or_default(), + nonce: nonce.unwrap_or_default(), + max_fee_per_gas: U256::from(max_fee_per_gas.unwrap_or_default()), + max_priority_fee_per_gas: U256::from( + max_priority_fee_per_gas.unwrap_or_default(), + ), + gas_limit: U256::from(gas.unwrap_or_default()), value: value.unwrap_or_default(), input: data.into_input().unwrap_or_default(), kind: match to { @@ -973,10 +975,12 @@ where // As per the EIP, we follow the same semantics as EIP-1559. Some(TypedTransactionRequest::EIP4844(EIP4844TransactionRequest { chain_id: 0, - nonce: U64::from(nonce.unwrap_or_default()), - max_priority_fee_per_gas: max_priority_fee_per_gas.unwrap_or_default(), - max_fee_per_gas: max_fee_per_gas.unwrap_or_default(), - gas_limit: gas.unwrap_or_default(), + nonce: nonce.unwrap_or_default(), + max_priority_fee_per_gas: U256::from( + max_priority_fee_per_gas.unwrap_or_default(), + ), + max_fee_per_gas: U256::from(max_fee_per_gas.unwrap_or_default()), + gas_limit: U256::from(gas.unwrap_or_default()), value: value.unwrap_or_default(), input: data.into_input().unwrap_or_default(), kind: match to { @@ -986,7 +990,7 @@ where access_list: access_list.unwrap_or_default(), // eip-4844 specific. - max_fee_per_blob_gas, + max_fee_per_blob_gas: U256::from(max_fee_per_blob_gas), blob_versioned_hashes, sidecar, })) @@ -998,36 +1002,45 @@ where let transaction = match transaction { Some(TypedTransactionRequest::Legacy(mut req)) => { req.chain_id = Some(chain_id.to()); - req.gas_limit = gas_limit; - req.gas_price = self.legacy_gas_price(gas_price).await?; + req.gas_limit = gas_limit.saturating_to(); + req.gas_price = self.legacy_gas_price(gas_price.map(U256::from)).await?; TypedTransactionRequest::Legacy(req) } Some(TypedTransactionRequest::EIP2930(mut req)) => { req.chain_id = chain_id.to(); - req.gas_limit = gas_limit; - req.gas_price = self.legacy_gas_price(gas_price).await?; + req.gas_limit = gas_limit.saturating_to(); + req.gas_price = self.legacy_gas_price(gas_price.map(U256::from)).await?; TypedTransactionRequest::EIP2930(req) } Some(TypedTransactionRequest::EIP1559(mut req)) => { - let (max_fee_per_gas, max_priority_fee_per_gas) = - self.eip1559_fees(max_fee_per_gas, max_priority_fee_per_gas).await?; + let (max_fee_per_gas, max_priority_fee_per_gas) = self + .eip1559_fees( + max_fee_per_gas.map(U256::from), + max_priority_fee_per_gas.map(U256::from), + ) + .await?; req.chain_id = chain_id.to(); - req.gas_limit = gas_limit; - req.max_fee_per_gas = max_fee_per_gas; - req.max_priority_fee_per_gas = max_priority_fee_per_gas; + req.gas_limit = gas_limit.saturating_to(); + req.max_fee_per_gas = max_fee_per_gas.saturating_to(); + req.max_priority_fee_per_gas = max_priority_fee_per_gas.saturating_to(); TypedTransactionRequest::EIP1559(req) } Some(TypedTransactionRequest::EIP4844(mut req)) => { - let (max_fee_per_gas, max_priority_fee_per_gas) = - self.eip1559_fees(max_fee_per_gas, max_priority_fee_per_gas).await?; + let (max_fee_per_gas, max_priority_fee_per_gas) = self + .eip1559_fees( + max_fee_per_gas.map(U256::from), + max_priority_fee_per_gas.map(U256::from), + ) + .await?; req.max_fee_per_gas = max_fee_per_gas; req.max_priority_fee_per_gas = max_priority_fee_per_gas; - req.max_fee_per_blob_gas = self.eip4844_blob_fee(max_fee_per_blob_gas).await?; + req.max_fee_per_blob_gas = + self.eip4844_blob_fee(max_fee_per_blob_gas.map(U256::from)).await?; req.chain_id = chain_id.to(); req.gas_limit = gas_limit; @@ -1674,7 +1687,7 @@ impl From for Transaction { block_hash, block_number, base_fee, - U256::from(index), + index as usize, ) } } @@ -1768,8 +1781,9 @@ pub(crate) fn build_transaction_receipt_with_block_receipts( let mut op_fields = OptimismTransactionReceiptFields::default(); if transaction.is_deposit() { - op_fields.deposit_nonce = receipt.deposit_nonce.map(U64::from); - op_fields.deposit_receipt_version = receipt.deposit_receipt_version.map(U64::from); + op_fields.deposit_nonce = receipt.deposit_nonce.map(reth_primitives::U64::from); + op_fields.deposit_receipt_version = + receipt.deposit_receipt_version.map(reth_primitives::U64::from); } else if let Some(l1_block_info) = optimism_tx_meta.l1_block_info { op_fields.l1_fee = optimism_tx_meta.l1_fee; op_fields.l1_gas_used = optimism_tx_meta diff --git a/crates/rpc/rpc/src/eth/error.rs b/crates/rpc/rpc/src/eth/error.rs index 3e54c2093..d8add6397 100644 --- a/crates/rpc/rpc/src/eth/error.rs +++ b/crates/rpc/rpc/src/eth/error.rs @@ -2,10 +2,7 @@ use crate::result::{internal_rpc_err, invalid_params_rpc_err, rpc_err, rpc_error_with_code}; use alloy_sol_types::decode_revert_reason; -use jsonrpsee::{ - core::Error as RpcError, - types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObject}, -}; +use jsonrpsee::types::{error::CALL_EXECUTION_FAILED_CODE, ErrorObject}; use reth_interfaces::RethError; use reth_primitives::{revm_primitives::InvalidHeader, Address, Bytes, U256}; use reth_revm::tracing::{js::JsInspectorError, MuxError}; @@ -175,11 +172,6 @@ impl From for ErrorObject<'static> { } } -impl From for RpcError { - fn from(error: EthApiError) -> Self { - RpcError::Call(error.into()) - } -} impl From for EthApiError { fn from(error: JsInspectorError) -> Self { match error { diff --git a/crates/rpc/rpc/src/eth/pubsub.rs b/crates/rpc/rpc/src/eth/pubsub.rs index b237a3256..ad7486867 100644 --- a/crates/rpc/rpc/src/eth/pubsub.rs +++ b/crates/rpc/rpc/src/eth/pubsub.rs @@ -1,8 +1,13 @@ //! `eth_` PubSub RPC handler implementation -use crate::{eth::logs_utils, result::invalid_params_rpc_err}; +use crate::{ + eth::logs_utils, + result::{internal_rpc_err, invalid_params_rpc_err}, +}; use futures::StreamExt; -use jsonrpsee::{server::SubscriptionMessage, PendingSubscriptionSink, SubscriptionSink}; +use jsonrpsee::{ + server::SubscriptionMessage, types::ErrorObject, PendingSubscriptionSink, SubscriptionSink, +}; use reth_network_api::NetworkInfo; use reth_primitives::{IntoRecoveredTransaction, TxHash}; use reth_provider::{BlockReader, CanonStateSubscriptions, EvmEnvProvider}; @@ -95,7 +100,7 @@ async fn handle_accepted( accepted_sink: SubscriptionSink, kind: SubscriptionKind, params: Option, -) -> Result<(), jsonrpsee::core::Error> +) -> Result<(), ErrorObject<'static>> where Provider: BlockReader + EvmEnvProvider + Clone + 'static, Pool: TransactionPool + 'static, @@ -114,7 +119,7 @@ where let filter = match params { Some(Params::Logs(filter)) => FilteredParams::new(Some(*filter)), Some(Params::Bool(_)) => { - return Err(invalid_params_rpc_err("Invalid params for logs").into()) + return Err(invalid_params_rpc_err("Invalid params for logs")) } _ => FilteredParams::default(), }; @@ -142,8 +147,7 @@ where Params::Logs(_) => { return Err(invalid_params_rpc_err( "Invalid params for newPendingTransactions", - ) - .into()) + )) } } } @@ -162,7 +166,8 @@ where let current_sub_res = pubsub.sync_status(initial_sync_status).await; // send the current status immediately - let msg = SubscriptionMessage::from_json(¤t_sub_res)?; + let msg = SubscriptionMessage::from_json(¤t_sub_res) + .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { return Ok(()) } @@ -176,7 +181,8 @@ where // send a new message now that the status changed let sync_status = pubsub.sync_status(current_syncing).await; - let msg = SubscriptionMessage::from_json(&sync_status)?; + let msg = SubscriptionMessage::from_json(&sync_status) + .map_err(SubscriptionSerializeError::new)?; if accepted_sink.send(msg).await.is_err() { break } @@ -188,11 +194,28 @@ where } } +/// Helper to convert a serde error into an [`ErrorObject`] +#[derive(Debug, thiserror::Error)] +#[error("Failed to serialize subscription item: {0}")] +pub(crate) struct SubscriptionSerializeError(#[from] serde_json::Error); + +impl SubscriptionSerializeError { + pub(crate) const fn new(err: serde_json::Error) -> Self { + Self(err) + } +} + +impl From for ErrorObject<'static> { + fn from(value: SubscriptionSerializeError) -> Self { + internal_rpc_err(value.to_string()) + } +} + /// Pipes all stream items to the subscription sink. async fn pipe_from_stream( sink: SubscriptionSink, mut stream: St, -) -> Result<(), jsonrpsee::core::Error> +) -> Result<(), ErrorObject<'static>> where St: Stream + Unpin, T: Serialize, @@ -211,7 +234,7 @@ where break Ok(()) }, }; - let msg = SubscriptionMessage::from_json(&item)?; + let msg = SubscriptionMessage::from_json(&item).map_err(SubscriptionSerializeError::new)?; if sink.send(msg).await.is_err() { break Ok(()); } diff --git a/crates/rpc/rpc/src/eth/revm_utils.rs b/crates/rpc/rpc/src/eth/revm_utils.rs index 5d14756b2..374918d39 100644 --- a/crates/rpc/rpc/src/eth/revm_utils.rs +++ b/crates/rpc/rpc/src/eth/revm_utils.rs @@ -240,16 +240,16 @@ pub(crate) fn create_txn_env( let CallFees { max_priority_fee_per_gas, gas_price, max_fee_per_blob_gas } = CallFees::ensure_fees( - gas_price, - max_fee_per_gas, - max_priority_fee_per_gas, + gas_price.map(U256::from), + max_fee_per_gas.map(U256::from), + max_priority_fee_per_gas.map(U256::from), block_env.basefee, blob_versioned_hashes.as_deref(), - max_fee_per_blob_gas, + max_fee_per_blob_gas.map(U256::from), block_env.get_blob_gasprice().map(U256::from), )?; - let gas_limit = gas.unwrap_or_else(|| block_env.gas_limit.min(U256::from(u64::MAX))); + let gas_limit = gas.unwrap_or_else(|| block_env.gas_limit.min(U256::from(u64::MAX)).to()); let env = TxEnv { gas_limit: gas_limit.try_into().map_err(|_| RpcInvalidTransactionError::GasUintOverflow)?, nonce, diff --git a/crates/rpc/rpc/src/layers/auth_layer.rs b/crates/rpc/rpc/src/layers/auth_layer.rs index e7c56789d..19514ea37 100644 --- a/crates/rpc/rpc/src/layers/auth_layer.rs +++ b/crates/rpc/rpc/src/layers/auth_layer.rs @@ -33,7 +33,7 @@ use tower::{Layer, Service}; /// let middleware = tower::ServiceBuilder::default().layer(layer); /// /// let _server = ServerBuilder::default() -/// .set_middleware(middleware) +/// .set_http_middleware(middleware) /// .build(addr.parse::().unwrap()) /// .await /// .unwrap(); @@ -276,7 +276,7 @@ mod tests { // Create a layered server let server = ServerBuilder::default() .set_id_provider(RandomStringIdProvider::new(16)) - .set_middleware(middleware) + .set_http_middleware(middleware) .build(addr.parse::().unwrap()) .await .unwrap();