mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
WIP: feat: ipc support (#144)
* feat: ipc support * copy futures * feat: port ipc implementation * cleanup * add test * fix clippy * add request test
This commit is contained in:
164
Cargo.lock
generated
164
Cargo.lock
generated
@ -1931,26 +1931,26 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8bd0d559d5e679b1ab2f869b486a11182923863b1b3ee8b421763cdd707b783a"
|
||||
checksum = "e0ee76536f6a303b67c13a99ecae0002bb388674dbf416094dde808263ea229c"
|
||||
dependencies = [
|
||||
"jsonrpsee-client-transport",
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-http-client",
|
||||
"jsonrpsee-http-server",
|
||||
"jsonrpsee-proc-macros",
|
||||
"jsonrpsee-server",
|
||||
"jsonrpsee-types",
|
||||
"jsonrpsee-wasm-client",
|
||||
"jsonrpsee-ws-client",
|
||||
"jsonrpsee-ws-server",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-client-transport"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8752740ecd374bcbf8b69f3e80b0327942df76f793f8d4e60d3355650c31fb74"
|
||||
checksum = "74c8f8f21b684623d23be8b6fcb101594f4e95d8a505ffd0568de863d93668f4"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"futures-channel",
|
||||
@ -1973,9 +1973,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-core"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f3dc3e9cf2ba50b7b1d7d76a667619f82846caa39e8e8daa8a4962d74acaddca"
|
||||
checksum = "f57020f4c98b6c6e8848fb115e61227fba6993517bee0faa38e4db627a9f7254"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"arrayvec",
|
||||
@ -1986,10 +1986,8 @@ dependencies = [
|
||||
"futures-timer",
|
||||
"futures-util",
|
||||
"globset",
|
||||
"http",
|
||||
"hyper",
|
||||
"jsonrpsee-types",
|
||||
"lazy_static",
|
||||
"parking_lot 0.12.1",
|
||||
"rand 0.8.5",
|
||||
"rustc-hash",
|
||||
@ -1999,16 +1997,14 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
"unicase",
|
||||
"wasm-bindgen-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-http-client"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52f7c0e2333ab2115c302eeb4f137c8a4af5ab609762df68bbda8f06496677c9"
|
||||
checksum = "7ca71086fd13ad0991cd4a0e50c9f4c59488b1acfac4a528c448c2e10020aa1e"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"hyper",
|
||||
@ -2021,33 +2017,15 @@ dependencies = [
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-http-server"
|
||||
version = "0.15.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "03802f0373a38c2420c70b5144742d800b509e2937edc4afb116434f07120117"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"hyper",
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-proc-macros"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bd67957d4280217247588ac86614ead007b301ca2fa9f19c19f880a536f029e3"
|
||||
checksum = "d335519bfe970511318f2780b7716f91d99d67fbf32ac3ea94b5f2f6c9818a4d"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro-crate",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
@ -2055,10 +2033,32 @@ dependencies = [
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-types"
|
||||
version = "0.15.1"
|
||||
name = "jsonrpsee-server"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e290bba767401b646812f608c099b922d8142603c9e73a50fb192d3ac86f4a0d"
|
||||
checksum = "ff5de9e3d6280c5354882e001494bc6ffb3ea31ac7dd81440f997aa380039e39"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"http",
|
||||
"hyper",
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"soketto",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util 0.7.4",
|
||||
"tower",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-types"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "88c88c764104fef883eb8a832d0d77688a63f67d75b385f5cdae7b3db8925288"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"beef",
|
||||
@ -2070,9 +2070,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-wasm-client"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "597b4eb94730e7695d0a2a429bc37a12e6e84d12680fdafb9b8f5f53652aab57"
|
||||
checksum = "d11a058951524f3f6e02e94c26d5c189a5df0f2dea81339147c603b9eb7c511d"
|
||||
dependencies = [
|
||||
"jsonrpsee-client-transport",
|
||||
"jsonrpsee-core",
|
||||
@ -2081,9 +2081,9 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-ws-client"
|
||||
version = "0.15.1"
|
||||
version = "0.16.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ee5feddd5188e62ac08fcf0e56478138e581509d4730f3f7be9b57dd402a4ff"
|
||||
checksum = "ea609539b5062f856a43652fd01d8ed8df40cd4d7067be6f6b7ce81d8bbd03be"
|
||||
dependencies = [
|
||||
"http",
|
||||
"jsonrpsee-client-transport",
|
||||
@ -2091,26 +2091,6 @@ dependencies = [
|
||||
"jsonrpsee-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "jsonrpsee-ws-server"
|
||||
version = "0.15.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d488ba74fb369e5ab68926feb75a483458b88e768d44319f37e4ecad283c7325"
|
||||
dependencies = [
|
||||
"futures-channel",
|
||||
"futures-util",
|
||||
"http",
|
||||
"jsonrpsee-core",
|
||||
"jsonrpsee-types",
|
||||
"serde_json",
|
||||
"soketto",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
"tokio-util 0.7.4",
|
||||
"tracing",
|
||||
"tracing-futures",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "k256"
|
||||
version = "0.11.6"
|
||||
@ -2544,6 +2524,20 @@ dependencies = [
|
||||
"syn",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parity-tokio-ipc"
|
||||
version = "0.9.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9981e32fb75e004cc148f5fb70342f393830e0a4aa62e3cc93b50976218d42b6"
|
||||
dependencies = [
|
||||
"futures",
|
||||
"libc",
|
||||
"log",
|
||||
"rand 0.7.3",
|
||||
"tokio",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "parking"
|
||||
version = "2.0.0"
|
||||
@ -3250,6 +3244,25 @@ dependencies = [
|
||||
"tokio-stream",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-ipc"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"futures",
|
||||
"jsonrpsee",
|
||||
"parity-tokio-ipc",
|
||||
"pin-project",
|
||||
"serde_json",
|
||||
"thiserror",
|
||||
"tokio",
|
||||
"tokio-util 0.7.4",
|
||||
"tower",
|
||||
"tracing",
|
||||
"tracing-test",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reth-libmdbx"
|
||||
version = "0.1.6"
|
||||
@ -4023,6 +4036,7 @@ dependencies = [
|
||||
"base64",
|
||||
"bytes",
|
||||
"futures",
|
||||
"http",
|
||||
"httparse",
|
||||
"log",
|
||||
"rand 0.8.5",
|
||||
@ -4440,6 +4454,23 @@ dependencies = [
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
|
||||
dependencies = [
|
||||
"tower-layer",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower-layer"
|
||||
version = "0.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
|
||||
|
||||
[[package]]
|
||||
name = "tower-service"
|
||||
version = "0.3.2"
|
||||
@ -4629,15 +4660,6 @@ dependencies = [
|
||||
"static_assertions",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicase"
|
||||
version = "2.6.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
|
||||
dependencies = [
|
||||
"version_check",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-bidi"
|
||||
version = "0.3.8"
|
||||
|
||||
@ -13,6 +13,7 @@ members = [
|
||||
"crates/net/eth-wire",
|
||||
"crates/net/discv4",
|
||||
"crates/net/network",
|
||||
"crates/net/ipc",
|
||||
"crates/net/rpc",
|
||||
"crates/net/rpc-api",
|
||||
"crates/net/rpc-types",
|
||||
|
||||
34
crates/net/ipc/Cargo.toml
Normal file
34
crates/net/ipc/Cargo.toml
Normal file
@ -0,0 +1,34 @@
|
||||
[package]
|
||||
name = "reth-ipc"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
license = "MIT OR Apache-2.0"
|
||||
repository = "https://github.com/foundry-rs/reth"
|
||||
readme = "README.md"
|
||||
description = """
|
||||
IPC support for reth
|
||||
"""
|
||||
|
||||
[dependencies]
|
||||
|
||||
# async/net
|
||||
futures = "0.3"
|
||||
parity-tokio-ipc = "0.9.0"
|
||||
tokio = { version = "1", features = ["net", "time", "rt-multi-thread"] }
|
||||
tokio-util = { version = "0.7", features = ["codec"] }
|
||||
async-trait = "0.1"
|
||||
pin-project = "1.0"
|
||||
tower = "0.4"
|
||||
|
||||
# misc
|
||||
jsonrpsee = { version = "0.16", features = ["server", "client"] }
|
||||
serde_json = "1.0"
|
||||
tracing = "0.1.37"
|
||||
bytes = "1.2.1"
|
||||
thiserror = "1.0.37"
|
||||
|
||||
[dev-dependencies]
|
||||
tracing-test = "0.2"
|
||||
|
||||
[features]
|
||||
client = ["jsonrpsee/client", "jsonrpsee/async-client"]
|
||||
3
crates/net/ipc/README.md
Normal file
3
crates/net/ipc/README.md
Normal file
@ -0,0 +1,3 @@
|
||||
# <h1 align="center"> reth-ipc </h1>
|
||||
|
||||
IPC server and client implementation for [`jsonrpsee`](https://github.com/paritytech/jsonrpsee/).
|
||||
151
crates/net/ipc/src/client.rs
Normal file
151
crates/net/ipc/src/client.rs
Normal file
@ -0,0 +1,151 @@
|
||||
//! [`jsonrpsee`] transport adapter implementation for IPC.
|
||||
|
||||
use crate::stream_codec::StreamCodec;
|
||||
use futures::StreamExt;
|
||||
use jsonrpsee::{
|
||||
async_client::{Client, ClientBuilder},
|
||||
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
|
||||
};
|
||||
use std::{
|
||||
io,
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tokio::{io::AsyncWriteExt, net::UnixStream};
|
||||
use tokio_util::codec::FramedRead;
|
||||
|
||||
/// Builder type for [`Client`]
|
||||
#[derive(Clone, Default, Debug)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcClientBuilder;
|
||||
|
||||
impl IpcClientBuilder {
|
||||
/// Connects to a IPC socket
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
|
||||
Ok(self.build_with_tokio(tx, rx))
|
||||
}
|
||||
|
||||
/// Uses the sender and receiver channels to connect to the socket.
|
||||
pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
|
||||
where
|
||||
S: TransportSenderT + Send,
|
||||
R: TransportReceiverT + Send,
|
||||
{
|
||||
ClientBuilder::default().build_with_tokio(sender, receiver)
|
||||
}
|
||||
}
|
||||
|
||||
/// Sending end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Sender {
|
||||
inner: tokio::net::unix::OwnedWriteHalf,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportSenderT for Sender {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Sends out a request. Returns a Future that finishes when the request has been successfully
|
||||
/// sent.
|
||||
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
|
||||
Ok(self.inner.write_all(msg.as_bytes()).await?)
|
||||
}
|
||||
|
||||
async fn send_ping(&mut self) -> Result<(), Self::Error> {
|
||||
tracing::trace!("send ping - not implemented");
|
||||
Err(IpcError::NotSupported)
|
||||
}
|
||||
|
||||
/// Close the connection.
|
||||
async fn close(&mut self) -> Result<(), Self::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Receiving end of IPC transport.
|
||||
#[derive(Debug)]
|
||||
pub struct Receiver {
|
||||
inner: FramedRead<tokio::net::unix::OwnedReadHalf, StreamCodec>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl TransportReceiverT for Receiver {
|
||||
type Error = IpcError;
|
||||
|
||||
/// Returns a Future resolving when the server sent us something back.
|
||||
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
|
||||
match self.inner.next().await {
|
||||
None => Err(IpcError::Closed),
|
||||
Some(val) => Ok(ReceivedMessage::Text(val?)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder for IPC transport [`Sender`] and ['Receiver`] pair.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
#[non_exhaustive]
|
||||
pub struct IpcTransportClientBuilder;
|
||||
|
||||
impl IpcTransportClientBuilder {
|
||||
/// Try to establish the connection.
|
||||
///
|
||||
/// ```
|
||||
/// use jsonrpsee::rpc_params;
|
||||
/// use reth_ipc::client::IpcClientBuilder;
|
||||
/// use jsonrpsee::core::client::ClientT;
|
||||
/// # async fn run_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
/// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
|
||||
/// let response: String = client.request("say_hello", rpc_params![]).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
|
||||
let path = path.as_ref();
|
||||
let stream = UnixStream::connect(path)
|
||||
.await
|
||||
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;
|
||||
|
||||
let (rhlf, whlf) = stream.into_split();
|
||||
|
||||
Ok((
|
||||
Sender { inner: whlf },
|
||||
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Error variants that can happen in IPC transport.
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[allow(missing_docs)]
|
||||
pub enum IpcError {
|
||||
/// Operation not supported
|
||||
#[error("Operation not supported")]
|
||||
NotSupported,
|
||||
/// Stream was closed
|
||||
#[error("Stream closed")]
|
||||
Closed,
|
||||
/// Thrown when failed to establish a socket connection.
|
||||
#[error("Failed to connect to socket {path}: {err}")]
|
||||
FailedToConnect {
|
||||
/// The path of the socket.
|
||||
path: PathBuf,
|
||||
err: io::Error,
|
||||
},
|
||||
#[error(transparent)]
|
||||
Io(#[from] io::Error),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use parity_tokio_ipc::{dummy_endpoint, Endpoint};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_connect() {
|
||||
let endpoint = dummy_endpoint();
|
||||
let _incoming = Endpoint::new(endpoint.clone()).incoming().unwrap();
|
||||
|
||||
let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap();
|
||||
let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
|
||||
}
|
||||
}
|
||||
14
crates/net/ipc/src/lib.rs
Normal file
14
crates/net/ipc/src/lib.rs
Normal file
@ -0,0 +1,14 @@
|
||||
#![warn(missing_debug_implementations, missing_docs, unreachable_pub, unused_crate_dependencies)]
|
||||
#![deny(unused_must_use, rust_2018_idioms)]
|
||||
#![doc(test(
|
||||
no_crate_inject,
|
||||
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
|
||||
))]
|
||||
|
||||
//! Reth IPC implementation
|
||||
|
||||
pub mod client;
|
||||
pub mod server;
|
||||
|
||||
/// Json codec implementation
|
||||
pub mod stream_codec;
|
||||
115
crates/net/ipc/src/server/connection.rs
Normal file
115
crates/net/ipc/src/server/connection.rs
Normal file
@ -0,0 +1,115 @@
|
||||
//! A IPC connection.
|
||||
|
||||
use crate::stream_codec::StreamCodec;
|
||||
use futures::{ready, Sink, Stream, StreamExt};
|
||||
use std::{
|
||||
io,
|
||||
marker::PhantomData,
|
||||
pin::Pin,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
|
||||
use tokio_util::codec::Framed;
|
||||
|
||||
pub(crate) type JsonRpcStream<T> = Framed<T, StreamCodec>;
|
||||
|
||||
/// Wraps a stream of incoming connections.
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct Incoming<T, Item> {
|
||||
#[pin]
|
||||
inner: T,
|
||||
_marker: PhantomData<Item>,
|
||||
}
|
||||
impl<T, Item> Incoming<T, Item>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
/// Create a new instance.
|
||||
pub(crate) fn new(inner: T) -> Self {
|
||||
Self { inner, _marker: Default::default() }
|
||||
}
|
||||
|
||||
/// Polls to accept a new incoming connection to the endpoint.
|
||||
pub(crate) fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
|
||||
let res = match ready!(self.poll_next_unpin(cx)) {
|
||||
None => Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ipc connection closed")),
|
||||
Some(conn) => conn,
|
||||
};
|
||||
Poll::Ready(res)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, Item> Stream for Incoming<T, Item>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = io::Result<IpcConn<JsonRpcStream<Item>>>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
let this = self.project();
|
||||
let res = match ready!(this.inner.poll_next(cx)) {
|
||||
Some(Ok(item)) => {
|
||||
let framed = IpcConn(tokio_util::codec::Decoder::framed(
|
||||
StreamCodec::stream_incoming(),
|
||||
item,
|
||||
));
|
||||
Ok(framed)
|
||||
}
|
||||
Some(Err(err)) => Err(err),
|
||||
None => return Poll::Ready(None),
|
||||
};
|
||||
Poll::Ready(Some(res))
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project::pin_project]
|
||||
pub(crate) struct IpcConn<T>(#[pin] T);
|
||||
|
||||
impl<T> IpcConn<JsonRpcStream<T>>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
/// Create a response for when the server is busy and can't accept more requests.
|
||||
pub(crate) async fn reject_connection(self) {
|
||||
let mut parts = self.0.into_parts();
|
||||
let _ = parts.io.write_all(b"Too many connections. Please try again later.").await;
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Stream for IpcConn<JsonRpcStream<T>>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Item = io::Result<String>;
|
||||
|
||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
self.project().0.poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Sink<String> for IpcConn<JsonRpcStream<T>>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
// NOTE: we always flush here this prevents buffering in the underlying
|
||||
// `Framed` impl that would cause stalled requests
|
||||
self.project().0.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
|
||||
self.project().0.start_send(item)
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.project().0.poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.project().0.poll_close(cx)
|
||||
}
|
||||
}
|
||||
208
crates/net/ipc/src/server/future.rs
Normal file
208
crates/net/ipc/src/server/future.rs
Normal file
@ -0,0 +1,208 @@
|
||||
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any
|
||||
// person obtaining a copy of this software and associated
|
||||
// documentation files (the "Software"), to deal in the
|
||||
// Software without restriction, including without
|
||||
// limitation the rights to use, copy, modify, merge,
|
||||
// publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software
|
||||
// is furnished to do so, subject to the following
|
||||
// conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice
|
||||
// shall be included in all copies or substantial portions
|
||||
// of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
|
||||
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
|
||||
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
|
||||
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
||||
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
|
||||
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
//! Utilities for handling async code.
|
||||
|
||||
use futures::FutureExt;
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::{
|
||||
sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError},
|
||||
time::{self, Duration, Interval},
|
||||
};
|
||||
|
||||
/// Polling for server stop monitor interval in milliseconds.
|
||||
const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000);
|
||||
|
||||
/// This is a flexible collection of futures that need to be driven to completion
|
||||
/// alongside some other future, such as connection handlers that need to be
|
||||
/// handled along with a listener for new connections.
|
||||
///
|
||||
/// In order to `.await` on these futures and drive them to completion, call
|
||||
/// `select_with` providing some other future, the result of which you need.
|
||||
pub(crate) struct FutureDriver<F> {
|
||||
futures: Vec<F>,
|
||||
stop_monitor_heartbeat: Interval,
|
||||
}
|
||||
|
||||
impl<F> Default for FutureDriver<F> {
|
||||
fn default() -> Self {
|
||||
let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL);
|
||||
|
||||
heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
|
||||
|
||||
FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> FutureDriver<F> {
|
||||
/// Add a new future to this driver
|
||||
pub(crate) fn add(&mut self, future: F) {
|
||||
self.futures.push(future);
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> FutureDriver<F>
|
||||
where
|
||||
F: Future + Unpin,
|
||||
{
|
||||
pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
|
||||
tokio::pin!(selector);
|
||||
|
||||
DriverSelect { selector, driver: self }.await
|
||||
}
|
||||
|
||||
fn drive(&mut self, cx: &mut Context<'_>) {
|
||||
let mut i = 0;
|
||||
|
||||
while i < self.futures.len() {
|
||||
if self.futures[i].poll_unpin(cx).is_ready() {
|
||||
// Using `swap_remove` since we don't care about ordering
|
||||
// but we do care about removing being `O(1)`.
|
||||
//
|
||||
// We don't increment `i` in this branch, since we now
|
||||
// have a shorter length, and potentially a new value at
|
||||
// current index
|
||||
self.futures.swap_remove(i);
|
||||
} else {
|
||||
i += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context<'_>) {
|
||||
// We don't care about the ticks of the heartbeat, it's here only
|
||||
// to periodically wake the `Waker` on `cx`.
|
||||
let _ = self.stop_monitor_heartbeat.poll_tick(cx);
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> Future for FutureDriver<F>
|
||||
where
|
||||
F: Future + Unpin,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
this.drive(cx);
|
||||
|
||||
if this.futures.is_empty() {
|
||||
Poll::Ready(())
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// This is a glorified select `Future` that will attempt to drive all
|
||||
/// connection futures `F` to completion on each `poll`, while also
|
||||
/// handling incoming connections.
|
||||
struct DriverSelect<'a, S, F> {
|
||||
selector: S,
|
||||
driver: &'a mut FutureDriver<F>,
|
||||
}
|
||||
|
||||
impl<'a, R, F> Future for DriverSelect<'a, R, F>
|
||||
where
|
||||
R: Future + Unpin,
|
||||
F: Future + Unpin,
|
||||
{
|
||||
type Output = R::Output;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = Pin::into_inner(self);
|
||||
|
||||
this.driver.drive(cx);
|
||||
this.driver.poll_stop_monitor_heartbeat(cx);
|
||||
|
||||
this.selector.poll_unpin(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct StopHandle(watch::Receiver<()>);
|
||||
|
||||
impl StopHandle {
|
||||
pub(crate) fn new(rx: watch::Receiver<()>) -> Self {
|
||||
Self(rx)
|
||||
}
|
||||
|
||||
pub(crate) fn shutdown_requested(&self) -> bool {
|
||||
// if a message has been seen, it means that `stop` has been called.
|
||||
self.0.has_changed().unwrap_or(true)
|
||||
}
|
||||
|
||||
pub(crate) async fn shutdown(&mut self) {
|
||||
// Err(_) implies that the `sender` has been dropped.
|
||||
// Ok(_) implies that `stop` has been called.
|
||||
let _ = self.0.changed().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Server handle.
|
||||
///
|
||||
/// When all [`StopHandle`]'s have been `dropped` or `stop` has been called
|
||||
/// the server will be stopped.
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct ServerHandle(Arc<watch::Sender<()>>);
|
||||
|
||||
impl ServerHandle {
|
||||
/// Wait for the server to stop.
|
||||
#[allow(unused)]
|
||||
pub(crate) async fn stopped(self) {
|
||||
self.0.closed().await
|
||||
}
|
||||
}
|
||||
|
||||
/// Limits the number of connections.
|
||||
pub(crate) struct ConnectionGuard(Arc<Semaphore>);
|
||||
|
||||
impl ConnectionGuard {
|
||||
pub(crate) fn new(limit: usize) -> Self {
|
||||
Self(Arc::new(Semaphore::new(limit)))
|
||||
}
|
||||
|
||||
pub(crate) fn try_acquire(&self) -> Option<OwnedSemaphorePermit> {
|
||||
match self.0.clone().try_acquire_owned() {
|
||||
Ok(guard) => Some(guard),
|
||||
Err(TryAcquireError::Closed) => {
|
||||
unreachable!("Semaphore::Close is never called and can't be closed; qed")
|
||||
}
|
||||
Err(TryAcquireError::NoPermits) => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) fn available_connections(&self) -> usize {
|
||||
self.0.available_permits()
|
||||
}
|
||||
}
|
||||
291
crates/net/ipc/src/server/ipc.rs
Normal file
291
crates/net/ipc/src/server/ipc.rs
Normal file
@ -0,0 +1,291 @@
|
||||
//! IPC request handling adapted from [`jsonrpsee`] http request handling
|
||||
use futures::{stream::FuturesOrdered, StreamExt};
|
||||
use jsonrpsee::{
|
||||
core::{
|
||||
server::{
|
||||
helpers::{prepare_error, BatchResponse, BatchResponseBuilder, MethodResponse},
|
||||
resource_limiting::Resources,
|
||||
rpc_module::{MethodKind, Methods},
|
||||
},
|
||||
tracing::{rx_log_from_json, tx_log_from_str},
|
||||
JsonRawValue,
|
||||
},
|
||||
server::{
|
||||
logger,
|
||||
logger::{Logger, TransportProtocol},
|
||||
},
|
||||
types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Params, Request},
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::OwnedSemaphorePermit;
|
||||
use tokio_util::either::Either;
|
||||
use tracing::instrument;
|
||||
|
||||
type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Batch<'a, L: Logger> {
|
||||
data: Vec<u8>,
|
||||
call: CallData<'a, L>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct CallData<'a, L: Logger> {
|
||||
conn_id: usize,
|
||||
logger: &'a L,
|
||||
methods: &'a Methods,
|
||||
max_response_body_size: u32,
|
||||
max_log_length: u32,
|
||||
resources: &'a Resources,
|
||||
request_start: L::Instant,
|
||||
}
|
||||
|
||||
// Batch responses must be sent back as a single message so we read the results from each
|
||||
// request in the batch and read the results off of a new channel, `rx_batch`, and then send the
|
||||
// complete batch response back to the client over `tx`.
|
||||
#[instrument(name = "batch", skip(b), level = "TRACE")]
|
||||
pub(crate) async fn process_batch_request<L>(b: Batch<'_, L>) -> BatchResponse
|
||||
where
|
||||
L: Logger,
|
||||
{
|
||||
let Batch { data, call } = b;
|
||||
|
||||
if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
|
||||
let mut got_notif = false;
|
||||
let mut batch_response =
|
||||
BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);
|
||||
|
||||
let mut pending_calls: FuturesOrdered<_> = batch
|
||||
.into_iter()
|
||||
.filter_map(|v| {
|
||||
if let Ok(req) = serde_json::from_str::<Request<'_>>(v.get()) {
|
||||
Some(Either::Right(execute_call(req, call.clone())))
|
||||
} else if let Ok(_notif) =
|
||||
serde_json::from_str::<Notification<'_, &JsonRawValue>>(v.get())
|
||||
{
|
||||
// notifications should not be answered.
|
||||
got_notif = true;
|
||||
None
|
||||
} else {
|
||||
// valid JSON but could be not parsable as `InvalidRequest`
|
||||
let id = match serde_json::from_str::<InvalidRequest<'_>>(v.get()) {
|
||||
Ok(err) => err.id,
|
||||
Err(_) => Id::Null,
|
||||
};
|
||||
|
||||
Some(Either::Left(async {
|
||||
MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
|
||||
}))
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
while let Some(response) = pending_calls.next().await {
|
||||
if let Err(too_large) = batch_response.append(&response) {
|
||||
return too_large
|
||||
}
|
||||
}
|
||||
|
||||
if got_notif && batch_response.is_empty() {
|
||||
BatchResponse { result: String::new(), success: true }
|
||||
} else {
|
||||
batch_response.finish()
|
||||
}
|
||||
} else {
|
||||
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn process_single_request<L: Logger>(
|
||||
data: Vec<u8>,
|
||||
call: CallData<'_, L>,
|
||||
) -> MethodResponse {
|
||||
if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
|
||||
execute_call_with_tracing(req, call).await
|
||||
} else if let Ok(notif) = serde_json::from_slice::<Notif<'_>>(&data) {
|
||||
execute_notification(notif, call.max_log_length)
|
||||
} else {
|
||||
let (id, code) = prepare_error(&data);
|
||||
MethodResponse::error(id, ErrorObject::from(code))
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")]
|
||||
pub(crate) async fn execute_call_with_tracing<'a, L: Logger>(
|
||||
req: Request<'a>,
|
||||
call: CallData<'_, L>,
|
||||
) -> MethodResponse {
|
||||
execute_call(req, call).await
|
||||
}
|
||||
|
||||
pub(crate) async fn execute_call<L: Logger>(
|
||||
req: Request<'_>,
|
||||
call: CallData<'_, L>,
|
||||
) -> MethodResponse {
|
||||
let CallData {
|
||||
resources,
|
||||
methods,
|
||||
logger,
|
||||
max_response_body_size,
|
||||
max_log_length,
|
||||
conn_id,
|
||||
request_start,
|
||||
} = call;
|
||||
|
||||
rx_log_from_json(&req, call.max_log_length);
|
||||
|
||||
let params = Params::new(req.params.map(|params| params.get()));
|
||||
let name = &req.method;
|
||||
let id = req.id;
|
||||
|
||||
let response = match methods.method_with_name(name) {
|
||||
None => {
|
||||
logger.on_call(
|
||||
name,
|
||||
params.clone(),
|
||||
logger::MethodKind::Unknown,
|
||||
TransportProtocol::Http,
|
||||
);
|
||||
MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
|
||||
}
|
||||
Some((name, method)) => match &method.inner() {
|
||||
MethodKind::Sync(callback) => {
|
||||
logger.on_call(
|
||||
name,
|
||||
params.clone(),
|
||||
logger::MethodKind::MethodCall,
|
||||
TransportProtocol::Http,
|
||||
);
|
||||
|
||||
match method.claim(name, resources) {
|
||||
Ok(guard) => {
|
||||
let r = (callback)(id, params, max_response_body_size as usize);
|
||||
drop(guard);
|
||||
r
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"[Methods::execute_with_resources] failed to lock resources: {}",
|
||||
err
|
||||
);
|
||||
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
|
||||
}
|
||||
}
|
||||
}
|
||||
MethodKind::Async(callback) => {
|
||||
logger.on_call(
|
||||
name,
|
||||
params.clone(),
|
||||
logger::MethodKind::MethodCall,
|
||||
TransportProtocol::Http,
|
||||
);
|
||||
match method.claim(name, resources) {
|
||||
Ok(guard) => {
|
||||
let id = id.into_owned();
|
||||
let params = params.into_owned();
|
||||
|
||||
(callback)(
|
||||
id,
|
||||
params,
|
||||
conn_id,
|
||||
max_response_body_size as usize,
|
||||
Some(guard),
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
"[Methods::execute_with_resources] failed to lock resources: {}",
|
||||
err
|
||||
);
|
||||
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
|
||||
}
|
||||
}
|
||||
}
|
||||
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
|
||||
logger.on_call(
|
||||
name,
|
||||
params.clone(),
|
||||
logger::MethodKind::Unknown,
|
||||
TransportProtocol::Http,
|
||||
);
|
||||
tracing::error!("Subscriptions not supported on HTTP");
|
||||
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
tx_log_from_str(&response.result, max_log_length);
|
||||
logger.on_result(name, response.success, request_start, TransportProtocol::Http);
|
||||
response
|
||||
}
|
||||
|
||||
#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
|
||||
fn execute_notification(notif: Notif<'_>, max_log_length: u32) -> MethodResponse {
|
||||
rx_log_from_json(¬if, max_log_length);
|
||||
let response = MethodResponse { result: String::new(), success: true };
|
||||
tx_log_from_str(&response.result, max_log_length);
|
||||
response
|
||||
}
|
||||
|
||||
#[allow(unused)]
|
||||
pub(crate) struct HandleRequest<L: Logger> {
|
||||
pub(crate) methods: Methods,
|
||||
pub(crate) resources: Resources,
|
||||
pub(crate) max_request_body_size: u32,
|
||||
pub(crate) max_response_body_size: u32,
|
||||
pub(crate) max_log_length: u32,
|
||||
pub(crate) batch_requests_supported: bool,
|
||||
pub(crate) logger: L,
|
||||
pub(crate) conn: Arc<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_request<L: Logger>(request: String, input: HandleRequest<L>) -> String {
|
||||
let HandleRequest {
|
||||
methods,
|
||||
resources,
|
||||
max_response_body_size,
|
||||
max_log_length,
|
||||
logger,
|
||||
conn,
|
||||
..
|
||||
} = input;
|
||||
|
||||
enum Kind {
|
||||
Single,
|
||||
Batch,
|
||||
}
|
||||
|
||||
let request_kind = request
|
||||
.chars()
|
||||
.find_map(|c| match c {
|
||||
'{' => Some(Kind::Single),
|
||||
'[' => Some(Kind::Batch),
|
||||
_ => None,
|
||||
})
|
||||
.unwrap_or(Kind::Single);
|
||||
|
||||
let request_start = logger.on_request(TransportProtocol::Http);
|
||||
|
||||
let call = CallData {
|
||||
conn_id: 0,
|
||||
logger: &logger,
|
||||
methods: &methods,
|
||||
max_response_body_size,
|
||||
max_log_length,
|
||||
resources: &resources,
|
||||
request_start,
|
||||
};
|
||||
// Single request or notification
|
||||
let res = if matches!(request_kind, Kind::Single) {
|
||||
let response = process_single_request(request.into_bytes(), call).await;
|
||||
response.result
|
||||
} else {
|
||||
let response = process_batch_request(Batch { data: request.into_bytes(), call }).await;
|
||||
response.result
|
||||
};
|
||||
|
||||
drop(conn);
|
||||
|
||||
res
|
||||
}
|
||||
558
crates/net/ipc/src/server/mod.rs
Normal file
558
crates/net/ipc/src/server/mod.rs
Normal file
@ -0,0 +1,558 @@
|
||||
//! JSON-RPC IPC server implementation
|
||||
|
||||
use crate::server::{
|
||||
connection::{Incoming, IpcConn, JsonRpcStream},
|
||||
future::{ConnectionGuard, FutureDriver, StopHandle},
|
||||
};
|
||||
use futures::{FutureExt, SinkExt, Stream, StreamExt};
|
||||
use jsonrpsee::{
|
||||
core::{
|
||||
server::{resource_limiting::Resources, rpc_module::Methods},
|
||||
Error, TEN_MB_SIZE_BYTES,
|
||||
},
|
||||
server::{logger::Logger, IdProvider, RandomIntegerIdProvider, ServerHandle},
|
||||
};
|
||||
use parity_tokio_ipc::Endpoint;
|
||||
use std::{
|
||||
future::Future,
|
||||
io,
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
sync::{oneshot, watch, OwnedSemaphorePermit},
|
||||
};
|
||||
use tower::{layer::util::Identity, Service};
|
||||
use tracing::{trace, warn};
|
||||
|
||||
mod connection;
|
||||
mod future;
|
||||
mod ipc;
|
||||
|
||||
/// Ipc Server implementation
|
||||
|
||||
// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
|
||||
pub struct IpcServer<B = Identity, L = ()> {
|
||||
/// The endpoint we listen for incoming transactions
|
||||
endpoint: Endpoint,
|
||||
resources: Resources,
|
||||
logger: L,
|
||||
id_provider: Arc<dyn IdProvider>,
|
||||
cfg: Settings,
|
||||
service_builder: tower::ServiceBuilder<B>,
|
||||
}
|
||||
|
||||
impl IpcServer {
|
||||
/// Start responding to connections requests.
|
||||
///
|
||||
/// This will run on the tokio runtime until the server is stopped or the ServerHandle is
|
||||
/// dropped.
|
||||
///
|
||||
/// ```
|
||||
/// use jsonrpsee::RpcModule;
|
||||
/// use reth_ipc::server::Builder;
|
||||
/// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
/// let server = Builder::default().build("/tmp/my-uds")?;
|
||||
/// let mut module = RpcModule::new(());
|
||||
/// module.register_method("say_hello", |_, _| Ok("lo"))?;
|
||||
/// let handle = server.start(module).await?;
|
||||
///
|
||||
/// // In this example we don't care about doing shutdown so let's it run forever.
|
||||
/// // You may use the `ServerHandle` to shut it down or manage it yourself.
|
||||
/// let server = tokio::spawn(handle.stopped());
|
||||
/// server.await.unwrap();
|
||||
/// Ok(())
|
||||
/// }
|
||||
/// ```
|
||||
pub async fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
|
||||
let methods = methods.into().initialize_resources(&self.resources)?;
|
||||
let (stop_tx, stop_rx) = watch::channel(());
|
||||
|
||||
let stop_handle = StopHandle::new(stop_rx);
|
||||
|
||||
// use a signal channel to wait until we're ready to accept connections
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
match self.cfg.tokio_runtime.take() {
|
||||
Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
|
||||
None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
|
||||
};
|
||||
rx.await.expect("channel is open").map_err(Error::Custom)?;
|
||||
|
||||
Ok(ServerHandle::new(stop_tx))
|
||||
}
|
||||
|
||||
#[allow(clippy::let_unit_value)]
|
||||
async fn start_inner(
|
||||
self,
|
||||
methods: Methods,
|
||||
stop_handle: StopHandle,
|
||||
on_ready: oneshot::Sender<Result<(), String>>,
|
||||
) -> io::Result<()> {
|
||||
trace!( endpoint=?self.endpoint.path(), "starting ipc server" );
|
||||
|
||||
if cfg!(unix) {
|
||||
// ensure the file does not exist
|
||||
if std::fs::remove_file(self.endpoint.path()).is_ok() {
|
||||
warn!( endpoint=?self.endpoint.path(), "removed existing file");
|
||||
}
|
||||
}
|
||||
|
||||
let max_request_body_size = self.cfg.max_request_body_size;
|
||||
let max_response_body_size = self.cfg.max_response_body_size;
|
||||
let max_log_length = self.cfg.max_log_length;
|
||||
let resources = self.resources;
|
||||
let id_provider = self.id_provider;
|
||||
let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection;
|
||||
let logger = self.logger;
|
||||
|
||||
let mut id: u32 = 0;
|
||||
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
|
||||
|
||||
let mut connections = FutureDriver::default();
|
||||
let incoming = match self.endpoint.incoming() {
|
||||
Ok(connections) => Incoming::new(connections),
|
||||
Err(err) => {
|
||||
on_ready.send(Err(err.to_string())).ok();
|
||||
return Err(err)
|
||||
}
|
||||
};
|
||||
// signal that we're ready to accept connections
|
||||
on_ready.send(Ok(())).ok();
|
||||
|
||||
let mut incoming = Monitored::new(incoming, &stop_handle);
|
||||
|
||||
trace!("accepting ipc connections");
|
||||
loop {
|
||||
match connections.select_with(&mut incoming).await {
|
||||
Ok(ipc) => {
|
||||
trace!("established new connection");
|
||||
let conn = match connection_guard.try_acquire() {
|
||||
Some(conn) => conn,
|
||||
None => {
|
||||
warn!("Too many connections. Please try again later.");
|
||||
connections.add(ipc.reject_connection().boxed());
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
let tower_service = TowerService {
|
||||
inner: ServiceData {
|
||||
methods: methods.clone(),
|
||||
resources: resources.clone(),
|
||||
max_request_body_size,
|
||||
max_response_body_size,
|
||||
max_log_length,
|
||||
id_provider: id_provider.clone(),
|
||||
stop_handle: stop_handle.clone(),
|
||||
max_subscriptions_per_connection,
|
||||
conn_id: id,
|
||||
logger,
|
||||
conn: Arc::new(conn),
|
||||
},
|
||||
};
|
||||
|
||||
let service = self.service_builder.service(tower_service);
|
||||
connections.add(Box::pin(spawn_connection(ipc, service, stop_handle.clone())));
|
||||
|
||||
id = id.wrapping_add(1);
|
||||
}
|
||||
Err(MonitoredError::Selector(err)) => {
|
||||
tracing::error!("Error while awaiting a new connection: {:?}", err);
|
||||
}
|
||||
Err(MonitoredError::Shutdown) => break,
|
||||
}
|
||||
}
|
||||
|
||||
connections.await;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for IpcServer {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("IpcServer")
|
||||
.field("endpoint", &self.endpoint.path())
|
||||
.field("cfg", &self.cfg)
|
||||
.field("id_provider", &self.id_provider)
|
||||
.field("resources", &self.resources)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// Data required by the server to handle requests.
|
||||
#[derive(Debug, Clone)]
|
||||
#[allow(unused)]
|
||||
pub(crate) struct ServiceData<L: Logger> {
|
||||
/// Registered server methods.
|
||||
pub(crate) methods: Methods,
|
||||
/// Tracker for currently used resources on the server.
|
||||
pub(crate) resources: Resources,
|
||||
/// Max request body size.
|
||||
pub(crate) max_request_body_size: u32,
|
||||
/// Max request body size.
|
||||
pub(crate) max_response_body_size: u32,
|
||||
/// Max length for logging for request and response
|
||||
///
|
||||
/// Logs bigger than this limit will be truncated.
|
||||
pub(crate) max_log_length: u32,
|
||||
/// Subscription ID provider.
|
||||
pub(crate) id_provider: Arc<dyn IdProvider>,
|
||||
/// Stop handle.
|
||||
pub(crate) stop_handle: StopHandle,
|
||||
/// Max subscriptions per connection.
|
||||
pub(crate) max_subscriptions_per_connection: u32,
|
||||
/// Connection ID
|
||||
pub(crate) conn_id: u32,
|
||||
/// Logger.
|
||||
pub(crate) logger: L,
|
||||
/// Handle to hold a `connection permit`.
|
||||
pub(crate) conn: Arc<OwnedSemaphorePermit>,
|
||||
}
|
||||
|
||||
/// JsonRPSee service compatible with `tower`.
|
||||
///
|
||||
/// # Note
|
||||
/// This is similar to [`hyper::service::service_fn`].
|
||||
#[derive(Debug)]
|
||||
pub struct TowerService<L: Logger> {
|
||||
inner: ServiceData<L>,
|
||||
}
|
||||
|
||||
impl<L: Logger> Service<String> for TowerService<L> {
|
||||
type Response = String;
|
||||
|
||||
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
|
||||
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
/// Opens door for back pressure implementation.
|
||||
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn call(&mut self, request: String) -> Self::Future {
|
||||
trace!("{:?}", request);
|
||||
|
||||
// handle the request
|
||||
let data = ipc::HandleRequest {
|
||||
methods: self.inner.methods.clone(),
|
||||
resources: self.inner.resources.clone(),
|
||||
max_request_body_size: self.inner.max_request_body_size,
|
||||
max_response_body_size: self.inner.max_response_body_size,
|
||||
max_log_length: self.inner.max_log_length,
|
||||
batch_requests_supported: true,
|
||||
logger: self.inner.logger.clone(),
|
||||
conn: self.inner.conn.clone(),
|
||||
};
|
||||
Box::pin(ipc::handle_request(request, data).map(Ok))
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns the connection in a new task
|
||||
async fn spawn_connection<S, T>(
|
||||
conn: IpcConn<JsonRpcStream<T>>,
|
||||
mut service: S,
|
||||
mut stop_handle: StopHandle,
|
||||
) where
|
||||
S: Service<String, Response = String> + Send + 'static,
|
||||
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
|
||||
S::Future: Send,
|
||||
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
{
|
||||
let task = tokio::task::spawn(async move {
|
||||
tokio::pin!(conn);
|
||||
|
||||
loop {
|
||||
let request = tokio::select! {
|
||||
res = conn.next() => {
|
||||
match res {
|
||||
Some(Ok(request)) => {
|
||||
request
|
||||
},
|
||||
Some(Err(e)) => {
|
||||
tracing::warn!("Request failed: {:?}", e);
|
||||
break
|
||||
}
|
||||
None => {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
_ = stop_handle.shutdown() => {
|
||||
break
|
||||
}
|
||||
};
|
||||
|
||||
// handle the RPC request
|
||||
let resp = match service.call(request).await {
|
||||
Ok(resp) => resp,
|
||||
Err(err) => err.into().to_string(),
|
||||
};
|
||||
|
||||
// send back
|
||||
if let Err(err) = conn.send(resp).await {
|
||||
warn!("Failed to send response: {:?}", err);
|
||||
break
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
task.await.ok();
|
||||
}
|
||||
|
||||
/// This is a glorified select listening for new messages, while also checking the `stop_receiver`
|
||||
/// signal.
|
||||
struct Monitored<'a, F> {
|
||||
future: F,
|
||||
stop_monitor: &'a StopHandle,
|
||||
}
|
||||
|
||||
impl<'a, F> Monitored<'a, F> {
|
||||
fn new(future: F, stop_monitor: &'a StopHandle) -> Self {
|
||||
Monitored { future, stop_monitor }
|
||||
}
|
||||
}
|
||||
|
||||
enum MonitoredError<E> {
|
||||
Shutdown,
|
||||
Selector(E),
|
||||
}
|
||||
|
||||
impl<'a, T, Item> Future for Monitored<'a, Incoming<T, Item>>
|
||||
where
|
||||
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
|
||||
Item: AsyncRead + AsyncWrite,
|
||||
{
|
||||
type Output = Result<IpcConn<JsonRpcStream<Item>>, MonitoredError<io::Error>>;
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
|
||||
if this.stop_monitor.shutdown_requested() {
|
||||
return Poll::Ready(Err(MonitoredError::Shutdown))
|
||||
}
|
||||
|
||||
this.future.poll_accept(cx).map_err(MonitoredError::Selector)
|
||||
}
|
||||
}
|
||||
|
||||
/// JSON-RPC IPC server settings.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Settings {
|
||||
/// Maximum size in bytes of a request.
|
||||
max_request_body_size: u32,
|
||||
/// Maximum size in bytes of a response.
|
||||
max_response_body_size: u32,
|
||||
/// Max length for logging for requests and responses
|
||||
///
|
||||
/// Logs bigger than this limit will be truncated.
|
||||
max_log_length: u32,
|
||||
/// Maximum number of incoming connections allowed.
|
||||
max_connections: u32,
|
||||
/// Maximum number of subscriptions per connection.
|
||||
max_subscriptions_per_connection: u32,
|
||||
/// Custom tokio runtime to run the server on.
|
||||
tokio_runtime: Option<tokio::runtime::Handle>,
|
||||
}
|
||||
|
||||
impl Default for Settings {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
max_request_body_size: TEN_MB_SIZE_BYTES,
|
||||
max_response_body_size: TEN_MB_SIZE_BYTES,
|
||||
max_log_length: 4096,
|
||||
max_connections: 100,
|
||||
max_subscriptions_per_connection: 1024,
|
||||
tokio_runtime: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Builder to configure and create a JSON-RPC server
|
||||
#[derive(Debug)]
|
||||
pub struct Builder<B = Identity, L = ()> {
|
||||
settings: Settings,
|
||||
resources: Resources,
|
||||
logger: L,
|
||||
id_provider: Arc<dyn IdProvider>,
|
||||
service_builder: tower::ServiceBuilder<B>,
|
||||
}
|
||||
|
||||
impl Default for Builder {
|
||||
fn default() -> Self {
|
||||
Builder {
|
||||
settings: Settings::default(),
|
||||
resources: Resources::default(),
|
||||
logger: (),
|
||||
id_provider: Arc::new(RandomIntegerIdProvider),
|
||||
service_builder: tower::ServiceBuilder::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<B, L> Builder<B, L> {
|
||||
/// Set the maximum size of a request body in bytes. Default is 10 MiB.
|
||||
pub fn max_request_body_size(mut self, size: u32) -> Self {
|
||||
self.settings.max_request_body_size = size;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum size of a response body in bytes. Default is 10 MiB.
|
||||
pub fn max_response_body_size(mut self, size: u32) -> Self {
|
||||
self.settings.max_response_body_size = size;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum size of a log
|
||||
pub fn max_log_length(mut self, size: u32) -> Self {
|
||||
self.settings.max_log_length = size;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum number of connections allowed. Default is 100.
|
||||
pub fn max_connections(mut self, max: u32) -> Self {
|
||||
self.settings.max_connections = max;
|
||||
self
|
||||
}
|
||||
|
||||
/// Set the maximum number of connections allowed. Default is 1024.
|
||||
pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
|
||||
self.settings.max_subscriptions_per_connection = max;
|
||||
self
|
||||
}
|
||||
|
||||
/// Register a new resource kind. Errors if `label` is already registered, or if the number of
|
||||
/// registered resources on this server instance would exceed 8.
|
||||
///
|
||||
/// See the module documentation for
|
||||
/// [`resurce_limiting`](../jsonrpsee_utils/server/resource_limiting/index.html#
|
||||
/// resource-limiting) for details.
|
||||
pub fn register_resource(
|
||||
mut self,
|
||||
label: &'static str,
|
||||
capacity: u16,
|
||||
default: u16,
|
||||
) -> Result<Self, Error> {
|
||||
self.resources.register(label, capacity, default)?;
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
/// Add a logger to the builder [`Logger`].
|
||||
pub fn set_logger<T: Logger>(self, logger: T) -> Builder<B, T> {
|
||||
Builder {
|
||||
settings: self.settings,
|
||||
resources: self.resources,
|
||||
logger,
|
||||
id_provider: self.id_provider,
|
||||
service_builder: self.service_builder,
|
||||
}
|
||||
}
|
||||
|
||||
/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
|
||||
///
|
||||
/// Default: [`tokio::spawn`]
|
||||
pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
|
||||
self.settings.tokio_runtime = Some(rt);
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure custom `subscription ID` provider for the server to use
|
||||
/// to when getting new subscription calls.
|
||||
///
|
||||
/// You may choose static dispatch or dynamic dispatch because
|
||||
/// `IdProvider` is implemented for `Box<T>`.
|
||||
///
|
||||
/// Default: [`RandomIntegerIdProvider`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
/// use jsonrpsee::server::RandomStringIdProvider;
|
||||
/// use reth_ipc::server::Builder;
|
||||
///
|
||||
/// // static dispatch
|
||||
/// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
|
||||
///
|
||||
/// // or dynamic dispatch
|
||||
/// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
|
||||
/// ```
|
||||
pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
|
||||
self.id_provider = Arc::new(id_provider);
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
|
||||
/// to the RPC service.
|
||||
///
|
||||
/// Default: No tower layers are applied to the RPC service.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```rust
|
||||
///
|
||||
/// #[tokio::main]
|
||||
/// async fn main() {
|
||||
/// let builder = tower::ServiceBuilder::new();
|
||||
///
|
||||
/// let server = reth_ipc::server::Builder::default()
|
||||
/// .set_middleware(builder)
|
||||
/// .build("/tmp/my-uds")
|
||||
/// .unwrap();
|
||||
/// }
|
||||
/// ```
|
||||
pub fn set_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> Builder<T, L> {
|
||||
Builder {
|
||||
settings: self.settings,
|
||||
resources: self.resources,
|
||||
logger: self.logger,
|
||||
id_provider: self.id_provider,
|
||||
service_builder,
|
||||
}
|
||||
}
|
||||
|
||||
/// Finalize the configuration of the server. Consumes the [`Builder`].
|
||||
pub fn build(self, endpoint: impl AsRef<str>) -> Result<IpcServer<B, L>, Error> {
|
||||
let endpoint = Endpoint::new(endpoint.as_ref().to_string());
|
||||
self.build_with_endpoint(endpoint)
|
||||
}
|
||||
|
||||
/// Finalize the configuration of the server. Consumes the [`Builder`].
|
||||
pub fn build_with_endpoint(self, endpoint: Endpoint) -> Result<IpcServer<B, L>, Error> {
|
||||
Ok(IpcServer {
|
||||
endpoint,
|
||||
cfg: self.settings,
|
||||
resources: self.resources,
|
||||
logger: self.logger,
|
||||
id_provider: self.id_provider,
|
||||
service_builder: self.service_builder,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::client::IpcClientBuilder;
|
||||
use jsonrpsee::{core::client::ClientT, rpc_params, RpcModule};
|
||||
use parity_tokio_ipc::dummy_endpoint;
|
||||
use tracing_test::traced_test;
|
||||
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn test_rpc_request() {
|
||||
let endpoint = dummy_endpoint();
|
||||
let server = Builder::default().build(&endpoint).unwrap();
|
||||
let mut module = RpcModule::new(());
|
||||
let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
|
||||
module.register_method("eth_chainId", move |_, _| Ok(msg)).unwrap();
|
||||
let handle = server.start(module).await.unwrap();
|
||||
tokio::spawn(handle.stopped());
|
||||
|
||||
let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
|
||||
let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
|
||||
assert_eq!(response, msg);
|
||||
}
|
||||
}
|
||||
307
crates/net/ipc/src/stream_codec.rs
Normal file
307
crates/net/ipc/src/stream_codec.rs
Normal file
@ -0,0 +1,307 @@
|
||||
// Copyright (c) 2015-2017 Parity Technologies Limited
|
||||
//
|
||||
// Permission is hereby granted, free of charge, to any
|
||||
// person obtaining a copy of this software and associated
|
||||
// documentation files (the "Software"), to deal in the
|
||||
// Software without restriction, including without
|
||||
// limitation the rights to use, copy, modify, merge,
|
||||
// publish, distribute, sublicense, and/or sell copies of
|
||||
// the Software, and to permit persons to whom the Software
|
||||
// is furnished to do so, subject to the following
|
||||
// conditions:
|
||||
//
|
||||
// The above copyright notice and this permission notice
|
||||
// shall be included in all copies or substantial portions
|
||||
// of the Software.
|
||||
//
|
||||
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
|
||||
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
|
||||
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
|
||||
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
|
||||
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
|
||||
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
||||
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
|
||||
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
// DEALINGS IN THE SOFTWARE.
|
||||
|
||||
// This basis of this file has been taken from the deprecated jsonrpc codebase:
|
||||
// https://github.com/paritytech/jsonrpc
|
||||
|
||||
use bytes::BytesMut;
|
||||
use std::{io, str};
|
||||
|
||||
/// Separator for enveloping messages in streaming codecs
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum Separator {
|
||||
/// No envelope is expected between messages. Decoder will try to figure out
|
||||
/// message boundaries by accumulating incoming bytes until valid JSON is formed.
|
||||
/// Encoder will send messages without any boundaries between requests.
|
||||
Empty,
|
||||
/// Byte is used as a sentinel between messages
|
||||
Byte(u8),
|
||||
}
|
||||
|
||||
impl Default for Separator {
|
||||
fn default() -> Self {
|
||||
Separator::Byte(b'\n')
|
||||
}
|
||||
}
|
||||
|
||||
/// Stream codec for streaming protocols (ipc, tcp)
|
||||
#[derive(Debug, Default)]
|
||||
pub struct StreamCodec {
|
||||
incoming_separator: Separator,
|
||||
outgoing_separator: Separator,
|
||||
}
|
||||
|
||||
impl StreamCodec {
|
||||
/// Default codec with streaming input data. Input can be both enveloped and not.
|
||||
pub fn stream_incoming() -> Self {
|
||||
StreamCodec::new(Separator::Empty, Default::default())
|
||||
}
|
||||
|
||||
/// New custom stream codec
|
||||
pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
|
||||
StreamCodec { incoming_separator, outgoing_separator }
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn is_whitespace(byte: u8) -> bool {
|
||||
matches!(byte, 0x0D | 0x0A | 0x20 | 0x09)
|
||||
}
|
||||
|
||||
impl tokio_util::codec::Decoder for StreamCodec {
|
||||
type Item = String;
|
||||
type Error = io::Error;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
|
||||
if let Separator::Byte(separator) = self.incoming_separator {
|
||||
if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) {
|
||||
let line = buf.split_to(i);
|
||||
let _ = buf.split_to(1);
|
||||
|
||||
match str::from_utf8(line.as_ref()) {
|
||||
Ok(s) => Ok(Some(s.to_string())),
|
||||
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
|
||||
}
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
} else {
|
||||
let mut depth = 0;
|
||||
let mut in_str = false;
|
||||
let mut is_escaped = false;
|
||||
let mut start_idx = 0;
|
||||
let mut whitespaces = 0;
|
||||
|
||||
for idx in 0..buf.as_ref().len() {
|
||||
let byte = buf.as_ref()[idx];
|
||||
|
||||
if (byte == b'{' || byte == b'[') && !in_str {
|
||||
if depth == 0 {
|
||||
start_idx = idx;
|
||||
}
|
||||
depth += 1;
|
||||
} else if (byte == b'}' || byte == b']') && !in_str {
|
||||
depth -= 1;
|
||||
} else if byte == b'"' && !is_escaped {
|
||||
in_str = !in_str;
|
||||
} else if is_whitespace(byte) {
|
||||
whitespaces += 1;
|
||||
}
|
||||
if byte == b'\\' && !is_escaped && in_str {
|
||||
is_escaped = true;
|
||||
} else {
|
||||
is_escaped = false;
|
||||
}
|
||||
|
||||
if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
|
||||
let bts = buf.split_to(idx + 1);
|
||||
return match String::from_utf8(bts.as_ref().to_vec()) {
|
||||
Ok(val) => Ok(Some(val)),
|
||||
Err(_) => Ok(None),
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl tokio_util::codec::Encoder<String> for StreamCodec {
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
|
||||
let mut payload = msg.into_bytes();
|
||||
if let Separator::Byte(separator) = self.outgoing_separator {
|
||||
payload.push(separator);
|
||||
}
|
||||
buf.extend_from_slice(&payload);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use bytes::{BufMut, BytesMut};
|
||||
use tokio_util::codec::Decoder;
|
||||
|
||||
#[test]
|
||||
fn simple_encode() {
|
||||
let mut buf = BytesMut::with_capacity(2048);
|
||||
buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
|
||||
|
||||
let mut codec = StreamCodec::stream_incoming();
|
||||
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in simple test")
|
||||
.expect("There should be at least one request in simple test");
|
||||
|
||||
assert_eq!(request, "{ test: 1 }");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn escape() {
|
||||
let mut buf = BytesMut::with_capacity(2048);
|
||||
buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#);
|
||||
|
||||
let mut codec = StreamCodec::stream_incoming();
|
||||
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in first escape test")
|
||||
.expect("There should be a request in first escape test");
|
||||
|
||||
assert_eq!(request, r#"{ test: "\"\\" }"#);
|
||||
|
||||
let request2 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in 2nd escape test")
|
||||
.expect("There should be a request in 2nd escape test");
|
||||
assert_eq!(request2, r#"{ test: "\ " }"#);
|
||||
|
||||
let request3 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in 3rd escape test")
|
||||
.expect("There should be a request in 3rd escape test");
|
||||
assert_eq!(request3, r#"{ test: "\}" }"#);
|
||||
|
||||
let request4 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in 4th escape test")
|
||||
.expect("There should be a request in 4th escape test");
|
||||
assert_eq!(request4, r#"[ test: "\]" ]"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn whitespace() {
|
||||
let mut buf = BytesMut::with_capacity(2048);
|
||||
buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } ");
|
||||
|
||||
let mut codec = StreamCodec::stream_incoming();
|
||||
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in first whitespace test")
|
||||
.expect("There should be a request in first whitespace test");
|
||||
|
||||
assert_eq!(request, "{ test: 1 }");
|
||||
|
||||
let request2 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in first 2nd test")
|
||||
.expect("There should be aa request in 2nd whitespace test");
|
||||
// TODO: maybe actually trim it out
|
||||
assert_eq!(request2, "\n\n\n\n{ test: 2 }");
|
||||
|
||||
let request3 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in first 3rd test")
|
||||
.expect("There should be a request in 3rd whitespace test");
|
||||
assert_eq!(request3, "\n\r{\n test: 3 }");
|
||||
|
||||
let request4 = codec.decode(&mut buf).expect("There should be no error in first 4th test");
|
||||
assert!(
|
||||
request4.is_none(),
|
||||
"There should be no 4th request because it contains only whitespaces"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn fragmented_encode() {
|
||||
let mut buf = BytesMut::with_capacity(2048);
|
||||
buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
|
||||
|
||||
let mut codec = StreamCodec::stream_incoming();
|
||||
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in first fragmented test")
|
||||
.expect("There should be at least one request in first fragmented test");
|
||||
assert_eq!(request, "{ test: 1 }");
|
||||
codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in second fragmented test")
|
||||
.expect("There should be at least one request in second fragmented test");
|
||||
assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
|
||||
|
||||
buf.put_slice(b"t: 3 }");
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in third fragmented test")
|
||||
.expect("There should be at least one request in third fragmented test");
|
||||
assert_eq!(request, "{ test: 3 }");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn huge() {
|
||||
let request = r#"
|
||||
{
|
||||
"jsonrpc":"2.0",
|
||||
"method":"say_hello",
|
||||
"params": [
|
||||
42,
|
||||
0,
|
||||
{
|
||||
"from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155",
|
||||
"gas":"0x2dc6c0",
|
||||
"data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a90046001604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f01602080910003423423094734987103498712093847102938740192387401349857109487501938475"
|
||||
}
|
||||
]
|
||||
}"#;
|
||||
|
||||
let mut buf = BytesMut::with_capacity(65536);
|
||||
buf.put_slice(request.as_bytes());
|
||||
|
||||
let mut codec = StreamCodec::stream_incoming();
|
||||
|
||||
let parsed_request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in huge test")
|
||||
.expect("There should be at least one request huge test");
|
||||
assert_eq!(request, parsed_request);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn simple_line_codec() {
|
||||
let mut buf = BytesMut::with_capacity(2048);
|
||||
buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
|
||||
|
||||
let mut codec = StreamCodec::default();
|
||||
|
||||
let request = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in simple test")
|
||||
.expect("There should be at least one request in simple test");
|
||||
let request2 = codec
|
||||
.decode(&mut buf)
|
||||
.expect("There should be no error in simple test")
|
||||
.expect("There should be at least one request in simple test");
|
||||
|
||||
assert_eq!(request, "{ test: 1 }");
|
||||
assert_eq!(request2, "{ test: 2 }");
|
||||
}
|
||||
}
|
||||
@ -15,7 +15,7 @@ reth-primitives = { path = "../../primitives" }
|
||||
reth-rpc-types = { path = "../rpc-types" }
|
||||
|
||||
# misc
|
||||
jsonrpsee = { version = "0.15", features = ["server", "macros"] }
|
||||
jsonrpsee = { version = "0.16", features = ["server", "macros"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
[features]
|
||||
|
||||
@ -17,7 +17,7 @@ reth-rpc-types = { path = "../rpc-types" }
|
||||
reth-transaction-pool = { path = "../../transaction-pool" }
|
||||
|
||||
# rpc
|
||||
jsonrpsee = { version = "0.15" }
|
||||
jsonrpsee = { version = "0.16" }
|
||||
|
||||
# misc
|
||||
async-trait = "0.1"
|
||||
|
||||
Reference in New Issue
Block a user