refactor: pull out rpc/ from net/rpc (#1147)

Georgios Konstantopoulos
2023-02-02 14:17:26 -08:00
committed by GitHub
parent 1f64d8e9d7
commit e048718ea2
71 changed files with 24 additions and 24 deletions

34
crates/rpc/ipc/Cargo.toml Normal file
View File

@@ -0,0 +1,34 @@
[package]
name = "reth-ipc"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth"
readme = "README.md"
description = """
IPC support for reth
"""
[dependencies]
# async/net
futures = "0.3"
parity-tokio-ipc = "0.9.0"
tokio = { version = "1", features = ["net", "time", "rt-multi-thread"] }
tokio-util = { version = "0.7", features = ["codec"] }
async-trait = "0.1"
pin-project = "1.0"
tower = "0.4"
# misc
jsonrpsee = { version = "0.16", features = ["server", "client"] }
serde_json = "1.0"
tracing = "0.1.37"
bytes = "1.2.1"
thiserror = "1.0.37"
[dev-dependencies]
tracing-test = "0.2"
[features]
client = ["jsonrpsee/client", "jsonrpsee/async-client"]

3
crates/rpc/ipc/README.md Normal file
View File

@@ -0,0 +1,3 @@
# <h1 align="center"> reth-ipc </h1>
IPC server and client implementation for [`jsonrpsee`](https://github.com/paritytech/jsonrpsee/).
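A minimal end-to-end sketch, mirroring the doc examples in this crate (the socket path `/tmp/reth-example.ipc` is illustrative):

```rust
use jsonrpsee::{core::client::ClientT, rpc_params, RpcModule};
use reth_ipc::{client::IpcClientBuilder, server::Builder};

async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Build the IPC server, register a method and start listening on the endpoint.
    let server = Builder::default().build("/tmp/reth-example.ipc")?;
    let mut module = RpcModule::new(());
    module.register_method("say_hello", |_, _| Ok("hello"))?;
    let handle = server.start(module).await?;
    tokio::spawn(handle.stopped());

    // Connect a client to the same endpoint and issue a request.
    let client = IpcClientBuilder::default().build("/tmp/reth-example.ipc").await?;
    let response: String = client.request("say_hello", rpc_params![]).await?;
    assert_eq!(response, "hello");
    Ok(())
}
```

Run it from within a Tokio runtime (e.g. `#[tokio::main]`).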

151
crates/rpc/ipc/src/client.rs Normal file
View File

@@ -0,0 +1,151 @@
//! [`jsonrpsee`] transport adapter implementation for IPC.
use crate::stream_codec::StreamCodec;
use futures::StreamExt;
use jsonrpsee::{
async_client::{Client, ClientBuilder},
core::client::{ReceivedMessage, TransportReceiverT, TransportSenderT},
};
use std::{
io,
path::{Path, PathBuf},
};
use tokio::{io::AsyncWriteExt, net::UnixStream};
use tokio_util::codec::FramedRead;
/// Builder type for [`Client`]
#[derive(Clone, Default, Debug)]
#[non_exhaustive]
pub struct IpcClientBuilder;
impl IpcClientBuilder {
/// Connects to an IPC socket
pub async fn build(self, path: impl AsRef<Path>) -> Result<Client, IpcError> {
let (tx, rx) = IpcTransportClientBuilder::default().build(path).await?;
Ok(self.build_with_tokio(tx, rx))
}
/// Uses the sender and receiver channels to connect to the socket.
pub fn build_with_tokio<S, R>(self, sender: S, receiver: R) -> Client
where
S: TransportSenderT + Send,
R: TransportReceiverT + Send,
{
ClientBuilder::default().build_with_tokio(sender, receiver)
}
}
/// Sending end of IPC transport.
#[derive(Debug)]
pub struct Sender {
inner: tokio::net::unix::OwnedWriteHalf,
}
#[async_trait::async_trait]
impl TransportSenderT for Sender {
type Error = IpcError;
/// Sends out a request. Returns a Future that finishes when the request has been successfully
/// sent.
async fn send(&mut self, msg: String) -> Result<(), Self::Error> {
Ok(self.inner.write_all(msg.as_bytes()).await?)
}
async fn send_ping(&mut self) -> Result<(), Self::Error> {
tracing::trace!("send ping - not implemented");
Err(IpcError::NotSupported)
}
/// Close the connection.
async fn close(&mut self) -> Result<(), Self::Error> {
Ok(())
}
}
/// Receiving end of IPC transport.
#[derive(Debug)]
pub struct Receiver {
inner: FramedRead<tokio::net::unix::OwnedReadHalf, StreamCodec>,
}
#[async_trait::async_trait]
impl TransportReceiverT for Receiver {
type Error = IpcError;
/// Returns a Future resolving when the server sent us something back.
async fn receive(&mut self) -> Result<ReceivedMessage, Self::Error> {
match self.inner.next().await {
None => Err(IpcError::Closed),
Some(val) => Ok(ReceivedMessage::Text(val?)),
}
}
}
/// Builder for IPC transport [`Sender`] and [`Receiver`] pair.
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct IpcTransportClientBuilder;
impl IpcTransportClientBuilder {
/// Try to establish the connection.
///
/// ```
/// use jsonrpsee::rpc_params;
/// use reth_ipc::client::IpcClientBuilder;
/// use jsonrpsee::core::client::ClientT;
/// # async fn run_client() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let client = IpcClientBuilder::default().build("/tmp/my-uds").await?;
/// let response: String = client.request("say_hello", rpc_params![]).await?;
/// # Ok(())
/// # }
/// ```
pub async fn build(self, path: impl AsRef<Path>) -> Result<(Sender, Receiver), IpcError> {
let path = path.as_ref();
let stream = UnixStream::connect(path)
.await
.map_err(|err| IpcError::FailedToConnect { path: path.to_path_buf(), err })?;
let (rhlf, whlf) = stream.into_split();
Ok((
Sender { inner: whlf },
Receiver { inner: FramedRead::new(rhlf, StreamCodec::stream_incoming()) },
))
}
}
/// Error variants that can happen in IPC transport.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum IpcError {
/// Operation not supported
#[error("Operation not supported")]
NotSupported,
/// Stream was closed
#[error("Stream closed")]
Closed,
/// Thrown when the socket connection could not be established.
#[error("Failed to connect to socket {path}: {err}")]
FailedToConnect {
/// The path of the socket.
path: PathBuf,
err: io::Error,
},
#[error(transparent)]
Io(#[from] io::Error),
}
#[cfg(test)]
mod tests {
use super::*;
use parity_tokio_ipc::{dummy_endpoint, Endpoint};
#[tokio::test]
async fn test_connect() {
let endpoint = dummy_endpoint();
let _incoming = Endpoint::new(endpoint.clone()).incoming().unwrap();
let (tx, rx) = IpcTransportClientBuilder::default().build(endpoint).await.unwrap();
let _ = IpcClientBuilder::default().build_with_tokio(tx, rx);
}
}

14
crates/rpc/ipc/src/lib.rs Normal file
View File

@@ -0,0 +1,14 @@
#![warn(missing_debug_implementations, missing_docs, unreachable_pub, unused_crate_dependencies)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Reth IPC implementation
pub mod client;
pub mod server;
/// JSON codec implementation
pub mod stream_codec;

115
crates/rpc/ipc/src/server/connection.rs Normal file
View File

@@ -0,0 +1,115 @@
//! An IPC connection.
use crate::stream_codec::StreamCodec;
use futures::{ready, Sink, Stream, StreamExt};
use std::{
io,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::Framed;
pub(crate) type JsonRpcStream<T> = Framed<T, StreamCodec>;
/// Wraps a stream of incoming connections.
#[pin_project::pin_project]
pub(crate) struct Incoming<T, Item> {
#[pin]
inner: T,
_marker: PhantomData<Item>,
}
impl<T, Item> Incoming<T, Item>
where
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
Item: AsyncRead + AsyncWrite,
{
/// Create a new instance.
pub(crate) fn new(inner: T) -> Self {
Self { inner, _marker: Default::default() }
}
/// Polls to accept a new incoming connection to the endpoint.
pub(crate) fn poll_accept(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
let res = match ready!(self.poll_next_unpin(cx)) {
None => Err(io::Error::new(io::ErrorKind::ConnectionAborted, "ipc connection closed")),
Some(conn) => conn,
};
Poll::Ready(res)
}
}
impl<T, Item> Stream for Incoming<T, Item>
where
T: Stream<Item = io::Result<Item>> + 'static,
Item: AsyncRead + AsyncWrite,
{
type Item = io::Result<IpcConn<JsonRpcStream<Item>>>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let res = match ready!(this.inner.poll_next(cx)) {
Some(Ok(item)) => {
let framed = IpcConn(tokio_util::codec::Decoder::framed(
StreamCodec::stream_incoming(),
item,
));
Ok(framed)
}
Some(Err(err)) => Err(err),
None => return Poll::Ready(None),
};
Poll::Ready(Some(res))
}
}
#[pin_project::pin_project]
pub(crate) struct IpcConn<T>(#[pin] T);
impl<T> IpcConn<JsonRpcStream<T>>
where
T: AsyncRead + AsyncWrite + Unpin,
{
/// Rejects the connection with a busy message when the server can't accept more requests.
pub(crate) async fn reject_connection(self) {
let mut parts = self.0.into_parts();
let _ = parts.io.write_all(b"Too many connections. Please try again later.").await;
}
}
impl<T> Stream for IpcConn<JsonRpcStream<T>>
where
T: AsyncRead + AsyncWrite,
{
type Item = io::Result<String>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}
impl<T> Sink<String> for IpcConn<JsonRpcStream<T>>
where
T: AsyncRead + AsyncWrite,
{
type Error = io::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// NOTE: we always flush here; this prevents buffering in the underlying
// `Framed` impl that would otherwise cause stalled requests
self.project().0.poll_flush(cx)
}
fn start_send(self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> {
self.project().0.start_send(item)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().0.poll_close(cx)
}
}

208
crates/rpc/ipc/src/server/future.rs Normal file
View File

@@ -0,0 +1,208 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Utilities for handling async code.
use futures::FutureExt;
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError},
time::{self, Duration, Interval},
};
/// Interval at which the server's stop monitor is polled.
const STOP_MONITOR_POLLING_INTERVAL: Duration = Duration::from_millis(1000);
/// This is a flexible collection of futures that need to be driven to completion
/// alongside some other future, such as connection handlers that need to be
/// handled along with a listener for new connections.
///
/// In order to `.await` on these futures and drive them to completion, call
/// `select_with` providing some other future, the result of which you need.
pub(crate) struct FutureDriver<F> {
futures: Vec<F>,
stop_monitor_heartbeat: Interval,
}
impl<F> Default for FutureDriver<F> {
fn default() -> Self {
let mut heartbeat = time::interval(STOP_MONITOR_POLLING_INTERVAL);
heartbeat.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
FutureDriver { futures: Vec::new(), stop_monitor_heartbeat: heartbeat }
}
}
impl<F> FutureDriver<F> {
/// Add a new future to this driver
pub(crate) fn add(&mut self, future: F) {
self.futures.push(future);
}
}
impl<F> FutureDriver<F>
where
F: Future + Unpin,
{
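/// Drives all stored futures whenever this is polled and completes with the output of
/// `selector`.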
pub(crate) async fn select_with<S: Future>(&mut self, selector: S) -> S::Output {
tokio::pin!(selector);
DriverSelect { selector, driver: self }.await
}
fn drive(&mut self, cx: &mut Context<'_>) {
let mut i = 0;
while i < self.futures.len() {
if self.futures[i].poll_unpin(cx).is_ready() {
// Using `swap_remove` since we don't care about ordering
// but we do care about removing being `O(1)`.
//
// We don't increment `i` in this branch, since we now
// have a shorter length, and potentially a new value at
// current index
self.futures.swap_remove(i);
} else {
i += 1;
}
}
}
fn poll_stop_monitor_heartbeat(&mut self, cx: &mut Context<'_>) {
// We don't care about the ticks of the heartbeat; it's here only
// to periodically wake the `Waker` on `cx`.
let _ = self.stop_monitor_heartbeat.poll_tick(cx);
}
}
impl<F> Future for FutureDriver<F>
where
F: Future + Unpin,
{
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
this.drive(cx);
if this.futures.is_empty() {
Poll::Ready(())
} else {
Poll::Pending
}
}
}
/// This is a glorified select `Future` that will attempt to drive all
/// connection futures `F` to completion on each `poll`, while also
/// handling incoming connections.
struct DriverSelect<'a, S, F> {
selector: S,
driver: &'a mut FutureDriver<F>,
}
impl<'a, R, F> Future for DriverSelect<'a, R, F>
where
R: Future + Unpin,
F: Future + Unpin,
{
type Output = R::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = Pin::into_inner(self);
this.driver.drive(cx);
this.driver.poll_stop_monitor_heartbeat(cx);
this.selector.poll_unpin(cx)
}
}
#[derive(Debug, Clone)]
pub(crate) struct StopHandle(watch::Receiver<()>);
impl StopHandle {
pub(crate) fn new(rx: watch::Receiver<()>) -> Self {
Self(rx)
}
pub(crate) fn shutdown_requested(&self) -> bool {
// if a message has been seen, it means that `stop` has been called.
self.0.has_changed().unwrap_or(true)
}
pub(crate) async fn shutdown(&mut self) {
// Err(_) implies that the `sender` has been dropped.
// Ok(_) implies that `stop` has been called.
let _ = self.0.changed().await;
}
}
/// Server handle.
///
/// When all [`StopHandle`]s have been dropped, or `stop` has been called,
/// the server will be stopped.
#[derive(Debug, Clone)]
pub(crate) struct ServerHandle(Arc<watch::Sender<()>>);
impl ServerHandle {
/// Wait for the server to stop.
#[allow(unused)]
pub(crate) async fn stopped(self) {
self.0.closed().await
}
}
/// Limits the number of connections.
pub(crate) struct ConnectionGuard(Arc<Semaphore>);
impl ConnectionGuard {
pub(crate) fn new(limit: usize) -> Self {
Self(Arc::new(Semaphore::new(limit)))
}
pub(crate) fn try_acquire(&self) -> Option<OwnedSemaphorePermit> {
match self.0.clone().try_acquire_owned() {
Ok(guard) => Some(guard),
Err(TryAcquireError::Closed) => {
unreachable!("Semaphore::Close is never called and can't be closed; qed")
}
Err(TryAcquireError::NoPermits) => None,
}
}
#[allow(unused)]
pub(crate) fn available_connections(&self) -> usize {
self.0.available_permits()
}
}

291
crates/rpc/ipc/src/server/ipc.rs Normal file
View File

@@ -0,0 +1,291 @@
//! IPC request handling adapted from the [`jsonrpsee`] HTTP request handling
use futures::{stream::FuturesOrdered, StreamExt};
use jsonrpsee::{
core::{
server::{
helpers::{prepare_error, BatchResponse, BatchResponseBuilder, MethodResponse},
resource_limiting::Resources,
rpc_module::{MethodKind, Methods},
},
tracing::{rx_log_from_json, tx_log_from_str},
JsonRawValue,
},
server::{
logger,
logger::{Logger, TransportProtocol},
},
types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Params, Request},
};
use std::sync::Arc;
use tokio::sync::OwnedSemaphorePermit;
use tokio_util::either::Either;
use tracing::instrument;
type Notif<'a> = Notification<'a, Option<&'a JsonRawValue>>;
#[derive(Debug, Clone)]
pub(crate) struct Batch<'a, L: Logger> {
data: Vec<u8>,
call: CallData<'a, L>,
}
#[derive(Debug, Clone)]
pub(crate) struct CallData<'a, L: Logger> {
conn_id: usize,
logger: &'a L,
methods: &'a Methods,
max_response_body_size: u32,
max_log_length: u32,
resources: &'a Resources,
request_start: L::Instant,
}
// Batch responses must be sent back as a single message, so we collect the result of
// each call in the batch and combine them into a single response before sending it
// back to the client.
#[instrument(name = "batch", skip(b), level = "TRACE")]
pub(crate) async fn process_batch_request<L>(b: Batch<'_, L>) -> BatchResponse
where
L: Logger,
{
let Batch { data, call } = b;
if let Ok(batch) = serde_json::from_slice::<Vec<&JsonRawValue>>(&data) {
let mut got_notif = false;
let mut batch_response =
BatchResponseBuilder::new_with_limit(call.max_response_body_size as usize);
let mut pending_calls: FuturesOrdered<_> = batch
.into_iter()
.filter_map(|v| {
if let Ok(req) = serde_json::from_str::<Request<'_>>(v.get()) {
Some(Either::Right(execute_call(req, call.clone())))
} else if let Ok(_notif) =
serde_json::from_str::<Notification<'_, &JsonRawValue>>(v.get())
{
// notifications should not be answered.
got_notif = true;
None
} else {
// valid JSON, but not a valid request; try to recover the `id` via `InvalidRequest`
let id = match serde_json::from_str::<InvalidRequest<'_>>(v.get()) {
Ok(err) => err.id,
Err(_) => Id::Null,
};
Some(Either::Left(async {
MethodResponse::error(id, ErrorObject::from(ErrorCode::InvalidRequest))
}))
}
})
.collect();
while let Some(response) = pending_calls.next().await {
if let Err(too_large) = batch_response.append(&response) {
return too_large
}
}
if got_notif && batch_response.is_empty() {
BatchResponse { result: String::new(), success: true }
} else {
batch_response.finish()
}
} else {
BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::ParseError))
}
}
pub(crate) async fn process_single_request<L: Logger>(
data: Vec<u8>,
call: CallData<'_, L>,
) -> MethodResponse {
if let Ok(req) = serde_json::from_slice::<Request<'_>>(&data) {
execute_call_with_tracing(req, call).await
} else if let Ok(notif) = serde_json::from_slice::<Notif<'_>>(&data) {
execute_notification(notif, call.max_log_length)
} else {
let (id, code) = prepare_error(&data);
MethodResponse::error(id, ErrorObject::from(code))
}
}
#[instrument(name = "method_call", fields(method = req.method.as_ref()), skip(call, req), level = "TRACE")]
pub(crate) async fn execute_call_with_tracing<'a, L: Logger>(
req: Request<'a>,
call: CallData<'_, L>,
) -> MethodResponse {
execute_call(req, call).await
}
pub(crate) async fn execute_call<L: Logger>(
req: Request<'_>,
call: CallData<'_, L>,
) -> MethodResponse {
let CallData {
resources,
methods,
logger,
max_response_body_size,
max_log_length,
conn_id,
request_start,
} = call;
rx_log_from_json(&req, call.max_log_length);
let params = Params::new(req.params.map(|params| params.get()));
let name = &req.method;
let id = req.id;
let response = match methods.method_with_name(name) {
None => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::Unknown,
TransportProtocol::Http,
);
MethodResponse::error(id, ErrorObject::from(ErrorCode::MethodNotFound))
}
Some((name, method)) => match &method.inner() {
MethodKind::Sync(callback) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::MethodCall,
TransportProtocol::Http,
);
match method.claim(name, resources) {
Ok(guard) => {
let r = (callback)(id, params, max_response_body_size as usize);
drop(guard);
r
}
Err(err) => {
tracing::error!(
"[Methods::execute_with_resources] failed to lock resources: {}",
err
);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
}
}
MethodKind::Async(callback) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::MethodCall,
TransportProtocol::Http,
);
match method.claim(name, resources) {
Ok(guard) => {
let id = id.into_owned();
let params = params.into_owned();
(callback)(
id,
params,
conn_id,
max_response_body_size as usize,
Some(guard),
)
.await
}
Err(err) => {
tracing::error!(
"[Methods::execute_with_resources] failed to lock resources: {}",
err
);
MethodResponse::error(id, ErrorObject::from(ErrorCode::ServerIsBusy))
}
}
}
MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => {
logger.on_call(
name,
params.clone(),
logger::MethodKind::Unknown,
TransportProtocol::Http,
);
tracing::error!("Subscriptions not supported on HTTP");
MethodResponse::error(id, ErrorObject::from(ErrorCode::InternalError))
}
},
};
tx_log_from_str(&response.result, max_log_length);
logger.on_result(name, response.success, request_start, TransportProtocol::Http);
response
}
#[instrument(name = "notification", fields(method = notif.method.as_ref()), skip(notif, max_log_length), level = "TRACE")]
fn execute_notification(notif: Notif<'_>, max_log_length: u32) -> MethodResponse {
rx_log_from_json(&notif, max_log_length);
let response = MethodResponse { result: String::new(), success: true };
tx_log_from_str(&response.result, max_log_length);
response
}
#[allow(unused)]
pub(crate) struct HandleRequest<L: Logger> {
pub(crate) methods: Methods,
pub(crate) resources: Resources,
pub(crate) max_request_body_size: u32,
pub(crate) max_response_body_size: u32,
pub(crate) max_log_length: u32,
pub(crate) batch_requests_supported: bool,
pub(crate) logger: L,
pub(crate) conn: Arc<OwnedSemaphorePermit>,
}
pub(crate) async fn handle_request<L: Logger>(request: String, input: HandleRequest<L>) -> String {
let HandleRequest {
methods,
resources,
max_response_body_size,
max_log_length,
logger,
conn,
..
} = input;
enum Kind {
Single,
Batch,
}
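// Peek at the first JSON structural character to decide how to process the payload:
// '{' starts a single request, '[' starts a batch.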
let request_kind = request
.chars()
.find_map(|c| match c {
'{' => Some(Kind::Single),
'[' => Some(Kind::Batch),
_ => None,
})
.unwrap_or(Kind::Single);
let request_start = logger.on_request(TransportProtocol::Http);
let call = CallData {
conn_id: 0,
logger: &logger,
methods: &methods,
max_response_body_size,
max_log_length,
resources: &resources,
request_start,
};
// Single request or notification
let res = if matches!(request_kind, Kind::Single) {
let response = process_single_request(request.into_bytes(), call).await;
response.result
} else {
let response = process_batch_request(Batch { data: request.into_bytes(), call }).await;
response.result
};
drop(conn);
res
}

559
crates/rpc/ipc/src/server/mod.rs Normal file
View File

@@ -0,0 +1,559 @@
//! JSON-RPC IPC server implementation
use crate::server::{
connection::{Incoming, IpcConn, JsonRpcStream},
future::{ConnectionGuard, FutureDriver, StopHandle},
};
use futures::{FutureExt, SinkExt, Stream, StreamExt};
use jsonrpsee::{
core::{
server::{resource_limiting::Resources, rpc_module::Methods},
Error, TEN_MB_SIZE_BYTES,
},
server::{logger::Logger, IdProvider, RandomIntegerIdProvider, ServerHandle},
};
use std::{
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use tokio::{
io::{AsyncRead, AsyncWrite},
sync::{oneshot, watch, OwnedSemaphorePermit},
};
use tower::{layer::util::Identity, Service};
use tracing::{trace, warn};
// re-exported so it can be used during builder setup
pub use parity_tokio_ipc::Endpoint;
mod connection;
mod future;
mod ipc;
/// IPC server implementation
// This is an adapted `jsonrpsee` Server, but for `Ipc` connections.
pub struct IpcServer<B = Identity, L = ()> {
/// The endpoint we listen on for incoming connections
endpoint: Endpoint,
resources: Resources,
logger: L,
id_provider: Arc<dyn IdProvider>,
cfg: Settings,
service_builder: tower::ServiceBuilder<B>,
}
impl IpcServer {
/// Start responding to connection requests.
///
/// This will run on the tokio runtime until the server is stopped or the ServerHandle is
/// dropped.
///
/// ```
/// use jsonrpsee::RpcModule;
/// use reth_ipc::server::Builder;
/// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let server = Builder::default().build("/tmp/my-uds")?;
/// let mut module = RpcModule::new(());
/// module.register_method("say_hello", |_, _| Ok("lo"))?;
/// let handle = server.start(module).await?;
///
/// // In this example we don't care about doing shutdown, so let it run forever.
/// // You may use the `ServerHandle` to shut it down or manage it yourself.
/// let server = tokio::spawn(handle.stopped());
/// server.await.unwrap();
/// Ok(())
/// }
/// ```
pub async fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let methods = methods.into().initialize_resources(&self.resources)?;
let (stop_tx, stop_rx) = watch::channel(());
let stop_handle = StopHandle::new(stop_rx);
// use a signal channel to wait until we're ready to accept connections
let (tx, rx) = oneshot::channel();
match self.cfg.tokio_runtime.take() {
Some(rt) => rt.spawn(self.start_inner(methods, stop_handle, tx)),
None => tokio::spawn(self.start_inner(methods, stop_handle, tx)),
};
rx.await.expect("channel is open").map_err(Error::Custom)?;
Ok(ServerHandle::new(stop_tx))
}
#[allow(clippy::let_unit_value)]
async fn start_inner(
self,
methods: Methods,
stop_handle: StopHandle,
on_ready: oneshot::Sender<Result<(), String>>,
) -> io::Result<()> {
trace!( endpoint=?self.endpoint.path(), "starting ipc server" );
if cfg!(unix) {
// ensure the file does not exist
if std::fs::remove_file(self.endpoint.path()).is_ok() {
warn!( endpoint=?self.endpoint.path(), "removed existing file");
}
}
let max_request_body_size = self.cfg.max_request_body_size;
let max_response_body_size = self.cfg.max_response_body_size;
let max_log_length = self.cfg.max_log_length;
let resources = self.resources;
let id_provider = self.id_provider;
let max_subscriptions_per_connection = self.cfg.max_subscriptions_per_connection;
let logger = self.logger;
let mut id: u32 = 0;
let connection_guard = ConnectionGuard::new(self.cfg.max_connections as usize);
let mut connections = FutureDriver::default();
let incoming = match self.endpoint.incoming() {
Ok(connections) => Incoming::new(connections),
Err(err) => {
on_ready.send(Err(err.to_string())).ok();
return Err(err)
}
};
// signal that we're ready to accept connections
on_ready.send(Ok(())).ok();
let mut incoming = Monitored::new(incoming, &stop_handle);
trace!("accepting ipc connections");
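// Accept new connections while the `FutureDriver` keeps the already established ones
// (and any pending rejections) progressing. Each accepted connection takes a permit
// from the `ConnectionGuard`; once the limit is reached new connections are rejected.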
loop {
match connections.select_with(&mut incoming).await {
Ok(ipc) => {
trace!("established new connection");
let conn = match connection_guard.try_acquire() {
Some(conn) => conn,
None => {
warn!("Too many connections. Please try again later.");
connections.add(ipc.reject_connection().boxed());
continue
}
};
let tower_service = TowerService {
inner: ServiceData {
methods: methods.clone(),
resources: resources.clone(),
max_request_body_size,
max_response_body_size,
max_log_length,
id_provider: id_provider.clone(),
stop_handle: stop_handle.clone(),
max_subscriptions_per_connection,
conn_id: id,
logger,
conn: Arc::new(conn),
},
};
let service = self.service_builder.service(tower_service);
connections.add(Box::pin(spawn_connection(ipc, service, stop_handle.clone())));
id = id.wrapping_add(1);
}
Err(MonitoredError::Selector(err)) => {
tracing::error!("Error while awaiting a new connection: {:?}", err);
}
Err(MonitoredError::Shutdown) => break,
}
}
connections.await;
Ok(())
}
}
impl std::fmt::Debug for IpcServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IpcServer")
.field("endpoint", &self.endpoint.path())
.field("cfg", &self.cfg)
.field("id_provider", &self.id_provider)
.field("resources", &self.resources)
.finish()
}
}
/// Data required by the server to handle requests.
#[derive(Debug, Clone)]
#[allow(unused)]
pub(crate) struct ServiceData<L: Logger> {
/// Registered server methods.
pub(crate) methods: Methods,
/// Tracker for currently used resources on the server.
pub(crate) resources: Resources,
/// Max request body size.
pub(crate) max_request_body_size: u32,
/// Max response body size.
pub(crate) max_response_body_size: u32,
/// Max length for logging for requests and responses
///
/// Logs bigger than this limit will be truncated.
pub(crate) max_log_length: u32,
/// Subscription ID provider.
pub(crate) id_provider: Arc<dyn IdProvider>,
/// Stop handle.
pub(crate) stop_handle: StopHandle,
/// Max subscriptions per connection.
pub(crate) max_subscriptions_per_connection: u32,
/// Connection ID
pub(crate) conn_id: u32,
/// Logger.
pub(crate) logger: L,
/// Handle to hold a `connection permit`.
pub(crate) conn: Arc<OwnedSemaphorePermit>,
}
/// JsonRPSee service compatible with `tower`.
///
/// # Note
/// This is similar to [`hyper::service::service_fn`](https://docs.rs/hyper/latest/hyper/service/fn.service_fn.html).
#[derive(Debug)]
pub struct TowerService<L: Logger> {
inner: ServiceData<L>,
}
impl<L: Logger> Service<String> for TowerService<L> {
type Response = String;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
/// Opens the door for a backpressure implementation.
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, request: String) -> Self::Future {
trace!("{:?}", request);
// handle the request
let data = ipc::HandleRequest {
methods: self.inner.methods.clone(),
resources: self.inner.resources.clone(),
max_request_body_size: self.inner.max_request_body_size,
max_response_body_size: self.inner.max_response_body_size,
max_log_length: self.inner.max_log_length,
batch_requests_supported: true,
logger: self.inner.logger.clone(),
conn: self.inner.conn.clone(),
};
Box::pin(ipc::handle_request(request, data).map(Ok))
}
}
/// Spawns the connection in a new task
async fn spawn_connection<S, T>(
conn: IpcConn<JsonRpcStream<T>>,
mut service: S,
mut stop_handle: StopHandle,
) where
S: Service<String, Response = String> + Send + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
S::Future: Send,
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
let task = tokio::task::spawn(async move {
tokio::pin!(conn);
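// Read framed JSON-RPC requests from the connection, dispatch each one to the tower
// service and write the response back, until the stream ends or shutdown is signalled.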
loop {
let request = tokio::select! {
res = conn.next() => {
match res {
Some(Ok(request)) => {
request
},
Some(Err(e)) => {
tracing::warn!("Request failed: {:?}", e);
break
}
None => {
return
}
}
}
_ = stop_handle.shutdown() => {
break
}
};
// handle the RPC request
let resp = match service.call(request).await {
Ok(resp) => resp,
Err(err) => err.into().to_string(),
};
// send back
if let Err(err) = conn.send(resp).await {
warn!("Failed to send response: {:?}", err);
break
}
}
});
task.await.ok();
}
/// This is a glorified select listening for new messages, while also checking the `stop_receiver`
/// signal.
struct Monitored<'a, F> {
future: F,
stop_monitor: &'a StopHandle,
}
impl<'a, F> Monitored<'a, F> {
fn new(future: F, stop_monitor: &'a StopHandle) -> Self {
Monitored { future, stop_monitor }
}
}
enum MonitoredError<E> {
Shutdown,
Selector(E),
}
impl<'a, T, Item> Future for Monitored<'a, Incoming<T, Item>>
where
T: Stream<Item = io::Result<Item>> + Unpin + 'static,
Item: AsyncRead + AsyncWrite,
{
type Output = Result<IpcConn<JsonRpcStream<Item>>, MonitoredError<io::Error>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
if this.stop_monitor.shutdown_requested() {
return Poll::Ready(Err(MonitoredError::Shutdown))
}
this.future.poll_accept(cx).map_err(MonitoredError::Selector)
}
}
/// JSON-RPC IPC server settings.
#[derive(Debug, Clone)]
pub struct Settings {
/// Maximum size in bytes of a request.
max_request_body_size: u32,
/// Maximum size in bytes of a response.
max_response_body_size: u32,
/// Max length for logging for requests and responses
///
/// Logs bigger than this limit will be truncated.
max_log_length: u32,
/// Maximum number of incoming connections allowed.
max_connections: u32,
/// Maximum number of subscriptions per connection.
max_subscriptions_per_connection: u32,
/// Custom tokio runtime to run the server on.
tokio_runtime: Option<tokio::runtime::Handle>,
}
impl Default for Settings {
fn default() -> Self {
Self {
max_request_body_size: TEN_MB_SIZE_BYTES,
max_response_body_size: TEN_MB_SIZE_BYTES,
max_log_length: 4096,
max_connections: 100,
max_subscriptions_per_connection: 1024,
tokio_runtime: None,
}
}
}
/// Builder to configure and create a JSON-RPC server
#[derive(Debug)]
pub struct Builder<B = Identity, L = ()> {
settings: Settings,
resources: Resources,
logger: L,
id_provider: Arc<dyn IdProvider>,
service_builder: tower::ServiceBuilder<B>,
}
impl Default for Builder {
fn default() -> Self {
Builder {
settings: Settings::default(),
resources: Resources::default(),
logger: (),
id_provider: Arc::new(RandomIntegerIdProvider),
service_builder: tower::ServiceBuilder::new(),
}
}
}
impl<B, L> Builder<B, L> {
/// Set the maximum size of a request body in bytes. Default is 10 MiB.
pub fn max_request_body_size(mut self, size: u32) -> Self {
self.settings.max_request_body_size = size;
self
}
/// Set the maximum size of a response body in bytes. Default is 10 MiB.
pub fn max_response_body_size(mut self, size: u32) -> Self {
self.settings.max_response_body_size = size;
self
}
/// Set the maximum log length (in bytes) for requests and responses. Default is 4096.
pub fn max_log_length(mut self, size: u32) -> Self {
self.settings.max_log_length = size;
self
}
/// Set the maximum number of connections allowed. Default is 100.
pub fn max_connections(mut self, max: u32) -> Self {
self.settings.max_connections = max;
self
}
/// Set the maximum number of subscriptions per connection. Default is 1024.
pub fn max_subscriptions_per_connection(mut self, max: u32) -> Self {
self.settings.max_subscriptions_per_connection = max;
self
}
/// Register a new resource kind. Errors if `label` is already registered, or if the number of
/// registered resources on this server instance would exceed 8.
///
/// See the module documentation for
/// [`resource_limiting`][jsonrpsee::core::server::resource_limiting] for details.
pub fn register_resource(
mut self,
label: &'static str,
capacity: u16,
default: u16,
) -> Result<Self, Error> {
self.resources.register(label, capacity, default)?;
Ok(self)
}
/// Add a [`Logger`] to the builder.
pub fn set_logger<T: Logger>(self, logger: T) -> Builder<B, T> {
Builder {
settings: self.settings,
resources: self.resources,
logger,
id_provider: self.id_provider,
service_builder: self.service_builder,
}
}
/// Configure a custom [`tokio::runtime::Handle`] to run the server on.
///
/// Default: [`tokio::spawn`]
pub fn custom_tokio_runtime(mut self, rt: tokio::runtime::Handle) -> Self {
self.settings.tokio_runtime = Some(rt);
self
}
/// Configure a custom `subscription ID` provider for the server to use
/// when getting new subscription calls.
///
/// You may choose static dispatch or dynamic dispatch because
/// `IdProvider` is implemented for `Box<T>`.
///
/// Default: [`RandomIntegerIdProvider`].
///
/// # Examples
///
/// ```rust
/// use jsonrpsee::server::RandomStringIdProvider;
/// use reth_ipc::server::Builder;
///
/// // static dispatch
/// let builder1 = Builder::default().set_id_provider(RandomStringIdProvider::new(16));
///
/// // or dynamic dispatch
/// let builder2 = Builder::default().set_id_provider(Box::new(RandomStringIdProvider::new(16)));
/// ```
pub fn set_id_provider<I: IdProvider + 'static>(mut self, id_provider: I) -> Self {
self.id_provider = Arc::new(id_provider);
self
}
/// Configure a custom [`tower::ServiceBuilder`] middleware for composing layers to be applied
/// to the RPC service.
///
/// Default: No tower layers are applied to the RPC service.
///
/// # Examples
///
/// ```rust
///
/// #[tokio::main]
/// async fn main() {
/// let builder = tower::ServiceBuilder::new();
///
/// let server = reth_ipc::server::Builder::default()
/// .set_middleware(builder)
/// .build("/tmp/my-uds")
/// .unwrap();
/// }
/// ```
pub fn set_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> Builder<T, L> {
Builder {
settings: self.settings,
resources: self.resources,
logger: self.logger,
id_provider: self.id_provider,
service_builder,
}
}
/// Finalize the configuration of the server. Consumes the [`Builder`].
pub fn build(self, endpoint: impl AsRef<str>) -> Result<IpcServer<B, L>, Error> {
let endpoint = Endpoint::new(endpoint.as_ref().to_string());
self.build_with_endpoint(endpoint)
}
/// Finalize the configuration of the server. Consumes the [`Builder`].
pub fn build_with_endpoint(self, endpoint: Endpoint) -> Result<IpcServer<B, L>, Error> {
Ok(IpcServer {
endpoint,
cfg: self.settings,
resources: self.resources,
logger: self.logger,
id_provider: self.id_provider,
service_builder: self.service_builder,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::client::IpcClientBuilder;
use jsonrpsee::{core::client::ClientT, rpc_params, RpcModule};
use parity_tokio_ipc::dummy_endpoint;
use tracing_test::traced_test;
#[tokio::test]
#[traced_test]
async fn test_rpc_request() {
let endpoint = dummy_endpoint();
let server = Builder::default().build(&endpoint).unwrap();
let mut module = RpcModule::new(());
let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
module.register_method("eth_chainId", move |_, _| Ok(msg)).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());
let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
let response: String = client.request("eth_chainId", rpc_params![]).await.unwrap();
assert_eq!(response, msg);
}
}

307
crates/rpc/ipc/src/stream_codec.rs Normal file
View File

@@ -0,0 +1,307 @@
// Copyright (c) 2015-2017 Parity Technologies Limited
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
// The basis of this file has been taken from the deprecated jsonrpc codebase:
// https://github.com/paritytech/jsonrpc
use bytes::BytesMut;
use std::{io, str};
/// Separator for enveloping messages in streaming codecs
#[derive(Debug, Clone)]
pub enum Separator {
/// No envelope is expected between messages. Decoder will try to figure out
/// message boundaries by accumulating incoming bytes until valid JSON is formed.
/// Encoder will send messages without any boundaries between requests.
Empty,
/// Byte is used as a sentinel between messages
Byte(u8),
}
impl Default for Separator {
fn default() -> Self {
Separator::Byte(b'\n')
}
}
/// Stream codec for streaming protocols (ipc, tcp)
#[derive(Debug, Default)]
pub struct StreamCodec {
incoming_separator: Separator,
outgoing_separator: Separator,
}
impl StreamCodec {
/// Default codec with streaming input data. Input can be either enveloped or not.
pub fn stream_incoming() -> Self {
StreamCodec::new(Separator::Empty, Default::default())
}
/// New custom stream codec
pub fn new(incoming_separator: Separator, outgoing_separator: Separator) -> Self {
StreamCodec { incoming_separator, outgoing_separator }
}
}
#[inline]
fn is_whitespace(byte: u8) -> bool {
matches!(byte, 0x0D | 0x0A | 0x20 | 0x09)
}
impl tokio_util::codec::Decoder for StreamCodec {
type Item = String;
type Error = io::Error;
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<Self::Item>> {
if let Separator::Byte(separator) = self.incoming_separator {
if let Some(i) = buf.as_ref().iter().position(|&b| b == separator) {
let line = buf.split_to(i);
let _ = buf.split_to(1);
match str::from_utf8(line.as_ref()) {
Ok(s) => Ok(Some(s.to_string())),
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "invalid UTF-8")),
}
} else {
Ok(None)
}
} else {
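// No sentinel byte configured: scan for a balanced top-level JSON value, tracking
// brace/bracket depth, string state and escape state so that braces inside string
// literals don't affect the depth count.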
let mut depth = 0;
let mut in_str = false;
let mut is_escaped = false;
let mut start_idx = 0;
let mut whitespaces = 0;
for idx in 0..buf.as_ref().len() {
let byte = buf.as_ref()[idx];
if (byte == b'{' || byte == b'[') && !in_str {
if depth == 0 {
start_idx = idx;
}
depth += 1;
} else if (byte == b'}' || byte == b']') && !in_str {
depth -= 1;
} else if byte == b'"' && !is_escaped {
in_str = !in_str;
} else if is_whitespace(byte) {
whitespaces += 1;
}
if byte == b'\\' && !is_escaped && in_str {
is_escaped = true;
} else {
is_escaped = false;
}
if depth == 0 && idx != start_idx && idx - start_idx + 1 > whitespaces {
let bts = buf.split_to(idx + 1);
return match String::from_utf8(bts.as_ref().to_vec()) {
Ok(val) => Ok(Some(val)),
Err(_) => Ok(None),
}
}
}
Ok(None)
}
}
}
impl tokio_util::codec::Encoder<String> for StreamCodec {
type Error = io::Error;
fn encode(&mut self, msg: String, buf: &mut BytesMut) -> io::Result<()> {
let mut payload = msg.into_bytes();
if let Separator::Byte(separator) = self.outgoing_separator {
payload.push(separator);
}
buf.extend_from_slice(&payload);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use bytes::{BufMut, BytesMut};
use tokio_util::codec::Decoder;
#[test]
fn simple_encode() {
let mut buf = BytesMut::with_capacity(2048);
buf.put_slice(b"{ test: 1 }{ test: 2 }{ test: 3 }");
let mut codec = StreamCodec::stream_incoming();
let request = codec
.decode(&mut buf)
.expect("There should be no error in simple test")
.expect("There should be at least one request in simple test");
assert_eq!(request, "{ test: 1 }");
}
#[test]
fn escape() {
let mut buf = BytesMut::with_capacity(2048);
buf.put_slice(br#"{ test: "\"\\" }{ test: "\ " }{ test: "\}" }[ test: "\]" ]"#);
let mut codec = StreamCodec::stream_incoming();
let request = codec
.decode(&mut buf)
.expect("There should be no error in first escape test")
.expect("There should be a request in first escape test");
assert_eq!(request, r#"{ test: "\"\\" }"#);
let request2 = codec
.decode(&mut buf)
.expect("There should be no error in 2nd escape test")
.expect("There should be a request in 2nd escape test");
assert_eq!(request2, r#"{ test: "\ " }"#);
let request3 = codec
.decode(&mut buf)
.expect("There should be no error in 3rd escape test")
.expect("There should be a request in 3rd escape test");
assert_eq!(request3, r#"{ test: "\}" }"#);
let request4 = codec
.decode(&mut buf)
.expect("There should be no error in 4th escape test")
.expect("There should be a request in 4th escape test");
assert_eq!(request4, r#"[ test: "\]" ]"#);
}
#[test]
fn whitespace() {
let mut buf = BytesMut::with_capacity(2048);
buf.put_slice(b"{ test: 1 }\n\n\n\n{ test: 2 }\n\r{\n test: 3 } ");
let mut codec = StreamCodec::stream_incoming();
let request = codec
.decode(&mut buf)
.expect("There should be no error in first whitespace test")
.expect("There should be a request in first whitespace test");
assert_eq!(request, "{ test: 1 }");
let request2 = codec
.decode(&mut buf)
.expect("There should be no error in 2nd whitespace test")
.expect("There should be a request in 2nd whitespace test");
// TODO: maybe actually trim it out
assert_eq!(request2, "\n\n\n\n{ test: 2 }");
let request3 = codec
.decode(&mut buf)
.expect("There should be no error in 3rd whitespace test")
.expect("There should be a request in 3rd whitespace test");
assert_eq!(request3, "\n\r{\n test: 3 }");
let request4 = codec.decode(&mut buf).expect("There should be no error in 4th whitespace test");
assert!(
request4.is_none(),
"There should be no 4th request because it contains only whitespaces"
);
}
#[test]
fn fragmented_encode() {
let mut buf = BytesMut::with_capacity(2048);
buf.put_slice(b"{ test: 1 }{ test: 2 }{ tes");
let mut codec = StreamCodec::stream_incoming();
let request = codec
.decode(&mut buf)
.expect("There should be no error in first fragmented test")
.expect("There should be at least one request in first fragmented test");
assert_eq!(request, "{ test: 1 }");
codec
.decode(&mut buf)
.expect("There should be no error in second fragmented test")
.expect("There should be at least one request in second fragmented test");
assert_eq!(String::from_utf8(buf.as_ref().to_vec()).unwrap(), "{ tes");
buf.put_slice(b"t: 3 }");
let request = codec
.decode(&mut buf)
.expect("There should be no error in third fragmented test")
.expect("There should be at least one request in third fragmented test");
assert_eq!(request, "{ test: 3 }");
}
#[test]
fn huge() {
let request = r#"
{
"jsonrpc":"2.0",
"method":"say_hello",
"params": [
42,
0,
{
"from":"0xb60e8dd61c5d32be8058bb8eb970870f07233155",
"gas":"0x2dc6c0",
"data":"0x606060405260003411156010576002565b6001805433600160a060020a0319918216811790925560028054909116909117905561291f806100406000396000f3606060405236156100e55760e060020a600035046304029f2381146100ed5780630a1273621461015f57806317c1dd87146102335780631f9ea25d14610271578063266fa0e91461029357806349593f5314610429578063569aa0d8146104fc57806359a4669f14610673578063647a4d5f14610759578063656104f5146108095780636e9febfe1461082b57806370de8c6e1461090d57806371bde852146109ed5780638f30435d14610ab4578063916dbc1714610da35780639f5a7cd414610eef578063c91540f614610fe6578063eae99e1c146110b5578063fedc2a281461115a575b61122d610002565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435600154600090600160a060020a03908116339091161461233357610002565b61122f6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602001838380828437509496505093359350506044359150506064355b60006000600060005086604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905042816005016000508560ff1660028110156100025760040201835060010154604060020a90046001604060020a0316116115df576115d6565b6112416004355b604080516001604060020a038316408152606060020a33600160a060020a031602602082015290519081900360340190205b919050565b61122d600435600254600160a060020a0390811633909116146128e357610002565b61125e6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050505060006000600060006000600060005087604051808280519060200190808383829060006004602084601f0104600f02600301f1509050019150509081526020016040518091039020600050905080600001600050600087600160a060020a0316815260200190815260200160002060005060000160059054906101000a90046001604060020a03169450845080600001600050600087600160a060020a03168152602001908152602001600020600050600001600d9054906101000a90046001604060020a03169350835080600001600050600087600160a060020a0316815260200190815260200160002060005060000160009054906101000a900460ff169250825080600001600050600087600160a060020a0316815260200190815260200160002060005060000160019054906101000a900463ffffffff16915081505092959194509250565b61122d6004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050606435608435600060006000600060005088604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509250346000141515611c0e5760405133600160a060020a0316908290349082818181858883f193505050501515611c1a57610002565b6112996004808035906020019082018035906020019191908080601f01602080910402602001604051908101604052809392919081815260200183838082843750949650509335935050604435915050600060006000600060006000600060006000508a604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050806001016000508960ff16600281101561000257600160a060020a038a168452828101600101602052604084205463ffffffff1698506002811015610002576040842054606060020a90046001604060020a031697506002811015610002576040842054640100000000900463ffffffff169650600281101561000257604084206001015495506002811015610002576040842054604060020a900463ffffffff169450600281101561000257505060409091205495999498509296509094509260a060020a90046001604060020a0316919050565b61122d6004808035906020019082018035906020019191908080601f016020809104026020016040519081016040528093929190818152602
001838380828437509496505050505050506000600060005082604051808280519060200190808383829060006004602084601f0104600f02600301f15090500191505090815260200160405180910390206000509050348160050160005082600d0160009054906101000a900460ff1660ff16600281101561000257600402830160070180546001608060020a0381169093016001608060020a03199390931692909217909155505b5050565b6112e26004808035906020019082018035906020019191908080601f01602080910003423423094734987103498712093847102938740192387401349857109487501938475"
}
]
}"#;
let mut buf = BytesMut::with_capacity(65536);
buf.put_slice(request.as_bytes());
let mut codec = StreamCodec::stream_incoming();
let parsed_request = codec
.decode(&mut buf)
.expect("There should be no error in huge test")
.expect("There should be at least one request in huge test");
assert_eq!(request, parsed_request);
}
#[test]
fn simple_line_codec() {
let mut buf = BytesMut::with_capacity(2048);
buf.put_slice(b"{ test: 1 }\n{ test: 2 }\n{ test: 3 }");
let mut codec = StreamCodec::default();
let request = codec
.decode(&mut buf)
.expect("There should be no error in simple test")
.expect("There should be at least one request in simple test");
let request2 = codec
.decode(&mut buf)
.expect("There should be no error in simple test")
.expect("There should be at least one request in simple test");
assert_eq!(request, "{ test: 1 }");
assert_eq!(request2, "{ test: 2 }");
}
}