From 022dfd15461791b3437dc8a5360f05c198bb4fd7 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 19:03:21 -0700 Subject: [PATCH] debug --- crates/net/hlfs/src/lib.rs | 93 +++++++++++++++++++++++++------------- 1 file changed, 62 insertions(+), 31 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 9a8c88024..914f3f019 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::trace; use std::{ - fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -12,10 +11,11 @@ use std::{ }; use thiserror::Error; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + fs, io, net::{TcpListener, TcpStream}, time::{sleep, timeout}, }; +use tokio::fs::DirEntry; use tracing::{debug, info, warn}; #[derive(Debug, Error)] @@ -47,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent) + fs::create_dir_all(parent).await } else { Ok(()) } @@ -163,34 +163,61 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> } } -fn find_max_number_file(root: &Path) -> Result { - fn parse_num(name: &str) -> Option { - name.strip_suffix(".rmp.lz4")?.parse::().ok() +fn parse_block_file(name: &str) -> Option { + // expects ".rmp.lz4" + let (stem, ext) = name.rsplit_once('.')?; + if ext != "lz4" { + return None; } + if !stem.ends_with(".rmp") { + return None; + } + let stem = stem.strip_suffix(".rmp")?; + stem.parse::().ok() +} - fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { - for entry in fs::read_dir(dir)? { - let entry = entry?; - let path = entry.path(); - let ft = entry.file_type()?; - if ft.is_dir() { - walk(&path, best)?; - } else if ft.is_file() { - if let Some(name) = path.file_name().and_then(|s| s.to_str()) { - if let Some(n) = parse_num(name) { - if best.map_or(true, |(m, _)| n > m) { - *best = Some((n, path.clone())); +// Asynchronously find the largest block file under the 2-level shard layout: +// {root}/{floor_to_million}/{floor_to_thousand}/{number}.rmp.lz4 +pub async fn find_max_block(root: &Path) -> io::Result { + let mut max_num: Option = None; + + let mut top = fs::read_dir(root).await?; + while let Some(million_dir) = top.next_entry().await? { + if !million_dir.file_type().await?.is_dir() { + continue; + } + // Fast reject: top-level dir must parse to u64 (but we still scan if not—optional) + if million_dir.file_name().to_string_lossy().parse::().is_err() { + continue; + } + + let mut mid = fs::read_dir(million_dir.path()).await?; + while let Some(thousand_dir) = mid.next_entry().await? { + if !thousand_dir.file_type().await?.is_dir() { + continue; + } + // Optional reject again for dir-name parse to u64 + if thousand_dir.file_name().to_string_lossy().parse::().is_err() { + continue; + } + + let mut leaf = fs::read_dir(thousand_dir.path()).await?; + while let Some(ent) = leaf.next_entry().await? { + if !ent.file_type().await?.is_file() { + continue; + } + if let Some(name) = ent.file_name().to_str() { + if let Some(n) = parse_block_file(name) { + if max_num.map_or(true, |m| n > m) { + max_num = Some(n); } } } } } - Ok(()) } - let mut best = None; - walk(root, &mut best)?; - Ok(best.expect("cannot find block files")) + max_num.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no block files found")) } /// Server: serves `{root}/{number}.rlp`. @@ -204,24 +231,28 @@ pub struct Server { } impl Server { - pub fn new(bind: SocketAddr, root: impl Into) -> Self { - let n = find_max_number_file(&root.into()).unwrap(); - Self { + pub async fn new(bind: SocketAddr, root: impl Into) -> Result { + let root = root.into(); + fs::create_dir_all(&root).await?; // async, no unwrap/ok() + let max_block = find_max_block(&root).await?; // async discovery + + Ok(Self { bind, - root: root.into(), + root, max_conns: 512, - inflight: Arc::new(Mutex::new(0)), + inflight: Arc::new(parking_lot::Mutex::new(0)), busy_retry_ms: 100, - max_block: n.unwrap(), - } + max_block, + }) } + pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { self.max_conns = max_conns; self.busy_retry_ms = busy_retry_ms; self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).ok(); + fs::create_dir_all(&self.root).await.ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -253,7 +284,7 @@ async fn handle_conn( ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 || op[0] != 0x02 { + if op[0] != 0x01 && op[0] != 0x02 { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); }