mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Add windows Ipc Client implementation (#7187)
This commit is contained in:
@ -15,7 +15,6 @@ workspace = true
|
||||
|
||||
# async/net
|
||||
futures.workspace = true
|
||||
parity-tokio-ipc = "0.9.0"
|
||||
tokio = { workspace = true, features = ["net", "time", "rt-multi-thread"] }
|
||||
tokio-util = { workspace = true, features = ["codec"] }
|
||||
tokio-stream.workspace = true
|
||||
@ -30,7 +29,12 @@ tracing.workspace = true
|
||||
bytes.workspace = true
|
||||
thiserror.workspace = true
|
||||
futures-util = "0.3.30"
|
||||
interprocess = { version = "1.2.1", features = ["tokio_support"] }
|
||||
|
||||
[target.'cfg(windows)'.dependencies]
|
||||
windows-sys = { version = "0.52.0", features = ["Win32_Foundation"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio-stream = { workspace = true, features = ["sync"] }
|
||||
reth-tracing.workspace = true
|
||||
rand.workspace = true
|
||||
|
||||
@ -1,151 +0,0 @@
|
||||
//! [`jsonrpsee`] transport adapter implementation for IPC.
|
||||
|
||||
use crate::stream_codec::StreamCodec;
|
||||
use futures::StreamExt;
|
||||
use jsonrpsee::{
|
||||
async_client::{Client, ClientBuilder},
|
||||
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
|
||||
};
|
||||
use std::{
|
||||
io,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, net::UnixStream};
|
||||
use tokio_util::codec::FramedRead;
|
||||
|
||||
/// Builder type for [`Client`]
|
||||
#[derive(Clone, Default, Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcClientBuilder;
|
||||
|
||||
impl IpcClientBuilder {
|
||||
/// Connects to a IPC socket
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
|
||||
Ok(self.build_with_tokio(tx, rx))
|
||||
}
|
||||
|
||||
/// Uses the sender and receiver channels to connect to the socket.
|
||||
pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
|
||||
where
|
||||
S: TransportSenderT + Send,
|
||||
R: TransportReceiverT + Send,
|
||||
{
|
||||
ClientBuilder::default().build_with_tokio(sender, receiver)
|
||||
}
|
||||
}
|
||||
|
||||
/// Sending end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender {
|
||||
inner: tokio::net::unix::OwnedWriteHalf,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportSenderT for Sender {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Sends out a request. Returns a Future that finishes when the request has been successfully
|
||||
/// sent.
|
||||
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
|
||||
Ok(self.inner.write_all(msg.as_bytes()).await?)
|
||||
}
|
||||
|
||||
async fn send_ping(&mut self) -> Result<(), Self::Error> {
|
||||
tracing::trace!("send ping - not implemented");
|
||||
Err(IpcError::NotSupported)
|
||||
}
|
||||
|
||||
/// Close the connection.
|
||||
async fn close(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiving end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
inner: FramedRead<tokio::net::unix::OwnedReadHalf, StreamCodec>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportReceiverT for Receiver {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Returns a Future resolving when the server sent us something back.
|
||||
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
|
||||
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcTransportClientBuilder;
|
||||
|
||||
impl IpcTransportClientBuilder {
|
||||
/// Try to establish the connection.
|
||||
///
|
||||
/// ```
|
||||
/// use jsonrpsee::{core::client::ClientT, rpc_params};
|
||||
/// use reth_ipc::client::IpcClientBuilder;
|
||||
/// # async fn run_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
/// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
|
||||
/// let response: String = client.request("say_hello", rpc_params![]).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
|
||||
let path = path.as_ref();
|
||||
|
||||
let stream = UnixStream::connect(path)
|
||||
.await
|
||||
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;
|
||||
|
||||
let (rhlf, whlf) = stream.into_split();
|
||||
|
||||
Ok((
|
||||
Sender { inner: whlf },
|
||||
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Error variants that can happen in IPC transport.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum IpcError {
|
||||
/// Operation not supported
|
||||
#[error("operation not supported")]
|
||||
NotSupported,
|
||||
/// Stream was closed
|
||||
#[error("stream closed")]
|
||||
Closed,
|
||||
/// Thrown when failed to establish a socket connection.
|
||||
#[error("failed to connect to socket {path}: {err}")]
|
||||
FailedToConnect {
|
||||
/// The path of the socket.
|
||||
#[doc(hidden)]
|
||||
path: PathBuf,
|
||||
/// The error occurred while connecting.
|
||||
#[doc(hidden)]
|
||||
err: io::Error,
|
||||
},
|
||||
/// Wrapped IO Error
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use parity_tokio_ipc::{dummy_endpoint, Endpoint};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect() {
|
||||
let endpoint = dummy_endpoint();
|
||||
let _incoming = Endpoint::new(endpoint.clone()).incoming().unwrap();
|
||||
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap();
|
||||
let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
|
||||
}
|
||||
}
|
||||
97
crates/rpc/ipc/src/client/mod.rs
Normal file
97
crates/rpc/ipc/src/client/mod.rs
Normal file
@ -0,0 +1,97 @@
|
||||
//! [`jsonrpsee`] transport adapter implementation for IPC.
|
||||
|
||||
use std::{
|
||||
io,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
|
||||
use jsonrpsee::{
|
||||
async_client::{Client, ClientBuilder},
|
||||
core::client::{TransportReceiverT, TransportSenderT},
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
use crate::client::unix::IpcTransportClientBuilder;
|
||||
#[cfg(windows)]
|
||||
use crate::client::win::IpcTransportClientBuilder;
|
||||
|
||||
#[cfg(unix)]
|
||||
mod unix;
|
||||
#[cfg(windows)]
|
||||
mod win;
|
||||
|
||||
/// Builder type for [`Client`]
|
||||
#[derive(Clone, Default, Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcClientBuilder;
|
||||
|
||||
impl IpcClientBuilder {
|
||||
/// Connects to a IPC socket
|
||||
///
|
||||
/// ```
|
||||
/// use jsonrpsee::{core::client::ClientT, rpc_params};
|
||||
/// use reth_ipc::client::IpcClientBuilder;
|
||||
/// # async fn run_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
/// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
|
||||
/// let response: String = client.request("say_hello", rpc_params![]).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
|
||||
Ok(self.build_with_tokio(tx, rx))
|
||||
}
|
||||
|
||||
/// Uses the sender and receiver channels to connect to the socket.
|
||||
pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
|
||||
where
|
||||
S: TransportSenderT + Send,
|
||||
R: TransportReceiverT + Send,
|
||||
{
|
||||
ClientBuilder::default().build_with_tokio(sender, receiver)
|
||||
}
|
||||
}
|
||||
|
||||
/// Error variants that can happen in IPC transport.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum IpcError {
|
||||
/// Operation not supported
|
||||
#[error("operation not supported")]
|
||||
NotSupported,
|
||||
/// Stream was closed
|
||||
#[error("stream closed")]
|
||||
Closed,
|
||||
/// Thrown when failed to establish a socket connection.
|
||||
#[error("failed to connect to socket {path}: {err}")]
|
||||
FailedToConnect {
|
||||
/// The path of the socket.
|
||||
#[doc(hidden)]
|
||||
path: PathBuf,
|
||||
/// The error occurred while connecting.
|
||||
#[doc(hidden)]
|
||||
err: io::Error,
|
||||
},
|
||||
/// Wrapped IO Error
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::server::dummy_endpoint;
|
||||
use interprocess::local_socket::tokio::LocalSocketListener;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect() {
|
||||
let endpoint = dummy_endpoint();
|
||||
let binding = LocalSocketListener::bind(endpoint.clone()).unwrap();
|
||||
tokio::spawn(async move {
|
||||
let _x = binding.accept().await;
|
||||
});
|
||||
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap();
|
||||
let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
|
||||
}
|
||||
}
|
||||
82
crates/rpc/ipc/src/client/unix.rs
Normal file
82
crates/rpc/ipc/src/client/unix.rs
Normal file
@ -0,0 +1,82 @@
|
||||
//! [`jsonrpsee`] transport adapter implementation for Unix IPC by using Unix Sockets.
|
||||
|
||||
use crate::{client::IpcError, stream_codec::StreamCodec};
|
||||
use futures::StreamExt;
|
||||
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
|
||||
use std::path::Path;
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::{
|
||||
unix::{OwnedReadHalf, OwnedWriteHalf},
|
||||
UnixStream,
|
||||
},
|
||||
};
|
||||
use tokio_util::codec::FramedRead;
|
||||
|
||||
/// Sending end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Sender {
|
||||
inner: OwnedWriteHalf,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportSenderT for Sender {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Sends out a request. Returns a Future that finishes when the request has been successfully
|
||||
/// sent.
|
||||
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
|
||||
Ok(self.inner.write_all(msg.as_bytes()).await?)
|
||||
}
|
||||
|
||||
async fn send_ping(&mut self) -> Result<(), Self::Error> {
|
||||
tracing::trace!("send ping - not implemented");
|
||||
Err(IpcError::NotSupported)
|
||||
}
|
||||
|
||||
/// Close the connection.
|
||||
async fn close(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiving end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct Receiver {
|
||||
pub(crate) inner: FramedRead<OwnedReadHalf, StreamCodec>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportReceiverT for Receiver {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Returns a Future resolving when the server sent us something back.
|
||||
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
|
||||
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[non_exhaustive]
|
||||
pub(crate) struct IpcTransportClientBuilder;
|
||||
|
||||
impl IpcTransportClientBuilder {
|
||||
pub(crate) async fn build(
|
||||
self,
|
||||
path: impl AsRef<Path>,
|
||||
) -> Result<(Sender, Receiver), IpcError> {
|
||||
let path = path.as_ref();
|
||||
|
||||
let stream = UnixStream::connect(path)
|
||||
.await
|
||||
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;
|
||||
|
||||
let (rhlf, whlf) = stream.into_split();
|
||||
|
||||
Ok((
|
||||
Sender { inner: whlf },
|
||||
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
|
||||
))
|
||||
}
|
||||
}
|
||||
82
crates/rpc/ipc/src/client/win.rs
Normal file
82
crates/rpc/ipc/src/client/win.rs
Normal file
@ -0,0 +1,82 @@
|
||||
//! [`jsonrpsee`] transport adapter implementation for Windows IPC by using NamedPipes.
|
||||
|
||||
use crate::{client::IpcError, stream_codec::StreamCodec};
|
||||
use jsonrpsee::core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT};
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tokio::{
|
||||
io::AsyncWriteExt,
|
||||
net::windows::named_pipe::{ClientOptions, NamedPipeClient},
|
||||
time,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_util::codec::FramedRead;
|
||||
use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;
|
||||
|
||||
/// Sending end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender {
|
||||
inner: Arc<NamedPipeClient>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportSenderT for Sender {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Sends out a request. Returns a Future that finishes when the request has been successfully
|
||||
/// sent.
|
||||
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
|
||||
Ok(self.inner.write_all(msg.as_bytes()).await?)
|
||||
}
|
||||
|
||||
async fn send_ping(&mut self) -> Result<(), Self::Error> {
|
||||
tracing::trace!("send ping - not implemented");
|
||||
Err(IpcError::NotSupported)
|
||||
}
|
||||
|
||||
/// Close the connection.
|
||||
async fn close(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiving end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
inner: FramedRead<Arc<NamedPipeClient>, StreamCodec>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportReceiverT for Receiver {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Returns a Future resolving when the server sent us something back.
|
||||
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
|
||||
self.inner.next().await.map_or(Err(IpcError::Closed), |val| Ok(ReceivedMessage::Text(val?)))
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for IPC transport [`crate::client::win::Sender`] and [`crate::client::win::Receiver`]
|
||||
/// pair.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcTransportClientBuilder;
|
||||
|
||||
impl IpcTransportClientBuilder {
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
|
||||
let addr = path.as_ref().as_os_str();
|
||||
let client = loop {
|
||||
match ClientOptions::new().open(addr) {
|
||||
Ok(client) => break client,
|
||||
Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
|
||||
Err(e) => return IpcError::FailedToConnect { path: path.to_path_buf(), err: e },
|
||||
}
|
||||
time::sleep(Duration::from_mills(50)).await;
|
||||
};
|
||||
let client = Arc::new(client);
|
||||
Ok((
|
||||
Sender { inner: client.clone() },
|
||||
Receiver { inner: FramedRead::new(client, StreamCodec::stream_incoming()) },
|
||||
))
|
||||
}
|
||||
}
|
||||
@ -12,7 +12,6 @@
|
||||
#![cfg_attr(not(test), warn(unused_crate_dependencies))]
|
||||
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
|
||||
|
||||
#[cfg(unix)]
|
||||
pub mod client;
|
||||
pub mod server;
|
||||
|
||||
|
||||
@ -1,12 +1,11 @@
|
||||
//! A IPC connection.
|
||||
|
||||
use crate::stream_codec::StreamCodec;
|
||||
use futures::{ready, stream::FuturesUnordered, FutureExt, Sink, Stream, StreamExt};
|
||||
use futures::{stream::FuturesUnordered, FutureExt, Sink, Stream};
|
||||
use std::{
|
||||
collections::VecDeque,
|
||||
future::Future,
|
||||
io,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
@ -16,58 +15,8 @@ use tower::Service;
|
||||
|
||||
pub(crate) type JsonRpcStream<T> = Framed<T, StreamCodec>;
|
||||
|
||||
/// Wraps a stream of incoming connections.
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct Incoming<T, Item> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
_marker: PhantomData<Item>,
|
||||
}
|
||||
impl<T, Item> Incoming<T, Item>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Create a new instance.
|
||||
pub(crate) fn new(inner: T) -> Self {
|
||||
Self { inner, _marker: Default::default() }
|
||||
}
|
||||
|
||||
/// Polls to accept a new incoming connection to the endpoint.
|
||||
pub(crate) fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
|
||||
Poll::Ready(ready!(self.poll_next_unpin(cx)).map_or(
|
||||
Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ipc connection closed")),
|
||||
|conn| conn,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Item> Stream for Incoming<T, Item>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = io::Result<IpcConn<JsonRpcStream<Item>>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let res = match ready!(this.inner.poll_next(cx)) {
|
||||
Some(Ok(item)) => {
|
||||
let framed = IpcConn(tokio_util::codec::Decoder::framed(
|
||||
StreamCodec::stream_incoming(),
|
||||
item,
|
||||
));
|
||||
Ok(framed)
|
||||
}
|
||||
Some(Err(err)) => Err(err),
|
||||
None => return Poll::Ready(None),
|
||||
};
|
||||
Poll::Ready(Some(res))
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct IpcConn<T>(#[pin] T);
|
||||
pub(crate) struct IpcConn<T>(#[pin] pub(crate) T);
|
||||
|
||||
impl<T> IpcConn<JsonRpcStream<T>>
|
||||
where
|
||||
|
||||
@ -26,127 +26,9 @@
|
||||
|
||||
//! Utilities for handling async code.
|
||||
|
||||
use futures::FutureExt;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::{
|
||||
sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError},
|
||||
time::{self, Duration, Interval},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
/// Polling for server stop monitor interval in milliseconds.
|
||||
const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000);
|
||||
|
||||
/// This is a flexible collection of futures that need to be driven to completion
|
||||
/// alongside some other future, such as connection handlers that need to be
|
||||
/// handled along with a listener for new connections.
|
||||
///
|
||||
/// In order to `.await` on these futures and drive them to completion, call
|
||||
/// `select_with` providing some other future, the result of which you need.
|
||||
pub(crate) struct FutureDriver<F> {
|
||||
futures: Vec<F>,
|
||||
stop_monitor_heartbeat: Interval,
|
||||
}
|
||||
|
||||
impl<F> Default for FutureDriver<F> {
|
||||
fn default() -> Self {
|
||||
let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL);
|
||||
|
||||
heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
|
||||
|
||||
FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> FutureDriver<F> {
|
||||
/// Add a new future to this driver
|
||||
pub(crate) fn add(&mut self, future: F) {
|
||||
self.futures.push(future);
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> FutureDriver<F>
|
||||
where
|
||||
F: Future + Unpin,
|
||||
{
|
||||
pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
|
||||
tokio::pin!(selector);
|
||||
|
||||
DriverSelect { selector, driver: self }.await
|
||||
}
|
||||
|
||||
fn drive(&mut self, cx: &mut Context<'_>) {
|
||||
let mut i = 0;
|
||||
|
||||
while i < self.futures.len() {
|
||||
if self.futures[i].poll_unpin(cx).is_ready() {
|
||||
// Using `swap_remove` since we don't care about ordering,
|
||||
// but we do care about removing being `O(1)`.
|
||||
//
|
||||
// We don't increment `i` in this branch, since we now
|
||||
// have a shorter length, and potentially a new value at
|
||||
// current index
|
||||
self.futures.swap_remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context<'_>) {
|
||||
// We don't care about the ticks of the heartbeat, it's here only
|
||||
// to periodically wake the `Waker` on `cx`.
|
||||
let _ = self.stop_monitor_heartbeat.poll_tick(cx);
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for FutureDriver<F>
|
||||
where
|
||||
F: Future + Unpin,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
this.drive(cx);
|
||||
|
||||
if this.futures.is_empty() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a glorified select `Future` that will attempt to drive all
|
||||
/// connection futures `F` to completion on each `poll`, while also
|
||||
/// handling incoming connections.
|
||||
struct DriverSelect<'a, S, F> {
|
||||
selector: S,
|
||||
driver: &'a mut FutureDriver<F>,
|
||||
}
|
||||
|
||||
impl<'a, R, F> Future for DriverSelect<'a, R, F>
|
||||
where
|
||||
R: Future + Unpin,
|
||||
F: Future + Unpin,
|
||||
{
|
||||
type Output = R::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
this.driver.drive(cx);
|
||||
this.driver.poll_stop_monitor_heartbeat(cx);
|
||||
|
||||
this.selector.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct StopHandle(watch::Receiver<()>);
|
||||
@ -156,12 +38,7 @@ impl StopHandle {
|
||||
Self(rx)
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown_requested(&self) -> bool {
|
||||
// if a message has been seen, it means that `stop` has been called.
|
||||
self.0.has_changed().unwrap_or(true)
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&mut self) {
|
||||
pub(crate) async fn shutdown(mut self) {
|
||||
// Err(_) implies that the `sender` has been dropped.
|
||||
// Ok(_) implies that `stop` has been called.
|
||||
let _ = self.0.changed().await;
|
||||
|
||||
@ -1,14 +1,16 @@
|
||||
//! JSON-RPC IPC server implementation
|
||||
|
||||
use crate::server::{
|
||||
connection::{Incoming, IpcConn, JsonRpcStream},
|
||||
future::{ConnectionGuard, FutureDriver, StopHandle},
|
||||
connection::{IpcConn, JsonRpcStream},
|
||||
future::{ConnectionGuard, StopHandle},
|
||||
};
|
||||
use futures::{FutureExt, Stream, StreamExt};
|
||||
use futures::StreamExt;
|
||||
use futures_util::{future::Either, stream::FuturesUnordered};
|
||||
use interprocess::local_socket::tokio::{LocalSocketListener, LocalSocketStream};
|
||||
use jsonrpsee::{
|
||||
core::TEN_MB_SIZE_BYTES,
|
||||
server::{
|
||||
middleware::rpc::{either::Either, RpcLoggerLayer, RpcServiceT},
|
||||
middleware::rpc::{RpcLoggerLayer, RpcServiceT},
|
||||
AlreadyStoppedError, IdProvider, RandomIntegerIdProvider,
|
||||
},
|
||||
BoundedSubscriptions, MethodSink, Methods,
|
||||
@ -25,16 +27,18 @@ use tokio::{
|
||||
sync::{oneshot, watch, OwnedSemaphorePermit},
|
||||
};
|
||||
use tower::{layer::util::Identity, Layer, Service};
|
||||
use tracing::{debug, trace, warn};
|
||||
|
||||
use tracing::{debug, trace, warn, Instrument};
|
||||
// re-export so can be used during builder setup
|
||||
use crate::server::{
|
||||
connection::IpcConnDriver,
|
||||
rpc_service::{RpcService, RpcServiceCfg},
|
||||
use crate::{
|
||||
server::{
|
||||
connection::IpcConnDriver,
|
||||
rpc_service::{RpcService, RpcServiceCfg},
|
||||
},
|
||||
stream_codec::StreamCodec,
|
||||
};
|
||||
pub use parity_tokio_ipc::Endpoint;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_util::compat::FuturesAsyncReadCompatExt;
|
||||
use tower::layer::{util::Stack, LayerFn};
|
||||
|
||||
mod connection;
|
||||
@ -47,7 +51,7 @@ mod rpc_service;
|
||||
// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
|
||||
pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
|
||||
/// The endpoint we listen for incoming transactions
|
||||
endpoint: Endpoint,
|
||||
endpoint: String,
|
||||
id_provider: Arc<dyn IdProvider>,
|
||||
cfg: Settings,
|
||||
rpc_middleware: RpcServiceBuilder<RpcMiddleware>,
|
||||
@ -55,9 +59,9 @@ pub struct IpcServer<HttpMiddleware = Identity, RpcMiddleware = Identity> {
|
||||
}
|
||||
|
||||
impl<HttpMiddleware, RpcMiddleware> IpcServer<HttpMiddleware, RpcMiddleware> {
|
||||
/// Returns the configured [Endpoint]
|
||||
pub fn endpoint(&self) -> &Endpoint {
|
||||
&self.endpoint
|
||||
/// Returns the configured endpoint
|
||||
pub fn endpoint(&self) -> String {
|
||||
self.endpoint.clone()
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,15 +127,29 @@ where
|
||||
stop_handle: StopHandle,
|
||||
on_ready: oneshot::Sender<Result<(), IpcServerStartError>>,
|
||||
) {
|
||||
trace!(endpoint = ?self.endpoint.path(), "starting ipc server");
|
||||
trace!(endpoint = ?self.endpoint, "starting ipc server");
|
||||
|
||||
if cfg!(unix) {
|
||||
// ensure the file does not exist
|
||||
if std::fs::remove_file(self.endpoint.path()).is_ok() {
|
||||
debug!(endpoint = ?self.endpoint.path(), "removed existing IPC endpoint file");
|
||||
if std::fs::remove_file(&self.endpoint).is_ok() {
|
||||
debug!(endpoint = ?self.endpoint, "removed existing IPC endpoint file");
|
||||
}
|
||||
}
|
||||
|
||||
let listener = match LocalSocketListener::bind(self.endpoint.clone()) {
|
||||
Err(err) => {
|
||||
on_ready
|
||||
.send(Err(IpcServerStartError { endpoint: self.endpoint.clone(), source: err }))
|
||||
.ok();
|
||||
return;
|
||||
}
|
||||
|
||||
Ok(listener) => listener,
|
||||
};
|
||||
|
||||
// signal that we're ready to accept connections
|
||||
on_ready.send(Ok(())).ok();
|
||||
|
||||
let message_buffer_capacity = self.cfg.message_buffer_capacity;
|
||||
let max_request_body_size = self.cfg.max_request_body_size;
|
||||
let max_response_body_size = self.cfg.max_response_body_size;
|
||||
@ -142,37 +160,27 @@ where
|
||||
let mut id: u32 = 0;
|
||||
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
|
||||
|
||||
let mut connections = FutureDriver::default();
|
||||
let endpoint_path = self.endpoint.path().to_string();
|
||||
let incoming = match self.endpoint.incoming() {
|
||||
Ok(connections) => {
|
||||
#[cfg(windows)]
|
||||
let connections = Box::pin(connections);
|
||||
Incoming::new(connections)
|
||||
}
|
||||
Err(err) => {
|
||||
on_ready
|
||||
.send(Err(IpcServerStartError { endpoint: endpoint_path, source: err }))
|
||||
.ok();
|
||||
return
|
||||
}
|
||||
};
|
||||
// signal that we're ready to accept connections
|
||||
on_ready.send(Ok(())).ok();
|
||||
|
||||
let mut incoming = Monitored::new(incoming, &stop_handle);
|
||||
let mut connections = FuturesUnordered::new();
|
||||
let stopped = stop_handle.clone().shutdown();
|
||||
tokio::pin!(stopped);
|
||||
|
||||
trace!("accepting ipc connections");
|
||||
loop {
|
||||
match connections.select_with(&mut incoming).await {
|
||||
Ok(ipc) => {
|
||||
match try_accept_conn(&listener, stopped).await {
|
||||
AcceptConnection::Established { local_socket_stream, stop } => {
|
||||
trace!("established new connection");
|
||||
let ipc = IpcConn(tokio_util::codec::Decoder::framed(
|
||||
StreamCodec::stream_incoming(),
|
||||
local_socket_stream.compat(),
|
||||
));
|
||||
|
||||
let conn = match connection_guard.try_acquire() {
|
||||
Some(conn) => conn,
|
||||
None => {
|
||||
warn!("Too many IPC connections. Please try again later.");
|
||||
connections.add(ipc.reject_connection().boxed());
|
||||
continue
|
||||
connections.push(tokio::spawn(ipc.reject_connection().in_current_span()));
|
||||
stopped = stop;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
@ -198,30 +206,58 @@ where
|
||||
};
|
||||
|
||||
let service = self.http_middleware.service(tower_service);
|
||||
connections.add(Box::pin(spawn_connection(
|
||||
connections.push(tokio::spawn(process_connection(
|
||||
ipc,
|
||||
service,
|
||||
stop_handle.clone(),
|
||||
rx,
|
||||
)));
|
||||
).in_current_span()));
|
||||
|
||||
id = id.wrapping_add(1);
|
||||
stopped = stop;
|
||||
}
|
||||
Err(MonitoredError::Selector(err)) => {
|
||||
tracing::error!("Error while awaiting a new IPC connection: {:?}", err);
|
||||
AcceptConnection::Shutdown => { break; }
|
||||
AcceptConnection::Err((e, stop)) => {
|
||||
tracing::error!("Error while awaiting a new IPC connection: {:?}", e);
|
||||
stopped = stop;
|
||||
}
|
||||
Err(MonitoredError::Shutdown) => break,
|
||||
}
|
||||
}
|
||||
|
||||
connections.await;
|
||||
// FuturesUnordered won't poll anything until this line but because the
|
||||
// tasks are spawned (so that they can progress independently)
|
||||
// then this just makes sure that all tasks are completed before
|
||||
// returning from this function.
|
||||
while connections.next().await.is_some() {}
|
||||
}
|
||||
}
|
||||
|
||||
enum AcceptConnection<S> {
|
||||
Shutdown,
|
||||
Established { local_socket_stream: LocalSocketStream, stop: S },
|
||||
Err((io::Error, S)),
|
||||
}
|
||||
|
||||
async fn try_accept_conn<S>(listener: &LocalSocketListener, stopped: S) -> AcceptConnection<S>
|
||||
where
|
||||
S: Future + Unpin,
|
||||
{
|
||||
let accept = listener.accept();
|
||||
tokio::pin!(accept);
|
||||
|
||||
match futures_util::future::select(accept, stopped).await {
|
||||
Either::Left((res, stop)) => match res {
|
||||
Ok(local_socket_stream) => AcceptConnection::Established { local_socket_stream, stop },
|
||||
Err(e) => AcceptConnection::Err((e, stop)),
|
||||
},
|
||||
Either::Right(_) => AcceptConnection::Shutdown,
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for IpcServer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("IpcServer")
|
||||
.field("endpoint", &self.endpoint.path())
|
||||
.field("endpoint", &self.endpoint)
|
||||
.field("cfg", &self.cfg)
|
||||
.field("id_provider", &self.id_provider)
|
||||
.finish()
|
||||
@ -408,10 +444,10 @@ where
|
||||
}
|
||||
|
||||
/// Spawns the IPC connection onto a new task
|
||||
async fn spawn_connection<S, T>(
|
||||
async fn process_connection<S, T>(
|
||||
conn: IpcConn<JsonRpcStream<T>>,
|
||||
service: S,
|
||||
mut stop_handle: StopHandle,
|
||||
stop_handle: StopHandle,
|
||||
rx: mpsc::Receiver<String>,
|
||||
) where
|
||||
S: Service<String, Response = Option<String>> + Send + 'static,
|
||||
@ -419,70 +455,34 @@ async fn spawn_connection<S, T>(
|
||||
S::Future: Send + Unpin,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let task = tokio::task::spawn(async move {
|
||||
let rx_item = ReceiverStream::new(rx);
|
||||
let conn = IpcConnDriver {
|
||||
conn,
|
||||
service,
|
||||
pending_calls: Default::default(),
|
||||
items: Default::default(),
|
||||
};
|
||||
tokio::pin!(conn, rx_item);
|
||||
let rx_item = ReceiverStream::new(rx);
|
||||
let conn = IpcConnDriver {
|
||||
conn,
|
||||
service,
|
||||
pending_calls: Default::default(),
|
||||
items: Default::default(),
|
||||
};
|
||||
tokio::pin!(conn, rx_item);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut conn => {
|
||||
break
|
||||
}
|
||||
item = rx_item.next() => {
|
||||
if let Some(item) = item {
|
||||
conn.push_back(item);
|
||||
}
|
||||
}
|
||||
_ = stop_handle.shutdown() => {
|
||||
// shutdown
|
||||
break
|
||||
let stopped = stop_handle.shutdown();
|
||||
|
||||
tokio::pin!(stopped);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut conn => {
|
||||
break
|
||||
}
|
||||
item = rx_item.next() => {
|
||||
if let Some(item) = item {
|
||||
conn.push_back(item);
|
||||
}
|
||||
}
|
||||
_ = &mut stopped=> {
|
||||
// shutdown
|
||||
break
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
task.await.ok();
|
||||
}
|
||||
|
||||
/// This is a glorified select listening for new messages, while also checking the `stop_receiver`
|
||||
/// signal.
|
||||
struct Monitored<'a, F> {
|
||||
future: F,
|
||||
stop_monitor: &'a StopHandle,
|
||||
}
|
||||
|
||||
impl<'a, F> Monitored<'a, F> {
|
||||
fn new(future: F, stop_monitor: &'a StopHandle) -> Self {
|
||||
Monitored { future, stop_monitor }
|
||||
}
|
||||
}
|
||||
|
||||
enum MonitoredError<E> {
|
||||
Shutdown,
|
||||
Selector(E),
|
||||
}
|
||||
|
||||
impl<'a, T, Item> Future for Monitored<'a, Incoming<T, Item>>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = Result<IpcConn<JsonRpcStream<Item>>, MonitoredError<io::Error>>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if this.stop_monitor.shutdown_requested() {
|
||||
return Poll::Ready(Err(MonitoredError::Shutdown))
|
||||
}
|
||||
|
||||
this.future.poll_accept(cx).map_err(MonitoredError::Selector)
|
||||
}
|
||||
}
|
||||
|
||||
@ -734,17 +734,8 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
|
||||
|
||||
/// Finalize the configuration of the server. Consumes the [`Builder`].
|
||||
pub fn build(self, endpoint: impl AsRef<str>) -> IpcServer<HttpMiddleware, RpcMiddleware> {
|
||||
let endpoint = Endpoint::new(endpoint.as_ref().to_string());
|
||||
self.build_with_endpoint(endpoint)
|
||||
}
|
||||
|
||||
/// Finalize the configuration of the server. Consumes the [`Builder`].
|
||||
pub fn build_with_endpoint(
|
||||
self,
|
||||
endpoint: Endpoint,
|
||||
) -> IpcServer<HttpMiddleware, RpcMiddleware> {
|
||||
IpcServer {
|
||||
endpoint,
|
||||
endpoint: endpoint.as_ref().to_string(),
|
||||
cfg: self.settings,
|
||||
id_provider: self.id_provider,
|
||||
http_middleware: self.http_middleware,
|
||||
@ -782,7 +773,18 @@ impl ServerHandle {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(all(test, unix))]
|
||||
/// For testing/examples
|
||||
#[cfg(test)]
|
||||
pub fn dummy_endpoint() -> String {
|
||||
let num: u64 = rand::Rng::gen(&mut rand::thread_rng());
|
||||
if cfg!(windows) {
|
||||
format!(r"\\.\pipe\my-pipe-{}", num)
|
||||
} else {
|
||||
format!(r"/tmp/my-uds-{}", num)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::client::IpcClientBuilder;
|
||||
@ -797,7 +799,6 @@ mod tests {
|
||||
types::Request,
|
||||
PendingSubscriptionSink, RpcModule, SubscriptionMessage,
|
||||
};
|
||||
use parity_tokio_ipc::dummy_endpoint;
|
||||
use tokio::sync::broadcast;
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
@ -823,7 +824,7 @@ mod tests {
|
||||
// and you might want to do something smarter if it's
|
||||
// critical that "the most recent item" must be sent when it is produced.
|
||||
if sink.send(notif).await.is_err() {
|
||||
break Ok(())
|
||||
break Ok(());
|
||||
}
|
||||
|
||||
closed = c;
|
||||
@ -848,6 +849,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn can_set_the_max_response_body_size() {
|
||||
// init_test_tracing();
|
||||
let endpoint = dummy_endpoint();
|
||||
let server = Builder::default().max_response_body_size(100).build(&endpoint);
|
||||
let mut module = RpcModule::new(());
|
||||
|
||||
Reference in New Issue
Block a user