From 821846f671ef1f80861467625380eba020cc882a Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Mon, 11 Aug 2025 14:13:55 -0700 Subject: [PATCH 01/19] added alternative block share --- Cargo.lock | 16 ++ Cargo.toml | 2 + bin/reth/Cargo.toml | 4 +- bin/reth/src/main.rs | 14 ++ bin/reth/src/share_blocks.rs | 129 +++++++++++++ crates/net/hlfs/Cargo.toml | 25 +++ crates/net/hlfs/src/lib.rs | 343 +++++++++++++++++++++++++++++++++++ 7 files changed, 532 insertions(+), 1 deletion(-) create mode 100644 bin/reth/src/share_blocks.rs create mode 100644 crates/net/hlfs/Cargo.toml create mode 100644 crates/net/hlfs/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6222c9589..4bdb36957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6659,6 +6659,7 @@ dependencies = [ "clap", "eyre", "futures", + "futures-util", "jsonrpsee", "jsonrpsee-core", "lz4_flex", @@ -6687,6 +6688,7 @@ dependencies = [ "reth-execution-types", "reth-exex", "reth-fs-util", + "reth-hlfs", "reth-hyperliquid-types", "reth-network", "reth-network-api", @@ -7966,6 +7968,20 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "reth-hlfs" +version = "1.2.0" +dependencies = [ + "bytes", + "parking_lot", + "rand 0.8.5", + "reth-tracing", + "tempfile", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "reth-hyperliquid-types" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index cc7c842fc..5c9ea2d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "crates/net/ecies/", "crates/net/eth-wire-types", "crates/net/eth-wire/", + "crates/net/hlfs/", "crates/net/nat/", "crates/net/network-api/", "crates/net/network-types/", @@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" } reth-exex-test-utils = { path = "crates/exex/test-utils" } reth-exex-types = { path = "crates/exex/types" } reth-fs-util = { path = "crates/fs-util" } +reth-hlfs = { path = "crates/net/hlfs" } reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" } reth-ipc = { path = "crates/rpc/ipc" } reth-libmdbx = { path = "crates/storage/libmdbx-rs" } diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 0457f4586..775db8f82 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -35,6 +35,7 @@ reth-cli-runner.workspace = true reth-cli-commands.workspace = true reth-cli-util.workspace = true reth-consensus-common.workspace = true +reth-hlfs.workspace = true reth-rpc-builder.workspace = true reth-rpc.workspace = true reth-rpc-types-compat.workspace = true @@ -81,8 +82,9 @@ tracing.workspace = true serde_json.workspace = true # async -tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] } +tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] } futures.workspace = true +futures-util.workspace = true # time time = { workspace = true } diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index a19ff491a..d65506fe0 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne mod block_ingest; mod call_forwarder; mod serialized; +mod share_blocks; mod spot_meta; mod tx_forwarder; @@ -18,6 +19,7 @@ use reth::cli::Cli; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_hyperliquid_types::PrecompilesCache; use reth_node_ethereum::EthereumNode; +use share_blocks::ShareBlocksArgs; use tokio::sync::Mutex; use tracing::info; use tx_forwarder::EthForwarderApiServer; @@ -40,6 +42,10 @@ struct HyperliquidExtArgs { /// 3. filters out logs and transactions from subscription. #[arg(long, default_value = "false")] pub hl_node_compliant: bool, + + /// Enable hlfs to backfill archive blocks + #[command(flatten)] + pub hlfs: ShareBlocksArgs, } fn main() { @@ -85,6 +91,14 @@ fn main() { .launch() .await?; + // start HLFS (serve + peer-backed backfill) using the node's network + if ext_args.hlfs.share_blocks { + let net = handle.node.network.clone(); // returns a FullNetwork (NetworkHandle under the hood) + // keep handle alive for the whole process + let _hlfs = + share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net).await?; + } + let ingest = BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; ingest.run(handle.node).await.unwrap(); diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs new file mode 100644 index 000000000..28de1920f --- /dev/null +++ b/bin/reth/src/share_blocks.rs @@ -0,0 +1,129 @@ +use clap::Args; +use once_cell::sync::Lazy; +use reth_hlfs::{Backfiller, Client, Server}; +use reth_network_api::{events::NetworkEvent, FullNetwork}; +use std::{ + collections::HashSet, + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{task::JoinHandle, time::timeout}; +use tracing::{debug, info, warn}; + +// use futures_util::StreamExt; +use futures_util::stream::StreamExt; + +static RR: Lazy = Lazy::new(|| AtomicUsize::new(0)); + +#[derive(Args, Clone, Debug)] +pub struct ShareBlocksArgs { + #[arg(long, default_value_t = false)] + pub share_blocks: bool, + #[arg(long, default_value = "0.0.0.0")] + pub share_blocks_host: String, + #[arg(long, default_value_t = 9595)] + pub share_blocks_port: u16, + #[arg(long, default_value = "evm-blocks")] + pub archive_dir: PathBuf, + #[arg(long, default_value_t = 5_000)] + pub hist_threshold: u64, +} + +pub struct ShareBlocks { + pub backfiller: Backfiller, + _server: JoinHandle<()>, + _autodetect: JoinHandle<()>, +} + +impl ShareBlocks { + pub async fn start_with_network(args: &ShareBlocksArgs, network: Net) -> eyre::Result + where + Net: FullNetwork + Clone + 'static, + { + let host: IpAddr = args + .share_blocks_host + .parse() + .map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?; + let bind: SocketAddr = (host, args.share_blocks_port).into(); + + let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50); + let _server = tokio::spawn(async move { + if let Err(e) = srv.run().await { + warn!(error=%e, "hlfs: server exited"); + } + }); + + let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); + let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); + + let _autodetect = spawn_autodetect(network, args.share_blocks_port, bf.clone()); + + info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); + Ok(Self { backfiller: bf, _server, _autodetect }) + } + + pub async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { + let rr = RR.fetch_add(1, Ordering::Relaxed); + self.backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) + // <- fix: HlfsError -> eyre::Report + } +} + +fn spawn_autodetect(network: Net, hlfs_port: u16, backfiller: Backfiller) -> JoinHandle<()> +where + Net: FullNetwork + Clone + 'static, +{ + let good: Arc>> = + Arc::new(tokio::sync::Mutex::new(HashSet::new())); + + tokio::spawn(async move { + let mut events = network.event_listener(); + loop { + match events.next().await { + Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); + if probe_hlfs(addr).await { + let mut g = good.lock().await; + if g.insert(addr) { + let v: Vec<_> = g.iter().copied().collect(); + backfiller.set_peers(v.clone()); + info!(%addr, total=v.len(), "hlfs: peer added"); + } + } else { + debug!(%addr, "hlfs: peer has no HLFS"); + } + } + Some(_) => {} + None => { + warn!("hlfs: network event stream ended"); + break; + } + } + } + }) +} + +async fn probe_hlfs(addr: SocketAddr) -> bool { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; + + let res = timeout(Duration::from_secs(2), async { + if let Ok(mut s) = TcpStream::connect(addr).await { + let mut msg = [0u8; 9]; + msg[0] = 0x01; + let _ = s.write_all(&msg).await; + let mut op = [0u8; 1]; + if s.read_exact(&mut op).await.is_ok() { + return matches!(op[0], 0x02 | 0x03 | 0x04); + } + } + false + }) + .await; + matches!(res, Ok(true)) +} diff --git a/crates/net/hlfs/Cargo.toml b/crates/net/hlfs/Cargo.toml new file mode 100644 index 000000000..37b803d4b --- /dev/null +++ b/crates/net/hlfs/Cargo.toml @@ -0,0 +1,25 @@ +# crates/net/hlfs/Cargo.toml +[package] +name = "reth-hlfs" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Implementation of archive block downloader" +authors = ["@wwwehr"] + + +[dependencies] +tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] } +bytes.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tracing.workspace = true +reth-tracing.workspace = true + +[dev-dependencies] +rand.workspace = true +tempfile.workspace = true + diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs new file mode 100644 index 000000000..f67fbee74 --- /dev/null +++ b/crates/net/hlfs/src/lib.rs @@ -0,0 +1,343 @@ +//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block). + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use parking_lot::Mutex; +use reth_tracing::tracing::trace; +use std::{ + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; +use thiserror::Error; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + time::{sleep, timeout}, +}; +use tracing::{debug, info, warn}; + +#[derive(Debug, Error)] +pub enum HlfsError { + #[error("io: {0}")] + Io(#[from] io::Error), + #[error("timeout")] + Timeout, + #[error("not found")] + NotFound, + #[error("busy {0:?}")] + Busy(Duration), + #[error("protocol")] + Proto, +} + +#[inline] +fn put_u64(b: &mut BytesMut, v: u64) { + b.put_u64_le(v) +} +#[inline] +fn put_u32(b: &mut BytesMut, v: u32) { + b.put_u32_le(v) +} + +/// Client: tries each peer once; rotates starting index per call. +#[derive(Clone)] +pub struct Client { + peers: Arc>>, + timeout: Duration, + backoff_ms: u32, +} +impl Client { + pub fn new(peers: Vec) -> Self { + Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(5), backoff_ms: 50 } + } + pub fn update_peers(&self, peers: Vec) { + *self.peers.lock() = peers; + } + pub fn with_timeout(mut self, d: Duration) -> Self { + self.timeout = d; + self + } + pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { + let peers = self.peers.lock().clone(); + if peers.is_empty() { + debug!(block = number, "hlfs: no peers"); + return Err(HlfsError::Timeout); + } + for t in 0..peers.len() { + let i = (rr_index + t) % peers.len(); + let addr = peers[i]; + trace!(block=number, %addr, "hlfs: try"); + match timeout(self.timeout, fetch_once(addr, number)).await { + Err(_) => { + debug!(block=number, %addr, "hlfs: timeout"); + continue; + } + Ok(Err(HlfsError::Busy(d))) => { + trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); + sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; + continue; + } + Ok(Err(HlfsError::NotFound)) => { + trace!(block=number, %addr, "hlfs: not found"); + continue; + } + Ok(Err(e)) => { + debug!(block=number, %addr, error=%e, "hlfs: error"); + continue; + } + Ok(Ok(bytes)) => { + info!(block=number, %addr, bytes=bytes.len(), "hlfs: fetched"); + return Ok(bytes); + } + } + } + Err(HlfsError::Timeout) + } +} +async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { + let mut s = TcpStream::connect(addr).await?; + let mut buf = BytesMut::with_capacity(9); + buf.put_u8(0x01); + put_u64(&mut buf, number); + s.write_all(&buf).await?; + let mut op = [0u8; 1]; + s.read_exact(&mut op).await?; + match op[0] { + 0x02 => { + let mut meta = [0u8; 12]; + s.read_exact(&mut meta).await?; + let mut m = Bytes::from(meta.to_vec()); + let _n = m.get_u64_le(); + let len = m.get_u32_le() as usize; + let mut data = vec![0u8; len]; + s.read_exact(&mut data).await?; + Ok(data) + } + 0x03 => { + let mut _n = [0u8; 8]; + let _ = s.read_exact(&mut _n).await; + Err(HlfsError::NotFound) + } + 0x04 => { + let mut d = [0u8; 4]; + s.read_exact(&mut d).await?; + Err(HlfsError::Busy(Duration::from_millis(u32::from_le_bytes(d) as u64))) + } + _ => Err(HlfsError::Proto), + } +} + +/// Server: serves `{root}/{number}.rlp`. +pub struct Server { + bind: SocketAddr, + root: PathBuf, + max_conns: usize, + inflight: Arc>, + busy_retry_ms: u32, +} +impl Server { + pub fn new(bind: SocketAddr, root: impl Into) -> Self { + Self { + bind, + root: root.into(), + max_conns: 512, + inflight: Arc::new(Mutex::new(0)), + busy_retry_ms: 100, + } + } + pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { + self.max_conns = max_conns; + self.busy_retry_ms = busy_retry_ms; + self + } + pub async fn run(self) -> Result<(), HlfsError> { + fs::create_dir_all(&self.root).await.ok(); + info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); + let lst = TcpListener::bind(self.bind).await?; + loop { + let (mut sock, addr) = lst.accept().await?; + if *self.inflight.lock() >= self.max_conns { + let mut b = BytesMut::with_capacity(5); + b.put_u8(0x04); + put_u32(&mut b, self.busy_retry_ms); + let _ = sock.write_all(&b).await; + continue; + } + *self.inflight.lock() += 1; + let root = self.root.clone(); + let inflight = self.inflight.clone(); + let busy = self.busy_retry_ms; + tokio::spawn(async move { + let _ = handle_conn(&mut sock, &root, busy, addr).await; + *inflight.lock() -= 1; + }); + } + } +} +async fn handle_conn( + sock: &mut TcpStream, + root: &Path, + busy_ms: u32, + addr: SocketAddr, +) -> Result<(), HlfsError> { + let mut op = [0u8; 1]; + sock.read_exact(&mut op).await?; + if op[0] != 0x01 { + warn!(%addr, "hlfs: bad op"); + return Err(HlfsError::Proto); + } + let mut nb = [0u8; 8]; + sock.read_exact(&mut nb).await?; + let number = u64::from_le_bytes(nb); + let path = root.join(format!("{number}.rlp")); + match fs::read(&path).await { + Ok(data) => { + let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); + b.put_u8(0x02); + put_u64(&mut b, number); + put_u32(&mut b, data.len() as u32); + b.extend_from_slice(&data); + sock.write_all(&b).await?; + Ok(()) + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + let mut b = [0u8; 9]; + b[0] = 0x03; + b[1..9].copy_from_slice(&number.to_le_bytes()); + sock.write_all(&b).await?; + Ok(()) + } + Err(_) => { + let mut b = BytesMut::with_capacity(5); + b.put_u8(0x04); + put_u32(&mut b, busy_ms); + let _ = sock.write_all(&b).await; + Err(HlfsError::Io(io::Error::new(io::ErrorKind::Other, "fs error"))) + } + } +} + +/// Backfiller: ask client per missing block; rotate peers every block. +#[derive(Clone)] +pub struct Backfiller { + client: Client, + root: PathBuf, + hist_threshold: u64, +} +impl Backfiller { + pub fn new(client: Client, root: impl Into, hist_threshold: u64) -> Self { + Self { client, root: root.into(), hist_threshold } + } + pub fn set_peers(&self, peers: Vec) { + self.client.update_peers(peers); + } + pub async fn fetch_if_missing( + &self, + number: u64, + head: u64, + rr_index: usize, + ) -> Result, HlfsError> { + if number + self.hist_threshold > head { + return Ok(None); + } + let path = self.root.join(format!("{number}.rlp")); + if fs::try_exists(&path).await? { + return Ok(None); + } + match self.client.get_block(number, rr_index).await { + Ok(bytes) => { + let tmp = self.root.join(format!("{number}.rlp.part")); + fs::write(&tmp, &bytes).await?; + fs::rename(&tmp, &path).await?; + info!(block=number, bytes=bytes.len(), path=%path.display(), "hlfs: wrote"); + Ok(Some(bytes.len())) + } + Err(e) => { + debug!(block=number, error=%e, "hlfs: fetch failed"); + Err(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{rngs::StdRng, Rng, SeedableRng}; + fn sample(n: u64) -> Vec { + vec![((n as usize) % 251) as u8; 3072] + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn serve_and_fetch_rr() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + for n in 0..100u64 { + fs::write(dir.path().join(format!("{n}.rlp")), sample(n)).await.unwrap(); + } + let s1 = Server::new("127.0.0.1:9597".parse().unwrap(), dir.path()).with_limits(64, 20); + let s2 = Server::new("127.0.0.1:9598".parse().unwrap(), dir.path()).with_limits(64, 20); + tokio::spawn(async move { + let _ = s1.run().await; + }); + tokio::spawn(async move { + let _ = s2.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let client = + Client::new(vec!["127.0.0.1:9597".parse().unwrap(), "127.0.0.1:9598".parse().unwrap()]) + .with_timeout(Duration::from_secs(1)); + let a = client.get_block(10, 0).await.unwrap(); + let b = client.get_block(11, 1).await.unwrap(); + assert_eq!(a.len(), 3072); + assert_eq!(b.len(), 3072); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn backfill_only_when_older_than_threshold() { + reth_tracing::init_test_tracing(); + let src = tempfile::tempdir().unwrap(); + let dst = tempfile::tempdir().unwrap(); + fs::write(src.path().join("5.rlp"), sample(5)).await.unwrap(); + let srv = Server::new("127.0.0.1:9599".parse().unwrap(), src.path()); + tokio::spawn(async move { + let _ = srv.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let bf = Backfiller::new( + Client::new(vec!["127.0.0.1:9599".parse().unwrap()]), + dst.path(), + 5_000, + ); + let got = bf.fetch_if_missing(5, 10_000, 0).await.unwrap(); + assert_eq!(got, Some(3072)); + let skip = bf.fetch_if_missing(9_999, 10_000, 1).await.unwrap(); + assert_eq!(skip, None); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn busy_and_notfound_rotate() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + fs::write(dir.path().join("7.rlp"), sample(7)).await.unwrap(); + let s_busy = Server::new("127.0.0.1:9601".parse().unwrap(), dir.path()).with_limits(0, 10); + let s_ok = Server::new("127.0.0.1:9602".parse().unwrap(), dir.path()); + tokio::spawn(async move { + let _ = s_busy.run().await; + }); + tokio::spawn(async move { + let _ = s_ok.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let c = + Client::new(vec!["127.0.0.1:9601".parse().unwrap(), "127.0.0.1:9602".parse().unwrap()]); + let b = c.get_block(7, 0).await.unwrap(); + assert_eq!(b.len(), 3072); + } +} From 0d1c239a07bf4ab35a570b8c040a388cdd5080ac Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Mon, 11 Aug 2025 14:18:24 -0700 Subject: [PATCH 02/19] fixed lints --- bin/reth/src/share_blocks.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 28de1920f..49666ce6d 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -1,15 +1,11 @@ use clap::Args; -use once_cell::sync::Lazy; use reth_hlfs::{Backfiller, Client, Server}; use reth_network_api::{events::NetworkEvent, FullNetwork}; use std::{ collections::HashSet, net::{IpAddr, SocketAddr}, path::PathBuf, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; use tokio::{task::JoinHandle, time::timeout}; @@ -18,10 +14,8 @@ use tracing::{debug, info, warn}; // use futures_util::StreamExt; use futures_util::stream::StreamExt; -static RR: Lazy = Lazy::new(|| AtomicUsize::new(0)); - #[derive(Args, Clone, Debug)] -pub struct ShareBlocksArgs { +pub(crate) struct ShareBlocksArgs { #[arg(long, default_value_t = false)] pub share_blocks: bool, #[arg(long, default_value = "0.0.0.0")] @@ -34,14 +28,17 @@ pub struct ShareBlocksArgs { pub hist_threshold: u64, } -pub struct ShareBlocks { - pub backfiller: Backfiller, +pub(crate) struct ShareBlocks { + pub(crate) _backfiller: Backfiller, _server: JoinHandle<()>, _autodetect: JoinHandle<()>, } impl ShareBlocks { - pub async fn start_with_network(args: &ShareBlocksArgs, network: Net) -> eyre::Result + pub(crate) async fn start_with_network( + args: &ShareBlocksArgs, + network: Net, + ) -> eyre::Result where Net: FullNetwork + Clone + 'static, { @@ -64,12 +61,13 @@ impl ShareBlocks { let _autodetect = spawn_autodetect(network, args.share_blocks_port, bf.clone()); info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); - Ok(Self { backfiller: bf, _server, _autodetect }) + Ok(Self { _backfiller: bf, _server, _autodetect }) } - pub async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { - let rr = RR.fetch_add(1, Ordering::Relaxed); - self.backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) + #[allow(dead_code)] + pub(crate) async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { + let rr = (block as usize) ^ (head as usize); // deterministic round-robin seed + self._backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) // <- fix: HlfsError -> eyre::Report } } From ec281f9cc79ba4f5054fa18201fab2bf7afaa191 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Mon, 11 Aug 2025 22:16:30 -0700 Subject: [PATCH 03/19] active block sharing --- bin/reth/src/block_ingest.rs | 17 +++++++++++++++-- bin/reth/src/main.rs | 24 ++++++++++++++++-------- crates/net/hlfs/src/lib.rs | 10 ++++++---- 3 files changed, 37 insertions(+), 14 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 03e67ea0e..e5ce0f033 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -29,6 +29,7 @@ use tokio::sync::Mutex; use tracing::{debug, info}; use crate::serialized::{BlockAndReceipts, EvmBlock}; +use crate::share_blocks::ShareBlocks; use crate::spot_meta::erc20_contract_to_spot_token; /// Poll interval when tailing an *open* hourly file. @@ -41,6 +42,7 @@ pub(crate) struct BlockIngest { pub local_ingest_dir: Option, pub local_blocks_cache: Arc>>, // height → block pub precompiles_cache: PrecompilesCache, + pub hlfs: Option, } #[derive(Deserialize)] @@ -155,9 +157,20 @@ impl BlockIngest { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning locally synced block for @ Height [{height}]"); return Some(block); - } else { - self.try_collect_s3_block(height) } + + if let Some(hlfs) = &self.hlfs { + let u_cache = self.local_blocks_cache.lock().await; + let head = u_cache.keys().next_back().copied().unwrap_or(0); + if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { + if let Some(block) = self.try_collect_local_block(height).await { + info!("Returning HLFS-fetched block @[{height}]"); + return Some(block); + } + } + } + + self.try_collect_s3_block(height) } pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option { diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index d65506fe0..b53231b43 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -92,15 +92,23 @@ fn main() { .await?; // start HLFS (serve + peer-backed backfill) using the node's network - if ext_args.hlfs.share_blocks { - let net = handle.node.network.clone(); // returns a FullNetwork (NetworkHandle under the hood) - // keep handle alive for the whole process - let _hlfs = - share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net).await?; - } + let hlfs = if ext_args.hlfs.share_blocks { + let net = handle.node.network.clone(); + Some( + crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net) + .await?, + ) + } else { + None + }; - let ingest = - BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; + let ingest = BlockIngest { + ingest_dir, + local_ingest_dir, + local_blocks_cache, + precompiles_cache, + hlfs, + }; ingest.run(handle.node).await.unwrap(); handle.node_exit_future.await }, diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index f67fbee74..bd6419a4c 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -240,19 +240,21 @@ impl Backfiller { head: u64, rr_index: usize, ) -> Result, HlfsError> { - if number + self.hist_threshold > head { + if head >= self.hist_threshold && number + self.hist_threshold > head { return Ok(None); } - let path = self.root.join(format!("{number}.rlp")); + let f = ((number - 1) / 1_000_000) * 1_000_000; + let s = ((number - 1) / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); } match self.client.get_block(number, rr_index).await { Ok(bytes) => { - let tmp = self.root.join(format!("{number}.rlp.part")); + let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy()); fs::write(&tmp, &bytes).await?; fs::rename(&tmp, &path).await?; - info!(block=number, bytes=bytes.len(), path=%path.display(), "hlfs: wrote"); + info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); Ok(Some(bytes.len())) } Err(e) => { From e2045a195ca60b40f1daaf46aa5227d79b5164ec Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Wed, 13 Aug 2025 00:57:16 +0000 Subject: [PATCH 04/19] debug block ingest workflow --- bin/reth/src/block_ingest.rs | 7 +++---- crates/net/hlfs/src/lib.rs | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index e5ce0f033..455b5eae6 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -150,7 +150,7 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { } impl BlockIngest { - pub(crate) async fn collect_block(&self, height: u64) -> Option { + pub(crate) async fn collect_block(&self, head: u64, height: u64) -> Option { // info!("Attempting to collect block @ height [{height}]"); // Not a one liner (using .or) to include logs @@ -160,8 +160,7 @@ impl BlockIngest { } if let Some(hlfs) = &self.hlfs { - let u_cache = self.local_blocks_cache.lock().await; - let head = u_cache.keys().next_back().copied().unwrap_or(0); + //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning HLFS-fetched block @[{height}]"); @@ -299,7 +298,7 @@ impl BlockIngest { self.start_local_ingest_loop(height, current_block_timestamp).await; loop { - let Some(original_block) = self.collect_block(height).await else { + let Some(original_block) = self.collect_block(head, height).await else { tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index bd6419a4c..bd940b9f5 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -192,9 +192,13 @@ async fn handle_conn( let mut nb = [0u8; 8]; sock.read_exact(&mut nb).await?; let number = u64::from_le_bytes(nb); - let path = root.join(format!("{number}.rlp")); + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 + let f = (n / 1_000_000) * 1_000_000; + let s = (n / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy()); match fs::read(&path).await { Ok(data) => { + debug!("hlfs: found path [{path}]"); let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); b.put_u8(0x02); put_u64(&mut b, number); @@ -241,17 +245,21 @@ impl Backfiller { rr_index: usize, ) -> Result, HlfsError> { if head >= self.hist_threshold && number + self.hist_threshold > head { + //debug!(block=number, "hlfs: skip"); return Ok(None); } - let f = ((number - 1) / 1_000_000) * 1_000_000; - let s = ((number - 1) / 1_000) * 1_000; + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 + let f = (n / 1_000_000) * 1_000_000; + let s = (n / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); } + debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { Ok(bytes) => { - let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy()); + let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); fs::write(&tmp, &bytes).await?; fs::rename(&tmp, &path).await?; info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); From ec417f9bf481c8ba026d969bdb17af68606a25c9 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 14 Aug 2025 04:59:16 +0000 Subject: [PATCH 05/19] debug --- bin/reth/src/share_blocks.rs | 15 +++++++++++++-- crates/net/hlfs/src/lib.rs | 23 +++++++++++++++++++++-- 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 49666ce6d..ba7454922 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -58,7 +58,7 @@ impl ShareBlocks { let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); - let _autodetect = spawn_autodetect(network, args.share_blocks_port, bf.clone()); + let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); Ok(Self { _backfiller: bf, _server, _autodetect }) @@ -72,7 +72,12 @@ impl ShareBlocks { } } -fn spawn_autodetect(network: Net, hlfs_port: u16, backfiller: Backfiller) -> JoinHandle<()> +fn spawn_autodetect( + network: Net, + self_ip: IpAddr, + hlfs_port: u16, + backfiller: Backfiller, +) -> JoinHandle<()> where Net: FullNetwork + Clone + 'static, { @@ -84,6 +89,12 @@ where loop { match events.next().await { Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let ip = info.remote_addr.ip(); + // skip unusable/self + if ip.is_unspecified() || ip == self_ip { + debug!(%ip, "hlfs: skip self/unspecified"); + continue; + } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); if probe_hlfs(addr).await { let mut g = good.lock().await; diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index bd940b9f5..f0a8bace6 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -42,6 +42,14 @@ fn put_u32(b: &mut BytesMut, v: u32) { b.put_u32_le(v) } +async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { + if let Some(parent) = Path::new(path).parent() { + fs::create_dir_all(parent).await + } else { + Ok(()) + } +} + /// Client: tries each peer once; rotates starting index per call. #[derive(Clone)] pub struct Client { @@ -62,6 +70,7 @@ impl Client { } pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { let peers = self.peers.lock().clone(); + debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { debug!(block = number, "hlfs: no peers"); return Err(HlfsError::Timeout); @@ -69,10 +78,11 @@ impl Client { for t in 0..peers.len() { let i = (rr_index + t) % peers.len(); let addr = peers[i]; - trace!(block=number, %addr, "hlfs: try"); + debug!(block=number, %addr, "hlfs: try"); + let start = std::time::Instant::now(); match timeout(self.timeout, fetch_once(addr, number)).await { Err(_) => { - debug!(block=number, %addr, "hlfs: timeout"); + debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); continue; } Ok(Err(HlfsError::Busy(d))) => { @@ -98,6 +108,7 @@ impl Client { } } async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { + debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; let mut buf = BytesMut::with_capacity(9); buf.put_u8(0x01); @@ -255,12 +266,20 @@ impl Backfiller { let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); + } else { + ensure_parent_dirs(&path).await?; } + debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { Ok(bytes) => { + debug!(block = number, "hlfs: YAY! got block from client"); let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); + ensure_parent_dirs(&tmp).await?; + + debug!(block = number, path=%tmp, "hlfs: writing file"); fs::write(&tmp, &bytes).await?; + debug!(block = number, from=%tmp, to=%path, "hlfs: moving file"); fs::rename(&tmp, &path).await?; info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); Ok(Some(bytes.len())) From 2ff606c32c577415bc527040e6ca943df66552af Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 23:02:05 +0000 Subject: [PATCH 06/19] debugging host issue --- bin/reth/src/share_blocks.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index ba7454922..3b0a6b9fd 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -91,8 +91,12 @@ where Some(NetworkEvent::ActivePeerSession { info, .. }) => { let ip = info.remote_addr.ip(); // skip unusable/self - if ip.is_unspecified() || ip == self_ip { - debug!(%ip, "hlfs: skip self/unspecified"); + if ip.is_unspecified() { + debug!(%ip, "hlfs: skip unspecified"); + continue; + } + if ip == self_ip { + debug!(%ip, "hlfs: skip self"); continue; } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); From 530447e637c83cc18c48dba7594834ca3c9a9680 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 17:20:20 -0700 Subject: [PATCH 07/19] debug --- crates/net/hlfs/src/lib.rs | 53 +++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index f0a8bace6..e5b318f42 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -31,6 +31,10 @@ pub enum HlfsError { Busy(Duration), #[error("protocol")] Proto, + #[error("no peers")] + NoPeers, + #[error("unknown")] + Unknown, } #[inline] @@ -72,9 +76,12 @@ impl Client { let peers = self.peers.lock().clone(); debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { - debug!(block = number, "hlfs: no peers"); - return Err(HlfsError::Timeout); + return Err(HlfsError::NoPeers); } + + let mut all_not_found = true; + let mut any_timeout = false; + for t in 0..peers.len() { let i = (rr_index + t) % peers.len(); let addr = peers[i]; @@ -83,19 +90,24 @@ impl Client { match timeout(self.timeout, fetch_once(addr, number)).await { Err(_) => { debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); + any_timeout = true; + all_not_found = false; continue; } Ok(Err(HlfsError::Busy(d))) => { trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; + all_not_found = false; continue; } Ok(Err(HlfsError::NotFound)) => { trace!(block=number, %addr, "hlfs: not found"); + // Keep all_not_found as true unless we see other errors continue; } Ok(Err(e)) => { debug!(block=number, %addr, error=%e, "hlfs: error"); + all_not_found = false; continue; } Ok(Ok(bytes)) => { @@ -104,18 +116,28 @@ impl Client { } } } - Err(HlfsError::Timeout) + + // Return the most specific error + if all_not_found { + Err(HlfsError::NotFound) + } else if any_timeout { + Err(HlfsError::Timeout) + } else { + Err(HlfsError::Unknown) // Fallback for other errors + } } } async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; + debug!(%addr, "hlfs: CONNECTED"); let mut buf = BytesMut::with_capacity(9); buf.put_u8(0x01); put_u64(&mut buf, number); s.write_all(&buf).await?; let mut op = [0u8; 1]; s.read_exact(&mut op).await?; + debug!(code=op[0], "hlfs: opcode"); match op[0] { 0x02 => { let mut meta = [0u8; 12]; @@ -369,4 +391,29 @@ mod tests { let b = c.get_block(7, 0).await.unwrap(); assert_eq!(b.len(), 3072); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn all_peers_return_not_found() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + // Don't create the block file, so all servers will return NotFound + + let s1 = Server::new("127.0.0.1:9603".parse().unwrap(), dir.path()); + let s2 = Server::new("127.0.0.1:9604".parse().unwrap(), dir.path()); + tokio::spawn(async move { + let _ = s1.run().await; + }); + tokio::spawn(async move { + let _ = s2.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let client = + Client::new(vec!["127.0.0.1:9603".parse().unwrap(), "127.0.0.1:9604".parse().unwrap()]) + .with_timeout(Duration::from_secs(1)); + + // Request a block that doesn't exist on any peer + let result = client.get_block(999, 0).await; + assert!(matches!(result, Err(HlfsError::NotFound))); + } } From 249e3194ddb4eb4ca9c5b9779b77bb56ffed3446 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 18:35:30 -0700 Subject: [PATCH 08/19] added new max block opcode --- crates/net/hlfs/src/lib.rs | 58 ++++++++++++++++++++++++++++++++------ 1 file changed, 50 insertions(+), 8 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index e5b318f42..9a8c88024 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::trace; use std::{ - io, + fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -12,7 +12,6 @@ use std::{ }; use thiserror::Error; use tokio::{ - fs, io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, time::{sleep, timeout}, @@ -48,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent).await + fs::create_dir_all(parent) } else { Ok(()) } @@ -123,10 +122,11 @@ impl Client { } else if any_timeout { Err(HlfsError::Timeout) } else { - Err(HlfsError::Unknown) // Fallback for other errors + Err(HlfsError::Unknown) // Fallback for other errors } } } + async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; @@ -137,7 +137,7 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> s.write_all(&buf).await?; let mut op = [0u8; 1]; s.read_exact(&mut op).await?; - debug!(code=op[0], "hlfs: opcode"); + debug!(code = op[0], "hlfs: opcode"); match op[0] { 0x02 => { let mut meta = [0u8; 12]; @@ -163,6 +163,36 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> } } +fn find_max_number_file(root: &Path) -> Result { + fn parse_num(name: &str) -> Option { + name.strip_suffix(".rmp.lz4")?.parse::().ok() + } + + fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let ft = entry.file_type()?; + if ft.is_dir() { + walk(&path, best)?; + } else if ft.is_file() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(n) = parse_num(name) { + if best.map_or(true, |(m, _)| n > m) { + *best = Some((n, path.clone())); + } + } + } + } + } + Ok(()) + } + + let mut best = None; + walk(root, &mut best)?; + Ok(best.expect("cannot find block files")) +} + /// Server: serves `{root}/{number}.rlp`. pub struct Server { bind: SocketAddr, @@ -170,15 +200,19 @@ pub struct Server { max_conns: usize, inflight: Arc>, busy_retry_ms: u32, + max_block: u64, } + impl Server { pub fn new(bind: SocketAddr, root: impl Into) -> Self { + let n = find_max_number_file(&root.into()).unwrap(); Self { bind, root: root.into(), max_conns: 512, inflight: Arc::new(Mutex::new(0)), busy_retry_ms: 100, + max_block: n.unwrap(), } } pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { @@ -187,7 +221,7 @@ impl Server { self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).await.ok(); + fs::create_dir_all(&self.root).ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -204,7 +238,7 @@ impl Server { let inflight = self.inflight.clone(); let busy = self.busy_retry_ms; tokio::spawn(async move { - let _ = handle_conn(&mut sock, &root, busy, addr).await; + let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await; *inflight.lock() -= 1; }); } @@ -215,13 +249,21 @@ async fn handle_conn( root: &Path, busy_ms: u32, addr: SocketAddr, + max_block: u64, ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 { + if op[0] != 0x01 || op[0] != 0x02 { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } + if op[0] == 0x02 { + let mut b = BytesMut::with_capacity(1 + 8 + 4); + b.put_u8(0x05); + put_u64(&mut b, max_block); + return Ok(()); + } + let mut nb = [0u8; 8]; sock.read_exact(&mut nb).await?; let number = u64::from_le_bytes(nb); From 022dfd15461791b3437dc8a5360f05c198bb4fd7 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 19:03:21 -0700 Subject: [PATCH 09/19] debug --- crates/net/hlfs/src/lib.rs | 93 +++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 9a8c88024..914f3f019 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::trace; use std::{ - fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -12,10 +11,11 @@ use std::{ }; use thiserror::Error; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + fs, io, net::{TcpListener, TcpStream}, time::{sleep, timeout}, }; +use tokio::fs::DirEntry; use tracing::{debug, info, warn}; #[derive(Debug, Error)] @@ -47,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent) + fs::create_dir_all(parent).await } else { Ok(()) } @@ -163,34 +163,61 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> } } -fn find_max_number_file(root: &Path) -> Result { - fn parse_num(name: &str) -> Option { - name.strip_suffix(".rmp.lz4")?.parse::().ok() +fn parse_block_file(name: &str) -> Option { + // expects ".rmp.lz4" + let (stem, ext) = name.rsplit_once('.')?; + if ext != "lz4" { + return None; } + if !stem.ends_with(".rmp") { + return None; + } + let stem = stem.strip_suffix(".rmp")?; + stem.parse::().ok() +} - fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - let ft = entry.file_type()?; - if ft.is_dir() { - walk(&path, best)?; - } else if ft.is_file() { - if let Some(name) = path.file_name().and_then(|s| s.to_str()) { - if let Some(n) = parse_num(name) { - if best.map_or(true, |(m, _)| n > m) { - *best = Some((n, path.clone())); +// Asynchronously find the largest block file under the 2-level shard layout: +// {root}/{floor_to_million}/{floor_to_thousand}/{number}.rmp.lz4 +pub async fn find_max_block(root: &Path) -> io::Result { + let mut max_num: Option = None; + + let mut top = fs::read_dir(root).await?; + while let Some(million_dir) = top.next_entry().await? { + if !million_dir.file_type().await?.is_dir() { + continue; + } + // Fast reject: top-level dir must parse to u64 (but we still scan if not—optional) + if million_dir.file_name().to_string_lossy().parse::().is_err() { + continue; + } + + let mut mid = fs::read_dir(million_dir.path()).await?; + while let Some(thousand_dir) = mid.next_entry().await? { + if !thousand_dir.file_type().await?.is_dir() { + continue; + } + // Optional reject again for dir-name parse to u64 + if thousand_dir.file_name().to_string_lossy().parse::().is_err() { + continue; + } + + let mut leaf = fs::read_dir(thousand_dir.path()).await?; + while let Some(ent) = leaf.next_entry().await? { + if !ent.file_type().await?.is_file() { + continue; + } + if let Some(name) = ent.file_name().to_str() { + if let Some(n) = parse_block_file(name) { + if max_num.map_or(true, |m| n > m) { + max_num = Some(n); } } } } } - Ok(()) } - let mut best = None; - walk(root, &mut best)?; - Ok(best.expect("cannot find block files")) + max_num.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no block files found")) } /// Server: serves `{root}/{number}.rlp`. @@ -204,24 +231,28 @@ pub struct Server { } impl Server { - pub fn new(bind: SocketAddr, root: impl Into) -> Self { - let n = find_max_number_file(&root.into()).unwrap(); - Self { + pub async fn new(bind: SocketAddr, root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root).await?; // async, no unwrap/ok() + let max_block = find_max_block(&root).await?; // async discovery + + Ok(Self { bind, - root: root.into(), + root, max_conns: 512, - inflight: Arc::new(Mutex::new(0)), + inflight: Arc::new(parking_lot::Mutex::new(0)), busy_retry_ms: 100, - max_block: n.unwrap(), - } + max_block, + }) } + pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { self.max_conns = max_conns; self.busy_retry_ms = busy_retry_ms; self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).ok(); + fs::create_dir_all(&self.root).await.ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -253,7 +284,7 @@ async fn handle_conn( ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 || op[0] != 0x02 { + if op[0] != 0x01 && op[0] != 0x02 { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } From 5f2955caa2e79fd7d97a4df0472997d53d2b3b0b Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 19:30:56 -0700 Subject: [PATCH 10/19] debug --- crates/net/hlfs/src/lib.rs | 445 ++++++++++++------------------------- 1 file changed, 139 insertions(+), 306 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 914f3f019..c8d195282 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -2,8 +2,9 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; -use reth_tracing::tracing::trace; +use reth_tracing::tracing::{debug, info, trace, warn}; use std::{ + fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -11,29 +12,27 @@ use std::{ }; use thiserror::Error; use tokio::{ - fs, io, + io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, - time::{sleep, timeout}, + time::timeout, }; -use tokio::fs::DirEntry; -use tracing::{debug, info, warn}; -#[derive(Debug, Error)] +type Result = std::result::Result; + +#[derive(Error, Debug)] pub enum HlfsError { #[error("io: {0}")] Io(#[from] io::Error), - #[error("timeout")] - Timeout, - #[error("not found")] - NotFound, - #[error("busy {0:?}")] - Busy(Duration), - #[error("protocol")] + #[error("proto")] Proto, #[error("no peers")] NoPeers, - #[error("unknown")] - Unknown, + #[error("timeout")] + Timeout, + #[error("busy: retry_ms={0}")] + Busy(u32), + #[error("not found")] + NotFound, } #[inline] @@ -47,22 +46,21 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent).await + fs::create_dir_all(parent) } else { Ok(()) } } -/// Client: tries each peer once; rotates starting index per call. +/// Client: tries each peer once; rotates starting index per call #[derive(Clone)] pub struct Client { peers: Arc>>, timeout: Duration, - backoff_ms: u32, } impl Client { pub fn new(peers: Vec) -> Self { - Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(5), backoff_ms: 50 } + Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3) } } pub fn update_peers(&self, peers: Vec) { *self.peers.lock() = peers; @@ -78,146 +76,89 @@ impl Client { return Err(HlfsError::NoPeers); } - let mut all_not_found = true; - let mut any_timeout = false; - - for t in 0..peers.len() { - let i = (rr_index + t) % peers.len(); + let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len()); + let mut last_busy: Option = None; + while let Some(i) = all.next() { let addr = peers[i]; - debug!(block=number, %addr, "hlfs: try"); - let start = std::time::Instant::now(); - match timeout(self.timeout, fetch_once(addr, number)).await { - Err(_) => { - debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); - any_timeout = true; - all_not_found = false; - continue; - } - Ok(Err(HlfsError::Busy(d))) => { - trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); - sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; - all_not_found = false; - continue; - } - Ok(Err(HlfsError::NotFound)) => { - trace!(block=number, %addr, "hlfs: not found"); - // Keep all_not_found as true unless we see other errors - continue; - } - Ok(Err(e)) => { - debug!(block=number, %addr, error=%e, "hlfs: error"); - all_not_found = false; - continue; - } - Ok(Ok(bytes)) => { - info!(block=number, %addr, bytes=bytes.len(), "hlfs: fetched"); - return Ok(bytes); - } - } - } - - // Return the most specific error - if all_not_found { - Err(HlfsError::NotFound) - } else if any_timeout { - Err(HlfsError::Timeout) - } else { - Err(HlfsError::Unknown) // Fallback for other errors - } - } -} - -async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { - debug!(%addr, "hlfs: connect"); - let mut s = TcpStream::connect(addr).await?; - debug!(%addr, "hlfs: CONNECTED"); - let mut buf = BytesMut::with_capacity(9); - buf.put_u8(0x01); - put_u64(&mut buf, number); - s.write_all(&buf).await?; - let mut op = [0u8; 1]; - s.read_exact(&mut op).await?; - debug!(code = op[0], "hlfs: opcode"); - match op[0] { - 0x02 => { - let mut meta = [0u8; 12]; - s.read_exact(&mut meta).await?; - let mut m = Bytes::from(meta.to_vec()); - let _n = m.get_u64_le(); - let len = m.get_u32_le() as usize; - let mut data = vec![0u8; len]; - s.read_exact(&mut data).await?; - Ok(data) - } - 0x03 => { - let mut _n = [0u8; 8]; - let _ = s.read_exact(&mut _n).await; - Err(HlfsError::NotFound) - } - 0x04 => { - let mut d = [0u8; 4]; - s.read_exact(&mut d).await?; - Err(HlfsError::Busy(Duration::from_millis(u32::from_le_bytes(d) as u64))) - } - _ => Err(HlfsError::Proto), - } -} - -fn parse_block_file(name: &str) -> Option { - // expects ".rmp.lz4" - let (stem, ext) = name.rsplit_once('.')?; - if ext != "lz4" { - return None; - } - if !stem.ends_with(".rmp") { - return None; - } - let stem = stem.strip_suffix(".rmp")?; - stem.parse::().ok() -} - -// Asynchronously find the largest block file under the 2-level shard layout: -// {root}/{floor_to_million}/{floor_to_thousand}/{number}.rmp.lz4 -pub async fn find_max_block(root: &Path) -> io::Result { - let mut max_num: Option = None; - - let mut top = fs::read_dir(root).await?; - while let Some(million_dir) = top.next_entry().await? { - if !million_dir.file_type().await?.is_dir() { - continue; - } - // Fast reject: top-level dir must parse to u64 (but we still scan if not—optional) - if million_dir.file_name().to_string_lossy().parse::().is_err() { - continue; - } - - let mut mid = fs::read_dir(million_dir.path()).await?; - while let Some(thousand_dir) = mid.next_entry().await? { - if !thousand_dir.file_type().await?.is_dir() { - continue; - } - // Optional reject again for dir-name parse to u64 - if thousand_dir.file_name().to_string_lossy().parse::().is_err() { - continue; - } - - let mut leaf = fs::read_dir(thousand_dir.path()).await?; - while let Some(ent) = leaf.next_entry().await? { - if !ent.file_type().await?.is_file() { - continue; - } - if let Some(name) = ent.file_name().to_str() { - if let Some(n) = parse_block_file(name) { - if max_num.map_or(true, |m| n > m) { - max_num = Some(n); + trace!(%addr, "hlfs: dialing"); + match timeout(self.timeout, TcpStream::connect(addr)).await { + Err(_) => continue, + Ok(Err(_)) => continue, + Ok(Ok(mut sock)) => { + let mut req = BytesMut::with_capacity(1 + 8); + req.put_u8(0x01); // GET + put_u64(&mut req, number); + if let Err(e) = sock.write_all(&req).await { + debug!(%addr, "hlfs: write err: {e}"); + continue; + } + let mut op = [0u8; 1]; + if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await { + debug!(%addr, "hlfs: read op timeout {e:?}"); + continue; + } + let op = op[0]; + match op { + 0x03 => { + // DATA + let mut len = [0u8; 4]; + sock.read_exact(&mut len).await?; + let len = u32::from_le_bytes(len) as usize; + let mut buf = vec![0u8; len]; + sock.read_exact(&mut buf).await?; + return Ok(buf); + } + 0x04 => { + let mut ms = [0u8; 4]; + sock.read_exact(&mut ms).await?; + last_busy = Some(u32::from_le_bytes(ms)); + continue; + } + 0x06 => { + return Err(HlfsError::NotFound); + } + _ => { + continue; } } } } } + if let Some(ms) = last_busy { + return Err(HlfsError::Busy(ms)); + } + Err(HlfsError::NotFound) + } +} + +fn find_max_number_file(root: &Path) -> Result { + fn parse_num(name: &str) -> Option { + name.strip_suffix(".rmp.lz4")?.parse::().ok() } - max_num.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no block files found")) + fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let ft = entry.file_type()?; + if ft.is_dir() { + walk(&path, best)?; + } else if ft.is_file() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(n) = parse_num(name) { + if best.map_or(true, |b| n > b) { + *best = Some(n); + } + } + } + } + } + Ok(()) + } + + let mut best = None; + walk(root, &mut best)?; + Ok(best.expect("cannot find block files")) } /// Server: serves `{root}/{number}.rlp`. @@ -231,28 +172,25 @@ pub struct Server { } impl Server { - pub async fn new(bind: SocketAddr, root: impl Into) -> Result { - let root = root.into(); - fs::create_dir_all(&root).await?; // async, no unwrap/ok() - let max_block = find_max_block(&root).await?; // async discovery - - Ok(Self { + pub fn new(bind: SocketAddr, root: impl Into) -> Self { + let root: PathBuf = root.into(); + let n = find_max_number_file(&root).unwrap(); + Self { bind, root, max_conns: 512, - inflight: Arc::new(parking_lot::Mutex::new(0)), + inflight: Arc::new(Mutex::new(0)), busy_retry_ms: 100, - max_block, - }) + max_block: n, + } } - pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { self.max_conns = max_conns; self.busy_retry_ms = busy_retry_ms; self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).await.ok(); + fs::create_dir_all(&self.root).ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -292,42 +230,44 @@ async fn handle_conn( let mut b = BytesMut::with_capacity(1 + 8 + 4); b.put_u8(0x05); put_u64(&mut b, max_block); + put_u32(&mut b, busy_ms); + let _ = sock.write_all(&b).await; return Ok(()); } - let mut nb = [0u8; 8]; - sock.read_exact(&mut nb).await?; - let number = u64::from_le_bytes(nb); + let mut num = [0u8; 8]; + sock.read_exact(&mut num).await?; + let number = u64::from_le_bytes(num); + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000; let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy()); - match fs::read(&path).await { + + trace!(%addr, number, %path, "hlfs: req"); + if let Err(e) = ensure_parent_dirs(&path).await { + warn!(%addr, %path, "hlfs: mkdirs failed: {e}"); + } + + match fs::read(&path) { Ok(data) => { - debug!("hlfs: found path [{path}]"); - let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); - b.put_u8(0x02); - put_u64(&mut b, number); + let mut b = BytesMut::with_capacity(1 + 4 + data.len()); + b.put_u8(0x03); put_u32(&mut b, data.len() as u32); b.extend_from_slice(&data); - sock.write_all(&b).await?; - Ok(()) + let _ = sock.write_all(&b).await; } Err(e) if e.kind() == io::ErrorKind::NotFound => { - let mut b = [0u8; 9]; - b[0] = 0x03; - b[1..9].copy_from_slice(&number.to_le_bytes()); - sock.write_all(&b).await?; - Ok(()) - } - Err(_) => { - let mut b = BytesMut::with_capacity(5); - b.put_u8(0x04); - put_u32(&mut b, busy_ms); + let mut b = BytesMut::with_capacity(1); + b.put_u8(0x06); // not found let _ = sock.write_all(&b).await; - Err(HlfsError::Io(io::Error::new(io::ErrorKind::Other, "fs error"))) + } + Err(e) => { + warn!(%addr, %path, "hlfs: read error: {e}"); + let _ = sock.shutdown().await; } } + Ok(()) } /// Backfiller: ask client per missing block; rotate peers every block. @@ -359,134 +299,27 @@ impl Backfiller { let s = (n / 1_000) * 1_000; let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); - if fs::try_exists(&path).await? { + if Path::new(&path).exists() { + trace!(block = number, "hlfs: already have"); return Ok(None); - } else { - ensure_parent_dirs(&path).await?; } - - debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { - Ok(bytes) => { - debug!(block = number, "hlfs: YAY! got block from client"); - let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); - ensure_parent_dirs(&tmp).await?; - - debug!(block = number, path=%tmp, "hlfs: writing file"); - fs::write(&tmp, &bytes).await?; - debug!(block = number, from=%tmp, to=%path, "hlfs: moving file"); - fs::rename(&tmp, &path).await?; - info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); - Ok(Some(bytes.len())) + Err(HlfsError::NotFound) => Ok(None), + Err(HlfsError::Busy(ms)) => { + tokio::time::sleep(Duration::from_millis(ms as u64)).await; + Ok(None) } - Err(e) => { - debug!(block=number, error=%e, "hlfs: fetch failed"); - Err(e) + Err(e) => Err(e), + Ok(data) => { + if let Err(e) = ensure_parent_dirs(&path).await { + warn!(%path, "hlfs: mkdirs failed: {e}"); + } + if let Err(e) = fs::write(&path, &data) { + warn!(%path, "hlfs: write failed: {e}"); + return Ok(None); + } + Ok(Some(data.len())) } } } } - -#[cfg(test)] -mod tests { - use super::*; - use rand::{rngs::StdRng, Rng, SeedableRng}; - fn sample(n: u64) -> Vec { - vec![((n as usize) % 251) as u8; 3072] - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn serve_and_fetch_rr() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - for n in 0..100u64 { - fs::write(dir.path().join(format!("{n}.rlp")), sample(n)).await.unwrap(); - } - let s1 = Server::new("127.0.0.1:9597".parse().unwrap(), dir.path()).with_limits(64, 20); - let s2 = Server::new("127.0.0.1:9598".parse().unwrap(), dir.path()).with_limits(64, 20); - tokio::spawn(async move { - let _ = s1.run().await; - }); - tokio::spawn(async move { - let _ = s2.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let client = - Client::new(vec!["127.0.0.1:9597".parse().unwrap(), "127.0.0.1:9598".parse().unwrap()]) - .with_timeout(Duration::from_secs(1)); - let a = client.get_block(10, 0).await.unwrap(); - let b = client.get_block(11, 1).await.unwrap(); - assert_eq!(a.len(), 3072); - assert_eq!(b.len(), 3072); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn backfill_only_when_older_than_threshold() { - reth_tracing::init_test_tracing(); - let src = tempfile::tempdir().unwrap(); - let dst = tempfile::tempdir().unwrap(); - fs::write(src.path().join("5.rlp"), sample(5)).await.unwrap(); - let srv = Server::new("127.0.0.1:9599".parse().unwrap(), src.path()); - tokio::spawn(async move { - let _ = srv.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let bf = Backfiller::new( - Client::new(vec!["127.0.0.1:9599".parse().unwrap()]), - dst.path(), - 5_000, - ); - let got = bf.fetch_if_missing(5, 10_000, 0).await.unwrap(); - assert_eq!(got, Some(3072)); - let skip = bf.fetch_if_missing(9_999, 10_000, 1).await.unwrap(); - assert_eq!(skip, None); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn busy_and_notfound_rotate() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - fs::write(dir.path().join("7.rlp"), sample(7)).await.unwrap(); - let s_busy = Server::new("127.0.0.1:9601".parse().unwrap(), dir.path()).with_limits(0, 10); - let s_ok = Server::new("127.0.0.1:9602".parse().unwrap(), dir.path()); - tokio::spawn(async move { - let _ = s_busy.run().await; - }); - tokio::spawn(async move { - let _ = s_ok.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let c = - Client::new(vec!["127.0.0.1:9601".parse().unwrap(), "127.0.0.1:9602".parse().unwrap()]); - let b = c.get_block(7, 0).await.unwrap(); - assert_eq!(b.len(), 3072); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn all_peers_return_not_found() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - // Don't create the block file, so all servers will return NotFound - - let s1 = Server::new("127.0.0.1:9603".parse().unwrap(), dir.path()); - let s2 = Server::new("127.0.0.1:9604".parse().unwrap(), dir.path()); - tokio::spawn(async move { - let _ = s1.run().await; - }); - tokio::spawn(async move { - let _ = s2.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let client = - Client::new(vec!["127.0.0.1:9603".parse().unwrap(), "127.0.0.1:9604".parse().unwrap()]) - .with_timeout(Duration::from_secs(1)); - - // Request a block that doesn't exist on any peer - let result = client.get_block(999, 0).await; - assert!(matches!(result, Err(HlfsError::NotFound))); - } -} From 2a653857aa020f47836fd3238f9a759a318a8fe9 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 22:18:42 -0700 Subject: [PATCH 11/19] debug --- bin/reth/src/block_ingest.rs | 6 +-- bin/reth/src/share_blocks.rs | 69 ++++++++++++++++------------ crates/net/hlfs/src/lib.rs | 89 +++++++++++++++++++++--------------- 3 files changed, 95 insertions(+), 69 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 455b5eae6..c6b60a1f5 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -150,7 +150,7 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { } impl BlockIngest { - pub(crate) async fn collect_block(&self, head: u64, height: u64) -> Option { + pub(crate) async fn collect_block(&self, height: u64) -> Option { // info!("Attempting to collect block @ height [{height}]"); // Not a one liner (using .or) to include logs @@ -161,7 +161,7 @@ impl BlockIngest { if let Some(hlfs) = &self.hlfs { //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); - if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { + if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning HLFS-fetched block @[{height}]"); return Some(block); @@ -298,7 +298,7 @@ impl BlockIngest { self.start_local_ingest_loop(height, current_block_timestamp).await; loop { - let Some(original_block) = self.collect_block(head, height).await else { + let Some(original_block) = self.collect_block(head).await else { tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 3b0a6b9fd..075538c7f 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -1,5 +1,5 @@ use clap::Args; -use reth_hlfs::{Backfiller, Client, Server}; +use reth_hlfs::{Backfiller, Client, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK, PeerRecord}; use reth_network_api::{events::NetworkEvent, FullNetwork}; use std::{ collections::HashSet, @@ -24,8 +24,6 @@ pub(crate) struct ShareBlocksArgs { pub share_blocks_port: u16, #[arg(long, default_value = "evm-blocks")] pub archive_dir: PathBuf, - #[arg(long, default_value_t = 5_000)] - pub hist_threshold: u64, } pub(crate) struct ShareBlocks { @@ -55,19 +53,19 @@ impl ShareBlocks { } }); - let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); - let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); + let client = Client::new(&args.archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); + let bf = Backfiller::new(client, &args.archive_dir); let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); - info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); + info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)"); Ok(Self { _backfiller: bf, _server, _autodetect }) } #[allow(dead_code)] - pub(crate) async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { - let rr = (block as usize) ^ (head as usize); // deterministic round-robin seed - self._backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) + pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { + let rr = block as usize; + self._backfiller.fetch_if_missing(block, rr).await.map_err(|e| eyre::eyre!(e)) // <- fix: HlfsError -> eyre::Report } } @@ -81,8 +79,7 @@ fn spawn_autodetect( where Net: FullNetwork + Clone + 'static, { - let good: Arc>> = - Arc::new(tokio::sync::Mutex::new(HashSet::new())); + let good: Arc>> = Arc::new(tokio::sync::Mutex::new(HashSet::new())); tokio::spawn(async move { let mut events = network.event_listener(); @@ -90,7 +87,6 @@ where match events.next().await { Some(NetworkEvent::ActivePeerSession { info, .. }) => { let ip = info.remote_addr.ip(); - // skip unusable/self if ip.is_unspecified() { debug!(%ip, "hlfs: skip unspecified"); continue; @@ -100,9 +96,9 @@ where continue; } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); - if probe_hlfs(addr).await { + if let max_block = probe_hlfs(addr).await { let mut g = good.lock().await; - if g.insert(addr) { + if g.insert(PeerRecord { addr, max_block }) { let v: Vec<_> = g.iter().copied().collect(); backfiller.set_peers(v.clone()); info!(%addr, total=v.len(), "hlfs: peer added"); @@ -121,22 +117,35 @@ where }) } -async fn probe_hlfs(addr: SocketAddr) -> bool { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use tokio::net::TcpStream; +pub async fn probe_hlfs(addr: SocketAddr) -> u64 { + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + }; - let res = timeout(Duration::from_secs(2), async { - if let Ok(mut s) = TcpStream::connect(addr).await { - let mut msg = [0u8; 9]; - msg[0] = 0x01; - let _ = s.write_all(&msg).await; - let mut op = [0u8; 1]; - if s.read_exact(&mut op).await.is_ok() { - return matches!(op[0], 0x02 | 0x03 | 0x04); - } + let fut = async { + let mut s = TcpStream::connect(addr).await.ok()?; + + // send [OP][8 zero bytes] + let mut msg = [0u8; 9]; + msg[0] = OP_REQ_MAX_BLOCK; + s.write_all(&msg).await.ok()?; + + // read 1-byte opcode + let mut op = [0u8; 1]; + s.read_exact(&mut op).await.ok()?; + if op[0] != OP_RES_MAX_BLOCK { + return None; } - false - }) - .await; - matches!(res, Ok(true)) + + // read 8-byte little-endian block number + let mut blk = [0u8; 8]; + s.read_exact(&mut blk).await.ok()?; + Some(u64::from_le_bytes(blk)) + }; + + match timeout(Duration::from_secs(2), fut).await { + Ok(Some(n)) => n, + _ => 0, + } } diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index c8d195282..45155180f 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,9 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::{debug, info, trace, warn}; use std::{ - fs, io, + fs, + hash::{Hash, Hasher}, + io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -19,6 +21,13 @@ use tokio::{ type Result = std::result::Result; +pub const OP_REQ_BLOCK: u8 = 0x01; +pub const OP_RES_BLOCK: u8 = 0x02; +pub const OP_REQ_MAX_BLOCK: u8 = 0x03; +pub const OP_RES_MAX_BLOCK: u8 = 0x04; +pub const OP_ERR_TOO_BUSY: u8 = 0x05; +pub const OP_ERR_NOT_FOUND: u8 = 0x06; + #[derive(Error, Debug)] pub enum HlfsError { #[error("io: {0}")] @@ -53,23 +62,45 @@ async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { } /// Client: tries each peer once; rotates starting index per call +#[derive(Debug, Copy, Clone)] +pub struct PeerRecord { + pub addr: SocketAddr, + pub max_block: u64, +} + +impl PartialEq for PeerRecord { + fn eq(&self, o: &Self) -> bool { + self.addr == o.addr + } +} +impl Eq for PeerRecord {} +impl Hash for PeerRecord { + fn hash(&self, s: &mut H) { + self.addr.hash(s); + } +} + #[derive(Clone)] pub struct Client { - peers: Arc>>, + root: PathBuf, + peers: Arc>>, timeout: Duration, + max_block: u64, } impl Client { - pub fn new(peers: Vec) -> Self { - Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3) } + pub fn new(root: impl Into, peers: Vec) -> Self { + let root: PathBuf = root.into(); + let n = find_max_number_file(&root).unwrap(); + Self { root, peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3), max_block: n } } - pub fn update_peers(&self, peers: Vec) { + pub fn update_peers(&self, peers: Vec) { *self.peers.lock() = peers; } pub fn with_timeout(mut self, d: Duration) -> Self { self.timeout = d; self } - pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { + pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { let peers = self.peers.lock().clone(); debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { @@ -80,26 +111,26 @@ impl Client { let mut last_busy: Option = None; while let Some(i) = all.next() { let addr = peers[i]; - trace!(%addr, "hlfs: dialing"); - match timeout(self.timeout, TcpStream::connect(addr)).await { + trace!(%addr.addr, "hlfs: dialing"); + match timeout(self.timeout, TcpStream::connect(addr.addr)).await { Err(_) => continue, Ok(Err(_)) => continue, Ok(Ok(mut sock)) => { let mut req = BytesMut::with_capacity(1 + 8); - req.put_u8(0x01); // GET + req.put_u8(OP_REQ_BLOCK); put_u64(&mut req, number); if let Err(e) = sock.write_all(&req).await { - debug!(%addr, "hlfs: write err: {e}"); + debug!(%addr.addr, "hlfs: write err: {e}"); continue; } let mut op = [0u8; 1]; if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await { - debug!(%addr, "hlfs: read op timeout {e:?}"); + debug!(%addr.addr, "hlfs: read op timeout {e:?}"); continue; } let op = op[0]; match op { - 0x03 => { + OP_RES_BLOCK => { // DATA let mut len = [0u8; 4]; sock.read_exact(&mut len).await?; @@ -108,13 +139,13 @@ impl Client { sock.read_exact(&mut buf).await?; return Ok(buf); } - 0x04 => { + OP_ERR_TOO_BUSY => { let mut ms = [0u8; 4]; sock.read_exact(&mut ms).await?; last_busy = Some(u32::from_le_bytes(ms)); continue; } - 0x06 => { + OP_ERR_NOT_FOUND => { return Err(HlfsError::NotFound); } _ => { @@ -197,7 +228,7 @@ impl Server { let (mut sock, addr) = lst.accept().await?; if *self.inflight.lock() >= self.max_conns { let mut b = BytesMut::with_capacity(5); - b.put_u8(0x04); + b.put_u8(OP_ERR_TOO_BUSY); put_u32(&mut b, self.busy_retry_ms); let _ = sock.write_all(&b).await; continue; @@ -222,18 +253,10 @@ async fn handle_conn( ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 && op[0] != 0x02 { + if op[0] != OP_REQ_BLOCK { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } - if op[0] == 0x02 { - let mut b = BytesMut::with_capacity(1 + 8 + 4); - b.put_u8(0x05); - put_u64(&mut b, max_block); - put_u32(&mut b, busy_ms); - let _ = sock.write_all(&b).await; - return Ok(()); - } let mut num = [0u8; 8]; sock.read_exact(&mut num).await?; @@ -252,14 +275,14 @@ async fn handle_conn( match fs::read(&path) { Ok(data) => { let mut b = BytesMut::with_capacity(1 + 4 + data.len()); - b.put_u8(0x03); + b.put_u8(OP_RES_BLOCK); put_u32(&mut b, data.len() as u32); b.extend_from_slice(&data); let _ = sock.write_all(&b).await; } Err(e) if e.kind() == io::ErrorKind::NotFound => { let mut b = BytesMut::with_capacity(1); - b.put_u8(0x06); // not found + b.put_u8(OP_ERR_NOT_FOUND); let _ = sock.write_all(&b).await; } Err(e) => { @@ -275,25 +298,19 @@ async fn handle_conn( pub struct Backfiller { client: Client, root: PathBuf, - hist_threshold: u64, } impl Backfiller { - pub fn new(client: Client, root: impl Into, hist_threshold: u64) -> Self { - Self { client, root: root.into(), hist_threshold } + pub fn new(client: Client, root: impl Into) -> Self { + Self { client, root: root.into() } } - pub fn set_peers(&self, peers: Vec) { + pub fn set_peers(&self, peers: Vec) { self.client.update_peers(peers); } pub async fn fetch_if_missing( &self, number: u64, - head: u64, rr_index: usize, ) -> Result, HlfsError> { - if head >= self.hist_threshold && number + self.hist_threshold > head { - //debug!(block=number, "hlfs: skip"); - return Ok(None); - } let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000; @@ -303,7 +320,7 @@ impl Backfiller { trace!(block = number, "hlfs: already have"); return Ok(None); } - match self.client.get_block(number, rr_index).await { + match self.client.wants_block(number, rr_index).await { Err(HlfsError::NotFound) => Ok(None), Err(HlfsError::Busy(ms)) => { tokio::time::sleep(Duration::from_millis(ms as u64)).await; From 3bdc491abaaac9250385ab13e101093c335979bd Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 22:43:03 -0700 Subject: [PATCH 12/19] debug --- crates/net/hlfs/src/lib.rs | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 45155180f..02ad42462 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -91,7 +91,12 @@ impl Client { pub fn new(root: impl Into, peers: Vec) -> Self { let root: PathBuf = root.into(); let n = find_max_number_file(&root).unwrap(); - Self { root, peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3), max_block: n } + Self { + root, + peers: Arc::new(Mutex::new(peers)), + timeout: Duration::from_secs(3), + max_block: n, + } } pub fn update_peers(&self, peers: Vec) { *self.peers.lock() = peers; @@ -187,7 +192,7 @@ fn find_max_number_file(root: &Path) -> Result { Ok(()) } - let mut best = None; + let mut best = Some(0); walk(root, &mut best)?; Ok(best.expect("cannot find block files")) } @@ -253,11 +258,19 @@ async fn handle_conn( ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != OP_REQ_BLOCK { + if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } + if op[0] == OP_REQ_MAX_BLOCK { + let mut b = BytesMut::with_capacity(1 + 8); + b.put_u8(OP_RES_MAX_BLOCK); + put_u64(&mut b, max_block); + let _ = sock.write_all(&b).await; + return Ok(()); + } + let mut num = [0u8; 8]; sock.read_exact(&mut num).await?; let number = u64::from_le_bytes(num); From 5114f7651729833e28fe1a9432f6c45ce8692891 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 22:52:37 -0700 Subject: [PATCH 13/19] debug --- bin/reth/src/share_blocks.rs | 7 ++++--- crates/net/hlfs/src/lib.rs | 1 + 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 075538c7f..7eeddba08 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -96,12 +96,13 @@ where continue; } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); - if let max_block = probe_hlfs(addr).await { + let max_block = probe_hlfs(addr).await; + if max_block != 0 { let mut g = good.lock().await; if g.insert(PeerRecord { addr, max_block }) { let v: Vec<_> = g.iter().copied().collect(); backfiller.set_peers(v.clone()); - info!(%addr, total=v.len(), "hlfs: peer added"); + info!(%addr, %max_block, total=v.len(), "hlfs: peer added"); } } else { debug!(%addr, "hlfs: peer has no HLFS"); @@ -117,7 +118,7 @@ where }) } -pub async fn probe_hlfs(addr: SocketAddr) -> u64 { +pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 { use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, net::TcpStream, diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 02ad42462..ddbbb8b2d 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -91,6 +91,7 @@ impl Client { pub fn new(root: impl Into, peers: Vec) -> Self { let root: PathBuf = root.into(); let n = find_max_number_file(&root).unwrap(); + debug!(max_block = n, "hlfs: our archive"); Self { root, peers: Arc::new(Mutex::new(peers)), From cdb0f9e8a2cb59f3b7e5225c705814f23f5710e2 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 23:46:53 -0700 Subject: [PATCH 14/19] debug --- bin/reth/src/block_ingest.rs | 18 +++--- bin/reth/src/share_blocks.rs | 113 ++++++++++++++++++++--------------- crates/net/hlfs/src/lib.rs | 21 ++++--- 3 files changed, 89 insertions(+), 63 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index c6b60a1f5..74b82bbf0 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -159,15 +159,15 @@ impl BlockIngest { return Some(block); } - if let Some(hlfs) = &self.hlfs { - //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); - if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { - if let Some(block) = self.try_collect_local_block(height).await { - info!("Returning HLFS-fetched block @[{height}]"); - return Some(block); - } - } - } + // if let Some(hlfs) = &self.hlfs { + // //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); + // if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { + // if let Some(block) = self.try_collect_local_block(height).await { + // info!("Returning HLFS-fetched block @[{height}]"); + // return Some(block); + // } + // } + // } self.try_collect_s3_block(height) } diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 7eeddba08..ede25305c 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -1,14 +1,16 @@ use clap::Args; -use reth_hlfs::{Backfiller, Client, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK, PeerRecord}; +use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK}; use reth_network_api::{events::NetworkEvent, FullNetwork}; use std::{ collections::HashSet, net::{IpAddr, SocketAddr}, path::PathBuf, sync::Arc, - time::Duration, }; -use tokio::{task::JoinHandle, time::timeout}; +use tokio::{ + task::JoinHandle, + time::{sleep, timeout, Duration}, +}; use tracing::{debug, info, warn}; // use futures_util::StreamExt; @@ -27,7 +29,6 @@ pub(crate) struct ShareBlocksArgs { } pub(crate) struct ShareBlocks { - pub(crate) _backfiller: Backfiller, _server: JoinHandle<()>, _autodetect: JoinHandle<()>, } @@ -53,65 +54,83 @@ impl ShareBlocks { } }); - let client = Client::new(&args.archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); - let bf = Backfiller::new(client, &args.archive_dir); - - let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); + let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone()); info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)"); - Ok(Self { _backfiller: bf, _server, _autodetect }) + Ok(Self { _server, _autodetect }) } - #[allow(dead_code)] - pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { - let rr = block as usize; - self._backfiller.fetch_if_missing(block, rr).await.map_err(|e| eyre::eyre!(e)) - // <- fix: HlfsError -> eyre::Report - } + // #[allow(dead_code)] + // pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { + // self._backfiller.fetch_if_missing(block).await.map_err(|e| eyre::eyre!(e)) + // } } fn spawn_autodetect( network: Net, self_ip: IpAddr, hlfs_port: u16, - backfiller: Backfiller, + archive_dir: PathBuf, ) -> JoinHandle<()> where Net: FullNetwork + Clone + 'static, { - let good: Arc>> = Arc::new(tokio::sync::Mutex::new(HashSet::new())); + let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); + let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir))); + let good: Arc>> = + Arc::new(tokio::sync::Mutex::new(HashSet::new())); - tokio::spawn(async move { - let mut events = network.event_listener(); - loop { - match events.next().await { - Some(NetworkEvent::ActivePeerSession { info, .. }) => { - let ip = info.remote_addr.ip(); - if ip.is_unspecified() { - debug!(%ip, "hlfs: skip unspecified"); - continue; - } - if ip == self_ip { - debug!(%ip, "hlfs: skip self"); - continue; - } - let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); - let max_block = probe_hlfs(addr).await; - if max_block != 0 { - let mut g = good.lock().await; - if g.insert(PeerRecord { addr, max_block }) { - let v: Vec<_> = g.iter().copied().collect(); - backfiller.set_peers(v.clone()); - info!(%addr, %max_block, total=v.len(), "hlfs: peer added"); - } - } else { - debug!(%addr, "hlfs: peer has no HLFS"); - } + tokio::spawn({ + let backfiller = backfiller.clone(); + async move { + loop { + let mut bf = backfiller.lock().await; + warn!("hlfs: backfiller started"); + if bf.client.max_block < bf.max_block_seen { + let block = bf.client.max_block + 1; + let _ = bf.fetch_if_missing(block).await; } - Some(_) => {} - None => { - warn!("hlfs: network event stream ended"); - break; + + sleep(Duration::from_secs(1)).await; + } + } + }); + + tokio::spawn({ + let backfiller = backfiller.clone(); + async move { + let mut events = network.event_listener(); + loop { + let mut bf = backfiller.lock().await; + match events.next().await { + Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let ip = info.remote_addr.ip(); + if ip.is_unspecified() { + debug!(%ip, "hlfs: skip unspecified"); + continue; + } + if ip == self_ip { + debug!(%ip, "hlfs: skip self"); + continue; + } + let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); + let max_block = probe_hlfs(addr).await; + if max_block != 0 { + let mut g = good.lock().await; + if g.insert(PeerRecord { addr, max_block }) { + let v: Vec<_> = g.iter().copied().collect(); + bf.set_peers(v.clone()); + info!(%addr, %max_block, total=v.len(), "hlfs: peer added"); + } + } else { + debug!(%addr, "hlfs: peer has no HLFS"); + } + } + Some(_) => {} + None => { + warn!("hlfs: network event stream ended"); + break; + } } } } diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index ddbbb8b2d..bbbf4403e 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -83,9 +83,9 @@ impl Hash for PeerRecord { #[derive(Clone)] pub struct Client { root: PathBuf, - peers: Arc>>, + pub peers: Arc>>, timeout: Duration, - max_block: u64, + pub max_block: u64, } impl Client { pub fn new(root: impl Into, peers: Vec) -> Self { @@ -310,21 +310,28 @@ async fn handle_conn( /// Backfiller: ask client per missing block; rotate peers every block. #[derive(Clone)] pub struct Backfiller { - client: Client, + pub client: Client, root: PathBuf, + pub max_block_seen: u64, } impl Backfiller { pub fn new(client: Client, root: impl Into) -> Self { - Self { client, root: root.into() } + Self { client, root: root.into(), max_block_seen: 0 } } - pub fn set_peers(&self, peers: Vec) { + pub fn set_peers(&mut self, peers: Vec) { self.client.update_peers(peers); + let _peers = self.client.peers.lock().clone(); + for p in _peers { + if p.max_block > self.max_block_seen { + self.max_block_seen = p.max_block + } + } } pub async fn fetch_if_missing( - &self, + &mut self, number: u64, - rr_index: usize, ) -> Result, HlfsError> { + let rr_index = number as usize; let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000; From fae4b4c8f903c87fa48ffc20aa25c511f59ebdbe Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 23:50:10 -0700 Subject: [PATCH 15/19] debug --- bin/reth/src/share_blocks.rs | 3 +-- crates/net/hlfs/src/lib.rs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index ede25305c..6383bb4c1 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -81,16 +81,15 @@ where Arc::new(tokio::sync::Mutex::new(HashSet::new())); tokio::spawn({ + warn!("hlfs: backfiller started"); let backfiller = backfiller.clone(); async move { loop { let mut bf = backfiller.lock().await; - warn!("hlfs: backfiller started"); if bf.client.max_block < bf.max_block_seen { let block = bf.client.max_block + 1; let _ = bf.fetch_if_missing(block).await; } - sleep(Duration::from_secs(1)).await; } } diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index bbbf4403e..a5c739a02 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -338,7 +338,7 @@ impl Backfiller { let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if Path::new(&path).exists() { - trace!(block = number, "hlfs: already have"); + debug!(block = number, "hlfs: already have"); return Ok(None); } match self.client.wants_block(number, rr_index).await { @@ -356,6 +356,7 @@ impl Backfiller { warn!(%path, "hlfs: write failed: {e}"); return Ok(None); } + debug!(block = number, "hlfs: got block"); Ok(Some(data.len())) } } From 5c3828382c20dab4fd0ca5d7105ba558d142c992 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Fri, 22 Aug 2025 00:00:23 -0700 Subject: [PATCH 16/19] debug --- bin/reth/src/share_blocks.rs | 5 +++-- crates/net/hlfs/src/lib.rs | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 6383bb4c1..8edee53a6 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -88,9 +88,10 @@ where let mut bf = backfiller.lock().await; if bf.client.max_block < bf.max_block_seen { let block = bf.client.max_block + 1; - let _ = bf.fetch_if_missing(block).await; + let new_height = bf.fetch_if_missing(block).await.expect("new height"); + bf.client.max_block = new_height.unwrap(); } - sleep(Duration::from_secs(1)).await; + sleep(Duration::from_millis(50)).await; } } }); diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index a5c739a02..d8ba3a059 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -330,7 +330,7 @@ impl Backfiller { pub async fn fetch_if_missing( &mut self, number: u64, - ) -> Result, HlfsError> { + ) -> Result, HlfsError> { let rr_index = number as usize; let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; @@ -357,7 +357,7 @@ impl Backfiller { return Ok(None); } debug!(block = number, "hlfs: got block"); - Ok(Some(data.len())) + Ok(Some(number)) } } } From 06f7710777bb333d63d0f0b4949167f66613d772 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Fri, 22 Aug 2025 14:21:48 -0700 Subject: [PATCH 17/19] code review findings --- bin/reth/src/block_ingest.rs | 12 ------------ bin/reth/src/share_blocks.rs | 4 ---- 2 files changed, 16 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 74b82bbf0..2b023b6c7 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -29,7 +29,6 @@ use tokio::sync::Mutex; use tracing::{debug, info}; use crate::serialized::{BlockAndReceipts, EvmBlock}; -use crate::share_blocks::ShareBlocks; use crate::spot_meta::erc20_contract_to_spot_token; /// Poll interval when tailing an *open* hourly file. @@ -42,7 +41,6 @@ pub(crate) struct BlockIngest { pub local_ingest_dir: Option, pub local_blocks_cache: Arc>>, // height → block pub precompiles_cache: PrecompilesCache, - pub hlfs: Option, } #[derive(Deserialize)] @@ -159,16 +157,6 @@ impl BlockIngest { return Some(block); } - // if let Some(hlfs) = &self.hlfs { - // //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); - // if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { - // if let Some(block) = self.try_collect_local_block(height).await { - // info!("Returning HLFS-fetched block @[{height}]"); - // return Some(block); - // } - // } - // } - self.try_collect_s3_block(height) } diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 8edee53a6..ee7afa1bc 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -60,10 +60,6 @@ impl ShareBlocks { Ok(Self { _server, _autodetect }) } - // #[allow(dead_code)] - // pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { - // self._backfiller.fetch_if_missing(block).await.map_err(|e| eyre::eyre!(e)) - // } } fn spawn_autodetect( From e8be4c2e8242c54bb0ce41c22b86e4d95779a098 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Fri, 22 Aug 2025 14:23:22 -0700 Subject: [PATCH 18/19] code review findings --- bin/reth/src/block_ingest.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 2b023b6c7..39586e305 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -286,7 +286,7 @@ impl BlockIngest { self.start_local_ingest_loop(height, current_block_timestamp).await; loop { - let Some(original_block) = self.collect_block(head).await else { + let Some(original_block) = self.collect_block(height).await else { tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; From 738567894adfb801699a6faaad9a4140723ef2a9 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Tue, 26 Aug 2025 19:49:45 -0700 Subject: [PATCH 19/19] only consider top level highest dir name on archive node file root; also added back hlfs to blockingest struct --- bin/reth/src/block_ingest.rs | 2 ++ bin/reth/src/share_blocks.rs | 4 ++-- crates/net/hlfs/src/lib.rs | 20 +++++++++++++++----- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 39586e305..da6884870 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -29,6 +29,7 @@ use tokio::sync::Mutex; use tracing::{debug, info}; use crate::serialized::{BlockAndReceipts, EvmBlock}; +use crate::share_blocks::ShareBlocks; use crate::spot_meta::erc20_contract_to_spot_token; /// Poll interval when tailing an *open* hourly file. @@ -41,6 +42,7 @@ pub(crate) struct BlockIngest { pub local_ingest_dir: Option, pub local_blocks_cache: Arc>>, // height → block pub precompiles_cache: PrecompilesCache, + pub hlfs: Option, } #[derive(Deserialize)] diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index ee7afa1bc..4e5188c12 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -54,12 +54,12 @@ impl ShareBlocks { } }); - let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone()); + let _autodetect = + spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone()); info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)"); Ok(Self { _server, _autodetect }) } - } fn spawn_autodetect( diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index d8ba3a059..d9490f159 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -194,7 +194,20 @@ fn find_max_number_file(root: &Path) -> Result { } let mut best = Some(0); - walk(root, &mut best)?; + let top: PathBuf = fs::read_dir(root)? + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false)) + .filter_map(|e| { + let name = e.file_name(); + let s = name.to_str()?; + let n: u64 = s.parse().ok()?; + Some((n, e.path())) + }) + .max_by_key(|(n, _)| *n) + .map(|(_, p)| p) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?; + + walk(&top, &mut best)?; Ok(best.expect("cannot find block files")) } @@ -327,10 +340,7 @@ impl Backfiller { } } } - pub async fn fetch_if_missing( - &mut self, - number: u64, - ) -> Result, HlfsError> { + pub async fn fetch_if_missing(&mut self, number: u64) -> Result, HlfsError> { let rr_index = number as usize; let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000;