7 Commits

Author SHA1 Message Date
860cdfaba1 Merge c7d1f61817 into 5baa5770ac 2025-08-29 12:55:26 +09:00
c7d1f61817 revert changes 2025-08-25 15:24:55 +05:30
1a50bdfe12 rm all changes in this file 2025-08-25 15:15:01 +05:30
e72d7df2eb based on the review 2025-08-25 15:04:15 +05:30
4e88d19747 fix the peer removal expect 2025-08-25 03:38:05 +05:30
edd5383e43 blog ingest 2025-08-25 00:59:26 +05:30
25738366e4 fix the unwrap 2025-08-25 00:56:29 +05:30
15 changed files with 115 additions and 687 deletions

16
Cargo.lock generated
View File

@ -6659,7 +6659,6 @@ dependencies = [
"clap",
"eyre",
"futures",
"futures-util",
"jsonrpsee",
"jsonrpsee-core",
"lz4_flex",
@ -6688,7 +6687,6 @@ dependencies = [
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-hlfs",
"reth-hyperliquid-types",
"reth-network",
"reth-network-api",
@ -7968,20 +7966,6 @@ dependencies = [
"thiserror 2.0.11",
]
[[package]]
name = "reth-hlfs"
version = "1.2.0"
dependencies = [
"bytes",
"parking_lot",
"rand 0.8.5",
"reth-tracing",
"tempfile",
"thiserror 2.0.11",
"tokio",
"tracing",
]
[[package]]
name = "reth-hyperliquid-types"
version = "1.2.0"

View File

@ -54,7 +54,6 @@ members = [
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
"crates/net/hlfs/",
"crates/net/nat/",
"crates/net/network-api/",
"crates/net/network-types/",
@ -357,7 +356,6 @@ reth-exex = { path = "crates/exex/exex" }
reth-exex-test-utils = { path = "crates/exex/test-utils" }
reth-exex-types = { path = "crates/exex/types" }
reth-fs-util = { path = "crates/fs-util" }
reth-hlfs = { path = "crates/net/hlfs" }
reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
reth-ipc = { path = "crates/rpc/ipc" }
reth-libmdbx = { path = "crates/storage/libmdbx-rs" }

View File

@ -2,8 +2,6 @@
Hyperliquid archive node based on [reth](https://github.com/paradigmxyz/reth).
Got questions? Drop by the [Hyperliquid Discord](https://discord.gg/hyperliquid) #node-operators channel.
## ⚠️ IMPORTANT: System Transactions Appear as Pseudo Transactions
Deposit transactions from `0x222..22` to user addresses are intentionally recorded as pseudo transactions.

View File

@ -35,7 +35,6 @@ reth-cli-runner.workspace = true
reth-cli-commands.workspace = true
reth-cli-util.workspace = true
reth-consensus-common.workspace = true
reth-hlfs.workspace = true
reth-rpc-builder.workspace = true
reth-rpc.workspace = true
reth-rpc-types-compat.workspace = true
@ -82,9 +81,8 @@ tracing.workspace = true
serde_json.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
futures.workspace = true
futures-util.workspace = true
# time
time = { workspace = true }

View File

@ -1,11 +1,12 @@
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::{
collections::BTreeMap,
io::{BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
};
use alloy_consensus::{BlockBody, BlockHeader, Transaction};
use alloy_primitives::TxKind;
use alloy_primitives::{Address, PrimitiveSignature, B256, U256};
use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256};
use alloy_rpc_types::engine::{
ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
};
@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
use reth_node_builder::EngineTypes;
use reth_node_builder::NodeTypesWithEngine;
use reth_node_builder::{rpc::RethRpcAddOns, FullNode};
use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine};
use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
@ -28,9 +27,10 @@ use time::{format_description, Duration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token;
use crate::{
serialized::{BlockAndReceipts, EvmBlock},
spot_meta::erc20_contract_to_spot_token,
};
/// Poll interval when tailing an *open* hourly file.
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
@ -42,7 +42,6 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
pub hlfs: Option<ShareBlocks>,
}
#[derive(Deserialize)]
@ -53,17 +52,25 @@ struct ScanResult {
new_blocks: Vec<BlockAndReceipts>,
}
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
fn scan_hour_file(
path: &Path,
last_line: &mut usize,
start_height: u64,
) -> Result<ScanResult, Box<dyn std::error::Error + Send + Sync>> {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std::fs::File::open(path).expect("Failed to open hour file path");
let file = std::fs::File::open(path)
.map_err(|e| format!("Failed to open hour file path {}: {}", path.display(), e))?;
let reader = BufReader::new(file);
let mut new_blocks = Vec::<BlockAndReceipts>::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let lines: Vec<String> = reader
.lines()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
@ -111,7 +118,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
}
}
ScanResult { next_expected_height: last_height + 1, new_blocks }
Ok(ScanResult { next_expected_height: last_height + 1, new_blocks })
}
async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
@ -130,7 +137,9 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
engine_api_client,
envelope.execution_payload,
versioned_hashes,
payload_builder_attributes.parent_beacon_block_root.unwrap(),
payload_builder_attributes
.parent_beacon_block_root
.ok_or("Missing required parent_beacon_block_root")?,
)
.await?
};
@ -146,7 +155,9 @@ fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
}
fn date_from_datetime(dt: OffsetDateTime) -> String {
dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap()
// Format string is constant and guaranteed to be valid
dt.format(&format_description::parse("[year][month][day]").expect("Valid format string"))
.expect("DateTime formatting should always succeed with valid format")
}
impl BlockIngest {
@ -157,9 +168,9 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
} else {
self.try_collect_s3_block(height)
}
self.try_collect_s3_block(height)
}
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
@ -168,7 +179,18 @@ impl BlockIngest {
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy());
let file = std::fs::read(path).ok()?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)
.map_err(|e| {
tracing::error!("Failed to deserialize block data for height {}: {}", height, e);
e
})
.ok()?;
if blocks.is_empty() {
tracing::error!("Deserialized empty blocks vector for height {}", height);
return None;
}
info!("Returning s3 synced block for @ Height [{height}]");
Some(blocks[0].clone())
}
@ -188,11 +210,11 @@ impl BlockIngest {
let mut next_height = current_head;
let mut dt = datetime_from_timestamp(current_ts)
.replace_minute(0)
.unwrap()
.expect("Valid minute replacement")
.replace_second(0)
.unwrap()
.expect("Valid second replacement")
.replace_nanosecond(0)
.unwrap();
.expect("Valid nanosecond replacement");
let mut hour = dt.hour();
let mut day_str = date_from_datetime(dt);
@ -202,26 +224,37 @@ impl BlockIngest {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let ScanResult { next_expected_height, new_blocks } =
scan_hour_file(&hour_file, &mut last_line, next_height);
if !new_blocks.is_empty() {
let mut u_cache = cache.lock().await;
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = PrecompileData {
precompiles: blk.read_precompile_calls.clone(),
highest_precompile_address: blk.highest_precompile_address,
};
let h = match &blk.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
block_number
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
match scan_result {
Ok(ScanResult { next_expected_height, new_blocks }) => {
if !new_blocks.is_empty() {
let mut u_cache = cache.lock().await;
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = PrecompileData {
precompiles: blk.read_precompile_calls.clone(),
highest_precompile_address: blk.highest_precompile_address,
};
let h = match &blk.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
block_number
}
};
u_cache.insert(h, blk);
u_pre_cache.insert(h, precompiles);
}
};
u_cache.insert(h, blk);
u_pre_cache.insert(h, precompiles);
next_height = next_expected_height;
}
}
Err(e) => {
tracing::error!(
"Failed to scan hour file {}: {}",
hour_file.display(),
e
);
// Continue processing but skip this file
}
next_height = next_expected_height;
}
}
@ -271,8 +304,10 @@ impl BlockIngest {
let mut height = head + 1;
let mut previous_hash = provider.block_hash(head)?.unwrap_or(genesis_hash);
let mut previous_timestamp =
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis();
let mut previous_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("System time should be after UNIX epoch")
.as_millis();
let engine_api = node.auth_server_handle().http_client();
let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
@ -280,8 +315,8 @@ impl BlockIngest {
const MINIMUM_TIMESTAMP: u64 = 1739849780;
let current_block_timestamp: u64 = provider
.block_by_number(head)
.expect("Failed to fetch current block in db")
.expect("Block does not exist")
.map_err(|e| format!("Database error fetching block {}: {}", head, e))?
.ok_or_else(|| format!("Block {} does not exist in database", head))?
.into_header()
.timestamp();
@ -378,11 +413,11 @@ impl BlockIngest {
let current_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.expect("System time should be after UNIX epoch")
.as_millis();
if height % 100 == 0 || current_timestamp - previous_timestamp > 100 {
EngineApiClient::<Engine>::fork_choice_updated_v2(
if let Err(e) = EngineApiClient::<Engine>::fork_choice_updated_v2(
&engine_api,
ForkchoiceState {
head_block_hash: block_hash,
@ -392,7 +427,11 @@ impl BlockIngest {
None,
)
.await
.unwrap();
{
tracing::error!("Failed to update fork choice for block {}: {}", height, e);
// Continue processing but log the failure - don't panic the entire
// blockchain
}
previous_timestamp = current_timestamp;
}
previous_hash = block_hash;

View File

@ -6,7 +6,6 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
mod block_ingest;
mod call_forwarder;
mod serialized;
mod share_blocks;
mod spot_meta;
mod tx_forwarder;
@ -19,7 +18,6 @@ use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
use share_blocks::ShareBlocksArgs;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@ -42,10 +40,6 @@ struct HyperliquidExtArgs {
/// 3. filters out logs and transactions from subscription.
#[arg(long, default_value = "false")]
pub hl_node_compliant: bool,
/// Enable hlfs to backfill archive blocks
#[command(flatten)]
pub hlfs: ShareBlocksArgs,
}
fn main() {
@ -91,24 +85,8 @@ fn main() {
.launch()
.await?;
// start HLFS (serve + peer-backed backfill) using the node's network
let hlfs = if ext_args.hlfs.share_blocks {
let net = handle.node.network.clone();
Some(
crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
.await?,
)
} else {
None
};
let ingest = BlockIngest {
ingest_dir,
local_ingest_dir,
local_blocks_cache,
precompiles_cache,
hlfs,
};
let ingest =
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},

View File

@ -1,167 +0,0 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};
// use futures_util::StreamExt;
use futures_util::stream::StreamExt;
#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}
pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}
impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();
let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});
let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}
fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));
tokio::spawn({
warn!("hlfs: backfiller started");
let backfiller = backfiller.clone();
async move {
loop {
let mut bf = backfiller.lock().await;
if bf.client.max_block < bf.max_block_seen {
let block = bf.client.max_block + 1;
let new_height = bf.fetch_if_missing(block).await.expect("new height");
bf.client.max_block = new_height.unwrap();
}
sleep(Duration::from_millis(50)).await;
}
}
});
tokio::spawn({
let backfiller = backfiller.clone();
async move {
let mut events = network.event_listener();
loop {
let mut bf = backfiller.lock().await;
match events.next().await {
Some(NetworkEvent::ActivePeerSession { info, .. }) => {
let ip = info.remote_addr.ip();
if ip.is_unspecified() {
debug!(%ip, "hlfs: skip unspecified");
continue;
}
if ip == self_ip {
debug!(%ip, "hlfs: skip self");
continue;
}
let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
let max_block = probe_hlfs(addr).await;
if max_block != 0 {
let mut g = good.lock().await;
if g.insert(PeerRecord { addr, max_block }) {
let v: Vec<_> = g.iter().copied().collect();
bf.set_peers(v.clone());
info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
}
} else {
debug!(%addr, "hlfs: peer has no HLFS");
}
}
Some(_) => {}
None => {
warn!("hlfs: network event stream ended");
break;
}
}
}
}
})
}
pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;
// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;
// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}
// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};
match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}

View File

@ -1,25 +0,0 @@
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true
[dev-dependencies]
rand.workspace = true
tempfile.workspace = true

View File

@ -1,374 +0,0 @@
//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block).
use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
time::timeout,
};
type Result<T, E = HlfsError> = std::result::Result<T, E>;
pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;
#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
Io(#[from] io::Error),
#[error("proto")]
Proto,
#[error("no peers")]
NoPeers,
#[error("timeout")]
Timeout,
#[error("busy: retry_ms={0}")]
Busy(u32),
#[error("not found")]
NotFound,
}
#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)
} else {
Ok(())
}
}
/// Client: tries each peer once; rotates starting index per call
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
pub addr: SocketAddr,
pub max_block: u64,
}
impl PartialEq for PeerRecord {
fn eq(&self, o: &Self) -> bool {
self.addr == o.addr
}
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
fn hash<H: Hasher>(&self, s: &mut H) {
self.addr.hash(s);
}
}
#[derive(Clone)]
pub struct Client {
root: PathBuf,
pub peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
pub max_block: u64,
}
impl Client {
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
debug!(max_block = n, "hlfs: our archive");
Self {
root,
peers: Arc::new(Mutex::new(peers)),
timeout: Duration::from_secs(3),
max_block: n,
}
}
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
return Err(HlfsError::NoPeers);
}
let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
let mut last_busy: Option<u32> = None;
while let Some(i) = all.next() {
let addr = peers[i];
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
let mut op = [0u8; 1];
if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
debug!(%addr.addr, "hlfs: read op timeout {e:?}");
continue;
}
let op = op[0];
match op {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
let len = u32::from_le_bytes(len) as usize;
let mut buf = vec![0u8; len];
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
continue;
}
}
}
}
}
if let Some(ms) = last_busy {
return Err(HlfsError::Busy(ms));
}
Err(HlfsError::NotFound)
}
}
fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |b| n > b) {
*best = Some(n);
}
}
}
}
}
Ok(())
}
let mut best = Some(0);
let top: PathBuf = fs::read_dir(root)?
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.filter_map(|e| {
let name = e.file_name();
let s = name.to_str()?;
let n: u64 = s.parse().ok()?;
Some((n, e.path()))
})
.max_by_key(|(n, _)| *n)
.map(|(_, p)| p)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;
walk(&top, &mut best)?;
Ok(best.expect("cannot find block files"))
}
/// Server: serves `{root}/{number}.rlp`.
pub struct Server {
bind: SocketAddr,
root: PathBuf,
max_conns: usize,
inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32,
max_block: u64,
}
impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
Self {
bind,
root,
max_conns: 512,
inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100,
max_block: n,
}
}
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms;
self
}
pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?;
loop {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
}
*self.inflight.lock() += 1;
let root = self.root.clone();
let inflight = self.inflight.clone();
let busy = self.busy_retry_ms;
tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1;
});
}
}
}
async fn handle_conn(
sock: &mut TcpStream,
root: &Path,
busy_ms: u32,
addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}
if op[0] == OP_REQ_MAX_BLOCK {
let mut b = BytesMut::with_capacity(1 + 8);
b.put_u8(OP_RES_MAX_BLOCK);
put_u64(&mut b, max_block);
let _ = sock.write_all(&b).await;
return Ok(());
}
let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
let number = u64::from_le_bytes(num);
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
trace!(%addr, number, %path, "hlfs: req");
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
}
match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
warn!(%addr, %path, "hlfs: read error: {e}");
let _ = sock.shutdown().await;
}
}
Ok(())
}
/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
pub client: Client,
root: PathBuf,
pub max_block_seen: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into(), max_block_seen: 0 }
}
pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
self.client.update_peers(peers);
let _peers = self.client.peers.lock().clone();
for p in _peers {
if p.max_block > self.max_block_seen {
self.max_block_seen = p.max_block
}
}
}
pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
let rr_index = number as usize;
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if Path::new(&path).exists() {
debug!(block = number, "hlfs: already have");
return Ok(None);
}
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;
Ok(None)
}
Err(e) => Err(e),
Ok(data) => {
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%path, "hlfs: mkdirs failed: {e}");
}
if let Err(e) = fs::write(&path, &data) {
warn!(%path, "hlfs: write failed: {e}");
return Ok(None);
}
debug!(block = number, "hlfs: got block");
Ok(Some(number))
}
}
}
}

View File

@ -149,7 +149,7 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {
pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
let (tx, rx) = oneshot::channel();
let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
rx.await.unwrap()
rx.await.ok().flatten()
}
/// Send message to gracefully shutdown node.
@ -266,7 +266,8 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
builder.udp6(local_node_record.udp_port);
builder.tcp6(local_node_record.tcp_port);
}
builder.build(&self.inner.secret_key).expect("valid enr")
builder.build(&self.inner.secret_key)
.expect("ENR builder should always succeed with valid IP and ports")
}
}

View File

@ -647,8 +647,11 @@ impl PeersManager {
// remove peer if it has been marked for removal
if remove_peer {
let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
if let Some((peer_id, _)) = self.peers.remove_entry(peer_id) {
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
} else {
tracing::warn!(target: "net::peers", "Attempted to remove non-existent peer: {:?}", peer_id);
}
} else if let Some(backoff_until) = backoff_until {
// otherwise, backoff the peer if marked as such
self.backoff_peer_until(*peer_id, backoff_until);

View File

@ -391,7 +391,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
};
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
self.poll_terminate_message(cx).expect("message is set")
self.poll_terminate_message(cx).unwrap_or(Poll::Ready(()))
}
/// Report back that this session has been closed due to an error
@ -402,7 +402,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
error,
};
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
self.poll_terminate_message(cx).expect("message is set")
self.poll_terminate_message(cx).unwrap_or(Poll::Ready(()))
}
/// Starts the disconnect process

View File

@ -3202,4 +3202,4 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
fn prune_modes_ref(&self) -> &PruneModes {
self.prune_modes_ref()
}
}
}

View File

@ -171,4 +171,4 @@ fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
Bound::Unbounded => return None,
};
end.checked_sub(start).map(|x| x as _)
}
}

View File

@ -9,7 +9,6 @@ DEST="${HOME}/evm-blocks-testnet"
WORKERS=512
S3SYNC="${HOME}/.local/bin/s3sync"
START_AT="" # default: run all
CHUNK_SIZE=1000000 # each prefix represents this many blocks
# ----------------
# parse args
@ -87,6 +86,7 @@ install_s3sync_latest() {
log "s3sync installed at $S3SYNC"
}
# --- deps & install/update ---
need aws
install_s3sync_latest
@ -101,23 +101,18 @@ mapfile -t PREFIXES < <(
)
((${#PREFIXES[@]})) || die "No prefixes found."
# sort numerically to make order predictable
IFS=$'\n' read -r -d '' -a PREFIXES < <(printf '%s\n' "${PREFIXES[@]}" | sort -n && printf '\0')
# compute the effective start prefix:
# - if START_AT is set, floor it to the containing chunk boundary
effective_start=""
if [[ -n "$START_AT" ]]; then
# numeric, base-10 safe
start_num=$((10#$START_AT))
chunk=$((10#$CHUNK_SIZE))
effective_start=$(( (start_num / chunk) * chunk ))
fi
# mark initial status using numeric comparisons (no ordering assumptions)
# mark initial status
declare -A RESULTS
if [[ ! -n "$START_AT" ]]; then
skipping=0
else
skipping=1
fi
for p in "${PREFIXES[@]}"; do
if [[ -n "$effective_start" ]] && (( 10#$p < 10#$effective_start )); then
if [[ -n "$START_AT" && "$p" == "$START_AT" ]]; then
skipping=0
fi
if (( skipping )); then
RESULTS["$p"]="-- SKIPPED"
else
RESULTS["$p"]="-- TODO"