diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index e5b318f42..9a8c88024 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::trace; use std::{ - io, + fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -12,7 +12,6 @@ use std::{ }; use thiserror::Error; use tokio::{ - fs, io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, time::{sleep, timeout}, @@ -48,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent).await + fs::create_dir_all(parent) } else { Ok(()) } @@ -123,10 +122,11 @@ impl Client { } else if any_timeout { Err(HlfsError::Timeout) } else { - Err(HlfsError::Unknown) // Fallback for other errors + Err(HlfsError::Unknown) // Fallback for other errors } } } + async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; @@ -137,7 +137,7 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> s.write_all(&buf).await?; let mut op = [0u8; 1]; s.read_exact(&mut op).await?; - debug!(code=op[0], "hlfs: opcode"); + debug!(code = op[0], "hlfs: opcode"); match op[0] { 0x02 => { let mut meta = [0u8; 12]; @@ -163,6 +163,36 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> } } +fn find_max_number_file(root: &Path) -> Result { + fn parse_num(name: &str) -> Option { + name.strip_suffix(".rmp.lz4")?.parse::().ok() + } + + fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let ft = entry.file_type()?; + if ft.is_dir() { + walk(&path, best)?; + } else if ft.is_file() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(n) = parse_num(name) { + if best.map_or(true, |(m, _)| n > m) { + *best = Some((n, path.clone())); + } + } + } + } + } + Ok(()) + } + + let mut best = None; + walk(root, &mut best)?; + Ok(best.expect("cannot find block files")) +} + /// Server: serves `{root}/{number}.rlp`. pub struct Server { bind: SocketAddr, @@ -170,15 +200,19 @@ pub struct Server { max_conns: usize, inflight: Arc>, busy_retry_ms: u32, + max_block: u64, } + impl Server { pub fn new(bind: SocketAddr, root: impl Into) -> Self { + let n = find_max_number_file(&root.into()).unwrap(); Self { bind, root: root.into(), max_conns: 512, inflight: Arc::new(Mutex::new(0)), busy_retry_ms: 100, + max_block: n.unwrap(), } } pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { @@ -187,7 +221,7 @@ impl Server { self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).await.ok(); + fs::create_dir_all(&self.root).ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -204,7 +238,7 @@ impl Server { let inflight = self.inflight.clone(); let busy = self.busy_retry_ms; tokio::spawn(async move { - let _ = handle_conn(&mut sock, &root, busy, addr).await; + let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await; *inflight.lock() -= 1; }); } @@ -215,13 +249,21 @@ async fn handle_conn( root: &Path, busy_ms: u32, addr: SocketAddr, + max_block: u64, ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 { + if op[0] != 0x01 || op[0] != 0x02 { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } + if op[0] == 0x02 { + let mut b = BytesMut::with_capacity(1 + 8 + 4); + b.put_u8(0x05); + put_u64(&mut b, max_block); + return Ok(()); + } + let mut nb = [0u8; 8]; sock.read_exact(&mut nb).await?; let number = u64::from_le_bytes(nb);