1 Commit

860fa666e3  Merge 233026871f into 92759f04db  (2025-10-08 13:54:24 +00:00)

24 changed files with 196 additions and 716 deletions

View File

@@ -19,23 +19,62 @@ use alloy_rpc_types::{
    TransactionInfo,
    pubsub::{Params, SubscriptionKind},
};
-use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
+use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
use jsonrpsee_core::{RpcResult, async_trait};
use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
use reth_primitives_traits::SignedTransaction;
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
-use reth_rpc::{EthFilter, EthPubSub};
+use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_eth_api::{
-    EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
-    RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
+    EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
+    RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
+    helpers::{EthBlocks, EthTransactions, LoadReceipt},
+    transaction::ConvertReceiptInput,
};
use reth_rpc_eth_types::EthApiError;
+use serde::Serialize;
use std::{marker::PhantomData, sync::Arc};
-use tokio_stream::StreamExt;
+use tokio_stream::{Stream, StreamExt};
use tracing::{Instrument, trace};
-use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
+use crate::{HlBlock, node::primitives::HlPrimitives};
+
+pub trait EthWrapper:
+    EthApiServer<
+        RpcTxReq<Self::NetworkTypes>,
+        RpcTransaction<Self::NetworkTypes>,
+        RpcBlock<Self::NetworkTypes>,
+        RpcReceipt<Self::NetworkTypes>,
+        RpcHeader<Self::NetworkTypes>,
+    > + FullEthApiTypes<
+        Primitives = HlPrimitives,
+        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
+    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+    + EthBlocks
+    + EthTransactions
+    + LoadReceipt
+    + 'static
+{
+}
+
+impl<T> EthWrapper for T where
+    T: EthApiServer<
+        RpcTxReq<Self::NetworkTypes>,
+        RpcTransaction<Self::NetworkTypes>,
+        RpcBlock<Self::NetworkTypes>,
+        RpcReceipt<Self::NetworkTypes>,
+        RpcHeader<Self::NetworkTypes>,
+    > + FullEthApiTypes<
+        Primitives = HlPrimitives,
+        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
+    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+    + EthBlocks
+    + EthTransactions
+    + LoadReceipt
+    + 'static
+{
+}
+
#[rpc(server, namespace = "eth")]
#[async_trait]
@@ -347,8 +386,6 @@ where
                pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
            )
            .await;
-        } else if kind == SubscriptionKind::NewHeads {
-            let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
        } else {
            let _ = pubsub.handle_accepted(sink, kind, params).await;
        }
@@ -375,6 +412,23 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
    Some(log)
}

+async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
+    sink: SubscriptionSink,
+    mut stream: St,
+) -> Result<(), ErrorObject<'static>> {
+    loop {
+        tokio::select! {
+            _ = sink.closed() => break Ok(()),
+            maybe_item = stream.next() => {
+                let Some(item) = maybe_item else { break Ok(()) };
+                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
+                    .map_err(SubscriptionSerializeError::from)?;
+                if sink.send(msg).await.is_err() { break Ok(()); }
+            }
+        }
+    }
+}
+
pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
    eth_api: Arc<Eth>,
    _marker: PhantomData<Eth>,
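
The `pipe_from_stream` helper added in this file is a generic forward-until-closed loop: it races the sink's close signal against the next stream item and stops on whichever fires first. Below is a self-contained sketch of the same pattern using plain tokio channels; every name here is an illustrative stand-in, not part of this crate or of jsonrpsee.

use tokio::sync::{mpsc, watch};
use tokio_stream::{Stream, StreamExt};

// Forward items from `stream` into `out` until the subscriber goes away.
// `closed` stands in for `sink.closed()`, `out.send` for `sink.send`.
async fn forward_until_closed<T, St>(
    mut closed: watch::Receiver<bool>,
    mut stream: St,
    out: mpsc::Sender<T>,
) where
    St: Stream<Item = T> + Unpin,
{
    loop {
        tokio::select! {
            _ = closed.changed() => break,
            item = stream.next() => {
                let Some(item) = item else { break };
                // A send error means the receiving side was dropped; stop as well.
                if out.send(item).await.is_err() {
                    break;
                }
            }
        }
    }
}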

View File

@@ -1,5 +1,3 @@
pub mod call_forwarder;
pub mod hl_node_compliance;
-pub mod subscribe_fixup;
pub mod tx_forwarder;
-mod utils;

View File

@@ -1,54 +0,0 @@
use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
use async_trait::async_trait;
use jsonrpsee::PendingSubscriptionSink;
use jsonrpsee_types::ErrorObject;
use reth::tasks::TaskSpawner;
use reth_rpc::EthPubSub;
use reth_rpc_convert::RpcTransaction;
use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
use std::sync::Arc;
pub struct SubscribeFixup<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
where
ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
{
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::NewHeads {
let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
}
}));
Ok(())
}
}
impl<Eth: EthWrapper> SubscribeFixup<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self
where
Eth: EthWrapper,
ErrorObject<'static>: From<Eth::Error>,
{
Self { pubsub, provider, subscription_task_spawner }
}
}

View File

@@ -1,90 +0,0 @@
use std::sync::Arc;
use crate::{HlBlock, HlPrimitives};
use alloy_primitives::U256;
use alloy_rpc_types::Header;
use futures::StreamExt;
use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
use jsonrpsee_types::ErrorObject;
use reth_primitives::SealedHeader;
use reth_provider::{BlockReader, CanonStateSubscriptions};
use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
use reth_rpc_eth_api::{
EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
helpers::{EthBlocks, EthTransactions, LoadReceipt},
};
use serde::Serialize;
use tokio_stream::Stream;
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<
Primitives = HlPrimitives,
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static
{
}
pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
}
pub(super) fn new_headers_stream<Eth: EthWrapper>(
provider: &Arc<Eth::Provider>,
) -> impl Stream<Item = Header<alloy_consensus::Header>> {
provider.canonical_state_stream().flat_map(|new_chain| {
let headers = new_chain
.committed()
.blocks_iter()
.map(|block| {
Header::from_consensus(
SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
None,
Some(U256::from(block.rlp_length())),
)
})
.collect::<Vec<_>>();
futures::stream::iter(headers)
})
}

View File

@@ -1,10 +1,7 @@
pub mod hl;
pub mod parser;
-use crate::{
-    hardforks::HlHardforks,
-    node::primitives::{HlHeader, header::HlHeaderExtras},
-};
+use crate::{hardforks::HlHardforks, node::primitives::{header::HlHeaderExtras, HlHeader}};
use alloy_eips::eip7840::BlobParams;
use alloy_genesis::Genesis;
use alloy_primitives::{Address, B256, U256};

View File

@@ -1,16 +1,12 @@
use std::sync::Arc;
use clap::Parser;
-use reth::{
-    builder::{NodeBuilder, NodeHandle, WithLaunchContext},
-    rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
-};
+use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
use reth_db::DatabaseEnv;
use reth_hl::{
    addons::{
        call_forwarder::{self, CallForwarderApiServer},
        hl_node_compliance::install_hl_node_compliance,
-        subscribe_fixup::SubscribeFixup,
        tx_forwarder::{self, EthForwarderApiServer},
    },
    chainspec::{HlChainSpec, parser::HlChainSpecParser},
@@ -18,9 +14,7 @@ use reth_hl::{
        HlNode,
        cli::{Cli, HlNodeArgs},
        rpc::precompile::{HlBlockPrecompileApiServer, HlBlockPrecompileExt},
-        spot_meta::init as spot_meta_init,
        storage::tables::Tables,
-        types::set_spot_metadata_db,
    },
};
use tracing::info;
@@ -41,11 +35,8 @@
         ext: HlNodeArgs| async move {
            let default_upstream_rpc_url = builder.config().chain.official_rpc_url();
-            let (node, engine_handle_tx) = HlNode::new(
-                ext.block_source_args.parse().await?,
-                ext.debug_cutoff_height,
-                ext.allow_network_overrides,
-            );
+            let (node, engine_handle_tx) =
+                HlNode::new(ext.block_source_args.parse().await?, ext.debug_cutoff_height);
            let NodeHandle { node, node_exit_future: exit_future } = builder
                .node(node)
                .extend_rpc_modules(move |mut ctx| {
@@ -68,17 +59,6 @@
                        info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
                    }
-                    // This is a temporary workaround to fix the issue with custom headers
-                    // affects `eth_subscribe[type=newHeads]`
-                    ctx.modules.replace_configured(
-                        SubscribeFixup::new(
-                            Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
-                            Arc::new(ctx.registry.eth_api().provider().clone()),
-                            Box::new(ctx.node().task_executor.clone()),
-                        )
-                        .into_rpc(),
-                    )?;
                    if ext.hl_node_compliant {
                        install_hl_node_compliance(&mut ctx)?;
                        info!("hl-node compliant mode enabled");
@@ -97,16 +77,6 @@
                })
                .apply(|mut builder| {
                    builder.db_mut().create_tables_for::<Tables>().expect("create tables");
-                    let chain_id = builder.config().chain.inner.chain().id();
-                    let db = builder.db_mut().clone();
-                    // Set database handle for on-demand persistence
-                    set_spot_metadata_db(db.clone());
-                    // Load spot metadata from database and initialize cache
-                    spot_meta_init::load_spot_metadata_cache(&db, chain_id);
                    builder
                })
                .launch()

View File

@@ -2,7 +2,7 @@ use crate::{
    chainspec::{HlChainSpec, parser::HlChainSpecParser},
    node::{
        HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator,
-        spot_meta::init as spot_meta_init, storage::tables::Tables,
+        storage::tables::Tables,
    },
    pseudo_peer::BlockSourceArgs,
};
@@ -82,13 +82,6 @@ pub struct HlNodeArgs {
    /// * Refers to the Merkle trie used for eth_getProof and state root, not actual state values.
    #[arg(long, env = "EXPERIMENTAL_ETH_GET_PROOF")]
    pub experimental_eth_get_proof: bool,
-
-    /// Allow network configuration overrides from CLI.
-    ///
-    /// When enabled, network settings (discovery_addr, listener_addr, dns_discovery, nat)
-    /// will be taken from CLI arguments instead of being hardcoded to localhost-only defaults.
-    #[arg(long, env = "ALLOW_NETWORK_OVERRIDES")]
-    pub allow_network_overrides: bool,
}

/// The main reth_hl cli interface.
@@ -152,12 +145,8 @@
        match self.command {
            Commands::Node(command) => runner.run_command_until_exit(|ctx| {
-                // NOTE: This is for one time migration around Oct 10 upgrade:
-                // It's not necessary anymore, an environment variable gate is added here.
-                if std::env::var("CHECK_DB_MIGRATION").is_ok() {
-                    Self::migrate_db(&command.chain, &command.datadir, &command.db)
-                        .expect("Failed to migrate database");
-                }
+                Self::migrate_db(&command.chain, &command.datadir, &command.db)
+                    .expect("Failed to migrate database");
                command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
            }),
            Commands::Init(command) => {
@@ -201,12 +190,7 @@
            let data_dir = env.datadir.clone().resolve_datadir(env.chain.chain());
            let db_path = data_dir.db();
            init_db(db_path.clone(), env.db.database_args())?;
-            init_db_for::<_, Tables>(db_path.clone(), env.db.database_args())?;
-
-            // Initialize spot metadata in database
-            let chain_id = env.chain.chain().id();
-            spot_meta_init::init_spot_metadata(db_path, env.db.database_args(), chain_id)?;
+            init_db_for::<_, Tables>(db_path, env.db.database_args())?;
            Ok(())
        }

View File

@@ -1,8 +1,4 @@
-use crate::{
-    HlBlock, HlBlockBody, HlPrimitives,
-    hardforks::HlHardforks,
-    node::{HlNode, primitives::HlHeader},
-};
+use crate::{hardforks::HlHardforks, node::{primitives::HlHeader, HlNode}, HlBlock, HlBlockBody, HlPrimitives};
use reth::{
    api::{FullNodeTypes, NodeTypes},
    beacon_consensus::EthBeaconConsensus,

View File

@@ -1,6 +1,5 @@
use crate::{
-    HlBlock, HlHeader,
-    node::evm::config::{HlBlockExecutorFactory, HlEvmConfig},
+    node::evm::config::{HlBlockExecutorFactory, HlEvmConfig}, HlBlock, HlHeader
};
use reth_evm::{
    block::BlockExecutionError,

View File

@@ -1,48 +1,40 @@
use alloy_consensus::Header;
-use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
+use alloy_primitives::{b256, hex::ToHexExt, BlockHash, B256, U256};
use reth::{
-    api::NodeTypesWithDBAdapter,
+    api::{NodeTypes, NodeTypesWithDBAdapter},
    args::{DatabaseArgs, DatadirArgs},
    dirs::{ChainPath, DataDirPath},
};
use reth_chainspec::EthChainSpec;
use reth_db::{
-    DatabaseEnv,
-    mdbx::{RO, tx::Tx},
+    mdbx::{tx::Tx, RO},
    models::CompactU256,
    static_file::iter_static_files,
    table::Decompress,
-    tables,
+    DatabaseEnv,
};
-use reth_db_api::{
-    cursor::{DbCursorRO, DbCursorRW},
-    transaction::{DbTx, DbTxMut},
-};
use reth_errors::ProviderResult;
-use reth_ethereum_primitives::EthereumReceipt;
use reth_provider::{
-    DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
-    StaticFileSegment, StaticFileWriter,
    providers::{NodeTypesForProvider, StaticFileProvider},
    static_file::SegmentRangeInclusive,
+    DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
+    StaticFileSegment, StaticFileWriter,
};
-use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
+use std::{marker::PhantomData, path::PathBuf, sync::Arc};
use tracing::{info, warn};
-use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
+use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};

-pub(crate) trait HlNodeType:
-    NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
-{
-}
-impl<N: NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>> HlNodeType for N {}
-
-pub(super) struct Migrator<N: HlNodeType> {
+pub(super) struct Migrator<N: NodeTypesForProvider> {
    data_dir: ChainPath<DataDirPath>,
    provider_factory: ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
+    _nt: PhantomData<N>,
}

-impl<N: HlNodeType> Migrator<N> {
+impl<N: NodeTypesForProvider> Migrator<N>
+where
+    N: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>,
+{
    const MIGRATION_PATH_SUFFIX: &'static str = "migration-tmp";

    pub fn new(
@@ -52,7 +44,7 @@
    ) -> eyre::Result<Self> {
        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
        let provider_factory = Self::provider_factory(chain_spec, datadir, database_args)?;
-        Ok(Self { data_dir, provider_factory })
+        Ok(Self { data_dir, provider_factory, _nt: PhantomData })
    }

    pub fn sf_provider(&self) -> StaticFileProvider<HlPrimitives> {
@@ -74,12 +66,9 @@
    }

    fn migrate_db_inner(&self) -> eyre::Result<()> {
-        let migrated_mdbx = MigratorMdbx::<N>(self).migrate_mdbx()?;
-        let migrated_static_files = MigrateStaticFiles::<N>(self).migrate_static_files()?;
-        if migrated_mdbx || migrated_static_files {
-            info!("Database migrated successfully");
-        }
+        self.migrate_static_files()?;
+        self.migrate_mdbx()?;
+        info!("Database migrated successfully");
        Ok(())
    }
@@ -87,116 +76,6 @@
        self.data_dir.data_dir().join(Self::MIGRATION_PATH_SUFFIX)
    }
fn provider_factory(
chain_spec: HlChainSpec,
datadir: DatadirArgs,
database_args: DatabaseArgs,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let db = Arc::new(db_env);
Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
}
}
struct MigratorMdbx<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
fn migrate_mdbx(&self) -> eyre::Result<bool> {
// if any header is in old format, we need to migrate it, so we pick the first and last one
let db_env = self.0.provider_factory.provider()?;
let mut cursor = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
let migration_needed = {
let first_is_old = match cursor.first()? {
Some((number, header)) => using_old_header(number, &header),
None => false,
};
let last_is_old = match cursor.last()? {
Some((number, header)) => using_old_header(number, &header),
None => false,
};
first_is_old || last_is_old
};
if !migration_needed {
return Ok(false);
}
check_if_migration_enabled()?;
self.migrate_mdbx_inner()?;
Ok(true)
}
fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
// There shouldn't be many headers in mdbx, but using file for safety
info!("Old database detected, migrating mdbx...");
let conversion_tmp = self.0.conversion_tmp_dir();
let tmp_path = conversion_tmp.join("headers.rmp");
if conversion_tmp.exists() {
std::fs::remove_dir_all(&conversion_tmp)?;
}
std::fs::create_dir_all(&conversion_tmp)?;
let count = self.export_old_headers(&tmp_path)?;
self.import_new_headers(tmp_path, count)?;
Ok(())
}
fn export_old_headers(&self, tmp_path: &PathBuf) -> Result<i32, eyre::Error> {
let db_env = self.0.provider_factory.provider()?;
let mut cursor_read = db_env.tx_ref().cursor_read::<tables::Headers<Bytes>>()?;
let mut tmp_writer = File::create(tmp_path)?;
let mut count = 0;
let old_headers = cursor_read.walk(None)?.filter_map(|row| {
let (block_number, header) = row.ok()?;
if !using_old_header(block_number, &header) {
None
} else {
Some((block_number, Header::decompress(&header).ok()?))
}
});
for (block_number, header) in old_headers {
let receipt =
db_env.receipts_by_block(block_number.into())?.expect("Receipt not found");
let new_header = to_hl_header(receipt, header);
tmp_writer.write_all(&rmp_serde::to_vec(&(block_number, new_header))?)?;
count += 1;
}
Ok(count)
}
fn import_new_headers(&self, tmp_path: PathBuf, count: i32) -> Result<(), eyre::Error> {
let mut tmp_reader = File::open(tmp_path)?;
let db_env = self.0.provider_factory.provider_rw()?;
let mut cursor_write = db_env.tx_ref().cursor_write::<tables::Headers<Bytes>>()?;
for _ in 0..count {
let (number, header) = rmp_serde::from_read::<_, (u64, HlHeader)>(&mut tmp_reader)?;
cursor_write.upsert(number, &rmp_serde::to_vec(&header)?.into())?;
}
db_env.commit()?;
Ok(())
}
}
fn check_if_migration_enabled() -> Result<(), eyre::Error> {
if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
let err_msg = concat!(
"Detected an old database format but experimental database migration is currently disabled. ",
"To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
);
warn!("{}", err_msg);
return Err(eyre::eyre!("{}", err_msg));
}
Ok(())
}
struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
    fn iterate_files_for_segment(
        &self,
        block_range: SegmentRangeInclusive,
@@ -223,8 +102,8 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
    fn create_placeholder(&self, block_range: SegmentRangeInclusive) -> eyre::Result<()> {
        // The direction is opposite here
-        let src = self.0.data_dir.static_files();
-        let dst = self.0.conversion_tmp_dir();
+        let src = self.data_dir.static_files();
+        let dst = self.conversion_tmp_dir();

        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
            let dst_path = dst.join(file_name);
@@ -241,8 +120,8 @@
        &self,
        block_range: SegmentRangeInclusive,
    ) -> eyre::Result<()> {
-        let src = self.0.conversion_tmp_dir();
-        let dst = self.0.data_dir.static_files();
+        let src = self.conversion_tmp_dir();
+        let dst = self.data_dir.static_files();

        for (src_path, file_name) in self.iterate_files_for_segment(block_range, &src)? {
            let dst_path = dst.join(file_name);
@@ -254,9 +133,9 @@
        self.create_placeholder(block_range)
    }

-    fn migrate_static_files(&self) -> eyre::Result<bool> {
-        let conversion_tmp = self.0.conversion_tmp_dir();
-        let old_path = self.0.data_dir.static_files();
+    fn migrate_static_files(&self) -> eyre::Result<()> {
+        let conversion_tmp = self.conversion_tmp_dir();
+        let old_path = self.data_dir.static_files();

        if conversion_tmp.exists() {
            std::fs::remove_dir_all(&conversion_tmp)?;
@@ -266,6 +145,7 @@
        let mut all_static_files = iter_static_files(&old_path)?;
        let all_static_files =
            all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
+        let provider = self.provider_factory.provider()?;

        let mut first = true;
@@ -279,34 +159,60 @@
            }

            if first {
-                check_if_migration_enabled()?;
-                info!("Old database detected, migrating static files...");
+                info!("Old database detected, migrating database...");
                first = false;
            }

-            let sf_provider = self.0.sf_provider();
+            let sf_provider = self.sf_provider();
            let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
-            let provider = self.0.provider_factory.provider()?;
            let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
            migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
            self.move_static_files_for_segment(block_range_for_filename)?;
        }

-        Ok(!first)
+        Ok(())
    }

+    fn provider_factory(
+        chain_spec: HlChainSpec,
+        datadir: DatadirArgs,
+        database_args: DatabaseArgs,
+    ) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
+        let data_dir = datadir.clone().resolve_datadir(chain_spec.chain());
+        let db_env = reth_db::init_db(data_dir.db(), database_args.database_args())?;
+        let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
+        let db = Arc::new(db_env);
+        Ok(ProviderFactory::new(db, Arc::new(chain_spec), static_file_provider))
+    }
+
+    fn migrate_mdbx(&self) -> eyre::Result<()> {
+        // Actually not much here, all of blocks should be in the static files
+        Ok(())
+    }

    fn using_old_header(&self, number: u64) -> eyre::Result<bool> {
-        let sf_provider = self.0.sf_provider();
+        let sf_provider = self.sf_provider();
        let content = old_headers_range(&sf_provider, number..=number)?;
        let &[row] = &content.as_slice() else {
            warn!("No header found for block {}", number);
            return Ok(false);
        };
-        Ok(using_old_header(number, &row[0]))
+        let header = &row[0];
+        let deserialized_old = is_old_header(header);
+        let deserialized_new = is_new_header(header);
+        assert!(
+            deserialized_old ^ deserialized_new,
+            "Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
+            number,
+            header.encode_hex(),
+            deserialized_old,
+            deserialized_new
+        );
+        Ok(deserialized_old && !deserialized_new)
    }
}
@@ -336,7 +242,7 @@ fn is_new_header(header: &[u8]) -> bool {
    rmp_serde::from_slice::<HlHeader>(header).is_ok()
}

-fn migrate_single_static_file<N: HlNodeType>(
+fn migrate_single_static_file<N: NodeTypesForProvider<Primitives = HlPrimitives>>(
    sf_out: &StaticFileProvider<HlPrimitives>,
    sf_in: &StaticFileProvider<HlPrimitives>,
    provider: &DatabaseProvider<Tx<RO>, NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>,
@@ -344,9 +250,9 @@
) -> Result<(), eyre::Error> {
    info!("Migrating block range {}...", block_range);

-    // block_ranges into chunks of 50000 blocks
-    const CHUNK_SIZE: u64 = 50000;
-    for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
+    // block_ranges into chunks of 100000 blocks
+    const CHUNK_SIZE: u64 = 100000;
+    for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
        let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
        let block_range = chunk..=end;
        let headers = old_headers_range(sf_in, block_range.clone())?;
@@ -355,8 +261,11 @@
        let mut writer = sf_out.get_writer(*block_range.start(), StaticFileSegment::Headers)?;
        let new_headers = std::iter::zip(headers, receipts)
            .map(|(header, receipts)| {
+                let system_tx_count =
+                    receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
                let eth_header = Header::decompress(&header[0]).unwrap();
-                let hl_header = to_hl_header(receipts, eth_header);
+                let hl_header =
+                    HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64);
                let difficulty: U256 = CompactU256::decompress(&header[1]).unwrap().into();
                let hash = BlockHash::decompress(&header[2]).unwrap();
@@ -372,11 +281,6 @@
    Ok(())
}

-fn to_hl_header(receipts: Vec<EthereumReceipt>, eth_header: Header) -> HlHeader {
-    let system_tx_count = receipts.iter().filter(|r| r.cumulative_gas_used == 0).count();
-    HlHeader::from_ethereum_header(eth_header, &receipts, system_tx_count as u64)
-}
-
fn old_headers_range(
    provider: &StaticFileProvider<HlPrimitives>,
    block_range: impl std::ops::RangeBounds<u64>,
@@ -412,18 +316,3 @@ fn to_range<R: std::ops::RangeBounds<u64>>(bounds: R) -> std::ops::Range<u64> {
    start..end
}
fn using_old_header(number: u64, header: &[u8]) -> bool {
let deserialized_old = is_old_header(header);
let deserialized_new = is_new_header(header);
assert!(
deserialized_old ^ deserialized_new,
"Header is not valid: {} {}\ndeserialized_old: {}\ndeserialized_new: {}",
number,
header.encode_hex(),
deserialized_old,
deserialized_new
);
deserialized_old && !deserialized_new
}
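
The CHUNK_SIZE loop in migrate_single_static_file above walks a block range in fixed-size chunks, clamping the final chunk to the range end. A minimal standalone sketch of that chunking; chunk_inclusive is an illustrative helper, not a function from this diff.

/// Split an inclusive range into (start, end) chunks of at most `chunk_size` items.
fn chunk_inclusive(start: u64, end: u64, chunk_size: u64) -> Vec<(u64, u64)> {
    assert!(chunk_size > 0, "chunk_size must be non-zero");
    let mut chunks = Vec::new();
    let mut lo = start;
    while lo <= end {
        // Clamp the chunk end to the overall range end.
        let hi = std::cmp::min(lo.saturating_add(chunk_size - 1), end);
        chunks.push((lo, hi));
        match hi.checked_add(1) {
            Some(next) => lo = next,
            None => break, // reached u64::MAX
        }
    }
    chunks
}

// chunk_inclusive(0, 249_999, 100_000) == [(0, 99_999), (100_000, 199_999), (200_000, 249_999)]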

View File

@@ -51,14 +51,12 @@ pub struct HlNode {
    engine_handle_rx: Arc<Mutex<Option<oneshot::Receiver<ConsensusEngineHandle<HlPayloadTypes>>>>>,
    block_source_config: BlockSourceConfig,
    debug_cutoff_height: Option<u64>,
-    allow_network_overrides: bool,
}

impl HlNode {
    pub fn new(
        block_source_config: BlockSourceConfig,
        debug_cutoff_height: Option<u64>,
-        allow_network_overrides: bool,
    ) -> (Self, oneshot::Sender<ConsensusEngineHandle<HlPayloadTypes>>) {
        let (tx, rx) = oneshot::channel();
        (
@@ -66,7 +64,6 @@ impl HlNode {
                engine_handle_rx: Arc::new(Mutex::new(Some(rx))),
                block_source_config,
                debug_cutoff_height,
-                allow_network_overrides,
            },
            tx,
        )
@@ -98,7 +95,6 @@ impl HlNode {
            engine_handle_rx: self.engine_handle_rx.clone(),
            block_source_config: self.block_source_config.clone(),
            debug_cutoff_height: self.debug_cutoff_height,
-            allow_network_overrides: self.allow_network_overrides,
        })
        .consensus(HlConsensusBuilder::default())
    }

View File

@@ -179,7 +179,7 @@ where
#[cfg(test)]
mod tests {
-    use crate::{HlHeader, chainspec::hl::hl_mainnet};
+    use crate::{chainspec::hl::hl_mainnet, HlHeader};

    use super::*;
    use alloy_primitives::{B256, U128};

View File

@@ -25,10 +25,7 @@
use reth_network::{NetworkConfig, NetworkHandle, NetworkManager};
use reth_network_api::PeersInfo;
use reth_provider::StageCheckpointReader;
use reth_stages_types::StageId;
-use std::{
-    net::{Ipv4Addr, SocketAddr},
-    sync::Arc,
-};
+use std::sync::Arc;
use tokio::sync::{Mutex, mpsc, oneshot};
use tracing::info;
@@ -147,8 +144,6 @@ pub struct HlNetworkBuilder {
    pub(crate) block_source_config: BlockSourceConfig,
    pub(crate) debug_cutoff_height: Option<u64>,
-
-    pub(crate) allow_network_overrides: bool,
}

impl HlNetworkBuilder {
@@ -179,24 +174,15 @@ impl HlNetworkBuilder {
            ImportService::new(consensus, handle, from_network, to_network).await.unwrap();
        });

-        let mut config_builder = ctx.network_config_builder()?;
-
-        // Only apply localhost-only network settings if network overrides are NOT allowed
-        if !self.allow_network_overrides {
-            config_builder = config_builder
-                .discovery_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
-                .listener_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
-                .disable_dns_discovery()
-                .disable_nat();
-        }
-
-        config_builder = config_builder
-            .boot_nodes(boot_nodes())
-            .set_head(ctx.head())
-            .with_pow()
-            .block_import(Box::new(HlBlockImport::new(handle)));
-
-        Ok(ctx.build_network_config(config_builder))
+        Ok(ctx.build_network_config(
+            ctx.network_config_builder()?
+                .disable_dns_discovery()
+                .disable_nat()
+                .boot_nodes(boot_nodes())
+                .set_head(ctx.head())
+                .with_pow()
+                .block_import(Box::new(HlBlockImport::new(handle))),
+        ))
    }
}

View File

@@ -3,13 +3,8 @@ use alloy_primitives::Address;
use reth_primitives_traits::{BlockBody as BlockBodyTrait, InMemorySize};
use serde::{Deserialize, Serialize};

-use crate::{
-    HlHeader,
-    node::{
-        primitives::TransactionSigned,
-        types::{ReadPrecompileCall, ReadPrecompileCalls},
-    },
-};
+use crate::node::types::{ReadPrecompileCall, ReadPrecompileCalls};
+use crate::{HlHeader, node::primitives::TransactionSigned};

/// Block body for HL. It is equivalent to Ethereum [`BlockBody`] but additionally stores sidecars
/// for blob transactions.
@@ -38,11 +33,13 @@ pub type BlockBody = alloy_consensus::BlockBody<TransactionSigned, HlHeader>;
impl InMemorySize for HlBlockBody {
    fn size(&self) -> usize {
-        self.inner.size() +
-            self.sidecars
+        self.inner.size()
+            + self
+                .sidecars
                .as_ref()
-                .map_or(0, |s| s.capacity() * core::mem::size_of::<BlobTransactionSidecar>()) +
-            self.read_precompile_calls
+                .map_or(0, |s| s.capacity() * core::mem::size_of::<BlobTransactionSidecar>())
+            + self
+                .read_precompile_calls
                .as_ref()
                .map_or(0, |s| s.0.capacity() * core::mem::size_of::<ReadPrecompileCall>())
    }

View File

@@ -45,11 +45,7 @@ pub struct HlHeaderExtras {
}

impl HlHeader {
-    pub(crate) fn from_ethereum_header(
-        header: Header,
-        receipts: &[EthereumReceipt],
-        system_tx_count: u64,
-    ) -> HlHeader {
+    pub(crate) fn from_ethereum_header(header: Header, receipts: &[EthereumReceipt], system_tx_count: u64) -> HlHeader {
        let logs_bloom = logs_bloom(receipts.iter().flat_map(|r| &r.logs));
        HlHeader {
            inner: header,
@@ -187,9 +183,8 @@ impl reth_codecs::Compact for HlHeader {
        // because Compact trait requires the Bytes field to be placed at the end of the struct.
        // Bytes::from_compact just reads all trailing data as the Bytes field.
        //
-        // Hence we need to use other form of serialization, since extra headers are not
-        // Compact-compatible. We just treat all header fields as rmp-serialized one `Bytes`
-        // field.
+        // Hence we need to use other form of serialization, since extra headers are not Compact-compatible.
+        // We just treat all header fields as rmp-serialized one `Bytes` field.
        let result: Bytes = rmp_serde::to_vec(&self).unwrap().into();
        result.to_compact(buf)
    }
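
The comment in the hunk above explains why HlHeader cannot use field-by-field Compact encoding and is instead rmp-serialized into a single Bytes blob. A hedged sketch of that round-trip with a made-up stand-in type; DemoHeader is not a type from this codebase.

use serde::{Deserialize, Serialize};

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct DemoHeader {
    number: u64,
    extras: Vec<u8>,
}

fn roundtrip() {
    let header = DemoHeader { number: 42, extras: vec![1, 2, 3] };
    // Encode the whole struct as one self-describing blob; no per-field layout rules apply.
    let blob: Vec<u8> = rmp_serde::to_vec(&header).expect("encode");
    // Decode by reading the blob back in one step.
    let decoded: DemoHeader = rmp_serde::from_slice(&blob).expect("decode");
    assert_eq!(decoded, header);
}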

View File

@@ -1,6 +1,6 @@
#![allow(clippy::owned_cow)]
use super::{HlBlock, HlBlockBody, TransactionSigned};
-use crate::{HlHeader, node::types::ReadPrecompileCalls};
+use crate::{node::types::ReadPrecompileCalls, HlHeader};
use alloy_consensus::{BlobTransactionSidecar, BlockBody};
use alloy_eips::eip4895::Withdrawals;
use alloy_primitives::Address;

View File

@@ -6,10 +6,7 @@ use serde::{Deserialize, Serialize};
use std::borrow::Cow;

use super::{HlBlock, HlBlockBody};
-use crate::{
-    HlHeader,
-    node::{primitives::BlockBody, types::ReadPrecompileCalls},
-};
+use crate::{node::{primitives::BlockBody, types::ReadPrecompileCalls}, HlHeader};

#[derive(Debug, Serialize, Deserialize)]
pub struct HlBlockBodyBincode<'a> {

View File

@@ -1,103 +0,0 @@
use crate::node::{
spot_meta::{SpotId, erc20_contract_to_spot_token},
storage::tables::{self, SPOT_METADATA_KEY},
types::reth_compat,
};
use alloy_primitives::Address;
use reth_db::{
DatabaseEnv,
cursor::DbCursorRO,
};
use reth_db_api::{
Database,
transaction::DbTx,
};
use std::{collections::BTreeMap, sync::Arc};
use tracing::info;
/// Load spot metadata from database and initialize cache
pub fn load_spot_metadata_cache(db: &Arc<DatabaseEnv>, chain_id: u64) {
// Try to read from database
let data = match db.view(|tx| -> Result<Option<Vec<u8>>, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.map(|(_, data)| data.to_vec()))
}) {
Ok(Ok(data)) => data,
Ok(Err(e)) => {
info!(
"Failed to read spot metadata from database: {}. Will fetch on-demand from API.",
e
);
return;
}
Err(e) => {
info!(
"Database view error while loading spot metadata: {}. Will fetch on-demand from API.",
e
);
return;
}
};
// Check if data exists
let Some(data) = data else {
info!(
"No spot metadata found in database for chain {}. Run 'init-state' to populate, or it will be fetched on-demand from API.",
chain_id
);
return;
};
// Deserialize metadata
let serializable_map = match rmp_serde::from_slice::<BTreeMap<Address, u64>>(&data) {
Ok(map) => map,
Err(e) => {
info!("Failed to deserialize spot metadata: {}. Will fetch on-demand from API.", e);
return;
}
};
// Convert and initialize cache
let metadata: BTreeMap<Address, SpotId> =
serializable_map.into_iter().map(|(addr, index)| (addr, SpotId { index })).collect();
info!("Loaded spot metadata from database ({} entries)", metadata.len());
reth_compat::initialize_spot_metadata_cache(metadata);
}
/// Initialize spot metadata in database from API
pub fn init_spot_metadata(
db_path: impl AsRef<std::path::Path>,
db_args: reth_db::mdbx::DatabaseArguments,
chain_id: u64,
) -> eyre::Result<()> {
info!("Initializing spot metadata for chain {}", chain_id);
let db = Arc::new(reth_db::open_db(db_path.as_ref(), db_args)?);
// Check if spot metadata already exists
let exists = db.view(|tx| -> Result<bool, reth_db::DatabaseError> {
let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.is_some())
})??;
if exists {
info!("Spot metadata already exists in database");
return Ok(());
}
// Fetch from API
let metadata = match erc20_contract_to_spot_token(chain_id) {
Ok(m) => m,
Err(e) => {
info!("Failed to fetch spot metadata from API: {}. Will be fetched on-demand.", e);
return Ok(());
}
};
// Store to database
reth_compat::store_spot_metadata(&db, &metadata)?;
info!("Successfully fetched and stored spot metadata for chain {}", chain_id);
Ok(())
}

View File

@@ -5,7 +5,6 @@ use std::collections::BTreeMap;
use crate::chainspec::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID};

-pub mod init;
mod patch;

#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -26,7 +25,7 @@ pub struct SpotMeta {
}

#[derive(Debug, Clone)]
-pub struct SpotId {
+pub(crate) struct SpotId {
    pub index: u64,
}

View File

@@ -2,21 +2,10 @@ use alloy_primitives::{BlockNumber, Bytes};
use reth_db::{TableSet, TableType, TableViewer, table::TableInfo, tables};
use std::fmt;

-/// Static key used for spot metadata, as the database is unique to each chain.
-/// This may later serve as a versioning key to assist with future database migrations.
-pub const SPOT_METADATA_KEY: u64 = 0;
-
tables! {
    /// Read precompile calls for each block.
    table BlockReadPrecompileCalls {
        type Key = BlockNumber;
        type Value = Bytes;
    }
-
-    /// Spot metadata mapping (EVM address to spot token index).
-    /// Uses a constant key since the database is chain-specific.
-    table SpotMetadata {
-        type Key = u64;
-        type Value = Bytes;
-    }
}

View File

@@ -19,9 +19,6 @@ pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
pub(crate) mod reth_compat;

-// Re-export spot metadata functions
-pub use reth_compat::{initialize_spot_metadata_cache, set_spot_metadata_db};
-
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct HlExtras {
    pub read_precompile_calls: Option<ReadPrecompileCalls>,
@@ -130,19 +127,6 @@ pub struct SystemTx {
    pub receipt: Option<LegacyReceipt>,
}

-impl SystemTx {
-    pub fn gas_limit(&self) -> u64 {
-        use reth_compat::Transaction;
-        match &self.tx {
-            Transaction::Legacy(tx) => tx.gas_limit,
-            Transaction::Eip2930(tx) => tx.gas_limit,
-            Transaction::Eip1559(tx) => tx.gas_limit,
-            Transaction::Eip4844(tx) => tx.gas_limit,
-            Transaction::Eip7702(tx) => tx.gas_limit,
-        }
-    }
-}
-
#[derive(
    Debug,
    Clone,

View File

@@ -1,14 +1,11 @@
//! Copy of reth codebase to preserve serialization compatibility
-use crate::node::storage::tables::{SPOT_METADATA_KEY, SpotMetadata};
use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
-use alloy_primitives::{Address, BlockHash, Bytes, Signature, TxKind, U256};
+use alloy_primitives::{Address, BlockHash, Signature, TxKind, U256};
-use reth_db::{DatabaseEnv, DatabaseError, cursor::DbCursorRW};
-use reth_db_api::{Database, transaction::DbTxMut};
use reth_primitives::TransactionSigned as RethTxSigned;
use serde::{Deserialize, Serialize};
use std::{
    collections::BTreeMap,
-    sync::{Arc, LazyLock, Mutex, RwLock},
+    sync::{Arc, LazyLock, RwLock},
};
use tracing::info;
@@ -84,79 +81,31 @@ pub struct SealedBlock {
    pub body: BlockBody,
}

-static SPOT_EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
-    LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
-
-// Optional database handle for persisting on-demand fetches
-static DB_HANDLE: LazyLock<Mutex<Option<Arc<DatabaseEnv>>>> = LazyLock::new(|| Mutex::new(None));
-
-/// Set the database handle for persisting spot metadata
-pub fn set_spot_metadata_db(db: Arc<DatabaseEnv>) {
-    *DB_HANDLE.lock().unwrap() = Some(db);
-}
-
-/// Initialize the spot metadata cache with data loaded from database.
-/// This should be called during node initialization.
-pub fn initialize_spot_metadata_cache(metadata: BTreeMap<Address, SpotId>) {
-    *SPOT_EVM_MAP.write().unwrap() = metadata;
-}
-
-/// Helper function to serialize and store spot metadata to database
-pub fn store_spot_metadata(
-    db: &Arc<DatabaseEnv>,
-    metadata: &BTreeMap<Address, SpotId>,
-) -> Result<(), DatabaseError> {
-    db.update(|tx| {
-        let mut cursor = tx.cursor_write::<SpotMetadata>()?;
-        // Serialize to BTreeMap<Address, u64>
-        let serializable_map: BTreeMap<Address, u64> =
-            metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
-        cursor.upsert(
-            SPOT_METADATA_KEY,
-            &Bytes::from(
-                rmp_serde::to_vec(&serializable_map).expect("Failed to serialize spot metadata"),
-            ),
-        )?;
-        Ok(())
-    })?
-}
-
-/// Persist spot metadata to database if handle is available
-fn persist_spot_metadata_to_db(metadata: &BTreeMap<Address, SpotId>) {
-    if let Some(db) = DB_HANDLE.lock().unwrap().as_ref() {
-        match store_spot_metadata(db, metadata) {
-            Ok(_) => info!("Persisted spot metadata to database"),
-            Err(e) => info!("Failed to persist spot metadata to database: {}", e),
-        }
-    }
-}
-
fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
-    let Transaction::Legacy(tx) = &transaction.tx else {
-        panic!("Unexpected transaction type");
-    };
-    let TxKind::Call(to) = tx.to else {
-        panic!("Unexpected contract creation");
-    };
-    let s = if tx.input.is_empty() {
-        U256::from(0x1)
-    } else {
-        loop {
-            if let Some(spot) = SPOT_EVM_MAP.read().unwrap().get(&to) {
-                break spot.to_s();
-            }
-
-            // Cache miss - fetch from API, update cache, and persist to database
-            info!("Contract not found: {to:?} from spot mapping, fetching from API...");
-            let metadata = erc20_contract_to_spot_token(chain_id).unwrap();
-            *SPOT_EVM_MAP.write().unwrap() = metadata.clone();
-            persist_spot_metadata_to_db(&metadata);
-        }
-    };
-    let signature = Signature::new(U256::from(0x1), s, true);
-    TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
+    static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
+        LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
+    {
+        let Transaction::Legacy(tx) = &transaction.tx else {
+            panic!("Unexpected transaction type");
+        };
+        let TxKind::Call(to) = tx.to else {
+            panic!("Unexpected contract creation");
+        };
+        let s = if tx.input.is_empty() {
+            U256::from(0x1)
+        } else {
+            loop {
+                if let Some(spot) = EVM_MAP.read().unwrap().get(&to) {
+                    break spot.to_s();
+                }
+                info!("Contract not found: {to:?} from spot mapping, fetching again...");
+                *EVM_MAP.write().unwrap() = erc20_contract_to_spot_token(chain_id).unwrap();
+            }
+        };
+        let signature = Signature::new(U256::from(0x1), s, true);
+        TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
+    }
}

impl SealedBlock {
@@ -164,13 +113,10 @@ impl SealedBlock {
        &self,
        read_precompile_calls: ReadPrecompileCalls,
        highest_precompile_address: Option<Address>,
-        mut system_txs: Vec<super::SystemTx>,
+        system_txs: Vec<super::SystemTx>,
        receipts: Vec<LegacyReceipt>,
        chain_id: u64,
    ) -> HlBlock {
-        // NOTE: These types of transactions are tracked at #97.
-        system_txs.retain(|tx| tx.receipt.is_some());
-
        let mut merged_txs = vec![];
        merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id)));
        merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction()));
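
Both versions of system_tx_to_reth_transaction above rely on the same caching idea: a lazily initialized, process-wide map behind an RwLock, read on the hot path and rebuilt from the remote source on a miss. A minimal sketch of that pattern with placeholder types; fetch_all stands in for erc20_contract_to_spot_token and is not a function from this crate.

use std::collections::BTreeMap;
use std::sync::{LazyLock, RwLock};

static CACHE: LazyLock<RwLock<BTreeMap<String, u64>>> =
    LazyLock::new(|| RwLock::new(BTreeMap::new()));

// Placeholder for the remote lookup that rebuilds the whole map.
fn fetch_all() -> BTreeMap<String, u64> {
    BTreeMap::from([("token".to_string(), 1)])
}

fn lookup(key: &str) -> Option<u64> {
    if let Some(v) = CACHE.read().unwrap().get(key) {
        return Some(*v);
    }
    // Miss: refresh the whole map once, then retry the read.
    *CACHE.write().unwrap() = fetch_all();
    CACHE.read().unwrap().get(key).copied()
}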

View File

@@ -81,13 +81,13 @@ impl BlockPoller {
            .await
            .ok_or(eyre::eyre!("Failed to find latest block number"))?;

-        loop {
-            if let Some(debug_cutoff_height) = debug_cutoff_height &&
-                next_block_number > debug_cutoff_height
-            {
-                next_block_number = debug_cutoff_height;
-            }
+        if let Some(debug_cutoff_height) = debug_cutoff_height &&
+            next_block_number > debug_cutoff_height
+        {
+            next_block_number = debug_cutoff_height;
+        }

+        loop {
            match block_source.collect_block(next_block_number).await {
                Ok(block) => {
                    block_tx.send((next_block_number, block)).await?;
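
The poller above clamps next_block_number to debug_cutoff_height before requesting blocks. A hypothetical helper expressing the same clamp, for illustration only:

/// Clamp the next block to poll to an optional cutoff height (illustrative helper).
fn clamp_to_cutoff(next_block_number: u64, debug_cutoff_height: Option<u64>) -> u64 {
    match debug_cutoff_height {
        Some(cutoff) if next_block_number > cutoff => cutoff,
        _ => next_block_number,
    }
}

// clamp_to_cutoff(120, Some(100)) == 100; clamp_to_cutoff(80, Some(100)) == 80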

View File

@@ -1,49 +0,0 @@
#!/bin/bash
set -e
export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
success() {
echo "Success: $1"
}
fail() {
echo "Failed: $1"
exit 1
}
ensure_cmd() {
command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
}
ensure_cmd jq
ensure_cmd cast
ensure_cmd wscat
if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
fail "ETH_RPC_URL must be a websocket url"
fi
TITLE="Issue #78 - eth_getLogs should return system transactions"
cast logs \
--rpc-url "$ETH_RPC_URL" \
--from-block 15312567 \
--to-block 15312570 \
--address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
| grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
&& success "$TITLE" || fail "$TITLE"
TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
echo node "$A"
echo rpc\ "$B"
[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
TITLE="eth_subscribe newHeads via wscat"
CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
&& success "$TITLE" || fail "$TITLE"