feat(rpc): add compression to JSON-RPC (#12352)

This commit is contained in:
Ayodeji Akinola
2024-11-13 00:12:45 +01:00
committed by GitHub
parent 8221e5bd54
commit 3c56686000
6 changed files with 195 additions and 12 deletions

10
Cargo.lock generated
View File

@ -8944,12 +8944,14 @@ version = "1.1.1"
dependencies = [
"alloy-rpc-types-engine",
"http",
"http-body-util",
"jsonrpsee",
"jsonrpsee-http-client",
"pin-project",
"reqwest",
"tokio",
"tower 0.4.13",
"tower-http",
"tracing",
]
@ -10904,12 +10906,12 @@ dependencies = [
[[package]]
name = "tower-http"
version = "0.5.2"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
checksum = "8437150ab6bbc8c5f0f519e3d5ed4aa883a83dd4cdd3d1b21f9482936046cb97"
dependencies = [
"async-compression",
"base64 0.21.7",
"base64 0.22.1",
"bitflags 2.6.0",
"bytes",
"futures-core",
@ -10926,7 +10928,7 @@ dependencies = [
"pin-project-lite",
"tokio",
"tokio-util",
"tower 0.4.13",
"tower 0.5.1",
"tower-layer",
"tower-service",
"tracing",

View File

@ -552,7 +552,8 @@ hyper-util = "0.1.5"
pin-project = "1.0.12"
reqwest = { version = "0.12", default-features = false }
tower = "0.4"
tower-http = "0.5"
tower-http = "0.6"
# p2p
discv5 = "0.8.0"
@ -567,6 +568,7 @@ jsonrpsee-types = "0.24"
# http
http = "1.0"
http-body = "1.0"
http-body-util = "0.1.2"
jsonwebtoken = "9"
proptest-arbitrary-interop = "0.1.0"

View File

@ -166,6 +166,7 @@ use std::{
time::{Duration, SystemTime, UNIX_EPOCH},
};
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
use error::{ConflictingModules, RpcError, ServerKind};
use eth::DynEthApiBuilder;
use http::{header::AUTHORIZATION, HeaderMap};
@ -197,15 +198,13 @@ use reth_rpc_eth_api::{
EthApiServer, EthApiTypes, FullEthApiServer, RpcBlock, RpcReceipt, RpcTransaction,
};
use reth_rpc_eth_types::{EthConfig, EthStateCache, EthSubscriptionIdProvider};
use reth_rpc_layer::{AuthLayer, Claims, JwtAuthValidator, JwtSecret};
use reth_rpc_layer::{AuthLayer, Claims, CompressionLayer, JwtAuthValidator, JwtSecret};
use reth_tasks::{pool::BlockingTaskGuard, TaskSpawner, TokioTaskExecutor};
use reth_transaction_pool::{noop::NoopTransactionPool, TransactionPool};
use serde::{Deserialize, Serialize};
use tower::Layer;
use tower_http::cors::CorsLayer;
use crate::{auth::AuthRpcModule, error::WsHttpSamePortError, metrics::RpcRequestMetrics};
pub use cors::CorsDomainError;
// re-export for convenience
@ -1647,6 +1646,12 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
jwt_secret.map(|secret| AuthLayer::new(JwtAuthValidator::new(secret)))
}
/// Returns a [`CompressionLayer`] that adds compression support (gzip, deflate, brotli, zstd)
/// based on the client's `Accept-Encoding` header
fn maybe_compression_layer() -> Option<CompressionLayer> {
Some(CompressionLayer::new())
}
/// Builds and starts the configured server(s): http, ws, ipc.
///
/// If both http and ws are on the same port, they are combined into one server.
@ -1711,7 +1716,8 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(cors)?)
.option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
.option_layer(Self::maybe_jwt_layer(self.jwt_secret))
.option_layer(Self::maybe_compression_layer()),
)
.set_rpc_middleware(
self.rpc_middleware.clone().layer(
@ -1783,8 +1789,9 @@ impl<RpcMiddleware> RpcServerConfig<RpcMiddleware> {
.http_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.option_layer(Self::maybe_cors_layer(self.http_cors_domains.clone())?)
.option_layer(Self::maybe_jwt_layer(self.jwt_secret)),
.option_layer(Self::maybe_cors_layer(self.ws_cors_domains.clone())?)
.option_layer(Self::maybe_jwt_layer(self.jwt_secret))
.option_layer(Self::maybe_compression_layer()),
)
.set_rpc_middleware(
self.rpc_middleware.clone().layer(

View File

@ -17,10 +17,11 @@ http.workspace = true
jsonrpsee-http-client.workspace = true
pin-project.workspace = true
tower.workspace = true
tower-http = { workspace = true, features = ["full"] }
tracing.workspace = true
[dev-dependencies]
reqwest.workspace = true
tokio = { workspace = true, features = ["macros"] }
jsonrpsee = { workspace = true, features = ["server"] }
http-body-util.workspace=true

View File

@ -0,0 +1,169 @@
use jsonrpsee_http_client::{HttpBody, HttpRequest, HttpResponse};
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tower::{Layer, Service};
use tower_http::compression::{Compression, CompressionLayer as TowerCompressionLayer};
/// This layer is a wrapper around [`tower_http::compression::CompressionLayer`] that integrates
/// with jsonrpsee's HTTP types. It automatically compresses responses based on the client's
/// Accept-Encoding header.
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct CompressionLayer {
inner_layer: TowerCompressionLayer,
}
impl CompressionLayer {
/// Creates a new compression layer with zstd, gzip, brotli and deflate enabled.
pub fn new() -> Self {
Self {
inner_layer: TowerCompressionLayer::new().gzip(true).br(true).deflate(true).zstd(true),
}
}
}
impl Default for CompressionLayer {
/// Creates a new compression layer with default settings.
/// See [`CompressionLayer::new`] for details.
fn default() -> Self {
Self::new()
}
}
impl<S> Layer<S> for CompressionLayer {
type Service = CompressionService<S>;
fn layer(&self, inner: S) -> Self::Service {
CompressionService { compression: self.inner_layer.layer(inner) }
}
}
/// Service that performs response compression.
///
/// Created by [`CompressionLayer`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct CompressionService<S> {
compression: Compression<S>,
}
impl<S> Service<HttpRequest> for CompressionService<S>
where
S: Service<HttpRequest, Response = HttpResponse>,
S::Future: Send + 'static,
{
type Response = HttpResponse;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.compression.poll_ready(cx)
}
fn call(&mut self, req: HttpRequest) -> Self::Future {
let fut = self.compression.call(req);
Box::pin(async move {
let resp = fut.await?;
let (parts, compressed_body) = resp.into_parts();
let http_body = HttpBody::new(compressed_body);
Ok(Self::Response::from_parts(parts, http_body))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use http::header::{ACCEPT_ENCODING, CONTENT_ENCODING};
use http_body_util::BodyExt;
use jsonrpsee_http_client::{HttpRequest, HttpResponse};
use std::{convert::Infallible, future::ready};
const TEST_DATA: &str = "compress test data ";
const REPEAT_COUNT: usize = 1000;
#[derive(Clone)]
struct MockRequestService;
impl Service<HttpRequest> for MockRequestService {
type Response = HttpResponse;
type Error = Infallible;
type Future = std::future::Ready<Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
std::task::Poll::Ready(Ok(()))
}
fn call(&mut self, _: HttpRequest) -> Self::Future {
let body = HttpBody::from(TEST_DATA.repeat(REPEAT_COUNT));
let response = HttpResponse::builder().body(body).unwrap();
ready(Ok(response))
}
}
fn setup_compression_service(
) -> impl Service<HttpRequest, Response = HttpResponse, Error = Infallible> {
CompressionLayer::new().layer(MockRequestService)
}
async fn get_response_size(response: HttpResponse) -> usize {
// Get the total size of the response body
response.into_body().collect().await.unwrap().to_bytes().len()
}
#[tokio::test]
async fn test_gzip_compression() {
let mut service = setup_compression_service();
let request =
HttpRequest::builder().header(ACCEPT_ENCODING, "gzip").body(HttpBody::empty()).unwrap();
let uncompressed_len = TEST_DATA.repeat(REPEAT_COUNT).len();
// Make the request
let response = service.call(request).await.unwrap();
// Verify the response has gzip content-encoding
assert_eq!(
response.headers().get(CONTENT_ENCODING).unwrap(),
"gzip",
"Response should be gzip encoded"
);
// Verify the response body is actually compressed (should be smaller than original)
let compressed_size = get_response_size(response).await;
assert!(
compressed_size < uncompressed_len,
"Compressed size ({compressed_size}) should be smaller than original size ({uncompressed_len})"
);
}
#[tokio::test]
async fn test_no_compression_when_not_requested() {
// Create a service with compression
let mut service = setup_compression_service();
let request = HttpRequest::builder().body(HttpBody::empty()).unwrap();
let response = service.call(request).await.unwrap();
assert!(
response.headers().get(CONTENT_ENCODING).is_none(),
"Response should not be compressed when not requested"
);
let uncompressed_len = TEST_DATA.repeat(REPEAT_COUNT).len();
// Verify the response body matches the original size
let response_size = get_response_size(response).await;
assert!(
response_size == uncompressed_len,
"Response size ({response_size}) should equal original size ({uncompressed_len})"
);
}
}

View File

@ -13,9 +13,11 @@ use jsonrpsee_http_client::HttpResponse;
mod auth_client_layer;
mod auth_layer;
mod compression_layer;
mod jwt_validator;
pub use auth_layer::{AuthService, ResponseFuture};
pub use compression_layer::CompressionLayer;
// Export alloy JWT types
pub use alloy_rpc_types_engine::{Claims, JwtError, JwtSecret};