From 28f6c1e6beeb8959a11c25be2e94edde39ecfe06 Mon Sep 17 00:00:00 2001 From: arb00r <40786269+arb000r@users.noreply.github.com> Date: Thu, 26 Jun 2025 04:15:58 +1000 Subject: [PATCH] Local block sync (#7) * update: logs * update: more logging * update: rename local ingest dir args * update: fix build * update: directory path * update: logs * update: log ts * update: fetch last block * update: time formatting * update: handle seconds * update: lmore logs * fix: provided args * update: logs * fix: build * update: indefinite wiat * update: run the right loop * update: remove offset * update: scan impl * update: log exists * update: collect s3 blocks * update: change the file * update: logs * fix: deserialization * fix: build * update: remove block * update: add logs * update: logs * update: logs * update: dates * update: ignore older blocks * update: hook up to sync * fix: build * fix: build * update: logs * update: logs * update: start height cond * update: height * update: loggy * update: cond * update: cond * update: cond * update: logs * update: fix height issues * update: logs * only collect s3 * update: log block * update: log both blocks * update; return s3 block * update: use local block * update: blocks * update: remove logs * update: logs * update: remove warns and logs * update: collection log * update: logs * update: logs * update: scan through heights when registering evm * update: add local ingest dir to other evm factory * fix: build * update: add cli cmd * update: remove additional arg * update: change where local ingest dir comes from * fix: receipts * update: deser format * update: fix build * update: logs * update: logs * update: logs * update: logs * update: share precompiles with engine * update: insert compiles * update: change sync dir * update: logs * update: logs * update: logs * update: fix build * update: pipe builder context through * update: untracked * update: pass through context * fix: build * fix: build * update: logs * update: logs * update: logs * update: fix cache passthrough * update: remove logs * update: logs * update: hour rollover * update: zero out hour * update: hour sync * update: cleanup code and speedup sync * update: speedup sync * update: remove logs * update: speed up sync * update: speed up sync * update: ingest in reverse * fix: iter rev * update: break line loop early * update: remove break * update: iteration speed * update: fix build * update: slow down tail ival * update: logs * update: skip last line * update: remove log * update: height * update: logs * update: return logs * update: disable attempt logs * update: tail interval * update: cleanup logs * update: add iter skip * update: fix build * update: skip -1 * fix: skip * fix: build * update: build * fix: build * update: logs * update: log idx * update: skip after enumerate * update: cleanup * update: more cleanup * update: refactor BuilderSharedState to HyperliquidSharedState * update: more cleanup * update: cleanup and refactor collect_local_block * update: error msg * update: readme * update: typo * update: file log * fix: typo build * update: debug log --- Cargo.lock | 21 ++- Cargo.toml | 3 + README.md | 23 +++ bin/reth/Cargo.toml | 3 + bin/reth/src/block_ingest.rs | 195 +++++++++++++++++++++- bin/reth/src/main.rs | 12 +- crates/cli/commands/src/node.rs | 6 + crates/ethereum/consensus/src/lib.rs | 1 + crates/ethereum/evm/Cargo.toml | 1 + crates/ethereum/evm/src/lib.rs | 63 +++++-- crates/ethereum/node/src/node.rs | 4 +- crates/ethereum/node/src/payload.rs | 8 +- crates/hyperliquid-types/Cargo.toml | 2 + crates/hyperliquid-types/src/lib.rs | 8 +- crates/node/builder/Cargo.toml | 1 + crates/node/builder/src/builder/mod.rs | 22 ++- crates/node/builder/src/builder/states.rs | 16 ++ crates/node/builder/src/launch/common.rs | 15 +- crates/node/builder/src/launch/engine.rs | 4 +- crates/node/core/src/node_config.rs | 10 +- examples/custom-engine-types/src/main.rs | 4 +- 21 files changed, 375 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ed119bcb..e4f8f6652 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2588,9 +2588,9 @@ dependencies = [ [[package]] name = "deranged" -version = "0.3.11" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" dependencies = [ "powerfmt", "serde", @@ -6724,6 +6724,7 @@ dependencies = [ "serde_json", "similar-asserts", "tempfile", + "time", "tokio", "tracing", ] @@ -7813,6 +7814,7 @@ dependencies = [ "reth-evm", "reth-execution-types", "reth-hyperliquid-types", + "reth-node-builder", "reth-primitives", "reth-primitives-traits", "reth-revm", @@ -7968,8 +7970,10 @@ version = "1.2.0" dependencies = [ "alloy-primitives", "clap", + "parking_lot", "reth-cli-commands", "serde", + "tokio", ] [[package]] @@ -8289,6 +8293,7 @@ dependencies = [ "reth-evm", "reth-exex", "reth-fs-util", + "reth-hyperliquid-types", "reth-invalid-block-hooks", "reth-network", "reth-network-api", @@ -11427,9 +11432,9 @@ dependencies = [ [[package]] name = "time" -version = "0.3.37" +version = "0.3.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" dependencies = [ "deranged", "itoa", @@ -11445,15 +11450,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.2" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" [[package]] name = "time-macros" -version = "0.2.19" +version = "0.2.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2834e6017e3e5e4b9834939793b282bc03b37a3336245fa820e35e233e2a85de" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" dependencies = [ "num-conv", "time-core", diff --git a/Cargo.toml b/Cargo.toml index 98b44ae1d..e0d631197 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -555,6 +555,9 @@ tokio = { version = "1.39", default-features = false } tokio-stream = "0.1.11" tokio-util = { version = "0.7.4", features = ["codec"] } +# time +time = "0.3.41" + # async async-stream = "0.3" async-trait = "0.1.68" diff --git a/README.md b/README.md index 2351d8850..61fd8b53e 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,29 @@ $ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 \ --ws --ws.addr 0.0.0.0 --ws.origins '*' --ws.api eth,ots,net,web3 --ingest-dir ~/evm-blocks --ws.port 8545 ``` +## How to run (mainnet) (with local block sync) + +You can choose to source blocks from your local instance of hl-node instead of relying on an s3 replica. +This will require you to first have a hl-node outputting blocks prior to running the initial s3 sync, +the node will prioritise locally produced blocks with a fallback to s3. +This method will allow you to reduce the need to rely on goofys. + +It is recommended that you periodically sync evm-blocks from s3 so you have a fallback in case your hl-node fails, as hl-node +will not backfill evm blocks. +```sh +# Run your local hl-node (make sure output file buffering is disabled) +# Make sure evm blocks are being produced inside evm_block_and_receipts +$ hl-node run-non-validator --replica-cmds-style recent-actions --serve-eth-rpc --disable-output-file-buffering + +# Fetch EVM blocks (Initial sync) +$ aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks --request-payer requester # one-time + +# Run node (with local-ingest-dir arg) +$ make install +$ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 \ + --ws --ws.addr 0.0.0.0 --ws.origins '*' --ws.api eth,ots,net,web3 --ingest-dir ~/evm-blocks --local-ingest-dir --ws.port 8545 +``` + ## How to run (testnet) Testnet is supported since block 21304281. diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index aec1a9faf..0457f4586 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -84,6 +84,9 @@ serde_json.workspace = true tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] } futures.workspace = true +# time +time = { workspace = true } + # misc aquamarine.workspace = true eyre.workspace = true diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index f5ef56c95..1961f3a7d 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -1,4 +1,6 @@ -use std::path::PathBuf; +use std::collections::BTreeMap; +use std::io::{BufRead, BufReader}; +use std::path::{Path, PathBuf}; use std::sync::Arc; use alloy_consensus::{BlockBody, BlockHeader, Transaction}; @@ -10,22 +12,90 @@ use alloy_rpc_types::engine::{ use jsonrpsee::http_client::{transport::HttpBackend, HttpClient}; use reth::network::PeersHandleProvider; use reth_chainspec::{EthChainSpec, EthereumHardforks}; -use reth_node_api::{FullNodeComponents, PayloadTypes}; +use reth_hyperliquid_types::PrecompilesCache; +use reth_node_api::{Block, FullNodeComponents, PayloadTypes}; use reth_node_builder::EngineTypes; use reth_node_builder::NodeTypesWithEngine; use reth_node_builder::{rpc::RethRpcAddOns, FullNode}; use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId}; use reth_primitives::{Transaction as TypedTransaction, TransactionSigned}; -use reth_provider::{BlockHashReader, StageCheckpointReader}; +use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader}; use reth_rpc_api::EngineApiClient; use reth_rpc_layer::AuthClientService; use reth_stages::StageId; +use serde::Deserialize; +use time::{format_description, Duration, OffsetDateTime}; +use tokio::sync::Mutex; use tracing::{debug, info}; use crate::serialized::{BlockAndReceipts, EvmBlock}; use crate::spot_meta::erc20_contract_to_spot_token; -pub(crate) struct BlockIngest(pub PathBuf); +/// Poll interval when tailing an *open* hourly file. +const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); +/// Sub‑directory that contains day folders (inside `local_ingest_dir`). +const HOURLY_SUBDIR: &str = "hourly"; + +pub(crate) struct BlockIngest { + pub ingest_dir: PathBuf, + pub local_ingest_dir: Option, + pub local_blocks_cache: Arc>>, // height → block + pub precompiles_cache: PrecompilesCache, +} + +#[derive(Deserialize)] +struct LocalBlockAndReceipts(String, BlockAndReceipts); + +struct ScanResult { + next_expected_height: u64, + new_blocks: Vec, +} + +fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { + // info!( + // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", + // path, start_height, last_line + // ); + let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let reader = BufReader::new(file); + + let mut new_blocks = Vec::::new(); + let mut last_height = start_height; + let lines: Vec = reader.lines().collect::>().unwrap(); + let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 }; + + for (line_idx, line) in lines.iter().enumerate().skip(skip) { + // Safety check ensuring efficiency + if line_idx < *last_line { + continue; + } + if line.trim().is_empty() { + continue; + } + + let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = + serde_json::from_str(&line).expect("Failed to parse local block and receipts"); + + let height = match &parsed_block.block { + EvmBlock::Reth115(b) => { + let block_number = b.header().number() as u64; + // Another check to ensure not returning an older block + if block_number < start_height { + continue; + } + block_number + } + }; + // println!("Iterating block height {:?} | Line {}", height, line_idx); + if height >= start_height { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + } + + ScanResult { next_expected_height: last_height + 1, new_blocks } +} async fn submit_payload( engine_api_client: &HttpClient>, @@ -53,22 +123,118 @@ async fn submit_payload( Ok(submission.latest_valid_hash.unwrap_or_default()) } +fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { + OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000) + .expect("timestamp out of range") +} + +fn date_from_datetime(dt: OffsetDateTime) -> String { + dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() +} + impl BlockIngest { - pub(crate) fn collect_block(&self, height: u64) -> Option { + pub(crate) async fn collect_block(&self, height: u64) -> Option { + // info!("Attempting to collect block @ height [{height}]"); + + // Not a one liner (using .or) to include logs + if let Some(block) = self.try_collect_local_block(height).await { + info!("Returning locally synced block for @ Height [{height}]"); + return Some(block); + } else { + self.try_collect_s3_block(height) + } + } + + pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option { let f = ((height - 1) / 1_000_000) * 1_000_000; let s = ((height - 1) / 1_000) * 1_000; - let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.0.to_string_lossy()); + let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy()); if std::path::Path::new(&path).exists() { let file = std::fs::File::open(path).unwrap(); let file = std::io::BufReader::new(file); let mut decoder = lz4_flex::frame::FrameDecoder::new(file); let blocks: Vec = rmp_serde::from_read(&mut decoder).unwrap(); + info!("Returning s3 synced block for @ Height [{height}]"); Some(blocks[0].clone()) } else { None } } + async fn try_collect_local_block(&self, height: u64) -> Option { + let mut u_cache = self.local_blocks_cache.lock().await; + u_cache.remove(&height) + } + + async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { + let Some(root) = &self.local_ingest_dir else { return }; // nothing to do + let root = root.to_owned(); + let cache = self.local_blocks_cache.clone(); + let precompiles_cache = self.precompiles_cache.clone(); + + tokio::spawn(async move { + let mut next_height = current_head; + let mut dt = datetime_from_timestamp(current_ts) + .replace_minute(0) + .unwrap() + .replace_second(0) + .unwrap() + .replace_nanosecond(0) + .unwrap(); + + let mut hour = dt.hour(); + let mut day_str = date_from_datetime(dt); + let mut last_line = 0; + + loop { + let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); + + if hour_file.exists() { + let ScanResult { next_expected_height, new_blocks } = + scan_hour_file(&hour_file, &mut last_line, next_height); + if !new_blocks.is_empty() { + let mut u_cache = cache.lock().await; + let mut u_pre_cache = precompiles_cache.lock(); + for blk in new_blocks { + let precompiles = blk.read_precompile_calls.clone(); + let h = match &blk.block { + EvmBlock::Reth115(b) => { + let block_number = b.header().number() as u64; + block_number + } + }; + u_cache.insert(h, blk); + u_pre_cache.insert(h, precompiles); + } + next_height = next_expected_height; + } + } + + // Decide whether the *current* hour file is closed (past) or + // still live. If it’s in the past by > 1 h, move to next hour; + // otherwise, keep tailing the same file. + let now = OffsetDateTime::now_utc(); + + // println!("Date Current {:?}", dt); + // println!("Now Current {:?}", now); + + if dt + Duration::HOUR < now { + dt += Duration::HOUR; + hour = dt.hour(); + day_str = date_from_datetime(dt); + last_line = 0; + info!( + "Moving to a new file. {:?}", + root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) + ); + continue; + } + + tokio::time::sleep(TAIL_INTERVAL).await; + } + }); + } + pub(crate) async fn run( &self, node: FullNode, @@ -96,9 +262,19 @@ impl BlockIngest { let engine_api = node.auth_server_handle().http_client(); let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?; + let current_block_timestamp: u64 = provider + .block_by_number(head) + .expect("Failed to fetch current block in db") + .expect("Block does not exist") + .into_header() + .timestamp(); + + info!("Current height {height}, timestamp {current_block_timestamp}"); + self.start_local_ingest_loop(height, current_block_timestamp).await; + loop { - let Some(original_block) = self.collect_block(height) else { - tokio::time::sleep(std::time::Duration::from_millis(200)).await; + let Some(original_block) = self.collect_block(height).await else { + tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; let EvmBlock::Reth115(mut block) = original_block.block; @@ -111,6 +287,7 @@ impl BlockIngest { let BlockBody { transactions, ommers, withdrawals } = std::mem::take(block.body_mut()); let mut system_txs = vec![]; + for transaction in original_block.system_txs { let TypedTransaction::Legacy(tx) = &transaction.tx else { panic!("Unexpected transaction type"); @@ -180,10 +357,12 @@ impl BlockIngest { PayloadStatusEnum::Valid, ) .await?; + let current_timestamp = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis(); + if height % 100 == 0 || current_timestamp - previous_timestamp > 100 { EngineApiClient::::fork_choice_updated_v2( &engine_api, diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index d0e27d761..150fb0ea2 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -9,12 +9,16 @@ mod serialized; mod spot_meta; mod tx_forwarder; +use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; + use block_ingest::BlockIngest; use call_forwarder::CallForwarderApiServer; use clap::{Args, Parser}; use reth::cli::Cli; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; +use reth_hyperliquid_types::PrecompilesCache; use reth_node_ethereum::EthereumNode; +use tokio::sync::Mutex; use tracing::info; use tx_forwarder::EthForwarderApiServer; @@ -37,12 +41,17 @@ fn main() { std::env::set_var("RUST_BACKTRACE", "1"); } + let precompiles_cache = PrecompilesCache::new(parking_lot::Mutex::new(BTreeMap::new())); + let local_blocks_cache = Arc::new(Mutex::new(BTreeMap::new())); + if let Err(err) = Cli::::parse().run( |builder, ext_args| async move { let ingest_dir = builder.config().ingest_dir.clone().expect("ingest dir not set"); + let local_ingest_dir = builder.config().local_ingest_dir.clone(); info!(target: "reth::cli", "Launching node"); let handle = builder .node(EthereumNode::default()) + .add_precompiles_cache(precompiles_cache.clone()) .extend_rpc_modules(move |ctx| { let upstream_rpc_url = ext_args.upstream_rpc_url; ctx.modules.replace_configured( @@ -62,7 +71,8 @@ fn main() { .launch() .await?; - let ingest = BlockIngest(ingest_dir); + let ingest = + BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; ingest.run(handle.node).await.unwrap(); handle.node_exit_future.await }, diff --git a/crates/cli/commands/src/node.rs b/crates/cli/commands/src/node.rs index 5ce7dba0f..a563d0f87 100644 --- a/crates/cli/commands/src/node.rs +++ b/crates/cli/commands/src/node.rs @@ -118,6 +118,10 @@ pub struct NodeCommand< /// EVM blocks base directory #[arg(long, default_value = "/tmp/evm-blocks")] pub ingest_dir: PathBuf, + + /// Local EVM blocks base directory + #[arg(long, default_value = "/tmp/evm-blocks-and-receipts")] + pub local_ingest_dir: PathBuf, } impl NodeCommand { @@ -170,6 +174,7 @@ impl< ext, engine, ingest_dir, + local_ingest_dir, } = self; // set up node config @@ -189,6 +194,7 @@ impl< pruning, engine, ingest_dir: Some(ingest_dir), + local_ingest_dir: Some(local_ingest_dir), }; let data_dir = node_config.datadir(); diff --git a/crates/ethereum/consensus/src/lib.rs b/crates/ethereum/consensus/src/lib.rs index 3e00654dc..d1688575f 100644 --- a/crates/ethereum/consensus/src/lib.rs +++ b/crates/ethereum/consensus/src/lib.rs @@ -225,6 +225,7 @@ where #[cfg(test)] mod tests { use super::*; + use alloy_eips::eip1559::GAS_LIMIT_BOUND_DIVISOR; use alloy_primitives::B256; use reth_chainspec::{ChainSpec, ChainSpecBuilder}; use reth_primitives_traits::proofs; diff --git a/crates/ethereum/evm/Cargo.toml b/crates/ethereum/evm/Cargo.toml index 3387ae561..104bacce8 100644 --- a/crates/ethereum/evm/Cargo.toml +++ b/crates/ethereum/evm/Cargo.toml @@ -18,6 +18,7 @@ reth-ethereum-forks.workspace = true reth-revm.workspace = true reth-evm.workspace = true reth-primitives.workspace = true +reth-node-builder.workspace = true # Ethereum reth-primitives-traits.workspace = true diff --git a/crates/ethereum/evm/src/lib.rs b/crates/ethereum/evm/src/lib.rs index 38d774e7a..2881a61d2 100644 --- a/crates/ethereum/evm/src/lib.rs +++ b/crates/ethereum/evm/src/lib.rs @@ -21,24 +21,21 @@ use alloc::sync::Arc; use alloy_consensus::{BlockHeader, Header}; use alloy_evm::eth::EthEvmContext; pub use alloy_evm::EthEvm; -use alloy_primitives::bytes::BufMut; -use alloy_primitives::hex::{FromHex, ToHexExt}; -use alloy_primitives::{Address, B256}; -use alloy_primitives::{Bytes, U256}; +use alloy_primitives::Address; +use alloy_primitives::U256; use core::{convert::Infallible, fmt::Debug}; use parking_lot::RwLock; use reth_chainspec::{ChainSpec, EthChainSpec, MAINNET}; use reth_evm::Database; use reth_evm::{ConfigureEvm, ConfigureEvmEnv, EvmEnv, EvmFactory, NextBlockEnvAttributes}; -use reth_hyperliquid_types::{ReadPrecompileInput, ReadPrecompileResult}; +use reth_hyperliquid_types::{PrecompilesCache, ReadPrecompileInput, ReadPrecompileResult}; +use reth_node_builder::HyperliquidSharedState; +use reth_primitives::SealedBlock; use reth_primitives::TransactionSigned; -use reth_primitives::{SealedBlock, Transaction}; use reth_revm::context::result::{EVMError, HaltReason}; -use reth_revm::context::Cfg; use reth_revm::handler::EthPrecompiles; use reth_revm::inspector::NoOpInspector; use reth_revm::interpreter::interpreter::EthInterpreter; -use reth_revm::precompile::{PrecompileError, PrecompileErrors, Precompiles}; use reth_revm::MainBuilder; use reth_revm::{ context::{BlockEnv, CfgEnv, TxEnv}, @@ -48,7 +45,6 @@ use reth_revm::{ use reth_revm::{Context, Inspector, MainContext}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; -use std::io::Write; use std::path::PathBuf; mod config; @@ -72,12 +68,13 @@ pub struct EthEvmConfig { chain_spec: Arc, evm_factory: HyperliquidEvmFactory, ingest_dir: Option, + shared_state: Option, } impl EthEvmConfig { /// Creates a new Ethereum EVM configuration with the given chain spec. pub fn new(chain_spec: Arc) -> Self { - Self { chain_spec, ingest_dir: None, evm_factory: Default::default() } + Self { chain_spec, ingest_dir: None, evm_factory: Default::default(), shared_state: None } } pub fn with_ingest_dir(mut self, ingest_dir: PathBuf) -> Self { @@ -86,6 +83,12 @@ impl EthEvmConfig { self } + pub fn with_shared_state(mut self, shared_state: Option) -> Self { + self.shared_state = shared_state.clone(); + self.evm_factory.shared_state = shared_state; + self + } + /// Creates a new Ethereum EVM configuration for the ethereum mainnet. pub fn mainnet() -> Self { Self::new(MAINNET.clone()) @@ -208,9 +211,10 @@ pub(crate) enum EvmBlock { #[non_exhaustive] pub struct HyperliquidEvmFactory { ingest_dir: Option, + shared_state: Option, } -pub(crate) fn collect_block(ingest_path: PathBuf, height: u64) -> Option { +pub(crate) fn collect_s3_block(ingest_path: PathBuf, height: u64) -> Option { let f = ((height - 1) / 1_000_000) * 1_000_000; let s = ((height - 1) / 1_000) * 1_000; let path = format!("{}/{f}/{s}/{height}.rmp.lz4", ingest_path.to_string_lossy()); @@ -225,6 +229,32 @@ pub(crate) fn collect_block(ingest_path: PathBuf, height: u64) -> Option Option)>> { + let mut u_cache = precompiles_cache.lock(); + u_cache.remove(&height) +} + +pub(crate) fn collect_block( + ingest_path: PathBuf, + shared_state: Option, + height: u64, +) -> Option { + // Attempt to source precompile from the cache that is shared the binary level with the block + // ingestor. + if let Some(shared_state) = shared_state { + if let Some(calls) = + get_locally_sourced_precompiles_for_height(shared_state.precompiles_cache, height) + { + return Some(BlockAndReceipts { read_precompile_calls: calls }); + } + } + // Fallback to s3 always + collect_s3_block(ingest_path, height) +} + impl EvmFactory for HyperliquidEvmFactory { type Evm, EthInterpreter>> = EthEvm>>; @@ -234,9 +264,14 @@ impl EvmFactory for HyperliquidEvmFactory { type Context = EthEvmContext; fn create_evm(&self, db: DB, input: EvmEnv) -> Self::Evm { - let cache = collect_block(self.ingest_dir.clone().unwrap(), input.block_env.number) - .unwrap() - .read_precompile_calls; + let block = collect_block( + self.ingest_dir.clone().unwrap(), + self.shared_state.clone(), + input.block_env.number, + ) + .expect("Failed to collect a submitted block. If sourcing locally, make sure your local hl-node is producing blocks."); + let cache = block.read_precompile_calls; + let evm = Context::mainnet() .with_db(db) .with_cfg(input.cfg_env) diff --git a/crates/ethereum/node/src/node.rs b/crates/ethereum/node/src/node.rs index 7aeb7615c..735347a48 100644 --- a/crates/ethereum/node/src/node.rs +++ b/crates/ethereum/node/src/node.rs @@ -248,7 +248,9 @@ where ctx: &BuilderContext, ) -> eyre::Result<(Self::EVM, Self::Executor)> { let chain_spec = ctx.chain_spec(); - let evm_config = EthEvmConfig::new(ctx.chain_spec()).with_ingest_dir(ctx.ingest_dir()); + let evm_config = EthEvmConfig::new(ctx.chain_spec()) + .with_ingest_dir(ctx.ingest_dir()) + .with_shared_state(ctx.shared_state()); let strategy_factory = EthExecutionStrategyFactory::new(chain_spec, evm_config.clone()); let executor = BasicBlockExecutorProvider::new(strategy_factory); diff --git a/crates/ethereum/node/src/payload.rs b/crates/ethereum/node/src/payload.rs index 27c2e1517..d613b7e33 100644 --- a/crates/ethereum/node/src/payload.rs +++ b/crates/ethereum/node/src/payload.rs @@ -74,6 +74,12 @@ where ctx: &BuilderContext, pool: Pool, ) -> eyre::Result { - self.build(EthEvmConfig::new(ctx.chain_spec()).with_ingest_dir(ctx.ingest_dir()), ctx, pool) + self.build( + EthEvmConfig::new(ctx.chain_spec()) + .with_ingest_dir(ctx.ingest_dir()) + .with_shared_state(ctx.shared_state()), + ctx, + pool, + ) } } diff --git a/crates/hyperliquid-types/Cargo.toml b/crates/hyperliquid-types/Cargo.toml index cbedfecb0..d455882c3 100644 --- a/crates/hyperliquid-types/Cargo.toml +++ b/crates/hyperliquid-types/Cargo.toml @@ -13,6 +13,8 @@ workspace = true [dependencies] alloy-primitives.workspace = true serde.workspace = true +tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] } +parking_lot.workspace = true [dev-dependencies] clap.workspace = true diff --git a/crates/hyperliquid-types/src/lib.rs b/crates/hyperliquid-types/src/lib.rs index c7d60349a..70d19119e 100644 --- a/crates/hyperliquid-types/src/lib.rs +++ b/crates/hyperliquid-types/src/lib.rs @@ -1,4 +1,7 @@ -use alloy_primitives::Bytes; +use std::{collections::BTreeMap, sync::Arc}; + +use alloy_primitives::{Address, Bytes}; +use parking_lot::Mutex; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize, Hash)] @@ -14,3 +17,6 @@ pub enum ReadPrecompileResult { Error, UnexpectedError, } + +pub type PrecompilesCache = + Arc)>>>>; diff --git a/crates/node/builder/Cargo.toml b/crates/node/builder/Cargo.toml index 7158a7bc6..0095bbe6c 100644 --- a/crates/node/builder/Cargo.toml +++ b/crates/node/builder/Cargo.toml @@ -55,6 +55,7 @@ reth-tokio-util.workspace = true reth-tracing.workspace = true reth-transaction-pool.workspace = true reth-basic-payload-builder.workspace = true +reth-hyperliquid-types.workspace = true ## ethereum alloy-consensus.workspace = true diff --git a/crates/node/builder/src/builder/mod.rs b/crates/node/builder/src/builder/mod.rs index 89d0917c8..ab8dce09d 100644 --- a/crates/node/builder/src/builder/mod.rs +++ b/crates/node/builder/src/builder/mod.rs @@ -16,6 +16,7 @@ use reth_cli_util::get_secret_key; use reth_db_api::{database::Database, database_metrics::DatabaseMetrics}; use reth_engine_tree::tree::TreeConfig; use reth_exex::ExExContext; +use reth_hyperliquid_types::PrecompilesCache; use reth_network::{ transactions::TransactionsManagerConfig, NetworkBuilder, NetworkConfig, NetworkConfigBuilder, NetworkHandle, NetworkManager, NetworkPrimitives, @@ -474,6 +475,14 @@ where Self { builder: self.builder.on_rpc_started(hook), task_executor: self.task_executor } } + /// Add precompiles cache + pub fn add_precompiles_cache(self, precompile_cache: PrecompilesCache) -> Self { + Self { + builder: self.builder.add_precompiles_cache(precompile_cache), + task_executor: self.task_executor, + } + } + /// Sets the hook that is run to configure the rpc modules. pub fn extend_rpc_modules(self, hook: F) -> Self where @@ -587,6 +596,8 @@ pub struct BuilderContext { pub(crate) executor: TaskExecutor, /// Config container pub(crate) config_container: WithConfigs<::ChainSpec>, + /// Shared state + pub(crate) shared_state: Option, } impl BuilderContext { @@ -596,8 +607,9 @@ impl BuilderContext { provider: Node::Provider, executor: TaskExecutor, config_container: WithConfigs<::ChainSpec>, + shared_state: Option, ) -> Self { - Self { head, provider, executor, config_container } + Self { head, provider, executor, config_container, shared_state } } /// Returns the configured provider to interact with the blockchain. @@ -754,6 +766,14 @@ impl BuilderContext { pub fn ingest_dir(&self) -> PathBuf { self.config().ingest_dir.clone().expect("ingest dir not set") } + + pub fn local_ingest_dir(&self) -> PathBuf { + self.config().local_ingest_dir.clone().expect("local ingest dir not set") + } + + pub fn shared_state(&self) -> Option { + self.shared_state.clone() + } } impl>> BuilderContext { diff --git a/crates/node/builder/src/builder/states.rs b/crates/node/builder/src/builder/states.rs index a07caccbd..24e37cb16 100644 --- a/crates/node/builder/src/builder/states.rs +++ b/crates/node/builder/src/builder/states.rs @@ -13,6 +13,7 @@ use crate::{ AddOns, FullNode, }; use reth_exex::ExExContext; +use reth_hyperliquid_types::PrecompilesCache; use reth_node_api::{FullNodeComponents, FullNodeTypes, NodeAddOns, NodeTypes}; use reth_node_core::node_config::NodeConfig; use reth_tasks::TaskExecutor; @@ -47,6 +48,7 @@ impl NodeBuilderWithTypes { adapter, components_builder, add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons: () }, + shared_state: None, } } } @@ -146,6 +148,11 @@ impl> Clone for NodeAdapter { } } +#[derive(Debug, Clone)] +pub struct HyperliquidSharedState { + pub precompiles_cache: PrecompilesCache, +} + /// A fully type configured node builder. /// /// Supports adding additional addons to the node. @@ -162,6 +169,8 @@ pub struct NodeBuilderWithComponents< pub components_builder: CB, /// Additional node extensions. pub add_ons: AddOns, AO>, + /// Shared state + pub shared_state: Option, } impl NodeBuilderWithComponents @@ -182,6 +191,7 @@ where adapter, components_builder, add_ons: AddOns { hooks: NodeHooks::default(), exexs: Vec::new(), add_ons }, + shared_state: None, } } } @@ -294,4 +304,10 @@ where add_ons }) } + + /// Add state + pub fn add_precompiles_cache(mut self, precompiles_cache: PrecompilesCache) -> Self { + self.shared_state = Some(HyperliquidSharedState { precompiles_cache }); + self + } } diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 9bbee6ae9..c174eb1d9 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -5,7 +5,7 @@ use std::{sync::Arc, thread::available_parallelism}; use crate::{ components::{NodeComponents, NodeComponentsBuilder}, hooks::OnComponentInitializedHook, - BuilderContext, NodeAdapter, + BuilderContext, HyperliquidSharedState, NodeAdapter, }; use alloy_primitives::{BlockNumber, B256}; use eyre::{Context, OptionExt}; @@ -258,6 +258,7 @@ impl LaunchContextWith> { &mut self.attachment.right } } + impl LaunchContextWith, R>> { /// Adjust certain settings in the config to make sure they are set correctly /// @@ -655,6 +656,7 @@ where on_component_initialized: Box< dyn OnComponentInitializedHook>, >, + shared_state: Option, ) -> eyre::Result< LaunchContextWith< Attached::ChainSpec>, WithComponents>, @@ -671,6 +673,7 @@ where self.blockchain_db().clone(), self.task_executor().clone(), self.configs().clone(), + shared_state, ); debug!(target: "reth::cli", "creating components"); @@ -790,15 +793,15 @@ where /// This checks for OP-Mainnet and ensures we have all the necessary data to progress (past /// bedrock height) fn ensure_chain_specific_db_checks(&self) -> ProviderResult<()> { - if self.chain_spec().is_optimism() && - !self.is_dev() && - self.chain_id() == Chain::optimism_mainnet() + if self.chain_spec().is_optimism() + && !self.is_dev() + && self.chain_id() == Chain::optimism_mainnet() { let latest = self.blockchain_db().last_block_number()?; // bedrock height if latest < 105235063 { error!("Op-mainnet has been launched without importing the pre-Bedrock state. The chain can't progress without this. See also https://reth.rs/run/sync-op-mainnet.html?minimal-bootstrap-recommended"); - return Err(ProviderError::BestBlockNotFound) + return Err(ProviderError::BestBlockNotFound); } } @@ -880,7 +883,7 @@ where &self, ) -> eyre::Result::Primitives>>> { let Some(ref hook) = self.node_config().debug.invalid_block_hook else { - return Ok(Box::new(NoopInvalidBlockHook::default())) + return Ok(Box::new(NoopInvalidBlockHook::default())); }; let healthy_node_rpc_client = self.get_healthy_node_client()?; diff --git a/crates/node/builder/src/launch/engine.rs b/crates/node/builder/src/launch/engine.rs index 3e699b913..9efd7f53d 100644 --- a/crates/node/builder/src/launch/engine.rs +++ b/crates/node/builder/src/launch/engine.rs @@ -93,7 +93,9 @@ where components_builder, add_ons: AddOns { hooks, exexs: installed_exex, add_ons }, config, + shared_state, } = target; + let NodeHooks { on_component_initialized, on_node_started, .. } = hooks; // setup the launch context @@ -126,7 +128,7 @@ where .with_blockchain_db::(move |provider_factory| { Ok(BlockchainProvider::new(provider_factory)?) })? - .with_components(components_builder, on_component_initialized).await?; + .with_components(components_builder, on_component_initialized, shared_state.clone()).await?; // spawn exexs let exex_manager_handle = ExExLauncher::new( diff --git a/crates/node/core/src/node_config.rs b/crates/node/core/src/node_config.rs index 1e0064471..eb8577798 100644 --- a/crates/node/core/src/node_config.rs +++ b/crates/node/core/src/node_config.rs @@ -148,6 +148,9 @@ pub struct NodeConfig { /// The ingest directory for the node. pub ingest_dir: Option, + + /// The local ingest directory for the node. + pub local_ingest_dir: Option, } impl NodeConfig { @@ -178,6 +181,7 @@ impl NodeConfig { datadir: DatadirArgs::default(), engine: EngineArgs::default(), ingest_dir: None, + local_ingest_dir: None, } } @@ -363,7 +367,7 @@ impl NodeConfig { // try to look up the header in the database if let Some(header) = header { info!(target: "reth::cli", ?tip, "Successfully looked up tip block in the database"); - return Ok(header.number()) + return Ok(header.number()); } Ok(self.fetch_tip_from_network(client, tip.into()).await.number()) @@ -386,7 +390,7 @@ impl NodeConfig { match get_single_header(&client, tip).await { Ok(tip_header) => { info!(target: "reth::cli", ?tip, "Successfully fetched tip"); - return tip_header + return tip_header; } Err(error) => { fetch_failures += 1; @@ -470,6 +474,7 @@ impl NodeConfig { pruning: self.pruning, engine: self.engine, ingest_dir: self.ingest_dir, + local_ingest_dir: self.local_ingest_dir, } } } @@ -498,6 +503,7 @@ impl Clone for NodeConfig { datadir: self.datadir.clone(), engine: self.engine.clone(), ingest_dir: self.ingest_dir.clone(), + local_ingest_dir: self.local_ingest_dir.clone(), } } } diff --git a/examples/custom-engine-types/src/main.rs b/examples/custom-engine-types/src/main.rs index a487034d1..010136199 100644 --- a/examples/custom-engine-types/src/main.rs +++ b/examples/custom-engine-types/src/main.rs @@ -240,7 +240,7 @@ where if attributes.custom == 0 { return Err(EngineObjectValidationError::invalid_params( CustomError::CustomFieldIsNotZero, - )) + )); } Ok(()) @@ -451,7 +451,5 @@ async fn main() -> eyre::Result<()> { .await .unwrap(); - println!("Node started"); - handle.node_exit_future.await }