From 2f052a81120507dc90de4352cb1dca38de86836a Mon Sep 17 00:00:00 2001 From: Abner Zheng Date: Sat, 27 Apr 2024 11:47:34 +0800 Subject: [PATCH] feat: replace low level IPC with interprocess (#7922) --- Cargo.lock | 1 - crates/rpc/ipc/Cargo.toml | 3 - crates/rpc/ipc/src/client/mod.rs | 93 +++++++++++++++++++++++++------ crates/rpc/ipc/src/client/unix.rs | 82 --------------------------- crates/rpc/ipc/src/client/win.rs | 82 --------------------------- 5 files changed, 77 insertions(+), 184 deletions(-) delete mode 100644 crates/rpc/ipc/src/client/unix.rs delete mode 100644 crates/rpc/ipc/src/client/win.rs diff --git a/Cargo.lock b/Cargo.lock index c046501c6..c66304e91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6915,7 +6915,6 @@ dependencies = [ "tokio-util", "tower", "tracing", - "windows-sys 0.52.0", ] [[package]] diff --git a/crates/rpc/ipc/Cargo.toml b/crates/rpc/ipc/Cargo.toml index 094fa5759..af6e64db1 100644 --- a/crates/rpc/ipc/Cargo.toml +++ b/crates/rpc/ipc/Cargo.toml @@ -31,9 +31,6 @@ thiserror.workspace = true futures-util = "0.3.30" interprocess = { version = "1.2.1", features = ["tokio_support"] } -[target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] } - [dev-dependencies] tokio-stream = { workspace = true, features = ["sync"] } reth-tracing.workspace = true diff --git a/crates/rpc/ipc/src/client/mod.rs b/crates/rpc/ipc/src/client/mod.rs index 8ca4b5406..05ea7ed58 100644 --- a/crates/rpc/ipc/src/client/mod.rs +++ b/crates/rpc/ipc/src/client/mod.rs @@ -1,24 +1,85 @@ //! [`jsonrpsee`] transport adapter implementation for IPC. -use std::{ - io, - path::{Path, PathBuf}, -}; - +use crate::stream_codec::StreamCodec; +use futures::StreamExt; +use interprocess::local_socket::tokio::{LocalSocketStream, OwnedReadHalf, OwnedWriteHalf}; use jsonrpsee::{ async_client::{Client, ClientBuilder}, - core::client::{TransportReceiverT, TransportSenderT}, + core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}, +}; +use std::io; +use tokio::io::AsyncWriteExt; +use tokio_util::{ + codec::FramedRead, + compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt}, }; -#[cfg(unix)] -use crate::client::unix::IpcTransportClientBuilder; -#[cfg(windows)] -use crate::client::win::IpcTransportClientBuilder; +/// Sending end of IPC transport. +#[derive(Debug)] +pub(crate) struct Sender { + inner: Compat, +} -#[cfg(unix)] -mod unix; -#[cfg(windows)] -mod win; +#[async_trait::async_trait] +impl TransportSenderT for Sender { + type Error = IpcError; + + /// Sends out a request. Returns a Future that finishes when the request has been successfully + /// sent. + async fn send(&mut self, msg: String) -> Result<(), Self::Error> { + Ok(self.inner.write_all(msg.as_bytes()).await?) + } + + async fn send_ping(&mut self) -> Result<(), Self::Error> { + tracing::trace!("send ping - not implemented"); + Err(IpcError::NotSupported) + } + + /// Close the connection. + async fn close(&mut self) -> Result<(), Self::Error> { + Ok(()) + } +} + +/// Receiving end of IPC transport. +#[derive(Debug)] +pub(crate) struct Receiver { + pub(crate) inner: FramedRead, StreamCodec>, +} + +#[async_trait::async_trait] +impl TransportReceiverT for Receiver { + type Error = IpcError; + + /// Returns a Future resolving when the server sent us something back. + async fn receive(&mut self) -> Result { + self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) + } +} + +/// Builder for IPC transport [`Sender`] and [`Receiver`] pair. +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub(crate) struct IpcTransportClientBuilder; + +impl IpcTransportClientBuilder { + pub(crate) async fn build( + self, + endpoint: impl AsRef, + ) -> Result<(Sender, Receiver), IpcError> { + let endpoint = endpoint.as_ref().to_string(); + let conn = LocalSocketStream::connect(endpoint.clone()) + .await + .map_err(|err| IpcError::FailedToConnect { path: endpoint, err })?; + + let (rhlf, whlf) = conn.into_split(); + + Ok(( + Sender { inner: whlf.compat_write() }, + Receiver { inner: FramedRead::new(rhlf.compat(), StreamCodec::stream_incoming()) }, + )) + } +} /// Builder type for [`Client`] #[derive(Clone, Default, Debug)] @@ -37,7 +98,7 @@ impl IpcClientBuilder { /// # Ok(()) /// # } /// ``` - pub async fn build(self, path: impl AsRef) -> Result { + pub async fn build(self, path: impl AsRef) -> Result { let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?; Ok(self.build_with_tokio(tx, rx)) } @@ -66,7 +127,7 @@ pub enum IpcError { FailedToConnect { /// The path of the socket. #[doc(hidden)] - path: PathBuf, + path: String, /// The error occurred while connecting. #[doc(hidden)] err: io::Error, diff --git a/crates/rpc/ipc/src/client/unix.rs b/crates/rpc/ipc/src/client/unix.rs deleted file mode 100644 index c7ed7bc7a..000000000 --- a/crates/rpc/ipc/src/client/unix.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! [`jsonrpsee`] transport adapter implementation for Unix IPC by using Unix Sockets. - -use crate::{client::IpcError, stream_codec::StreamCodec}; -use futures::StreamExt; -use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; -use std::path::Path; -use tokio::{ - io::AsyncWriteExt, - net::{ - unix::{OwnedReadHalf, OwnedWriteHalf}, - UnixStream, - }, -}; -use tokio_util::codec::FramedRead; - -/// Sending end of IPC transport. -#[derive(Debug)] -pub(crate) struct Sender { - inner: OwnedWriteHalf, -} - -#[async_trait::async_trait] -impl TransportSenderT for Sender { - type Error = IpcError; - - /// Sends out a request. Returns a Future that finishes when the request has been successfully - /// sent. - async fn send(&mut self, msg: String) -> Result<(), Self::Error> { - Ok(self.inner.write_all(msg.as_bytes()).await?) - } - - async fn send_ping(&mut self) -> Result<(), Self::Error> { - tracing::trace!("send ping - not implemented"); - Err(IpcError::NotSupported) - } - - /// Close the connection. - async fn close(&mut self) -> Result<(), Self::Error> { - Ok(()) - } -} - -/// Receiving end of IPC transport. -#[derive(Debug)] -pub(crate) struct Receiver { - pub(crate) inner: FramedRead, -} - -#[async_trait::async_trait] -impl TransportReceiverT for Receiver { - type Error = IpcError; - - /// Returns a Future resolving when the server sent us something back. - async fn receive(&mut self) -> Result { - self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) - } -} - -/// Builder for IPC transport [`Sender`] and [`Receiver`] pair. -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub(crate) struct IpcTransportClientBuilder; - -impl IpcTransportClientBuilder { - pub(crate) async fn build( - self, - path: impl AsRef, - ) -> Result<(Sender, Receiver), IpcError> { - let path = path.as_ref(); - - let stream = UnixStream::connect(path) - .await - .map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?; - - let (rhlf, whlf) = stream.into_split(); - - Ok(( - Sender { inner: whlf }, - Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) }, - )) - } -} diff --git a/crates/rpc/ipc/src/client/win.rs b/crates/rpc/ipc/src/client/win.rs deleted file mode 100644 index 69b3140fe..000000000 --- a/crates/rpc/ipc/src/client/win.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! [`jsonrpsee`] transport adapter implementation for Windows IPC by using NamedPipes. - -use crate::{client::IpcError, stream_codec::StreamCodec}; -use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT}; -use std::{path::Path, sync::Arc}; -use tokio::{ - io::AsyncWriteExt, - net::windows::named_pipe::{ClientOptions, NamedPipeClient}, - time, - time::Duration, -}; -use tokio_stream::StreamExt; -use tokio_util::codec::FramedRead; -use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY; - -/// Sending end of IPC transport. -#[derive(Debug)] -pub struct Sender { - inner: Arc, -} - -#[async_trait::async_trait] -impl TransportSenderT for Sender { - type Error = IpcError; - - /// Sends out a request. Returns a Future that finishes when the request has been successfully - /// sent. - async fn send(&mut self, msg: String) -> Result<(), Self::Error> { - Ok(self.inner.write_all(msg.as_bytes()).await?) - } - - async fn send_ping(&mut self) -> Result<(), Self::Error> { - tracing::trace!("send ping - not implemented"); - Err(IpcError::NotSupported) - } - - /// Close the connection. - async fn close(&mut self) -> Result<(), Self::Error> { - Ok(()) - } -} - -/// Receiving end of IPC transport. -#[derive(Debug)] -pub struct Receiver { - inner: FramedRead, StreamCodec>, -} - -#[async_trait::async_trait] -impl TransportReceiverT for Receiver { - type Error = IpcError; - - /// Returns a Future resolving when the server sent us something back. - async fn receive(&mut self) -> Result { - self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?))) - } -} - -/// Builder for IPC transport [`crate::client::win::Sender`] and [`crate::client::win::Receiver`] -/// pair. -#[derive(Debug, Clone, Default)] -#[non_exhaustive] -pub struct IpcTransportClientBuilder; - -impl IpcTransportClientBuilder { - pub async fn build(self, path: impl AsRef) -> Result<(Sender, Receiver), IpcError> { - let addr = path.as_ref().as_os_str(); - let client = loop { - match ClientOptions::new().open(addr) { - Ok(client) => break client, - Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (), - Err(e) => return IpcError::FailedToConnect { path: path.to_path_buf(), err: e }, - } - time::sleep(Duration::from_mills(50)).await; - }; - let client = Arc::new(client); - Ok(( - Sender { inner: client.clone() }, - Receiver { inner: FramedRead::new(client, StreamCodec::stream_incoming()) }, - )) - } -}