mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Compare commits
8 Commits
852e186b1a
...
nb-2025101
| Author | SHA1 | Date | |
|---|---|---|---|
| eee6eeb2fc | |||
| 611e6867bf | |||
| 6c3ed63c3c | |||
| 51924e9671 | |||
| 8f15aa311f | |||
| bc66716a41 | |||
| fc819dbba2 | |||
| 1c5a22a814 |
@ -19,62 +19,23 @@ use alloy_rpc_types::{
|
|||||||
TransactionInfo,
|
TransactionInfo,
|
||||||
pubsub::{Params, SubscriptionKind},
|
pubsub::{Params, SubscriptionKind},
|
||||||
};
|
};
|
||||||
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
|
use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
|
||||||
use jsonrpsee_core::{RpcResult, async_trait};
|
use jsonrpsee_core::{RpcResult, async_trait};
|
||||||
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
|
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
|
||||||
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
|
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
|
||||||
use reth_primitives_traits::SignedTransaction;
|
use reth_primitives_traits::SignedTransaction;
|
||||||
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
|
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
|
||||||
use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
|
use reth_rpc::{EthFilter, EthPubSub};
|
||||||
use reth_rpc_eth_api::{
|
use reth_rpc_eth_api::{
|
||||||
EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
|
EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
|
||||||
RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
|
RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
|
||||||
helpers::{EthBlocks, EthTransactions, LoadReceipt},
|
|
||||||
transaction::ConvertReceiptInput,
|
|
||||||
};
|
};
|
||||||
use reth_rpc_eth_types::EthApiError;
|
use reth_rpc_eth_types::EthApiError;
|
||||||
use serde::Serialize;
|
|
||||||
use std::{marker::PhantomData, sync::Arc};
|
use std::{marker::PhantomData, sync::Arc};
|
||||||
use tokio_stream::{Stream, StreamExt};
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{Instrument, trace};
|
use tracing::{Instrument, trace};
|
||||||
|
|
||||||
use crate::{HlBlock, node::primitives::HlPrimitives};
|
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
|
||||||
|
|
||||||
pub trait EthWrapper:
|
|
||||||
EthApiServer<
|
|
||||||
RpcTxReq<Self::NetworkTypes>,
|
|
||||||
RpcTransaction<Self::NetworkTypes>,
|
|
||||||
RpcBlock<Self::NetworkTypes>,
|
|
||||||
RpcReceipt<Self::NetworkTypes>,
|
|
||||||
RpcHeader<Self::NetworkTypes>,
|
|
||||||
> + FullEthApiTypes<
|
|
||||||
Primitives = HlPrimitives,
|
|
||||||
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
|
||||||
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
|
||||||
+ EthBlocks
|
|
||||||
+ EthTransactions
|
|
||||||
+ LoadReceipt
|
|
||||||
+ 'static
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T> EthWrapper for T where
|
|
||||||
T: EthApiServer<
|
|
||||||
RpcTxReq<Self::NetworkTypes>,
|
|
||||||
RpcTransaction<Self::NetworkTypes>,
|
|
||||||
RpcBlock<Self::NetworkTypes>,
|
|
||||||
RpcReceipt<Self::NetworkTypes>,
|
|
||||||
RpcHeader<Self::NetworkTypes>,
|
|
||||||
> + FullEthApiTypes<
|
|
||||||
Primitives = HlPrimitives,
|
|
||||||
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
|
||||||
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
|
||||||
+ EthBlocks
|
|
||||||
+ EthTransactions
|
|
||||||
+ LoadReceipt
|
|
||||||
+ 'static
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
#[rpc(server, namespace = "eth")]
|
#[rpc(server, namespace = "eth")]
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@ -386,6 +347,8 @@ where
|
|||||||
pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
|
pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
} else if kind == SubscriptionKind::NewHeads {
|
||||||
|
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
|
||||||
} else {
|
} else {
|
||||||
let _ = pubsub.handle_accepted(sink, kind, params).await;
|
let _ = pubsub.handle_accepted(sink, kind, params).await;
|
||||||
}
|
}
|
||||||
@ -412,23 +375,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
|
|||||||
Some(log)
|
Some(log)
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
|
|
||||||
sink: SubscriptionSink,
|
|
||||||
mut stream: St,
|
|
||||||
) -> Result<(), ErrorObject<'static>> {
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = sink.closed() => break Ok(()),
|
|
||||||
maybe_item = stream.next() => {
|
|
||||||
let Some(item) = maybe_item else { break Ok(()) };
|
|
||||||
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
|
|
||||||
.map_err(SubscriptionSerializeError::from)?;
|
|
||||||
if sink.send(msg).await.is_err() { break Ok(()); }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
|
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
|
||||||
eth_api: Arc<Eth>,
|
eth_api: Arc<Eth>,
|
||||||
_marker: PhantomData<Eth>,
|
_marker: PhantomData<Eth>,
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
pub mod call_forwarder;
|
pub mod call_forwarder;
|
||||||
pub mod hl_node_compliance;
|
pub mod hl_node_compliance;
|
||||||
pub mod tx_forwarder;
|
pub mod tx_forwarder;
|
||||||
|
pub mod subscribe_fixup;
|
||||||
|
mod utils;
|
||||||
|
|||||||
54
src/addons/subscribe_fixup.rs
Normal file
54
src/addons/subscribe_fixup.rs
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
|
||||||
|
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use jsonrpsee::PendingSubscriptionSink;
|
||||||
|
use jsonrpsee_types::ErrorObject;
|
||||||
|
use reth::tasks::TaskSpawner;
|
||||||
|
use reth_rpc::EthPubSub;
|
||||||
|
use reth_rpc_convert::RpcTransaction;
|
||||||
|
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
pub struct SubscribeFixup<Eth: EthWrapper> {
|
||||||
|
pubsub: Arc<EthPubSub<Eth>>,
|
||||||
|
provider: Arc<Eth::Provider>,
|
||||||
|
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
|
||||||
|
where
|
||||||
|
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
|
||||||
|
{
|
||||||
|
async fn subscribe(
|
||||||
|
&self,
|
||||||
|
pending: PendingSubscriptionSink,
|
||||||
|
kind: SubscriptionKind,
|
||||||
|
params: Option<Params>,
|
||||||
|
) -> jsonrpsee::core::SubscriptionResult {
|
||||||
|
let sink = pending.accept().await?;
|
||||||
|
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
|
||||||
|
self.subscription_task_spawner.spawn(Box::pin(async move {
|
||||||
|
if kind == SubscriptionKind::NewHeads {
|
||||||
|
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
|
||||||
|
} else {
|
||||||
|
let _ = pubsub.handle_accepted(sink, kind, params).await;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
|
||||||
|
pub fn new(
|
||||||
|
pubsub: Arc<EthPubSub<Eth>>,
|
||||||
|
provider: Arc<Eth::Provider>,
|
||||||
|
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
|
||||||
|
) -> Self
|
||||||
|
where
|
||||||
|
Eth: EthWrapper,
|
||||||
|
ErrorObject<'static>: From<Eth::Error>,
|
||||||
|
{
|
||||||
|
Self { pubsub, provider, subscription_task_spawner }
|
||||||
|
}
|
||||||
|
}
|
||||||
90
src/addons/utils.rs
Normal file
90
src/addons/utils.rs
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use crate::{HlBlock, HlPrimitives};
|
||||||
|
use alloy_primitives::U256;
|
||||||
|
use alloy_rpc_types::Header;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
|
||||||
|
use jsonrpsee_types::ErrorObject;
|
||||||
|
use reth_primitives::SealedHeader;
|
||||||
|
use reth_provider::{BlockReader, CanonStateSubscriptions};
|
||||||
|
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
|
||||||
|
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
|
||||||
|
use reth_rpc_eth_api::{
|
||||||
|
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
|
||||||
|
helpers::{EthBlocks, EthTransactions, LoadReceipt},
|
||||||
|
};
|
||||||
|
use serde::Serialize;
|
||||||
|
use tokio_stream::Stream;
|
||||||
|
|
||||||
|
pub trait EthWrapper:
|
||||||
|
EthApiServer<
|
||||||
|
RpcTxReq<Self::NetworkTypes>,
|
||||||
|
RpcTransaction<Self::NetworkTypes>,
|
||||||
|
RpcBlock<Self::NetworkTypes>,
|
||||||
|
RpcReceipt<Self::NetworkTypes>,
|
||||||
|
RpcHeader<Self::NetworkTypes>,
|
||||||
|
> + FullEthApiTypes<
|
||||||
|
Primitives = HlPrimitives,
|
||||||
|
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
||||||
|
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
||||||
|
+ EthBlocks
|
||||||
|
+ EthTransactions
|
||||||
|
+ LoadReceipt
|
||||||
|
+ 'static
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> EthWrapper for T where
|
||||||
|
T: EthApiServer<
|
||||||
|
RpcTxReq<Self::NetworkTypes>,
|
||||||
|
RpcTransaction<Self::NetworkTypes>,
|
||||||
|
RpcBlock<Self::NetworkTypes>,
|
||||||
|
RpcReceipt<Self::NetworkTypes>,
|
||||||
|
RpcHeader<Self::NetworkTypes>,
|
||||||
|
> + FullEthApiTypes<
|
||||||
|
Primitives = HlPrimitives,
|
||||||
|
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
||||||
|
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
||||||
|
+ EthBlocks
|
||||||
|
+ EthTransactions
|
||||||
|
+ LoadReceipt
|
||||||
|
+ 'static
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
|
||||||
|
sink: SubscriptionSink,
|
||||||
|
mut stream: St,
|
||||||
|
) -> Result<(), ErrorObject<'static>> {
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = sink.closed() => break Ok(()),
|
||||||
|
maybe_item = stream.next() => {
|
||||||
|
let Some(item) = maybe_item else { break Ok(()) };
|
||||||
|
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
|
||||||
|
.map_err(SubscriptionSerializeError::from)?;
|
||||||
|
if sink.send(msg).await.is_err() { break Ok(()); }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn new_headers_stream<Eth: EthWrapper>(
|
||||||
|
provider: &Arc<Eth::Provider>,
|
||||||
|
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
|
||||||
|
provider.canonical_state_stream().flat_map(|new_chain| {
|
||||||
|
let headers = new_chain
|
||||||
|
.committed()
|
||||||
|
.blocks_iter()
|
||||||
|
.map(|block| {
|
||||||
|
Header::from_consensus(
|
||||||
|
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
|
||||||
|
None,
|
||||||
|
Some(U256::from(block.rlp_length())),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
futures::stream::iter(headers)
|
||||||
|
})
|
||||||
|
}
|
||||||
17
src/main.rs
17
src/main.rs
@ -1,12 +1,16 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
|
use reth::{
|
||||||
|
builder::{NodeBuilder, NodeHandle, WithLaunchContext},
|
||||||
|
rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
|
||||||
|
};
|
||||||
use reth_db::DatabaseEnv;
|
use reth_db::DatabaseEnv;
|
||||||
use reth_hl::{
|
use reth_hl::{
|
||||||
addons::{
|
addons::{
|
||||||
call_forwarder::{self, CallForwarderApiServer},
|
call_forwarder::{self, CallForwarderApiServer},
|
||||||
hl_node_compliance::install_hl_node_compliance,
|
hl_node_compliance::install_hl_node_compliance,
|
||||||
|
subscribe_fixup::SubscribeFixup,
|
||||||
tx_forwarder::{self, EthForwarderApiServer},
|
tx_forwarder::{self, EthForwarderApiServer},
|
||||||
},
|
},
|
||||||
chainspec::{HlChainSpec, parser::HlChainSpecParser},
|
chainspec::{HlChainSpec, parser::HlChainSpecParser},
|
||||||
@ -59,6 +63,17 @@ fn main() -> eyre::Result<()> {
|
|||||||
info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
|
info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This is a temporary workaround to fix the issue with custom headers
|
||||||
|
// affects `eth_subscribe[type=newHeads]`
|
||||||
|
ctx.modules.replace_configured(
|
||||||
|
SubscribeFixup::new(
|
||||||
|
Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
|
||||||
|
Arc::new(ctx.registry.eth_api().provider().clone()),
|
||||||
|
Box::new(ctx.node().task_executor.clone()),
|
||||||
|
)
|
||||||
|
.into_rpc(),
|
||||||
|
)?;
|
||||||
|
|
||||||
if ext.hl_node_compliant {
|
if ext.hl_node_compliant {
|
||||||
install_hl_node_compliance(&mut ctx)?;
|
install_hl_node_compliance(&mut ctx)?;
|
||||||
info!("hl-node compliant mode enabled");
|
info!("hl-node compliant mode enabled");
|
||||||
|
|||||||
@ -81,13 +81,13 @@ impl BlockPoller {
|
|||||||
.await
|
.await
|
||||||
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
|
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
|
||||||
|
|
||||||
if let Some(debug_cutoff_height) = debug_cutoff_height &&
|
|
||||||
next_block_number > debug_cutoff_height
|
|
||||||
{
|
|
||||||
next_block_number = debug_cutoff_height;
|
|
||||||
}
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if let Some(debug_cutoff_height) = debug_cutoff_height
|
||||||
|
&& next_block_number > debug_cutoff_height
|
||||||
|
{
|
||||||
|
next_block_number = debug_cutoff_height;
|
||||||
|
}
|
||||||
|
|
||||||
match block_source.collect_block(next_block_number).await {
|
match block_source.collect_block(next_block_number).await {
|
||||||
Ok(block) => {
|
Ok(block) => {
|
||||||
block_tx.send((next_block_number, block)).await?;
|
block_tx.send((next_block_number, block)).await?;
|
||||||
|
|||||||
49
tests/run_tests.sh
Normal file
49
tests/run_tests.sh
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
|
||||||
|
|
||||||
|
success() {
|
||||||
|
echo "Success: $1"
|
||||||
|
}
|
||||||
|
|
||||||
|
fail() {
|
||||||
|
echo "Failed: $1"
|
||||||
|
exit 1
|
||||||
|
}
|
||||||
|
|
||||||
|
ensure_cmd() {
|
||||||
|
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
|
||||||
|
}
|
||||||
|
|
||||||
|
ensure_cmd jq
|
||||||
|
ensure_cmd cast
|
||||||
|
ensure_cmd wscat
|
||||||
|
|
||||||
|
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
|
||||||
|
fail "ETH_RPC_URL must be a websocket url"
|
||||||
|
fi
|
||||||
|
|
||||||
|
TITLE="Issue #78 - eth_getLogs should return system transactions"
|
||||||
|
cast logs \
|
||||||
|
--rpc-url "$ETH_RPC_URL" \
|
||||||
|
--from-block 15312567 \
|
||||||
|
--to-block 15312570 \
|
||||||
|
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
|
||||||
|
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
|
||||||
|
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
|
||||||
|
&& success "$TITLE" || fail "$TITLE"
|
||||||
|
|
||||||
|
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
|
||||||
|
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
|
||||||
|
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
|
||||||
|
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
|
||||||
|
echo node "$A"
|
||||||
|
echo rpc\ "$B"
|
||||||
|
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
|
||||||
|
|
||||||
|
TITLE="eth_subscribe newHeads via wscat"
|
||||||
|
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
|
||||||
|
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
|
||||||
|
&& success "$TITLE" || fail "$TITLE"
|
||||||
Reference in New Issue
Block a user