9 Commits

Author SHA1 Message Date
bc66716a41 Merge pull request #89 from hl-archive-node/cleanup
fix: Convert header type for eth_subscribe
2025-10-10 23:09:33 -04:00
fc819dbba2 test: Add regression tests 2025-10-11 02:52:09 +00:00
1c5a22a814 fix: Convert header type for eth_subscribe
Due to custom header usage, only `eth_subscribe` method was returning the new header format in raw format, while other part were using RpcConvert to convert headers.

Make `eth_subscribe` newHeads to return the `inner` field (original eth header) instead.
2025-10-11 02:49:19 +00:00
852e186b1a Merge pull request #88 from hl-archive-node/hotfix
hotfix: Mark migrator experimantal
2025-10-09 04:55:53 -04:00
f83326059f chore: clippy 2025-10-09 08:55:40 +00:00
ca8c374116 feat: Mark migrator as experimental 2025-10-09 08:49:29 +00:00
5ba12a4850 perf: adjust chunk size, do not hold tx too long 2025-10-09 08:20:22 +00:00
8a179a6d9e perf: Use smaller chunks 2025-10-09 08:13:53 +00:00
d570cf3e8d fix: Create directory before migration 2025-10-09 08:13:45 +00:00
7 changed files with 254 additions and 76 deletions

View File

@ -19,62 +19,23 @@ use alloy_rpc_types::{
TransactionInfo,
pubsub::{Params, SubscriptionKind},
};
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
use jsonrpsee_core::{RpcResult, async_trait};
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
use reth_primitives_traits::SignedTransaction;
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::{
EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
transaction::ConvertReceiptInput,
EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
};
use reth_rpc_eth_types::EthApiError;
use serde::Serialize;
use std::{marker::PhantomData, sync::Arc};
use tokio_stream::{Stream, StreamExt};
use tokio_stream::StreamExt;
use tracing::{Instrument, trace};
use crate::{HlBlock, node::primitives::HlPrimitives};
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
#[rpc(server, namespace = "eth")]
#[async_trait]
@ -387,7 +348,7 @@ where
)
.await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
}
}));
Ok(())
@ -412,23 +373,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
Some(log)
}
async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
eth_api: Arc<Eth>,
_marker: PhantomData<Eth>,

View File

@ -1,3 +1,5 @@
pub mod call_forwarder;
pub mod hl_node_compliance;
pub mod tx_forwarder;
pub mod subscribe_fixup;
mod utils;

View File

@ -0,0 +1,54 @@
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
use async_trait::async_trait;
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee_types::ErrorObject;
use reth::tasks::TaskSpawner;
use reth_rpc::EthPubSub;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
use std::sync::Arc;
pub struct SubscribeFixup<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
where
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
}));
Ok(())
}
}
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self
where
Eth: EthWrapper,
ErrorObject<'static>: From<Eth::Error>,
{
Self { pubsub, provider, subscription_task_spawner }
}
}

90
src/addons/utils.rs Normal file
View File

@ -0,0 +1,90 @@
use std::sync::Arc;
use crate::{HlBlock, HlPrimitives};
use alloy_primitives::U256;
use alloy_rpc_types::Header;
use futures::StreamExt;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, CanonStateSubscriptions};
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
use reth_rpc_eth_api::{
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
};
use serde::Serialize;
use tokio_stream::Stream;
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub(super) fn new_headers_stream<Eth: EthWrapper>(
provider: &Arc<Eth::Provider>,
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
provider.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.blocks_iter()
.map(|block| {
Header::from_consensus(
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
None,
Some(U256::from(block.rlp_length())),
)
})
.collect::<Vec<_>>();
futures::stream::iter(headers)
})
}

View File

@ -1,12 +1,16 @@
use std::sync::Arc;
use clap::Parser;
use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
use reth::{
builder::{NodeBuilder, NodeHandle, WithLaunchContext},
rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
};
use reth_db::DatabaseEnv;
use reth_hl::{
addons::{
call_forwarder::{self, CallForwarderApiServer},
hl_node_compliance::install_hl_node_compliance,
subscribe_fixup::SubscribeFixup,
tx_forwarder::{self, EthForwarderApiServer},
},
chainspec::{HlChainSpec, parser::HlChainSpecParser},
@ -69,6 +73,17 @@ fn main() -> eyre::Result<()> {
info!("eth_getProof is disabled by default");
}
// This is a temporary workaround to fix the issue with custom headers
// affects `eth_subscribe[type=newHeads]`
ctx.modules.replace_configured(
SubscribeFixup::new(
Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
Arc::new(ctx.registry.eth_api().provider().clone()),
Box::new(ctx.node().task_executor.clone()),
)
.into_rpc(),
)?;
ctx.modules.merge_configured(
HlBlockPrecompileExt::new(ctx.registry.eth_api().clone()).into_rpc(),
)?;

View File

@ -1,5 +1,5 @@
use alloy_consensus::Header;
use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256};
use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
use reth::{
api::NodeTypesWithDBAdapter,
args::{DatabaseArgs, DatadirArgs},
@ -7,11 +7,12 @@ use reth::{
};
use reth_chainspec::EthChainSpec;
use reth_db::{
mdbx::{tx::Tx, RO},
DatabaseEnv,
mdbx::{RO, tx::Tx},
models::CompactU256,
static_file::iter_static_files,
table::Decompress,
tables, DatabaseEnv,
tables,
};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
@ -20,15 +21,15 @@ use reth_db_api::{
use reth_errors::ProviderResult;
use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
StaticFileSegment, StaticFileWriter,
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tracing::{info, warn};
use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
pub(crate) trait HlNodeType:
NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
@ -123,6 +124,8 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
return Ok(false);
}
check_if_migration_enabled()?;
self.migrate_mdbx_inner()?;
Ok(true)
}
@ -130,7 +133,14 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
// There shouldn't be many headers in mdbx, but using file for safety
info!("Old database detected, migrating mdbx...");
let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp");
let conversion_tmp = self.0.conversion_tmp_dir();
let tmp_path = conversion_tmp.join("headers.rmp");
if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?;
}
std::fs::create_dir_all(&conversion_tmp)?;
let count = self.export_old_headers(&tmp_path)?;
self.import_new_headers(tmp_path, count)?;
Ok(())
@ -172,6 +182,18 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
}
}
fn check_if_migration_enabled() -> Result<(), eyre::Error> {
if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
let err_msg = concat!(
"Detected an old database format but experimental database migration is currently disabled. ",
"To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
);
warn!("{}", err_msg);
return Err(eyre::eyre!("{}", err_msg));
}
Ok(())
}
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
@ -244,13 +266,12 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
let mut all_static_files = iter_static_files(&old_path)?;
let all_static_files =
all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
let provider = self.0.provider_factory.provider()?;
let mut first = true;
for (block_range, _tx_ranges) in all_static_files {
let migration_needed = self.using_old_header(block_range.start())? ||
self.using_old_header(block_range.end())?;
let migration_needed = self.using_old_header(block_range.start())?
|| self.using_old_header(block_range.end())?;
if !migration_needed {
// Create a placeholder symlink
self.create_placeholder(block_range)?;
@ -258,12 +279,15 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
}
if first {
check_if_migration_enabled()?;
info!("Old database detected, migrating static files...");
first = false;
}
let sf_provider = self.0.sf_provider();
let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
let provider = self.0.provider_factory.provider()?;
let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
@ -320,8 +344,8 @@ fn migrate_single_static_file<N: HlNodeType>(
) -> Result<(), eyre::Error> {
info!("Migrating block range {}...", block_range);
// block_ranges into chunks of 100000 blocks
const CHUNK_SIZE: u64 = 100000;
// block_ranges into chunks of 50000 blocks
const CHUNK_SIZE: u64 = 50000;
for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
let block_range = chunk..=end;

49
tests/run_tests.sh Normal file
View File

@ -0,0 +1,49 @@
#!/bin/bash
set -e
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
success() {
echo "Success: $1"
}
fail() {
echo "Failed: $1"
exit 1
}
ensure_cmd() {
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
}
ensure_cmd jq
ensure_cmd cast
ensure_cmd wscat
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
fail "ETH_RPC_URL must be a websocket url"
fi
TITLE="Issue #78 - eth_getLogs should return system transactions"
cast logs \
--rpc-url "$ETH_RPC_URL" \
--from-block 15312567 \
--to-block 15312570 \
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
&& success "$TITLE" || fail "$TITLE"
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
echo node "$A"
echo rpc\ "$B"
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
TITLE="eth_subscribe newHeads via wscat"
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
&& success "$TITLE" || fail "$TITLE"