20 Commits

Author SHA1 Message Date
f915aba568 Merge pull request #100 from hl-archive-node/feat/deprecate-migrator
feat: Place migrator behind `CHECK_DB_MIGRATION` env
2025-11-01 06:23:01 -04:00
1fe03bfc41 feat: Place migrator behind CHECK_DB_MIGRATION env 2025-11-01 09:36:30 +00:00
893822e5b0 Merge pull request #98 from hl-archive-node/fix/testnet-system-tx 2025-10-26 03:39:06 -04:00
c2528ce223 fix: Support certain types of system tx 2025-10-26 06:42:14 +00:00
d46e808b8d Merge pull request #94 from hl-archive-node/fix/migrator-typo
fix(migrate): Fix wrong chunk ranges
2025-10-16 10:42:59 -04:00
497353fd2f fix(migrate): Fix wrong chunk ranges 2025-10-16 14:35:04 +00:00
eee6eeb2fc Merge pull request #93 from hl-archive-node/fix/subscriptions
fix: Prevent #89 from overriding --hl-node-compliant subscriptions
2025-10-13 01:27:19 -04:00
611e6867bf fix: Do not override --hl-node-compliant for subscription 2025-10-13 02:57:25 +00:00
6c3ed63c3c fix: Override NewHeads only 2025-10-13 02:57:05 +00:00
51924e9671 Merge pull request #91 from hl-archive-node/fix/debug-cutoff
fix: Fix --debug-cutoff-height semantics
2025-10-11 22:29:45 -04:00
8f15aa311f fix: Fix --debug-cutoff-height semantics
NOTE: This is a debug feature not on by default.

The original intention was to limit the highest block number. Instead, it only clamped the starting block number for fetching, so block progression continued past the cutoff.
2025-10-12 02:22:55 +00:00
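A condensed sketch of the corrected semantics (names taken from the BlockPoller diff below; the polling machinery is elided and the helper function here is hypothetical):

/// --debug-cutoff-height must act as an upper bound on every iteration,
/// not as a one-time clamp applied before the polling loop starts.
fn next_to_fetch(next_block_number: u64, debug_cutoff_height: Option<u64>) -> u64 {
    match debug_cutoff_height {
        Some(cutoff) => next_block_number.min(cutoff),
        None => next_block_number,
    }
}

fn main() {
    // The old code clamped once before the loop, so after reaching the
    // cutoff the poller kept advancing past it; re-clamping on each
    // iteration parks it at the cutoff block instead.
    assert_eq!(next_to_fetch(120, Some(100)), 100);
    assert_eq!(next_to_fetch(99, Some(100)), 99);
    assert_eq!(next_to_fetch(120, None), 120);
}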
bc66716a41 Merge pull request #89 from hl-archive-node/cleanup
fix: Convert header type for eth_subscribe
2025-10-10 23:09:33 -04:00
fc819dbba2 test: Add regression tests 2025-10-11 02:52:09 +00:00
1c5a22a814 fix: Convert header type for eth_subscribe
Due to custom header usage, only the `eth_subscribe` method was returning the new header format in raw form, while other parts were using RpcConvert to convert headers.

Make `eth_subscribe` newHeads return the `inner` field (the original eth header) instead.
2025-10-11 02:49:19 +00:00
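The conversion lands in new_headers_stream (src/addons/utils.rs, shown below), which re-seals the `inner` eth header with the block hash before it reaches subscribers.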
852e186b1a Merge pull request #88 from hl-archive-node/hotfix
hotfix: Mark migrator experimental
2025-10-09 04:55:53 -04:00
f83326059f chore: clippy 2025-10-09 08:55:40 +00:00
ca8c374116 feat: Mark migrator as experimental 2025-10-09 08:49:29 +00:00
5ba12a4850 perf: adjust chunk size, do not hold tx too long 2025-10-09 08:20:22 +00:00
8a179a6d9e perf: Use smaller chunks 2025-10-09 08:13:53 +00:00
d570cf3e8d fix: Create directory before migration 2025-10-09 08:13:45 +00:00
11 changed files with 284 additions and 84 deletions

View File

@@ -19,62 +19,23 @@ use alloy_rpc_types::{
TransactionInfo,
pubsub::{Params, SubscriptionKind},
};
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
use jsonrpsee_core::{RpcResult, async_trait};
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
use reth_primitives_traits::SignedTransaction;
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::{
EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
transaction::ConvertReceiptInput,
EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
};
use reth_rpc_eth_types::EthApiError;
use serde::Serialize;
use std::{marker::PhantomData, sync::Arc};
use tokio_stream::{Stream, StreamExt};
use tokio_stream::StreamExt;
use tracing::{Instrument, trace};
use crate::{HlBlock, node::primitives::HlPrimitives};
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
#[rpc(server, namespace = "eth")]
#[async_trait]
@@ -386,6 +347,8 @@ where
pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
)
.await;
} else if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
@@ -412,23 +375,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
Some(log)
}
async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
eth_api: Arc<Eth>,
_marker: PhantomData<Eth>,

View File

@@ -1,3 +1,5 @@
pub mod call_forwarder;
pub mod hl_node_compliance;
pub mod tx_forwarder;
pub mod subscribe_fixup;
mod utils;

View File

@@ -0,0 +1,54 @@
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
use async_trait::async_trait;
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee_types::ErrorObject;
use reth::tasks::TaskSpawner;
use reth_rpc::EthPubSub;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
use std::sync::Arc;
pub struct SubscribeFixup<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
where
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
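// newHeads goes through the fixed header stream; everything else is
// deferred to the stock reth pubsub handler.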
if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
}));
Ok(())
}
}
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self
where
Eth: EthWrapper,
ErrorObject<'static>: From<Eth::Error>,
{
Self { pubsub, provider, subscription_task_spawner }
}
}

src/addons/utils.rs (new file, 90 lines)
View File

@@ -0,0 +1,90 @@
use std::sync::Arc;
use crate::{HlBlock, HlPrimitives};
use alloy_primitives::U256;
use alloy_rpc_types::Header;
use futures::StreamExt;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, CanonStateSubscriptions};
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
use reth_rpc_eth_api::{
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
};
use serde::Serialize;
use tokio_stream::Stream;
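// Trait alias bundling every Eth API capability the addons rely on;
// the blanket impl below makes any type with these bounds an EthWrapper.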
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
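// Blanket impl: no manual opt-in needed, the bounds do all the work.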
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
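// Forward items from the stream into the subscription sink, ending
// cleanly when the client disconnects or the stream is exhausted.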
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub(super) fn new_headers_stream<Eth: EthWrapper>(
provider: &Arc<Eth::Provider>,
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
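// Each canonical-state notification may commit several blocks; emit one
// RPC header per block, rebuilt from the original eth header in `inner`.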
provider.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.blocks_iter()
.map(|block| {
Header::from_consensus(
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
None,
Some(U256::from(block.rlp_length())),
)
})
.collect::<Vec<_>>();
futures::stream::iter(headers)
})
}

View File

@@ -1,12 +1,16 @@
use std::sync::Arc;
use clap::Parser;
use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
use reth::{
builder::{NodeBuilder, NodeHandle, WithLaunchContext},
rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
};
use reth_db::DatabaseEnv;
use reth_hl::{
addons::{
call_forwarder::{self, CallForwarderApiServer},
hl_node_compliance::install_hl_node_compliance,
subscribe_fixup::SubscribeFixup,
tx_forwarder::{self, EthForwarderApiServer},
},
chainspec::{HlChainSpec, parser::HlChainSpecParser},
@@ -59,6 +63,17 @@ fn main() -> eyre::Result<()> {
info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
}
// Temporary workaround for the custom-header issue that affects
// `eth_subscribe[type=newHeads]`.
ctx.modules.replace_configured(
SubscribeFixup::new(
Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
Arc::new(ctx.registry.eth_api().provider().clone()),
Box::new(ctx.node().task_executor.clone()),
)
.into_rpc(),
)?;
if ext.hl_node_compliant {
install_hl_node_compliance(&mut ctx)?;
info!("hl-node compliant mode enabled");

View File

@@ -145,8 +145,12 @@ where
match self.command {
Commands::Node(command) => runner.run_command_until_exit(|ctx| {
Self::migrate_db(&command.chain, &command.datadir, &command.db)
.expect("Failed to migrate database");
// NOTE: This was a one-time migration for the Oct 10 upgrade.
// It is no longer necessary, so it is now gated behind an environment variable.
if std::env::var("CHECK_DB_MIGRATION").is_ok() {
Self::migrate_db(&command.chain, &command.datadir, &command.db)
.expect("Failed to migrate database");
}
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
}),
Commands::Init(command) => {

View File

@@ -1,5 +1,5 @@
use alloy_consensus::Header;
use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256};
use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
use reth::{
api::NodeTypesWithDBAdapter,
args::{DatabaseArgs, DatadirArgs},
@@ -7,11 +7,12 @@ use reth::{
};
use reth_chainspec::EthChainSpec;
use reth_db::{
mdbx::{tx::Tx, RO},
DatabaseEnv,
mdbx::{RO, tx::Tx},
models::CompactU256,
static_file::iter_static_files,
table::Decompress,
tables, DatabaseEnv,
tables,
};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
@@ -20,15 +21,15 @@ use reth_db_api::{
use reth_errors::ProviderResult;
use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
StaticFileSegment, StaticFileWriter,
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tracing::{info, warn};
use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
pub(crate) trait HlNodeType:
NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
@@ -123,6 +124,8 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
return Ok(false);
}
check_if_migration_enabled()?;
self.migrate_mdbx_inner()?;
Ok(true)
}
@@ -130,7 +133,14 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
// There shouldn't be many headers in mdbx, but use a file for safety
info!("Old database detected, migrating mdbx...");
let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp");
let conversion_tmp = self.0.conversion_tmp_dir();
let tmp_path = conversion_tmp.join("headers.rmp");
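// Start from a clean conversion directory so leftovers from a
// previously interrupted migration cannot corrupt this run.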
if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?;
}
std::fs::create_dir_all(&conversion_tmp)?;
let count = self.export_old_headers(&tmp_path)?;
self.import_new_headers(tmp_path, count)?;
Ok(())
@@ -172,6 +182,18 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
}
}
fn check_if_migration_enabled() -> Result<(), eyre::Error> {
if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
let err_msg = concat!(
"Detected an old database format but experimental database migration is currently disabled. ",
"To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
);
warn!("{}", err_msg);
return Err(eyre::eyre!("{}", err_msg));
}
Ok(())
}
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
@@ -244,13 +266,12 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
let mut all_static_files = iter_static_files(&old_path)?;
let all_static_files =
all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
let provider = self.0.provider_factory.provider()?;
let mut first = true;
for (block_range, _tx_ranges) in all_static_files {
let migration_needed = self.using_old_header(block_range.start())? ||
self.using_old_header(block_range.end())?;
let migration_needed = self.using_old_header(block_range.start())?
|| self.using_old_header(block_range.end())?;
if !migration_needed {
// Create a placeholder symlink
self.create_placeholder(block_range)?;
@@ -258,12 +279,15 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
}
if first {
check_if_migration_enabled()?;
info!("Old database detected, migrating static files...");
first = false;
}
let sf_provider = self.0.sf_provider();
let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
let provider = self.0.provider_factory.provider()?;
let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
@@ -320,9 +344,9 @@ fn migrate_single_static_file<N: HlNodeType>(
) -> Result<(), eyre::Error> {
info!("Migrating block range {}...", block_range);
// block_ranges into chunks of 100000 blocks
const CHUNK_SIZE: u64 = 100000;
for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
// Split block_range into chunks of 50000 blocks
const CHUNK_SIZE: u64 = 50000;
for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
let block_range = chunk..=end;
let headers = old_headers_range(sf_in, block_range.clone())?;
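A minimal reproduction of the wrong-chunk-range bug fixed above (standalone sketch; the range values are hypothetical):

fn chunk_starts(start: u64, end: u64) -> Vec<u64> {
    const CHUNK_SIZE: u64 = 50000;
    (start..=end).step_by(CHUNK_SIZE as usize).collect()
}

fn main() {
    // Static-file segment covering blocks 500000..=549999.
    // Iterating from 0 (the old code) visits ten chunk starts that lie
    // before the segment even begins; iterating from the segment start
    // (the fix) visits exactly one.
    assert_eq!(chunk_starts(0, 549999).len(), 11);
    assert_eq!(chunk_starts(500000, 549999), vec![500000]);
}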

View File

@@ -127,6 +127,19 @@ pub struct SystemTx {
pub receipt: Option<LegacyReceipt>,
}
impl SystemTx {
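// Expose the inner gas limit regardless of envelope type; used when
// merging system txs into blocks to drop zero-gas ones (#97).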
pub fn gas_limit(&self) -> u64 {
use reth_compat::Transaction;
match &self.tx {
Transaction::Legacy(tx) => tx.gas_limit,
Transaction::Eip2930(tx) => tx.gas_limit,
Transaction::Eip1559(tx) => tx.gas_limit,
Transaction::Eip4844(tx) => tx.gas_limit,
Transaction::Eip7702(tx) => tx.gas_limit,
}
}
}
#[derive(
Debug,
Clone,

View File

@@ -117,6 +117,9 @@ impl SealedBlock {
receipts: Vec<LegacyReceipt>,
chain_id: u64,
) -> HlBlock {
// NOTE: Filter out system transactions that may be rejected by the EVM (tracked by #97, testnet only).
let system_txs: Vec<_> = system_txs.into_iter().filter(|tx| tx.gas_limit() != 0).collect();
let mut merged_txs = vec![];
merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id)));
merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction()));

View File

@@ -81,13 +81,13 @@ impl BlockPoller {
.await
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
if let Some(debug_cutoff_height) = debug_cutoff_height &&
next_block_number > debug_cutoff_height
{
next_block_number = debug_cutoff_height;
}
loop {
if let Some(debug_cutoff_height) = debug_cutoff_height
&& next_block_number > debug_cutoff_height
{
next_block_number = debug_cutoff_height;
}
match block_source.collect_block(next_block_number).await {
Ok(block) => {
block_tx.send((next_block_number, block)).await?;

tests/run_tests.sh (new file, 49 lines)
View File

@@ -0,0 +1,49 @@
#!/bin/bash
set -e
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
success() {
echo "Success: $1"
}
fail() {
echo "Failed: $1"
exit 1
}
ensure_cmd() {
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
}
ensure_cmd jq
ensure_cmd cast
ensure_cmd wscat
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
fail "ETH_RPC_URL must be a websocket url"
fi
TITLE="Issue #78 - eth_getLogs should return system transactions"
cast logs \
--rpc-url "$ETH_RPC_URL" \
--from-block 15312567 \
--to-block 15312570 \
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
&& success "$TITLE" || fail "$TITLE"
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
echo "node $A"
echo "rpc  $B"
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
TITLE="eth_subscribe newHeads via wscat"
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
&& success "$TITLE" || fail "$TITLE"