31 Commits

Author SHA1 Message Date
524db5af01 Merge pull request #105 from hl-archive-node/feat/cache-spot-meta
feat: cache spot metadata in database to reduce API calls
2025-11-05 02:56:06 -05:00
98cc4ce30b feat: cache spot metadata in database to reduce API calls
Implements persistent caching of ERC20 contract address to spot token ID
mappings in the database to minimize API requests and improve performance.

Changes:
- Add SpotMetadata database table for persistent storage
- Implement load_spot_metadata_cache() to initialize cache on startup
- Add init_spot_metadata() for init-state command to pre-populate cache
- Extract store_spot_metadata() helper to DRY serialization logic
- Enable on-demand API fetches with automatic database persistence
- Integrate cache loading in main node startup flow

The cache falls back to on-demand API fetches if database is empty,
with automatic persistence of fetched data for future use.
2025-11-05 07:48:42 +00:00
95c653cf14 chore: fmt 2025-11-05 07:46:22 +00:00
cb73aa7dd4 Merge pull request #104 from hl-archive-node/chore/discovery-local-only
feat: Default to localhost-only network, add --allow-network-overrides
2025-11-05 02:44:54 -05:00
2a118cdacd feat: Default to localhost-only network, add --allow-network-overrides
Network now defaults to localhost-only (local discovery/listener, no DNS/NAT).
Use --allow-network-overrides flag to restore CLI-based network configuration.
2025-11-05 07:38:24 +00:00
ff272fcfb3 Merge pull request #103 from hl-archive-node/chore/tx-spec
fix: Adjust transaction parser
2025-11-05 02:08:52 -05:00
387acd1024 fix: Adjust transaction parser based on observation on all blocks in both networks
Tracked by #97
2025-11-05 07:00:41 +00:00
010d056aad Merge pull request #102 from hl-archive-node/fix/testnet-txs-tracking
fix: Fix testnet transaction types
2025-11-04 12:24:04 -05:00
821c63494e fix: Fix testnet transaction types 2025-11-04 17:23:31 +00:00
f915aba568 Merge pull request #100 from hl-archive-node/feat/deprecate-migrator
feat: Place migrator behind `CHECK_DB_MIGRATION` env
2025-11-01 06:23:01 -04:00
1fe03bfc41 feat: Place migrator behind CHECK_DB_MIGRATION env 2025-11-01 09:36:30 +00:00
893822e5b0 Merge pull request #98 from hl-archive-node/fix/testnet-system-tx 2025-10-26 03:39:06 -04:00
c2528ce223 fix: Support certain types of system tx 2025-10-26 06:42:14 +00:00
d46e808b8d Merge pull request #94 from hl-archive-node/fix/migrator-typo
fix(migrate): Fix wrong chunk ranges
2025-10-16 10:42:59 -04:00
497353fd2f fix(migrate): Fix wrong chunk ranges 2025-10-16 14:35:04 +00:00
eee6eeb2fc Merge pull request #93 from hl-archive-node/fix/subscriptions
fix: Prevent #89 from overriding --hl-node-compliant subscriptions
2025-10-13 01:27:19 -04:00
611e6867bf fix: Do not override --hl-node-compliant for subscription 2025-10-13 02:57:25 +00:00
6c3ed63c3c fix: Override NewHeads only 2025-10-13 02:57:05 +00:00
51924e9671 Merge pull request #91 from hl-archive-node/fix/debug-cutoff
fix: Fix --debug-cutoff-height semantics
2025-10-11 22:29:45 -04:00
8f15aa311f fix: Fix --debug-cutoff-height semantics
NOTE: This is a debug feature not on by default.

The original intention of it was limiting the highest block number. But it was instead enforcing the starting block number for fetching, leading to block progression.
2025-10-12 02:22:55 +00:00
bc66716a41 Merge pull request #89 from hl-archive-node/cleanup
fix: Convert header type for eth_subscribe
2025-10-10 23:09:33 -04:00
fc819dbba2 test: Add regression tests 2025-10-11 02:52:09 +00:00
1c5a22a814 fix: Convert header type for eth_subscribe
Due to custom header usage, only `eth_subscribe` method was returning the new header format in raw format, while other part were using RpcConvert to convert headers.

Make `eth_subscribe` newHeads to return the `inner` field (original eth header) instead.
2025-10-11 02:49:19 +00:00
852e186b1a Merge pull request #88 from hl-archive-node/hotfix
hotfix: Mark migrator experimantal
2025-10-09 04:55:53 -04:00
f83326059f chore: clippy 2025-10-09 08:55:40 +00:00
ca8c374116 feat: Mark migrator as experimental 2025-10-09 08:49:29 +00:00
5ba12a4850 perf: adjust chunk size, do not hold tx too long 2025-10-09 08:20:22 +00:00
8a179a6d9e perf: Use smaller chunks 2025-10-09 08:13:53 +00:00
d570cf3e8d fix: Create directory before migration 2025-10-09 08:13:45 +00:00
0e49e65068 Merge pull request #86 from hl-archive-node/breaking/hl-header
feat(breaking): Use custom header format (HlHeader)
2025-10-09 02:51:09 -04:00
13b63ff136 feat: add migrator for mdbx as well 2025-10-09 06:35:56 +00:00
24 changed files with 716 additions and 196 deletions

View File

@ -19,62 +19,23 @@ use alloy_rpc_types::{
TransactionInfo, TransactionInfo,
pubsub::{Params, SubscriptionKind}, pubsub::{Params, SubscriptionKind},
}; };
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc}; use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
use jsonrpsee_core::{RpcResult, async_trait}; use jsonrpsee_core::{RpcResult, async_trait};
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE}; use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner}; use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
use reth_primitives_traits::SignedTransaction; use reth_primitives_traits::SignedTransaction;
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider}; use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError}; use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::{ use reth_rpc_eth_api::{
EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq, RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
transaction::ConvertReceiptInput,
}; };
use reth_rpc_eth_types::EthApiError; use reth_rpc_eth_types::EthApiError;
use serde::Serialize;
use std::{marker::PhantomData, sync::Arc}; use std::{marker::PhantomData, sync::Arc};
use tokio_stream::{Stream, StreamExt}; use tokio_stream::StreamExt;
use tracing::{Instrument, trace}; use tracing::{Instrument, trace};
use crate::{HlBlock, node::primitives::HlPrimitives}; use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
#[rpc(server, namespace = "eth")] #[rpc(server, namespace = "eth")]
#[async_trait] #[async_trait]
@ -386,6 +347,8 @@ where
pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)), pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
) )
.await; .await;
} else if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else { } else {
let _ = pubsub.handle_accepted(sink, kind, params).await; let _ = pubsub.handle_accepted(sink, kind, params).await;
} }
@ -412,23 +375,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
Some(log) Some(log)
} }
async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> { pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
eth_api: Arc<Eth>, eth_api: Arc<Eth>,
_marker: PhantomData<Eth>, _marker: PhantomData<Eth>,

View File

@ -1,3 +1,5 @@
pub mod call_forwarder; pub mod call_forwarder;
pub mod hl_node_compliance; pub mod hl_node_compliance;
pub mod subscribe_fixup;
pub mod tx_forwarder; pub mod tx_forwarder;
mod utils;

View File

@ -0,0 +1,54 @@
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
use async_trait::async_trait;
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee_types::ErrorObject;
use reth::tasks::TaskSpawner;
use reth_rpc::EthPubSub;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
use std::sync::Arc;
pub struct SubscribeFixup<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
where
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
}));
Ok(())
}
}
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self
where
Eth: EthWrapper,
ErrorObject<'static>: From<Eth::Error>,
{
Self { pubsub, provider, subscription_task_spawner }
}
}

90
src/addons/utils.rs Normal file
View File

@ -0,0 +1,90 @@
use std::sync::Arc;
use crate::{HlBlock, HlPrimitives};
use alloy_primitives::U256;
use alloy_rpc_types::Header;
use futures::StreamExt;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, CanonStateSubscriptions};
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
use reth_rpc_eth_api::{
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
};
use serde::Serialize;
use tokio_stream::Stream;
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub(super) fn new_headers_stream<Eth: EthWrapper>(
provider: &Arc<Eth::Provider>,
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
provider.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.blocks_iter()
.map(|block| {
Header::from_consensus(
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
None,
Some(U256::from(block.rlp_length())),
)
})
.collect::<Vec<_>>();
futures::stream::iter(headers)
})
}

View File

@ -1,7 +1,10 @@
pub mod hl; pub mod hl;
pub mod parser; pub mod parser;
use crate::{hardforks::HlHardforks, node::primitives::{header::HlHeaderExtras, HlHeader}}; use crate::{
hardforks::HlHardforks,
node::primitives::{HlHeader, header::HlHeaderExtras},
};
use alloy_eips::eip7840::BlobParams; use alloy_eips::eip7840::BlobParams;
use alloy_genesis::Genesis; use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256, U256}; use alloy_primitives::{Address, B256, U256};

View File

@ -1,12 +1,16 @@
use std::sync::Arc; use std::sync::Arc;
use clap::Parser; use clap::Parser;
use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext}; use reth::{
builder::{NodeBuilder, NodeHandle, WithLaunchContext},
rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
};
use reth_db::DatabaseEnv; use reth_db::DatabaseEnv;
use reth_hl::{ use reth_hl::{
addons::{ addons::{
call_forwarder::{self, CallForwarderApiServer}, call_forwarder::{self, CallForwarderApiServer},
hl_node_compliance::install_hl_node_compliance, hl_node_compliance::install_hl_node_compliance,
subscribe_fixup::SubscribeFixup,
tx_forwarder::{self, EthForwarderApiServer}, tx_forwarder::{self, EthForwarderApiServer},
}, },
chainspec::{HlChainSpec, parser::HlChainSpecParser}, chainspec::{HlChainSpec, parser::HlChainSpecParser},
@ -14,7 +18,9 @@ use reth_hl::{
HlNode, HlNode,
cli::{Cli, HlNodeArgs}, cli::{Cli, HlNodeArgs},
rpc::precompile::{HlBlockPrecompileApiServer, HlBlockPrecompileExt}, rpc::precompile::{HlBlockPrecompileApiServer, HlBlockPrecompileExt},
spot_meta::init as spot_meta_init,
storage::tables::Tables, storage::tables::Tables,
types::set_spot_metadata_db,
}, },
}; };
use tracing::info; use tracing::info;
@ -35,8 +41,11 @@ fn main() -> eyre::Result<()> {
ext: HlNodeArgs| async move { ext: HlNodeArgs| async move {
let default_upstream_rpc_url = builder.config().chain.official_rpc_url(); let default_upstream_rpc_url = builder.config().chain.official_rpc_url();
let (node, engine_handle_tx) = let (node, engine_handle_tx) = HlNode::new(
HlNode::new(ext.block_source_args.parse().await?, ext.debug_cutoff_height); ext.block_source_args.parse().await?,
ext.debug_cutoff_height,
ext.allow_network_overrides,
);
let NodeHandle { node, node_exit_future: exit_future } = builder let NodeHandle { node, node_exit_future: exit_future } = builder
.node(node) .node(node)
.extend_rpc_modules(move |mut ctx| { .extend_rpc_modules(move |mut ctx| {
@ -59,6 +68,17 @@ fn main() -> eyre::Result<()> {
info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url); info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
} }
// This is a temporary workaround to fix the issue with custom headers
// affects `eth_subscribe[type=newHeads]`
ctx.modules.replace_configured(
SubscribeFixup::new(
Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
Arc::new(ctx.registry.eth_api().provider().clone()),
Box::new(ctx.node().task_executor.clone()),
)
.into_rpc(),
)?;
if ext.hl_node_compliant { if ext.hl_node_compliant {
install_hl_node_compliance(&mut ctx)?; install_hl_node_compliance(&mut ctx)?;
info!("hl-node compliant mode enabled"); info!("hl-node compliant mode enabled");
@ -77,6 +97,16 @@ fn main() -> eyre::Result<()> {
}) })
.apply(|mut builder| { .apply(|mut builder| {
builder.db_mut().create_tables_for::<Tables>().expect("create tables"); builder.db_mut().create_tables_for::<Tables>().expect("create tables");
let chain_id = builder.config().chain.inner.chain().id();
let db = builder.db_mut().clone();
// Set database handle for on-demand persistence
set_spot_metadata_db(db.clone());
// Load spot metadata from database and initialize cache
spot_meta_init::load_spot_metadata_cache(&db, chain_id);
builder builder
}) })
.launch() .launch()

View File

@ -2,7 +2,7 @@ use crate::{
chainspec::{HlChainSpec, parser::HlChainSpecParser}, chainspec::{HlChainSpec, parser::HlChainSpecParser},
node::{ node::{
HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator, HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator,
storage::tables::Tables, spot_meta::init as spot_meta_init, storage::tables::Tables,
}, },
pseudo_peer::BlockSourceArgs, pseudo_peer::BlockSourceArgs,
}; };
@ -82,6 +82,13 @@ pub struct HlNodeArgs {
/// * Refers to the Merkle trie used for eth_getProof and state root, not actual state values. /// * Refers to the Merkle trie used for eth_getProof and state root, not actual state values.
#[arg(long, env = "EXPERIMENTAL_ETH_GET_PROOF")] #[arg(long, env = "EXPERIMENTAL_ETH_GET_PROOF")]
pub experimental_eth_get_proof: bool, pub experimental_eth_get_proof: bool,
/// Allow network configuration overrides from CLI.
///
/// When enabled, network settings (discovery_addr, listener_addr, dns_discovery, nat)
/// will be taken from CLI arguments instead of being hardcoded to localhost-only defaults.
#[arg(long, env = "ALLOW_NETWORK_OVERRIDES")]
pub allow_network_overrides: bool,
} }
/// The main reth_hl cli interface. /// The main reth_hl cli interface.
@ -145,8 +152,12 @@ where
match self.command { match self.command {
Commands::Node(command) => runner.run_command_until_exit(|ctx| { Commands::Node(command) => runner.run_command_until_exit(|ctx| {
// NOTE: This is for one time migration around Oct 10 upgrade:
// It's not necessary anymore, an environment variable gate is added here.
if std::env::var("CHECK_DB_MIGRATION").is_ok() {
Self::migrate_db(&command.chain, &command.datadir, &command.db) Self::migrate_db(&command.chain, &command.datadir, &command.db)
.expect("Failed to migrate database"); .expect("Failed to migrate database");
}
command.execute(ctx, FnLauncher::new::<C, Ext>(launcher)) command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
}), }),
Commands::Init(command) => { Commands::Init(command) => {
@ -190,7 +201,12 @@ where
let data_dir = env.datadir.clone().resolve_datadir(env.chain.chain()); let data_dir = env.datadir.clone().resolve_datadir(env.chain.chain());
let db_path = data_dir.db(); let db_path = data_dir.db();
init_db(db_path.clone(), env.db.database_args())?; init_db(db_path.clone(), env.db.database_args())?;
init_db_for::<_, Tables>(db_path, env.db.database_args())?; init_db_for::<_, Tables>(db_path.clone(), env.db.database_args())?;
// Initialize spot metadata in database
let chain_id = env.chain.chain().id();
spot_meta_init::init_spot_metadata(db_path, env.db.database_args(), chain_id)?;
Ok(()) Ok(())
} }

View File

@ -1,4 +1,8 @@
use crate::{hardforks::HlHardforks, node::{primitives::HlHeader, HlNode}, HlBlock, HlBlockBody, HlPrimitives}; use crate::{
HlBlock, HlBlockBody, HlPrimitives,
hardforks::HlHardforks,
node::{HlNode, primitives::HlHeader},
};
use reth::{ use reth::{
api::{FullNodeTypes, NodeTypes}, api::{FullNodeTypes, NodeTypes},
beacon_consensus::EthBeaconConsensus, beacon_consensus::EthBeaconConsensus,

View File

@ -1,5 +1,6 @@
use crate::{ use crate::{
node::evm::config::{HlBlockExecutorFactory, HlEvmConfig}, HlBlock, HlHeader HlBlock, HlHeader,
node::evm::config::{HlBlockExecutorFactory, HlEvmConfig},
}; };
use reth_evm::{ use reth_evm::{
block::BlockExecutionError, block::BlockExecutionError,

View File

@ -1,40 +1,48 @@
use alloy_consensus::Header; use alloy_consensus::Header;
use alloy_primitives::{b256, hex::ToHexExt, BlockHash, B256, U256}; use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
use reth::{ use reth::{
api::{NodeTypes, NodeTypesWithDBAdapter}, api::NodeTypesWithDBAdapter,
args::{DatabaseArgs, DatadirArgs}, args::{DatabaseArgs, DatadirArgs},
dirs::{ChainPath, DataDirPath}, dirs::{ChainPath, DataDirPath},
}; };
use reth_chainspec::EthChainSpec; use reth_chainspec::EthChainSpec;
use reth_db::{ use reth_db::{
mdbx::{tx::Tx, RO}, DatabaseEnv,
mdbx::{RO, tx::Tx},
models::CompactU256, models::CompactU256,
static_file::iter_static_files, static_file::iter_static_files,
table::Decompress, table::Decompress,
DatabaseEnv, tables,
};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
transaction::{DbTx, DbTxMut},
}; };
use reth_errors::ProviderResult; use reth_errors::ProviderResult;
use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{ use reth_provider::{
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory, DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
StaticFileSegment, StaticFileWriter, StaticFileSegment, StaticFileWriter,
providers::{NodeTypesForProvider, StaticFileProvider},
static_file::SegmentRangeInclusive,
}; };
use std::{marker::PhantomData, path::PathBuf, sync::Arc}; use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
use tracing::{info, warn}; use tracing::{info, warn};
use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives}; use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
pub(super) struct Migrator<N: NodeTypesForProvider> { pub(crate) trait HlNodeType:
NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
{
}
impl<N: NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>> HlNodeType for N {}
pub(super) struct Migrator<N: HlNodeType> {
data_dir: ChainPath<DataDirPath>, data_dir: ChainPath<DataDirPath>,
provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>, provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
_nt: PhantomData<N>,
} }
impl<N: NodeTypesForProvider> Migrator<N> impl<N: HlNodeType> Migrator<N> {
where
N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>,
{
const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp"; const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";
pub fn new( pub fn new(
@ -44,7 +52,7 @@ where
) -> eyre::Result<Self> { ) -> eyre::Result<Self> {
let data_dir = datadir.clone().resolve_datadir(chain_spec.chain()); let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?; let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
Ok(Self { data_dir, provider_factory, _nt: PhantomData }) Ok(Self { data_dir, provider_factory })
} }
pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> { pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
@ -66,9 +74,12 @@ where
} }
fn migrate_db_inner(&self) -> eyre::Result<()> { fn migrate_db_inner(&self) -> eyre::Result<()> {
self.migrate_static_files()?; let migrated_mdbx = MigratorMdbx::<N>(self).migrate_mdbx()?;
self.migrate_mdbx()?; let migrated_static_files = MigrateStaticFiles::<N>(self).migrate_static_files()?;
if migrated_mdbx || migrated_static_files {
info!("Database migrated successfully"); info!("Database migrated successfully");
}
Ok(()) Ok(())
} }
@ -76,6 +87,116 @@ where
self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX) self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
} }
fn provider_factory(
chain_spec: HlChainSpec,
datadir: DatadirArgs,
database_args: DatabaseArgs,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let db = Arc::new(db_env);
Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
}
}
struct MigratorMdbx<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
fn migrate_mdbx(&self) -> eyre::Result<bool> {
// if any header is in old format, we need to migrate it, so we pick the first and last one
let db_env = self.0.provider_factory.provider()?;
let mut cursor = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
let migration_needed = {
let first_is_old = match cursor.first()? {
Some((number, header)) => using_old_header(number, &header),
None => false,
};
let last_is_old = match cursor.last()? {
Some((number, header)) => using_old_header(number, &header),
None => false,
};
first_is_old || last_is_old
};
if !migration_needed {
return Ok(false);
}
check_if_migration_enabled()?;
self.migrate_mdbx_inner()?;
Ok(true)
}
fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
// There shouldn't be many headers in mdbx, but using file for safety
info!("Old database detected, migrating mdbx...");
let conversion_tmp = self.0.conversion_tmp_dir();
let tmp_path = conversion_tmp.join("headers.rmp");
if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?;
}
std::fs::create_dir_all(&conversion_tmp)?;
let count = self.export_old_headers(&tmp_path)?;
self.import_new_headers(tmp_path, count)?;
Ok(())
}
fn export_old_headers(&self, tmp_path: &PathBuf) -> Result<i32, eyre::Error> {
let db_env = self.0.provider_factory.provider()?;
let mut cursor_read = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
let mut tmp_writer = File::create(tmp_path)?;
let mut count = 0;
let old_headers = cursor_read.walk(None)?.filter_map(|row| {
let (block_number, header) = row.ok()?;
if !using_old_header(block_number, &header) {
None
} else {
Some((block_number, Header::decompress(&header).ok()?))
}
});
for (block_number, header) in old_headers {
let receipt =
db_env.receipts_by_block(block_number.into())?.expect("Receipt not found");
let new_header = to_hl_header(receipt, header);
tmp_writer.write_all(&rmp_serde::to_vec(&(block_number, new_header))?)?;
count += 1;
}
Ok(count)
}
fn import_new_headers(&self, tmp_path: PathBuf, count: i32) -> Result<(), eyre::Error> {
let mut tmp_reader = File::open(tmp_path)?;
let db_env = self.0.provider_factory.provider_rw()?;
let mut cursor_write = db_env.tx_ref().cursor_write::<tables::Headers<Bytes>>()?;
for _ in 0..count {
let (number, header) = rmp_serde::from_read::<_, (u64, HlHeader)>(&mut tmp_reader)?;
cursor_write.upsert(number, &rmp_serde::to_vec(&header)?.into())?;
}
db_env.commit()?;
Ok(())
}
}
fn check_if_migration_enabled() -> Result<(), eyre::Error> {
if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
let err_msg = concat!(
"Detected an old database format but experimental database migration is currently disabled. ",
"To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
);
warn!("{}", err_msg);
return Err(eyre::eyre!("{}", err_msg));
}
Ok(())
}
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
fn iterate_files_for_segment( fn iterate_files_for_segment(
&self, &self,
block_range: SegmentRangeInclusive, block_range: SegmentRangeInclusive,
@ -102,8 +223,8 @@ where
fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> { fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
// The direction is opposite here // The direction is opposite here
let src = self.data_dir.static_files(); let src = self.0.data_dir.static_files();
let dst = self.conversion_tmp_dir(); let dst = self.0.conversion_tmp_dir();
for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? { for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
let dst_path = dst.join(file_name); let dst_path = dst.join(file_name);
@ -120,8 +241,8 @@ where
&self, &self,
block_range: SegmentRangeInclusive, block_range: SegmentRangeInclusive,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let src = self.conversion_tmp_dir(); let src = self.0.conversion_tmp_dir();
let dst = self.data_dir.static_files(); let dst = self.0.data_dir.static_files();
for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? { for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
let dst_path = dst.join(file_name); let dst_path = dst.join(file_name);
@ -133,9 +254,9 @@ where
self.create_placeholder(block_range) self.create_placeholder(block_range)
} }
fn migrate_static_files(&self) -> eyre::Result<()> { fn migrate_static_files(&self) -> eyre::Result<bool> {
let conversion_tmp = self.conversion_tmp_dir(); let conversion_tmp = self.0.conversion_tmp_dir();
let old_path = self.data_dir.static_files(); let old_path = self.0.data_dir.static_files();
if conversion_tmp.exists() { if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?; std::fs::remove_dir_all(&conversion_tmp)?;
@ -145,7 +266,6 @@ where
let mut all_static_files = iter_static_files(&old_path)?; let mut all_static_files = iter_static_files(&old_path)?;
let all_static_files = let all_static_files =
all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default(); all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
let provider = self.provider_factory.provider()?;
let mut first = true; let mut first = true;
@ -159,60 +279,34 @@ where
} }
if first { if first {
info!("Old database detected, migrating database..."); check_if_migration_enabled()?;
info!("Old database detected, migrating static files...");
first = false; first = false;
} }
let sf_provider = self.sf_provider(); let sf_provider = self.0.sf_provider();
let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?; let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
let provider = self.0.provider_factory.provider()?;
let block_range_for_filename = sf_provider.find_fixed_range(block_range.start()); let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?; migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
self.move_static_files_for_segment(block_range_for_filename)?; self.move_static_files_for_segment(block_range_for_filename)?;
} }
Ok(()) Ok(!first)
}
fn provider_factory(
chain_spec: HlChainSpec,
datadir: DatadirArgs,
database_args: DatabaseArgs,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let db = Arc::new(db_env);
Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
}
fn migrate_mdbx(&self) -> eyre::Result<()> {
// Actually not much here, all of blocks should be in the static files
Ok(())
} }
fn using_old_header(&self, number: u64) -> eyre::Result<bool> { fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
let sf_provider = self.sf_provider(); let sf_provider = self.0.sf_provider();
let content = old_headers_range(&sf_provider, number..=number)?; let content = old_headers_range(&sf_provider, number..=number)?;
let &[row] = &content.as_slice() else { let &[row] = &content.as_slice() else {
warn!("No header found for block {}", number); warn!("No header found for block {}", number);
return Ok(false); return Ok(false);
}; };
let header = &row[0];
let deserialized_old = is_old_header(header); Ok(using_old_header(number, &row[0]))
let deserialized_new = is_new_header(header);
assert!(
deserialized_old ^ deserialized_new,
"Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
number,
header.encode_hex(),
deserialized_old,
deserialized_new
);
Ok(deserialized_old && !deserialized_new)
} }
} }
@ -242,7 +336,7 @@ fn is_new_header(header: &[u8]) -> bool {
rmp_serde::from_slice::<HlHeader>(header).is_ok() rmp_serde::from_slice::<HlHeader>(header).is_ok()
} }
fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>>( fn migrate_single_static_file<N: HlNodeType>(
sf_out: &StaticFileProvider<HlPrimitives>, sf_out: &StaticFileProvider<HlPrimitives>,
sf_in: &StaticFileProvider<HlPrimitives>, sf_in: &StaticFileProvider<HlPrimitives>,
provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>, provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
@ -250,9 +344,9 @@ fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>
) -> Result<(), eyre::Error> { ) -> Result<(), eyre::Error> {
info!("Migrating block range {}...", block_range); info!("Migrating block range {}...", block_range);
// block_ranges into chunks of 100000 blocks // block_ranges into chunks of 50000 blocks
const CHUNK_SIZE: u64 = 100000; const CHUNK_SIZE: u64 = 50000;
for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) { for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end()); let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
let block_range = chunk..=end; let block_range = chunk..=end;
let headers = old_headers_range(sf_in, block_range.clone())?; let headers = old_headers_range(sf_in, block_range.clone())?;
@ -261,11 +355,8 @@ fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>
let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?; let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
let new_headers = std::iter::zip(headers, receipts) let new_headers = std::iter::zip(headers, receipts)
.map(|(header, receipts)| { .map(|(header, receipts)| {
let system_tx_count =
receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
let eth_header = Header::decompress(&header[0]).unwrap(); let eth_header = Header::decompress(&header[0]).unwrap();
let hl_header = let hl_header = to_hl_header(receipts, eth_header);
HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into(); let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
let hash = BlockHash::decompress(&header[2]).unwrap(); let hash = BlockHash::decompress(&header[2]).unwrap();
@ -281,6 +372,11 @@ fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>
Ok(()) Ok(())
} }
fn to_hl_header(receipts: Vec<EthereumReceipt>, eth_header: Header) -> HlHeader {
let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64)
}
fn old_headers_range( fn old_headers_range(
provider: &StaticFileProvider<HlPrimitives>, provider: &StaticFileProvider<HlPrimitives>,
block_range: impl std::ops::RangeBounds<u64>, block_range: impl std::ops::RangeBounds<u64>,
@ -316,3 +412,18 @@ fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
start..end start..end
} }
fn using_old_header(number: u64, header: &[u8]) -> bool {
let deserialized_old = is_old_header(header);
let deserialized_new = is_new_header(header);
assert!(
deserialized_old ^ deserialized_new,
"Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
number,
header.encode_hex(),
deserialized_old,
deserialized_new
);
deserialized_old && !deserialized_new
}

View File

@ -51,12 +51,14 @@ pub struct HlNode {
engine_handle_rx: Arc<Mutex<Option<oneshot::Receiver<ConsensusEngineHandle<HlPayloadTypes>>>>>, engine_handle_rx: Arc<Mutex<Option<oneshot::Receiver<ConsensusEngineHandle<HlPayloadTypes>>>>>,
block_source_config: BlockSourceConfig, block_source_config: BlockSourceConfig,
debug_cutoff_height: Option<u64>, debug_cutoff_height: Option<u64>,
allow_network_overrides: bool,
} }
impl HlNode { impl HlNode {
pub fn new( pub fn new(
block_source_config: BlockSourceConfig, block_source_config: BlockSourceConfig,
debug_cutoff_height: Option<u64>, debug_cutoff_height: Option<u64>,
allow_network_overrides: bool,
) -> (Self, oneshot::Sender<ConsensusEngineHandle<HlPayloadTypes>>) { ) -> (Self, oneshot::Sender<ConsensusEngineHandle<HlPayloadTypes>>) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
( (
@ -64,6 +66,7 @@ impl HlNode {
engine_handle_rx: Arc::new(Mutex::new(Some(rx))), engine_handle_rx: Arc::new(Mutex::new(Some(rx))),
block_source_config, block_source_config,
debug_cutoff_height, debug_cutoff_height,
allow_network_overrides,
}, },
tx, tx,
) )
@ -95,6 +98,7 @@ impl HlNode {
engine_handle_rx: self.engine_handle_rx.clone(), engine_handle_rx: self.engine_handle_rx.clone(),
block_source_config: self.block_source_config.clone(), block_source_config: self.block_source_config.clone(),
debug_cutoff_height: self.debug_cutoff_height, debug_cutoff_height: self.debug_cutoff_height,
allow_network_overrides: self.allow_network_overrides,
}) })
.consensus(HlConsensusBuilder::default()) .consensus(HlConsensusBuilder::default())
} }

View File

@ -179,7 +179,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::{chainspec::hl::hl_mainnet, HlHeader}; use crate::{HlHeader, chainspec::hl::hl_mainnet};
use super::*; use super::*;
use alloy_primitives::{B256, U128}; use alloy_primitives::{B256, U128};

View File

@ -25,7 +25,10 @@ use reth_network::{NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::PeersInfo; use reth_network_api::PeersInfo;
use reth_provider::StageCheckpointReader; use reth_provider::StageCheckpointReader;
use reth_stages_types::StageId; use reth_stages_types::StageId;
use std::sync::Arc; use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
};
use tokio::sync::{Mutex, mpsc, oneshot}; use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::info; use tracing::info;
@ -144,6 +147,8 @@ pub struct HlNetworkBuilder {
pub(crate) block_source_config: BlockSourceConfig, pub(crate) block_source_config: BlockSourceConfig,
pub(crate) debug_cutoff_height: Option<u64>, pub(crate) debug_cutoff_height: Option<u64>,
pub(crate) allow_network_overrides: bool,
} }
impl HlNetworkBuilder { impl HlNetworkBuilder {
@ -174,15 +179,24 @@ impl HlNetworkBuilder {
ImportService::new(consensus, handle, from_network, to_network).await.unwrap(); ImportService::new(consensus, handle, from_network, to_network).await.unwrap();
}); });
Ok(ctx.build_network_config( let mut config_builder = ctx.network_config_builder()?;
ctx.network_config_builder()?
// Only apply localhost-only network settings if network overrides are NOT allowed
if !self.allow_network_overrides {
config_builder = config_builder
.discovery_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
.listener_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
.disable_dns_discovery() .disable_dns_discovery()
.disable_nat() .disable_nat();
}
config_builder = config_builder
.boot_nodes(boot_nodes()) .boot_nodes(boot_nodes())
.set_head(ctx.head()) .set_head(ctx.head())
.with_pow() .with_pow()
.block_import(Box::new(HlBlockImport::new(handle))), .block_import(Box::new(HlBlockImport::new(handle)));
))
Ok(ctx.build_network_config(config_builder))
} }
} }

View File

@ -3,8 +3,13 @@ use alloy_primitives::Address;
use reth_primitives_traits::{BlockBody as BlockBodyTrait, InMemorySize}; use reth_primitives_traits::{BlockBody as BlockBodyTrait, InMemorySize};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::node::types::{ReadPrecompileCall, ReadPrecompileCalls}; use crate::{
use crate::{HlHeader, node::primitives::TransactionSigned}; HlHeader,
node::{
primitives::TransactionSigned,
types::{ReadPrecompileCall, ReadPrecompileCalls},
},
};
/// Block body for HL. It is equivalent to Ethereum [`BlockBody`] but additionally stores sidecars /// Block body for HL. It is equivalent to Ethereum [`BlockBody`] but additionally stores sidecars
/// for blob transactions. /// for blob transactions.
@ -33,13 +38,11 @@ pub type BlockBody = alloy_consensus::BlockBody<TransactionSigned, HlHeader>;
impl InMemorySize for HlBlockBody { impl InMemorySize for HlBlockBody {
fn size(&self) -> usize { fn size(&self) -> usize {
self.inner.size() self.inner.size() +
+ self self.sidecars
.sidecars
.as_ref() .as_ref()
.map_or(0, |s| s.capacity() * core::mem::size_of::<BlobTransactionSidecar>()) .map_or(0, |s| s.capacity() * core::mem::size_of::<BlobTransactionSidecar>()) +
+ self self.read_precompile_calls
.read_precompile_calls
.as_ref() .as_ref()
.map_or(0, |s| s.0.capacity() * core::mem::size_of::<ReadPrecompileCall>()) .map_or(0, |s| s.0.capacity() * core::mem::size_of::<ReadPrecompileCall>())
} }

View File

@ -45,7 +45,11 @@ pub struct HlHeaderExtras {
} }
impl HlHeader { impl HlHeader {
pub(crate) fn from_ethereum_header(header: Header, receipts: &[EthereumReceipt], system_tx_count: u64) -> HlHeader { pub(crate) fn from_ethereum_header(
header: Header,
receipts: &[EthereumReceipt],
system_tx_count: u64,
) -> HlHeader {
let logs_bloom = logs_bloom(receipts.iter().flat_map(|r| &r.logs)); let logs_bloom = logs_bloom(receipts.iter().flat_map(|r| &r.logs));
HlHeader { HlHeader {
inner: header, inner: header,
@ -183,8 +187,9 @@ impl reth_codecs::Compact for HlHeader {
// because Compact trait requires the Bytes field to be placed at the end of the struct. // because Compact trait requires the Bytes field to be placed at the end of the struct.
// Bytes::from_compact just reads all trailing data as the Bytes field. // Bytes::from_compact just reads all trailing data as the Bytes field.
// //
// Hence we need to use other form of serialization, since extra headers are not Compact-compatible. // Hence we need to use other form of serialization, since extra headers are not
// We just treat all header fields as rmp-serialized one `Bytes` field. // Compact-compatible. We just treat all header fields as rmp-serialized one `Bytes`
// field.
let result: Bytes = rmp_serde::to_vec(&self).unwrap().into(); let result: Bytes = rmp_serde::to_vec(&self).unwrap().into();
result.to_compact(buf) result.to_compact(buf)
} }

View File

@ -1,6 +1,6 @@
#![allow(clippy::owned_cow)] #![allow(clippy::owned_cow)]
use super::{HlBlock, HlBlockBody, TransactionSigned}; use super::{HlBlock, HlBlockBody, TransactionSigned};
use crate::{node::types::ReadPrecompileCalls, HlHeader}; use crate::{HlHeader, node::types::ReadPrecompileCalls};
use alloy_consensus::{BlobTransactionSidecar, BlockBody}; use alloy_consensus::{BlobTransactionSidecar, BlockBody};
use alloy_eips::eip4895::Withdrawals; use alloy_eips::eip4895::Withdrawals;
use alloy_primitives::Address; use alloy_primitives::Address;

View File

@ -6,7 +6,10 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use super::{HlBlock, HlBlockBody}; use super::{HlBlock, HlBlockBody};
use crate::{node::{primitives::BlockBody, types::ReadPrecompileCalls}, HlHeader}; use crate::{
HlHeader,
node::{primitives::BlockBody, types::ReadPrecompileCalls},
};
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct HlBlockBodyBincode<'a> { pub struct HlBlockBodyBincode<'a> {

103
src/node/spot_meta/init.rs Normal file
View File

@ -0,0 +1,103 @@
use crate::node::{
spot_meta::{SpotId, erc20_contract_to_spot_token},
storage::tables::{self, SPOT_METADATA_KEY},
types::reth_compat,
};
use alloy_primitives::Address;
use reth_db::{
DatabaseEnv,
cursor::DbCursorRO,
};
use reth_db_api::{
Database,
transaction::DbTx,
};
use std::{collections::BTreeMap, sync::Arc};
use tracing::info;
/// Load spot metadata from database and initialize cache
pub fn load_spot_metadata_cache(db: &Arc<DatabaseEnv>, chain_id: u64) {
// Try to read from database
let data = match db.view(|tx| -> Result<Option<Vec<u8>>, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.map(|(_, data)| data.to_vec()))
}) {
Ok(Ok(data)) => data,
Ok(Err(e)) => {
info!(
"Failed to read spot metadata from database: {}. Will fetch on-demand from API.",
e
);
return;
}
Err(e) => {
info!(
"Database view error while loading spot metadata: {}. Will fetch on-demand from API.",
e
);
return;
}
};
// Check if data exists
let Some(data) = data else {
info!(
"No spot metadata found in database for chain {}. Run 'init-state' to populate, or it will be fetched on-demand from API.",
chain_id
);
return;
};
// Deserialize metadata
let serializable_map = match rmp_serde::from_slice::<BTreeMap<Address, u64>>(&data) {
Ok(map) => map,
Err(e) => {
info!("Failed to deserialize spot metadata: {}. Will fetch on-demand from API.", e);
return;
}
};
// Convert and initialize cache
let metadata: BTreeMap<Address, SpotId> =
serializable_map.into_iter().map(|(addr, index)| (addr, SpotId { index })).collect();
info!("Loaded spot metadata from database ({} entries)", metadata.len());
reth_compat::initialize_spot_metadata_cache(metadata);
}
/// Initialize spot metadata in database from API
pub fn init_spot_metadata(
db_path: impl AsRef<std::path::Path>,
db_args: reth_db::mdbx::DatabaseArguments,
chain_id: u64,
) -> eyre::Result<()> {
info!("Initializing spot metadata for chain {}", chain_id);
let db = Arc::new(reth_db::open_db(db_path.as_ref(), db_args)?);
// Check if spot metadata already exists
let exists = db.view(|tx| -> Result<bool, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.is_some())
})??;
if exists {
info!("Spot metadata already exists in database");
return Ok(());
}
// Fetch from API
let metadata = match erc20_contract_to_spot_token(chain_id) {
Ok(m) => m,
Err(e) => {
info!("Failed to fetch spot metadata from API: {}. Will be fetched on-demand.", e);
return Ok(());
}
};
// Store to database
reth_compat::store_spot_metadata(&db, &metadata)?;
info!("Successfully fetched and stored spot metadata for chain {}", chain_id);
Ok(())
}

View File

@ -5,6 +5,7 @@ use std::collections::BTreeMap;
use crate::chainspec::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID}; use crate::chainspec::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID};
pub mod init;
mod patch; mod patch;
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -25,7 +26,7 @@ pub struct SpotMeta {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub(crate) struct SpotId { pub struct SpotId {
pub index: u64, pub index: u64,
} }

View File

@ -2,10 +2,21 @@ use alloy_primitives::{BlockNumber, Bytes};
use reth_db::{TableSet, TableType, TableViewer, table::TableInfo, tables}; use reth_db::{TableSet, TableType, TableViewer, table::TableInfo, tables};
use std::fmt; use std::fmt;
/// Static key used for spot metadata, as the database is unique to each chain.
/// This may later serve as a versioning key to assist with future database migrations.
pub const SPOT_METADATA_KEY: u64 = 0;
tables! { tables! {
/// Read precompile calls for each block. /// Read precompile calls for each block.
table BlockReadPrecompileCalls { table BlockReadPrecompileCalls {
type Key = BlockNumber; type Key = BlockNumber;
type Value = Bytes; type Value = Bytes;
} }
/// Spot metadata mapping (EVM address to spot token index).
/// Uses a constant key since the database is chain-specific.
table SpotMetadata {
type Key = u64;
type Value = Bytes;
}
} }

View File

@ -19,6 +19,9 @@ pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
pub(crate) mod reth_compat; pub(crate) mod reth_compat;
// Re-export spot metadata functions
pub use reth_compat::{initialize_spot_metadata_cache, set_spot_metadata_db};
#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HlExtras { pub struct HlExtras {
pub read_precompile_calls: Option<ReadPrecompileCalls>, pub read_precompile_calls: Option<ReadPrecompileCalls>,
@ -127,6 +130,19 @@ pub struct SystemTx {
pub receipt: Option<LegacyReceipt>, pub receipt: Option<LegacyReceipt>,
} }
impl SystemTx {
pub fn gas_limit(&self) -> u64 {
use reth_compat::Transaction;
match &self.tx {
Transaction::Legacy(tx) => tx.gas_limit,
Transaction::Eip2930(tx) => tx.gas_limit,
Transaction::Eip1559(tx) => tx.gas_limit,
Transaction::Eip4844(tx) => tx.gas_limit,
Transaction::Eip7702(tx) => tx.gas_limit,
}
}
}
#[derive( #[derive(
Debug, Debug,
Clone, Clone,

View File

@ -1,11 +1,14 @@
//! Copy of reth codebase to preserve serialization compatibility //! Copy of reth codebase to preserve serialization compatibility
use crate::node::storage::tables::{SPOT_METADATA_KEY, SpotMetadata};
use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy}; use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
use alloy_primitives::{Address, BlockHash, Signature, TxKind, U256}; use alloy_primitives::{Address, BlockHash, Bytes, Signature, TxKind, U256};
use reth_db::{DatabaseEnv, DatabaseError, cursor::DbCursorRW};
use reth_db_api::{Database, transaction::DbTxMut};
use reth_primitives::TransactionSigned as RethTxSigned; use reth_primitives::TransactionSigned as RethTxSigned;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
sync::{Arc, LazyLock, RwLock}, sync::{Arc, LazyLock, Mutex, RwLock},
}; };
use tracing::info; use tracing::info;
@ -81,10 +84,56 @@ pub struct SealedBlock {
pub body: BlockBody, pub body: BlockBody,
} }
fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned { static SPOT_EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new()))); LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
{
// Optional database handle for persisting on-demand fetches
static DB_HANDLE: LazyLock<Mutex<Option<Arc<DatabaseEnv>>>> = LazyLock::new(|| Mutex::new(None));
/// Set the database handle for persisting spot metadata
pub fn set_spot_metadata_db(db: Arc<DatabaseEnv>) {
*DB_HANDLE.lock().unwrap() = Some(db);
}
/// Initialize the spot metadata cache with data loaded from database.
/// This should be called during node initialization.
pub fn initialize_spot_metadata_cache(metadata: BTreeMap<Address, SpotId>) {
*SPOT_EVM_MAP.write().unwrap() = metadata;
}
/// Helper function to serialize and store spot metadata to database
pub fn store_spot_metadata(
db: &Arc<DatabaseEnv>,
metadata: &BTreeMap<Address, SpotId>,
) -> Result<(), DatabaseError> {
db.update(|tx| {
let mut cursor = tx.cursor_write::<SpotMetadata>()?;
// Serialize to BTreeMap<Address, u64>
let serializable_map: BTreeMap<Address, u64> =
metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
cursor.upsert(
SPOT_METADATA_KEY,
&Bytes::from(
rmp_serde::to_vec(&serializable_map).expect("Failed to serialize spot metadata"),
),
)?;
Ok(())
})?
}
/// Persist spot metadata to database if handle is available
fn persist_spot_metadata_to_db(metadata: &BTreeMap<Address, SpotId>) {
if let Some(db) = DB_HANDLE.lock().unwrap().as_ref() {
match store_spot_metadata(db, metadata) {
Ok(_) => info!("Persisted spot metadata to database"),
Err(e) => info!("Failed to persist spot metadata to database: {}", e),
}
}
}
fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
let Transaction::Legacy(tx) = &transaction.tx else { let Transaction::Legacy(tx) = &transaction.tx else {
panic!("Unexpected transaction type"); panic!("Unexpected transaction type");
}; };
@ -95,28 +144,33 @@ fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSig
U256::from(0x1) U256::from(0x1)
} else { } else {
loop { loop {
if let Some(spot) = EVM_MAP.read().unwrap().get(&to) { if let Some(spot) = SPOT_EVM_MAP.read().unwrap().get(&to) {
break spot.to_s(); break spot.to_s();
} }
info!("Contract not found: {to:?} from spot mapping, fetching again..."); // Cache miss - fetch from API, update cache, and persist to database
*EVM_MAP.write().unwrap() = erc20_contract_to_spot_token(chain_id).unwrap(); info!("Contract not found: {to:?} from spot mapping, fetching from API...");
let metadata = erc20_contract_to_spot_token(chain_id).unwrap();
*SPOT_EVM_MAP.write().unwrap() = metadata.clone();
persist_spot_metadata_to_db(&metadata);
} }
}; };
let signature = Signature::new(U256::from(0x1), s, true); let signature = Signature::new(U256::from(0x1), s, true);
TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature))) TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
} }
}
impl SealedBlock { impl SealedBlock {
pub fn to_reth_block( pub fn to_reth_block(
&self, &self,
read_precompile_calls: ReadPrecompileCalls, read_precompile_calls: ReadPrecompileCalls,
highest_precompile_address: Option<Address>, highest_precompile_address: Option<Address>,
system_txs: Vec<super::SystemTx>, mut system_txs: Vec<super::SystemTx>,
receipts: Vec<LegacyReceipt>, receipts: Vec<LegacyReceipt>,
chain_id: u64, chain_id: u64,
) -> HlBlock { ) -> HlBlock {
// NOTE: These types of transactions are tracked at #97.
system_txs.retain(|tx| tx.receipt.is_some());
let mut merged_txs = vec![]; let mut merged_txs = vec![];
merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id))); merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id)));
merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction())); merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction()));

View File

@ -81,13 +81,13 @@ impl BlockPoller {
.await .await
.ok_or(eyre::eyre!("Failed to find latest block number"))?; .ok_or(eyre::eyre!("Failed to find latest block number"))?;
loop {
if let Some(debug_cutoff_height) = debug_cutoff_height && if let Some(debug_cutoff_height) = debug_cutoff_height &&
next_block_number > debug_cutoff_height next_block_number > debug_cutoff_height
{ {
next_block_number = debug_cutoff_height; next_block_number = debug_cutoff_height;
} }
loop {
match block_source.collect_block(next_block_number).await { match block_source.collect_block(next_block_number).await {
Ok(block) => { Ok(block) => {
block_tx.send((next_block_number, block)).await?; block_tx.send((next_block_number, block)).await?;

49
tests/run_tests.sh Normal file
View File

@ -0,0 +1,49 @@
#!/bin/bash
set -e
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
success() {
echo "Success: $1"
}
fail() {
echo "Failed: $1"
exit 1
}
ensure_cmd() {
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
}
ensure_cmd jq
ensure_cmd cast
ensure_cmd wscat
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
fail "ETH_RPC_URL must be a websocket url"
fi
TITLE="Issue #78 - eth_getLogs should return system transactions"
cast logs \
--rpc-url "$ETH_RPC_URL" \
--from-block 15312567 \
--to-block 15312570 \
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
&& success "$TITLE" || fail "$TITLE"
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
echo node "$A"
echo rpc\ "$B"
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
TITLE="eth_subscribe newHeads via wscat"
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
&& success "$TITLE" || fail "$TITLE"