35 Commits

SHA1 Message Date
cee0a342dc Update README.md 2025-09-13 17:00:08 -04:00
eb2953246e Update README.md
Add support channel
2025-09-13 16:59:11 -04:00
fb462f2b24 Merge pull request #56 from jonathanudd/fix-start-at-in-s3sync-runner
fix: Skipping logic in s3sync-runner.sh which was based on lexicograp…
2025-09-08 09:54:39 -04:00
9af9c93454 fix: Skipping logic in s3sync-runner.sh which was based on lexicographic order instead of numeric order 2025-09-08 06:58:52 +02:00
8632f9d74f Merge pull request #37 from wwwehr/feature/hlfs-evm-blocks
Feature: Archive Node File Sharing
2025-08-30 00:00:08 +09:00
5baa5770ac Merge pull request #32 from wwwehr/main
Update testnet instructions on README
2025-08-29 12:50:07 +09:00
fa9f8fc5df Updated top level README.md
updated instructions and prefer the s3sync-runner tool
2025-08-28 11:34:18 -07:00
bf51dc83e5 incorporated s3 sync tool from external github repo 2025-08-28 11:27:31 -07:00
738567894a only consider top level highest dir name on archive node file root; also added back hlfs to blockingest struct 2025-08-26 19:49:45 -07:00
e9dcff4015 readme nits 2025-08-26 19:04:18 -07:00
21e7c718ea Update README.md
Co-authored-by: sprites0 <lovelysprites@gmail.com>
2025-08-26 19:02:19 -07:00
8c6ea1ae7a Update README.md
Co-authored-by: sprites0 <lovelysprites@gmail.com>
2025-08-26 19:01:41 -07:00
e8be4c2e82 code review findings 2025-08-22 14:23:22 -07:00
06f7710777 code review findings 2025-08-22 14:21:48 -07:00
5c3828382c debug 2025-08-22 00:00:23 -07:00
fae4b4c8f9 debug 2025-08-21 23:50:10 -07:00
cdb0f9e8a2 debug 2025-08-21 23:46:53 -07:00
5114f76517 debug 2025-08-21 22:52:37 -07:00
3bdc491aba debug 2025-08-21 22:43:03 -07:00
2a653857aa debug 2025-08-21 22:18:42 -07:00
5f2955caa2 debug 2025-08-21 19:30:56 -07:00
022dfd1546 debug 2025-08-21 19:03:21 -07:00
249e3194dd added new max block opcode 2025-08-21 18:35:30 -07:00
530447e637 debug 2025-08-21 17:20:20 -07:00
29c8d4fa39 Merge pull request #1 from wwwehr/fix/clarify-testnet-build
Update testnet instructions on README
2025-08-21 17:06:12 -07:00
5d3041b10d Update README.md 2025-08-21 16:24:25 -07:00
2ff606c32c debugging host issue 2025-08-21 23:02:05 +00:00
9f952ac2ed fix: Prevent excessive file crawling when syncing the first block 2025-08-20 21:49:17 -04:00
ec417f9bf4 debug 2025-08-14 04:59:16 +00:00
e2045a195c debug block ingest workflow 2025-08-13 00:57:16 +00:00
ec281f9cc7 active block sharing 2025-08-11 22:16:30 -07:00
0d1c239a07 fixed lints 2025-08-11 14:18:24 -07:00
821846f671 added alternative block share 2025-08-11 14:13:55 -07:00
aab45b9c02 Merge pull request #23 from hl-archive-node/fix/handle-hl-node-termination
fix: Handle incomplete line when handling local-block-ingest
2025-07-19 21:47:07 -04:00
0ce6b818ad fix: Handle incomplete line when handling local-block-ingest 2025-07-19 17:04:01 -04:00
11 changed files with 892 additions and 11 deletions

Cargo.lock generated

@@ -6659,6 +6659,7 @@ dependencies = [
  "clap",
  "eyre",
  "futures",
+ "futures-util",
  "jsonrpsee",
  "jsonrpsee-core",
  "lz4_flex",
@@ -6687,6 +6688,7 @@ dependencies = [
  "reth-execution-types",
  "reth-exex",
  "reth-fs-util",
+ "reth-hlfs",
  "reth-hyperliquid-types",
  "reth-network",
  "reth-network-api",
@@ -7966,6 +7968,20 @@ dependencies = [
  "thiserror 2.0.11",
 ]
 
+[[package]]
+name = "reth-hlfs"
+version = "1.2.0"
+dependencies = [
+ "bytes",
+ "parking_lot",
+ "rand 0.8.5",
+ "reth-tracing",
+ "tempfile",
+ "thiserror 2.0.11",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "reth-hyperliquid-types"
 version = "1.2.0"

Cargo.toml

@@ -54,6 +54,7 @@ members = [
     "crates/net/ecies/",
     "crates/net/eth-wire-types",
     "crates/net/eth-wire/",
+    "crates/net/hlfs/",
     "crates/net/nat/",
     "crates/net/network-api/",
     "crates/net/network-types/",
@@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" }
 reth-exex-test-utils = { path = "crates/exex/test-utils" }
 reth-exex-types = { path = "crates/exex/types" }
 reth-fs-util = { path = "crates/fs-util" }
+reth-hlfs = { path = "crates/net/hlfs" }
 reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
 reth-ipc = { path = "crates/rpc/ipc" }
 reth-libmdbx = { path = "crates/storage/libmdbx-rs" }

README.md

@@ -2,6 +2,8 @@
 Hyperliquid archive node based on [reth](https://github.com/paradigmxyz/reth).
 
+Got questions? Drop by the [Hyperliquid Discord](https://discord.gg/hyperliquid) #node-operators channel.
+
 ## ⚠️ IMPORTANT: System Transactions Appear as Pseudo Transactions
 
 Deposit transactions from `0x222..22` to user addresses are intentionally recorded as pseudo transactions.
@@ -16,17 +18,35 @@ Building NanoReth from source requires Rust and Cargo to be installed:
 
 ## How to run (mainnet)
 
-1) `$ aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks --request-payer requester # one-time` - this will backfill the existing blocks from HyperLiquid's EVM S3 bucket. The current state of the block files comprise of millions of small objects totalling over 20 Gigs and counting. The "requester pays" option means you will need a configured aws environment, and you could incur charges which varies according to destination (ec2 versus local).
-2) `$ make install` - this will install the NanoReth binary.
-3) Start NanoReth which will begin syncing using the blocks in `~/evm-blocks`:
+1) Backfill the existing blocks from Hyperliquid's EVM S3 bucket:
+> use our Rust-based s3 tool wrapper to speed up the download - [read more](./etc/evm-block-sync/README.md)
+```shell
+chmod +x ./etc/evm-block-sync/s3sync-runner.sh
+./etc/evm-block-sync/s3sync-runner.sh
+```
+> or use the conventional [aws cli](https://aws.amazon.com/cli/)
+```shell
+aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks \
+  --request-payer requester \
+  --exact-timestamps \
+  --size-only \
+  --only-show-errors
+```
+2) `$ make install` - this will install the NanoReth binary.
+3) Start NanoReth, which will begin syncing using the blocks in `~/evm-blocks`:
 ```sh
 $ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 --ws --ws.addr 0.0.0.0 --ws.origins '*' --ws.api eth,ots,net,web3 --ingest-dir ~/evm-blocks --ws.port 8545
 ```
-4) Once the node logs stops making progress this means it's caught up with the existing blocks.
+4) Once the node log stops making progress, it has caught up with the existing blocks.
    Stop the NanoReth process and then start Goofys: `$ goofys --region=ap-northeast-1 --requester-pays hl-mainnet-evm-blocks evm-blocks`
@@ -65,12 +85,25 @@ $ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 \
 
 Testnet is supported since block 21304281.
 
+> [!NOTE]
+> To run testnet locally, you will need:
+> - [ ] [git lfs](https://git-lfs.com/)
+> - [ ] [rust toolchain](https://rustup.rs/)
+
 ```sh
 # Get testnet genesis at block 21304281
 $ cd ~
 $ git clone https://github.com/sprites0/hl-testnet-genesis
+$ git -C hl-testnet-genesis lfs pull
 $ zstd --rm -d ~/hl-testnet-genesis/*.zst
 
+# Now return to where you have cloned this project to continue
+$ cd -
+
+# Prepare your Rust toolchain
+$ rustup install 1.82 # (this corresponds to the rust version in our Cargo.toml)
+$ rustup default 1.82
+
 # Init node
 $ make install
 $ reth init-state --without-evm --chain testnet --header ~/hl-testnet-genesis/21304281.rlp \

Cargo.toml (node binary crate)

@@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
 reth-cli-commands.workspace = true
 reth-cli-util.workspace = true
 reth-consensus-common.workspace = true
+reth-hlfs.workspace = true
 reth-rpc-builder.workspace = true
 reth-rpc.workspace = true
 reth-rpc-types-compat.workspace = true
@@ -81,8 +82,9 @@ tracing.workspace = true
 serde_json.workspace = true
 
 # async
-tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
+tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
 futures.workspace = true
+futures-util.workspace = true
 
 # time
 time = { workspace = true }
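
The two new tokio features line up with what the HLFS code pulls in: `net` gates `tokio::net` (the TCP listener and streams) and `fs` gates `tokio::fs`. A minimal standalone sketch, not from this repo, showing the APIs those flags unlock:

```rust
// Sketch only: "net" is required for tokio::net, "fs" for tokio::fs;
// without those features these imports do not compile.
use tokio::{fs, net::TcpListener};

#[tokio::main] // needs the "macros" and "rt-multi-thread" features already present
async fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:9595").await?; // "net"
    let meta = fs::metadata(".").await?; // "fs"
    println!("bound {}, cwd is dir: {}", listener.local_addr()?, meta.is_dir());
    Ok(())
}
```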

block_ingest.rs

@@ -29,6 +29,7 @@ use tokio::sync::Mutex;
 use tracing::{debug, info};
 
 use crate::serialized::{BlockAndReceipts, EvmBlock};
+use crate::share_blocks::ShareBlocks;
 use crate::spot_meta::erc20_contract_to_spot_token;
 
 /// Poll interval when tailing an *open* hourly file.
@@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
     pub local_ingest_dir: Option<PathBuf>,
     pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
     pub precompiles_cache: PrecompilesCache,
+    pub hlfs: Option<ShareBlocks>,
 }
 
 #[derive(Deserialize)]
@@ -73,8 +75,23 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
             continue;
         }
 
-        let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
-            serde_json::from_str(&line).expect("Failed to parse local block and receipts");
+        let (_block_timestamp, parsed_block) = match serde_json::from_str(&line) {
+            Ok(LocalBlockAndReceipts(_block_timestamp, parsed_block)) => {
+                (_block_timestamp, parsed_block)
+            }
+            Err(_) => {
+                // Possible scenarios:
+                let is_last_line = line_idx == lines.len() - 1;
+                if is_last_line {
+                    // 1. It's not written fully yet - in this case, just wait for the next line
+                    break;
+                } else {
+                    // 2. hl-node previously terminated while writing the lines.
+                    //    In this case, try to skip this line.
+                    continue;
+                }
+            }
+        };
 
         let height = match &parsed_block.block {
             EvmBlock::Reth115(b) => {
@@ -140,9 +157,9 @@ impl BlockIngest {
         if let Some(block) = self.try_collect_local_block(height).await {
             info!("Returning locally synced block for @ Height [{height}]");
             return Some(block);
-        } else {
-            self.try_collect_s3_block(height)
         }
+        self.try_collect_s3_block(height)
     }
 
     pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
@@ -260,6 +277,7 @@ impl BlockIngest {
         let engine_api = node.auth_server_handle().http_client();
         let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
 
+        const MINIMUM_TIMESTAMP: u64 = 1739849780;
         let current_block_timestamp: u64 = provider
             .block_by_number(head)
             .expect("Failed to fetch current block in db")
@@ -267,6 +285,8 @@ impl BlockIngest {
             .into_header()
             .timestamp();
 
+        let current_block_timestamp = current_block_timestamp.max(MINIMUM_TIMESTAMP);
+
         info!("Current height {height}, timestamp {current_block_timestamp}");
 
         self.start_local_ingest_loop(height, current_block_timestamp).await;
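
The parse fix above encodes a tailing rule for the hourly JSONL files: a line that fails to parse at the *end* of the file is probably still being appended, so back off and retry on the next poll; a bad line in the *middle* is a torn write from an earlier hl-node crash, so skip it. A self-contained sketch of that rule (names here are illustrative, not the crate's):

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct Row(u64, serde_json::Value); // (timestamp, block payload)

fn scan(lines: &[&str]) -> Vec<Row> {
    let mut out = Vec::new();
    for (idx, line) in lines.iter().enumerate() {
        match serde_json::from_str::<Row>(line) {
            Ok(row) => out.push(row),
            // Last line: likely still being appended; stop and retry next poll.
            Err(_) if idx == lines.len() - 1 => break,
            // Interior line: a torn write from a previous crash; skip it.
            Err(_) => continue,
        }
    }
    out
}
```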

main.rs

@@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
 mod block_ingest;
 mod call_forwarder;
 mod serialized;
+mod share_blocks;
 mod spot_meta;
 mod tx_forwarder;
 
@@ -18,6 +19,7 @@ use reth::cli::Cli;
 use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
 use reth_hyperliquid_types::PrecompilesCache;
 use reth_node_ethereum::EthereumNode;
+use share_blocks::ShareBlocksArgs;
 use tokio::sync::Mutex;
 use tracing::info;
 use tx_forwarder::EthForwarderApiServer;
@@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
     /// 3. filters out logs and transactions from subscription.
     #[arg(long, default_value = "false")]
     pub hl_node_compliant: bool,
+
+    /// Enable hlfs to backfill archive blocks
+    #[command(flatten)]
+    pub hlfs: ShareBlocksArgs,
 }
 
 fn main() {
@@ -85,8 +91,24 @@ fn main() {
                 .launch()
                 .await?;
 
-            let ingest =
-                BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
+            // start HLFS (serve + peer-backed backfill) using the node's network
+            let hlfs = if ext_args.hlfs.share_blocks {
+                let net = handle.node.network.clone();
+                Some(
+                    crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
+                        .await?,
+                )
+            } else {
+                None
+            };
+
+            let ingest = BlockIngest {
+                ingest_dir,
+                local_ingest_dir,
+                local_blocks_cache,
+                precompiles_cache,
+                hlfs,
+            };
 
             ingest.run(handle.node).await.unwrap();
             handle.node_exit_future.await
         },
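
The `#[command(flatten)]` attribute is what folds the `--share-blocks*` flags into the node's existing CLI struct. A minimal standalone sketch of that clap pattern (struct names here are hypothetical):

```rust
use clap::Parser;

#[derive(clap::Args, Clone, Debug)]
struct ShareBlocksArgs {
    #[arg(long, default_value_t = false)]
    share_blocks: bool,
    #[arg(long, default_value_t = 9595)]
    share_blocks_port: u16,
}

#[derive(Parser)]
struct NodeArgs {
    // flatten merges ShareBlocksArgs' flags into this parser
    #[command(flatten)]
    hlfs: ShareBlocksArgs,
}

fn main() {
    let args = NodeArgs::parse_from(["node", "--share-blocks", "--share-blocks-port", "9600"]);
    assert!(args.hlfs.share_blocks);
    assert_eq!(args.hlfs.share_blocks_port, 9600);
}
```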

share_blocks.rs Normal file

@@ -0,0 +1,167 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};
use futures_util::stream::StreamExt;
#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}
pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}
impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();
let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});
let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}
fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));
    tokio::spawn({
        info!("hlfs: backfiller started");
        let backfiller = backfiller.clone();
        async move {
            loop {
                {
                    let mut bf = backfiller.lock().await;
                    if bf.client.max_block < bf.max_block_seen {
                        let block = bf.client.max_block + 1;
                        match bf.fetch_if_missing(block).await {
                            // Some(_) means fetched; None means the block is
                            // already on disk (or unavailable right now) -
                            // advance the cursor either way so the loop keeps
                            // making progress instead of panicking on None.
                            Ok(_) => bf.client.max_block = block,
                            // transient failure (no peers, timeout, ...):
                            // keep the cursor and retry on the next tick
                            Err(e) => debug!(error = %e, block, "hlfs: backfill fetch failed"),
                        }
                    }
                } // drop the backfiller lock before sleeping
                sleep(Duration::from_millis(50)).await;
            }
        }
    });
    tokio::spawn({
        let backfiller = backfiller.clone();
        async move {
            let mut events = network.event_listener();
            loop {
                // wait for the next event *without* holding the backfiller
                // lock, so the backfill task above is free to run
                match events.next().await {
                    Some(NetworkEvent::ActivePeerSession { info, .. }) => {
                        let ip = info.remote_addr.ip();
                        if ip.is_unspecified() {
                            debug!(%ip, "hlfs: skip unspecified");
                            continue;
                        }
                        if ip == self_ip {
                            debug!(%ip, "hlfs: skip self");
                            continue;
                        }
                        let addr = SocketAddr::new(ip, hlfs_port);
                        let max_block = probe_hlfs(addr).await;
                        if max_block != 0 {
                            let mut g = good.lock().await;
                            if g.insert(PeerRecord { addr, max_block }) {
                                let v: Vec<_> = g.iter().copied().collect();
                                // take the backfiller lock only for the update
                                backfiller.lock().await.set_peers(v.clone());
                                info!(%addr, %max_block, total = v.len(), "hlfs: peer added");
                            }
                        } else {
                            debug!(%addr, "hlfs: peer has no HLFS");
                        }
                    }
                    Some(_) => {}
                    None => {
                        warn!("hlfs: network event stream ended");
                        break;
                    }
                }
            }
        }
    })
}
pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;
// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;
// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}
// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};
match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}
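
For a sanity check of the probe round-trip, something like the following test sketch should work, assuming `reth-hlfs` and `tempfile` are available as (dev-)dependencies and port 9595 is free locally; it fabricates a single block file so the server advertises `max_block = 42`:

```rust
// Sketch only: exercises the OP_REQ_MAX_BLOCK / OP_RES_MAX_BLOCK exchange
// that probe_hlfs implements, against a Server pointed at a temp archive
// laid out as {root}/{million}/{thousand}/{number}.rmp.lz4.
#[cfg(test)]
mod probe_roundtrip {
    use super::probe_hlfs;
    use reth_hlfs::Server;
    use std::net::SocketAddr;

    #[tokio::test]
    async fn probe_reports_server_max_block() {
        let dir = tempfile::tempdir().unwrap();
        // one block file so find_max_number_file sees max_block = 42
        let sub = dir.path().join("0").join("0");
        std::fs::create_dir_all(&sub).unwrap();
        std::fs::write(sub.join("42.rmp.lz4"), b"stub").unwrap();

        let addr: SocketAddr = "127.0.0.1:9595".parse().unwrap();
        tokio::spawn(Server::new(addr, dir.path()).run());
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;

        assert_eq!(probe_hlfs(addr).await, 42);
    }
}
```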

crates/net/hlfs/Cargo.toml Normal file

@@ -0,0 +1,25 @@
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true

crates/net/hlfs/src/lib.rs Normal file

@@ -0,0 +1,374 @@
//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block).
use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
time::timeout,
};
type Result<T, E = HlfsError> = std::result::Result<T, E>;
pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;
#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
Io(#[from] io::Error),
#[error("proto")]
Proto,
#[error("no peers")]
NoPeers,
#[error("timeout")]
Timeout,
#[error("busy: retry_ms={0}")]
Busy(u32),
#[error("not found")]
NotFound,
}
#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)
} else {
Ok(())
}
}
/// A peer serving HLFS blocks; identity (Eq/Hash) is the socket address only.
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
pub addr: SocketAddr,
pub max_block: u64,
}
impl PartialEq for PeerRecord {
fn eq(&self, o: &Self) -> bool {
self.addr == o.addr
}
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
fn hash<H: Hasher>(&self, s: &mut H) {
self.addr.hash(s);
}
}
/// Client: tries each peer once per request; rotates the starting index per call.
#[derive(Clone)]
pub struct Client {
root: PathBuf,
pub peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
pub max_block: u64,
}
impl Client {
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
debug!(max_block = n, "hlfs: our archive");
Self {
root,
peers: Arc::new(Mutex::new(peers)),
timeout: Duration::from_secs(3),
max_block: n,
}
}
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
return Err(HlfsError::NoPeers);
}
let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
let mut last_busy: Option<u32> = None;
while let Some(i) = all.next() {
let addr = peers[i];
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
                    let mut op = [0u8; 1];
                    let op = match timeout(self.timeout, sock.read_exact(&mut op)).await {
                        Ok(Ok(_)) => op[0],
                        // inner error: the read itself failed
                        Ok(Err(e)) => {
                            debug!(%addr.addr, "hlfs: read op err: {e}");
                            continue;
                        }
                        // outer error: the read timed out
                        Err(e) => {
                            debug!(%addr.addr, "hlfs: read op timeout {e:?}");
                            continue;
                        }
                    };
match op {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
let len = u32::from_le_bytes(len) as usize;
let mut buf = vec![0u8; len];
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
continue;
}
}
}
}
}
if let Some(ms) = last_busy {
return Err(HlfsError::Busy(ms));
}
Err(HlfsError::NotFound)
}
}
fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |b| n > b) {
*best = Some(n);
}
}
}
}
}
Ok(())
}
let mut best = Some(0);
let top: PathBuf = fs::read_dir(root)?
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.filter_map(|e| {
let name = e.file_name();
let s = name.to_str()?;
let n: u64 = s.parse().ok()?;
Some((n, e.path()))
})
.max_by_key(|(n, _)| *n)
.map(|(_, p)| p)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;
walk(&top, &mut best)?;
Ok(best.expect("cannot find block files"))
}
/// Server: serves `{root}/{number}.rlp`.
pub struct Server {
bind: SocketAddr,
root: PathBuf,
max_conns: usize,
inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32,
max_block: u64,
}
impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
Self {
bind,
root,
max_conns: 512,
inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100,
max_block: n,
}
}
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms;
self
}
pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?;
loop {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
}
*self.inflight.lock() += 1;
let root = self.root.clone();
let inflight = self.inflight.clone();
let busy = self.busy_retry_ms;
tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1;
});
}
}
}
async fn handle_conn(
sock: &mut TcpStream,
root: &Path,
busy_ms: u32,
addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}
if op[0] == OP_REQ_MAX_BLOCK {
let mut b = BytesMut::with_capacity(1 + 8);
b.put_u8(OP_RES_MAX_BLOCK);
put_u64(&mut b, max_block);
let _ = sock.write_all(&b).await;
return Ok(());
}
let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
let number = u64::from_le_bytes(num);
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
trace!(%addr, number, %path, "hlfs: req");
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
}
match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
warn!(%addr, %path, "hlfs: read error: {e}");
let _ = sock.shutdown().await;
}
}
Ok(())
}
/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
pub client: Client,
root: PathBuf,
pub max_block_seen: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into(), max_block_seen: 0 }
}
    pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
        self.client.update_peers(peers);
        // refresh the highest block advertised by any known peer
        for p in self.client.peers.lock().iter() {
            if p.max_block > self.max_block_seen {
                self.max_block_seen = p.max_block;
            }
        }
    }
pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
let rr_index = number as usize;
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if Path::new(&path).exists() {
debug!(block = number, "hlfs: already have");
return Ok(None);
}
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;
Ok(None)
}
Err(e) => Err(e),
Ok(data) => {
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%path, "hlfs: mkdirs failed: {e}");
}
if let Err(e) = fs::write(&path, &data) {
warn!(%path, "hlfs: write failed: {e}");
return Ok(None);
}
debug!(block = number, "hlfs: got block");
Ok(Some(number))
}
}
}
}
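
For reference, the sharding scheme that both `handle_conn` and `fetch_if_missing` compute inline: block `N` is keyed by `N-1` to pick its million/thousand prefix directories, so a prefix directory covers the thousand blocks *after* its boundary. A small sketch with a worked example:

```rust
use std::path::{Path, PathBuf};

// block N lives at {root}/{million}/{thousand}/{N}.rmp.lz4, with both
// prefixes computed from N-1 (so block 1000 still lands in the 0/0 shard)
fn block_path(root: &Path, number: u64) -> PathBuf {
    let n = number.saturating_sub(1); // 0 -> 0, others -> number - 1
    let millions = (n / 1_000_000) * 1_000_000;
    let thousands = (n / 1_000) * 1_000;
    root.join(millions.to_string())
        .join(thousands.to_string())
        .join(format!("{number}.rmp.lz4"))
}

fn main() {
    assert_eq!(
        block_path(Path::new("evm-blocks"), 21_304_281),
        Path::new("evm-blocks/21000000/21304000/21304281.rmp.lz4")
    );
    assert_eq!(
        block_path(Path::new("evm-blocks"), 1_000),
        Path::new("evm-blocks/0/0/1000.rmp.lz4")
    );
}
```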

etc/evm-block-sync/README.md Normal file

@@ -0,0 +1,57 @@
# 🚀 S3Sync Runner
Fastest way to pull down EVM block files from S3.
This script automates syncing **massive S3 object stores** in a **safe, resumable, and time-tracked way**. The traditional `aws s3 sync` is just way too slow.
## Features
- ✅ Auto-installs [nidor1998/s3sync](https://github.com/nidor1998/s3sync) (latest release) into `~/.local/bin`
- ✅ Sequential per-prefix syncs (e.g., `21000000/`, `22000000/`, …)
- ✅ Per-prefix timing: `22000000 took 12 minutes!`
- ✅ Total runtime summary at the end
- ✅ Designed for **tiny files at scale** (EVM block archives)
- ✅ Zero-config bootstrap — just run the script
## Quick Start
```bash
chmod +x s3sync-runner.sh
./s3sync-runner.sh
```
> Or skip ahead to the relevant block range:
```bash
./s3sync-runner.sh --start-at 30000000
```
The script will:
* Install or update s3sync into ~/.local/bin
* Discover top-level prefixes in your S3 bucket
* Sync them one at a time, printing elapsed minutes
## Configuration
Edit the top of s3sync-runner.sh if needed:
```bash
BUCKET="hl-testnet-evm-blocks" # could be hl-mainnet-evm-blocks
REGION="ap-northeast-1" # hardcoded bucket region
DEST="$HOME/evm-blocks-testnet" # local target directory (this is what nanoreth will look at)
WORKERS=512 # worker threads per sync (lotsa workers needs lotsa RAM)
```
## Example Output
```bash
[2025-08-20 20:01:02] START 21000000
[2025-08-20 20:13:15] 21000000 took 12 minutes!
[2025-08-20 20:13:15] START 22000000
[2025-08-20 20:26:40] 22000000 took 13 minutes!
[2025-08-20 20:26:40] ALL DONE in 25 minutes.
```
## Hackathon Context
This runner was built as part of the Hyperliquid DEX Hackathon to accelerate:
* ⛓️ Blockchain archive node ingestion
* 📂 EVM block dataset replication
* 🧩 DEX ecosystem data pipelines

etc/evm-block-sync/s3sync-runner.sh Normal file

@@ -0,0 +1,163 @@
#!/usr/bin/env bash
# @author Niko Wehr (wwwehr)
set -euo pipefail
# ---- config ----
BUCKET="hl-testnet-evm-blocks"
REGION="ap-northeast-1"
DEST="${HOME}/evm-blocks-testnet"
WORKERS=512
S3SYNC="${HOME}/.local/bin/s3sync"
START_AT="" # default: run all
CHUNK_SIZE=1000000 # each prefix represents this many blocks
# ----------------
# parse args
while [[ $# -gt 0 ]]; do
case "$1" in
--start-at)
START_AT="$2"
shift 2
;;
*)
echo "Unknown arg: $1" >&2
exit 1
;;
esac
done
now(){ date +"%F %T"; }
log(){ printf '[%s] %s\n' "$(now)" "$*"; }
die(){ log "ERROR: $*"; exit 1; }
trap 'log "Signal received, exiting."; exit 2' INT TERM
need(){ command -v "$1" >/dev/null 2>&1 || die "missing dependency: $1"; }
install_s3sync_latest() {
need curl
GHAPI="https://api.github.com/repos/nidor1998/s3sync/releases/latest"
os="$(uname | tr '[:upper:]' '[:lower:]')"
arch_raw="$(uname -m)"
case "$arch_raw" in
x86_64|amd64) arch_tag="x86_64" ;;
aarch64|arm64) arch_tag="aarch64" ;;
*) die "unsupported arch: ${arch_raw}" ;;
esac
# Map OS → asset prefix
case "$os" in
linux) prefix="s3sync-linux-glibc2.28-${arch_tag}" ;;
darwin) prefix="s3sync-macos-${arch_tag}" ;;
msys*|mingw*|cygwin*|windows) prefix="s3sync-windows-${arch_tag}" ;;
*) die "unsupported OS: ${os}" ;;
esac
# Fetch latest release JSON (unauthenticated)
json="$(curl -fsSL "$GHAPI")" || die "failed to query GitHub API"
# Pick URLs for tarball and checksum
tar_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.tar.gz" | head -n1)"
sum_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.sha256" | head -n1)"
[[ -n "$tar_url" ]] || die "could not find asset for prefix: ${prefix}"
[[ -n "$sum_url" ]] || die "could not find checksum for prefix: ${prefix}"
mkdir -p "${HOME}/.local/bin"
tmpdir="$(mktemp -d)"; trap 'rm -rf "$tmpdir"' EXIT
tar_path="${tmpdir}/s3sync.tar.gz"
sum_path="${tmpdir}/s3sync.sha256"
log "Downloading: $tar_url"
curl -fL --retry 5 --retry-delay 1 -o "$tar_path" "$tar_url"
curl -fL --retry 5 --retry-delay 1 -o "$sum_path" "$sum_url"
# Verify checksum
want_sum="$(cut -d: -f2 <<<"$(sed -n 's/^sha256:\(.*\)$/\1/p' "$sum_path" | tr -d '[:space:]')" || true)"
[[ -n "$want_sum" ]] || want_sum="$(awk '{print $1}' "$sum_path" || true)"
[[ -n "$want_sum" ]] || die "could not parse checksum file"
got_sum="$(sha256sum "$tar_path" | awk '{print $1}')"
[[ "$want_sum" == "$got_sum" ]] || die "sha256 mismatch: want $want_sum got $got_sum"
# Extract and install
tar -xzf "$tar_path" -C "$tmpdir"
binpath="$(find "$tmpdir" -maxdepth 2 -type f -name 's3sync' | head -n1)"
# the tarball may not preserve the executable bit, so only require that the file exists
[[ -n "$binpath" && -f "$binpath" ]] || die "s3sync binary not found in archive"
chmod +x "$binpath"
mv -f "$binpath" "$S3SYNC"
log "s3sync installed at $S3SYNC"
}
# --- deps & install/update ---
need aws
install_s3sync_latest
[[ ":$PATH:" == *":$HOME/.local/bin:"* ]] || export PATH="$HOME/.local/bin:$PATH"
mkdir -p "$DEST"
# list prefixes
log "Listing top-level prefixes in s3://${BUCKET}/"
mapfile -t PREFIXES < <(
aws s3 ls "s3://${BUCKET}/" --region "$REGION" --request-payer requester \
| awk '/^ *PRE /{print $2}' | sed 's:/$::' | grep -E '^[0-9]+$' || true
)
((${#PREFIXES[@]})) || die "No prefixes found."
# sort numerically to make order predictable
IFS=$'\n' read -r -d '' -a PREFIXES < <(printf '%s\n' "${PREFIXES[@]}" | sort -n && printf '\0')
# compute the effective start prefix:
# - if START_AT is set, floor it to the containing chunk boundary
effective_start=""
if [[ -n "$START_AT" ]]; then
# numeric, base-10 safe
start_num=$((10#$START_AT))
chunk=$((10#$CHUNK_SIZE))
effective_start=$(( (start_num / chunk) * chunk ))
fi
# mark initial status using numeric comparisons (no ordering assumptions)
declare -A RESULTS
for p in "${PREFIXES[@]}"; do
if [[ -n "$effective_start" ]] && (( 10#$p < 10#$effective_start )); then
RESULTS["$p"]="-- SKIPPED"
else
RESULTS["$p"]="-- TODO"
fi
done
total_start=$(date +%s)
for p in "${PREFIXES[@]}"; do
if [[ "${RESULTS[$p]}" == "-- SKIPPED" ]]; then
continue
fi
src="s3://${BUCKET}/${p}/"
dst="${DEST}/${p}/"
mkdir -p "$dst"
log "START ${p}"
start=$(date +%s)
"$S3SYNC" \
--source-request-payer \
--source-region "$REGION" \
--worker-size "$WORKERS" \
--max-parallel-uploads "$WORKERS" \
"$src" "$dst"
end=$(date +%s)
mins=$(( (end - start + 59) / 60 ))
RESULTS["$p"]="$mins minutes"
# Print status table so far
echo "---- Status ----"
for k in "${PREFIXES[@]}"; do
echo "$k ${RESULTS[$k]}"
done
echo "----------------"
done
total_end=$(date +%s)
total_mins=$(( (total_end - total_start + 59) / 60 ))
echo "ALL DONE in $total_mins minutes."