feat: support max_request_body_size (#7880)

This commit is contained in:
Abner Zheng
2024-04-25 23:51:31 +08:00
committed by GitHub
parent 35ac20b8e4
commit 9567b256c8
2 changed files with 93 additions and 9 deletions

View File

@ -11,7 +11,10 @@ use jsonrpsee::{
JsonRawValue,
},
server::middleware::rpc::RpcServiceT,
types::{error::ErrorCode, ErrorObject, Id, InvalidRequest, Notification, Request},
types::{
error::{reject_too_big_request, ErrorCode},
ErrorObject, Id, InvalidRequest, Notification, Request,
},
BatchResponseBuilder, MethodResponse, ResponsePayload,
};
use tokio::sync::OwnedSemaphorePermit;
@ -124,6 +127,7 @@ pub(crate) async fn call_with_service<S>(
request: String,
rpc_service: S,
max_response_body_size: usize,
max_request_body_size: usize,
conn: Arc<OwnedSemaphorePermit>,
) -> Option<String>
where
@ -143,9 +147,17 @@ where
})
.unwrap_or(Kind::Single);
let data = request.into_bytes();
if data.len() > max_request_body_size {
return Some(batch_response_error(
Id::Null,
reject_too_big_request(max_request_body_size as u32),
));
}
// Single request or notification
let res = if matches!(request_kind, Kind::Single) {
let response = process_single_request(request.into_bytes(), &rpc_service).await;
let response = process_single_request(data, &rpc_service).await;
match response {
Some(response) if response.is_method_call() => Some(response.to_result()),
_ => {
@ -155,11 +167,7 @@ where
}
}
} else {
process_batch_request(
Batch { data: request.into_bytes(), rpc_service },
max_response_body_size,
)
.await
process_batch_request(Batch { data, rpc_service }, max_response_body_size).await
};
drop(conn);

View File

@ -378,6 +378,7 @@ where
};
let max_response_body_size = self.inner.max_response_body_size as usize;
let max_request_body_size = self.inner.max_request_body_size as usize;
let rpc_service = self.rpc_middleware.service(RpcService::new(
self.inner.methods.clone(),
max_response_body_size,
@ -392,7 +393,14 @@ where
// work to a separate task takes the pressure off the connection so all concurrent responses
// are also serialized concurrently and the connection can focus on read+write
let f = tokio::task::spawn(async move {
ipc::call_with_service(request, rpc_service, max_response_body_size, conn).await
ipc::call_with_service(
request,
rpc_service,
max_response_body_size,
max_request_body_size,
conn,
)
.await
});
Box::pin(async move { f.await.map_err(|err| err.into()) })
@ -780,7 +788,11 @@ mod tests {
use crate::client::IpcClientBuilder;
use futures::future::{select, Either};
use jsonrpsee::{
core::client::{ClientT, Subscription, SubscriptionClientT},
core::{
client,
client::{ClientT, Error, Subscription, SubscriptionClientT},
params::BatchRequestBuilder,
},
rpc_params,
types::Request,
PendingSubscriptionSink, RpcModule, SubscriptionMessage,
@ -834,6 +846,46 @@ mod tests {
}
}
#[tokio::test]
async fn can_set_the_max_response_body_size() {
let endpoint = dummy_endpoint();
let server = Builder::default().max_response_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "a".repeat(101)).unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());
let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
let response: Result<String, Error> = client.request("anything", rpc_params![]).await;
assert!(response.unwrap_err().to_string().contains("Exceeded max limit of"));
}
#[tokio::test]
async fn can_set_the_max_request_body_size() {
let endpoint = dummy_endpoint();
let server = Builder::default().max_request_body_size(100).build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());
let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
let response: Result<String, Error> =
client.request("anything", rpc_params!["a".repeat(101)]).await;
assert!(response.is_err());
let mut batch_request_builder = BatchRequestBuilder::new();
let _ = batch_request_builder.insert("anything", rpc_params![]);
let _ = batch_request_builder.insert("anything", rpc_params![]);
let _ = batch_request_builder.insert("anything", rpc_params![]);
// the raw request string is:
// [{"jsonrpc":"2.0","id":0,"method":"anything"},{"jsonrpc":"2.0","id":1, \
// "method":"anything"},{"jsonrpc":"2.0","id":2,"method":"anything"}]"
// which is 136 bytes, more than 100 bytes.
let response: Result<client::BatchResponse<'_, String>, Error> =
client.batch_request(batch_request_builder).await;
assert!(response.is_err());
}
#[tokio::test]
async fn test_rpc_request() {
let endpoint = dummy_endpoint();
@ -849,6 +901,30 @@ mod tests {
assert_eq!(response, msg);
}
#[tokio::test]
async fn test_batch_request() {
let endpoint = dummy_endpoint();
let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "ok").unwrap();
let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped());
let client = IpcClientBuilder::default().build(endpoint).await.unwrap();
let mut batch_request_builder = BatchRequestBuilder::new();
let _ = batch_request_builder.insert("anything", rpc_params![]);
let _ = batch_request_builder.insert("anything", rpc_params![]);
let _ = batch_request_builder.insert("anything", rpc_params![]);
let result = client
.batch_request(batch_request_builder)
.await
.unwrap()
.into_ok()
.unwrap()
.collect::<Vec<String>>();
assert_eq!(result, vec!["ok", "ok", "ok"]);
}
#[tokio::test]
async fn test_ipc_modules() {
reth_tracing::init_test_tracing();