Mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)

Compare commits: nb-2025100...6bf25558b9 (21 commits)
| SHA1 |
|---|
| 6bf25558b9 |
| f915aba568 |
| 1fe03bfc41 |
| 893822e5b0 |
| c2528ce223 |
| d46e808b8d |
| 497353fd2f |
| eee6eeb2fc |
| 611e6867bf |
| 6c3ed63c3c |
| 51924e9671 |
| 8f15aa311f |
| bc66716a41 |
| fc819dbba2 |
| 1c5a22a814 |
| 852e186b1a |
| f83326059f |
| ca8c374116 |
| 5ba12a4850 |
| 8a179a6d9e |
| d570cf3e8d |
@@ -19,62 +19,23 @@ use alloy_rpc_types::{
     TransactionInfo,
     pubsub::{Params, SubscriptionKind},
 };
-use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, proc_macros::rpc};
+use jsonrpsee::{PendingSubscriptionSink, proc_macros::rpc};
 use jsonrpsee_core::{RpcResult, async_trait};
 use jsonrpsee_types::{ErrorObject, error::INTERNAL_ERROR_CODE};
 use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
 use reth_primitives_traits::SignedTransaction;
 use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
-use reth_rpc::{EthFilter, EthPubSub, RpcTypes, eth::pubsub::SubscriptionSerializeError};
+use reth_rpc::{EthFilter, EthPubSub};
 use reth_rpc_eth_api::{
-    EthApiServer, EthApiTypes, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock,
-    RpcConvert, RpcHeader, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
-    helpers::{EthBlocks, EthTransactions, LoadReceipt},
-    transaction::ConvertReceiptInput,
+    EthApiTypes, EthFilterApiServer, EthPubSubApiServer, RpcBlock, RpcConvert, RpcReceipt,
+    RpcTransaction, helpers::EthBlocks, transaction::ConvertReceiptInput,
 };
 use reth_rpc_eth_types::EthApiError;
-use serde::Serialize;
 use std::{marker::PhantomData, sync::Arc};
-use tokio_stream::{Stream, StreamExt};
+use tokio_stream::StreamExt;
 use tracing::{Instrument, trace};
 
-use crate::{HlBlock, node::primitives::HlPrimitives};
+use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
 
-pub trait EthWrapper:
-    EthApiServer<
-        RpcTxReq<Self::NetworkTypes>,
-        RpcTransaction<Self::NetworkTypes>,
-        RpcBlock<Self::NetworkTypes>,
-        RpcReceipt<Self::NetworkTypes>,
-        RpcHeader<Self::NetworkTypes>,
-    > + FullEthApiTypes<
-        Primitives = HlPrimitives,
-        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
-    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
-    + EthBlocks
-    + EthTransactions
-    + LoadReceipt
-    + 'static
-{
-}
-
-impl<T> EthWrapper for T where
-    T: EthApiServer<
-        RpcTxReq<Self::NetworkTypes>,
-        RpcTransaction<Self::NetworkTypes>,
-        RpcBlock<Self::NetworkTypes>,
-        RpcReceipt<Self::NetworkTypes>,
-        RpcHeader<Self::NetworkTypes>,
-    > + FullEthApiTypes<
-        Primitives = HlPrimitives,
-        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
-    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
-    + EthBlocks
-    + EthTransactions
-    + LoadReceipt
-    + 'static
-{
-}
-
 #[rpc(server, namespace = "eth")]
 #[async_trait]
@@ -386,6 +347,8 @@ where
             pubsub.log_stream(filter).filter_map(|log| adjust_log::<Eth>(log, &provider)),
         )
         .await;
+    } else if kind == SubscriptionKind::NewHeads {
+        let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
     } else {
         let _ = pubsub.handle_accepted(sink, kind, params).await;
     }
@@ -412,23 +375,6 @@ fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option
     Some(log)
 }
-
-async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
-    sink: SubscriptionSink,
-    mut stream: St,
-) -> Result<(), ErrorObject<'static>> {
-    loop {
-        tokio::select! {
-            _ = sink.closed() => break Ok(()),
-            maybe_item = stream.next() => {
-                let Some(item) = maybe_item else { break Ok(()) };
-                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
-                    .map_err(SubscriptionSerializeError::from)?;
-                if sink.send(msg).await.is_err() { break Ok(()); }
-            }
-        }
-    }
-}
-
 pub struct HlNodeBlockFilterHttp<Eth: EthWrapper> {
     eth_api: Arc<Eth>,
     _marker: PhantomData<Eth>,
@@ -1,3 +1,5 @@
 pub mod call_forwarder;
 pub mod hl_node_compliance;
 pub mod tx_forwarder;
+pub mod subscribe_fixup;
+mod utils;
src/addons/subscribe_fixup.rs (new file, 54 lines)
@@ -0,0 +1,54 @@
+use crate::addons::utils::{EthWrapper, new_headers_stream, pipe_from_stream};
+use alloy_rpc_types::pubsub::{Params, SubscriptionKind};
+use async_trait::async_trait;
+use jsonrpsee::PendingSubscriptionSink;
+use jsonrpsee_types::ErrorObject;
+use reth::tasks::TaskSpawner;
+use reth_rpc::EthPubSub;
+use reth_rpc_convert::RpcTransaction;
+use reth_rpc_eth_api::{EthApiTypes, EthPubSubApiServer};
+use std::sync::Arc;
+
+pub struct SubscribeFixup<Eth: EthWrapper> {
+    pubsub: Arc<EthPubSub<Eth>>,
+    provider: Arc<Eth::Provider>,
+    subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
+}
+
+#[async_trait]
+impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for SubscribeFixup<Eth>
+where
+    ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
+{
+    async fn subscribe(
+        &self,
+        pending: PendingSubscriptionSink,
+        kind: SubscriptionKind,
+        params: Option<Params>,
+    ) -> jsonrpsee::core::SubscriptionResult {
+        let sink = pending.accept().await?;
+        let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
+        self.subscription_task_spawner.spawn(Box::pin(async move {
+            if kind == SubscriptionKind::NewHeads {
+                let _ = pipe_from_stream(sink, new_headers_stream::<Eth>(&provider)).await;
+            } else {
+                let _ = pubsub.handle_accepted(sink, kind, params).await;
+            }
+        }));
+        Ok(())
+    }
+}
+
+impl<Eth: EthWrapper> SubscribeFixup<Eth> {
+    pub fn new(
+        pubsub: Arc<EthPubSub<Eth>>,
+        provider: Arc<Eth::Provider>,
+        subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
+    ) -> Self
+    where
+        Eth: EthWrapper,
+        ErrorObject<'static>: From<Eth::Error>,
+    {
+        Self { pubsub, provider, subscription_task_spawner }
+    }
+}
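Note: `SubscribeFixup` only intercepts `newHeads`; every other subscription kind is still delegated to reth's stock `EthPubSub` via `handle_accepted`. A minimal client-side sketch of exercising the fixed path follows; the node URL and the choice of jsonrpsee's WebSocket client are assumptions for illustration, not part of this change:

```rust
// Hypothetical smoke test for the fixed `eth_subscribe` path; assumes a node
// serving WebSocket RPC at ws://127.0.0.1:8546 (adjust to your setup).
use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::WsClientBuilder;

#[tokio::main]
async fn main() -> eyre::Result<()> {
    let client = WsClientBuilder::default().build("ws://127.0.0.1:8546").await?;
    // `eth_subscribe`/`eth_unsubscribe` are the standard Ethereum pubsub
    // method names; the `newHeads` kind routes into `new_headers_stream` above.
    let mut sub: Subscription<serde_json::Value> = client
        .subscribe("eth_subscribe", rpc_params!["newHeads"], "eth_unsubscribe")
        .await?;
    if let Some(header) = sub.next().await {
        // With the fixup in place this is a plain alloy-style header object.
        println!("{}", header?);
    }
    Ok(())
}
```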
src/addons/utils.rs (new file, 90 lines)
@@ -0,0 +1,90 @@
+use std::sync::Arc;
+
+use crate::{HlBlock, HlPrimitives};
+use alloy_primitives::U256;
+use alloy_rpc_types::Header;
+use futures::StreamExt;
+use jsonrpsee::{SubscriptionMessage, SubscriptionSink};
+use jsonrpsee_types::ErrorObject;
+use reth_primitives::SealedHeader;
+use reth_provider::{BlockReader, CanonStateSubscriptions};
+use reth_rpc::{RpcTypes, eth::pubsub::SubscriptionSerializeError};
+use reth_rpc_convert::{RpcBlock, RpcHeader, RpcReceipt, RpcTransaction, RpcTxReq};
+use reth_rpc_eth_api::{
+    EthApiServer, FullEthApiTypes, RpcNodeCoreExt,
+    helpers::{EthBlocks, EthTransactions, LoadReceipt},
+};
+use serde::Serialize;
+use tokio_stream::Stream;
+
+pub trait EthWrapper:
+    EthApiServer<
+        RpcTxReq<Self::NetworkTypes>,
+        RpcTransaction<Self::NetworkTypes>,
+        RpcBlock<Self::NetworkTypes>,
+        RpcReceipt<Self::NetworkTypes>,
+        RpcHeader<Self::NetworkTypes>,
+    > + FullEthApiTypes<
+        Primitives = HlPrimitives,
+        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
+    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+    + EthBlocks
+    + EthTransactions
+    + LoadReceipt
+    + 'static
+{
+}
+
+impl<T> EthWrapper for T where
+    T: EthApiServer<
+        RpcTxReq<Self::NetworkTypes>,
+        RpcTransaction<Self::NetworkTypes>,
+        RpcBlock<Self::NetworkTypes>,
+        RpcReceipt<Self::NetworkTypes>,
+        RpcHeader<Self::NetworkTypes>,
+    > + FullEthApiTypes<
+        Primitives = HlPrimitives,
+        NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
+    > + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+    + EthBlocks
+    + EthTransactions
+    + LoadReceipt
+    + 'static
+{
+}
+
+pub(super) async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
+    sink: SubscriptionSink,
+    mut stream: St,
+) -> Result<(), ErrorObject<'static>> {
+    loop {
+        tokio::select! {
+            _ = sink.closed() => break Ok(()),
+            maybe_item = stream.next() => {
+                let Some(item) = maybe_item else { break Ok(()) };
+                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
+                    .map_err(SubscriptionSerializeError::from)?;
+                if sink.send(msg).await.is_err() { break Ok(()); }
+            }
+        }
+    }
+}
+
+pub(super) fn new_headers_stream<Eth: EthWrapper>(
+    provider: &Arc<Eth::Provider>,
+) -> impl Stream<Item = Header<alloy_consensus::Header>> {
+    provider.canonical_state_stream().flat_map(|new_chain| {
+        let headers = new_chain
+            .committed()
+            .blocks_iter()
+            .map(|block| {
+                Header::from_consensus(
+                    SealedHeader::new(block.header().inner.clone(), block.hash()).into(),
+                    None,
+                    Some(U256::from(block.rlp_length())),
+                )
+            })
+            .collect::<Vec<_>>();
+        futures::stream::iter(headers)
+    })
+}
src/main.rs (29 lines changed)
@@ -1,12 +1,16 @@
 use std::sync::Arc;
 
 use clap::Parser;
-use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
+use reth::{
+    builder::{NodeBuilder, NodeHandle, WithLaunchContext},
+    rpc::{api::EthPubSubApiServer, eth::RpcNodeCore},
+};
 use reth_db::DatabaseEnv;
 use reth_hl::{
     addons::{
         call_forwarder::{self, CallForwarderApiServer},
         hl_node_compliance::install_hl_node_compliance,
+        subscribe_fixup::SubscribeFixup,
         tx_forwarder::{self, EthForwarderApiServer},
     },
     chainspec::{HlChainSpec, parser::HlChainSpecParser},
@@ -14,7 +18,9 @@ use reth_hl::{
         HlNode,
         cli::{Cli, HlNodeArgs},
         rpc::precompile::{HlBlockPrecompileApiServer, HlBlockPrecompileExt},
+        spot_meta::init as spot_meta_init,
         storage::tables::Tables,
+        types::set_spot_metadata_db,
     },
 };
 use tracing::info;
@@ -59,6 +65,17 @@ fn main() -> eyre::Result<()> {
                 info!("Call/gas estimation will be forwarded to {}", upstream_rpc_url);
             }
+
+            // This is a temporary workaround to fix the issue with custom headers
+            // affects `eth_subscribe[type=newHeads]`
+            ctx.modules.replace_configured(
+                SubscribeFixup::new(
+                    Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
+                    Arc::new(ctx.registry.eth_api().provider().clone()),
+                    Box::new(ctx.node().task_executor.clone()),
+                )
+                .into_rpc(),
+            )?;
 
             if ext.hl_node_compliant {
                 install_hl_node_compliance(&mut ctx)?;
                 info!("hl-node compliant mode enabled");
@@ -77,6 +94,16 @@ fn main() -> eyre::Result<()> {
         })
         .apply(|mut builder| {
             builder.db_mut().create_tables_for::<Tables>().expect("create tables");
+
+            let chain_id = builder.config().chain.inner.chain().id();
+            let db = builder.db_mut().clone();
+
+            // Set database handle for on-demand persistence
+            set_spot_metadata_db(db.clone());
+
+            // Load spot metadata from database and initialize cache
+            spot_meta_init::load_spot_metadata_cache(&db, chain_id);
+
             builder
         })
         .launch()
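A note on the override above: reth's `replace_configured` replaces already-registered methods of the same name across the configured transports, so the `eth_subscribe` produced by `SubscribeFixup::into_rpc()` shadows the default pubsub handler rather than colliding with it. This is why the call sits in the RPC extension hook, after the stock modules have been configured.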
@@ -1,7 +1,11 @@
 use crate::{
     chainspec::{HlChainSpec, parser::HlChainSpecParser},
     node::{
-        HlNode, consensus::HlConsensus, evm::config::HlEvmConfig, migrate::Migrator,
+        HlNode,
+        consensus::HlConsensus,
+        evm::config::HlEvmConfig,
+        migrate::Migrator,
+        spot_meta::init as spot_meta_init,
         storage::tables::Tables,
     },
     pseudo_peer::BlockSourceArgs,
@@ -20,10 +24,7 @@ use reth_cli::chainspec::ChainSpecParser
 use reth_cli_commands::{common::EnvironmentArgs, launcher::FnLauncher};
 use reth_db::{DatabaseEnv, init_db, mdbx::init_db_for};
 use reth_tracing::FileWorkerGuard;
-use std::{
-    fmt::{self},
-    sync::Arc,
-};
+use std::{fmt::{self}, sync::Arc};
 use tracing::info;
 
 macro_rules! not_applicable {
@@ -145,8 +146,12 @@ where
 
         match self.command {
             Commands::Node(command) => runner.run_command_until_exit(|ctx| {
-                Self::migrate_db(&command.chain, &command.datadir, &command.db)
-                    .expect("Failed to migrate database");
+                // NOTE: This is for one time migration around Oct 10 upgrade:
+                // It's not necessary anymore, an environment variable gate is added here.
+                if std::env::var("CHECK_DB_MIGRATION").is_ok() {
+                    Self::migrate_db(&command.chain, &command.datadir, &command.db)
+                        .expect("Failed to migrate database");
+                }
                 command.execute(ctx, FnLauncher::new::<C, Ext>(launcher))
             }),
             Commands::Init(command) => {
@@ -190,7 +195,12 @@ where
         let data_dir = env.datadir.clone().resolve_datadir(env.chain.chain());
         let db_path = data_dir.db();
         init_db(db_path.clone(), env.db.database_args())?;
-        init_db_for::<_, Tables>(db_path, env.db.database_args())?;
+        init_db_for::<_, Tables>(db_path.clone(), env.db.database_args())?;
+
+        // Initialize spot metadata in database
+        let chain_id = env.chain.chain().id();
+        spot_meta_init::init_spot_metadata(db_path, env.db.database_args(), chain_id)?;
 
         Ok(())
     }
@@ -1,5 +1,5 @@
 use alloy_consensus::Header;
-use alloy_primitives::{b256, hex::ToHexExt, BlockHash, Bytes, B256, U256};
+use alloy_primitives::{B256, BlockHash, Bytes, U256, b256, hex::ToHexExt};
 use reth::{
     api::NodeTypesWithDBAdapter,
     args::{DatabaseArgs, DatadirArgs},
@@ -7,11 +7,12 @@ use reth::{
 };
 use reth_chainspec::EthChainSpec;
 use reth_db::{
-    mdbx::{tx::Tx, RO},
+    DatabaseEnv,
+    mdbx::{RO, tx::Tx},
     models::CompactU256,
     static_file::iter_static_files,
     table::Decompress,
-    tables, DatabaseEnv,
+    tables,
 };
 use reth_db_api::{
     cursor::{DbCursorRO, DbCursorRW},
@@ -20,15 +21,15 @@ use reth_db_api::{
 use reth_errors::ProviderResult;
 use reth_ethereum_primitives::EthereumReceipt;
 use reth_provider::{
-    providers::{NodeTypesForProvider, StaticFileProvider},
-    static_file::SegmentRangeInclusive,
     DatabaseProvider, ProviderFactory, ReceiptProvider, StaticFileProviderFactory,
     StaticFileSegment, StaticFileWriter,
+    providers::{NodeTypesForProvider, StaticFileProvider},
+    static_file::SegmentRangeInclusive,
 };
 use std::{fs::File, io::Write, path::PathBuf, sync::Arc};
 use tracing::{info, warn};
 
-use crate::{chainspec::HlChainSpec, HlHeader, HlPrimitives};
+use crate::{HlHeader, HlPrimitives, chainspec::HlChainSpec};
 
 pub(crate) trait HlNodeType:
     NodeTypesForProvider<ChainSpec = HlChainSpec, Primitives = HlPrimitives>
@@ -123,6 +124,8 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
             return Ok(false);
         }
 
+        check_if_migration_enabled()?;
+
         self.migrate_mdbx_inner()?;
         Ok(true)
     }
@@ -130,7 +133,14 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
     fn migrate_mdbx_inner(&self) -> eyre::Result<()> {
         // There shouldn't be many headers in mdbx, but using file for safety
         info!("Old database detected, migrating mdbx...");
-        let tmp_path = self.0.conversion_tmp_dir().join("headers.rmp");
+        let conversion_tmp = self.0.conversion_tmp_dir();
+        let tmp_path = conversion_tmp.join("headers.rmp");
+
+        if conversion_tmp.exists() {
+            std::fs::remove_dir_all(&conversion_tmp)?;
+        }
+        std::fs::create_dir_all(&conversion_tmp)?;
+
         let count = self.export_old_headers(&tmp_path)?;
         self.import_new_headers(tmp_path, count)?;
         Ok(())
@@ -172,6 +182,18 @@ impl<'a, N: HlNodeType> MigratorMdbx<'a, N> {
     }
 }
 
+fn check_if_migration_enabled() -> Result<(), eyre::Error> {
+    if std::env::var("EXPERIMENTAL_MIGRATE_DB").is_err() {
+        let err_msg = concat!(
+            "Detected an old database format but experimental database migration is currently disabled. ",
+            "To enable migration, set EXPERIMENTAL_MIGRATE_DB=1, or alternatively, resync your node (safest option)."
+        );
+        warn!("{}", err_msg);
+        return Err(eyre::eyre!("{}", err_msg));
+    }
+    Ok(())
+}
+
 struct MigrateStaticFiles<'a, N: HlNodeType>(&'a Migrator<N>);
 
 impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
@@ -244,13 +266,12 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
         let mut all_static_files = iter_static_files(&old_path)?;
         let all_static_files =
             all_static_files.remove(&StaticFileSegment::Headers).unwrap_or_default();
-        let provider = self.0.provider_factory.provider()?;
 
         let mut first = true;
 
         for (block_range, _tx_ranges) in all_static_files {
-            let migration_needed = self.using_old_header(block_range.start())? ||
-                self.using_old_header(block_range.end())?;
+            let migration_needed = self.using_old_header(block_range.start())?
+                || self.using_old_header(block_range.end())?;
             if !migration_needed {
                 // Create a placeholder symlink
                 self.create_placeholder(block_range)?;
@@ -258,12 +279,15 @@ impl<'a, N: HlNodeType> MigrateStaticFiles<'a, N> {
             }
 
             if first {
+                check_if_migration_enabled()?;
+
                 info!("Old database detected, migrating static files...");
                 first = false;
             }
 
             let sf_provider = self.0.sf_provider();
             let sf_tmp_provider = StaticFileProvider::<HlPrimitives>::read_write(&conversion_tmp)?;
+            let provider = self.0.provider_factory.provider()?;
             let block_range_for_filename = sf_provider.find_fixed_range(block_range.start());
             migrate_single_static_file(&sf_tmp_provider, &sf_provider, &provider, block_range)?;
 
@@ -320,9 +344,9 @@ fn migrate_single_static_file<N: HlNodeType>(
 ) -> Result<(), eyre::Error> {
     info!("Migrating block range {}...", block_range);
 
-    // block_ranges into chunks of 100000 blocks
-    const CHUNK_SIZE: u64 = 100000;
-    for chunk in (0..=block_range.end()).step_by(CHUNK_SIZE as usize) {
+    // block_ranges into chunks of 50000 blocks
+    const CHUNK_SIZE: u64 = 50000;
+    for chunk in (block_range.start()..=block_range.end()).step_by(CHUNK_SIZE as usize) {
         let end = std::cmp::min(chunk + CHUNK_SIZE - 1, block_range.end());
         let block_range = chunk..=end;
         let headers = old_headers_range(sf_in, block_range.clone())?;
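Taken together with the CLI change above, old-format databases are now behind two explicit opt-ins: `CHECK_DB_MIGRATION` must be set for the node to run the migration check at startup at all, and `EXPERIMENTAL_MIGRATE_DB` must additionally be set for a detected old database to actually be migrated; otherwise `check_if_migration_enabled` aborts with the warning above, and a full resync remains the safest alternative.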
src/node/spot_meta/init.rs (new file, 106 lines)
@@ -0,0 +1,106 @@
+use crate::node::{
+    spot_meta::{SpotId, erc20_contract_to_spot_token},
+    storage::tables::{self, SPOT_METADATA_KEY},
+    types::reth_compat,
+};
+use alloy_primitives::{Address, Bytes};
+use reth_db::{DatabaseEnv, cursor::{DbCursorRO, DbCursorRW}};
+use reth_db_api::{Database, transaction::{DbTx, DbTxMut}};
+use std::{collections::BTreeMap, sync::Arc};
+use tracing::info;
+
+/// Load spot metadata from database and initialize cache
+pub fn load_spot_metadata_cache(db: &Arc<DatabaseEnv>, chain_id: u64) {
+    // Try to read from database
+    let data = match db.view(|tx| -> Result<Option<Vec<u8>>, reth_db::DatabaseError> {
+        let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
+        Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.map(|(_, data)| data.to_vec()))
+    }) {
+        Ok(Ok(data)) => data,
+        Ok(Err(e)) => {
+            info!("Failed to read spot metadata from database: {}. Will fetch on-demand from API.", e);
+            return;
+        }
+        Err(e) => {
+            info!("Database view error while loading spot metadata: {}. Will fetch on-demand from API.", e);
+            return;
+        }
+    };
+
+    // Check if data exists
+    let Some(data) = data else {
+        info!(
+            "No spot metadata found in database for chain {}. Run 'init-state' to populate, or it will be fetched on-demand from API.",
+            chain_id
+        );
+        return;
+    };
+
+    // Deserialize metadata
+    let serializable_map = match rmp_serde::from_slice::<BTreeMap<Address, u64>>(&data) {
+        Ok(map) => map,
+        Err(e) => {
+            info!("Failed to deserialize spot metadata: {}. Will fetch on-demand from API.", e);
+            return;
+        }
+    };
+
+    // Convert and initialize cache
+    let metadata: BTreeMap<Address, SpotId> = serializable_map
+        .into_iter()
+        .map(|(addr, index)| (addr, SpotId { index }))
+        .collect();
+
+    info!("Loaded spot metadata from database ({} entries)", metadata.len());
+    reth_compat::initialize_spot_metadata_cache(metadata);
+}
+
+/// Initialize spot metadata in database from API
+pub fn init_spot_metadata(
+    db_path: impl AsRef<std::path::Path>,
+    db_args: reth_db::mdbx::DatabaseArguments,
+    chain_id: u64,
+) -> eyre::Result<()> {
+    info!("Initializing spot metadata for chain {}", chain_id);
+
+    let db = Arc::new(reth_db::open_db(db_path.as_ref(), db_args)?);
+
+    // Check if spot metadata already exists
+    let exists = db.view(|tx| -> Result<bool, reth_db::DatabaseError> {
+        let mut cursor = tx.cursor_read::<tables::SpotMetadata>()?;
+        Ok(cursor.seek_exact(SPOT_METADATA_KEY)?.is_some())
+    })??;
+
+    if exists {
+        info!("Spot metadata already exists in database");
+        return Ok(());
+    }
+
+    // Fetch from API
+    let metadata = match erc20_contract_to_spot_token(chain_id) {
+        Ok(m) => m,
+        Err(e) => {
+            info!("Failed to fetch spot metadata from API: {}. Will be fetched on-demand.", e);
+            return Ok(());
+        }
+    };
+
+    // Serialize and store
+    let serializable_map: BTreeMap<Address, u64> =
+        metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
+
+    db.update(|tx| -> Result<(), reth_db::DatabaseError> {
+        let mut cursor = tx.cursor_write::<tables::SpotMetadata>()?;
+        cursor.upsert(
+            SPOT_METADATA_KEY,
+            &Bytes::from(
+                rmp_serde::to_vec(&serializable_map)
+                    .expect("Failed to serialize spot metadata"),
+            ),
+        )?;
+        Ok(())
+    })??;
+
+    info!("Successfully fetched and stored spot metadata for chain {}", chain_id);
+    Ok(())
+}
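The on-disk value written under `SPOT_METADATA_KEY` is simply a MessagePack-encoded `BTreeMap<Address, u64>`. A self-contained sketch of that round-trip (the sample address and index are made up for illustration; assumes `alloy-primitives` with its default serde support plus the `rmp-serde` crate):

```rust
use alloy_primitives::{Address, address};
use std::collections::BTreeMap;

fn main() {
    // Hypothetical ERC-20 address -> spot index entry, for illustration only.
    let mut map: BTreeMap<Address, u64> = BTreeMap::new();
    map.insert(address!("20000000000000000000000000000000000000c5"), 197);

    // Same encode/decode pair used by init_spot_metadata and
    // load_spot_metadata_cache above.
    let bytes = rmp_serde::to_vec(&map).expect("serialize spot metadata");
    let decoded: BTreeMap<Address, u64> =
        rmp_serde::from_slice(&bytes).expect("deserialize spot metadata");

    assert_eq!(map, decoded);
    println!("round-tripped {} entries in {} bytes", decoded.len(), bytes.len());
}
```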
@@ -5,6 +5,7 @@ use std::collections::BTreeMap;
 
 use crate::chainspec::{MAINNET_CHAIN_ID, TESTNET_CHAIN_ID};
 
+pub mod init;
 mod patch;
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -25,7 +26,7 @@ pub struct SpotMeta {
 }
 
 #[derive(Debug, Clone)]
-pub(crate) struct SpotId {
+pub struct SpotId {
     pub index: u64,
 }
 
@@ -2,10 +2,21 @@ use alloy_primitives::{BlockNumber, Bytes};
 use reth_db::{TableSet, TableType, TableViewer, table::TableInfo, tables};
 use std::fmt;
 
+/// Static key used for spot metadata, as the database is unique to each chain.
+/// This may later serve as a versioning key to assist with future database migrations.
+pub const SPOT_METADATA_KEY: u64 = 0;
+
 tables! {
     /// Read precompile calls for each block.
     table BlockReadPrecompileCalls {
         type Key = BlockNumber;
         type Value = Bytes;
     }
+
+    /// Spot metadata mapping (EVM address to spot token index).
+    /// Uses a constant key since the database is chain-specific.
+    table SpotMetadata {
+        type Key = u64;
+        type Value = Bytes;
+    }
 }
@@ -19,6 +19,9 @@ pub struct ReadPrecompileCalls(pub Vec<ReadPrecompileCall>);
 
 pub(crate) mod reth_compat;
 
+// Re-export spot metadata functions
+pub use reth_compat::{initialize_spot_metadata_cache, set_spot_metadata_db};
+
 #[derive(Debug, Clone, Serialize, Deserialize, Default)]
 pub struct HlExtras {
     pub read_precompile_calls: Option<ReadPrecompileCalls>,
@@ -127,6 +130,19 @@ pub struct SystemTx {
     pub receipt: Option<LegacyReceipt>,
 }
 
+impl SystemTx {
+    pub fn gas_limit(&self) -> u64 {
+        use reth_compat::Transaction;
+        match &self.tx {
+            Transaction::Legacy(tx) => tx.gas_limit,
+            Transaction::Eip2930(tx) => tx.gas_limit,
+            Transaction::Eip1559(tx) => tx.gas_limit,
+            Transaction::Eip4844(tx) => tx.gas_limit,
+            Transaction::Eip7702(tx) => tx.gas_limit,
+        }
+    }
+}
+
 #[derive(
     Debug,
     Clone,
@@ -1,11 +1,14 @@
 //! Copy of reth codebase to preserve serialization compatibility
+use crate::node::storage::tables::{SPOT_METADATA_KEY, SpotMetadata};
 use alloy_consensus::{Header, Signed, TxEip1559, TxEip2930, TxEip4844, TxEip7702, TxLegacy};
-use alloy_primitives::{Address, BlockHash, Signature, TxKind, U256};
+use alloy_primitives::{Address, BlockHash, Bytes, Signature, TxKind, U256};
+use reth_db::cursor::DbCursorRW;
+use reth_db_api::{Database, transaction::DbTxMut};
 use reth_primitives::TransactionSigned as RethTxSigned;
 use serde::{Deserialize, Serialize};
 use std::{
     collections::BTreeMap,
-    sync::{Arc, LazyLock, RwLock},
+    sync::{Arc, LazyLock, Mutex, RwLock},
 };
 use tracing::info;
@@ -81,33 +84,77 @@ pub struct SealedBlock {
     pub body: BlockBody,
 }
 
-fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
-    static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
-        LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
-    {
-        let Transaction::Legacy(tx) = &transaction.tx else {
-            panic!("Unexpected transaction type");
-        };
-        let TxKind::Call(to) = tx.to else {
-            panic!("Unexpected contract creation");
-        };
-        let s = if tx.input.is_empty() {
-            U256::from(0x1)
-        } else {
-            loop {
-                if let Some(spot) = EVM_MAP.read().unwrap().get(&to) {
-                    break spot.to_s();
-                }
-
-                info!("Contract not found: {to:?} from spot mapping, fetching again...");
-                *EVM_MAP.write().unwrap() = erc20_contract_to_spot_token(chain_id).unwrap();
-            }
-        };
-        let signature = Signature::new(U256::from(0x1), s, true);
-        TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
-    }
-}
+static EVM_MAP: LazyLock<Arc<RwLock<BTreeMap<Address, SpotId>>>> =
+    LazyLock::new(|| Arc::new(RwLock::new(BTreeMap::new())));
+
+// Optional database handle for persisting on-demand fetches
+static DB_HANDLE: LazyLock<Mutex<Option<Arc<reth_db::DatabaseEnv>>>> =
+    LazyLock::new(|| Mutex::new(None));
+
+/// Set the database handle for persisting spot metadata
+pub fn set_spot_metadata_db(db: Arc<reth_db::DatabaseEnv>) {
+    *DB_HANDLE.lock().unwrap() = Some(db);
+}
+
+/// Initialize the spot metadata cache with data loaded from database.
+/// This should be called during node initialization.
+pub fn initialize_spot_metadata_cache(metadata: BTreeMap<Address, SpotId>) {
+    *EVM_MAP.write().unwrap() = metadata;
+}
+
+/// Persist spot metadata to database if handle is available
+fn persist_spot_metadata_to_db(metadata: &BTreeMap<Address, SpotId>) {
+    if let Some(db) = DB_HANDLE.lock().unwrap().as_ref() {
+        let result = db.update(|tx| -> Result<(), reth_db::DatabaseError> {
+            let mut cursor = tx.cursor_write::<SpotMetadata>()?;
+
+            // Serialize to BTreeMap<Address, u64>
+            let serializable_map: BTreeMap<Address, u64> =
+                metadata.iter().map(|(addr, spot)| (*addr, spot.index)).collect();
+
+            cursor.upsert(
+                SPOT_METADATA_KEY,
+                &Bytes::from(
+                    rmp_serde::to_vec(&serializable_map)
+                        .expect("Failed to serialize spot metadata"),
+                ),
+            )?;
+            Ok(())
+        });
+
+        match result {
+            Ok(_) => info!("Persisted spot metadata to database"),
+            Err(e) => info!("Failed to persist spot metadata to database: {}", e),
+        }
+    }
+}
+
+fn system_tx_to_reth_transaction(transaction: &SystemTx, chain_id: u64) -> TxSigned {
+    let Transaction::Legacy(tx) = &transaction.tx else {
+        panic!("Unexpected transaction type");
+    };
+    let TxKind::Call(to) = tx.to else {
+        panic!("Unexpected contract creation");
+    };
+    let s = if tx.input.is_empty() {
+        U256::from(0x1)
+    } else {
+        loop {
+            if let Some(spot) = EVM_MAP.read().unwrap().get(&to) {
+                break spot.to_s();
+            }
+
+            // Cache miss - fetch from API, update cache, and persist to database
+            info!("Contract not found: {to:?} from spot mapping, fetching from API...");
+            let metadata = erc20_contract_to_spot_token(chain_id).unwrap();
+            *EVM_MAP.write().unwrap() = metadata.clone();
+            persist_spot_metadata_to_db(&metadata);
+        }
+    };
+    let signature = Signature::new(U256::from(0x1), s, true);
+    TxSigned::Default(RethTxSigned::Legacy(Signed::new_unhashed(tx.clone(), signature)))
+}
 
 impl SealedBlock {
     pub fn to_reth_block(
         &self,
@@ -117,6 +164,10 @@ impl SealedBlock {
         receipts: Vec<LegacyReceipt>,
         chain_id: u64,
     ) -> HlBlock {
+        // NOTE: Filter out system transactions that may be rejected by the EVM (tracked by #97,
+        // testnet only).
+        let system_txs: Vec<_> = system_txs.into_iter().filter(|tx| tx.gas_limit() != 0).collect();
+
         let mut merged_txs = vec![];
         merged_txs.extend(system_txs.iter().map(|tx| system_tx_to_reth_transaction(tx, chain_id)));
         merged_txs.extend(self.body.transactions.iter().map(|tx| tx.to_reth_transaction()));
@@ -81,13 +81,13 @@ impl BlockPoller {
             .await
             .ok_or(eyre::eyre!("Failed to find latest block number"))?;
 
-        if let Some(debug_cutoff_height) = debug_cutoff_height &&
-            next_block_number > debug_cutoff_height
-        {
-            next_block_number = debug_cutoff_height;
-        }
-
         loop {
+            if let Some(debug_cutoff_height) = debug_cutoff_height
+                && next_block_number > debug_cutoff_height
+            {
+                next_block_number = debug_cutoff_height;
+            }
+
             match block_source.collect_block(next_block_number).await {
                 Ok(block) => {
                     block_tx.send((next_block_number, block)).await?;
tests/run_tests.sh (new file, 49 lines)
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+set -e
+
+export ETH_RPC_URL="${ETH_RPC_URL:-wss://hl-archive-node.xyz}"
+
+success() {
+    echo "Success: $1"
+}
+
+fail() {
+    echo "Failed: $1"
+    exit 1
+}
+
+ensure_cmd() {
+    command -v "$1" > /dev/null 2>&1 || fail "$1 is required"
+}
+
+ensure_cmd jq
+ensure_cmd cast
+ensure_cmd wscat
+
+if [[ ! "$ETH_RPC_URL" =~ ^wss?:// ]]; then
+    fail "ETH_RPC_URL must be a websocket url"
+fi
+
+TITLE="Issue #78 - eth_getLogs should return system transactions"
+cast logs \
+    --rpc-url "$ETH_RPC_URL" \
+    --from-block 15312567 \
+    --to-block 15312570 \
+    --address 0x9fdbda0a5e284c32744d2f17ee5c74b284993463 \
+    0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef \
+    | grep -q "0x00000000000000000000000020000000000000000000000000000000000000c5" \
+    && success "$TITLE" || fail "$TITLE"
+
+TITLE="Issue #78 - eth_getBlockByNumber should return the same logsBloom as official RPC"
+OFFICIAL_RPC="https://rpc.hyperliquid.xyz/evm"
+A=$(cast block 1394092 --rpc-url "$ETH_RPC_URL" -f logsBloom | md5sum)
+B=$(cast block 1394092 --rpc-url "$OFFICIAL_RPC" -f logsBloom | md5sum)
+echo node "$A"
+echo rpc\ "$B"
+[[ "$A" == "$B" ]] && success "$TITLE" || fail "$TITLE"
+
+TITLE="eth_subscribe newHeads via wscat"
+CMD='{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}'
+wscat -w 2 -c "$ETH_RPC_URL" -x "$CMD" | tail -1 | jq -r .params.result.nonce | grep 0x \
+    && success "$TITLE" || fail "$TITLE"
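The script reads its target from `ETH_RPC_URL` (defaulting to the public mirror above) and insists on a WebSocket URL, so pointing it at your own node is just a matter of exporting a `ws://` or `wss://` URL before invoking it with bash; `jq`, `cast`, and `wscat` must be on `PATH`. For example, `ETH_RPC_URL=ws://127.0.0.1:8546 bash tests/run_tests.sh` would exercise a local node, where the port is an assumption about your node's WS listener.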