feat: replace low level IPC with interprocess (#7922)

This commit is contained in:
Abner Zheng
2024-04-27 11:47:34 +08:00
committed by GitHub
parent ffa36b7348
commit 2f052a8112
5 changed files with 77 additions and 184 deletions

1
Cargo.lock generated
View File

@ -6915,7 +6915,6 @@ dependencies = [
"tokio-util",
"tower",
"tracing",
"windows-sys 0.52.0",
]
[[package]]

View File

@ -31,9 +31,6 @@ thiserror.workspace = true
futures-util = "0.3.30"
interprocess = { version = "1.2.1", features = ["tokio_support"] }
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] }
[dev-dependencies]
tokio-stream = { workspace = true, features = ["sync"] }
reth-tracing.workspace = true

View File

@ -1,24 +1,85 @@
//! [`jsonrpsee`] transport adapter implementation for IPC.
use std::{
io,
path::{Path, PathBuf},
};
use crate::stream_codec::StreamCodec;
use futures::StreamExt;
use interprocess::local_socket::tokio::{LocalSocketStream, OwnedReadHalf, OwnedWriteHalf};
use jsonrpsee::{
async_client::{Client, ClientBuilder},
core::client::{TransportReceiverT, TransportSenderT},
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use std::io;
use tokio::io::AsyncWriteExt;
use tokio_util::{
codec::FramedRead,
compat::{Compat, FuturesAsyncReadCompatExt, FuturesAsyncWriteCompatExt},
};
#[cfg(unix)]
use crate::client::unix::IpcTransportClientBuilder;
#[cfg(windows)]
use crate::client::win::IpcTransportClientBuilder;
/// Sending end of IPC transport.
#[derive(Debug)]
pub(crate) struct Sender {
inner: Compat<OwnedWriteHalf>,
}
#[cfg(unix)]
mod unix;
#[cfg(windows)]
mod win;
#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;
/// Sends out a request. Returns a Future that finishes when the request has been successfully
/// sent.
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
Ok(self.inner.write_all(msg.as_bytes()).await?)
}
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented");
Err(IpcError::NotSupported)
}
/// Close the connection.
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
/// Receiving end of IPC transport.
#[derive(Debug)]
pub(crate) struct Receiver {
pub(crate) inner: FramedRead<Compat<OwnedReadHalf>, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;
/// Returns a Future resolving when the server sent us something back.
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
}
}
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;
impl IpcTransportClientBuilder {
pub(crate) async fn build(
self,
endpoint: impl AsRef<str>,
) -> Result<(Sender, Receiver), IpcError> {
let endpoint = endpoint.as_ref().to_string();
let conn = LocalSocketStream::connect(endpoint.clone())
.await
.map_err(|err| IpcError::FailedToConnect { path: endpoint, err })?;
let (rhlf, whlf) = conn.into_split();
Ok((
Sender { inner: whlf.compat_write() },
Receiver { inner: FramedRead::new(rhlf.compat(), StreamCodec::stream_incoming()) },
))
}
}
/// Builder type for [`Client`]
#[derive(Clone, Default, Debug)]
@ -37,7 +98,7 @@ impl IpcClientBuilder {
/// # Ok(())
/// # }
/// ```
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
pub async fn build(self, path: impl AsRef<str>) -> Result<Client, IpcError> {
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
Ok(self.build_with_tokio(tx, rx))
}
@ -66,7 +127,7 @@ pub enum IpcError {
FailedToConnect {
/// The path of the socket.
#[doc(hidden)]
path: PathBuf,
path: String,
/// The error occurred while connecting.
#[doc(hidden)]
err: io::Error,

View File

@ -1,82 +0,0 @@
//! [`jsonrpsee`] transport adapter implementation for Unix IPC by using Unix Sockets.
use crate::{client::IpcError, stream_codec::StreamCodec};
use futures::StreamExt;
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use std::path::Path;
use tokio::{
io::AsyncWriteExt,
net::{
unix::{OwnedReadHalf, OwnedWriteHalf},
UnixStream,
},
};
use tokio_util::codec::FramedRead;
/// Sending end of IPC transport.
#[derive(Debug)]
pub(crate) struct Sender {
inner: OwnedWriteHalf,
}
#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;
/// Sends out a request. Returns a Future that finishes when the request has been successfully
/// sent.
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
Ok(self.inner.write_all(msg.as_bytes()).await?)
}
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented");
Err(IpcError::NotSupported)
}
/// Close the connection.
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
/// Receiving end of IPC transport.
#[derive(Debug)]
pub(crate) struct Receiver {
pub(crate) inner: FramedRead<OwnedReadHalf, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;
/// Returns a Future resolving when the server sent us something back.
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
}
}
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub(crate) struct IpcTransportClientBuilder;
impl IpcTransportClientBuilder {
pub(crate) async fn build(
self,
path: impl AsRef<Path>,
) -> Result<(Sender, Receiver), IpcError> {
let path = path.as_ref();
let stream = UnixStream::connect(path)
.await
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;
let (rhlf, whlf) = stream.into_split();
Ok((
Sender { inner: whlf },
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
))
}
}

View File

@ -1,82 +0,0 @@
//! [`jsonrpsee`] transport adapter implementation for Windows IPC by using NamedPipes.
use crate::{client::IpcError, stream_codec::StreamCodec};
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
use std::{path::Path, sync::Arc};
use tokio::{
io::AsyncWriteExt,
net::windows::named_pipe::{ClientOptions, NamedPipeClient},
time,
time::Duration,
};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
/// Sending end of IPC transport.
#[derive(Debug)]
pub struct Sender {
inner: Arc<NamedPipeClient>,
}
#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;
/// Sends out a request. Returns a Future that finishes when the request has been successfully
/// sent.
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
Ok(self.inner.write_all(msg.as_bytes()).await?)
}
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented");
Err(IpcError::NotSupported)
}
/// Close the connection.
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
/// Receiving end of IPC transport.
#[derive(Debug)]
pub struct Receiver {
inner: FramedRead<Arc<NamedPipeClient>, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;
/// Returns a Future resolving when the server sent us something back.
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
}
}
/// Builder for IPC transport [`crate::client::win::Sender`] and [`crate::client::win::Receiver`]
/// pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct IpcTransportClientBuilder;
impl IpcTransportClientBuilder {
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
let addr = path.as_ref().as_os_str();
let client = loop {
match ClientOptions::new().open(addr) {
Ok(client) => break client,
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
Err(e) => return IpcError::FailedToConnect { path: path.to_path_buf(), err: e },
}
time::sleep(Duration::from_mills(50)).await;
};
let client = Arc::new(client);
Ok((
Sender { inner: client.clone() },
Receiver { inner: FramedRead::new(client, StreamCodec::stream_incoming()) },
))
}
}