mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Compare commits
35 Commits
f6cea357e9...fork-depre
| SHA1 |
|---|
| cee0a342dc |
| eb2953246e |
| fb462f2b24 |
| 9af9c93454 |
| 8632f9d74f |
| 5baa5770ac |
| fa9f8fc5df |
| bf51dc83e5 |
| 738567894a |
| e9dcff4015 |
| 21e7c718ea |
| 8c6ea1ae7a |
| e8be4c2e82 |
| 06f7710777 |
| 5c3828382c |
| fae4b4c8f9 |
| cdb0f9e8a2 |
| 5114f76517 |
| 3bdc491aba |
| 2a653857aa |
| 5f2955caa2 |
| 022dfd1546 |
| 249e3194dd |
| 530447e637 |
| 29c8d4fa39 |
| 5d3041b10d |
| 2ff606c32c |
| 9f952ac2ed |
| ec417f9bf4 |
| e2045a195c |
| ec281f9cc7 |
| 0d1c239a07 |
| 821846f671 |
| aab45b9c02 |
| 0ce6b818ad |
Cargo.lock (generated, 16 lines changed)
@@ -6659,6 +6659,7 @@ dependencies = [
  "clap",
  "eyre",
  "futures",
+ "futures-util",
  "jsonrpsee",
  "jsonrpsee-core",
  "lz4_flex",
@@ -6687,6 +6688,7 @@ dependencies = [
  "reth-execution-types",
  "reth-exex",
  "reth-fs-util",
+ "reth-hlfs",
  "reth-hyperliquid-types",
  "reth-network",
  "reth-network-api",
@@ -7966,6 +7968,20 @@ dependencies = [
  "thiserror 2.0.11",
 ]
 
+[[package]]
+name = "reth-hlfs"
+version = "1.2.0"
+dependencies = [
+ "bytes",
+ "parking_lot",
+ "rand 0.8.5",
+ "reth-tracing",
+ "tempfile",
+ "thiserror 2.0.11",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "reth-hyperliquid-types"
 version = "1.2.0"
Cargo.toml
@@ -54,6 +54,7 @@ members = [
     "crates/net/ecies/",
     "crates/net/eth-wire-types",
     "crates/net/eth-wire/",
+    "crates/net/hlfs/",
     "crates/net/nat/",
     "crates/net/network-api/",
     "crates/net/network-types/",
@@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" }
 reth-exex-test-utils = { path = "crates/exex/test-utils" }
 reth-exex-types = { path = "crates/exex/types" }
 reth-fs-util = { path = "crates/fs-util" }
+reth-hlfs = { path = "crates/net/hlfs" }
 reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
 reth-ipc = { path = "crates/rpc/ipc" }
 reth-libmdbx = { path = "crates/storage/libmdbx-rs" }
README.md (41 lines changed)
@@ -2,6 +2,8 @@
 
 Hyperliquid archive node based on [reth](https://github.com/paradigmxyz/reth).
 
+Got questions? Drop by the [Hyperliquid Discord](https://discord.gg/hyperliquid) #node-operators channel.
+
 ## ⚠️ IMPORTANT: System Transactions Appear as Pseudo Transactions
 
 Deposit transactions from `0x222..22` to user addresses are intentionally recorded as pseudo transactions.
@@ -16,17 +18,35 @@ Building NanoReth from source requires Rust and Cargo to be installed:
 
 ## How to run (mainnet)
 
-1) `$ aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks --request-payer requester # one-time` - this will backfill the existing blocks from HyperLiquid's EVM S3 bucket.
+The current state of the block files comprises millions of small objects totalling over 20 GB and counting. The "requester pays" option means you will need a configured AWS environment, and you may incur charges that vary with the destination (EC2 versus local).
 
-2) `$ make install` - this will install the NanoReth binary.
+1) Backfill the existing blocks from Hyperliquid's EVM S3 bucket:
 
-3) Start NanoReth which will begin syncing using the blocks in `~/evm-blocks`:
+> use our Rust-based s3 tool wrapper to optimize your download experience - [read more](./etc/evm-block-sync/README.md)
+```shell
+chmod +x ./etc/evm-block-sync/s3sync-runner.sh
+./etc/evm-block-sync/s3sync-runner.sh
+```
+
+> or use the conventional [aws cli](https://aws.amazon.com/cli/)
+```shell
+aws s3 sync s3://hl-mainnet-evm-blocks/ ~/evm-blocks \
+    --request-payer requester \
+    --exact-timestamps \
+    --size-only \
+    --only-show-errors
+```
+
+2) `$ make install` - this will install the NanoReth binary.
+
+3) Start NanoReth, which will begin syncing using the blocks in `~/evm-blocks`:
 
 ```sh
 $ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 --ws --ws.addr 0.0.0.0 --ws.origins '*' --ws.api eth,ots,net,web3 --ingest-dir ~/evm-blocks --ws.port 8545
 ```
 
-4) Once the node logs stops making progress this means it's caught up with the existing blocks.
+4) Once the node log stops making progress, it has caught up with the existing blocks.
 
 Stop the NanoReth process and then start Goofys: `$ goofys --region=ap-northeast-1 --requester-pays hl-mainnet-evm-blocks evm-blocks`
 
@@ -65,12 +85,25 @@ $ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 \
 
 Testnet is supported since block 21304281.
 
+> [!NOTE]
+> To run testnet locally, you will need:
+> - [ ] [git lfs](https://git-lfs.com/)
+> - [ ] [rust toolchain](https://rustup.rs/)
+
 ```sh
 # Get testnet genesis at block 21304281
 $ cd ~
 $ git clone https://github.com/sprites0/hl-testnet-genesis
+$ git -C hl-testnet-genesis lfs pull
 $ zstd --rm -d ~/hl-testnet-genesis/*.zst
 
+# Now return to where you have cloned this project to continue
+$ cd -
+
+# prepare your rust toolchain
+$ rustup install 1.82 # (this corresponds with rust version in our Cargo.toml)
+$ rustup default 1.82
+
 # Init node
 $ make install
 $ reth init-state --without-evm --chain testnet --header ~/hl-testnet-genesis/21304281.rlp \
bin/reth/Cargo.toml
@@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
 reth-cli-commands.workspace = true
 reth-cli-util.workspace = true
 reth-consensus-common.workspace = true
+reth-hlfs.workspace = true
 reth-rpc-builder.workspace = true
 reth-rpc.workspace = true
 reth-rpc-types-compat.workspace = true
@@ -81,8 +82,9 @@ tracing.workspace = true
 serde_json.workspace = true
 
 # async
-tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
+tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
 futures.workspace = true
+futures-util.workspace = true
 
 # time
 time = { workspace = true }
bin/reth/src/block_ingest.rs
@@ -29,6 +29,7 @@ use tokio::sync::Mutex;
 use tracing::{debug, info};
 
 use crate::serialized::{BlockAndReceipts, EvmBlock};
+use crate::share_blocks::ShareBlocks;
 use crate::spot_meta::erc20_contract_to_spot_token;
 
 /// Poll interval when tailing an *open* hourly file.
@@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
     pub local_ingest_dir: Option<PathBuf>,
     pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
     pub precompiles_cache: PrecompilesCache,
+    pub hlfs: Option<ShareBlocks>,
 }
 
 #[derive(Deserialize)]
@@ -73,8 +75,23 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
             continue;
         }
 
-        let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
-            serde_json::from_str(&line).expect("Failed to parse local block and receipts");
+        let (_block_timestamp, parsed_block) = match serde_json::from_str(&line) {
+            Ok(LocalBlockAndReceipts(_block_timestamp, parsed_block)) => {
+                (_block_timestamp, parsed_block)
+            }
+            Err(_) => {
+                // Possible scenarios:
+                let is_last_line = line_idx == lines.len() - 1;
+                if is_last_line {
+                    // 1. It's not written fully yet - in this case, just wait for the next line
+                    break;
+                } else {
+                    // 2. hl-node previously terminated while writing the lines
+                    // In this case, try to skip this line
+                    continue;
+                }
+            }
+        };
 
         let height = match &parsed_block.block {
             EvmBlock::Reth115(b) => {
@@ -140,9 +157,9 @@ impl BlockIngest {
         if let Some(block) = self.try_collect_local_block(height).await {
             info!("Returning locally synced block for @ Height [{height}]");
             return Some(block);
-        } else {
-            self.try_collect_s3_block(height)
         }
+
+        self.try_collect_s3_block(height)
     }
 
     pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
@@ -260,6 +277,7 @@ impl BlockIngest {
         let engine_api = node.auth_server_handle().http_client();
         let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
 
+        const MINIMUM_TIMESTAMP: u64 = 1739849780;
         let current_block_timestamp: u64 = provider
             .block_by_number(head)
             .expect("Failed to fetch current block in db")
@@ -267,6 +285,8 @@ impl BlockIngest {
             .into_header()
             .timestamp();
 
+        let current_block_timestamp = current_block_timestamp.max(MINIMUM_TIMESTAMP);
+
         info!("Current height {height}, timestamp {current_block_timestamp}");
         self.start_local_ingest_loop(height, current_block_timestamp).await;
 
bin/reth/src/main.rs
@@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
 mod block_ingest;
 mod call_forwarder;
 mod serialized;
+mod share_blocks;
 mod spot_meta;
 mod tx_forwarder;
 
@@ -18,6 +19,7 @@ use reth::cli::Cli;
 use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
 use reth_hyperliquid_types::PrecompilesCache;
 use reth_node_ethereum::EthereumNode;
+use share_blocks::ShareBlocksArgs;
 use tokio::sync::Mutex;
 use tracing::info;
 use tx_forwarder::EthForwarderApiServer;
@@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
     /// 3. filters out logs and transactions from subscription.
     #[arg(long, default_value = "false")]
     pub hl_node_compliant: bool,
+
+    /// Enable hlfs to backfill archive blocks
+    #[command(flatten)]
+    pub hlfs: ShareBlocksArgs,
 }
 
 fn main() {
@@ -85,8 +91,24 @@ fn main() {
                 .launch()
                 .await?;
 
-            let ingest =
-                BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
+            // start HLFS (serve + peer-backed backfill) using the node's network
+            let hlfs = if ext_args.hlfs.share_blocks {
+                let net = handle.node.network.clone();
+                Some(
+                    crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
+                        .await?,
+                )
+            } else {
+                None
+            };
+
+            let ingest = BlockIngest {
+                ingest_dir,
+                local_ingest_dir,
+                local_blocks_cache,
+                precompiles_cache,
+                hlfs,
+            };
             ingest.run(handle.node).await.unwrap();
             handle.node_exit_future.await
         },
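The `hlfs` wiring above is driven by the flattened `ShareBlocksArgs` struct added in `bin/reth/src/share_blocks.rs` below. As a rough usage sketch only, and assuming clap's default kebab-case mapping of those field names (`--share-blocks`, `--share-blocks-host`, `--share-blocks-port`, `--archive-dir`), enabling the HLFS serve/backfill side alongside a normal run might look like this; these flags are inferred from the struct, not documented in the README:

```sh
# Hypothetical invocation; flag names assumed from the ShareBlocksArgs fields.
$ reth node --http --http.addr 0.0.0.0 --http.api eth,ots,net,web3 \
    --ingest-dir ~/evm-blocks \
    --share-blocks \
    --share-blocks-host 0.0.0.0 \
    --share-blocks-port 9595 \
    --archive-dir ~/evm-blocks
```

If `--share-blocks` is omitted, `hlfs` stays `None` and ingest behaves exactly as before this change.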
bin/reth/src/share_blocks.rs (new file, 167 lines)
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
    collections::HashSet,
    net::{IpAddr, SocketAddr},
    path::PathBuf,
    sync::Arc,
};
use tokio::{
    task::JoinHandle,
    time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};

// use futures_util::StreamExt;
use futures_util::stream::StreamExt;

#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
    #[arg(long, default_value_t = false)]
    pub share_blocks: bool,
    #[arg(long, default_value = "0.0.0.0")]
    pub share_blocks_host: String,
    #[arg(long, default_value_t = 9595)]
    pub share_blocks_port: u16,
    #[arg(long, default_value = "evm-blocks")]
    pub archive_dir: PathBuf,
}

pub(crate) struct ShareBlocks {
    _server: JoinHandle<()>,
    _autodetect: JoinHandle<()>,
}

impl ShareBlocks {
    pub(crate) async fn start_with_network<Net>(
        args: &ShareBlocksArgs,
        network: Net,
    ) -> eyre::Result<Self>
    where
        Net: FullNetwork + Clone + 'static,
    {
        let host: IpAddr = args
            .share_blocks_host
            .parse()
            .map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
        let bind: SocketAddr = (host, args.share_blocks_port).into();

        let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
        let _server = tokio::spawn(async move {
            if let Err(e) = srv.run().await {
                warn!(error=%e, "hlfs: server exited");
            }
        });

        let _autodetect =
            spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());

        info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
        Ok(Self { _server, _autodetect })
    }
}

fn spawn_autodetect<Net>(
    network: Net,
    self_ip: IpAddr,
    hlfs_port: u16,
    archive_dir: PathBuf,
) -> JoinHandle<()>
where
    Net: FullNetwork + Clone + 'static,
{
    let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
    let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
    let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
        Arc::new(tokio::sync::Mutex::new(HashSet::new()));

    tokio::spawn({
        warn!("hlfs: backfiller started");
        let backfiller = backfiller.clone();
        async move {
            loop {
                let mut bf = backfiller.lock().await;
                if bf.client.max_block < bf.max_block_seen {
                    let block = bf.client.max_block + 1;
                    let new_height = bf.fetch_if_missing(block).await.expect("new height");
                    bf.client.max_block = new_height.unwrap();
                }
                sleep(Duration::from_millis(50)).await;
            }
        }
    });

    tokio::spawn({
        let backfiller = backfiller.clone();
        async move {
            let mut events = network.event_listener();
            loop {
                let mut bf = backfiller.lock().await;
                match events.next().await {
                    Some(NetworkEvent::ActivePeerSession { info, .. }) => {
                        let ip = info.remote_addr.ip();
                        if ip.is_unspecified() {
                            debug!(%ip, "hlfs: skip unspecified");
                            continue;
                        }
                        if ip == self_ip {
                            debug!(%ip, "hlfs: skip self");
                            continue;
                        }
                        let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
                        let max_block = probe_hlfs(addr).await;
                        if max_block != 0 {
                            let mut g = good.lock().await;
                            if g.insert(PeerRecord { addr, max_block }) {
                                let v: Vec<_> = g.iter().copied().collect();
                                bf.set_peers(v.clone());
                                info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
                            }
                        } else {
                            debug!(%addr, "hlfs: peer has no HLFS");
                        }
                    }
                    Some(_) => {}
                    None => {
                        warn!("hlfs: network event stream ended");
                        break;
                    }
                }
            }
        }
    })
}

pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
    use tokio::{
        io::{AsyncReadExt, AsyncWriteExt},
        net::TcpStream,
    };

    let fut = async {
        let mut s = TcpStream::connect(addr).await.ok()?;

        // send [OP][8 zero bytes]
        let mut msg = [0u8; 9];
        msg[0] = OP_REQ_MAX_BLOCK;
        s.write_all(&msg).await.ok()?;

        // read 1-byte opcode
        let mut op = [0u8; 1];
        s.read_exact(&mut op).await.ok()?;
        if op[0] != OP_RES_MAX_BLOCK {
            return None;
        }

        // read 8-byte little-endian block number
        let mut blk = [0u8; 8];
        s.read_exact(&mut blk).await.ok()?;
        Some(u64::from_le_bytes(blk))
    };

    match timeout(Duration::from_secs(2), fut).await {
        Ok(Some(n)) => n,
        _ => 0,
    }
}
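`probe_hlfs` above only exercises the max-block handshake (`OP_REQ_MAX_BLOCK` / `OP_RES_MAX_BLOCK`). The other exchange the `reth-hlfs` crate (added below) implements is the single-block fetch: request `[OP_REQ_BLOCK][u64 LE number]`, reply `[OP_RES_BLOCK][u32 LE length][raw contents of {number}.rmp.lz4]`, or `OP_ERR_TOO_BUSY` / `OP_ERR_NOT_FOUND`. A minimal, blocking `std::net` sketch of that framing follows, purely for illustration; the real client in `lib.rs` is async, rotates peers, and applies timeouts.

```rust
use std::io::{Read, Write};
use std::net::TcpStream;

// Opcode values mirror the constants in crates/net/hlfs/src/lib.rs.
const OP_REQ_BLOCK: u8 = 0x01;
const OP_RES_BLOCK: u8 = 0x02;

/// Fetch one archived block from an HLFS peer; returns the raw
/// `{number}.rmp.lz4` file bytes, or None on any non-OK reply.
fn fetch_block(addr: &str, number: u64) -> std::io::Result<Option<Vec<u8>>> {
    let mut s = TcpStream::connect(addr)?;

    // [opcode][block number, little-endian]
    let mut req = vec![OP_REQ_BLOCK];
    req.extend_from_slice(&number.to_le_bytes());
    s.write_all(&req)?;

    let mut op = [0u8; 1];
    s.read_exact(&mut op)?;
    if op[0] != OP_RES_BLOCK {
        // OP_ERR_TOO_BUSY carries a u32 retry hint, OP_ERR_NOT_FOUND nothing;
        // a real client would branch here instead of giving up.
        return Ok(None);
    }

    // [u32 LE payload length][payload]
    let mut len = [0u8; 4];
    s.read_exact(&mut len)?;
    let mut buf = vec![0u8; u32::from_le_bytes(len) as usize];
    s.read_exact(&mut buf)?;
    Ok(Some(buf))
}

fn main() -> std::io::Result<()> {
    // Example peer address; in practice it comes from the autodetect loop above.
    if let Some(bytes) = fetch_block("127.0.0.1:9595", 21_304_281)? {
        println!("got {} bytes", bytes.len());
    }
    Ok(())
}
```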
crates/net/hlfs/Cargo.toml (new file, 25 lines)
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]


[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
crates/net/hlfs/src/lib.rs (new file, 374 lines)
//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block).

use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
    fs,
    hash::{Hash, Hasher},
    io,
    net::SocketAddr,
    path::{Path, PathBuf},
    sync::Arc,
    time::Duration,
};
use thiserror::Error;
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream},
    time::timeout,
};

type Result<T, E = HlfsError> = std::result::Result<T, E>;

pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;

#[derive(Error, Debug)]
pub enum HlfsError {
    #[error("io: {0}")]
    Io(#[from] io::Error),
    #[error("proto")]
    Proto,
    #[error("no peers")]
    NoPeers,
    #[error("timeout")]
    Timeout,
    #[error("busy: retry_ms={0}")]
    Busy(u32),
    #[error("not found")]
    NotFound,
}

#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
    b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
    b.put_u32_le(v)
}

async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
    if let Some(parent) = Path::new(path).parent() {
        fs::create_dir_all(parent)
    } else {
        Ok(())
    }
}

/// Client: tries each peer once; rotates starting index per call
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
    pub addr: SocketAddr,
    pub max_block: u64,
}

impl PartialEq for PeerRecord {
    fn eq(&self, o: &Self) -> bool {
        self.addr == o.addr
    }
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
    fn hash<H: Hasher>(&self, s: &mut H) {
        self.addr.hash(s);
    }
}

#[derive(Clone)]
pub struct Client {
    root: PathBuf,
    pub peers: Arc<Mutex<Vec<PeerRecord>>>,
    timeout: Duration,
    pub max_block: u64,
}
impl Client {
    pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
        let root: PathBuf = root.into();
        let n = find_max_number_file(&root).unwrap();
        debug!(max_block = n, "hlfs: our archive");
        Self {
            root,
            peers: Arc::new(Mutex::new(peers)),
            timeout: Duration::from_secs(3),
            max_block: n,
        }
    }
    pub fn update_peers(&self, peers: Vec<PeerRecord>) {
        *self.peers.lock() = peers;
    }
    pub fn with_timeout(mut self, d: Duration) -> Self {
        self.timeout = d;
        self
    }
    pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
        let peers = self.peers.lock().clone();
        debug!(peer_count = peers.len(), "hlfs: peers");
        if peers.is_empty() {
            return Err(HlfsError::NoPeers);
        }

        let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
        let mut last_busy: Option<u32> = None;
        while let Some(i) = all.next() {
            let addr = peers[i];
            trace!(%addr.addr, "hlfs: dialing");
            match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
                Err(_) => continue,
                Ok(Err(_)) => continue,
                Ok(Ok(mut sock)) => {
                    let mut req = BytesMut::with_capacity(1 + 8);
                    req.put_u8(OP_REQ_BLOCK);
                    put_u64(&mut req, number);
                    if let Err(e) = sock.write_all(&req).await {
                        debug!(%addr.addr, "hlfs: write err: {e}");
                        continue;
                    }
                    let mut op = [0u8; 1];
                    if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
                        debug!(%addr.addr, "hlfs: read op timeout {e:?}");
                        continue;
                    }
                    let op = op[0];
                    match op {
                        OP_RES_BLOCK => {
                            // DATA
                            let mut len = [0u8; 4];
                            sock.read_exact(&mut len).await?;
                            let len = u32::from_le_bytes(len) as usize;
                            let mut buf = vec![0u8; len];
                            sock.read_exact(&mut buf).await?;
                            return Ok(buf);
                        }
                        OP_ERR_TOO_BUSY => {
                            let mut ms = [0u8; 4];
                            sock.read_exact(&mut ms).await?;
                            last_busy = Some(u32::from_le_bytes(ms));
                            continue;
                        }
                        OP_ERR_NOT_FOUND => {
                            return Err(HlfsError::NotFound);
                        }
                        _ => {
                            continue;
                        }
                    }
                }
            }
        }
        if let Some(ms) = last_busy {
            return Err(HlfsError::Busy(ms));
        }
        Err(HlfsError::NotFound)
    }
}

fn find_max_number_file(root: &Path) -> Result<u64> {
    fn parse_num(name: &str) -> Option<u64> {
        name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
    }

    fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
        for entry in fs::read_dir(dir)? {
            let entry = entry?;
            let path = entry.path();
            let ft = entry.file_type()?;
            if ft.is_dir() {
                walk(&path, best)?;
            } else if ft.is_file() {
                if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
                    if let Some(n) = parse_num(name) {
                        if best.map_or(true, |b| n > b) {
                            *best = Some(n);
                        }
                    }
                }
            }
        }
        Ok(())
    }

    let mut best = Some(0);
    let top: PathBuf = fs::read_dir(root)?
        .filter_map(|e| e.ok())
        .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
        .filter_map(|e| {
            let name = e.file_name();
            let s = name.to_str()?;
            let n: u64 = s.parse().ok()?;
            Some((n, e.path()))
        })
        .max_by_key(|(n, _)| *n)
        .map(|(_, p)| p)
        .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;

    walk(&top, &mut best)?;
    Ok(best.expect("cannot find block files"))
}

/// Server: serves `{root}/{number}.rlp`.
pub struct Server {
    bind: SocketAddr,
    root: PathBuf,
    max_conns: usize,
    inflight: Arc<Mutex<usize>>,
    busy_retry_ms: u32,
    max_block: u64,
}

impl Server {
    pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
        let root: PathBuf = root.into();
        let n = find_max_number_file(&root).unwrap();
        Self {
            bind,
            root,
            max_conns: 512,
            inflight: Arc::new(Mutex::new(0)),
            busy_retry_ms: 100,
            max_block: n,
        }
    }
    pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
        self.max_conns = max_conns;
        self.busy_retry_ms = busy_retry_ms;
        self
    }
    pub async fn run(self) -> Result<(), HlfsError> {
        fs::create_dir_all(&self.root).ok();
        info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
        let lst = TcpListener::bind(self.bind).await?;
        loop {
            let (mut sock, addr) = lst.accept().await?;
            if *self.inflight.lock() >= self.max_conns {
                let mut b = BytesMut::with_capacity(5);
                b.put_u8(OP_ERR_TOO_BUSY);
                put_u32(&mut b, self.busy_retry_ms);
                let _ = sock.write_all(&b).await;
                continue;
            }
            *self.inflight.lock() += 1;
            let root = self.root.clone();
            let inflight = self.inflight.clone();
            let busy = self.busy_retry_ms;
            tokio::spawn(async move {
                let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
                *inflight.lock() -= 1;
            });
        }
    }
}
async fn handle_conn(
    sock: &mut TcpStream,
    root: &Path,
    busy_ms: u32,
    addr: SocketAddr,
    max_block: u64,
) -> Result<(), HlfsError> {
    let mut op = [0u8; 1];
    sock.read_exact(&mut op).await?;
    if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
        warn!(%addr, "hlfs: bad op");
        return Err(HlfsError::Proto);
    }

    if op[0] == OP_REQ_MAX_BLOCK {
        let mut b = BytesMut::with_capacity(1 + 8);
        b.put_u8(OP_RES_MAX_BLOCK);
        put_u64(&mut b, max_block);
        let _ = sock.write_all(&b).await;
        return Ok(());
    }

    let mut num = [0u8; 8];
    sock.read_exact(&mut num).await?;
    let number = u64::from_le_bytes(num);

    let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
    let f = (n / 1_000_000) * 1_000_000;
    let s = (n / 1_000) * 1_000;
    let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());

    trace!(%addr, number, %path, "hlfs: req");
    if let Err(e) = ensure_parent_dirs(&path).await {
        warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
    }

    match fs::read(&path) {
        Ok(data) => {
            let mut b = BytesMut::with_capacity(1 + 4 + data.len());
            b.put_u8(OP_RES_BLOCK);
            put_u32(&mut b, data.len() as u32);
            b.extend_from_slice(&data);
            let _ = sock.write_all(&b).await;
        }
        Err(e) if e.kind() == io::ErrorKind::NotFound => {
            let mut b = BytesMut::with_capacity(1);
            b.put_u8(OP_ERR_NOT_FOUND);
            let _ = sock.write_all(&b).await;
        }
        Err(e) => {
            warn!(%addr, %path, "hlfs: read error: {e}");
            let _ = sock.shutdown().await;
        }
    }
    Ok(())
}

/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
    pub client: Client,
    root: PathBuf,
    pub max_block_seen: u64,
}
impl Backfiller {
    pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
        Self { client, root: root.into(), max_block_seen: 0 }
    }
    pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
        self.client.update_peers(peers);
        let _peers = self.client.peers.lock().clone();
        for p in _peers {
            if p.max_block > self.max_block_seen {
                self.max_block_seen = p.max_block
            }
        }
    }
    pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
        let rr_index = number as usize;
        let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
        let f = (n / 1_000_000) * 1_000_000;
        let s = (n / 1_000) * 1_000;

        let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
        if Path::new(&path).exists() {
            debug!(block = number, "hlfs: already have");
            return Ok(None);
        }
        match self.client.wants_block(number, rr_index).await {
            Err(HlfsError::NotFound) => Ok(None),
            Err(HlfsError::Busy(ms)) => {
                tokio::time::sleep(Duration::from_millis(ms as u64)).await;
                Ok(None)
            }
            Err(e) => Err(e),
            Ok(data) => {
                if let Err(e) = ensure_parent_dirs(&path).await {
                    warn!(%path, "hlfs: mkdirs failed: {e}");
                }
                if let Err(e) = fs::write(&path, &data) {
                    warn!(%path, "hlfs: write failed: {e}");
                    return Ok(None);
                }
                debug!(block = number, "hlfs: got block");
                Ok(Some(number))
            }
        }
    }
}
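Both `handle_conn` and `fetch_if_missing` above derive a block's on-disk location from its number: the first path segment is the million-block directory, the second the thousand-block directory, both computed from `number - 1` (saturating at 0). A small stand-alone sketch of that math, with one worked example using the testnet block number mentioned in the README:

```rust
/// Mirrors the path computation used in handle_conn() / fetch_if_missing().
fn archive_path(root: &str, number: u64) -> String {
    let n = number.saturating_sub(1); // 0 -> 0, others -> number - 1
    let f = (n / 1_000_000) * 1_000_000; // million-block directory
    let s = (n / 1_000) * 1_000; // thousand-block directory
    format!("{root}/{f}/{s}/{number}.rmp.lz4")
}

fn main() {
    // Block 21304281 lands under 21000000/21304000/
    assert_eq!(
        archive_path("evm-blocks", 21_304_281),
        "evm-blocks/21000000/21304000/21304281.rmp.lz4"
    );
    println!("{}", archive_path("evm-blocks", 21_304_281));
}
```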
etc/evm-block-sync/README.md (new file, 57 lines)
# 🚀 S3Sync Runner

The fastest way to pull down EVM block files from S3.

This script automates syncing **massive S3 object stores** in a **safe, resumable, and time-tracked way**. The traditional `aws s3 sync` is just way too slow.

## Features

- ✅ Auto-installs [nidor1998/s3sync](https://github.com/nidor1998/s3sync) (latest release) into `~/.local/bin`
- ✅ Sequential per-prefix syncs (e.g., `21000000/`, `22000000/`, …)
- ✅ Per-prefix timing: `22000000 took 12 minutes!`
- ✅ Total runtime summary at the end
- ✅ Designed for **tiny files at scale** (EVM block archives)
- ✅ Zero-config bootstrap — just run the script

## Quick Start

```bash
chmod +x s3sync-runner.sh
./s3sync-runner.sh
```

> Skipping to the relevant block section
```bash
./s3sync-runner.sh --start-at 30000000
```

The script will:
* Install or update s3sync into ~/.local/bin
* Discover top-level prefixes in your S3 bucket
* Sync them one at a time, printing elapsed minutes

## Configuration

Edit the top of s3sync-runner.sh if needed:
```bash
BUCKET="hl-testnet-evm-blocks"   # could be hl-mainnet-evm-blocks
REGION="ap-northeast-1"          # hardcoded bucket region
DEST="$HOME/evm-blocks-testnet"  # local target directory (this is what nanoreth will look at)
WORKERS=512                      # worker threads per sync (lotsa workers needs lotsa RAM)
```

## Example Output
```bash
[2025-08-20 20:01:02] START 21000000
[2025-08-20 20:13:15] 21000000 took 12 minutes!
[2025-08-20 20:13:15] START 22000000
[2025-08-20 20:26:40] 22000000 took 13 minutes!
[2025-08-20 20:26:40] ALL DONE in 25 minutes.
```

## Hackathon Context

This runner was built as part of the Hyperliquid DEX Hackathon to accelerate:
* ⛓️ Blockchain archive node ingestion
* 📂 EVM block dataset replication
* 🧩 DEX ecosystem data pipelines
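To sanity-check the bucket layout the runner expects (numeric, million-block top-level prefixes, as in the Features list above), you can list the bucket with the aws CLI; the prefixes shown in the comments are illustrative, not an exhaustive listing:

```sh
aws s3 ls s3://hl-testnet-evm-blocks/ --region ap-northeast-1 --request-payer requester
#   PRE 21000000/
#   PRE 22000000/
#   ...
# Each numeric prefix is synced into $DEST/<prefix>/ by the runner.
```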
etc/evm-block-sync/s3sync-runner.sh (new executable file, 163 lines)
#!/usr/bin/env bash
# @author Niko Wehr (wwwehr)
set -euo pipefail

# ---- config ----
BUCKET="hl-testnet-evm-blocks"
REGION="ap-northeast-1"
DEST="${HOME}/evm-blocks-testnet"
WORKERS=512
S3SYNC="${HOME}/.local/bin/s3sync"
START_AT=""          # default: run all
CHUNK_SIZE=1000000   # each prefix represents this many blocks
# ----------------

# parse args
while [[ $# -gt 0 ]]; do
    case "$1" in
        --start-at)
            START_AT="$2"
            shift 2
            ;;
        *)
            echo "Unknown arg: $1" >&2
            exit 1
            ;;
    esac
done

now(){ date +"%F %T"; }
log(){ printf '[%s] %s\n' "$(now)" "$*"; }
die(){ log "ERROR: $*"; exit 1; }
trap 'log "Signal received, exiting."; exit 2' INT TERM

need(){ command -v "$1" >/dev/null 2>&1 || die "missing dependency: $1"; }

install_s3sync_latest() {
    need curl
    GHAPI="https://api.github.com/repos/nidor1998/s3sync/releases/latest"

    os="$(uname | tr '[:upper:]' '[:lower:]')"
    arch_raw="$(uname -m)"
    case "$arch_raw" in
        x86_64|amd64) arch_tag="x86_64" ;;
        aarch64|arm64) arch_tag="aarch64" ;;
        *) die "unsupported arch: ${arch_raw}" ;;
    esac

    # Map OS → asset prefix
    case "$os" in
        linux) prefix="s3sync-linux-glibc2.28-${arch_tag}" ;;
        darwin) prefix="s3sync-macos-${arch_tag}" ;;
        msys*|mingw*|cygwin*|windows) prefix="s3sync-windows-${arch_tag}" ;;
        *) die "unsupported OS: ${os}" ;;
    esac

    # Fetch latest release JSON (unauthenticated)
    json="$(curl -fsSL "$GHAPI")" || die "failed to query GitHub API"

    # Pick URLs for tarball and checksum
    tar_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.tar.gz" | head -n1)"
    sum_url="$(printf '%s' "$json" | awk -F'"' '/browser_download_url/ {print $4}' | grep -F "${prefix}.sha256" | head -n1)"
    [[ -n "$tar_url" ]] || die "could not find asset for prefix: ${prefix}"
    [[ -n "$sum_url" ]] || die "could not find checksum for prefix: ${prefix}"

    mkdir -p "${HOME}/.local/bin"
    tmpdir="$(mktemp -d)"; trap 'rm -rf "$tmpdir"' EXIT
    tar_path="${tmpdir}/s3sync.tar.gz"
    sum_path="${tmpdir}/s3sync.sha256"

    log "Downloading: $tar_url"
    curl -fL --retry 5 --retry-delay 1 -o "$tar_path" "$tar_url"
    curl -fL --retry 5 --retry-delay 1 -o "$sum_path" "$sum_url"

    # Verify checksum
    want_sum="$(cut -d: -f2 <<<"$(sed -n 's/^sha256:\(.*\)$/\1/p' "$sum_path" | tr -d '[:space:]')" || true)"
    [[ -n "$want_sum" ]] || want_sum="$(awk '{print $1}' "$sum_path" || true)"
    [[ -n "$want_sum" ]] || die "could not parse checksum file"
    got_sum="$(sha256sum "$tar_path" | awk '{print $1}')"
    [[ "$want_sum" == "$got_sum" ]] || die "sha256 mismatch: want $want_sum got $got_sum"

    # Extract and install
    tar -xzf "$tar_path" -C "$tmpdir"
    binpath="$(find "$tmpdir" -maxdepth 2 -type f -name 's3sync' | head -n1)"
    [[ -x "$binpath" ]] || die "s3sync binary not found in archive"
    chmod +x "$binpath"
    mv -f "$binpath" "$S3SYNC"
    log "s3sync installed at $S3SYNC"
}

# --- deps & install/update ---
need aws
install_s3sync_latest
[[ ":$PATH:" == *":$HOME/.local/bin:"* ]] || export PATH="$HOME/.local/bin:$PATH"
mkdir -p "$DEST"

# list prefixes
log "Listing top-level prefixes in s3://${BUCKET}/"
mapfile -t PREFIXES < <(
    aws s3 ls "s3://${BUCKET}/" --region "$REGION" --request-payer requester \
        | awk '/^ *PRE /{print $2}' | sed 's:/$::' | grep -E '^[0-9]+$' || true
)
((${#PREFIXES[@]})) || die "No prefixes found."

# sort numerically to make order predictable
IFS=$'\n' read -r -d '' -a PREFIXES < <(printf '%s\n' "${PREFIXES[@]}" | sort -n && printf '\0')

# compute the effective start prefix:
# - if START_AT is set, floor it to the containing chunk boundary
effective_start=""
if [[ -n "$START_AT" ]]; then
    # numeric, base-10 safe
    start_num=$((10#$START_AT))
    chunk=$((10#$CHUNK_SIZE))
    effective_start=$(( (start_num / chunk) * chunk ))
fi

# mark initial status using numeric comparisons (no ordering assumptions)
declare -A RESULTS
for p in "${PREFIXES[@]}"; do
    if [[ -n "$effective_start" ]] && (( 10#$p < 10#$effective_start )); then
        RESULTS["$p"]="-- SKIPPED"
    else
        RESULTS["$p"]="-- TODO"
    fi
done

total_start=$(date +%s)

for p in "${PREFIXES[@]}"; do
    if [[ "${RESULTS[$p]}" == "-- SKIPPED" ]]; then
        continue
    fi
    src="s3://${BUCKET}/${p}/"
    dst="${DEST}/${p}/"
    mkdir -p "$dst"

    log "START ${p}"
    start=$(date +%s)

    "$S3SYNC" \
        --source-request-payer \
        --source-region "$REGION" \
        --worker-size "$WORKERS" \
        --max-parallel-uploads "$WORKERS" \
        "$src" "$dst"

    end=$(date +%s)
    mins=$(( (end - start + 59) / 60 ))
    RESULTS["$p"]="$mins minutes"

    # Print status table so far
    echo "---- Status ----"
    for k in "${PREFIXES[@]}"; do
        echo "$k ${RESULTS[$k]}"
    done
    echo "----------------"
done

total_end=$(date +%s)
total_mins=$(( (total_end - total_start + 59) / 60 ))

echo "ALL DONE in $total_mins minutes."