diff --git a/Cargo.lock b/Cargo.lock index d8642eb7f..91ba40344 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -653,7 +653,7 @@ dependencies = [ "alloy-transport", "bytes", "futures", - "interprocess 2.2.0", + "interprocess", "pin-project", "serde_json", "tokio", @@ -953,22 +953,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener 2.5.3", + "event-listener", "futures-core", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-compression" version = "0.4.11" @@ -991,8 +979,8 @@ version = "5.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e6fa871e4334a622afd6bb2f611635e8083a6f5e2936c0f90f37c7ef9856298" dependencies = [ - "async-channel 1.9.0", - "futures-lite 1.13.0", + "async-channel", + "futures-lite", "http-types", "log", "memchr", @@ -1021,12 +1009,6 @@ dependencies = [ "syn 2.0.66", ] -[[package]] -name = "async-task" -version = "4.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" - [[package]] name = "async-trait" version = "0.1.80" @@ -1192,7 +1174,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 1.1.0", "shlex", "syn 2.0.66", "which", @@ -1284,19 +1266,6 @@ dependencies = [ "generic-array", ] -[[package]] -name = "blocking" -version = "1.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" -dependencies = [ - "async-channel 2.3.1", - "async-task", - "futures-io", - "futures-lite 2.3.0", - "piper", -] - [[package]] name = "blst" version = "0.3.12" @@ -1320,7 +1289,7 @@ dependencies = [ "boa_macros", "indexmap 2.2.6", "num-bigint", - "rustc-hash", + "rustc-hash 1.1.0", ] [[package]] @@ -1356,7 +1325,7 @@ dependencies = [ "portable-atomic", "rand 0.8.5", "regress", - "rustc-hash", + "rustc-hash 1.1.0", "ryu-js", "serde", "serde_json", @@ -1392,7 +1361,7 @@ dependencies = [ "indexmap 2.2.6", "once_cell", "phf", - "rustc-hash", + "rustc-hash 1.1.0", "static_assertions", ] @@ -1424,7 +1393,7 @@ dependencies = [ "num-bigint", "num-traits", "regress", - "rustc-hash", + "rustc-hash 1.1.0", ] [[package]] @@ -2703,27 +2672,6 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" -[[package]] -name = "event-listener" -version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f214dc438f977e6d4e3500aaa277f5ad94ca83fbbd9b1a15713ce2344ccc5a1" -dependencies = [ - "event-listener 5.3.1", - "pin-project-lite", -] - [[package]] name = "example-beacon-api-sidecar-fetcher" version = "0.1.0" @@ -3298,16 +3246,6 @@ dependencies = [ "waker-fn", ] -[[package]] -name = "futures-lite" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" -dependencies = [ - "futures-core", - "pin-project-lite", -] - [[package]] name = "futures-macro" version = "0.3.30" @@ -3715,9 +3653,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" dependencies = [ "anyhow", - "async-channel 1.9.0", + "async-channel", "base64 0.13.1", - "futures-lite 1.13.0", + "futures-lite", "infer", "pin-project-lite", "rand 0.7.3", @@ -3825,9 +3763,9 @@ dependencies = [ [[package]] name = "iai-callgrind" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e99bf26f496b13ac6273014f40afda46a233fbfb0289ce50fb4daaad2f2ffc80" +checksum = "7b780c98c212412a6d54b5d3d7cf62fb20d88cd32c0653d6df2a03d63e52a903" dependencies = [ "bincode", "bindgen", @@ -3851,9 +3789,9 @@ dependencies = [ [[package]] name = "iai-callgrind-runner" -version = "0.10.2" +version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c23a951b9eccaa1e38556d27473d1462a9c247a27961812edcaac156af861282" +checksum = "fa8d015de54e6431004efede625ee79e3b4105dcb2100cd574de914e06fd4f7c" dependencies = [ "serde", ] @@ -4155,27 +4093,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "interprocess" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81f2533f3be42fffe3b5e63b71aeca416c1c3bc33e4e27be018521e76b1f38fb" -dependencies = [ - "blocking", - "cfg-if", - "futures-core", - "futures-io", - "intmap", - "libc", - "once_cell", - "rustc_version 0.4.0", - "spinning", - "thiserror", - "to_method", - "tokio", - "winapi", -] - [[package]] name = "interprocess" version = "2.2.0" @@ -4191,12 +4108,6 @@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "intmap" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae52f28f45ac2bc96edb7714de995cffc174a395fb0abf5bff453587c980d7b9" - [[package]] name = "intrusive-collections" version = "0.9.6" @@ -4384,7 +4295,7 @@ dependencies = [ "parking_lot 0.12.3", "pin-project", "rand 0.8.5", - "rustc-hash", + "rustc-hash 1.1.0", "serde", "serde_json", "thiserror", @@ -4876,15 +4787,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "memmap2" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f49388d20533534cd19360ad3d6a7dadc885944aa802ba3995040c5ec11288c6" -dependencies = [ - "libc", -] - [[package]] name = "memmap2" version = "0.9.4" @@ -5623,17 +5525,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" -[[package]] -name = "piper" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1d5c74c9876f070d3e8fd503d748c7d974c3e48da8f41350fa5222ef9b4391" -dependencies = [ - "atomic-waker", - "fastrand 2.1.0", - "futures-io", -] - [[package]] name = "pkcs8" version = "0.10.2" @@ -5951,7 +5842,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 1.1.0", "rustls", "thiserror", "tokio", @@ -5967,7 +5858,7 @@ dependencies = [ "bytes", "rand 0.8.5", "ring", - "rustc-hash", + "rustc-hash 1.1.0", "rustls", "slab", "thiserror", @@ -6305,7 +6196,7 @@ dependencies = [ "fdlimit", "futures", "human_bytes", - "itertools 0.12.1", + "itertools 0.13.0", "jsonrpsee", "libc", "metrics-process", @@ -6429,7 +6320,7 @@ dependencies = [ "alloy-genesis", "assert_matches", "futures", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "reth-blockchain-tree", "reth-blockchain-tree-api", @@ -6706,7 +6597,7 @@ dependencies = [ "reth-storage-errors", "reth-tracing", "reth-trie-common", - "rustc-hash", + "rustc-hash 2.0.0", "serde", "serde_json", "strum", @@ -6806,7 +6697,7 @@ dependencies = [ "discv5", "enr", "futures", - "itertools 0.12.1", + "itertools 0.13.0", "libp2p-identity", "metrics", "multiaddr", @@ -6858,7 +6749,7 @@ dependencies = [ "assert_matches", "futures", "futures-util", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "pin-project", "rand 0.8.5", @@ -7252,7 +7143,7 @@ dependencies = [ "bytes", "futures", "futures-util", - "interprocess 1.2.1", + "interprocess", "jsonrpsee", "pin-project", "rand 0.8.5", @@ -7355,7 +7246,7 @@ dependencies = [ "fnv", "futures", "humantime-serde", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "parking_lot 0.12.3", "pin-project", @@ -7452,7 +7343,7 @@ dependencies = [ "cuckoofilter", "derive_more", "lz4_flex", - "memmap2 0.7.1", + "memmap2", "ph", "rand 0.8.5", "reth-fs-util", @@ -7857,7 +7748,7 @@ dependencies = [ "assert_matches", "auto_impl", "dashmap", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "parking_lot 0.12.3", "pin-project", @@ -7896,7 +7787,7 @@ version = "1.0.0-rc.2" dependencies = [ "alloy-primitives", "assert_matches", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "rayon", "reth-chainspec", @@ -8189,7 +8080,7 @@ dependencies = [ "assert_matches", "criterion", "futures-util", - "itertools 0.12.1", + "itertools 0.13.0", "num-traits", "paste", "pprof", @@ -8390,7 +8281,7 @@ dependencies = [ "bitflags 2.5.0", "criterion", "futures-util", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "parking_lot 0.12.3", "paste", @@ -8407,7 +8298,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "revm", - "rustc-hash", + "rustc-hash 2.0.0", "schnellru", "serde", "serde_json", @@ -8465,7 +8356,7 @@ dependencies = [ "bytes", "derive_more", "hash-db", - "itertools 0.12.1", + "itertools 0.13.0", "nybbles", "plain_hasher", "proptest", @@ -8486,7 +8377,7 @@ dependencies = [ "alloy-rlp", "criterion", "derive_more", - "itertools 0.12.1", + "itertools 0.13.0", "metrics", "proptest", "rand 0.8.5", @@ -8732,6 +8623,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc-hex" version = "2.1.0" @@ -9403,15 +9300,6 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -[[package]] -name = "spinning" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d4f0e86297cad2658d92a707320d87bf4e6ae1050287f51d19b67ef3f153a7b" -dependencies = [ - "lock_api", -] - [[package]] name = "spki" version = "0.7.3" @@ -9520,7 +9408,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71297dc3e250f7dbdf8adb99e235da783d690f5819fdeb4cce39d9cfb0aca9f1" dependencies = [ "debugid", - "memmap2 0.9.4", + "memmap2", "stable_deref_trait", "uuid", ] @@ -9859,12 +9747,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" -[[package]] -name = "to_method" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7c4ceeeca15c8384bbc3e011dbd8fccb7f068a440b752b7d9b32ceb0ca0e2e8" - [[package]] name = "tokio" version = "1.38.0" diff --git a/Cargo.toml b/Cargo.toml index 4cf177878..69a06061c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -409,11 +409,11 @@ serde_with = "3.3.0" humantime = "2.1" humantime-serde = "1.1" rand = "0.8.5" -rustc-hash = "1.1.0" +rustc-hash = "2.0" schnellru = "0.2" strum = "0.26" rayon = "1.7" -itertools = "0.12" +itertools = "0.13" parking_lot = "0.12" modular-bitfield = "0.11.2" once_cell = "1.17" @@ -494,6 +494,7 @@ proptest-derive = "0.4" serial_test = "3" similar-asserts = "1.5.0" test-fuzz = "5" +iai-callgrind = "0.11" [patch.crates-io] revm = { git = "https://github.com/bluealloy/revm.git", rev = "41e2f7f" } diff --git a/crates/rpc/ipc/Cargo.toml b/crates/rpc/ipc/Cargo.toml index af6e64db1..42bb64d53 100644 --- a/crates/rpc/ipc/Cargo.toml +++ b/crates/rpc/ipc/Cargo.toml @@ -29,7 +29,7 @@ tracing.workspace = true bytes.workspace = true thiserror.workspace = true futures-util = "0.3.30" -interprocess = { version = "1.2.1", features = ["tokio_support"] } +interprocess = { version = "2.2.0", features = ["tokio"] } [dev-dependencies] tokio-stream = { workspace = true, features = ["sync"] } diff --git a/crates/rpc/ipc/src/client/mod.rs b/crates/rpc/ipc/src/client/mod.rs index 05ea7ed58..e8eff9c8f 100644 --- a/crates/rpc/ipc/src/client/mod.rs +++ b/crates/rpc/ipc/src/client/mod.rs @@ -1,23 +1,23 @@ //! [`jsonrpsee`] transport adapter implementation for IPC. use crate::stream_codec::StreamCodec; -use futures::StreamExt; -use interprocess::local_socket::tokio::{LocalSocketStream, OwnedReadHalf, OwnedWriteHalf}; +use futures::{StreamExt, TryFutureExt}; +use interprocess::local_socket::{ + tokio::{prelude::*, RecvHalf, SendHalf}, + GenericFilePath, +}; use jsonrpsee::{ async_client::{Client, ClientBuilder}, core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}, }; use std::io; use tokio::io::AsyncWriteExt; -use tokio_util::{ - codec::FramedRead, - compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}, -}; +use tokio_util::codec::FramedRead; /// Sending end of IPC transport. #[derive(Debug)] pub(crate) struct Sender { - inner: Compat, + inner: SendHalf, } #[async_trait::async_trait] @@ -44,7 +44,7 @@ impl TransportSenderT for Sender { /// Receiving end of IPC transport. #[derive(Debug)] pub(crate) struct Receiver { - pub(crate) inner: FramedRead, StreamCodec>, + pub(crate) inner: FramedRead, } #[async_trait::async_trait] @@ -63,20 +63,17 @@ impl TransportReceiverT for Receiver { pub(crate) struct IpcTransportClientBuilder; impl IpcTransportClientBuilder { - pub(crate) async fn build( - self, - endpoint: impl AsRef, - ) -> Result<(Sender, Receiver), IpcError> { - let endpoint = endpoint.as_ref().to_string(); - let conn = LocalSocketStream::connect(endpoint.clone()) + pub(crate) async fn build(self, path: &str) -> Result<(Sender, Receiver), IpcError> { + let conn = async { path.to_fs_name::() } + .and_then(LocalSocketStream::connect) .await - .map_err(|err| IpcError::FailedToConnect { path: endpoint, err })?; + .map_err(|err| IpcError::FailedToConnect { path: path.to_string(), err })?; - let (rhlf, whlf) = conn.into_split(); + let (recv, send) = conn.split(); Ok(( - Sender { inner: whlf.compat_write() }, - Receiver { inner: FramedRead::new(rhlf.compat(), StreamCodec::stream_incoming()) }, + Sender { inner: send }, + Receiver { inner: FramedRead::new(recv, StreamCodec::stream_incoming()) }, )) } } @@ -92,14 +89,14 @@ impl IpcClientBuilder { /// ``` /// use jsonrpsee::{core::client::ClientT, rpc_params}; /// use reth_ipc::client::IpcClientBuilder; + /// /// # async fn run_client() -> Result<(), Box> { /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?; /// let response: String = client.request("say_hello", rpc_params![]).await?; - /// # Ok(()) - /// # } + /// # Ok(()) } /// ``` - pub async fn build(self, path: impl AsRef) -> Result { - let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?; + pub async fn build(self, name: &str) -> Result { + let (tx, rx) = IpcTransportClientBuilder::default().build(name).await?; Ok(self.build_with_tokio(tx, rx)) } @@ -139,20 +136,24 @@ pub enum IpcError { #[cfg(test)] mod tests { - use crate::server::dummy_endpoint; - use interprocess::local_socket::tokio::LocalSocketListener; + use interprocess::local_socket::ListenerOptions; use super::*; + use crate::server::dummy_name; #[tokio::test] async fn test_connect() { - let endpoint = dummy_endpoint(); - let binding = LocalSocketListener::bind(endpoint.clone()).unwrap(); + let name = &dummy_name(); + + let binding = ListenerOptions::new() + .name(name.as_str().to_fs_name::().unwrap()) + .create_tokio() + .unwrap(); tokio::spawn(async move { let _x = binding.accept().await; }); - let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap(); + let (tx, rx) = IpcTransportClientBuilder::default().build(name).await.unwrap(); let _ = IpcClientBuilder::default().build_with_tokio(tx, rx); } } diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index d001909d3..c38b1629e 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -5,8 +5,12 @@ use crate::server::{ future::StopHandle, }; use futures::StreamExt; -use futures_util::{future::Either, AsyncWriteExt}; -use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream}; +use futures_util::future::Either; +use interprocess::local_socket::{ + tokio::prelude::{LocalSocketListener, LocalSocketStream}, + traits::tokio::{Listener, Stream}, + GenericFilePath, ListenerOptions, ToFsName, +}; use jsonrpsee::{ core::TEN_MB_SIZE_BYTES, server::{ @@ -24,7 +28,7 @@ use std::{ task::{Context, Poll}, }; use tokio::{ - io::{AsyncRead, AsyncWrite}, + io::{AsyncRead, AsyncWrite, AsyncWriteExt}, sync::{oneshot, watch}, }; use tower::{layer::util::Identity, Layer, Service}; @@ -39,7 +43,6 @@ use crate::{ }; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tokio_util::compat::FuturesAsyncReadCompatExt; use tower::layer::{util::Stack, LayerFn}; mod connection; @@ -68,17 +71,17 @@ impl IpcServer { impl IpcServer where - RpcMiddleware: Layer + Clone + Send + 'static, - for<'a> >::Service: RpcServiceT<'a>, - HttpMiddleware: Layer> + Send + 'static, - >>::Service: Send - + Service< - String, - Response = Option, - Error = Box, - >, - <>>::Service as Service>::Future: - Send + Unpin, + RpcMiddleware: for<'a> Layer> + Clone + Send + 'static, + HttpMiddleware: Layer< + TowerServiceNoHttp, + Service: Service< + String, + Response = Option, + Error = Box, + Future: Send + Unpin, + > + Send, + > + Send + + 'static, { /// Start responding to connections requests. /// @@ -89,7 +92,7 @@ where /// use jsonrpsee::RpcModule; /// use reth_ipc::server::Builder; /// async fn run_server() -> Result<(), Box> { - /// let server = Builder::default().build("/tmp/my-uds"); + /// let server = Builder::default().build("/tmp/my-uds".into()); /// let mut module = RpcModule::new(()); /// module.register_method("say_hello", |_, _, _| "lo")?; /// let handle = server.start(module).await?; @@ -137,15 +140,19 @@ where } } - let listener = match LocalSocketListener::bind(self.endpoint.clone()) { + let listener = match self + .endpoint + .as_str() + .to_fs_name::() + .and_then(|name| ListenerOptions::new().name(name).create_tokio()) + { + Ok(listener) => listener, Err(err) => { on_ready .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err })) .ok(); return; } - - Ok(listener) => listener, }; // signal that we're ready to accept connections @@ -164,9 +171,10 @@ where match try_accept_conn(&listener, stopped).await { AcceptConnection::Established { local_socket_stream, stop } => { let Some(conn_permit) = connection_guard.try_acquire() else { - let (mut _reader, mut writer) = local_socket_stream.into_split(); - let _ = writer.write_all(b"Too many connections. Please try again later.").await; - drop((_reader, writer)); + let (_reader, mut writer) = local_socket_stream.split(); + let _ = writer + .write_all(b"Too many connections. Please try again later.") + .await; stopped = stop; continue; }; @@ -177,7 +185,7 @@ where let conn_permit = Arc::new(conn_permit); - process_connection(ProcessConnection{ + process_connection(ProcessConnection { http_middleware: &self.http_middleware, rpc_middleware: self.rpc_middleware.clone(), conn_permit, @@ -193,9 +201,11 @@ where id = id.wrapping_add(1); stopped = stop; } - AcceptConnection::Shutdown => { break; } - AcceptConnection::Err((e, stop)) => { - tracing::error!("Error while awaiting a new IPC connection: {:?}", e); + AcceptConnection::Shutdown => { + break; + } + AcceptConnection::Err((err, stop)) => { + tracing::error!(%err, "Failed accepting a new IPC connection"); stopped = stop; } } @@ -204,7 +214,8 @@ where // Drop the last Sender drop(drop_on_completion); - // Once this channel is closed it is safe to assume that all connections have been gracefully shutdown + // Once this channel is closed it is safe to assume that all connections have been + // gracefully shutdown while process_connection_awaiter.recv().await.is_some() { // Generally, messages should not be sent across this channel, // but we'll loop here to wait for `None` just to be on the safe side @@ -222,10 +233,7 @@ async fn try_accept_conn(listener: &LocalSocketListener, stopped: S) -> Accep where S: Future + Unpin, { - let accept = listener.accept(); - let accept = pin!(accept); - - match futures_util::future::select(accept, stopped).await { + match futures_util::future::select(pin!(listener.accept()), stopped).await { Either::Left((res, stop)) => match res { Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop }, Err(e) => AcceptConnection::Err((e, stop)), @@ -459,7 +467,7 @@ fn process_connection<'b, RpcMiddleware, HttpMiddleware>( let ipc = IpcConn(tokio_util::codec::Decoder::framed( StreamCodec::stream_incoming(), - local_socket_stream.compat(), + local_socket_stream, )); let (tx, rx) = mpsc::channel::(server_cfg.message_buffer_capacity as usize); @@ -682,9 +690,9 @@ impl Builder { /// #[tokio::main] /// async fn main() { /// let builder = tower::ServiceBuilder::new(); - /// - /// let server = - /// reth_ipc::server::Builder::default().set_http_middleware(builder).build("/tmp/my-uds"); + /// let server = reth_ipc::server::Builder::default() + /// .set_http_middleware(builder) + /// .build("/tmp/my-uds".into()); /// } /// ``` pub fn set_http_middleware( @@ -776,9 +784,9 @@ impl Builder { } /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build(self, endpoint: impl AsRef) -> IpcServer { + pub fn build(self, endpoint: String) -> IpcServer { IpcServer { - endpoint: endpoint.as_ref().to_string(), + endpoint, cfg: self.settings, id_provider: self.id_provider, http_middleware: self.http_middleware, @@ -816,9 +824,8 @@ impl ServerHandle { } } -/// For testing/examples #[cfg(test)] -pub fn dummy_endpoint() -> String { +pub fn dummy_name() -> String { let num: u64 = rand::Rng::gen(&mut rand::thread_rng()); if cfg!(windows) { format!(r"\\.\pipe\my-pipe-{}", num) @@ -893,8 +900,8 @@ mod tests { #[tokio::test] async fn can_set_the_max_response_body_size() { // init_test_tracing(); - let endpoint = dummy_endpoint(); - let server = Builder::default().max_response_body_size(100).build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().max_response_body_size(100).build(endpoint.clone()); let mut module = RpcModule::new(()); module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap(); let handle = server.start(module).await.unwrap(); @@ -908,8 +915,8 @@ mod tests { #[tokio::test] async fn can_set_the_max_request_body_size() { init_test_tracing(); - let endpoint = dummy_endpoint(); - let server = Builder::default().max_request_body_size(100).build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().max_request_body_size(100).build(endpoint.clone()); let mut module = RpcModule::new(()); module.register_method("anything", |_, _, _| "succeed").unwrap(); let handle = server.start(module).await.unwrap(); @@ -936,16 +943,16 @@ mod tests { async fn can_set_max_connections() { init_test_tracing(); - let endpoint = dummy_endpoint(); - let server = Builder::default().max_connections(2).build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().max_connections(2).build(endpoint.clone()); let mut module = RpcModule::new(()); module.register_method("anything", |_, _, _| "succeed").unwrap(); let handle = server.start(module).await.unwrap(); tokio::spawn(handle.stopped()); - let client1 = IpcClientBuilder::default().build(endpoint.clone()).await.unwrap(); - let client2 = IpcClientBuilder::default().build(endpoint.clone()).await.unwrap(); - let client3 = IpcClientBuilder::default().build(endpoint.clone()).await.unwrap(); + let client1 = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let client2 = IpcClientBuilder::default().build(endpoint).await.unwrap(); + let client3 = IpcClientBuilder::default().build(endpoint).await.unwrap(); let response1: Result = client1.request("anything", rpc_params![]).await; let response2: Result = client2.request("anything", rpc_params![]).await; @@ -961,7 +968,7 @@ mod tests { tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Can connect again - let client4 = IpcClientBuilder::default().build(endpoint.clone()).await.unwrap(); + let client4 = IpcClientBuilder::default().build(endpoint).await.unwrap(); let response4: Result = client4.request("anything", rpc_params![]).await; assert!(response4.is_ok()); } @@ -969,8 +976,8 @@ mod tests { #[tokio::test] async fn test_rpc_request() { init_test_tracing(); - let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().build(endpoint.clone()); let mut module = RpcModule::new(()); let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#; module.register_method("eth_chainId", move |_, _, _| msg).unwrap(); @@ -984,8 +991,8 @@ mod tests { #[tokio::test] async fn test_batch_request() { - let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().build(endpoint.clone()); let mut module = RpcModule::new(()); module.register_method("anything", |_, _, _| "ok").unwrap(); let handle = server.start(module).await.unwrap(); @@ -1009,8 +1016,8 @@ mod tests { #[tokio::test] async fn test_ipc_modules() { reth_tracing::init_test_tracing(); - let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().build(endpoint.clone()); let mut module = RpcModule::new(()); let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#; module.register_method("rpc_modules", move |_, _, _| msg).unwrap(); @@ -1024,8 +1031,8 @@ mod tests { #[tokio::test(flavor = "multi_thread")] async fn test_rpc_subscription() { - let endpoint = dummy_endpoint(); - let server = Builder::default().build(&endpoint); + let endpoint = &dummy_name(); + let server = Builder::default().build(endpoint.clone()); let (tx, _rx) = broadcast::channel::(16); let mut module = RpcModule::new(tx.clone()); @@ -1080,10 +1087,10 @@ mod tests { } reth_tracing::init_test_tracing(); - let endpoint = dummy_endpoint(); + let endpoint = &dummy_name(); let rpc_middleware = RpcServiceBuilder::new().layer_fn(ModifyRequestIf); - let server = Builder::default().set_rpc_middleware(rpc_middleware).build(&endpoint); + let server = Builder::default().set_rpc_middleware(rpc_middleware).build(endpoint.clone()); let mut module = RpcModule::new(()); let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#; diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index 62a676eaa..3f016c330 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -298,7 +298,7 @@ impl AuthServerHandle { pub async fn ipc_client(&self) -> Option { use reth_ipc::client::IpcClientBuilder; - if let Some(ipc_endpoint) = self.ipc_endpoint.clone() { + if let Some(ipc_endpoint) = &self.ipc_endpoint { return Some( IpcClientBuilder::default() .build(ipc_endpoint) diff --git a/crates/storage/db-api/Cargo.toml b/crates/storage/db-api/Cargo.toml index 1f55e6d7f..a3b33abfd 100644 --- a/crates/storage/db-api/Cargo.toml +++ b/crates/storage/db-api/Cargo.toml @@ -54,7 +54,7 @@ pprof = { workspace = true, features = [ "criterion", ] } criterion.workspace = true -iai-callgrind = "0.10.2" +iai-callgrind.workspace = true arbitrary = { workspace = true, features = ["derive"] } proptest.workspace = true diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index b563e1d78..336cc75d2 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -64,7 +64,7 @@ pprof = { workspace = true, features = [ "criterion", ] } criterion.workspace = true -iai-callgrind = "0.10.2" +iai-callgrind.workspace = true arbitrary = { workspace = true, features = ["derive"] } proptest.workspace = true diff --git a/crates/storage/db/src/metrics.rs b/crates/storage/db/src/metrics.rs index 0d0b68722..fecd691ee 100644 --- a/crates/storage/db/src/metrics.rs +++ b/crates/storage/db/src/metrics.rs @@ -1,12 +1,8 @@ use crate::Tables; use metrics::{Gauge, Histogram}; use reth_metrics::{metrics::Counter, Metrics}; -use rustc_hash::{FxHashMap, FxHasher}; -use std::{ - collections::HashMap, - hash::BuildHasherDefault, - time::{Duration, Instant}, -}; +use rustc_hash::FxHashMap; +use std::time::{Duration, Instant}; use strum::{EnumCount, EnumIter, IntoEnumIterator}; const LARGE_VALUE_THRESHOLD_BYTES: usize = 4096; @@ -45,7 +41,7 @@ impl DatabaseEnvMetrics { fn generate_operation_handles() -> FxHashMap<(&'static str, Operation), OperationMetrics> { let mut operations = FxHashMap::with_capacity_and_hasher( Tables::COUNT * Operation::COUNT, - BuildHasherDefault::::default(), + Default::default(), ); for table in Tables::ALL { for operation in Operation::iter() { @@ -81,9 +77,9 @@ impl DatabaseEnvMetrics { /// Used for tracking various stats for finished transactions (e.g. commit duration). fn generate_transaction_outcome_handles( ) -> FxHashMap<(TransactionMode, TransactionOutcome), TransactionOutcomeMetrics> { - let mut transaction_outcomes = HashMap::with_capacity_and_hasher( + let mut transaction_outcomes = FxHashMap::with_capacity_and_hasher( TransactionMode::COUNT * TransactionOutcome::COUNT, - BuildHasherDefault::::default(), + Default::default(), ); for mode in TransactionMode::iter() { for outcome in TransactionOutcome::iter() { diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml index fb485e32a..dcc916e78 100644 --- a/crates/storage/nippy-jar/Cargo.toml +++ b/crates/storage/nippy-jar/Cargo.toml @@ -32,7 +32,7 @@ lz4_flex = { version = "0.11", default-features = false } # offsets sucds = "~0.8" -memmap2 = "0.7.1" +memmap2 = "0.9.4" bincode = "1.3" serde = { workspace = true, features = ["derive"] } tracing.workspace = true diff --git a/crates/trie/common/Cargo.toml b/crates/trie/common/Cargo.toml index 241a3518f..f9267c2b8 100644 --- a/crates/trie/common/Cargo.toml +++ b/crates/trie/common/Cargo.toml @@ -29,7 +29,7 @@ itertools.workspace = true nybbles = { workspace = true, features = ["serde", "rlp"] } # `test-utils` feature -hash-db = { version = "~0.15", optional = true } +hash-db = { version = "=0.15.2", optional = true } plain_hasher = { version = "0.2", optional = true } arbitrary = { workspace = true, features = ["derive"], optional = true } proptest = { workspace = true, optional = true } @@ -43,7 +43,7 @@ proptest-derive.workspace = true serde_json.workspace = true test-fuzz.workspace = true toml.workspace = true -hash-db = "~0.15" +hash-db = "=0.15.2" plain_hasher = "0.2" [features] @@ -53,4 +53,4 @@ arbitrary = [ "dep:arbitrary", "dep:proptest", "dep:proptest-derive", -] \ No newline at end of file +]