From 86ce34c2e398c21ee2ed888ccb7b93e1997ffcb3 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sat, 1 Mar 2025 17:30:08 +0000 Subject: [PATCH] feat: Forward incoming txs to upstream server --- Cargo.lock | 1 + bin/reth/Cargo.toml | 1 + bin/reth/src/forwarder.rs | 45 +++++++++++++++++++++++++++++++++++++++ bin/reth/src/main.rs | 41 ++++++++++++++++++++++++++--------- 4 files changed, 78 insertions(+), 10 deletions(-) create mode 100644 bin/reth/src/forwarder.rs diff --git a/Cargo.lock b/Cargo.lock index 685ee41de..1cbcfa7a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6659,6 +6659,7 @@ dependencies = [ "eyre", "futures", "jsonrpsee", + "jsonrpsee-core", "lz4_flex", "once_cell", "parking_lot", diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 1bbef6495..22bbdeaf6 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -97,6 +97,7 @@ reth-e2e-test-utils.workspace = true once_cell.workspace = true reth-ethereum-forks.workspace = true jsonrpsee.workspace = true +jsonrpsee-core.workspace = true reth-rpc-layer.workspace = true [dev-dependencies] diff --git a/bin/reth/src/forwarder.rs b/bin/reth/src/forwarder.rs new file mode 100644 index 000000000..4f8e2ab46 --- /dev/null +++ b/bin/reth/src/forwarder.rs @@ -0,0 +1,45 @@ +use alloy_primitives::{Bytes, B256}; +use jsonrpsee::{ + http_client::{HttpClient, HttpClientBuilder}, + proc_macros::rpc, + types::{error::INTERNAL_ERROR_CODE, ErrorObject}, +}; +use jsonrpsee_core::{async_trait, client::ClientT, ClientError, RpcResult}; + +#[rpc(server, namespace = "eth")] +pub(crate) trait EthForwarderApi { + /// Returns block 0. + #[method(name = "sendRawTransaction")] + async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult; +} + +pub(crate) struct EthForwarderExt { + client: HttpClient, +} + +impl EthForwarderExt { + pub(crate) fn new(upstream_rpc_url: String) -> Self { + let client = + HttpClientBuilder::default().build(upstream_rpc_url).expect("Failed to build client"); + + Self { client } + } +} + +#[async_trait] +impl EthForwarderApiServer for EthForwarderExt { + async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult { + let txhash = + self.client.clone().request("eth_sendRawTransaction", vec![tx]).await.map_err(|e| { + match e { + ClientError::Call(e) => e, + _ => ErrorObject::owned( + INTERNAL_ERROR_CODE, + format!("Failed to send transaction: {:?}", e), + Some(()), + ), + } + })?; + Ok(txhash) + } +} diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index 9409c5799..1d26d1e5a 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -4,22 +4,28 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::new_allocator(); mod block_ingest; +mod forwarder; mod serialized; use std::path::PathBuf; use block_ingest::BlockIngest; use clap::{Args, Parser}; +use forwarder::EthForwarderApiServer; use reth::cli::Cli; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_node_ethereum::EthereumNode; use tracing::info; #[derive(Args, Debug, Clone)] -struct IngestArgs { +struct HyperliquidExtArgs { /// EVM blocks base directory - #[arg(long, default_value="/tmp/evm-blocks")] + #[arg(long, default_value = "/tmp/evm-blocks")] pub ingest_dir: PathBuf, + + /// Upstream RPC URL to forward incoming transactions. + #[arg(long, default_value = "https://rpc.hyperliquid.xyz/evm")] + pub upstream_rpc_url: String, } fn main() { @@ -30,15 +36,30 @@ fn main() { std::env::set_var("RUST_BACKTRACE", "1"); } - if let Err(err) = Cli::::parse().run(|builder, ingest_args| async move { - info!(target: "reth::cli", "Launching node"); - let handle = builder.launch_node(EthereumNode::default()).await?; + if let Err(err) = Cli::::parse().run( + |builder, ingest_args| async move { + info!(target: "reth::cli", "Launching node"); + let handle = builder + .node(EthereumNode::default()) + .extend_rpc_modules(move |ctx| { + let upstream_rpc_url = ingest_args.upstream_rpc_url.clone(); + ctx.modules.remove_method_from_configured("eth_sendRawTransaction"); + ctx.modules.merge_configured( + forwarder::EthForwarderExt::new(upstream_rpc_url).into_rpc(), + )?; - let ingest_dir = ingest_args.ingest_dir; - let ingest = BlockIngest(ingest_dir); - ingest.run(handle.node).await.unwrap(); - handle.node_exit_future.await - }) { + info!("Transaction forwarder extension enabled"); + Ok(()) + }) + .launch() + .await?; + + let ingest_dir = ingest_args.ingest_dir; + let ingest = BlockIngest(ingest_dir); + ingest.run(handle.node).await.unwrap(); + handle.node_exit_future.await + }, + ) { eprintln!("Error: {err:?}"); std::process::exit(1); }