From 1c5a22a81498f6be58126f22937af8d7cb993a28 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sat, 11 Oct 2025 02:49:19 +0000 Subject: [PATCH] fix: Convert header type for eth_subscribe Due to custom header usage, only `eth_subscribe` method was returning the new header format in raw format, while other part were using RpcConvert to convert headers. Make `eth_subscribe` newHeads to return the `inner` field (original eth header) instead. --- src/addons/hl_node_compliance.rs | 70 +++---------------------- src/addons/mod.rs | 2 + src/addons/subscribe_fixup.rs | 54 +++++++++++++++++++ src/addons/utils.rs | 90 ++++++++++++++++++++++++++++++++ src/main.rs | 17 +++++- 5 files changed, 169 insertions(+), 64 deletions(-) create mode 100644 src/addons/subscribe_fixup.rs create mode 100644 src/addons/utils.rs diff --git a/src/addons/hl_node_compliance.rs b/src/addons/hl_node_compliance.rs index 60deeb6f8..1ed058e52 100644 --- a/src/addons/hl_node_compliance.rs +++ b/src/addons/hl_node_compliance.rs @@ -19,62 +19,23 @@ use alloy_rpc_types::{ TransactionInfo, pubsub::{Params, SubscriptionKind}, }; -use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc}; +use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc}; use jsonrpsee_core::{RpcResult, async_trait}; use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE}; use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner}; use reth_primitives_traits::SignedTransaction; use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider}; -use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError}; +use reth_rpc::{EthFilter, EthPubSub}; use reth_rpc_eth_api::{ - EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, - RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, - helpers::{EthBlocks, EthTransactions, LoadReceipt}, - transaction::ConvertReceiptInput, + EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt, + RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput, }; use reth_rpc_eth_types::EthApiError; -use serde::Serialize; use std::{marker::PhantomData, sync::Arc}; -use tokio_stream::{Stream, StreamExt}; +use tokio_stream::StreamExt; use tracing::{Instrument, trace}; -use crate::{HlBlock, node::primitives::HlPrimitives}; - -pub trait EthWrapper: - EthApiServer< - RpcTxReq, - RpcTransaction, - RpcBlock, - RpcReceipt, - RpcHeader, - > + FullEthApiTypes< - Primitives = HlPrimitives, - NetworkTypes: RpcTypes, - > + RpcNodeCoreExt> - + EthBlocks - + EthTransactions - + LoadReceipt - + 'static -{ -} - -impl EthWrapper for T where - T: EthApiServer< - RpcTxReq, - RpcTransaction, - RpcBlock, - RpcReceipt, - RpcHeader, - > + FullEthApiTypes< - Primitives = HlPrimitives, - NetworkTypes: RpcTypes, - > + RpcNodeCoreExt> - + EthBlocks - + EthTransactions - + LoadReceipt - + 'static -{ -} +use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream}; #[rpc(server, namespace = "eth")] #[async_trait] @@ -387,7 +348,7 @@ where ) .await; } else { - let _ = pubsub.handle_accepted(sink, kind, params).await; + let _ = pipe_from_stream(sink, new_headers_stream::(&provider)).await; } })); Ok(()) @@ -412,23 +373,6 @@ fn adjust_log(mut log: Log, provider: &Eth::Provider) -> Option Some(log) } -async fn pipe_from_stream + Unpin>( - sink: SubscriptionSink, - mut stream: St, -) -> Result<(), ErrorObject<'static>> { - loop { - tokio::select! { - _ = sink.closed() => break Ok(()), - maybe_item = stream.next() => { - let Some(item) = maybe_item else { break Ok(()) }; - let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) - .map_err(SubscriptionSerializeError::from)?; - if sink.send(msg).await.is_err() { break Ok(()); } - } - } - } -} - pub struct HlNodeBlockFilterHttp { eth_api: Arc, _marker: PhantomData, diff --git a/src/addons/mod.rs b/src/addons/mod.rs index 5ea3d2d08..71e488f8d 100644 --- a/src/addons/mod.rs +++ b/src/addons/mod.rs @@ -1,3 +1,5 @@ pub mod call_forwarder; pub mod hl_node_compliance; pub mod tx_forwarder; +pub mod subscribe_fixup; +mod utils; diff --git a/src/addons/subscribe_fixup.rs b/src/addons/subscribe_fixup.rs new file mode 100644 index 000000000..6eebf287d --- /dev/null +++ b/src/addons/subscribe_fixup.rs @@ -0,0 +1,54 @@ +use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream}; +use alloy_rpc_types::pubsub::{Params, SubscriptionKind}; +use async_trait::async_trait; +use jsonrpsee::PendingSubscriptionSink; +use jsonrpsee_types::ErrorObject; +use reth::tasks::TaskSpawner; +use reth_rpc::EthPubSub; +use reth_rpc_convert::RpcTransaction; +use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer}; +use std::sync::Arc; + +pub struct SubscribeFixup { + pubsub: Arc>, + provider: Arc, + subscription_task_spawner: Box, +} + +#[async_trait] +impl EthPubSubApiServer> for SubscribeFixup +where + ErrorObject<'static>: From<::Error>, +{ + async fn subscribe( + &self, + pending: PendingSubscriptionSink, + kind: SubscriptionKind, + params: Option, + ) -> jsonrpsee::core::SubscriptionResult { + let sink = pending.accept().await?; + let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone()); + self.subscription_task_spawner.spawn(Box::pin(async move { + if kind == SubscriptionKind::NewHeads { + let _ = pipe_from_stream(sink, new_headers_stream::(&provider)).await; + } else { + let _ = pubsub.handle_accepted(sink, kind, params).await; + } + })); + Ok(()) + } +} + +impl SubscribeFixup { + pub fn new( + pubsub: Arc>, + provider: Arc, + subscription_task_spawner: Box, + ) -> Self + where + Eth: EthWrapper, + ErrorObject<'static>: From, + { + Self { pubsub, provider, subscription_task_spawner } + } +} diff --git a/src/addons/utils.rs b/src/addons/utils.rs new file mode 100644 index 000000000..f9cbb467e --- /dev/null +++ b/src/addons/utils.rs @@ -0,0 +1,90 @@ +use std::sync::Arc; + +use crate::{HlBlock, HlPrimitives}; +use alloy_primitives::U256; +use alloy_rpc_types::Header; +use futures::StreamExt; +use jsonrpsee::{SubscriptionMessage, SubscriptionSink}; +use jsonrpsee_types::ErrorObject; +use reth_primitives::SealedHeader; +use reth_provider::{BlockReader, CanonStateSubscriptions}; +use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError}; +use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq}; +use reth_rpc_eth_api::{ + EthApiServer, FullEthApiTypes, RpcNodeCoreExt, + helpers::{EthBlocks, EthTransactions, LoadReceipt}, +}; +use serde::Serialize; +use tokio_stream::Stream; + +pub trait EthWrapper: + EthApiServer< + RpcTxReq, + RpcTransaction, + RpcBlock, + RpcReceipt, + RpcHeader, + > + FullEthApiTypes< + Primitives = HlPrimitives, + NetworkTypes: RpcTypes, + > + RpcNodeCoreExt> + + EthBlocks + + EthTransactions + + LoadReceipt + + 'static +{ +} + +impl EthWrapper for T where + T: EthApiServer< + RpcTxReq, + RpcTransaction, + RpcBlock, + RpcReceipt, + RpcHeader, + > + FullEthApiTypes< + Primitives = HlPrimitives, + NetworkTypes: RpcTypes, + > + RpcNodeCoreExt> + + EthBlocks + + EthTransactions + + LoadReceipt + + 'static +{ +} + +pub(super) async fn pipe_from_stream + Unpin>( + sink: SubscriptionSink, + mut stream: St, +) -> Result<(), ErrorObject<'static>> { + loop { + tokio::select! { + _ = sink.closed() => break Ok(()), + maybe_item = stream.next() => { + let Some(item) = maybe_item else { break Ok(()) }; + let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item) + .map_err(SubscriptionSerializeError::from)?; + if sink.send(msg).await.is_err() { break Ok(()); } + } + } + } +} + +pub(super) fn new_headers_stream( + provider: &Arc, +) -> impl Stream> { + provider.canonical_state_stream().flat_map(|new_chain| { + let headers = new_chain + .committed() + .blocks_iter() + .map(|block| { + Header::from_consensus( + SealedHeader::new(block.header().inner.clone(), block.hash()).into(), + None, + Some(U256::from(block.rlp_length())), + ) + }) + .collect::>(); + futures::stream::iter(headers) + }) +} diff --git a/src/main.rs b/src/main.rs index 78125b78a..319d5dde4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,16 @@ use std::sync::Arc; use clap::Parser; -use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext}; +use reth::{ + builder::{NodeBuilder, NodeHandle, WithLaunchContext}, + rpc::{api::EthPubSubApiServer, eth::RpcNodeCore}, +}; use reth_db::DatabaseEnv; use reth_hl::{ addons::{ call_forwarder::{self, CallForwarderApiServer}, hl_node_compliance::install_hl_node_compliance, + subscribe_fixup::SubscribeFixup, tx_forwarder::{self, EthForwarderApiServer}, }, chainspec::{HlChainSpec, parser::HlChainSpecParser}, @@ -69,6 +73,17 @@ fn main() -> eyre::Result<()> { info!("eth_getProof is disabled by default"); } + // This is a temporary workaround to fix the issue with custom headers + // affects `eth_subscribe[type=newHeads]` + ctx.modules.replace_configured( + SubscribeFixup::new( + Arc::new(ctx.registry.eth_handlers().pubsub.clone()), + Arc::new(ctx.registry.eth_api().provider().clone()), + Box::new(ctx.node().task_executor.clone()), + ) + .into_rpc(), + )?; + ctx.modules.merge_configured( HlBlockPrecompileExt::new(ctx.registry.eth_api().clone()).into_rpc(), )?;