diff --git a/Cargo.lock b/Cargo.lock index 74431b4c0..5a68ee773 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -808,10 +808,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener", + "event-listener 2.5.3", "futures-core", ] +[[package]] +name = "async-channel" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d4d23bcc79e27423727b36823d86233aad06dfea531837b038394d11e9928" +dependencies = [ + "concurrent-queue", + "event-listener 5.3.0", + "event-listener-strategy 0.5.1", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-compression" version = "0.4.9" @@ -828,14 +841,25 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-lock" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d034b430882f8381900d3fe6f0aaa3ad94f2cb4ac519b429692a1bc2dda4ae7b" +dependencies = [ + "event-listener 4.0.3", + "event-listener-strategy 0.4.0", + "pin-project-lite", +] + [[package]] name = "async-sse" version = "5.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e6fa871e4334a622afd6bb2f611635e8083a6f5e2936c0f90f37c7ef9856298" dependencies = [ - "async-channel", - "futures-lite", + "async-channel 1.9.0", + "futures-lite 1.13.0", "http-types", "log", "memchr", @@ -864,6 +888,12 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "async-task" +version = "4.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbb36e985947064623dbd357f727af08ffd077f93d696782f3c56365fa2e2799" + [[package]] name = "async-trait" version = "0.1.80" @@ -875,6 +905,12 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "attohttpc" version = "0.24.1" @@ -1122,6 +1158,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a37913e8dc4ddcc604f0c6d3bf2887c995153af3611de9e23c352b44c1b9118" +dependencies = [ + "async-channel 2.2.1", + "async-lock", + "async-task", + "fastrand 2.0.2", + "futures-io", + "futures-lite 2.3.0", + "piper", + "tracing", +] + [[package]] name = "blst" version = "0.3.11" @@ -2727,6 +2779,48 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67b215c49b2b248c855fb73579eb1f4f26c38ffdc12973e20e07b91d78d5646e" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d9944b8ca13534cdfb2800775f8dd4902ff3fc75a50101466decadfdf322a24" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "958e4d70b6d5e81971bebec42271ec641e7ff4e170a6fa605f2b8a8b65cb97d3" +dependencies = [ + "event-listener 4.0.3", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "332f51cb23d20b0de8458b86580878211da09bcd4503cb579c225b3d124cabb3" +dependencies = [ + "event-listener 5.3.0", + "pin-project-lite", +] + [[package]] name = "examples" version = "0.0.0" @@ -2992,6 +3086,16 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "futures-lite" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52527eb5074e35e9339c6b4e8d12600c7128b68fb25dcb9fa9dec18f7c25f3a5" +dependencies = [ + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.30" @@ -3428,9 +3532,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" dependencies = [ "anyhow", - "async-channel", + "async-channel 1.9.0", "base64 0.13.1", - "futures-lite", + "futures-lite 1.13.0", "infer", "pin-project-lite", "rand 0.7.3", @@ -3951,6 +4055,33 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "interprocess" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81f2533f3be42fffe3b5e63b71aeca416c1c3bc33e4e27be018521e76b1f38fb" +dependencies = [ + "blocking", + "cfg-if", + "futures-core", + "futures-io", + "intmap", + "libc", + "once_cell", + "rustc_version 0.4.0", + "spinning", + "thiserror", + "to_method", + "tokio", + "winapi", +] + +[[package]] +name = "intmap" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae52f28f45ac2bc96edb7714de995cffc174a395fb0abf5bff453587c980d7b9" + [[package]] name = "intrusive-collections" version = "0.9.6" @@ -5203,20 +5334,6 @@ dependencies = [ "syn 1.0.109", ] -[[package]] -name = "parity-tokio-ipc" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9981e32fb75e004cc148f5fb70342f393830e0a4aa62e3cc93b50976218d42b6" -dependencies = [ - "futures", - "libc", - "log", - "rand 0.7.3", - "tokio", - "winapi", -] - [[package]] name = "parking" version = "2.2.0" @@ -5400,6 +5517,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.2", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.10.2" @@ -6744,9 +6872,10 @@ dependencies = [ "bytes", "futures", "futures-util", + "interprocess", "jsonrpsee", - "parity-tokio-ipc", "pin-project", + "rand 0.8.5", "reth-tracing", "serde_json", "thiserror", @@ -6755,6 +6884,7 @@ dependencies = [ "tokio-util", "tower", "tracing", + "windows-sys 0.52.0", ] [[package]] @@ -8657,6 +8787,15 @@ version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +[[package]] +name = "spinning" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d4f0e86297cad2658d92a707320d87bf4e6ae1050287f51d19b67ef3f153a7b" +dependencies = [ + "lock_api", +] + [[package]] name = "spki" version = "0.7.3" @@ -9125,6 +9264,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "to_method" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c4ceeeca15c8384bbc3e011dbd8fccb7f068a440b752b7d9b32ceb0ca0e2e8" + [[package]] name = "tokio" version = "1.37.0" diff --git a/crates/node-builder/src/rpc.rs b/crates/node-builder/src/rpc.rs index d6e2eb0f2..3efeba7f5 100644 --- a/crates/node-builder/src/rpc.rs +++ b/crates/node-builder/src/rpc.rs @@ -301,7 +301,6 @@ where let launch_auth = auth_module.clone().start_server(auth_config).map_ok(|handle| { let addr = handle.local_addr(); if let Some(ipc_endpoint) = handle.ipc_endpoint() { - let ipc_endpoint = ipc_endpoint.path(); info!(target: "reth::cli", url=%addr, ipc_endpoint=%ipc_endpoint,"RPC auth server started"); } else { info!(target: "reth::cli", url=%addr, "RPC auth server started"); diff --git a/crates/node-core/src/args/rpc_server_args.rs b/crates/node-core/src/args/rpc_server_args.rs index 2ac48e2ba..1a60aa31a 100644 --- a/crates/node-core/src/args/rpc_server_args.rs +++ b/crates/node-core/src/args/rpc_server_args.rs @@ -711,7 +711,7 @@ mod tests { config.ws_address().unwrap(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8888)) ); - assert_eq!(config.ipc_endpoint().unwrap().path(), constants::DEFAULT_IPC_ENDPOINT); + assert_eq!(config.ipc_endpoint().unwrap(), constants::DEFAULT_IPC_ENDPOINT); } #[test] diff --git a/crates/rpc/ipc/Cargo.toml b/crates/rpc/ipc/Cargo.toml index 21b645409..094fa5759 100644 --- a/crates/rpc/ipc/Cargo.toml +++ b/crates/rpc/ipc/Cargo.toml @@ -15,7 +15,6 @@ workspace = true # async/net futures.workspace = true -parity-tokio-ipc = "0.9.0" tokio = { workspace = true, features = ["net", "time", "rt-multi-thread"] } tokio-util = { workspace = true, features = ["codec"] } tokio-stream.workspace = true @@ -30,7 +29,12 @@ tracing.workspace = true bytes.workspace = true thiserror.workspace = true futures-util = "0.3.30" +interprocess = { version = "1.2.1", features = ["tokio_support"] } + +[target.'cfg(windows)'.dependencies] +windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] } [dev-dependencies] tokio-stream = { workspace = true, features = ["sync"] } reth-tracing.workspace = true +rand.workspace = true diff --git a/crates/rpc/ipc/src/client.rs b/crates/rpc/ipc/src/client.rs deleted file mode 100644 index f4454958f..000000000 --- a/crates/rpc/ipc/src/client.rs +++ /dev/null @@ -1,151 +0,0 @@ -//! [`jsonrpsee`] transport adapter implementation for IPC. - -use crate::stream_codec::StreamCodec; -use futures::StreamExt; -use jsonrpsee::{ - async_client::{Client, ClientBuilder}, - core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}, -}; -use std::{ - io, - path::{Path, PathBuf}, -}; -use tokio::{io::AsyncWriteExt, net::UnixStream}; -use tokio_util::codec::FramedRead; - -/// Builder type for [`Client`] -#[derive(Clone, Default, Debug)] -#[non_exhaustive] -pub struct IpcClientBuilder; - -impl IpcClientBuilder { - /// Connects to a IPC socket - pub async fn build(self, path: impl AsRef) -> Result { - let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?; - Ok(self.build_with_tokio(tx, rx)) - } - - /// Uses the sender and receiver channels to connect to the socket. - pub fn build_with_tokio(self, sender: S, receiver: R) -> Client - where - S: TransportSenderT + Send, - R: TransportReceiverT + Send, - { - ClientBuilder::default().build_with_tokio(sender, receiver) - } -} - -/// Sending end of IPC transport. -#[derive(Debug)] -pub struct Sender { - inner: tokio::net::unix::OwnedWriteHalf, -} - -#[async_trait::async_trait] -impl TransportSenderT for Sender { - type Error = IpcError; - - /// Sends out a request. Returns a Future that finishes when the request has been successfully - /// sent. - async fn send(&mut self, msg: String) -> Result<(), Self::Error> { - Ok(self.inner.write_all(msg.as_bytes()).await?) - } - - async fn send_ping(&mut self) -> Result<(), Self::Error> { - tracing::trace!("send ping - not implemented"); - Err(IpcError::NotSupported) - } - - /// Close the connection. - async fn close(&mut self) -> Result<(), Self::Error> { - Ok(()) - } -} - -/// Receiving end of IPC transport. -#[derive(Debug)] -pub struct Receiver { - inner: FramedRead, -} - -#[async_trait::async_trait] -impl TransportReceiverT for Receiver { - type Error = IpcError; - - /// Returns a Future resolving when the server sent us something back. - async fn receive(&mut self) -> Result { - self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) - } -} - -/// Builder for IPC transport [`Sender`] and [`Receiver`] pair. -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct IpcTransportClientBuilder; - -impl IpcTransportClientBuilder { - /// Try to establish the connection. - /// - /// ``` - /// use jsonrpsee::{core::client::ClientT, rpc_params}; - /// use reth_ipc::client::IpcClientBuilder; - /// # async fn run_client() -> Result<(), Box> { - /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?; - /// let response: String = client.request("say_hello", rpc_params![]).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn build(self, path: impl AsRef) -> Result<(Sender, Receiver), IpcError> { - let path = path.as_ref(); - - let stream = UnixStream::connect(path) - .await - .map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?; - - let (rhlf, whlf) = stream.into_split(); - - Ok(( - Sender { inner: whlf }, - Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) }, - )) - } -} - -/// Error variants that can happen in IPC transport. -#[derive(Debug, thiserror::Error)] -pub enum IpcError { - /// Operation not supported - #[error("operation not supported")] - NotSupported, - /// Stream was closed - #[error("stream closed")] - Closed, - /// Thrown when failed to establish a socket connection. - #[error("failed to connect to socket {path}: {err}")] - FailedToConnect { - /// The path of the socket. - #[doc(hidden)] - path: PathBuf, - /// The error occurred while connecting. - #[doc(hidden)] - err: io::Error, - }, - /// Wrapped IO Error - #[error(transparent)] - Io(#[from] io::Error), -} - -#[cfg(test)] -mod tests { - use super::*; - use parity_tokio_ipc::{dummy_endpoint, Endpoint}; - - #[tokio::test] - async fn test_connect() { - let endpoint = dummy_endpoint(); - let _incoming = Endpoint::new(endpoint.clone()).incoming().unwrap(); - - let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap(); - let _ = IpcClientBuilder::default().build_with_tokio(tx, rx); - } -} diff --git a/crates/rpc/ipc/src/client/mod.rs b/crates/rpc/ipc/src/client/mod.rs new file mode 100644 index 000000000..8ca4b5406 --- /dev/null +++ b/crates/rpc/ipc/src/client/mod.rs @@ -0,0 +1,97 @@ +//! [`jsonrpsee`] transport adapter implementation for IPC. + +use std::{ + io, + path::{Path, PathBuf}, +}; + +use jsonrpsee::{ + async_client::{Client, ClientBuilder}, + core::client::{TransportReceiverT, TransportSenderT}, +}; + +#[cfg(unix)] +use crate::client::unix::IpcTransportClientBuilder; +#[cfg(windows)] +use crate::client::win::IpcTransportClientBuilder; + +#[cfg(unix)] +mod unix; +#[cfg(windows)] +mod win; + +/// Builder type for [`Client`] +#[derive(Clone, Default, Debug)] +#[non_exhaustive] +pub struct IpcClientBuilder; + +impl IpcClientBuilder { + /// Connects to a IPC socket + /// + /// ``` + /// use jsonrpsee::{core::client::ClientT, rpc_params}; + /// use reth_ipc::client::IpcClientBuilder; + /// # async fn run_client() -> Result<(), Box> { + /// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?; + /// let response: String = client.request("say_hello", rpc_params![]).await?; + /// # Ok(()) + /// # } + /// ``` + pub async fn build(self, path: impl AsRef) -> Result { + let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?; + Ok(self.build_with_tokio(tx, rx)) + } + + /// Uses the sender and receiver channels to connect to the socket. + pub fn build_with_tokio(self, sender: S, receiver: R) -> Client + where + S: TransportSenderT + Send, + R: TransportReceiverT + Send, + { + ClientBuilder::default().build_with_tokio(sender, receiver) + } +} + +/// Error variants that can happen in IPC transport. +#[derive(Debug, thiserror::Error)] +pub enum IpcError { + /// Operation not supported + #[error("operation not supported")] + NotSupported, + /// Stream was closed + #[error("stream closed")] + Closed, + /// Thrown when failed to establish a socket connection. + #[error("failed to connect to socket {path}: {err}")] + FailedToConnect { + /// The path of the socket. + #[doc(hidden)] + path: PathBuf, + /// The error occurred while connecting. + #[doc(hidden)] + err: io::Error, + }, + /// Wrapped IO Error + #[error(transparent)] + Io(#[from] io::Error), +} + +#[cfg(test)] +mod tests { + use crate::server::dummy_endpoint; + use interprocess::local_socket::tokio::LocalSocketListener; + + use super::*; + + #[tokio::test] + async fn test_connect() { + let endpoint = dummy_endpoint(); + let binding = LocalSocketListener::bind(endpoint.clone()).unwrap(); + tokio::spawn(async move { + let _x = binding.accept().await; + }); + + let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap(); + let _ = IpcClientBuilder::default().build_with_tokio(tx, rx); + } +} diff --git a/crates/rpc/ipc/src/client/unix.rs b/crates/rpc/ipc/src/client/unix.rs new file mode 100644 index 000000000..c7ed7bc7a --- /dev/null +++ b/crates/rpc/ipc/src/client/unix.rs @@ -0,0 +1,82 @@ +//! [`jsonrpsee`] transport adapter implementation for Unix IPC by using Unix Sockets. + +use crate::{client::IpcError, stream_codec::StreamCodec}; +use futures::StreamExt; +use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; +use std::path::Path; +use tokio::{ + io::AsyncWriteExt, + net::{ + unix::{OwnedReadHalf, OwnedWriteHalf}, + UnixStream, + }, +}; +use tokio_util::codec::FramedRead; + +/// Sending end of IPC transport. +#[derive(Debug)] +pub(crate) struct Sender { + inner: OwnedWriteHalf, +} + +#[async_trait::async_trait] +impl TransportSenderT for Sender { + type Error = IpcError; + + /// Sends out a request. Returns a Future that finishes when the request has been successfully + /// sent. + async fn send(&mut self, msg: String) -> Result<(), Self::Error> { + Ok(self.inner.write_all(msg.as_bytes()).await?) + } + + async fn send_ping(&mut self) -> Result<(), Self::Error> { + tracing::trace!("send ping - not implemented"); + Err(IpcError::NotSupported) + } + + /// Close the connection. + async fn close(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Receiving end of IPC transport. +#[derive(Debug)] +pub(crate) struct Receiver { + pub(crate) inner: FramedRead, +} + +#[async_trait::async_trait] +impl TransportReceiverT for Receiver { + type Error = IpcError; + + /// Returns a Future resolving when the server sent us something back. + async fn receive(&mut self) -> Result { + self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) + } +} + +/// Builder for IPC transport [`Sender`] and [`Receiver`] pair. +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub(crate) struct IpcTransportClientBuilder; + +impl IpcTransportClientBuilder { + pub(crate) async fn build( + self, + path: impl AsRef, + ) -> Result<(Sender, Receiver), IpcError> { + let path = path.as_ref(); + + let stream = UnixStream::connect(path) + .await + .map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?; + + let (rhlf, whlf) = stream.into_split(); + + Ok(( + Sender { inner: whlf }, + Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) }, + )) + } +} diff --git a/crates/rpc/ipc/src/client/win.rs b/crates/rpc/ipc/src/client/win.rs new file mode 100644 index 000000000..69b3140fe --- /dev/null +++ b/crates/rpc/ipc/src/client/win.rs @@ -0,0 +1,82 @@ +//! [`jsonrpsee`] transport adapter implementation for Windows IPC by using NamedPipes. + +use crate::{client::IpcError, stream_codec::StreamCodec}; +use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; +use std::{path::Path, sync::Arc}; +use tokio::{ + io::AsyncWriteExt, + net::windows::named_pipe::{ClientOptions, NamedPipeClient}, + time, + time::Duration, +}; +use tokio_stream::StreamExt; +use tokio_util::codec::FramedRead; +use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY; + +/// Sending end of IPC transport. +#[derive(Debug)] +pub struct Sender { + inner: Arc, +} + +#[async_trait::async_trait] +impl TransportSenderT for Sender { + type Error = IpcError; + + /// Sends out a request. Returns a Future that finishes when the request has been successfully + /// sent. + async fn send(&mut self, msg: String) -> Result<(), Self::Error> { + Ok(self.inner.write_all(msg.as_bytes()).await?) + } + + async fn send_ping(&mut self) -> Result<(), Self::Error> { + tracing::trace!("send ping - not implemented"); + Err(IpcError::NotSupported) + } + + /// Close the connection. + async fn close(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Receiving end of IPC transport. +#[derive(Debug)] +pub struct Receiver { + inner: FramedRead, StreamCodec>, +} + +#[async_trait::async_trait] +impl TransportReceiverT for Receiver { + type Error = IpcError; + + /// Returns a Future resolving when the server sent us something back. + async fn receive(&mut self) -> Result { + self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) + } +} + +/// Builder for IPC transport [`crate::client::win::Sender`] and [`crate::client::win::Receiver`] +/// pair. +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct IpcTransportClientBuilder; + +impl IpcTransportClientBuilder { + pub async fn build(self, path: impl AsRef) -> Result<(Sender, Receiver), IpcError> { + let addr = path.as_ref().as_os_str(); + let client = loop { + match ClientOptions::new().open(addr) { + Ok(client) => break client, + Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), + Err(e) => return IpcError::FailedToConnect { path: path.to_path_buf(), err: e }, + } + time::sleep(Duration::from_mills(50)).await; + }; + let client = Arc::new(client); + Ok(( + Sender { inner: client.clone() }, + Receiver { inner: FramedRead::new(client, StreamCodec::stream_incoming()) }, + )) + } +} diff --git a/crates/rpc/ipc/src/lib.rs b/crates/rpc/ipc/src/lib.rs index 2d0193ed6..ae7a8b221 100644 --- a/crates/rpc/ipc/src/lib.rs +++ b/crates/rpc/ipc/src/lib.rs @@ -12,7 +12,6 @@ #![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#[cfg(unix)] pub mod client; pub mod server; diff --git a/crates/rpc/ipc/src/server/connection.rs b/crates/rpc/ipc/src/server/connection.rs index abeba7bbf..05f7a53a9 100644 --- a/crates/rpc/ipc/src/server/connection.rs +++ b/crates/rpc/ipc/src/server/connection.rs @@ -1,12 +1,11 @@ //! A IPC connection. use crate::stream_codec::StreamCodec; -use futures::{ready, stream::FuturesUnordered, FutureExt, Sink, Stream, StreamExt}; +use futures::{stream::FuturesUnordered, FutureExt, Sink, Stream}; use std::{ collections::VecDeque, future::Future, io, - marker::PhantomData, pin::Pin, task::{Context, Poll}, }; @@ -16,58 +15,8 @@ use tower::Service; pub(crate) type JsonRpcStream = Framed; -/// Wraps a stream of incoming connections. #[pin_project::pin_project] -pub(crate) struct Incoming { - #[pin] - inner: T, - _marker: PhantomData, -} -impl Incoming -where - T: Stream> + Unpin + 'static, - Item: AsyncRead + AsyncWrite, -{ - /// Create a new instance. - pub(crate) fn new(inner: T) -> Self { - Self { inner, _marker: Default::default() } - } - - /// Polls to accept a new incoming connection to the endpoint. - pub(crate) fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<::Item> { - Poll::Ready(ready!(self.poll_next_unpin(cx)).map_or( - Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ipc connection closed")), - |conn| conn, - )) - } -} - -impl Stream for Incoming -where - T: Stream> + 'static, - Item: AsyncRead + AsyncWrite, -{ - type Item = io::Result>>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let res = match ready!(this.inner.poll_next(cx)) { - Some(Ok(item)) => { - let framed = IpcConn(tokio_util::codec::Decoder::framed( - StreamCodec::stream_incoming(), - item, - )); - Ok(framed) - } - Some(Err(err)) => Err(err), - None => return Poll::Ready(None), - }; - Poll::Ready(Some(res)) - } -} - -#[pin_project::pin_project] -pub(crate) struct IpcConn(#[pin] T); +pub(crate) struct IpcConn(#[pin] pub(crate) T); impl IpcConn> where diff --git a/crates/rpc/ipc/src/server/future.rs b/crates/rpc/ipc/src/server/future.rs index 65aaccc88..f807af449 100644 --- a/crates/rpc/ipc/src/server/future.rs +++ b/crates/rpc/ipc/src/server/future.rs @@ -26,127 +26,9 @@ //! Utilities for handling async code. -use futures::FutureExt; -use std::{ - future::Future, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; -use tokio::{ - sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}, - time::{self, Duration, Interval}, -}; +use std::sync::Arc; -/// Polling for server stop monitor interval in milliseconds. -const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000); - -/// This is a flexible collection of futures that need to be driven to completion -/// alongside some other future, such as connection handlers that need to be -/// handled along with a listener for new connections. -/// -/// In order to `.await` on these futures and drive them to completion, call -/// `select_with` providing some other future, the result of which you need. -pub(crate) struct FutureDriver { - futures: Vec, - stop_monitor_heartbeat: Interval, -} - -impl Default for FutureDriver { - fn default() -> Self { - let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL); - - heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip); - - FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat } - } -} - -impl FutureDriver { - /// Add a new future to this driver - pub(crate) fn add(&mut self, future: F) { - self.futures.push(future); - } -} - -impl FutureDriver -where - F: Future + Unpin, -{ - pub(crate) async fn select_with(&mut self, selector: S) -> S::Output { - tokio::pin!(selector); - - DriverSelect { selector, driver: self }.await - } - - fn drive(&mut self, cx: &mut Context<'_>) { - let mut i = 0; - - while i < self.futures.len() { - if self.futures[i].poll_unpin(cx).is_ready() { - // Using `swap_remove` since we don't care about ordering, - // but we do care about removing being `O(1)`. - // - // We don't increment `i` in this branch, since we now - // have a shorter length, and potentially a new value at - // current index - self.futures.swap_remove(i); - } else { - i += 1; - } - } - } - - fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context<'_>) { - // We don't care about the ticks of the heartbeat, it's here only - // to periodically wake the `Waker` on `cx`. - let _ = self.stop_monitor_heartbeat.poll_tick(cx); - } -} - -impl Future for FutureDriver -where - F: Future + Unpin, -{ - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = Pin::into_inner(self); - - this.drive(cx); - - if this.futures.is_empty() { - Poll::Ready(()) - } else { - Poll::Pending - } - } -} - -/// This is a glorified select `Future` that will attempt to drive all -/// connection futures `F` to completion on each `poll`, while also -/// handling incoming connections. -struct DriverSelect<'a, S, F> { - selector: S, - driver: &'a mut FutureDriver, -} - -impl<'a, R, F> Future for DriverSelect<'a, R, F> -where - R: Future + Unpin, - F: Future + Unpin, -{ - type Output = R::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = Pin::into_inner(self); - - this.driver.drive(cx); - this.driver.poll_stop_monitor_heartbeat(cx); - - this.selector.poll_unpin(cx) - } -} +use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; #[derive(Debug, Clone)] pub(crate) struct StopHandle(watch::Receiver<()>); @@ -156,12 +38,7 @@ impl StopHandle { Self(rx) } - pub(crate) fn shutdown_requested(&self) -> bool { - // if a message has been seen, it means that `stop` has been called. - self.0.has_changed().unwrap_or(true) - } - - pub(crate) async fn shutdown(&mut self) { + pub(crate) async fn shutdown(mut self) { // Err(_) implies that the `sender` has been dropped. // Ok(_) implies that `stop` has been called. let _ = self.0.changed().await; diff --git a/crates/rpc/ipc/src/server/mod.rs b/crates/rpc/ipc/src/server/mod.rs index 5301c7d21..c876457e1 100644 --- a/crates/rpc/ipc/src/server/mod.rs +++ b/crates/rpc/ipc/src/server/mod.rs @@ -1,14 +1,16 @@ //! JSON-RPC IPC server implementation use crate::server::{ - connection::{Incoming, IpcConn, JsonRpcStream}, - future::{ConnectionGuard, FutureDriver, StopHandle}, + connection::{IpcConn, JsonRpcStream}, + future::{ConnectionGuard, StopHandle}, }; -use futures::{FutureExt, Stream, StreamExt}; +use futures::StreamExt; +use futures_util::{future::Either, stream::FuturesUnordered}; +use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream}; use jsonrpsee::{ core::TEN_MB_SIZE_BYTES, server::{ - middleware::rpc::{either::Either, RpcLoggerLayer, RpcServiceT}, + middleware::rpc::{RpcLoggerLayer, RpcServiceT}, AlreadyStoppedError, IdProvider, RandomIntegerIdProvider, }, BoundedSubscriptions, MethodSink, Methods, @@ -25,16 +27,18 @@ use tokio::{ sync::{oneshot, watch, OwnedSemaphorePermit}, }; use tower::{layer::util::Identity, Layer, Service}; -use tracing::{debug, trace, warn}; - +use tracing::{debug, trace, warn, Instrument}; // re-export so can be used during builder setup -use crate::server::{ - connection::IpcConnDriver, - rpc_service::{RpcService, RpcServiceCfg}, +use crate::{ + server::{ + connection::IpcConnDriver, + rpc_service::{RpcService, RpcServiceCfg}, + }, + stream_codec::StreamCodec, }; -pub use parity_tokio_ipc::Endpoint; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; +use tokio_util::compat::FuturesAsyncReadCompatExt; use tower::layer::{util::Stack, LayerFn}; mod connection; @@ -47,7 +51,7 @@ mod rpc_service; // This is an adapted `jsonrpsee` Server, but for `Ipc` connections. pub struct IpcServer { /// The endpoint we listen for incoming transactions - endpoint: Endpoint, + endpoint: String, id_provider: Arc, cfg: Settings, rpc_middleware: RpcServiceBuilder, @@ -55,9 +59,9 @@ pub struct IpcServer { } impl IpcServer { - /// Returns the configured [Endpoint] - pub fn endpoint(&self) -> &Endpoint { - &self.endpoint + /// Returns the configured endpoint + pub fn endpoint(&self) -> String { + self.endpoint.clone() } } @@ -123,15 +127,29 @@ where stop_handle: StopHandle, on_ready: oneshot::Sender>, ) { - trace!(endpoint = ?self.endpoint.path(), "starting ipc server"); + trace!(endpoint = ?self.endpoint, "starting ipc server"); if cfg!(unix) { // ensure the file does not exist - if std::fs::remove_file(self.endpoint.path()).is_ok() { - debug!(endpoint = ?self.endpoint.path(), "removed existing IPC endpoint file"); + if std::fs::remove_file(&self.endpoint).is_ok() { + debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file"); } } + let listener = match LocalSocketListener::bind(self.endpoint.clone()) { + Err(err) => { + on_ready + .send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err })) + .ok(); + return; + } + + Ok(listener) => listener, + }; + + // signal that we're ready to accept connections + on_ready.send(Ok(())).ok(); + let message_buffer_capacity = self.cfg.message_buffer_capacity; let max_request_body_size = self.cfg.max_request_body_size; let max_response_body_size = self.cfg.max_response_body_size; @@ -142,37 +160,27 @@ where let mut id: u32 = 0; let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize); - let mut connections = FutureDriver::default(); - let endpoint_path = self.endpoint.path().to_string(); - let incoming = match self.endpoint.incoming() { - Ok(connections) => { - #[cfg(windows)] - let connections = Box::pin(connections); - Incoming::new(connections) - } - Err(err) => { - on_ready - .send(Err(IpcServerStartError { endpoint: endpoint_path, source: err })) - .ok(); - return - } - }; - // signal that we're ready to accept connections - on_ready.send(Ok(())).ok(); - - let mut incoming = Monitored::new(incoming, &stop_handle); + let mut connections = FuturesUnordered::new(); + let stopped = stop_handle.clone().shutdown(); + tokio::pin!(stopped); trace!("accepting ipc connections"); loop { - match connections.select_with(&mut incoming).await { - Ok(ipc) => { + match try_accept_conn(&listener, stopped).await { + AcceptConnection::Established { local_socket_stream, stop } => { trace!("established new connection"); + let ipc = IpcConn(tokio_util::codec::Decoder::framed( + StreamCodec::stream_incoming(), + local_socket_stream.compat(), + )); + let conn = match connection_guard.try_acquire() { Some(conn) => conn, None => { warn!("Too many IPC connections. Please try again later."); - connections.add(ipc.reject_connection().boxed()); - continue + connections.push(tokio::spawn(ipc.reject_connection().in_current_span())); + stopped = stop; + continue; } }; @@ -198,30 +206,58 @@ where }; let service = self.http_middleware.service(tower_service); - connections.add(Box::pin(spawn_connection( + connections.push(tokio::spawn(process_connection( ipc, service, stop_handle.clone(), rx, - ))); + ).in_current_span())); id = id.wrapping_add(1); + stopped = stop; } - Err(MonitoredError::Selector(err)) => { - tracing::error!("Error while awaiting a new IPC connection: {:?}", err); + AcceptConnection::Shutdown => { break; } + AcceptConnection::Err((e, stop)) => { + tracing::error!("Error while awaiting a new IPC connection: {:?}", e); + stopped = stop; } - Err(MonitoredError::Shutdown) => break, } } - connections.await; + // FuturesUnordered won't poll anything until this line but because the + // tasks are spawned (so that they can progress independently) + // then this just makes sure that all tasks are completed before + // returning from this function. + while connections.next().await.is_some() {} + } +} + +enum AcceptConnection { + Shutdown, + Established { local_socket_stream: LocalSocketStream, stop: S }, + Err((io::Error, S)), +} + +async fn try_accept_conn(listener: &LocalSocketListener, stopped: S) -> AcceptConnection +where + S: Future + Unpin, +{ + let accept = listener.accept(); + tokio::pin!(accept); + + match futures_util::future::select(accept, stopped).await { + Either::Left((res, stop)) => match res { + Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop }, + Err(e) => AcceptConnection::Err((e, stop)), + }, + Either::Right(_) => AcceptConnection::Shutdown, } } impl std::fmt::Debug for IpcServer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("IpcServer") - .field("endpoint", &self.endpoint.path()) + .field("endpoint", &self.endpoint) .field("cfg", &self.cfg) .field("id_provider", &self.id_provider) .finish() @@ -408,10 +444,10 @@ where } /// Spawns the IPC connection onto a new task -async fn spawn_connection( +async fn process_connection( conn: IpcConn>, service: S, - mut stop_handle: StopHandle, + stop_handle: StopHandle, rx: mpsc::Receiver, ) where S: Service> + Send + 'static, @@ -419,70 +455,34 @@ async fn spawn_connection( S::Future: Send + Unpin, T: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - let task = tokio::task::spawn(async move { - let rx_item = ReceiverStream::new(rx); - let conn = IpcConnDriver { - conn, - service, - pending_calls: Default::default(), - items: Default::default(), - }; - tokio::pin!(conn, rx_item); + let rx_item = ReceiverStream::new(rx); + let conn = IpcConnDriver { + conn, + service, + pending_calls: Default::default(), + items: Default::default(), + }; + tokio::pin!(conn, rx_item); - loop { - tokio::select! { - _ = &mut conn => { - break - } - item = rx_item.next() => { - if let Some(item) = item { - conn.push_back(item); - } - } - _ = stop_handle.shutdown() => { - // shutdown - break + let stopped = stop_handle.shutdown(); + + tokio::pin!(stopped); + + loop { + tokio::select! { + _ = &mut conn => { + break + } + item = rx_item.next() => { + if let Some(item) = item { + conn.push_back(item); } } + _ = &mut stopped=> { + // shutdown + break + } } - }); - - task.await.ok(); -} - -/// This is a glorified select listening for new messages, while also checking the `stop_receiver` -/// signal. -struct Monitored<'a, F> { - future: F, - stop_monitor: &'a StopHandle, -} - -impl<'a, F> Monitored<'a, F> { - fn new(future: F, stop_monitor: &'a StopHandle) -> Self { - Monitored { future, stop_monitor } - } -} - -enum MonitoredError { - Shutdown, - Selector(E), -} - -impl<'a, T, Item> Future for Monitored<'a, Incoming> -where - T: Stream> + Unpin + 'static, - Item: AsyncRead + AsyncWrite, -{ - type Output = Result>, MonitoredError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - if this.stop_monitor.shutdown_requested() { - return Poll::Ready(Err(MonitoredError::Shutdown)) - } - - this.future.poll_accept(cx).map_err(MonitoredError::Selector) } } @@ -734,17 +734,8 @@ impl Builder { /// Finalize the configuration of the server. Consumes the [`Builder`]. pub fn build(self, endpoint: impl AsRef) -> IpcServer { - let endpoint = Endpoint::new(endpoint.as_ref().to_string()); - self.build_with_endpoint(endpoint) - } - - /// Finalize the configuration of the server. Consumes the [`Builder`]. - pub fn build_with_endpoint( - self, - endpoint: Endpoint, - ) -> IpcServer { IpcServer { - endpoint, + endpoint: endpoint.as_ref().to_string(), cfg: self.settings, id_provider: self.id_provider, http_middleware: self.http_middleware, @@ -782,7 +773,18 @@ impl ServerHandle { } } -#[cfg(all(test, unix))] +/// For testing/examples +#[cfg(test)] +pub fn dummy_endpoint() -> String { + let num: u64 = rand::Rng::gen(&mut rand::thread_rng()); + if cfg!(windows) { + format!(r"\\.\pipe\my-pipe-{}", num) + } else { + format!(r"/tmp/my-uds-{}", num) + } +} + +#[cfg(test)] mod tests { use super::*; use crate::client::IpcClientBuilder; @@ -797,7 +799,6 @@ mod tests { types::Request, PendingSubscriptionSink, RpcModule, SubscriptionMessage, }; - use parity_tokio_ipc::dummy_endpoint; use tokio::sync::broadcast; use tokio_stream::wrappers::BroadcastStream; @@ -823,7 +824,7 @@ mod tests { // and you might want to do something smarter if it's // critical that "the most recent item" must be sent when it is produced. if sink.send(notif).await.is_err() { - break Ok(()) + break Ok(()); } closed = c; @@ -848,6 +849,7 @@ mod tests { #[tokio::test] async fn can_set_the_max_response_body_size() { + // init_test_tracing(); let endpoint = dummy_endpoint(); let server = Builder::default().max_response_body_size(100).build(&endpoint); let mut module = RpcModule::new(()); diff --git a/crates/rpc/rpc-builder/src/auth.rs b/crates/rpc/rpc-builder/src/auth.rs index cd21be271..372617257 100644 --- a/crates/rpc/rpc-builder/src/auth.rs +++ b/crates/rpc/rpc-builder/src/auth.rs @@ -13,7 +13,7 @@ use jsonrpsee::{ server::{AlreadyStoppedError, RpcModule}, Methods, }; -pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint}; +pub use reth_ipc::server::Builder as IpcServerBuilder; use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; @@ -205,8 +205,7 @@ impl AuthServerConfig { let ipc_endpoint_str = ipc_endpoint .clone() .unwrap_or_else(|| constants::DEFAULT_ENGINE_API_IPC_ENDPOINT.to_string()); - let ipc_path = Endpoint::new(ipc_endpoint_str); - let ipc_server = ipc_server_config.build(ipc_path.path()); + let ipc_server = ipc_server_config.build(ipc_endpoint_str); let res = ipc_server .start(module.inner) .await @@ -449,7 +448,7 @@ impl AuthServerHandle { if let Some(ipc_endpoint) = self.ipc_endpoint.clone() { return Some( IpcClientBuilder::default() - .build(Endpoint::new(ipc_endpoint).path()) + .build(ipc_endpoint) .await .expect("Failed to create ipc client"), ) @@ -463,10 +462,7 @@ impl AuthServerHandle { } /// Return an ipc endpoint - pub fn ipc_endpoint(&self) -> Option { - if let Some(ipc_endpoint) = self.ipc_endpoint.clone() { - return Some(Endpoint::new(ipc_endpoint)) - } - None + pub fn ipc_endpoint(&self) -> Option { + self.ipc_endpoint.clone() } } diff --git a/crates/rpc/rpc-builder/src/lib.rs b/crates/rpc/rpc-builder/src/lib.rs index 4b9159e2d..9c28353c9 100644 --- a/crates/rpc/rpc-builder/src/lib.rs +++ b/crates/rpc/rpc-builder/src/lib.rs @@ -172,7 +172,7 @@ use reth_engine_primitives::EngineTypes; use reth_evm::ConfigureEvm; use reth_ipc::server::IpcServer; pub use reth_ipc::server::{ - Builder as IpcServerBuilder, Endpoint, RpcServiceBuilder as IpcRpcServiceBuilder, + Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder, }; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_provider::{ @@ -1459,7 +1459,7 @@ where /// /// Once the [RpcModule] is built via [RpcModuleBuilder] the servers can be started, See also /// [ServerBuilder::build] and [Server::start](jsonrpsee::server::Server::start). -#[derive(Default)] +#[derive(Default, Debug)] pub struct RpcServerConfig { /// Configs for JSON-RPC Http. http_server_config: Option>, @@ -1476,26 +1476,11 @@ pub struct RpcServerConfig { /// Configs for JSON-RPC IPC server ipc_server_config: Option>, /// The Endpoint where to launch the ipc server - ipc_endpoint: Option, + ipc_endpoint: Option, /// JWT secret for authentication jwt_secret: Option, } -impl fmt::Debug for RpcServerConfig { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("RpcServerConfig") - .field("http_server_config", &self.http_server_config) - .field("http_cors_domains", &self.http_cors_domains) - .field("http_addr", &self.http_addr) - .field("ws_server_config", &self.ws_server_config) - .field("ws_addr", &self.ws_addr) - .field("ipc_server_config", &self.ipc_server_config) - .field("ipc_endpoint", &self.ipc_endpoint.as_ref().map(|endpoint| endpoint.path())) - .field("jwt_secret", &self.jwt_secret) - .finish() - } -} - /// === impl RpcServerConfig === impl RpcServerConfig { @@ -1599,7 +1584,7 @@ impl RpcServerConfig { /// /// Default is [DEFAULT_IPC_ENDPOINT] pub fn with_ipc_endpoint(mut self, path: impl Into) -> Self { - self.ipc_endpoint = Some(Endpoint::new(path.into())); + self.ipc_endpoint = Some(path.into()); self } @@ -1628,9 +1613,9 @@ impl RpcServerConfig { self.ws_addr } - /// Returns the [Endpoint] of the ipc server - pub fn ipc_endpoint(&self) -> Option<&Endpoint> { - self.ipc_endpoint.as_ref() + /// Returns the endpoint of the ipc server + pub fn ipc_endpoint(&self) -> Option { + self.ipc_endpoint.clone() } /// Convenience function to do [RpcServerConfig::build] and [RpcServer::start] in one step @@ -1759,12 +1744,10 @@ impl RpcServerConfig { if let Some(builder) = self.ipc_server_config { let metrics = modules.ipc.as_ref().map(RpcRequestMetrics::ipc).unwrap_or_default(); - let ipc_path = self - .ipc_endpoint - .unwrap_or_else(|| Endpoint::new(DEFAULT_IPC_ENDPOINT.to_string())); + let ipc_path = self.ipc_endpoint.unwrap_or_else(|| DEFAULT_IPC_ENDPOINT.into()); let ipc = builder .set_rpc_middleware(IpcRpcServiceBuilder::new().layer(metrics)) - .build(ipc_path.path()); + .build(ipc_path); server.ipc = Some(ipc); } @@ -2152,8 +2135,8 @@ impl RpcServer { self.ws_http.ws_local_addr } - /// Returns the [`Endpoint`] of the ipc server if started. - pub fn ipc_endpoint(&self) -> Option<&Endpoint> { + /// Returns the endpoint of the ipc server if started. + pub fn ipc_endpoint(&self) -> Option { self.ipc.as_ref().map(|ipc| ipc.endpoint()) } @@ -2161,7 +2144,7 @@ impl RpcServer { /// /// This returns an [RpcServerHandle] that's connected to the server task(s) until the server is /// stopped or the [RpcServerHandle] is dropped. - #[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint().map(|ipc|ipc.path())), target = "rpc", level = "TRACE")] + #[instrument(name = "start", skip_all, fields(http = ?self.http_local_addr(), ws = ?self.ws_local_addr(), ipc = ?self.ipc_endpoint()), target = "rpc", level = "TRACE")] pub async fn start(self, modules: TransportRpcModules) -> Result { trace!(target: "rpc", "staring RPC server"); let Self { ws_http, ipc: ipc_server } = self; @@ -2183,7 +2166,7 @@ impl RpcServer { if let Some((server, module)) = ipc_server.and_then(|server| ipc.map(|module| (server, module))) { - handle.ipc_endpoint = Some(server.endpoint().path().to_string()); + handle.ipc_endpoint = Some(server.endpoint()); handle.ipc = Some(server.start(module).await?); } diff --git a/deny.toml b/deny.toml index 347b60965..61cced4fb 100644 --- a/deny.toml +++ b/deny.toml @@ -58,6 +58,7 @@ exceptions = [ { allow = ["CC0-1.0"], name = "secp256k1-sys" }, { allow = ["CC0-1.0"], name = "tiny-keccak" }, { allow = ["CC0-1.0"], name = "more-asserts" }, + { allow = ["CC0-1.0"], name = "to_method" }, { allow = ["CC0-1.0"], name = "aurora-engine-modexp" }, # TODO: decide on MPL-2.0 handling # These dependencies are grandfathered in in https://github.com/paradigmxyz/reth/pull/6980