feat(rpc): launch auth server (#1506)

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Roman Krasiuk
2023-02-23 00:39:37 +02:00
committed by GitHub
parent 0cf27ce8e8
commit 823866ac57
15 changed files with 303 additions and 100 deletions

View File

@ -13,12 +13,13 @@ reth-network-api = { path = "../../net/network-api" }
reth-provider = { path = "../../storage/provider" }
reth-rpc = { path = "../rpc" }
reth-rpc-api = { path = "../rpc-api" }
reth-rpc-engine-api = { path = "../rpc-engine-api" }
reth-rpc-types = { path = "../rpc-types" }
reth-transaction-pool = { path = "../../transaction-pool" }
jsonrpsee = { version = "0.16", features = ["server"] }
tower-http = { version = "0.3", features = ["full"] }
tower = {version = "0.4" , features = ["full"] }
tower = { version = "0.4", features = ["full"] }
hyper = "0.14"
strum = { version = "0.24", features = ["derive"] }

View File

@ -0,0 +1,76 @@
use crate::{constants::DEFAULT_AUTH_PORT, RpcServerConfig};
use hyper::{http::HeaderValue, Method};
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{
core::{
server::{host_filtering::AllowHosts, rpc_module::Methods},
Error as RpcError,
},
server::{middleware, Server, ServerHandle},
RpcModule,
};
use reth_ipc::server::IpcServer;
pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{
AdminApi, AuthLayer, DebugApi, EngineApi, EthApi, JwtAuthValidator, JwtSecret, NetApi,
TraceApi, Web3Api,
};
use reth_rpc_api::servers::*;
use reth_rpc_engine_api::EngineApiHandle;
use reth_transaction_pool::TransactionPool;
use serde::{Deserialize, Serialize, Serializer};
use std::{
collections::{hash_map::Entry, HashMap},
fmt,
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
str::FromStr,
};
use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames};
use tower::layer::util::{Identity, Stack};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
/// Configure and launch an auth server with `engine` and `eth` namespaces.
pub async fn launch<Client, Pool, Network>(
client: Client,
pool: Pool,
network: Network,
handle: EngineApiHandle,
socket_addr: SocketAddr,
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
where
Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
{
launch_with_eth_api(EthApi::new(client, pool, network), handle, socket_addr, secret).await
}
/// Configure and launch an auth server with existing EthApi implementation.
pub async fn launch_with_eth_api<Client, Pool, Network>(
eth_api: EthApi<Client, Pool, Network>,
handle: EngineApiHandle,
socket_addr: SocketAddr,
secret: JwtSecret,
) -> Result<ServerHandle, RpcError>
where
Client: BlockProvider + HeaderProvider + StateProviderFactory + Clone + 'static,
Pool: TransactionPool + Clone + 'static,
Network: NetworkInfo + Peers + Clone + 'static,
{
// Configure the module and start the server.
let mut module = RpcModule::new(());
module.merge(EngineApi::new(handle).into_rpc());
module.merge(eth_api.into_rpc());
// Create auth middleware.
let middleware =
tower::ServiceBuilder::new().layer(AuthLayer::new(JwtAuthValidator::new(secret)));
// By default both http and ws are enabled.
let server = ServerBuilder::new().set_middleware(middleware).build(socket_addr).await?;
server.start(module)
}

View File

@ -0,0 +1,16 @@
/// The default port for the http server
pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545;
/// The default port for the ws server
pub const DEFAULT_WS_RPC_PORT: u16 = 8546;
/// The default port for the auth server.
pub const DEFAULT_AUTH_PORT: u16 = 8551;
/// The default IPC endpoint
#[cfg(windows)]
pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc";
/// The default IPC endpoint
#[cfg(not(windows))]
pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc";

View File

@ -0,0 +1,44 @@
use hyper::{http::HeaderValue, Method};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
/// Error thrown when parsing cors domains went wrong
#[derive(Debug, thiserror::Error)]
pub(crate) enum CorsDomainError {
#[error("{domain} is an invalid header value")]
InvalidHeader { domain: String },
#[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")]
WildCardNotAllowed { input: String },
}
/// Creates a [CorsLayer] from the given domains
pub(crate) fn create_cors_layer(http_cors_domains: &str) -> Result<CorsLayer, CorsDomainError> {
let cors = match http_cors_domains.trim() {
"*" => CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(Any)
.allow_headers(Any),
_ => {
let iter = http_cors_domains.split(',');
if iter.clone().any(|o| o == "*") {
return Err(CorsDomainError::WildCardNotAllowed {
input: http_cors_domains.to_string(),
})
}
let origins = iter
.map(|domain| {
domain
.parse::<HeaderValue>()
.map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() })
})
.collect::<Result<Vec<HeaderValue>, _>>()?;
let origin = AllowOrigin::list(origins);
CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(origin)
.allow_headers(Any)
}
};
Ok(cors)
}

View File

@ -52,7 +52,6 @@
//! ```
use hyper::{http::HeaderValue, Method};
pub use jsonrpsee::server::ServerBuilder;
use jsonrpsee::{
core::{
server::{host_filtering::AllowHosts, rpc_module::Methods},
@ -62,7 +61,6 @@ use jsonrpsee::{
RpcModule,
};
use reth_ipc::server::IpcServer;
pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
use reth_network_api::{NetworkInfo, Peers};
use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory};
use reth_rpc::{AdminApi, DebugApi, EthApi, NetApi, TraceApi, Web3Api};
@ -77,21 +75,20 @@ use std::{
};
use strum::{AsRefStr, EnumString, EnumVariantNames, ParseError, VariantNames};
use tower::layer::util::{Identity, Stack};
use tower_http::cors::{AllowOrigin, Any, CorsLayer};
use tower_http::cors::CorsLayer;
/// The default port for the http server
pub const DEFAULT_HTTP_RPC_PORT: u16 = 8545;
pub use jsonrpsee::server::ServerBuilder;
pub use reth_ipc::server::{Builder as IpcServerBuilder, Endpoint};
/// The default port for the ws server
pub const DEFAULT_WS_RPC_PORT: u16 = 8546;
/// Auth server utilities.
pub mod auth;
/// The default IPC endpoint
#[cfg(windows)]
pub const DEFAULT_IPC_ENDPOINT: &str = r"\\.\pipe\reth.ipc";
/// Common RPC constants.
pub mod constants;
use constants::*;
/// The default IPC endpoint
#[cfg(not(windows))]
pub const DEFAULT_IPC_ENDPOINT: &str = "/tmp/reth.ipc";
/// Cors utilities.
mod cors;
/// Convenience function for starting a server in one step.
pub async fn launch<Client, Pool, Network>(
@ -508,10 +505,10 @@ pub struct RpcServerConfig {
http_server_config: Option<ServerBuilder>,
/// Cors Domains
http_cors_domains: Option<String>,
/// Configs for WS server
ws_server_config: Option<ServerBuilder>,
/// Address where to bind the http server to
http_addr: Option<SocketAddr>,
/// Configs for WS server
ws_server_config: Option<ServerBuilder>,
/// Address where to bind the ws server to
ws_addr: Option<SocketAddr>,
/// Configs for JSON-RPC IPC server
@ -621,7 +618,7 @@ impl RpcServerConfig {
)));
if let Some(builder) = self.http_server_config {
if let Some(cors) = self.http_cors_domains.as_deref().map(create_cors_layer) {
if let Some(cors) = self.http_cors_domains.as_deref().map(cors::create_cors_layer) {
let cors = cors.map_err(|err| RpcError::Custom(err.to_string()))?;
let middleware = tower::ServiceBuilder::new().layer(cors);
let http_server =
@ -658,48 +655,6 @@ impl RpcServerConfig {
}
}
/// Error thrown when parsing cors domains went wrong
#[derive(Debug, thiserror::Error)]
enum CorsDomainError {
#[error("{domain} is an invalid header value")]
InvalidHeader { domain: String },
#[error("Wildcard origin (`*`) cannot be passed as part of a list: {input}")]
WildCardNotAllowed { input: String },
}
/// Creates a [CorsLayer] from the given domains
fn create_cors_layer(http_cors_domains: &str) -> Result<CorsLayer, CorsDomainError> {
let cors = match http_cors_domains.trim() {
"*" => CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(Any)
.allow_headers(Any),
_ => {
let iter = http_cors_domains.split(',');
if iter.clone().any(|o| o == "*") {
return Err(CorsDomainError::WildCardNotAllowed {
input: http_cors_domains.to_string(),
})
}
let origins = iter
.map(|domain| {
domain
.parse::<HeaderValue>()
.map_err(|_| CorsDomainError::InvalidHeader { domain: domain.to_string() })
})
.collect::<Result<Vec<HeaderValue>, _>>()?;
let origin = AllowOrigin::list(origins);
CorsLayer::new()
.allow_methods([Method::GET, Method::POST])
.allow_origin(origin)
.allow_headers(Any)
}
};
Ok(cors)
}
/// Holds modules to be installed per transport type
///
/// # Example

View File

@ -10,7 +10,7 @@ use reth_primitives::{
BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned,
H64, U256,
};
use reth_provider::{BlockProvider, HeaderProvider, StateProvider};
use reth_provider::{BlockProvider, HeaderProvider, StateProviderFactory};
use reth_rlp::Decodable;
use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
@ -19,13 +19,15 @@ use reth_rpc_types::engine::{
use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{oneshot, watch};
use tokio::sync::{mpsc, oneshot, watch};
use tokio_stream::wrappers::UnboundedReceiverStream;
/// The Engine API response sender
/// The Engine API handle.
pub type EngineApiHandle = mpsc::UnboundedSender<EngineApiMessage>;
/// The Engine API response sender.
pub type EngineApiSender<Ok> = oneshot::Sender<EngineApiResult<Ok>>;
/// The upper limit for payload bodies request.
@ -35,7 +37,7 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024;
/// functions in the Execution layer that are crucial for the consensus process.
#[must_use = "EngineApi does nothing unless polled."]
pub struct EngineApi<Client> {
client: Arc<Client>,
client: Client,
/// Consensus configuration
chain_spec: ChainSpec,
message_rx: UnboundedReceiverStream<EngineApiMessage>,
@ -45,7 +47,22 @@ pub struct EngineApi<Client> {
// remote_store: HashMap<H64, ExecutionPayload>,
}
impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
impl<Client: HeaderProvider + BlockProvider + StateProviderFactory> EngineApi<Client> {
/// Create new instance of [EngineApi].
pub fn new(
client: Client,
chain_spec: ChainSpec,
message_rx: mpsc::UnboundedReceiver<EngineApiMessage>,
forkchoice_state_tx: watch::Sender<ForkchoiceState>,
) -> Self {
Self {
client,
chain_spec,
message_rx: UnboundedReceiverStream::new(message_rx),
forkchoice_state_tx,
}
}
fn on_message(&mut self, msg: EngineApiMessage) {
match msg {
EngineApiMessage::GetPayload(payload_id, tx) => {
@ -285,14 +302,14 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
}))
}
let mut state_provider = SubState::new(State::new(&*self.client));
let state_provider = self.client.latest()?;
let total_difficulty = parent_td + block.header.difficulty;
match executor::execute_and_verify_receipt(
&block.unseal(),
total_difficulty,
None,
&self.chain_spec,
&mut state_provider,
&mut SubState::new(State::new(&state_provider)),
) {
Ok(_) => Ok(PayloadStatus::new(PayloadStatusEnum::Valid, block_hash)),
Err(err) => Ok(PayloadStatus::new(
@ -394,7 +411,7 @@ impl<Client: HeaderProvider + BlockProvider + StateProvider> EngineApi<Client> {
impl<Client> Future for EngineApi<Client>
where
Client: HeaderProvider + BlockProvider + StateProvider + Unpin,
Client: HeaderProvider + BlockProvider + StateProviderFactory + Unpin,
{
type Output = ();
@ -419,12 +436,13 @@ mod tests {
use reth_interfaces::test_utils::generators::random_block;
use reth_primitives::{H256, MAINNET};
use reth_provider::test_utils::MockEthProvider;
use std::sync::Arc;
use tokio::sync::{
mpsc::{unbounded_channel, UnboundedSender},
watch::Receiver as WatchReceiver,
};
fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<MockEthProvider>) {
fn setup_engine_api() -> (EngineApiTestHandle, EngineApi<Arc<MockEthProvider>>) {
let chain_spec = MAINNET.clone();
let client = Arc::new(MockEthProvider::default());
let (msg_tx, msg_rx) = unbounded_channel();

View File

@ -17,6 +17,6 @@ mod message;
/// Engine API error.
mod error;
pub use engine_api::{EngineApi, EngineApiSender};
pub use engine_api::{EngineApi, EngineApiHandle, EngineApiSender};
pub use error::*;
pub use message::{EngineApiMessage, EngineApiMessageVersion};

View File

@ -8,22 +8,26 @@ use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{BlockHash, BlockNumber, H64};
use reth_rpc_api::EngineApiServer;
use reth_rpc_engine_api::{
EngineApiError, EngineApiMessage, EngineApiMessageVersion, EngineApiResult,
EngineApiError, EngineApiHandle, EngineApiMessage, EngineApiMessageVersion, EngineApiResult,
REQUEST_TOO_LARGE_CODE, UNKNOWN_PAYLOAD_CODE,
};
use reth_rpc_types::engine::{
ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus,
TransitionConfiguration, CAPABILITIES,
};
use tokio::sync::{
mpsc::UnboundedSender,
oneshot::{self, Receiver},
};
use tokio::sync::oneshot::{self, Receiver};
/// The server implementation of Engine API
pub struct EngineApi {
/// Handle to the consensus engine
engine_tx: UnboundedSender<EngineApiMessage>,
engine_tx: EngineApiHandle,
}
impl EngineApi {
/// Creates a new instance of [EngineApi].
pub fn new(engine_tx: EngineApiHandle) -> Self {
Self { engine_tx }
}
}
impl std::fmt::Debug for EngineApi {