1 Commit

Commit 6ea7a8c261: "Merge 29c8d4fa39 into 9f952ac2ed" (2025-08-21 17:07:30 -07:00)
11 changed files with 12 additions and 850 deletions

Cargo.lock (generated, 16 lines changed)

@@ -6659,7 +6659,6 @@ dependencies = [
"clap",
"eyre",
"futures",
-"futures-util",
"jsonrpsee",
"jsonrpsee-core",
"lz4_flex",
@@ -6688,7 +6687,6 @@ dependencies = [
"reth-execution-types",
"reth-exex",
"reth-fs-util",
-"reth-hlfs",
"reth-hyperliquid-types",
"reth-network",
"reth-network-api",
@@ -7968,20 +7966,6 @@ dependencies = [
"thiserror 2.0.11",
]
-[[package]]
-name = "reth-hlfs"
-version = "1.2.0"
-dependencies = [
-"bytes",
-"parking_lot",
-"rand 0.8.5",
-"reth-tracing",
-"tempfile",
-"thiserror 2.0.11",
-"tokio",
-"tracing",
-]
[[package]]
name = "reth-hyperliquid-types"
version = "1.2.0"

Cargo.toml

@@ -54,7 +54,6 @@ members = [
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
-"crates/net/hlfs/",
"crates/net/nat/",
"crates/net/network-api/",
"crates/net/network-types/",
@@ -357,7 +356,6 @@ reth-exex = { path = "crates/exex/exex" }
reth-exex-test-utils = { path = "crates/exex/test-utils" }
reth-exex-types = { path = "crates/exex/types" }
reth-fs-util = { path = "crates/fs-util" }
-reth-hlfs = { path = "crates/net/hlfs" }
reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
reth-ipc = { path = "crates/rpc/ipc" }
reth-libmdbx = { path = "crates/storage/libmdbx-rs" }

README.md

@@ -2,8 +2,6 @@
Hyperliquid archive node based on [reth](https://github.com/paradigmxyz/reth).
Got questions? Drop by the [Hyperliquid Discord](https://discord.gg/hyperliquid) #node-operators channel.
## ⚠️ IMPORTANT: System Transactions Appear as Pseudo Transactions
Deposit transactions from `0x222..22` to user addresses are intentionally recorded as pseudo transactions.
@@ -20,33 +18,27 @@ Building NanoReth from source requires Rust and Cargo to be installed:
The block files currently comprise millions of small objects totalling over 20 GB and counting. The "requester pays" option means you will need a configured AWS environment, and you may incur charges that vary by destination (EC2 versus local).
-1) this will backfill the existing blocks from Hyperliquid's EVM S3 bucket:
+1) this will backfill the existing blocks from HyperLiquid's EVM S3 bucket:
-> use our rust based s3 tool wrapper to optimize your download experience - [read more](./etc/evm-block-sync/README.md)
-```shell
-chmod +x ./etc/evm-block-sync/s3sync-runner.sh
-./etc/evm-block-sync/s3sync-runner.sh
-```
-> or use the conventional [aws cli](https://aws.amazon.com/cli/)
```shell
aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks \
--request-payer requester \
--exact-timestamps \
--size-only \
--page-size 1000 \
--only-show-errors
```
+> consider using this [rust based s3 tool wrapper](https://github.com/wwwehr/hl-evm-block-sync) alternative to optimize your download experience
-2) `$ make install` - this will install the NanoReth binary.
+1) `$ make install` - this will install the NanoReth binary.
-2) Start NanoReth which will begin syncing using the blocks in `~/evm-blocks`:
+3) Start NanoReth which will begin syncing using the blocks in `~/evm-blocks`:
```sh
$ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 --ws --ws.addr 0.0.0.0 --ws.origins '*' --ws.api eth,ots,net,web3 --ingest-dir ~/evm-blocks --ws.port 8545
```
-3) Once the node log stops making progress, it has caught up with the existing blocks.
+4) Once the node log stops making progress, it has caught up with the existing blocks.
Stop the NanoReth process and then start Goofys: `$ goofys --region=ap-northeast-1 --requester-pays hl-mainnet-evm-blocks evm-blocks`
@@ -94,7 +86,7 @@ Testnet is supported since block 21304281.
# Get testnet genesis at block 21304281
$ cd ~
$ git clone https://github.com/sprites0/hl-testnet-genesis
-$ git -C hl-testnet-genesis lfs pull
+$ git lfs pull
$ zstd --rm -d ~/hl-testnet-genesis/*.zst
# Now return to where you have cloned this project to continue

Cargo.toml (node binary crate)

@@ -35,7 +35,6 @@ reth-cli-runner.workspace = true
reth-cli-commands.workspace = true
reth-cli-util.workspace = true
reth-consensus-common.workspace = true
-reth-hlfs.workspace = true
reth-rpc-builder.workspace = true
reth-rpc.workspace = true
reth-rpc-types-compat.workspace = true
@@ -82,9 +81,8 @@ tracing.workspace = true
serde_json.workspace = true
# async
-tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
+tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
futures.workspace = true
-futures-util.workspace = true
# time
time = { workspace = true }

block_ingest.rs

@@ -29,7 +29,6 @@ use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock};
-use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token;
/// Poll interval when tailing an *open* hourly file.
@@ -42,7 +41,6 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
-pub hlfs: Option<ShareBlocks>,
}
#[derive(Deserialize)]
@@ -157,9 +155,9 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
-} else {
-self.try_collect_s3_block(height)
-}
+self.try_collect_s3_block(height)
}
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {

main.rs

@@ -6,7 +6,6 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
mod block_ingest;
mod call_forwarder;
mod serialized;
-mod share_blocks;
mod spot_meta;
mod tx_forwarder;
@@ -19,7 +18,6 @@ use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
-use share_blocks::ShareBlocksArgs;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@@ -42,10 +40,6 @@ struct HyperliquidExtArgs {
/// 3. filters out logs and transactions from subscription.
#[arg(long, default_value = "false")]
pub hl_node_compliant: bool,
-/// Enable hlfs to backfill archive blocks
-#[command(flatten)]
-pub hlfs: ShareBlocksArgs,
}
fn main() {
@@ -91,24 +85,8 @@ fn main() {
.launch()
.await?;
-// start HLFS (serve + peer-backed backfill) using the node's network
-let hlfs = if ext_args.hlfs.share_blocks {
-let net = handle.node.network.clone();
-Some(
-crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
-.await?,
-)
-} else {
-None
-};
-let ingest = BlockIngest {
-ingest_dir,
-local_ingest_dir,
-local_blocks_cache,
-precompiles_cache,
-hlfs,
-};
+let ingest =
+BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},

share_blocks.rs (deleted)

@@ -1,167 +0,0 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};
use futures_util::stream::StreamExt;
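/// CLI options for sharing archived blocks with peers over the HLFS protocol.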
#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
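/// Serve local archive blocks to peers and backfill missing ones from them.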
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
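/// Interface the HLFS server binds to.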
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
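/// TCP port the HLFS server listens on; peers are probed on the same port.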
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
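/// Root of the local block archive that is served to (and filled from) peers.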
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}
pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}
impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();
let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});
let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}
fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));
tokio::spawn({
info!("hlfs: backfiller started");
let backfiller = backfiller.clone();
async move {
loop {
// Hold the lock only while working and release it before sleeping.
let mut bf = backfiller.lock().await;
if bf.client.max_block < bf.max_block_seen {
let block = bf.client.max_block + 1;
match bf.fetch_if_missing(block).await {
// Fetched (or already on disk): advance the local high-water mark.
Ok(Some(h)) => bf.client.max_block = h,
// Not found anywhere yet, or peers busy: retry on a later tick.
Ok(None) => {}
Err(e) => warn!(block, error = %e, "hlfs: backfill fetch failed"),
}
}
drop(bf);
sleep(Duration::from_millis(50)).await;
}
}
});
tokio::spawn({
let backfiller = backfiller.clone();
async move {
let mut events = network.event_listener();
loop {
match events.next().await {
Some(NetworkEvent::ActivePeerSession { info, .. }) => {
let ip = info.remote_addr.ip();
if ip.is_unspecified() {
debug!(%ip, "hlfs: skip unspecified");
continue;
}
if ip == self_ip {
debug!(%ip, "hlfs: skip self");
continue;
}
let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
let max_block = probe_hlfs(addr).await;
if max_block != 0 {
let mut g = good.lock().await;
if g.insert(PeerRecord { addr, max_block }) {
let v: Vec<_> = g.iter().copied().collect();
// Take the backfiller lock only for this update so the backfill task
// is not starved while this loop waits on events.next().
backfiller.lock().await.set_peers(v.clone());
info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
}
} else {
debug!(%addr, "hlfs: peer has no HLFS");
}
}
Some(_) => {}
None => {
warn!("hlfs: network event stream ended");
break;
}
}
}
}
})
}
pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;
// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;
// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}
// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};
match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}

crates/net/hlfs/Cargo.toml (deleted)

@@ -1,25 +0,0 @@
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true

crates/net/hlfs/src/lib.rs (deleted)

@@ -1,374 +0,0 @@
//! HLFS TCP micro-protocol for historical backfill (single-block requests, round-robin across peers for each block).
use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
time::timeout,
};
type Result<T, E = HlfsError> = std::result::Result<T, E>;
pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;
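// Wire format (integers little-endian), inferred from the Client/Server code below:
//   OP_REQ_BLOCK [u64 number] -> OP_RES_BLOCK [u32 len][len bytes]
//                              | OP_ERR_NOT_FOUND
//                              | OP_ERR_TOO_BUSY [u32 retry_ms]
//   OP_REQ_MAX_BLOCK [8 padding bytes] -> OP_RES_MAX_BLOCK [u64 max block]
// The server answers OP_REQ_MAX_BLOCK without reading the padding; the probe
// in share_blocks.rs nevertheless sends a fixed 9-byte request.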
#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
Io(#[from] io::Error),
#[error("proto")]
Proto,
#[error("no peers")]
NoPeers,
#[error("timeout")]
Timeout,
#[error("busy: retry_ms={0}")]
Busy(u32),
#[error("not found")]
NotFound,
}
#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}
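// Note: blocking std::fs inside an async fn. Directory creation is cheap here,
// but tokio::fs::create_dir_all would avoid stalling a runtime thread.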
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)
} else {
Ok(())
}
}
/// A backfill peer: HLFS socket address plus its last advertised max block.
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
pub addr: SocketAddr,
pub max_block: u64,
}
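// Peer identity is the socket address alone: max_block is deliberately left
// out of Eq/Hash, so a HashSet<PeerRecord> dedupes by address and a re-probed
// peer advertising a newer max_block is not inserted twice.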
impl PartialEq for PeerRecord {
fn eq(&self, o: &Self) -> bool {
self.addr == o.addr
}
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
fn hash<H: Hasher>(&self, s: &mut H) {
self.addr.hash(s);
}
}
/// Client: tries each peer once per request; rotates the starting index per call.
#[derive(Clone)]
pub struct Client {
root: PathBuf,
pub peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
pub max_block: u64,
}
impl Client {
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap_or(0); // tolerate an empty or missing archive: start at block 0
debug!(max_block = n, "hlfs: our archive");
Self {
root,
peers: Arc::new(Mutex::new(peers)),
timeout: Duration::from_secs(3),
max_block: n,
}
}
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
return Err(HlfsError::NoPeers);
}
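// Visit every peer exactly once, starting at rr_index and wrapping around.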
let order = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
let mut last_busy: Option<u32> = None;
for i in order {
let addr = peers[i];
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
let mut op = [0u8; 1];
if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
debug!(%addr.addr, "hlfs: read op timeout {e:?}");
continue;
}
let op = op[0];
match op {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
let len = u32::from_le_bytes(len) as usize;
let mut buf = vec![0u8; len];
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
continue;
}
}
}
}
}
if let Some(ms) = last_busy {
return Err(HlfsError::Busy(ms));
}
Err(HlfsError::NotFound)
}
}
fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |b| n > b) {
*best = Some(n);
}
}
}
}
}
Ok(())
}
let mut best = Some(0);
let top: PathBuf = fs::read_dir(root)?
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.filter_map(|e| {
let name = e.file_name();
let s = name.to_str()?;
let n: u64 = s.parse().ok()?;
Some((n, e.path()))
})
.max_by_key(|(n, _)| *n)
.map(|(_, p)| p)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;
walk(&top, &mut best)?;
Ok(best.expect("cannot find block files"))
}
/// Server: serves `{root}/{millions}/{thousands}/{number}.rmp.lz4`.
pub struct Server {
bind: SocketAddr,
root: PathBuf,
max_conns: usize,
inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32,
max_block: u64,
}
impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap_or(0); // tolerate a fresh, empty archive; run() creates the root dir
Self {
bind,
root,
max_conns: 512,
inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100,
max_block: n,
}
}
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms;
self
}
pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?;
loop {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
}
*self.inflight.lock() += 1;
let root = self.root.clone();
let inflight = self.inflight.clone();
let busy = self.busy_retry_ms;
tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1;
});
}
}
}
async fn handle_conn(
sock: &mut TcpStream,
root: &Path,
busy_ms: u32,
addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}
if op[0] == OP_REQ_MAX_BLOCK {
let mut b = BytesMut::with_capacity(1 + 8);
b.put_u8(OP_RES_MAX_BLOCK);
put_u64(&mut b, max_block);
let _ = sock.write_all(&b).await;
return Ok(());
}
let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
let number = u64::from_le_bytes(num);
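// Archive layout: {root}/{millions}/{thousands}/{number}.rmp.lz4, bucketed by
// number - 1. E.g. number 21_304_281 -> n = 21_304_280, f = 21_000_000,
// s = 21_304_000, i.e. "21000000/21304000/21304281.rmp.lz4".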
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
trace!(%addr, number, %path, "hlfs: req");
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
}
match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
warn!(%addr, %path, "hlfs: read error: {e}");
let _ = sock.shutdown().await;
}
}
Ok(())
}
/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
pub client: Client,
root: PathBuf,
pub max_block_seen: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into(), max_block_seen: 0 }
}
pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
self.client.update_peers(peers);
let _peers = self.client.peers.lock().clone();
for p in _peers {
if p.max_block > self.max_block_seen {
self.max_block_seen = p.max_block
}
}
}
pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
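// Derive the round-robin start index from the block number so consecutive
// fetches start at different peers and spread the load.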
let rr_index = number as usize;
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if Path::new(&path).exists() {
debug!(block = number, "hlfs: already have");
// Report the height as covered so the backfill loop can advance past it.
return Ok(Some(number));
}
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;
Ok(None)
}
Err(e) => Err(e),
Ok(data) => {
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%path, "hlfs: mkdirs failed: {e}");
}
if let Err(e) = fs::write(&path, &data) {
warn!(%path, "hlfs: write failed: {e}");
return Ok(None);
}
debug!(block = number, "hlfs: got block");
Ok(Some(number))
}
}
}
}

etc/evm-block-sync/README.md (deleted)

@@ -1,57 +0,0 @@
# 🚀 S3Sync Runner
The fastest way to pull down EVM block files from S3.
This script automates syncing **massive S3 object stores** in a **safe, resumable, and time-tracked way**. The traditional `aws s3 sync` is just way too slow.
## Features
- ✅ Auto-installs [nidor1998/s3sync](https://github.com/nidor1998/s3sync) (latest release) into `~/.local/bin`
- ✅ Sequential per-prefix syncs (e.g., `21000000/`, `22000000/`, …)
- ✅ Per-prefix timing: `22000000 took 12 minutes!`
- ✅ Total runtime summary at the end
- ✅ Designed for **tiny files at scale** (EVM block archives)
- ✅ Zero-config bootstrap — just run the script
## Quick Start
```bash
chmod +x s3sync-runner.sh
./s3sync-runner.sh
```
> Skip ahead to the relevant block range:
```bash
./s3sync-runner.sh --start-at 30000000
```
The script will:
* Install or update s3sync into ~/.local/bin
* Discover top-level prefixes in your S3 bucket
* Sync them one at a time, printing elapsed minutes
## Configuration
Edit the top of s3sync-runner.sh if needed:
```bash
BUCKET="hl-testnet-evm-blocks" # could be hl-mainnet-evm-blocks
REGION="ap-northeast-1" # hardcoded bucket region
DEST="$HOME/evm-blocks-testnet" # local target directory (this is what nanoreth will look at)
WORKERS=512 # worker threads per sync (lotsa workers needs lotsa RAM)
```
## Example Output
```bash
[2025-08-20 20:01:02] START 21000000
[2025-08-20 20:13:15] 21000000 took 12 minutes!
[2025-08-20 20:13:15] START 22000000
[2025-08-20 20:26:40] 22000000 took 13 minutes!
[2025-08-20 20:26:40] ALL DONE in 25 minutes.
```
## Hackathon Context
This runner was built as part of the Hyperliquid DEX Hackathon to accelerate:
* ⛓️ Blockchain archive node ingestion
* 📂 EVM block dataset replication
* 🧩 DEX ecosystem data pipelines

etc/evm-block-sync/s3sync-runner.sh (deleted)

@@ -1,163 +0,0 @@
#!/usr/bin/env bash
# @author Niko Wehr (wwwehr)
set -euo pipefail
# ---- config ----
BUCKET="hl-testnet-evm-blocks"
REGION="ap-northeast-1"
DEST="${HOME}/evm-blocks-testnet"
WORKERS=512
S3SYNC="${HOME}/.local/bin/s3sync"
START_AT="" # default: run all
CHUNK_SIZE=1000000 # each prefix represents this many blocks
# ----------------
# parse args
while [[ $# -gt 0 ]]; do
case "$1" in
--start-at)
START_AT="$2"
shift 2
;;
*)
echo "Unknown arg: $1" >&2
exit 1
;;
esac
done
now(){ date +"%F %T"; }
log(){ printf '[%s] %s\n' "$(now)" "$*"; }
die(){ log "ERROR: $*"; exit 1; }
trap 'log "Signal received, exiting."; exit 2' INT TERM
need(){ command -v "$1" >/dev/null 2>&1 || die "missing dependency: $1"; }
install_s3sync_latest() {
need curl
GHAPI="https://api.github.com/repos/nidor1998/s3sync/releases/latest"
os="$(uname | tr '[:upper:]' '[:lower:]')"
arch_raw="$(uname -m)"
case "$arch_raw" in
x86_64|amd64) arch_tag="x86_64" ;;
aarch64|arm64) arch_tag="aarch64" ;;
*) die "unsupported arch: ${arch_raw}" ;;
esac
# Map OS → asset prefix
case "$os" in
linux) prefix="s3sync-linux-glibc2.28-${arch_tag}" ;;
darwin) prefix="s3sync-macos-${arch_tag}" ;;
msys*|mingw*|cygwin*|windows) prefix="s3sync-windows-${arch_tag}" ;;
*) die "unsupported OS: ${os}" ;;
esac
# Fetch latest release JSON (unauthenticated)
json="$(curl -fsSL "$GHAPI")" || die "failed to query GitHub API"
# Pick URLs for tarball and checksum
tar_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.tar.gz" | head -n1)"
sum_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.sha256" | head -n1)"
[[ -n "$tar_url" ]] || die "could not find asset for prefix: ${prefix}"
[[ -n "$sum_url" ]] || die "could not find checksum for prefix: ${prefix}"
mkdir -p "${HOME}/.local/bin"
tmpdir="$(mktemp -d)"; trap 'rm -rf "$tmpdir"' EXIT
tar_path="${tmpdir}/s3sync.tar.gz"
sum_path="${tmpdir}/s3sync.sha256"
log "Downloading: $tar_url"
curl -fL --retry 5 --retry-delay 1 -o "$tar_path" "$tar_url"
curl -fL --retry 5 --retry-delay 1 -o "$sum_path" "$sum_url"
# Verify checksum
want_sum="$(cut -d: -f2 <<<"$(sed -n 's/^sha256:\(.*\)$/\1/p' "$sum_path" | tr -d '[:space:]')" || true)"
[[ -n "$want_sum" ]] || want_sum="$(awk '{print $1}' "$sum_path" || true)"
[[ -n "$want_sum" ]] || die "could not parse checksum file"
got_sum="$(sha256sum "$tar_path" | awk '{print $1}')"
[[ "$want_sum" == "$got_sum" ]] || die "sha256 mismatch: want $want_sum got $got_sum"
# Extract and install
tar -xzf "$tar_path" -C "$tmpdir"
binpath="$(find "$tmpdir" -maxdepth 2 -type f -name 's3sync' | head -n1)"
[[ -x "$binpath" ]] || die "s3sync binary not found in archive"
chmod +x "$binpath"
mv -f "$binpath" "$S3SYNC"
log "s3sync installed at $S3SYNC"
}
# --- deps & install/update ---
need aws
install_s3sync_latest
[[ ":$PATH:" == *":$HOME/.local/bin:"* ]] || export PATH="$HOME/.local/bin:$PATH"
mkdir -p "$DEST"
# list prefixes
log "Listing top-level prefixes in s3://${BUCKET}/"
mapfile -t PREFIXES < <(
aws s3 ls "s3://${BUCKET}/" --region "$REGION" --request-payer requester \
| awk '/^ *PRE /{print $2}' | sed 's:/$::' | grep -E '^[0-9]+$' || true
)
((${#PREFIXES[@]})) || die "No prefixes found."
# sort numerically to make order predictable
IFS=$'\n' read -r -d '' -a PREFIXES < <(printf '%s\n' "${PREFIXES[@]}" | sort -n && printf '\0')
# compute the effective start prefix:
# - if START_AT is set, floor it to the containing chunk boundary
effective_start=""
if [[ -n "$START_AT" ]]; then
# numeric, base-10 safe
start_num=$((10#$START_AT))
chunk=$((10#$CHUNK_SIZE))
effective_start=$(( (start_num / chunk) * chunk ))
fi
# mark initial status using numeric comparisons (no ordering assumptions)
declare -A RESULTS
for p in "${PREFIXES[@]}"; do
if [[ -n "$effective_start" ]] && (( 10#$p < 10#$effective_start )); then
RESULTS["$p"]="-- SKIPPED"
else
RESULTS["$p"]="-- TODO"
fi
done
total_start=$(date +%s)
for p in "${PREFIXES[@]}"; do
if [[ "${RESULTS[$p]}" == "-- SKIPPED" ]]; then
continue
fi
src="s3://${BUCKET}/${p}/"
dst="${DEST}/${p}/"
mkdir -p "$dst"
log "START ${p}"
start=$(date +%s)
"$S3SYNC" \
--source-request-payer \
--source-region "$REGION" \
--worker-size "$WORKERS" \
--max-parallel-uploads "$WORKERS" \
"$src" "$dst"
end=$(date +%s)
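# Integer ceiling division: round elapsed seconds up to whole minutes.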
mins=$(( (end - start + 59) / 60 ))
RESULTS["$p"]="$mins minutes"
# Print status table so far
echo "---- Status ----"
for k in "${PREFIXES[@]}"; do
echo "$k ${RESULTS[$k]}"
done
echo "----------------"
done
total_end=$(date +%s)
total_mins=$(( (total_end - total_start + 59) / 60 ))
echo "ALL DONE in $total_mins minutes."