Merge pull request #3 from sprites0/feat/call-forwarder-and-complete-tx-forwarder

feat: Call forwarder, more complete tx forwarder
This commit is contained in:
sprites0
2025-07-04 23:26:37 -04:00
committed by GitHub
11 changed files with 225 additions and 26 deletions

1
Cargo.lock generated
View File

@ -9280,6 +9280,7 @@ dependencies = [
"alloy-evm", "alloy-evm",
"alloy-genesis", "alloy-genesis",
"alloy-json-abi", "alloy-json-abi",
"alloy-json-rpc",
"alloy-network", "alloy-network",
"alloy-primitives", "alloy-primitives",
"alloy-rlp", "alloy-rlp",

View File

@ -57,6 +57,7 @@ alloy-chains = "0.2.0"
alloy-eips = "1.0.13" alloy-eips = "1.0.13"
alloy-evm = "0.12" alloy-evm = "0.12"
alloy-json-abi = { version = "1.0.0", default-features = false } alloy-json-abi = { version = "1.0.0", default-features = false }
alloy-json-rpc = { version = "1.0.13", default-features = false }
alloy-dyn-abi = "1.2.0" alloy-dyn-abi = "1.2.0"
alloy-network = "1.0.13" alloy-network = "1.0.13"
alloy-primitives = { version = "1.2.0", default-features = false, features = ["map-foldhash"] } alloy-primitives = { version = "1.2.0", default-features = false, features = ["map-foldhash"] }

97
src/call_forwarder.rs Normal file
View File

@ -0,0 +1,97 @@
use alloy_eips::BlockId;
use alloy_primitives::{Bytes, U256};
use alloy_rpc_types_eth::{state::StateOverride, transaction::TransactionRequest, BlockOverrides};
use jsonrpsee::{
http_client::{HttpClient, HttpClientBuilder},
proc_macros::rpc,
rpc_params,
types::{error::INTERNAL_ERROR_CODE, ErrorObject},
};
use jsonrpsee_core::{async_trait, client::ClientT, ClientError, RpcResult};
#[rpc(server, namespace = "eth")]
pub(crate) trait CallForwarderApi {
/// Executes a new message call immediately without creating a transaction on the block chain.
#[method(name = "call")]
async fn call(
&self,
request: TransactionRequest,
block_number: Option<BlockId>,
state_overrides: Option<StateOverride>,
block_overrides: Option<Box<BlockOverrides>>,
) -> RpcResult<Bytes>;
/// Generates and returns an estimate of how much gas is necessary to allow the transaction to
/// complete.
#[method(name = "estimateGas")]
async fn estimate_gas(
&self,
request: TransactionRequest,
block_number: Option<BlockId>,
state_override: Option<StateOverride>,
) -> RpcResult<U256>;
}
pub struct CallForwarderExt {
client: HttpClient,
}
impl CallForwarderExt {
pub fn new(upstream_rpc_url: String) -> Self {
let client =
HttpClientBuilder::default().build(upstream_rpc_url).expect("Failed to build client");
Self { client }
}
}
#[async_trait]
impl CallForwarderApiServer for CallForwarderExt {
async fn call(
&self,
request: TransactionRequest,
block_number: Option<BlockId>,
state_overrides: Option<StateOverride>,
block_overrides: Option<Box<BlockOverrides>>,
) -> RpcResult<Bytes> {
let result = self
.client
.clone()
.request(
"eth_call",
rpc_params![request, block_number, state_overrides, block_overrides],
)
.await
.map_err(|e| match e {
ClientError::Call(e) => e,
_ => ErrorObject::owned(
INTERNAL_ERROR_CODE,
format!("Failed to call: {e:?}"),
Some(()),
),
})?;
Ok(result)
}
async fn estimate_gas(
&self,
request: TransactionRequest,
block_number: Option<BlockId>,
state_override: Option<StateOverride>,
) -> RpcResult<U256> {
let result = self
.client
.clone()
.request("eth_estimateGas", rpc_params![request, block_number, state_override])
.await
.map_err(|e| match e {
ClientError::Call(e) => e,
_ => ErrorObject::owned(
INTERNAL_ERROR_CODE,
format!("Failed to estimate gas: {e:?}"),
Some(()),
),
})?;
Ok(result)
}
}

View File

@ -137,3 +137,16 @@ impl HlHardforks for Arc<HlChainSpec> {
self.as_ref().hl_fork_activation(fork) self.as_ref().hl_fork_activation(fork)
} }
} }
impl HlChainSpec {
pub const MAINNET_RPC_URL: &str = "https://rpc.hyperliquid.xyz/evm";
pub const TESTNET_RPC_URL: &str = "https://rpc.hyperliquid-testnet.xyz/evm";
pub fn official_rpc_url(&self) -> &'static str {
match self.inner.chain().id() {
999 => Self::MAINNET_RPC_URL,
998 => Self::TESTNET_RPC_URL,
_ => unreachable!("Unreachable since ChainSpecParser won't return other chains"),
}
}
}

View File

@ -6,5 +6,6 @@ pub mod hl_node_compliance;
pub mod node; pub mod node;
pub mod pseudo_peer; pub mod pseudo_peer;
pub mod tx_forwarder; pub mod tx_forwarder;
pub mod call_forwarder;
pub use node::primitives::{HlBlock, HlBlockBody, HlPrimitives}; pub use node::primitives::{HlBlock, HlBlockBody, HlPrimitives};

View File

@ -1,6 +1,7 @@
use clap::Parser; use clap::Parser;
use reth::builder::NodeHandle; use reth::builder::NodeHandle;
use reth_hl::{ use reth_hl::{
call_forwarder::{self, CallForwarderApiServer},
chainspec::parser::HlChainSpecParser, chainspec::parser::HlChainSpecParser,
hl_node_compliance::install_hl_node_compliance, hl_node_compliance::install_hl_node_compliance,
node::{ node::{
@ -26,23 +27,32 @@ fn main() -> eyre::Result<()> {
} }
Cli::<HlChainSpecParser, HlNodeArgs>::parse().run(|builder, ext| async move { Cli::<HlChainSpecParser, HlNodeArgs>::parse().run(|builder, ext| async move {
let default_upstream_rpc_url = builder.config().chain.official_rpc_url();
builder.builder.database.create_tables_for::<Tables>()?; builder.builder.database.create_tables_for::<Tables>()?;
let (node, engine_handle_tx) = let (node, engine_handle_tx) =
HlNode::new(ext.block_source_args.parse().await?, ext.hl_node_compliant); HlNode::new(ext.block_source_args.parse().await?, ext.hl_node_compliant);
let NodeHandle { node, node_exit_future: exit_future } = builder let NodeHandle { node, node_exit_future: exit_future } = builder
.node(node) .node(node)
.extend_rpc_modules(move |ctx| { .extend_rpc_modules(move |ctx| {
let upstream_rpc_url = ext.upstream_rpc_url; let upstream_rpc_url =
if let Some(upstream_rpc_url) = upstream_rpc_url { ext.upstream_rpc_url.unwrap_or_else(|| default_upstream_rpc_url.to_owned());
ctx.modules.replace_configured(
tx_forwarder::EthForwarderExt::new(upstream_rpc_url.clone()).into_rpc(),
)?;
info!("Transaction forwarding enabled"); ctx.modules.replace_configured(
tx_forwarder::EthForwarderExt::new(upstream_rpc_url.clone()).into_rpc(),
)?;
info!("Transaction will be forwarded to {}", upstream_rpc_url);
if ext.forward_call {
ctx.modules.replace_configured(
call_forwarder::CallForwarderExt::new(upstream_rpc_url.clone()).into_rpc(),
)?;
info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
} }
if ext.hl_node_compliant { if ext.hl_node_compliant {
install_hl_node_compliance(ctx)?; install_hl_node_compliance(ctx)?;
info!("hl-node compliant mode enabled");
} }
Ok(()) Ok(())

View File

@ -33,6 +33,8 @@ pub struct HlNodeArgs {
pub block_source_args: BlockSourceArgs, pub block_source_args: BlockSourceArgs,
/// Upstream RPC URL to forward incoming transactions. /// Upstream RPC URL to forward incoming transactions.
///
/// Default to Hyperliquid's RPC URL when not provided (https://rpc.hyperliquid.xyz/evm).
#[arg(long, env = "UPSTREAM_RPC_URL")] #[arg(long, env = "UPSTREAM_RPC_URL")]
pub upstream_rpc_url: Option<String>, pub upstream_rpc_url: Option<String>,
@ -44,6 +46,12 @@ pub struct HlNodeArgs {
/// 3. filters out logs and transactions from subscription. /// 3. filters out logs and transactions from subscription.
#[arg(long, env = "HL_NODE_COMPLIANT")] #[arg(long, env = "HL_NODE_COMPLIANT")]
pub hl_node_compliant: bool, pub hl_node_compliant: bool,
/// Forward eth_call and eth_estimateGas to the upstream RPC.
///
/// This is useful when read precompile is needed for gas estimation.
#[arg(long, env = "FORWARD_CALL")]
pub forward_call: bool,
} }
/// The main reth_hl cli interface. /// The main reth_hl cli interface.

View File

@ -226,11 +226,18 @@ where
let network = NetworkManager::builder(network_config).await?; let network = NetworkManager::builder(network_config).await?;
let handle = ctx.start_network(network, pool); let handle = ctx.start_network(network, pool);
let local_node_record = handle.local_node_record(); let local_node_record = handle.local_node_record();
let chain_spec = ctx.chain_spec();
info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
ctx.task_executor().spawn_critical("pseudo peer", async move { ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source = block_source_config.create_cached_block_source().await; let block_source = block_source_config.create_cached_block_source().await;
start_pseudo_peer(local_node_record.to_string(), block_source).await.unwrap(); start_pseudo_peer(
chain_spec,
local_node_record.to_string(),
block_source,
)
.await
.unwrap();
}); });
Ok(handle) Ok(handle)

View File

@ -12,6 +12,8 @@ pub mod service;
pub mod sources; pub mod sources;
pub mod utils; pub mod utils;
use std::sync::Arc;
pub use cli::*; pub use cli::*;
pub use config::*; pub use config::*;
pub use error::*; pub use error::*;
@ -35,10 +37,12 @@ pub mod prelude {
}; };
} }
use crate::chainspec::HlChainSpec;
use reth_network::{NetworkEvent, NetworkEventListenerProvider}; use reth_network::{NetworkEvent, NetworkEventListenerProvider};
/// Main function that starts the network manager and processes eth requests /// Main function that starts the network manager and processes eth requests
pub async fn start_pseudo_peer( pub async fn start_pseudo_peer(
chain_spec: Arc<HlChainSpec>,
destination_peer: String, destination_peer: String,
block_source: BlockSourceBoxed, block_source: BlockSourceBoxed,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
@ -63,7 +67,7 @@ pub async fn start_pseudo_peer(
let mut network_events = network_handle.event_listener(); let mut network_events = network_handle.event_listener();
info!("Starting network manager..."); info!("Starting network manager...");
let mut service = PseudoPeer::new(block_source, blockhash_cache.clone()); let mut service = PseudoPeer::new(chain_spec, block_source, blockhash_cache.clone());
tokio::spawn(network); tokio::spawn(network);
let mut first = true; let mut first = true;

View File

@ -1,7 +1,10 @@
use super::{sources::BlockSource, utils::LruBiMap}; use super::{sources::BlockSource, utils::LruBiMap};
use crate::node::{ use crate::{
network::{HlNetworkPrimitives, HlNewBlock}, chainspec::HlChainSpec,
types::BlockAndReceipts, node::{
network::{HlNetworkPrimitives, HlNewBlock},
types::BlockAndReceipts,
},
}; };
use alloy_eips::HashOrNumber; use alloy_eips::HashOrNumber;
use alloy_primitives::{B256, U128}; use alloy_primitives::{B256, U128};
@ -116,6 +119,7 @@ impl BlockImport<HlNewBlock> for BlockPoller {
/// A pseudo peer that can process eth requests and feed blocks to reth /// A pseudo peer that can process eth requests and feed blocks to reth
pub struct PseudoPeer<BS: BlockSource> { pub struct PseudoPeer<BS: BlockSource> {
chain_spec: Arc<HlChainSpec>,
block_source: BS, block_source: BS,
blockhash_cache: BlockHashCache, blockhash_cache: BlockHashCache,
warm_cache_size: u64, warm_cache_size: u64,
@ -127,8 +131,13 @@ pub struct PseudoPeer<BS: BlockSource> {
} }
impl<BS: BlockSource> PseudoPeer<BS> { impl<BS: BlockSource> PseudoPeer<BS> {
pub fn new(block_source: BS, blockhash_cache: BlockHashCache) -> Self { pub fn new(
chain_spec: Arc<HlChainSpec>,
block_source: BS,
blockhash_cache: BlockHashCache,
) -> Self {
Self { Self {
chain_spec,
block_source, block_source,
blockhash_cache, blockhash_cache,
warm_cache_size: 1000, // reth default chunk size for GetBlockBodies warm_cache_size: 1000, // reth default chunk size for GetBlockBodies
@ -244,7 +253,8 @@ impl<BS: BlockSource> PseudoPeer<BS> {
use jsonrpsee_core::client::ClientT; use jsonrpsee_core::client::ClientT;
debug!("Fallback to official RPC: {hash:?}"); debug!("Fallback to official RPC: {hash:?}");
let client = HttpClientBuilder::default().build("https://rpc.hyperliquid.xyz/evm").unwrap(); let client =
HttpClientBuilder::default().build(self.chain_spec.official_rpc_url()).unwrap();
let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?; let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?;
debug!("From official RPC: {:?} for {hash:?}", target_block.header.number); debug!("From official RPC: {:?} for {hash:?}", target_block.header.number);

View File

@ -1,15 +1,28 @@
use std::time::Duration;
use alloy_json_rpc::RpcObject;
use alloy_network::Ethereum;
use alloy_primitives::{Bytes, B256}; use alloy_primitives::{Bytes, B256};
use alloy_rpc_types::TransactionRequest;
use jsonrpsee::{ use jsonrpsee::{
http_client::{HttpClient, HttpClientBuilder}, http_client::{HttpClient, HttpClientBuilder},
proc_macros::rpc, proc_macros::rpc,
types::{error::INTERNAL_ERROR_CODE, ErrorObject}, types::{error::INTERNAL_ERROR_CODE, ErrorObject},
}; };
use jsonrpsee_core::{async_trait, client::ClientT, ClientError, RpcResult}; use jsonrpsee_core::{async_trait, client::ClientT, ClientError, RpcResult};
use reth::rpc::{result::internal_rpc_err, server_types::eth::EthApiError};
use reth_rpc_eth_api::RpcReceipt;
#[rpc(server, namespace = "eth")] #[rpc(server, namespace = "eth")]
pub trait EthForwarderApi { pub trait EthForwarderApi<R: RpcObject> {
#[method(name = "sendRawTransaction")] #[method(name = "sendRawTransaction")]
async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>; async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
#[method(name = "eth_sendTransaction")]
async fn send_transaction(&self, _tx: TransactionRequest) -> RpcResult<B256>;
#[method(name = "eth_sendRawTransactionSync")]
async fn send_raw_transaction_sync(&self, tx: Bytes) -> RpcResult<R>;
} }
pub struct EthForwarderExt { pub struct EthForwarderExt {
@ -23,22 +36,56 @@ impl EthForwarderExt {
Self { client } Self { client }
} }
fn from_client_error(e: ClientError, internal_error_prefix: &str) -> ErrorObject {
match e {
ClientError::Call(e) => e,
_ => ErrorObject::owned(
INTERNAL_ERROR_CODE,
format!("{internal_error_prefix}: {e:?}"),
Some(()),
),
}
}
} }
#[async_trait] #[async_trait]
impl EthForwarderApiServer for EthForwarderExt { impl EthForwarderApiServer<RpcReceipt<Ethereum>> for EthForwarderExt {
async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256> { async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256> {
let txhash = let txhash = self
self.client.clone().request("eth_sendRawTransaction", vec![tx]).await.map_err(|e| { .client
match e { .clone()
ClientError::Call(e) => e, .request("eth_sendRawTransaction", vec![tx])
_ => ErrorObject::owned( .await
INTERNAL_ERROR_CODE, .map_err(|e| Self::from_client_error(e, "Failed to send transaction"))?;
format!("Failed to send transaction: {e:?}"),
Some(()),
),
}
})?;
Ok(txhash) Ok(txhash)
} }
async fn send_transaction(&self, _tx: TransactionRequest) -> RpcResult<B256> {
Err(internal_rpc_err("Unimplemented"))
}
async fn send_raw_transaction_sync(&self, tx: Bytes) -> RpcResult<RpcReceipt<Ethereum>> {
let hash = self.send_raw_transaction(tx).await?;
const TIMEOUT_DURATION: Duration = Duration::from_secs(30);
const INTERVAL: Duration = Duration::from_secs(1);
tokio::time::timeout(TIMEOUT_DURATION, async {
loop {
let receipt =
self.client.request("eth_getTransactionReceipt", vec![hash]).await.map_err(
|e| Self::from_client_error(e, "Failed to get transaction receipt"),
)?;
if let Some(receipt) = receipt {
return Ok(receipt);
}
tokio::time::sleep(INTERVAL).await;
}
})
.await
.unwrap_or_else(|_elapsed| {
Err(EthApiError::TransactionConfirmationTimeout { hash, duration: TIMEOUT_DURATION }
.into())
})
}
} }