18 Commits

SHA1 Message Date
f109130f88 feat(draft): Spot meta 2025-11-05 04:07:46 +00:00
010d056aad Merge pull request #102 from hl-archive-node/fix/testnet-txs-tracking
fix: Fix testnet transaction types
2025-11-04 12:24:04 -05:00
821c63494e fix: Fix testnet transaction types 2025-11-04 17:23:31 +00:00
f915aba568 Merge pull request #100 from hl-archive-node/feat/deprecate-migrator
feat: Place migrator behind `CHECK_DB_MIGRATION` env
2025-11-01 06:23:01 -04:00
1fe03bfc41 feat: Place migrator behind CHECK_DB_MIGRATION env 2025-11-01 09:36:30 +00:00
893822e5b0 Merge pull request #98 from hl-archive-node/fix/testnet-system-tx 2025-10-26 03:39:06 -04:00
c2528ce223 fix: Support certain types of system tx 2025-10-26 06:42:14 +00:00
d46e808b8d Merge pull request #94 from hl-archive-node/fix/migrator-typo
fix(migrate): Fix wrong chunk ranges
2025-10-16 10:42:59 -04:00
497353fd2f fix(migrate): Fix wrong chunk ranges 2025-10-16 14:35:04 +00:00
eee6eeb2fc Merge pull request #93 from hl-archive-node/fix/subscriptions
fix: Prevent #89 from overriding --hl-node-compliant subscriptions
2025-10-13 01:27:19 -04:00
611e6867bf fix: Do not override --hl-node-compliant for subscription 2025-10-13 02:57:25 +00:00
6c3ed63c3c fix: Override NewHeads only 2025-10-13 02:57:05 +00:00
51924e9671 Merge pull request #91 from hl-archive-node/fix/debug-cutoff
fix: Fix --debug-cutoff-height semantics
2025-10-11 22:29:45 -04:00
8f15aa311f fix: Fix --debug-cutoff-height semantics
NOTE: This is a debug feature, not enabled by default.

It was originally intended to cap the highest block number, but it instead enforced the starting block number for fetching, so block progression continued past the cutoff.
2025-10-12 02:22:55 +00:00
bc66716a41 Merge pull request #89 from hl-archive-node/cleanup
fix: Convert header type for eth_subscribe
2025-10-10 23:09:33 -04:00
fc819dbba2 test: Add regression tests 2025-10-11 02:52:09 +00:00
1c5a22a814 fix: Convert header type for eth_subscribe
Due to custom header usage, only the `eth_subscribe` method was returning the new header type in raw form, while other code paths used RpcConvert to convert headers.

Make `eth_subscribe` newHeads return the `inner` field (the original eth header) instead.
2025-10-11 02:49:19 +00:00
852e186b1a Merge pull request #88 from hl-archive-node/hotfix
hotfix: Mark migrator experimental
2025-10-09 04:55:53 -04:00
14 changed files with 471 additions and 105 deletions

View File

@@ -19,62 +19,23 @@ use alloy_rpc_types::{
     TransactionInfo,
     pubsub::{Params, SubscriptionKind},
 };
-use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
+use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
 use jsonrpsee_core::{RpcResult, async_trait};
 use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
 use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
 use reth_primitives_traits::SignedTransaction;
 use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
-use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
+use reth_rpc::{EthFilter, EthPubSub};
 use reth_rpc_eth_api::{
-    EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
-    RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
-    helpers::{EthBlocks, EthTransactions, LoadReceipt},
-    transaction::ConvertReceiptInput,
+    EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
+    RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
 };
 use reth_rpc_eth_types::EthApiError;
-use serde::Serialize;
 use std::{marker::PhantomData, sync::Arc};
-use tokio_stream::{Stream, StreamExt};
+use tokio_stream::StreamExt;
 use tracing::{Instrument, trace};
-use crate::{HlBlock, node::primitives::HlPrimitives};
+use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
-
-pub trait EthWrapper:
-    EthApiServer<
-        RpcTxReq<Self::NetworkTypes>,
-        RpcTransaction<Self::NetworkTypes>,
-        RpcBlock<Self::NetworkTypes>,
-        RpcReceipt<Self::NetworkTypes>,
-        RpcHeader<Self::NetworkTypes>,
-    > + FullEthApiTypes<
-        Primitives = HlPrimitives,
-        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
-    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
-    + EthBlocks
-    + EthTransactions
-    + LoadReceipt
-    + 'static
-{
-}
-
-impl<T> EthWrapper for T where
-    T: EthApiServer<
-        RpcTxReq<Self::NetworkTypes>,
-        RpcTransaction<Self::NetworkTypes>,
-        RpcBlock<Self::NetworkTypes>,
-        RpcReceipt<Self::NetworkTypes>,
-        RpcHeader<Self::NetworkTypes>,
-    > + FullEthApiTypes<
-        Primitives = HlPrimitives,
-        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
-    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
-    + EthBlocks
-    + EthTransactions
-    + LoadReceipt
-    + 'static
-{
-}

 #[rpc(server, namespace = "eth")]
 #[async_trait]

@@ -386,6 +347,8 @@ where
                     pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
                 )
                 .await;
+            } else if kind == SubscriptionKind::NewHeads {
+                let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
             } else {
                 let _ = pubsub.handle_accepted(sink, kind, params).await;
             }

@@ -412,23 +375,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
     Some(log)
 }
-
-async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
-    sink: SubscriptionSink,
-    mut stream: St,
-) -> Result<(), ErrorObject<'static>> {
-    loop {
-        tokio::select! {
-            _ = sink.closed() => break Ok(()),
-            maybe_item = stream.next() => {
-                let Some(item) = maybe_item else { break Ok(()) };
-                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
-                    .map_err(SubscriptionSerializeError::from)?;
-                if sink.send(msg).await.is_err() { break Ok(()); }
-            }
-        }
-    }
-}

 pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
     eth_api: Arc<Eth>,
     _marker: PhantomData<Eth>,
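
(Note that `EthWrapper` and `pipe_from_stream` are not deleted here; they move verbatim into the new `src/addons/utils.rs` shown below.)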

View File

@@ -1,3 +1,5 @@
 pub mod call_forwarder;
 pub mod hl_node_compliance;
 pub mod tx_forwarder;
+pub mod subscribe_fixup;
+mod utils;

View File

@@ -0,0 +1,54 @@
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
use async_trait::async_trait;
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee_types::ErrorObject;
use reth::tasks::TaskSpawner;
use reth_rpc::EthPubSub;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
use std::sync::Arc;
pub struct SubscribeFixup<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
where
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
}));
Ok(())
}
}
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self
where
Eth: EthWrapper,
ErrorObject<'static>: From<Eth::Error>,
{
Self { pubsub, provider, subscription_task_spawner }
}
}

src/addons/utils.rs (new file, 90 lines)
View File

@@ -0,0 +1,90 @@
use std::sync::Arc;
use crate::{HlBlock, HlPrimitives};
use alloy_primitives::U256;
use alloy_rpc_types::Header;
use futures::StreamExt;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, CanonStateSubscriptions};
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
use reth_rpc_eth_api::{
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
};
use serde::Serialize;
use tokio_stream::Stream;
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub(super) fn new_headers_stream<Eth: EthWrapper>(
provider: &Arc<Eth::Provider>,
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
provider.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.blocks_iter()
.map(|block| {
Header::from_consensus(
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
None,
Some(U256::from(block.rlp_length())),
)
})
.collect::<Vec<_>>();
futures::stream::iter(headers)
})
}
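
With `new_headers_stream` in place, a `newHeads` subscriber receives standard eth headers. A minimal client-side check, not part of this changeset, assuming the `jsonrpsee` client crates and a node serving websocket RPC at `ws://localhost:8546`:

use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::{rpc_params, ws_client::WsClientBuilder};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let client = WsClientBuilder::default().build("ws://localhost:8546").await?;
    // After the fix, each notification deserializes as a plain
    // alloy_rpc_types::Header instead of the node's custom header type.
    let mut sub: Subscription<alloy_rpc_types::Header> =
        client.subscribe("eth_subscribe", rpc_params!["newHeads"], "eth_unsubscribe").await?;
    while let Some(header) = sub.next().await {
        println!("new head: {}", header?.hash);
    }
    Ok(())
}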

View File

@@ -1,12 +1,16 @@
 use std::sync::Arc;
 use clap::Parser;
-use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
+use reth::{
+    builder::{NodeBuilder, NodeHandle, WithLaunchContext},
+    rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
+};
 use reth_db::DatabaseEnv;
 use reth_hl::{
     addons::{
         call_forwarder::{self, CallForwarderApiServer},
         hl_node_compliance::install_hl_node_compliance,
+        subscribe_fixup::SubscribeFixup,
         tx_forwarder::{self, EthForwarderApiServer},
     },
     chainspec::{HlChainSpec, parser::HlChainSpecParser},

@@ -14,7 +18,9 @@ use reth_hl::{
         HlNode,
         cli::{Cli, HlNodeArgs},
         rpc::precompile::{HlBlockPrecompileApiServer, HlBlockPrecompileExt},
+        spot_meta::init as spot_meta_init,
         storage::tables::Tables,
+        types::set_spot_metadata_db,
     },
 };
 use tracing::info;

@@ -59,6 +65,17 @@ fn main() -> eyre::Result<()> {
         info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
     }
+
+    // This is a temporary workaround to fix the issue with custom headers;
+    // it affects `eth_subscribe[type=newHeads]`.
+    ctx.modules.replace_configured(
+        SubscribeFixup::new(
+            Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
+            Arc::new(ctx.registry.eth_api().provider().clone()),
+            Box::new(ctx.node().task_executor.clone()),
+        )
+        .into_rpc(),
+    )?;
     if ext.hl_node_compliant {
         install_hl_node_compliance(&mut ctx)?;
         info!("hl-node compliant mode enabled");

@@ -77,6 +94,16 @@ fn main() -> eyre::Result<()> {
     })
     .apply(|mut builder| {
         builder.db_mut().create_tables_for::<Tables>().expect("create tables");
+
+        let chain_id = builder.config().chain.inner.chain().id();
+        let db = builder.db_mut().clone();
+
+        // Set database handle for on-demand persistence
+        set_spot_metadata_db(db.clone());
+
+        // Load spot metadata from database and initialize cache
+        spot_meta_init::load_spot_metadata_cache(&db, chain_id);
+
         builder
     })
     .launch()

View File

@@ -1,7 +1,11 @@
 use crate::{
     chainspec::{HlChainSpec, parser::HlChainSpecParser},
     node::{
-        HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator,
+        HlNode,
+        consensus::HlConsensus,
+        evm::config::HlEvmConfig,
+        migrate::Migrator,
+        spot_meta::init as spot_meta_init,
         storage::tables::Tables,
     },
     pseudo_peer::BlockSourceArgs,

@@ -20,10 +24,7 @@ use reth_cli::chainspec::ChainSpecParser;
 use reth_cli_commands::{common::EnvironmentArgs, launcher::FnLauncher};
 use reth_db::{DatabaseEnv, init_db, mdbx::init_db_for};
 use reth_tracing::FileWorkerGuard;
-use std::{
-    fmt::{self},
-    sync::Arc,
-};
+use std::{fmt::{self}, sync::Arc};
 use tracing::info;

 macro_rules! not_applicable {

@@ -145,8 +146,12 @@ where
         match self.command {
             Commands::Node(command) => runner.run_command_until_exit(|ctx| {
-                Self::migrate_db(&command.chain, &command.datadir, &command.db)
-                    .expect("Failed to migrate database");
+                // NOTE: This was a one-time migration around the Oct 10 upgrade.
+                // It is no longer necessary, so an environment variable gate is added here.
+                if std::env::var("CHECK_DB_MIGRATION").is_ok() {
+                    Self::migrate_db(&command.chain, &command.datadir, &command.db)
+                        .expect("Failed to migrate database");
+                }
                 command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
             }),
             Commands::Init(command) => {

@@ -190,7 +195,12 @@ where
         let data_dir = env.datadir.clone().resolve_datadir(env.chain.chain());
         let db_path = data_dir.db();
         init_db(db_path.clone(), env.db.database_args())?;
-        init_db_for::<_, Tables>(db_path, env.db.database_args())?;
+        init_db_for::<_, Tables>(db_path.clone(), env.db.database_args())?;
+
+        // Initialize spot metadata in database
+        let chain_id = env.chain.chain().id();
+        spot_meta_init::init_spot_metadata(db_path, env.db.database_args(), chain_id)?;
+
         Ok(())
     }
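
With this gate in place, the one-time migration only runs when `CHECK_DB_MIGRATION` is present in the environment; any value works, since the code only checks `is_ok()`.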

View File

@@ -346,7 +346,7 @@ fn migrate_single_static_file<N: HlNodeType>(
     // block_ranges into chunks of 50000 blocks
     const CHUNK_SIZE: u64 = 50000;
-    for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
+    for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
         let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
         let block_range = chunk..=end;
         let headers = old_headers_range(sf_in, block_range.clone())?;

src/node/spot_meta/init.rs (new file, 106 lines)
View File

@@ -0,0 +1,106 @@
use crate::node::{
spot_meta::{SpotId, erc20_contract_to_spot_token},
storage::tables::{self, SPOT_METADATA_KEY},
types::reth_compat,
};
use alloy_primitives::{Address, Bytes};
use reth_db::{DatabaseEnv, cursor::{DbCursorRO, DbCursorRW}};
use reth_db_api::{Database, transaction::{DbTx, DbTxMut}};
use std::{collections::BTreeMap, sync::Arc};
use tracing::info;
/// Load spot metadata from database and initialize cache
pub fn load_spot_metadata_cache(db: &Arc<DatabaseEnv>, chain_id: u64) {
// Try to read from database
let data = match db.view(|tx| -> Result<Option<Vec<u8>>, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.map(|(_, data)| data.to_vec()))
}) {
Ok(Ok(data)) => data,
Ok(Err(e)) => {
info!("Failed to read spot metadata from database: {}. Will fetch on-demand from API.", e);
return;
}
Err(e) => {
info!("Database view error while loading spot metadata: {}. Will fetch on-demand from API.", e);
return;
}
};
// Check if data exists
let Some(data) = data else {
info!(
"No spot metadata found in database for chain {}. Run 'init-state' to populate, or it will be fetched on-demand from API.",
chain_id
);
return;
};
// Deserialize metadata
let serializable_map = match rmp_serde::from_slice::<BTreeMap<Address, u64>>(&data) {
Ok(map) => map,
Err(e) => {
info!("Failed to deserialize spot metadata: {}. Will fetch on-demand from API.", e);
return;
}
};
// Convert and initialize cache
let metadata: BTreeMap<Address, SpotId> = serializable_map
.into_iter()
.map(|(addr, index)| (addr, SpotId { index }))
.collect();
info!("Loaded spot metadata from database ({} entries)", metadata.len());
reth_compat::initialize_spot_metadata_cache(metadata);
}
/// Initialize spot metadata in database from API
pub fn init_spot_metadata(
db_path: impl AsRef<std::path::Path>,
db_args: reth_db::mdbx::DatabaseArguments,
chain_id: u64,
) -> eyre::Result<()> {
info!("Initializing spot metadata for chain {}", chain_id);
let db = Arc::new(reth_db::open_db(db_path.as_ref(), db_args)?);
// Check if spot metadata already exists
let exists = db.view(|tx| -> Result<bool, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.is_some())
})??;
if exists {
info!("Spot metadata already exists in database");
return Ok(());
}
// Fetch from API
let metadata = match erc20_contract_to_spot_token(chain_id) {
Ok(m) => m,
Err(e) => {
info!("Failed to fetch spot metadata from API: {}. Will be fetched on-demand.", e);
return Ok(());
}
};
// Serialize and store
let serializable_map: BTreeMap<Address, u64> =
metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
db.update(|tx| -> Result<(), reth_db::DatabaseError> {
let mut cursor = tx.cursor_write::<tables::SpotMetadata>()?;
cursor.upsert(
SPOT_METADATA_KEY,
&Bytes::from(
rmp_serde::to_vec(&serializable_map)
.expect("Failed to serialize spot metadata"),
),
)?;
Ok(())
})??;
info!("Successfully fetched and stored spot metadata for chain {}", chain_id);
Ok(())
}
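
The on-disk format used above is deliberately small: the in-memory BTreeMap<Address, SpotId> is flattened to a BTreeMap<Address, u64> of token indices and MessagePack-encoded. A round-trip sketch of that format, illustrative only (the address and index are made up):

use alloy_primitives::Address;
use std::collections::BTreeMap;

fn main() {
    // Flattened form stored under SPOT_METADATA_KEY: EVM address -> spot token index.
    let mut map: BTreeMap<Address, u64> = BTreeMap::new();
    map.insert(Address::ZERO, 197);
    let bytes = rmp_serde::to_vec(&map).expect("serialize");
    let back: BTreeMap<Address, u64> = rmp_serde::from_slice(&bytes).expect("deserialize");
    assert_eq!(map, back);
}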

View File

@@ -5,6 +5,7 @@ use std::collections::BTreeMap;
 use crate::chainspec::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID};
+pub mod init;
 mod patch;

 #[derive(Debug, Clone, Serialize, Deserialize)]

@@ -25,7 +26,7 @@ pub struct SpotMeta {
 }

 #[derive(Debug, Clone)]
-pub(crate) struct SpotId {
+pub struct SpotId {
     pub index: u64,
 }

View File

@@ -2,10 +2,21 @@ use alloy_primitives::{BlockNumber, Bytes};
 use reth_db::{TableSet, TableType, TableViewer, table::TableInfo, tables};
 use std::fmt;

+/// Static key used for spot metadata, as the database is unique to each chain.
+/// This may later serve as a versioning key to assist with future database migrations.
+pub const SPOT_METADATA_KEY: u64 = 0;
+
 tables! {
     /// Read precompile calls for each block.
     table BlockReadPrecompileCalls {
         type Key = BlockNumber;
         type Value = Bytes;
     }
+
+    /// Spot metadata mapping (EVM address to spot token index).
+    /// Uses a constant key since the database is chain-specific.
+    table SpotMetadata {
+        type Key = u64;
+        type Value = Bytes;
+    }
 }

View File

@@ -19,6 +19,9 @@ pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
 pub(crate) mod reth_compat;

+// Re-export spot metadata functions
+pub use reth_compat::{initialize_spot_metadata_cache, set_spot_metadata_db};
+
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct HlExtras {
     pub read_precompile_calls: Option<ReadPrecompileCalls>,

@@ -127,6 +130,19 @@ pub struct SystemTx {
     pub receipt: Option<LegacyReceipt>,
 }

+impl SystemTx {
+    pub fn gas_limit(&self) -> u64 {
+        use reth_compat::Transaction;
+        match &self.tx {
+            Transaction::Legacy(tx) => tx.gas_limit,
+            Transaction::Eip2930(tx) => tx.gas_limit,
+            Transaction::Eip1559(tx) => tx.gas_limit,
+            Transaction::Eip4844(tx) => tx.gas_limit,
+            Transaction::Eip7702(tx) => tx.gas_limit,
+        }
+    }
+}
+
 #[derive(
     Debug,
     Clone,

View File

@@ -1,11 +1,15 @@
 //! Copy of reth codebase to preserve serialization compatibility
+use crate::chainspec::TESTNET_CHAIN_ID;
+use crate::node::storage::tables::{SPOT_METADATA_KEY, SpotMetadata};
 use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
-use alloy_primitives::{Address, BlockHash, Signature, TxKind, U256};
+use alloy_primitives::{Address, BlockHash, Bytes, Signature, TxKind, U256};
+use reth_db::cursor::DbCursorRW;
+use reth_db_api::{Database, transaction::DbTxMut};
 use reth_primitives::TransactionSigned as RethTxSigned;
 use serde::{Deserialize, Serialize};
 use std::{
     collections::BTreeMap,
-    sync::{Arc, LazyLock, RwLock},
+    sync::{Arc, LazyLock, Mutex, RwLock},
 };
 use tracing::info;

@@ -81,42 +85,92 @@ pub struct SealedBlock {
     pub body: BlockBody,
 }

-fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
-    static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
-        LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
-    {
-        let Transaction::Legacy(tx) = &transaction.tx else {
-            panic!("Unexpected transaction type");
-        };
-        let TxKind::Call(to) = tx.to else {
-            panic!("Unexpected contract creation");
-        };
-        let s = if tx.input.is_empty() {
-            U256::from(0x1)
-        } else {
-            loop {
-                if let Some(spot) = EVM_MAP.read().unwrap().get(&to) {
-                    break spot.to_s();
-                }
-                info!("Contract not found: {to:?} from spot mapping, fetching again...");
-                *EVM_MAP.write().unwrap() = erc20_contract_to_spot_token(chain_id).unwrap();
-            }
-        };
-        let signature = Signature::new(U256::from(0x1), s, true);
-        TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
-    }
-}
+static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
+    LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
+
+// Optional database handle for persisting on-demand fetches
+static DB_HANDLE: LazyLock<Mutex<Option<Arc<reth_db::DatabaseEnv>>>> =
+    LazyLock::new(|| Mutex::new(None));
+
+/// Set the database handle for persisting spot metadata
+pub fn set_spot_metadata_db(db: Arc<reth_db::DatabaseEnv>) {
+    *DB_HANDLE.lock().unwrap() = Some(db);
+}
+
+/// Initialize the spot metadata cache with data loaded from database.
+/// This should be called during node initialization.
+pub fn initialize_spot_metadata_cache(metadata: BTreeMap<Address, SpotId>) {
+    *EVM_MAP.write().unwrap() = metadata;
+}
+
+/// Persist spot metadata to database if a handle is available
+fn persist_spot_metadata_to_db(metadata: &BTreeMap<Address, SpotId>) {
+    if let Some(db) = DB_HANDLE.lock().unwrap().as_ref() {
+        let result = db.update(|tx| -> Result<(), reth_db::DatabaseError> {
+            let mut cursor = tx.cursor_write::<SpotMetadata>()?;
+            // Serialize to BTreeMap<Address, u64>
+            let serializable_map: BTreeMap<Address, u64> =
+                metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
+            cursor.upsert(
+                SPOT_METADATA_KEY,
+                &Bytes::from(
+                    rmp_serde::to_vec(&serializable_map)
+                        .expect("Failed to serialize spot metadata"),
+                ),
+            )?;
+            Ok(())
+        });
+        match result {
+            Ok(_) => info!("Persisted spot metadata to database"),
+            Err(e) => info!("Failed to persist spot metadata to database: {}", e),
+        }
+    }
+}
+
+fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
+    let Transaction::Legacy(tx) = &transaction.tx else {
+        panic!("Unexpected transaction type");
+    };
+    let TxKind::Call(to) = tx.to else {
+        panic!("Unexpected contract creation");
+    };
+    let s = if tx.input.is_empty() {
+        U256::from(0x1)
+    } else {
+        loop {
+            if let Some(spot) = EVM_MAP.read().unwrap().get(&to) {
+                break spot.to_s();
+            }
+            // Cache miss - fetch from API, update cache, and persist to database
+            info!("Contract not found: {to:?} from spot mapping, fetching from API...");
+            let metadata = erc20_contract_to_spot_token(chain_id).unwrap();
+            *EVM_MAP.write().unwrap() = metadata.clone();
+            persist_spot_metadata_to_db(&metadata);
+        }
+    };
+    let signature = Signature::new(U256::from(0x1), s, true);
+    TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
+}

 impl SealedBlock {
     pub fn to_reth_block(
         &self,
         read_precompile_calls: ReadPrecompileCalls,
         highest_precompile_address: Option<Address>,
-        system_txs: Vec<super::SystemTx>,
+        mut system_txs: Vec<super::SystemTx>,
         receipts: Vec<LegacyReceipt>,
         chain_id: u64,
     ) -> HlBlock {
+        // NOTE: Filter out system transactions that may be rejected by the EVM (tracked by #97,
+        // testnet only).
+        if chain_id == TESTNET_CHAIN_ID {
+            system_txs = system_txs.into_iter().filter(|tx| tx.receipt.is_some()).collect();
+        }
         let mut merged_txs = vec![];
         merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id)));
         merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction()));

View File

@@ -81,13 +81,13 @@ impl BlockPoller {
             .await
             .ok_or(eyre::eyre!("Failed to find latest block number"))?;
-        if let Some(debug_cutoff_height) = debug_cutoff_height &&
-            next_block_number > debug_cutoff_height
-        {
-            next_block_number = debug_cutoff_height;
-        }
         loop {
+            if let Some(debug_cutoff_height) = debug_cutoff_height
+                && next_block_number > debug_cutoff_height
+            {
+                next_block_number = debug_cutoff_height;
+            }
             match block_source.collect_block(next_block_number).await {
                 Ok(block) => {
                     block_tx.send((next_block_number, block)).await?;
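
Moving the clamp inside the loop is what restores the intended cap: every iteration re-checks the cutoff, so the poller idles at the cutoff height instead of merely starting there and progressing past it. A minimal sketch of the corrected semantics (names are illustrative, not the repo's API):

// Cap the next block to fetch at an optional debug cutoff.
fn clamp_to_cutoff(next: u64, cutoff: Option<u64>) -> u64 {
    match cutoff {
        Some(c) if next > c => c,
        _ => next,
    }
}

fn main() {
    assert_eq!(clamp_to_cutoff(120, Some(100)), 100); // past the cutoff: capped
    assert_eq!(clamp_to_cutoff(80, Some(100)), 80);   // below the cutoff: unchanged
    assert_eq!(clamp_to_cutoff(80, None), 80);        // feature off by default
}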

tests/run_tests.sh (new file, 49 lines)
View File

@@ -0,0 +1,49 @@
#!/bin/bash
set -e
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
success() {
echo "Success: $1"
}
fail() {
echo "Failed: $1"
exit 1
}
ensure_cmd() {
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
}
ensure_cmd jq
ensure_cmd cast
ensure_cmd wscat
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
fail "ETH_RPC_URL must be a websocket url"
fi
TITLE="Issue #78 - eth_getLogs should return system transactions"
cast logs \
--rpc-url "$ETH_RPC_URL" \
--from-block 15312567 \
--to-block 15312570 \
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
&& success "$TITLE" || fail "$TITLE"
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
echo node "$A"
echo rpc\ "$B"
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
TITLE="eth_subscribe newHeads via wscat"
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
&& success "$TITLE" || fail "$TITLE"
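
To point the suite at a different node, export `ETH_RPC_URL` (it must be a `ws://` or `wss://` URL) before running the script; `jq`, `cast` (Foundry), and `wscat` must be on PATH.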