chore(deps): bump http, hyper etc. to 1.0; jsonrpsee 0.23 (#7018)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
DaniPopes
2024-06-11 11:37:49 +02:00
committed by GitHub
parent e9d8cdab49
commit 55317eb004
26 changed files with 554 additions and 608 deletions

797
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -119,7 +119,7 @@ members = [
"examples/rpc-db/", "examples/rpc-db/",
"examples/txpool-tracing/", "examples/txpool-tracing/",
"testing/ef-tests/", "testing/ef-tests/",
"testing/testing-utils" "testing/testing-utils",
] ]
default-members = ["bin/reth"] default-members = ["bin/reth"]
@ -330,9 +330,15 @@ reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-types = { path = "crates/trie/types" } reth-trie-types = { path = "crates/trie/types" }
# revm # revm
revm = { version = "9.0.0", features = [ "std", "secp256k1", "blst", ], default-features = false } revm = { version = "9.0.0", features = [
revm-primitives = { version = "4.0.0", features = [ "std", ], default-features = false } "std",
revm-inspectors = { git = "https://github.com/paradigmxyz/evm-inspectors", rev = "53aa2b2" } "secp256k1",
"blst",
], default-features = false }
revm-primitives = { version = "4.0.0", features = [
"std",
], default-features = false }
revm-inspectors = { git = "https://github.com/paradigmxyz/revm-inspectors", rev = "5e3058a" }
# eth # eth
alloy-chains = "0.1.15" alloy-chains = "0.1.15"
@ -341,21 +347,21 @@ alloy-dyn-abi = "0.7.2"
alloy-sol-types = "0.7.2" alloy-sol-types = "0.7.2"
alloy-rlp = "0.3.4" alloy-rlp = "0.3.4"
alloy-trie = "0.4" alloy-trie = "0.4"
alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-rpc-types = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-rpc-types-anvil = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-rpc-types-trace = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-rpc-types-beacon = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-genesis = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-node-bindings = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93", default-features = false, features = [ alloy-provider = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d", default-features = false, features = [
"reqwest", "reqwest",
] } ] }
alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "cc68b93" } alloy-eips = { git = "https://github.com/alloy-rs/alloy", default-features = false, rev = "14ed25d" }
alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-signer = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-signer-wallet = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-network = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "cc68b93" } alloy-consensus = { git = "https://github.com/alloy-rs/alloy", rev = "14ed25d" }
# misc # misc
auto_impl = "1" auto_impl = "1"
@ -415,21 +421,25 @@ async-trait = "0.1.68"
futures = "0.3.26" futures = "0.3.26"
pin-project = "1.0.12" pin-project = "1.0.12"
futures-util = "0.3.25" futures-util = "0.3.25"
hyper = "0.14.25" hyper = "1.3"
hyper-util = "0.1.5"
reqwest = { version = "0.12", default-features = false } reqwest = { version = "0.12", default-features = false }
tower = "0.4" tower = "0.4"
tower-http = "0.4" tower-http = "0.5"
http = "0.2.8"
http-body = "0.4.5"
# p2p # p2p
discv5 = "0.6.0" discv5 = "0.6.0"
igd-next = "0.14.3" igd-next = "0.14.3"
# rpc # rpc
jsonrpsee = "0.22" jsonrpsee = "0.23"
jsonrpsee-core = "0.22" jsonrpsee-core = "0.23"
jsonrpsee-types = "0.22" jsonrpsee-types = "0.23"
jsonrpsee-http-client = "0.23"
# http
http = "1.0"
http-body = "1.0"
# crypto # crypto
secp256k1 = { version = "0.28", default-features = false, features = [ secp256k1 = { version = "0.28", default-features = false, features = [

View File

@ -3,6 +3,7 @@ use alloy_eips::BlockNumberOrTag;
use alloy_provider::{Provider, ProviderBuilder}; use alloy_provider::{Provider, ProviderBuilder};
use futures::StreamExt; use futures::StreamExt;
use reth_node_core::rpc::types::RichBlock; use reth_node_core::rpc::types::RichBlock;
use reth_rpc_types::BlockTransactionsKind;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
/// Block provider that fetches new blocks from an RPC endpoint using a websocket connection. /// Block provider that fetches new blocks from an RPC endpoint using a websocket connection.
@ -32,7 +33,7 @@ impl BlockProvider for RpcBlockProvider {
while let Some(block) = stream.next().await { while let Some(block) = stream.next().await {
let full_block = ws_provider let full_block = ws_provider
.get_block_by_hash(block.header.hash.unwrap(), true) .get_block_by_hash(block.header.hash.unwrap(), BlockTransactionsKind::Full)
.await .await
.expect("failed to get block") .expect("failed to get block")
.expect("block not found"); .expect("block not found");

View File

@ -69,7 +69,9 @@ serde.workspace = true
serde_json.workspace = true serde_json.workspace = true
# http/rpc # http/rpc
hyper.workspace = true http.workspace = true
jsonrpsee.workspace = true
tower.workspace = true
# tracing # tracing
tracing.workspace = true tracing.workspace = true

View File

@ -2,10 +2,7 @@
use crate::metrics::version_metrics::register_version_metrics; use crate::metrics::version_metrics::register_version_metrics;
use eyre::WrapErr; use eyre::WrapErr;
use hyper::{ use http::Response;
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
};
use metrics::describe_gauge; use metrics::describe_gauge;
use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};
use metrics_util::layers::{PrefixLayer, Stack}; use metrics_util::layers::{PrefixLayer, Stack};
@ -64,29 +61,36 @@ async fn start_endpoint<F: Hook + 'static>(
hook: Arc<F>, hook: Arc<F>,
task_executor: TaskExecutor, task_executor: TaskExecutor,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let make_svc = make_service_fn(move |_| { let listener =
let handle = handle.clone(); tokio::net::TcpListener::bind(listen_addr).await.wrap_err("Could not bind to address")?;
let hook = Arc::clone(&hook);
async move { task_executor.spawn_with_graceful_shutdown_signal(|signal| async move {
Ok::<_, Infallible>(service_fn(move |_: Request<Body>| { let mut shutdown = signal.ignore_guard();
loop {
let io = tokio::select! {
res = listener.accept() => match res {
Ok((stream, _remote_addr)) => stream,
Err(err) => {
tracing::error!(%err, "failed to accept connection");
continue;
}
},
_ = &mut shutdown => break,
};
let handle = handle.clone();
let hook = hook.clone();
let service = tower::service_fn(move |_| {
(hook)(); (hook)();
let metrics = handle.render(); let metrics = handle.render();
async move { Ok::<_, Infallible>(Response::new(Body::from(metrics))) } async move { Ok::<_, Infallible>(Response::new(metrics)) }
})) });
}
});
let server = if let Err(error) =
Server::try_bind(&listen_addr).wrap_err("Could not bind to address")?.serve(make_svc); jsonrpsee::server::serve_with_graceful_shutdown(io, service, &mut shutdown).await
{
task_executor.spawn_with_graceful_shutdown_signal(move |signal| async move { tracing::error!(%error, "metrics endpoint crashed")
if let Err(error) = server }
.with_graceful_shutdown(async move {
let _ = signal.await;
})
.await
{
tracing::error!(%error, "metrics endpoint crashed")
} }
}); });

View File

@ -33,10 +33,7 @@ revm-primitives.workspace = true
# async # async
async-trait.workspace = true async-trait.workspace = true
hyper.workspace = true reqwest = { workspace = true, features = ["rustls-tls-native-roots"] }
reqwest = { workspace = true, default-features = false, features = [
"rustls-tls-native-roots",
] }
tracing.workspace = true tracing.workspace = true
# misc # misc

View File

@ -12,9 +12,6 @@ use std::sync::{atomic::AtomicUsize, Arc};
/// Error type when interacting with the Sequencer /// Error type when interacting with the Sequencer
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub enum SequencerRpcError { pub enum SequencerRpcError {
/// Wrapper around a [`hyper::Error`].
#[error(transparent)]
HyperError(#[from] hyper::Error),
/// Wrapper around an [`reqwest::Error`]. /// Wrapper around an [`reqwest::Error`].
#[error(transparent)] #[error(transparent)]
HttpError(#[from] reqwest::Error), HttpError(#[from] reqwest::Error),

View File

@ -91,7 +91,7 @@ where
/// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { /// async fn run_server() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
/// let server = Builder::default().build("/tmp/my-uds"); /// let server = Builder::default().build("/tmp/my-uds");
/// let mut module = RpcModule::new(()); /// let mut module = RpcModule::new(());
/// module.register_method("say_hello", |_, _| "lo")?; /// module.register_method("say_hello", |_, _, _| "lo")?;
/// let handle = server.start(module).await?; /// let handle = server.start(module).await?;
/// ///
/// // In this example we don't care about doing shutdown so let's it run forever. /// // In this example we don't care about doing shutdown so let's it run forever.
@ -390,7 +390,7 @@ where
let rpc_service = self.rpc_middleware.service(RpcService::new( let rpc_service = self.rpc_middleware.service(RpcService::new(
self.inner.methods.clone(), self.inner.methods.clone(),
max_response_body_size, max_response_body_size,
self.inner.conn_id as usize, self.inner.conn_id.into(),
cfg, cfg,
)); ));
// an ipc connection needs to handle read+write concurrently // an ipc connection needs to handle read+write concurrently
@ -896,7 +896,7 @@ mod tests {
let endpoint = dummy_endpoint(); let endpoint = dummy_endpoint();
let server = Builder::default().max_response_body_size(100).build(&endpoint); let server = Builder::default().max_response_body_size(100).build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "a".repeat(101)).unwrap(); module.register_method("anything", |_, _, _| "a".repeat(101)).unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -911,7 +911,7 @@ mod tests {
let endpoint = dummy_endpoint(); let endpoint = dummy_endpoint();
let server = Builder::default().max_request_body_size(100).build(&endpoint); let server = Builder::default().max_request_body_size(100).build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap(); module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -939,7 +939,7 @@ mod tests {
let endpoint = dummy_endpoint(); let endpoint = dummy_endpoint();
let server = Builder::default().max_connections(2).build(&endpoint); let server = Builder::default().max_connections(2).build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "succeed").unwrap(); module.register_method("anything", |_, _, _| "succeed").unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -973,7 +973,7 @@ mod tests {
let server = Builder::default().build(&endpoint); let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#; let msg = r#"{"jsonrpc":"2.0","id":83,"result":"0x7a69"}"#;
module.register_method("eth_chainId", move |_, _| msg).unwrap(); module.register_method("eth_chainId", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -987,7 +987,7 @@ mod tests {
let endpoint = dummy_endpoint(); let endpoint = dummy_endpoint();
let server = Builder::default().build(&endpoint); let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module.register_method("anything", |_, _| "ok").unwrap(); module.register_method("anything", |_, _, _| "ok").unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -1013,7 +1013,7 @@ mod tests {
let server = Builder::default().build(&endpoint); let server = Builder::default().build(&endpoint);
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#; let msg = r#"{"admin":"1.0","debug":"1.0","engine":"1.0","eth":"1.0","ethash":"1.0","miner":"1.0","net":"1.0","rpc":"1.0","txpool":"1.0","web3":"1.0"}"#;
module.register_method("rpc_modules", move |_, _| msg).unwrap(); module.register_method("rpc_modules", move |_, _, _| msg).unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());
@ -1036,7 +1036,7 @@ mod tests {
"subscribe_hello", "subscribe_hello",
"s_hello", "s_hello",
"unsubscribe_hello", "unsubscribe_hello",
|_, pending, tx| async move { |_, pending, tx, _| async move {
let rx = tx.subscribe(); let rx = tx.subscribe();
let stream = BroadcastStream::new(rx); let stream = BroadcastStream::new(rx);
pipe_from_stream_with_bounded_buffer(pending, stream).await?; pipe_from_stream_with_bounded_buffer(pending, stream).await?;
@ -1088,8 +1088,8 @@ mod tests {
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#; let goodbye_msg = r#"{"jsonrpc":"2.0","id":1,"result":"goodbye"}"#;
let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#; let hello_msg = r#"{"jsonrpc":"2.0","id":2,"result":"hello"}"#;
module.register_method("say_hello", move |_, _| hello_msg).unwrap(); module.register_method("say_hello", move |_, _, _| hello_msg).unwrap();
module.register_method("say_goodbye", move |_, _| goodbye_msg).unwrap(); module.register_method("say_goodbye", move |_, _, _| goodbye_msg).unwrap();
let handle = server.start(module).await.unwrap(); let handle = server.start(module).await.unwrap();
tokio::spawn(handle.stopped()); tokio::spawn(handle.stopped());

View File

@ -6,15 +6,15 @@ use jsonrpsee::{
IdProvider, IdProvider,
}, },
types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request}, types::{error::reject_too_many_subscriptions, ErrorCode, ErrorObject, Request},
BoundedSubscriptions, ConnectionDetails, MethodCallback, MethodResponse, MethodSink, Methods, BoundedSubscriptions, ConnectionId, Extensions, MethodCallback, MethodResponse, MethodSink,
SubscriptionState, Methods, SubscriptionState,
}; };
use std::sync::Arc; use std::sync::Arc;
/// JSON-RPC service middleware. /// JSON-RPC service middleware.
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RpcService { pub struct RpcService {
conn_id: usize, conn_id: ConnectionId,
methods: Methods, methods: Methods,
max_response_body_size: usize, max_response_body_size: usize,
cfg: RpcServiceCfg, cfg: RpcServiceCfg,
@ -39,7 +39,7 @@ impl RpcService {
pub(crate) const fn new( pub(crate) const fn new(
methods: Methods, methods: Methods,
max_response_body_size: usize, max_response_body_size: usize,
conn_id: usize, conn_id: ConnectionId,
cfg: RpcServiceCfg, cfg: RpcServiceCfg,
) -> Self { ) -> Self {
Self { methods, max_response_body_size, conn_id, cfg } Self { methods, max_response_body_size, conn_id, cfg }
@ -58,6 +58,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
let params = req.params(); let params = req.params();
let name = req.method_name(); let name = req.method_name();
let id = req.id().clone(); let id = req.id().clone();
let extensions = Extensions::new();
match self.methods.method_with_name(name) { match self.methods.method_with_name(name) {
None => { None => {
@ -65,31 +66,17 @@ impl<'a> RpcServiceT<'a> for RpcService {
ResponseFuture::ready(rp) ResponseFuture::ready(rp)
} }
Some((_name, method)) => match method { Some((_name, method)) => match method {
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size, extensions);
ResponseFuture::ready(rp)
}
MethodCallback::Async(callback) => { MethodCallback::Async(callback) => {
let params = params.into_owned(); let params = params.into_owned();
let id = id.into_owned(); let id = id.into_owned();
let fut = (callback)(id, params, conn_id, max_response_body_size); let fut = (callback)(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::future(fut) ResponseFuture::future(fut)
} }
MethodCallback::AsyncWithDetails(callback) => {
let params = params.into_owned();
let id = id.into_owned();
// Note: Add the `Request::extensions` to the connection details when available
// here.
let fut = (callback)(
id,
params,
ConnectionDetails::_new(conn_id),
max_response_body_size,
);
ResponseFuture::future(fut)
}
MethodCallback::Sync(callback) => {
let rp = (callback)(id, params, max_response_body_size);
ResponseFuture::ready(rp)
}
MethodCallback::Subscription(callback) => { MethodCallback::Subscription(callback) => {
let RpcServiceCfg::CallsAndSubscriptions { let RpcServiceCfg::CallsAndSubscriptions {
bounded_subscriptions, bounded_subscriptions,
@ -110,7 +97,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
subscription_permit: p, subscription_permit: p,
}; };
let fut = callback(id.clone(), params, sink, conn_state); let fut = callback(id.clone(), params, sink, conn_state, extensions);
ResponseFuture::future(fut) ResponseFuture::future(fut)
} else { } else {
let max = bounded_subscriptions.max(); let max = bounded_subscriptions.max();
@ -129,7 +116,7 @@ impl<'a> RpcServiceT<'a> for RpcService {
return ResponseFuture::ready(rp); return ResponseFuture::ready(rp);
}; };
let rp = callback(id, params, conn_id, max_response_body_size); let rp = callback(id, params, conn_id, max_response_body_size, extensions);
ResponseFuture::ready(rp) ResponseFuture::ready(rp)
} }
}, },

View File

@ -30,7 +30,7 @@ reth-engine-primitives.workspace = true
jsonrpsee = { workspace = true, features = ["server"] } jsonrpsee = { workspace = true, features = ["server"] }
tower-http = { workspace = true, features = ["full"] } tower-http = { workspace = true, features = ["full"] }
tower = { workspace = true, features = ["full"] } tower = { workspace = true, features = ["full"] }
hyper.workspace = true http.workspace = true
pin-project.workspace = true pin-project.workspace = true
# metrics # metrics

View File

@ -1,6 +1,5 @@
use crate::error::{RpcError, ServerKind}; use crate::error::{RpcError, ServerKind};
use hyper::header::AUTHORIZATION; use http::header::AUTHORIZATION;
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{ use jsonrpsee::{
core::RegisterMethodError, core::RegisterMethodError,
http_client::{transport::HttpBackend, HeaderMap}, http_client::{transport::HttpBackend, HeaderMap},
@ -8,7 +7,6 @@ use jsonrpsee::{
Methods, Methods,
}; };
use reth_engine_primitives::EngineTypes; use reth_engine_primitives::EngineTypes;
pub use reth_ipc::server::Builder as IpcServerBuilder;
use reth_rpc::EthSubscriptionIdProvider; use reth_rpc::EthSubscriptionIdProvider;
use reth_rpc_api::servers::*; use reth_rpc_api::servers::*;
use reth_rpc_layer::{ use reth_rpc_layer::{
@ -19,6 +17,9 @@ use reth_rpc_server_types::constants;
use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use tower::layer::util::Identity; use tower::layer::util::Identity;
pub use jsonrpsee::server::ServerBuilder;
pub use reth_ipc::server::Builder as IpcServerBuilder;
/// Server configuration for the auth server. /// Server configuration for the auth server.
#[derive(Debug)] #[derive(Debug)]
pub struct AuthServerConfig { pub struct AuthServerConfig {

View File

@ -1,4 +1,4 @@
use hyper::{http::HeaderValue, Method}; use http::{HeaderValue, Method};
use tower_http::cors::{AllowOrigin, Any, CorsLayer}; use tower_http::cors::{AllowOrigin, Any, CorsLayer};
/// Error thrown when parsing cors domains went wrong /// Error thrown when parsing cors domains went wrong

View File

@ -160,8 +160,7 @@ use crate::{
metrics::RpcRequestMetrics, metrics::RpcRequestMetrics,
}; };
use error::{ConflictingModules, RpcError, ServerKind}; use error::{ConflictingModules, RpcError, ServerKind};
use hyper::{header::AUTHORIZATION, HeaderMap}; use http::{header::AUTHORIZATION, HeaderMap};
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{ use jsonrpsee::{
core::RegisterMethodError, core::RegisterMethodError,
server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle}, server::{AlreadyStoppedError, IdProvider, RpcServiceBuilder, Server, ServerHandle},
@ -170,9 +169,6 @@ use jsonrpsee::{
use reth_engine_primitives::EngineTypes; use reth_engine_primitives::EngineTypes;
use reth_evm::ConfigureEvm; use reth_evm::ConfigureEvm;
use reth_ipc::server::IpcServer; use reth_ipc::server::IpcServer;
pub use reth_ipc::server::{
Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
};
use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers}; use reth_network_api::{noop::NoopNetwork, NetworkInfo, Peers};
use reth_provider::{ use reth_provider::{
AccountReader, BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider, AccountReader, BlockReader, BlockReaderIdExt, CanonStateSubscriptions, ChainSpecProvider,
@ -204,14 +200,16 @@ use std::{
sync::Arc, sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
pub use tower::layer::util::{Identity, Stack};
use tower_http::cors::CorsLayer; use tower_http::cors::CorsLayer;
use tracing::{instrument, trace}; use tracing::{instrument, trace};
pub use crate::eth::{EthConfig, EthHandlers};
// re-export for convenience // re-export for convenience
pub use jsonrpsee::server::ServerBuilder;
pub use reth_ipc::server::{
Builder as IpcServerBuilder, RpcServiceBuilder as IpcRpcServiceBuilder,
};
pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection}; pub use reth_rpc_server_types::{constants, RethRpcModule, RpcModuleSelection};
pub use tower::layer::util::{Identity, Stack};
/// Auth server utilities. /// Auth server utilities.
pub mod auth; pub mod auth;
@ -227,6 +225,7 @@ pub mod error;
/// Eth utils /// Eth utils
mod eth; mod eth;
pub use eth::{EthConfig, EthHandlers};
// Rpc server metrics // Rpc server metrics
mod metrics; mod metrics;
@ -1959,6 +1958,7 @@ impl RpcServerHandle {
client.expect("failed to create http client").into() client.expect("failed to create http client").into()
} }
/// Returns a ws client connected to the server. /// Returns a ws client connected to the server.
pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> { pub async fn ws_client(&self) -> Option<jsonrpsee::ws_client::WsClient> {
let url = self.ws_url()?; let url = self.ws_url()?;

View File

@ -34,11 +34,11 @@ metrics.workspace = true
# misc # misc
async-trait.workspace = true async-trait.workspace = true
thiserror.workspace = true
jsonrpsee-types.workspace = true
jsonrpsee-core.workspace = true jsonrpsee-core.workspace = true
tracing.workspace = true jsonrpsee-types.workspace = true
serde.workspace = true serde.workspace = true
thiserror.workspace = true
tracing.workspace = true
[dev-dependencies] [dev-dependencies]
reth-ethereum-engine-primitives.workspace = true reth-ethereum-engine-primitives.workspace = true

View File

@ -804,6 +804,7 @@ where
self.inner.metrics.latency.exchange_transition_configuration.record(start.elapsed()); self.inner.metrics.latency.exchange_transition_configuration.record(start.elapsed());
Ok(res?) Ok(res?)
} }
/// Handler for `engine_getClientVersionV1` /// Handler for `engine_getClientVersionV1`
/// ///
/// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md> /// See also <https://github.com/ethereum/execution-apis/blob/03911ffc053b8b806123f1fc237184b0092a485a/src/engine/identification.md>

View File

@ -14,15 +14,14 @@ workspace = true
alloy-rpc-types-engine.workspace = true alloy-rpc-types-engine.workspace = true
http.workspace = true http.workspace = true
hyper.workspace = true jsonrpsee-http-client.workspace = true
tower.workspace = true
http-body.workspace = true
pin-project.workspace = true pin-project.workspace = true
tower.workspace = true
tracing.workspace = true tracing.workspace = true
[dev-dependencies] [dev-dependencies]
hyper = { workspace = true, features = ["client", "tcp"] } reqwest.workspace = true
assert_matches.workspace = true assert_matches.workspace = true
tokio = { workspace = true, features = ["macros"] } tokio = { workspace = true, features = ["macros"] }
tempfile.workspace = true tempfile.workspace = true

View File

@ -1,11 +1,10 @@
use crate::{Claims, JwtSecret}; use crate::{Claims, JwtSecret};
use http::HeaderValue; use http::{header::AUTHORIZATION, HeaderValue};
use hyper::{header::AUTHORIZATION, service::Service};
use std::{ use std::{
task::{Context, Poll}, task::{Context, Poll},
time::{Duration, SystemTime, UNIX_EPOCH}, time::{Duration, SystemTime, UNIX_EPOCH},
}; };
use tower::Layer; use tower::{Layer, Service};
/// A layer that adds a new JWT token to every request using `AuthClientService`. /// A layer that adds a new JWT token to every request using `AuthClientService`.
#[derive(Debug)] #[derive(Debug)]
@ -41,9 +40,9 @@ impl<S> AuthClientService<S> {
} }
} }
impl<S, B> Service<hyper::Request<B>> for AuthClientService<S> impl<S, B> Service<http::Request<B>> for AuthClientService<S>
where where
S: Service<hyper::Request<B>>, S: Service<http::Request<B>>,
{ {
type Response = S::Response; type Response = S::Response;
type Error = S::Error; type Error = S::Error;
@ -53,7 +52,7 @@ where
self.inner.poll_ready(cx) self.inner.poll_ready(cx)
} }
fn call(&mut self, mut request: hyper::Request<B>) -> Self::Future { fn call(&mut self, mut request: http::Request<B>) -> Self::Future {
request.headers_mut().insert(AUTHORIZATION, secret_to_bearer_header(&self.secret)); request.headers_mut().insert(AUTHORIZATION, secret_to_bearer_header(&self.secret));
self.inner.call(request) self.inner.call(request)
} }

View File

@ -1,6 +1,5 @@
use super::AuthValidator; use super::AuthValidator;
use http::{Request, Response}; use jsonrpsee_http_client::{HttpRequest, HttpResponse};
use http_body::Body;
use pin_project::pin_project; use pin_project::pin_project;
use std::{ use std::{
future::Future, future::Future,
@ -65,7 +64,7 @@ where
/// This type is the actual implementation of the middleware. It follows the [`Service`] /// This type is the actual implementation of the middleware. It follows the [`Service`]
/// specification to correctly proxy Http requests to its inner service after headers validation. /// specification to correctly proxy Http requests to its inner service after headers validation.
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct AuthService<S, V> { pub struct AuthService<S, V> {
/// Performs auth validation logics /// Performs auth validation logics
validator: V, validator: V,
@ -73,16 +72,15 @@ pub struct AuthService<S, V> {
inner: S, inner: S,
} }
impl<ReqBody, ResBody, S, V> Service<Request<ReqBody>> for AuthService<S, V> impl<S, V> Service<HttpRequest> for AuthService<S, V>
where where
S: Service<Request<ReqBody>, Response = Response<ResBody>>, S: Service<HttpRequest, Response = HttpResponse>,
V: AuthValidator<ResponseBody = ResBody>, V: AuthValidator,
ReqBody: Body, Self: Clone,
ResBody: Body,
{ {
type Response = Response<ResBody>; type Response = HttpResponse;
type Error = S::Error; type Error = S::Error;
type Future = ResponseFuture<S::Future, ResBody>; type Future = ResponseFuture<S::Future>;
/// If we get polled it means that we dispatched an authorized Http request to the inner layer. /// If we get polled it means that we dispatched an authorized Http request to the inner layer.
/// So we just poll the inner layer ourselves. /// So we just poll the inner layer ourselves.
@ -96,7 +94,7 @@ where
/// Returns a future that wraps either: /// Returns a future that wraps either:
/// - The inner service future for authorized requests /// - The inner service future for authorized requests
/// - An error Http response in case of authorization errors /// - An error Http response in case of authorization errors
fn call(&mut self, req: Request<ReqBody>) -> Self::Future { fn call(&mut self, req: HttpRequest) -> Self::Future {
match self.validator.validate(req.headers()) { match self.validator.validate(req.headers()) {
Ok(_) => ResponseFuture::future(self.inner.call(req)), Ok(_) => ResponseFuture::future(self.inner.call(req)),
Err(res) => ResponseFuture::invalid_auth(res), Err(res) => ResponseFuture::invalid_auth(res),
@ -106,39 +104,35 @@ where
#[pin_project] #[pin_project]
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub struct ResponseFuture<F, B> { pub struct ResponseFuture<F> {
#[pin] #[pin]
kind: Kind<F, B>, kind: Kind<F>,
} }
impl<F, B> ResponseFuture<F, B> impl<F> ResponseFuture<F> {
where
B: Body,
{
const fn future(future: F) -> Self { const fn future(future: F) -> Self {
Self { kind: Kind::Future { future } } Self { kind: Kind::Future { future } }
} }
const fn invalid_auth(err_res: Response<B>) -> Self { const fn invalid_auth(err_res: HttpResponse) -> Self {
Self { kind: Kind::Error { response: Some(err_res) } } Self { kind: Kind::Error { response: Some(err_res) } }
} }
} }
#[pin_project(project = KindProj)] #[pin_project(project = KindProj)]
enum Kind<F, B> { enum Kind<F> {
Future { Future {
#[pin] #[pin]
future: F, future: F,
}, },
Error { Error {
response: Option<Response<B>>, response: Option<HttpResponse>,
}, },
} }
impl<F, B, E> Future for ResponseFuture<F, B> impl<F, E> Future for ResponseFuture<F>
where where
F: Future<Output = Result<Response<B>, E>>, F: Future<Output = Result<HttpResponse, E>>,
B: Body,
{ {
type Output = F::Output; type Output = F::Output;
@ -158,12 +152,11 @@ mod tests {
use super::*; use super::*;
use crate::JwtAuthValidator; use crate::JwtAuthValidator;
use alloy_rpc_types_engine::{Claims, JwtError, JwtSecret}; use alloy_rpc_types_engine::{Claims, JwtError, JwtSecret};
use http::{header, Method, Request, StatusCode};
use hyper::{body, Body};
use jsonrpsee::{ use jsonrpsee::{
server::{RandomStringIdProvider, ServerBuilder, ServerHandle}, server::{RandomStringIdProvider, ServerBuilder, ServerHandle},
RpcModule, RpcModule,
}; };
use reqwest::{header, StatusCode};
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
time::{SystemTime, UNIX_EPOCH}, time::{SystemTime, UNIX_EPOCH},
@ -234,25 +227,20 @@ mod tests {
async fn send_request(jwt: Option<String>) -> (StatusCode, String) { async fn send_request(jwt: Option<String>) -> (StatusCode, String) {
let server = spawn_server().await; let server = spawn_server().await;
let client = hyper::Client::new(); let client =
reqwest::Client::builder().timeout(std::time::Duration::from_secs(1)).build().unwrap();
let jwt = jwt.unwrap_or_default();
let address = format!("http://{AUTH_ADDR}:{AUTH_PORT}");
let bearer = format!("Bearer {jwt}");
let body = r#"{"jsonrpc": "2.0", "method": "greet_melkor", "params": [], "id": 1}"#; let body = r#"{"jsonrpc": "2.0", "method": "greet_melkor", "params": [], "id": 1}"#;
let response = client
let req = Request::builder() .post(&format!("http://{AUTH_ADDR}:{AUTH_PORT}"))
.method(Method::POST) .bearer_auth(jwt.unwrap_or_default())
.header(header::AUTHORIZATION, bearer) .body(body)
.header(header::CONTENT_TYPE, "application/json") .header(header::CONTENT_TYPE, "application/json")
.uri(address) .send()
.body(Body::from(body)) .await
.unwrap(); .unwrap();
let status = response.status();
let res = client.request(req).await.unwrap(); let body = response.text().await.unwrap();
let status = res.status();
let body_bytes = body::to_bytes(res.into_body()).await.unwrap();
let body = String::from_utf8(body_bytes.to_vec()).expect("response was not valid utf-8");
server.stop().unwrap(); server.stop().unwrap();
server.stopped().await; server.stopped().await;
@ -278,7 +266,7 @@ mod tests {
// Create a mock rpc module // Create a mock rpc module
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
module.register_method("greet_melkor", |_, _| "You are the dark lord").unwrap(); module.register_method("greet_melkor", |_, _, _| "You are the dark lord").unwrap();
server.start(module) server.start(module)
} }

View File

@ -1,7 +1,7 @@
use http::{header, HeaderMap, Response, StatusCode};
use tracing::error;
use crate::{AuthValidator, JwtError, JwtSecret}; use crate::{AuthValidator, JwtError, JwtSecret};
use http::{header, HeaderMap, Response, StatusCode};
use jsonrpsee_http_client::{HttpBody, HttpResponse};
use tracing::error;
/// Implements JWT validation logics and integrates /// Implements JWT validation logics and integrates
/// to an Http [`AuthLayer`][crate::AuthLayer] /// to an Http [`AuthLayer`][crate::AuthLayer]
@ -22,9 +22,7 @@ impl JwtAuthValidator {
} }
impl AuthValidator for JwtAuthValidator { impl AuthValidator for JwtAuthValidator {
type ResponseBody = hyper::Body; fn validate(&self, headers: &HeaderMap) -> Result<(), HttpResponse> {
fn validate(&self, headers: &HeaderMap) -> Result<(), Response<Self::ResponseBody>> {
match get_bearer(headers) { match get_bearer(headers) {
Some(jwt) => match self.secret.validate(&jwt) { Some(jwt) => match self.secret.validate(&jwt) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
@ -55,14 +53,13 @@ fn get_bearer(headers: &HeaderMap) -> Option<String> {
Some(token.into()) Some(token.into())
} }
fn err_response(err: JwtError) -> Response<hyper::Body> { fn err_response(err: JwtError) -> HttpResponse {
let body = hyper::Body::from(err.to_string());
// We build a response from an error message. // We build a response from an error message.
// We don't cope with headers or other structured fields. // We don't cope with headers or other structured fields.
// Then we are safe to "expect" on the result. // Then we are safe to "expect" on the result.
Response::builder() Response::builder()
.status(StatusCode::UNAUTHORIZED) .status(StatusCode::UNAUTHORIZED)
.body(body) .body(HttpBody::new(err.to_string()))
.expect("This should never happen") .expect("This should never happen")
} }

View File

@ -8,7 +8,8 @@
#![cfg_attr(not(test), warn(unused_crate_dependencies))] #![cfg_attr(not(test), warn(unused_crate_dependencies))]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
use http::{HeaderMap, Response}; use http::HeaderMap;
use jsonrpsee_http_client::HttpResponse;
mod auth_client_layer; mod auth_client_layer;
mod auth_layer; mod auth_layer;
@ -24,10 +25,7 @@ pub use jwt_validator::JwtAuthValidator;
/// General purpose trait to validate Http Authorization headers. It's supposed to be integrated as /// General purpose trait to validate Http Authorization headers. It's supposed to be integrated as
/// a validator trait into an [`AuthLayer`]. /// a validator trait into an [`AuthLayer`].
pub trait AuthValidator { pub trait AuthValidator {
/// Body type of the error response
type ResponseBody;
/// This function is invoked by the [`AuthLayer`] to perform validation on Http headers. /// This function is invoked by the [`AuthLayer`] to perform validation on Http headers.
/// The result conveys validation errors in the form of an Http response. /// The result conveys validation errors in the form of an Http response.
fn validate(&self, headers: &HeaderMap) -> Result<(), Response<Self::ResponseBody>>; fn validate(&self, headers: &HeaderMap) -> Result<(), HttpResponse>;
} }

View File

@ -1734,7 +1734,7 @@ pub(crate) fn build_transaction_receipt_with_block_receipts(
} }
let rpc_receipt = reth_rpc_types::Receipt { let rpc_receipt = reth_rpc_types::Receipt {
status: receipt.success, status: receipt.success.into(),
cumulative_gas_used: receipt.cumulative_gas_used as u128, cumulative_gas_used: receipt.cumulative_gas_used as u128,
logs, logs,
}; };

View File

@ -158,7 +158,12 @@ where
.drain(page_start..page_end) .drain(page_start..page_end)
.map(|receipt| { .map(|receipt| {
let receipt = receipt.inner.map_inner(|receipt| OtsReceipt { let receipt = receipt.inner.map_inner(|receipt| OtsReceipt {
status: receipt.inner.receipt.status, status: receipt
.inner
.receipt
.status
.as_eip658()
.expect("ETH API returned pre-EIP-658 status"),
cumulative_gas_used: receipt.inner.receipt.cumulative_gas_used as u64, cumulative_gas_used: receipt.inner.receipt.cumulative_gas_used as u64,
logs: None, logs: None,
logs_bloom: None, logs_bloom: None,

View File

@ -13,8 +13,6 @@ use std::{
use tokio::sync::oneshot; use tokio::sync::oneshot;
/// A Future that resolves when the shutdown event has been fired. /// A Future that resolves when the shutdown event has been fired.
///
/// The [`TaskManager`](crate)
#[derive(Debug)] #[derive(Debug)]
pub struct GracefulShutdown { pub struct GracefulShutdown {
shutdown: Shutdown, shutdown: Shutdown,
@ -25,6 +23,13 @@ impl GracefulShutdown {
pub(crate) fn new(shutdown: Shutdown, guard: GracefulShutdownGuard) -> Self { pub(crate) fn new(shutdown: Shutdown, guard: GracefulShutdownGuard) -> Self {
Self { shutdown, guard: Some(guard) } Self { shutdown, guard: Some(guard) }
} }
/// Returns a new shutdown future that is ignores the returned [`GracefulShutdownGuard`].
///
/// This just maps the return value of the future to `()`, it does not drop the guard.
pub fn ignore_guard(self) -> impl Future<Output = ()> + Send + Sync + Unpin + 'static {
self.map(drop)
}
} }
impl Future for GracefulShutdown { impl Future for GracefulShutdown {

View File

@ -89,10 +89,9 @@ unknown-registry = "warn"
# in the allow list is encountered # in the allow list is encountered
unknown-git = "deny" unknown-git = "deny"
allow-git = [ allow-git = [
# TODO: remove, see ./Cargo.toml # TODO: Please avoid adding new entries to this list.
"https://github.com/alloy-rs/alloy", "https://github.com/alloy-rs/alloy",
"https://github.com/foundry-rs/block-explorers", "https://github.com/foundry-rs/block-explorers",
"https://github.com/bluealloy/revm", "https://github.com/bluealloy/revm",
"https://github.com/paradigmxyz/evm-inspectors", "https://github.com/paradigmxyz/revm-inspectors",
"https://github.com/sigp/discv5",
] ]

View File

@ -11,10 +11,10 @@ reth-node-ethereum.workspace = true
alloy-rpc-types-beacon.workspace = true alloy-rpc-types-beacon.workspace = true
serde.workspace = true
serde_json.workspace = true
clap.workspace = true clap.workspace = true
futures-util.workspace = true
eyre.workspace = true eyre.workspace = true
thiserror.workspace = true futures-util.workspace = true
reqwest.workspace = true reqwest.workspace = true
serde_json.workspace = true
serde.workspace = true
thiserror.workspace = true

View File

@ -8,10 +8,11 @@ license.workspace = true
[dependencies] [dependencies]
reth.workspace = true reth.workspace = true
reth-node-ethereum.workspace = true reth-node-ethereum.workspace = true
alloy-rpc-types-beacon.workspace = true alloy-rpc-types-beacon.workspace = true
clap.workspace = true clap.workspace = true
tracing.workspace = true
futures-util.workspace = true futures-util.workspace = true
mev-share-sse = { version = "0.3.0", default-features = false }
tokio = { workspace = true, features = ["time"] } tokio = { workspace = true, features = ["time"] }
mev-share-sse = { version = "0.2.0" , default-features = false } tracing.workspace = true