24 Commits

Author SHA1 Message Date
cee0a342dc Update README.md 2025-09-13 17:00:08 -04:00
eb2953246e Update README.md
Add support channel
2025-09-13 16:59:11 -04:00
fb462f2b24 Merge pull request #56 from jonathanudd/fix-start-at-in-s3sync-runner
fix: Skipping logic in s3sync-runner.sh which was based on lexicograp…
2025-09-08 09:54:39 -04:00
9af9c93454 fix: Skipping logic in s3sync-runner.sh which was based on lexicographic order instead of numeric order 2025-09-08 06:58:52 +02:00
8632f9d74f Merge pull request #37 from wwwehr/feature/hlfs-evm-blocks
Feature: Archive Node File Sharing
2025-08-30 00:00:08 +09:00
738567894a only consider top level highest dir name on archive node file root; also added back hlfs to blockingest struct 2025-08-26 19:49:45 -07:00
e8be4c2e82 code review findings 2025-08-22 14:23:22 -07:00
06f7710777 code review findings 2025-08-22 14:21:48 -07:00
5c3828382c debug 2025-08-22 00:00:23 -07:00
fae4b4c8f9 debug 2025-08-21 23:50:10 -07:00
cdb0f9e8a2 debug 2025-08-21 23:46:53 -07:00
5114f76517 debug 2025-08-21 22:52:37 -07:00
3bdc491aba debug 2025-08-21 22:43:03 -07:00
2a653857aa debug 2025-08-21 22:18:42 -07:00
5f2955caa2 debug 2025-08-21 19:30:56 -07:00
022dfd1546 debug 2025-08-21 19:03:21 -07:00
249e3194dd added new max block opcode 2025-08-21 18:35:30 -07:00
530447e637 debug 2025-08-21 17:20:20 -07:00
2ff606c32c debugging host issue 2025-08-21 23:02:05 +00:00
ec417f9bf4 debug 2025-08-14 04:59:16 +00:00
e2045a195c debug block ingest workflow 2025-08-13 00:57:16 +00:00
ec281f9cc7 active block sharing 2025-08-11 22:16:30 -07:00
0d1c239a07 fixed lints 2025-08-11 14:18:24 -07:00
821846f671 added alternative block share 2025-08-11 14:13:55 -07:00
20 changed files with 701 additions and 213 deletions

19
Cargo.lock generated
View File

@ -6659,6 +6659,7 @@ dependencies = [
"clap",
"eyre",
"futures",
"futures-util",
"jsonrpsee",
"jsonrpsee-core",
"lz4_flex",
@ -6687,6 +6688,7 @@ dependencies = [
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-hlfs",
"reth-hyperliquid-types",
"reth-network",
"reth-network-api",
@ -7966,16 +7968,28 @@ dependencies = [
"thiserror 2.0.11",
]
[[package]]
name = "reth-hlfs"
version = "1.2.0"
dependencies = [
"bytes",
"parking_lot",
"rand 0.8.5",
"reth-tracing",
"tempfile",
"thiserror 2.0.11",
"tokio",
"tracing",
]
[[package]]
name = "reth-hyperliquid-types"
version = "1.2.0"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"clap",
"parking_lot",
"reth-cli-commands",
"rmp-serde",
"serde",
"tokio",
]
@ -9061,7 +9075,6 @@ dependencies = [
"reth-evm",
"reth-execution-types",
"reth-fs-util",
"reth-hyperliquid-types",
"reth-metrics",
"reth-network-p2p",
"reth-nippy-jar",

View File

@ -54,6 +54,7 @@ members = [
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
"crates/net/hlfs/",
"crates/net/nat/",
"crates/net/network-api/",
"crates/net/network-types/",
@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" }
reth-exex-test-utils = { path = "crates/exex/test-utils" }
reth-exex-types = { path = "crates/exex/types" }
reth-fs-util = { path = "crates/fs-util" }
reth-hlfs = { path = "crates/net/hlfs" }
reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
reth-ipc = { path = "crates/rpc/ipc" }
reth-libmdbx = { path = "crates/storage/libmdbx-rs" }

View File

@ -2,6 +2,8 @@
Hyperliquid archive node based on [reth](https://github.com/paradigmxyz/reth).
Got questions? Drop by the [Hyperliquid Discord](https://discord.gg/hyperliquid) #node-operators channel.
## ⚠️ IMPORTANT: System Transactions Appear as Pseudo Transactions
Deposit transactions from `0x222..22` to user addresses are intentionally recorded as pseudo transactions.

View File

@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
reth-cli-commands.workspace = true
reth-cli-util.workspace = true
reth-consensus-common.workspace = true
reth-hlfs.workspace = true
reth-rpc-builder.workspace = true
reth-rpc.workspace = true
reth-rpc-types-compat.workspace = true
@ -81,8 +82,9 @@ tracing.workspace = true
serde_json.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
futures.workspace = true
futures-util.workspace = true
# time
time = { workspace = true }

View File

@ -29,6 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token;
/// Poll interval when tailing an *open* hourly file.
@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
pub hlfs: Option<ShareBlocks>,
}
#[derive(Deserialize)]
@ -155,9 +157,9 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
} else {
self.try_collect_s3_block(height)
}
self.try_collect_s3_block(height)
}
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
@ -207,7 +209,7 @@ impl BlockIngest {
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = PrecompileData {
precompiles: blk.read_precompile_calls.0.clone(),
precompiles: blk.read_precompile_calls.clone(),
highest_precompile_address: blk.highest_precompile_address,
};
let h = match &blk.block {

View File

@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
mod block_ingest;
mod call_forwarder;
mod serialized;
mod share_blocks;
mod spot_meta;
mod tx_forwarder;
@ -18,6 +19,7 @@ use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
use share_blocks::ShareBlocksArgs;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
/// 3. filters out logs and transactions from subscription.
#[arg(long, default_value = "false")]
pub hl_node_compliant: bool,
/// Enable hlfs to backfill archive blocks
#[command(flatten)]
pub hlfs: ShareBlocksArgs,
}
fn main() {
@ -85,8 +91,24 @@ fn main() {
.launch()
.await?;
let ingest =
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
// start HLFS (serve + peer-backed backfill) using the node's network
let hlfs = if ext_args.hlfs.share_blocks {
let net = handle.node.network.clone();
Some(
crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
.await?,
)
} else {
None
};
let ingest = BlockIngest {
ingest_dir,
local_ingest_dir,
local_blocks_cache,
precompiles_cache,
hlfs,
};
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},

View File

@ -1,5 +1,5 @@
use alloy_primitives::{Address, Log};
use reth_hyperliquid_types::{ReadPrecompileInput, ReadPrecompileResult, ReadPrecompileCalls};
use reth_hyperliquid_types::{ReadPrecompileInput, ReadPrecompileResult};
use reth_primitives::{SealedBlock, Transaction};
use serde::{Deserialize, Serialize};
@ -10,7 +10,7 @@ pub(crate) struct BlockAndReceipts {
#[serde(default)]
pub system_txs: Vec<SystemTx>,
#[serde(default)]
pub read_precompile_calls: ReadPrecompileCalls,
pub read_precompile_calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,
pub highest_precompile_address: Option<Address>,
}

View File

@ -0,0 +1,167 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};
// use futures_util::StreamExt;
use futures_util::stream::StreamExt;
#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}
pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}
impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();
let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});
let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}
fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));
tokio::spawn({
warn!("hlfs: backfiller started");
let backfiller = backfiller.clone();
async move {
loop {
let mut bf = backfiller.lock().await;
if bf.client.max_block < bf.max_block_seen {
let block = bf.client.max_block + 1;
let new_height = bf.fetch_if_missing(block).await.expect("new height");
bf.client.max_block = new_height.unwrap();
}
sleep(Duration::from_millis(50)).await;
}
}
});
tokio::spawn({
let backfiller = backfiller.clone();
async move {
let mut events = network.event_listener();
loop {
let mut bf = backfiller.lock().await;
match events.next().await {
Some(NetworkEvent::ActivePeerSession { info, .. }) => {
let ip = info.remote_addr.ip();
if ip.is_unspecified() {
debug!(%ip, "hlfs: skip unspecified");
continue;
}
if ip == self_ip {
debug!(%ip, "hlfs: skip self");
continue;
}
let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
let max_block = probe_hlfs(addr).await;
if max_block != 0 {
let mut g = good.lock().await;
if g.insert(PeerRecord { addr, max_block }) {
let v: Vec<_> = g.iter().copied().collect();
bf.set_peers(v.clone());
info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
}
} else {
debug!(%addr, "hlfs: peer has no HLFS");
}
}
Some(_) => {}
None => {
warn!("hlfs: network event stream ended");
break;
}
}
}
}
})
}
pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;
// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;
// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}
// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};
match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}

View File

@ -30,6 +30,7 @@ use reth_evm::{ConfigureEvm, ConfigureEvmEnv, EvmEnv, EvmFactory, NextBlockEnvAt
use reth_hyperliquid_types::PrecompileData;
use reth_hyperliquid_types::{PrecompilesCache, ReadPrecompileInput, ReadPrecompileResult};
use reth_node_builder::HyperliquidSharedState;
use reth_primitives::SealedBlock;
use reth_primitives::TransactionSigned;
use reth_revm::context::result::{EVMError, HaltReason};
use reth_revm::handler::EthPrecompiles;
@ -42,6 +43,7 @@ use reth_revm::{
specification::hardfork::SpecId,
};
use reth_revm::{Context, Inspector, MainContext};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
@ -193,6 +195,17 @@ impl ConfigureEvmEnv for EthEvmConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockAndReceipts {
#[serde(default)]
pub read_precompile_calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,
pub highest_precompile_address: Option<Address>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum EvmBlock {
Reth115(SealedBlock),
}
/// Custom EVM configuration.
#[derive(Debug, Clone, Default)]
@ -202,6 +215,16 @@ pub struct HyperliquidEvmFactory {
shared_state: Option<HyperliquidSharedState>,
}
pub(crate) fn collect_s3_block(ingest_path: PathBuf, height: u64) -> Option<BlockAndReceipts> {
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", ingest_path.to_string_lossy());
let file = std::fs::read(path).ok()?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
Some(blocks[0].clone())
}
pub(crate) fn get_locally_sourced_precompiles_for_height(
precompiles_cache: PrecompilesCache,
height: u64,
@ -210,6 +233,27 @@ pub(crate) fn get_locally_sourced_precompiles_for_height(
u_cache.remove(&height)
}
pub(crate) fn collect_block(
ingest_path: PathBuf,
shared_state: Option<HyperliquidSharedState>,
height: u64,
) -> Option<BlockAndReceipts> {
// Attempt to source precompile from the cache that is shared the binary level with the block
// ingestor.
if let Some(shared_state) = shared_state {
if let Some(calls) =
get_locally_sourced_precompiles_for_height(shared_state.precompiles_cache, height)
{
return Some(BlockAndReceipts {
read_precompile_calls: calls.precompiles,
highest_precompile_address: calls.highest_precompile_address,
});
}
}
// Fallback to s3 always
collect_s3_block(ingest_path, height)
}
const WARM_PRECOMPILES_BLOCK_NUMBER: u64 = 8_197_684;
impl EvmFactory<EvmEnv> for HyperliquidEvmFactory {
@ -221,23 +265,20 @@ impl EvmFactory<EvmEnv> for HyperliquidEvmFactory {
type Context<DB: Database> = EthEvmContext<DB>;
fn create_evm<DB: Database>(&self, db: DB, input: EvmEnv) -> Self::Evm<DB, NoOpInspector> {
// Try to get precompile data from the shared state cache
// This avoids the overhead of loading block data on every EVM creation
let mut cache: HashMap<_, _> = HashMap::new();
if let Some(ref shared_state) = self.shared_state {
if let Some(precompile_data) = get_locally_sourced_precompiles_for_height(
shared_state.precompiles_cache.clone(),
let block = collect_block(
self.ingest_dir.clone().unwrap(),
self.shared_state.clone(),
input.block_env.number,
) {
cache = precompile_data
.precompiles
)
.expect("Failed to collect a submitted block. If sourcing locally, make sure your local hl-node is producing blocks.");
let mut cache: HashMap<_, _> = block
.read_precompile_calls
.into_iter()
.map(|(address, calls)| (address, HashMap::from_iter(calls.into_iter())))
.collect();
if input.block_env.number >= WARM_PRECOMPILES_BLOCK_NUMBER {
let highest_precompile_address = precompile_data
let highest_precompile_address = block
.highest_precompile_address
.unwrap_or(address!("0x000000000000000000000000000000000000080d"));
for i in 0x800.. {
@ -248,8 +289,6 @@ impl EvmFactory<EvmEnv> for HyperliquidEvmFactory {
cache.entry(address).or_insert(HashMap::new());
}
}
}
}
let evm = Context::mainnet()
.with_db(db)

View File

@ -12,9 +12,7 @@ workspace = true
[dependencies]
alloy-primitives.workspace = true
alloy-rlp = { workspace = true }
serde.workspace = true
rmp-serde = "1.3"
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
parking_lot.workspace = true

View File

@ -1,7 +1,6 @@
use std::{collections::BTreeMap, sync::Arc};
use alloy_primitives::{Address, Bytes};
use alloy_rlp::{Decodable, Encodable};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
@ -11,7 +10,7 @@ pub struct ReadPrecompileInput {
pub gas_limit: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReadPrecompileResult {
Ok { gas_used: u64, bytes: Bytes },
OutOfGas,
@ -19,59 +18,6 @@ pub enum ReadPrecompileResult {
UnexpectedError,
}
/// ReadPrecompileCalls represents a collection of precompile calls with their results
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ReadPrecompileCalls(pub Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>);
impl ReadPrecompileCalls {
/// Create an empty ReadPrecompileCalls
pub fn new() -> Self {
Self(Vec::new())
}
/// Create from a vector of precompile calls
pub fn from_vec(calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>) -> Self {
Self(calls)
}
/// Get the inner vector
pub fn into_inner(self) -> Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)> {
self.0
}
/// Serialize to bytes using MessagePack for database storage
pub fn to_db_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
rmp_serde::to_vec(&self.0)
}
/// Deserialize from bytes using MessagePack from database storage
pub fn from_db_bytes(bytes: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
let data = rmp_serde::from_slice(bytes)?;
Ok(Self(data))
}
}
impl Encodable for ReadPrecompileCalls {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
// Encode as MessagePack bytes wrapped in RLP
let buf = self.to_db_bytes().unwrap_or_default();
buf.encode(out);
}
fn length(&self) -> usize {
let buf = self.to_db_bytes().unwrap_or_default();
buf.length()
}
}
impl Decodable for ReadPrecompileCalls {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let bytes = Vec::<u8>::decode(buf)?;
Self::from_db_bytes(&bytes)
.map_err(|_| alloy_rlp::Error::Custom("Failed to decode ReadPrecompileCalls"))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrecompileData {
pub precompiles: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,

View File

@ -0,0 +1,25 @@
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true

374
crates/net/hlfs/src/lib.rs Normal file
View File

@ -0,0 +1,374 @@
//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block).
use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
time::timeout,
};
type Result<T, E = HlfsError> = std::result::Result<T, E>;
pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;
#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
Io(#[from] io::Error),
#[error("proto")]
Proto,
#[error("no peers")]
NoPeers,
#[error("timeout")]
Timeout,
#[error("busy: retry_ms={0}")]
Busy(u32),
#[error("not found")]
NotFound,
}
#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)
} else {
Ok(())
}
}
/// Client: tries each peer once; rotates starting index per call
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
pub addr: SocketAddr,
pub max_block: u64,
}
impl PartialEq for PeerRecord {
fn eq(&self, o: &Self) -> bool {
self.addr == o.addr
}
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
fn hash<H: Hasher>(&self, s: &mut H) {
self.addr.hash(s);
}
}
#[derive(Clone)]
pub struct Client {
root: PathBuf,
pub peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
pub max_block: u64,
}
impl Client {
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
debug!(max_block = n, "hlfs: our archive");
Self {
root,
peers: Arc::new(Mutex::new(peers)),
timeout: Duration::from_secs(3),
max_block: n,
}
}
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
return Err(HlfsError::NoPeers);
}
let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
let mut last_busy: Option<u32> = None;
while let Some(i) = all.next() {
let addr = peers[i];
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
let mut op = [0u8; 1];
if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
debug!(%addr.addr, "hlfs: read op timeout {e:?}");
continue;
}
let op = op[0];
match op {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
let len = u32::from_le_bytes(len) as usize;
let mut buf = vec![0u8; len];
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
continue;
}
}
}
}
}
if let Some(ms) = last_busy {
return Err(HlfsError::Busy(ms));
}
Err(HlfsError::NotFound)
}
}
fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |b| n > b) {
*best = Some(n);
}
}
}
}
}
Ok(())
}
let mut best = Some(0);
let top: PathBuf = fs::read_dir(root)?
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.filter_map(|e| {
let name = e.file_name();
let s = name.to_str()?;
let n: u64 = s.parse().ok()?;
Some((n, e.path()))
})
.max_by_key(|(n, _)| *n)
.map(|(_, p)| p)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;
walk(&top, &mut best)?;
Ok(best.expect("cannot find block files"))
}
/// Server: serves `{root}/{number}.rlp`.
pub struct Server {
bind: SocketAddr,
root: PathBuf,
max_conns: usize,
inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32,
max_block: u64,
}
impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
Self {
bind,
root,
max_conns: 512,
inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100,
max_block: n,
}
}
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms;
self
}
pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?;
loop {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
}
*self.inflight.lock() += 1;
let root = self.root.clone();
let inflight = self.inflight.clone();
let busy = self.busy_retry_ms;
tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1;
});
}
}
}
async fn handle_conn(
sock: &mut TcpStream,
root: &Path,
busy_ms: u32,
addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}
if op[0] == OP_REQ_MAX_BLOCK {
let mut b = BytesMut::with_capacity(1 + 8);
b.put_u8(OP_RES_MAX_BLOCK);
put_u64(&mut b, max_block);
let _ = sock.write_all(&b).await;
return Ok(());
}
let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
let number = u64::from_le_bytes(num);
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
trace!(%addr, number, %path, "hlfs: req");
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
}
match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
warn!(%addr, %path, "hlfs: read error: {e}");
let _ = sock.shutdown().await;
}
}
Ok(())
}
/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
pub client: Client,
root: PathBuf,
pub max_block_seen: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into(), max_block_seen: 0 }
}
pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
self.client.update_peers(peers);
let _peers = self.client.peers.lock().clone();
for p in _peers {
if p.max_block > self.max_block_seen {
self.max_block_seen = p.max_block
}
}
}
pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
let rr_index = number as usize;
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if Path::new(&path).exists() {
debug!(block = number, "hlfs: already have");
return Ok(None);
}
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;
Ok(None)
}
Err(e) => Err(e),
Ok(data) => {
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%path, "hlfs: mkdirs failed: {e}");
}
if let Err(e) = fs::write(&path, &data) {
warn!(%path, "hlfs: write failed: {e}");
return Ok(None);
}
debug!(block = number, "hlfs: got block");
Ok(Some(number))
}
}
}
}

View File

@ -525,13 +525,6 @@ tables! {
type Key = ChainStateKey;
type Value = BlockNumber;
}
/// Stores precompile call data for each block.
/// Maps block number to serialized ReadPrecompileCalls data.
table BlockReadPrecompileCalls {
type Key = BlockNumber;
type Value = Vec<u8>;
}
}
/// Keys for the `ChainState` table.

View File

@ -33,7 +33,6 @@ reth-codecs.workspace = true
reth-evm.workspace = true
reth-chain-state.workspace = true
reth-node-types.workspace = true
reth-hyperliquid-types.workspace = true
# ethereum
alloy-eips.workspace = true

View File

@ -50,9 +50,6 @@ mod metrics;
mod chain;
pub use chain::*;
mod precompile;
pub use precompile::DatabasePrecompileCallsProvider;
/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.

View File

@ -1,68 +0,0 @@
//! Database provider implementation for precompile calls storage
use crate::traits::PrecompileCallsProvider;
use alloy_primitives::BlockNumber;
use reth_db::cursor::DbCursorRW;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_hyperliquid_types::ReadPrecompileCalls;
use reth_storage_errors::provider::ProviderResult;
/// Implementation of PrecompileCallsProvider for database provider
pub trait DatabasePrecompileCallsProvider: Send + Sync {
/// Transaction type
type Tx: DbTx;
/// Get a reference to the transaction
fn tx_ref(&self) -> &Self::Tx;
/// Get a mutable reference to the transaction
fn tx_mut(&mut self) -> &mut Self::Tx;
}
impl<T> PrecompileCallsProvider for T
where
T: DatabasePrecompileCallsProvider,
T::Tx: DbTx,
{
fn insert_block_precompile_calls(
&self,
block_number: BlockNumber,
calls: ReadPrecompileCalls,
) -> ProviderResult<()> {
use reth_db_api::transaction::DbTxMut;
// For now, we'll store this as a placeholder - the actual implementation
// will require mutable transaction access which needs to be added to the trait
// This is a read-only implementation for now
Ok(())
}
fn block_precompile_calls(
&self,
block_number: BlockNumber,
) -> ProviderResult<Option<ReadPrecompileCalls>> {
use reth_db_api::tables::BlockReadPrecompileCalls;
let tx = self.tx_ref();
// Get from BlockReadPrecompileCalls table
if let Some(bytes) = tx.get::<BlockReadPrecompileCalls>(block_number)? {
let calls = ReadPrecompileCalls::from_db_bytes(&bytes)
.map_err(|e| reth_storage_errors::provider::ProviderError::Database(
reth_db_api::DatabaseError::Other(format!("Failed to deserialize precompile calls: {}", e))
))?;
Ok(Some(calls))
} else {
Ok(None)
}
}
fn remove_block_precompile_calls_above(
&self,
block_number: BlockNumber,
) -> ProviderResult<()> {
// For now, this is a no-op as it requires mutable transaction access
// The actual implementation will be added when we have mutable access
Ok(())
}
}

View File

@ -19,6 +19,3 @@ pub use static_file_provider::StaticFileProviderFactory;
mod full;
pub use full::{FullProvider, FullRpcProvider};
mod precompile;
pub use precompile::PrecompileCallsProvider;

View File

@ -1,27 +0,0 @@
//! Trait for storing and retrieving precompile call data
use alloy_primitives::BlockNumber;
use reth_hyperliquid_types::ReadPrecompileCalls;
use reth_storage_errors::provider::ProviderResult;
/// Provider trait for ReadPrecompileCalls storage operations
pub trait PrecompileCallsProvider: Send + Sync {
/// Insert ReadPrecompileCalls data for a block
fn insert_block_precompile_calls(
&self,
block_number: BlockNumber,
calls: ReadPrecompileCalls,
) -> ProviderResult<()>;
/// Get ReadPrecompileCalls data for a block
fn block_precompile_calls(
&self,
block_number: BlockNumber,
) -> ProviderResult<Option<ReadPrecompileCalls>>;
/// Remove ReadPrecompileCalls data for blocks above a certain number
fn remove_block_precompile_calls_above(
&self,
block_number: BlockNumber,
) -> ProviderResult<()>;
}

View File

@ -9,6 +9,7 @@ DEST="${HOME}/evm-blocks-testnet"
WORKERS=512
S3SYNC="${HOME}/.local/bin/s3sync"
START_AT="" # default: run all
CHUNK_SIZE=1000000 # each prefix represents this many blocks
# ----------------
# parse args
@ -86,7 +87,6 @@ install_s3sync_latest() {
log "s3sync installed at $S3SYNC"
}
# --- deps & install/update ---
need aws
install_s3sync_latest
@ -101,18 +101,23 @@ mapfile -t PREFIXES < <(
)
((${#PREFIXES[@]})) || die "No prefixes found."
# mark initial status
# sort numerically to make order predictable
IFS=$'\n' read -r -d '' -a PREFIXES < <(printf '%s\n' "${PREFIXES[@]}" | sort -n && printf '\0')
# compute the effective start prefix:
# - if START_AT is set, floor it to the containing chunk boundary
effective_start=""
if [[ -n "$START_AT" ]]; then
# numeric, base-10 safe
start_num=$((10#$START_AT))
chunk=$((10#$CHUNK_SIZE))
effective_start=$(( (start_num / chunk) * chunk ))
fi
# mark initial status using numeric comparisons (no ordering assumptions)
declare -A RESULTS
if [[ ! -n "$START_AT" ]]; then
skipping=0
else
skipping=1
fi
for p in "${PREFIXES[@]}"; do
if [[ -n "$START_AT" && "$p" == "$START_AT" ]]; then
skipping=0
fi
if (( skipping )); then
if [[ -n "$effective_start" ]] && (( 10#$p < 10#$effective_start )); then
RESULTS["$p"]="-- SKIPPED"
else
RESULTS["$p"]="-- TODO"