feat(rpc): remove ipc future and now using ServerHandle and StopHandle from jsonrpsee (#9044)

This commit is contained in:
Tien Nguyen
2024-06-24 17:00:02 +07:00
committed by GitHub
parent 8a5308672f
commit 31e247086c
4 changed files with 11 additions and 105 deletions

View File

@ -1,60 +0,0 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.
//! Utilities for handling async code.
use std::sync::Arc;
use tokio::sync::watch;
#[derive(Debug, Clone)]
pub(crate) struct StopHandle(watch::Receiver<()>);
impl StopHandle {
pub(crate) const fn new(rx: watch::Receiver<()>) -> Self {
Self(rx)
}
pub(crate) async fn shutdown(mut self) {
// Err(_) implies that the `sender` has been dropped.
// Ok(_) implies that `stop` has been called.
let _ = self.0.changed().await;
}
}
/// Server handle.
///
/// When all [`StopHandle`]'s have been `dropped` or `stop` has been called
/// the server will be stopped.
#[derive(Debug, Clone)]
pub(crate) struct ServerHandle(Arc<watch::Sender<()>>);
impl ServerHandle {
/// Wait for the server to stop.
#[allow(dead_code)]
pub(crate) async fn stopped(self) {
self.0.closed().await
}
}

View File

@ -1,9 +1,6 @@
//! JSON-RPC IPC server implementation
use crate::server::{
connection::{IpcConn, JsonRpcStream},
future::StopHandle,
};
use crate::server::connection::{IpcConn, JsonRpcStream};
use futures::StreamExt;
use futures_util::future::Either;
use interprocess::local_socket::{
@ -15,8 +12,8 @@ use jsonrpsee::{
core::TEN_MB_SIZE_BYTES,
server::{
middleware::rpc::{RpcLoggerLayer, RpcServiceT},
AlreadyStoppedError, ConnectionGuard, ConnectionPermit, IdProvider,
RandomIntegerIdProvider,
stop_channel, ConnectionGuard, ConnectionPermit, IdProvider, RandomIntegerIdProvider,
ServerHandle, StopHandle,
},
BoundedSubscriptions, MethodSink, Methods,
};
@ -29,7 +26,7 @@ use std::{
};
use tokio::{
io::{AsyncRead, AsyncWrite, AsyncWriteExt},
sync::{oneshot, watch},
sync::oneshot,
};
use tower::{layer::util::Identity, Layer, Service};
use tracing::{debug, instrument, trace, warn, Instrument};
@ -46,7 +43,6 @@ use tokio_stream::wrappers::ReceiverStream;
use tower::layer::{util::Stack, LayerFn};
mod connection;
mod future;
mod ipc;
mod rpc_service;
@ -109,9 +105,8 @@ where
methods: impl Into<Methods>,
) -> Result<ServerHandle, IpcServerStartError> {
let methods = methods.into();
let (stop_tx, stop_rx) = watch::channel(());
let stop_handle = StopHandle::new(stop_rx);
let (stop_handle, server_handle) = stop_channel();
// use a signal channel to wait until we're ready to accept connections
let (tx, rx) = oneshot::channel();
@ -122,7 +117,7 @@ where
};
rx.await.expect("channel is open")?;
Ok(ServerHandle::new(stop_tx))
Ok(server_handle)
}
async fn start_inner(
@ -795,35 +790,6 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
}
}
/// Server handle.
///
/// When all [`jsonrpsee::server::StopHandle`]'s have been `dropped` or `stop` has been called
/// the server will be stopped.
#[derive(Debug, Clone)]
pub struct ServerHandle(Arc<watch::Sender<()>>);
impl ServerHandle {
/// Create a new server handle.
pub(crate) fn new(tx: watch::Sender<()>) -> Self {
Self(Arc::new(tx))
}
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(&self) -> Result<(), AlreadyStoppedError> {
self.0.send(()).map_err(|_| AlreadyStoppedError)
}
/// Wait for the server to stop.
pub async fn stopped(self) {
self.0.closed().await
}
/// Check if the server has been stopped.
pub fn is_stopped(&self) -> bool {
self.0.is_closed()
}
}
#[cfg(test)]
pub fn dummy_name() -> String {
let num: u64 = rand::Rng::gen(&mut rand::thread_rng());
@ -877,7 +843,7 @@ mod tests {
// and you might want to do something smarter if it's
// critical that "the most recent item" must be sent when it is produced.
if sink.send(notif).await.is_err() {
break Ok(())
break Ok(());
}
closed = c;

View File

@ -68,7 +68,7 @@ impl AuthServerConfig {
.map_err(|err| RpcError::server_error(err, ServerKind::Auth(socket_addr)))?;
let handle = server.start(module.inner.clone());
let mut ipc_handle: Option<reth_ipc::server::ServerHandle> = None;
let mut ipc_handle: Option<jsonrpsee::server::ServerHandle> = None;
if let Some(ipc_server_config) = ipc_server_config {
let ipc_endpoint_str = ipc_endpoint
@ -241,7 +241,7 @@ pub struct AuthServerHandle {
handle: jsonrpsee::server::ServerHandle,
secret: JwtSecret,
ipc_endpoint: Option<String>,
ipc_handle: Option<reth_ipc::server::ServerHandle>,
ipc_handle: Option<jsonrpsee::server::ServerHandle>,
}
// === impl AuthServerHandle ===
@ -310,7 +310,7 @@ impl AuthServerHandle {
}
/// Returns an ipc handle
pub fn ipc_handle(&self) -> Option<reth_ipc::server::ServerHandle> {
pub fn ipc_handle(&self) -> Option<jsonrpsee::server::ServerHandle> {
self.ipc_handle.clone()
}

View File

@ -1821,7 +1821,7 @@ pub struct RpcServerHandle {
http: Option<ServerHandle>,
ws: Option<ServerHandle>,
ipc_endpoint: Option<String>,
ipc: Option<reth_ipc::server::ServerHandle>,
ipc: Option<jsonrpsee::server::ServerHandle>,
jwt_secret: Option<JwtSecret>,
}