diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index 914f3f019..c8d195282 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -2,8 +2,9 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; -use reth_tracing::tracing::trace; +use reth_tracing::tracing::{debug, info, trace, warn}; use std::{ + fs, io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -11,29 +12,27 @@ use std::{ }; use thiserror::Error; use tokio::{ - fs, io, + io::{AsyncReadExt, AsyncWriteExt}, net::{TcpListener, TcpStream}, - time::{sleep, timeout}, + time::timeout, }; -use tokio::fs::DirEntry; -use tracing::{debug, info, warn}; -#[derive(Debug, Error)] +type Result = std::result::Result; + +#[derive(Error, Debug)] pub enum HlfsError { #[error("io: {0}")] Io(#[from] io::Error), - #[error("timeout")] - Timeout, - #[error("not found")] - NotFound, - #[error("busy {0:?}")] - Busy(Duration), - #[error("protocol")] + #[error("proto")] Proto, #[error("no peers")] NoPeers, - #[error("unknown")] - Unknown, + #[error("timeout")] + Timeout, + #[error("busy: retry_ms={0}")] + Busy(u32), + #[error("not found")] + NotFound, } #[inline] @@ -47,22 +46,21 @@ fn put_u32(b: &mut BytesMut, v: u32) { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent).await + fs::create_dir_all(parent) } else { Ok(()) } } -/// Client: tries each peer once; rotates starting index per call. +/// Client: tries each peer once; rotates starting index per call #[derive(Clone)] pub struct Client { peers: Arc>>, timeout: Duration, - backoff_ms: u32, } impl Client { pub fn new(peers: Vec) -> Self { - Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(5), backoff_ms: 50 } + Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3) } } pub fn update_peers(&self, peers: Vec) { *self.peers.lock() = peers; @@ -78,146 +76,89 @@ impl Client { return Err(HlfsError::NoPeers); } - let mut all_not_found = true; - let mut any_timeout = false; - - for t in 0..peers.len() { - let i = (rr_index + t) % peers.len(); + let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len()); + let mut last_busy: Option = None; + while let Some(i) = all.next() { let addr = peers[i]; - debug!(block=number, %addr, "hlfs: try"); - let start = std::time::Instant::now(); - match timeout(self.timeout, fetch_once(addr, number)).await { - Err(_) => { - debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); - any_timeout = true; - all_not_found = false; - continue; - } - Ok(Err(HlfsError::Busy(d))) => { - trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); - sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; - all_not_found = false; - continue; - } - Ok(Err(HlfsError::NotFound)) => { - trace!(block=number, %addr, "hlfs: not found"); - // Keep all_not_found as true unless we see other errors - continue; - } - Ok(Err(e)) => { - debug!(block=number, %addr, error=%e, "hlfs: error"); - all_not_found = false; - continue; - } - Ok(Ok(bytes)) => { - info!(block=number, %addr, bytes=bytes.len(), "hlfs: fetched"); - return Ok(bytes); - } - } - } - - // Return the most specific error - if all_not_found { - Err(HlfsError::NotFound) - } else if any_timeout { - Err(HlfsError::Timeout) - } else { - Err(HlfsError::Unknown) // Fallback for other errors - } - } -} - -async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { - debug!(%addr, "hlfs: connect"); - let mut s = TcpStream::connect(addr).await?; - debug!(%addr, "hlfs: CONNECTED"); - let mut buf = BytesMut::with_capacity(9); - buf.put_u8(0x01); - put_u64(&mut buf, number); - s.write_all(&buf).await?; - let mut op = [0u8; 1]; - s.read_exact(&mut op).await?; - debug!(code = op[0], "hlfs: opcode"); - match op[0] { - 0x02 => { - let mut meta = [0u8; 12]; - s.read_exact(&mut meta).await?; - let mut m = Bytes::from(meta.to_vec()); - let _n = m.get_u64_le(); - let len = m.get_u32_le() as usize; - let mut data = vec![0u8; len]; - s.read_exact(&mut data).await?; - Ok(data) - } - 0x03 => { - let mut _n = [0u8; 8]; - let _ = s.read_exact(&mut _n).await; - Err(HlfsError::NotFound) - } - 0x04 => { - let mut d = [0u8; 4]; - s.read_exact(&mut d).await?; - Err(HlfsError::Busy(Duration::from_millis(u32::from_le_bytes(d) as u64))) - } - _ => Err(HlfsError::Proto), - } -} - -fn parse_block_file(name: &str) -> Option { - // expects ".rmp.lz4" - let (stem, ext) = name.rsplit_once('.')?; - if ext != "lz4" { - return None; - } - if !stem.ends_with(".rmp") { - return None; - } - let stem = stem.strip_suffix(".rmp")?; - stem.parse::().ok() -} - -// Asynchronously find the largest block file under the 2-level shard layout: -// {root}/{floor_to_million}/{floor_to_thousand}/{number}.rmp.lz4 -pub async fn find_max_block(root: &Path) -> io::Result { - let mut max_num: Option = None; - - let mut top = fs::read_dir(root).await?; - while let Some(million_dir) = top.next_entry().await? { - if !million_dir.file_type().await?.is_dir() { - continue; - } - // Fast reject: top-level dir must parse to u64 (but we still scan if not—optional) - if million_dir.file_name().to_string_lossy().parse::().is_err() { - continue; - } - - let mut mid = fs::read_dir(million_dir.path()).await?; - while let Some(thousand_dir) = mid.next_entry().await? { - if !thousand_dir.file_type().await?.is_dir() { - continue; - } - // Optional reject again for dir-name parse to u64 - if thousand_dir.file_name().to_string_lossy().parse::().is_err() { - continue; - } - - let mut leaf = fs::read_dir(thousand_dir.path()).await?; - while let Some(ent) = leaf.next_entry().await? { - if !ent.file_type().await?.is_file() { - continue; - } - if let Some(name) = ent.file_name().to_str() { - if let Some(n) = parse_block_file(name) { - if max_num.map_or(true, |m| n > m) { - max_num = Some(n); + trace!(%addr, "hlfs: dialing"); + match timeout(self.timeout, TcpStream::connect(addr)).await { + Err(_) => continue, + Ok(Err(_)) => continue, + Ok(Ok(mut sock)) => { + let mut req = BytesMut::with_capacity(1 + 8); + req.put_u8(0x01); // GET + put_u64(&mut req, number); + if let Err(e) = sock.write_all(&req).await { + debug!(%addr, "hlfs: write err: {e}"); + continue; + } + let mut op = [0u8; 1]; + if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await { + debug!(%addr, "hlfs: read op timeout {e:?}"); + continue; + } + let op = op[0]; + match op { + 0x03 => { + // DATA + let mut len = [0u8; 4]; + sock.read_exact(&mut len).await?; + let len = u32::from_le_bytes(len) as usize; + let mut buf = vec![0u8; len]; + sock.read_exact(&mut buf).await?; + return Ok(buf); + } + 0x04 => { + let mut ms = [0u8; 4]; + sock.read_exact(&mut ms).await?; + last_busy = Some(u32::from_le_bytes(ms)); + continue; + } + 0x06 => { + return Err(HlfsError::NotFound); + } + _ => { + continue; } } } } } + if let Some(ms) = last_busy { + return Err(HlfsError::Busy(ms)); + } + Err(HlfsError::NotFound) + } +} + +fn find_max_number_file(root: &Path) -> Result { + fn parse_num(name: &str) -> Option { + name.strip_suffix(".rmp.lz4")?.parse::().ok() } - max_num.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no block files found")) + fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let ft = entry.file_type()?; + if ft.is_dir() { + walk(&path, best)?; + } else if ft.is_file() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(n) = parse_num(name) { + if best.map_or(true, |b| n > b) { + *best = Some(n); + } + } + } + } + } + Ok(()) + } + + let mut best = None; + walk(root, &mut best)?; + Ok(best.expect("cannot find block files")) } /// Server: serves `{root}/{number}.rlp`. @@ -231,28 +172,25 @@ pub struct Server { } impl Server { - pub async fn new(bind: SocketAddr, root: impl Into) -> Result { - let root = root.into(); - fs::create_dir_all(&root).await?; // async, no unwrap/ok() - let max_block = find_max_block(&root).await?; // async discovery - - Ok(Self { + pub fn new(bind: SocketAddr, root: impl Into) -> Self { + let root: PathBuf = root.into(); + let n = find_max_number_file(&root).unwrap(); + Self { bind, root, max_conns: 512, - inflight: Arc::new(parking_lot::Mutex::new(0)), + inflight: Arc::new(Mutex::new(0)), busy_retry_ms: 100, - max_block, - }) + max_block: n, + } } - pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { self.max_conns = max_conns; self.busy_retry_ms = busy_retry_ms; self } pub async fn run(self) -> Result<(), HlfsError> { - fs::create_dir_all(&self.root).await.ok(); + fs::create_dir_all(&self.root).ok(); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); let lst = TcpListener::bind(self.bind).await?; loop { @@ -292,42 +230,44 @@ async fn handle_conn( let mut b = BytesMut::with_capacity(1 + 8 + 4); b.put_u8(0x05); put_u64(&mut b, max_block); + put_u32(&mut b, busy_ms); + let _ = sock.write_all(&b).await; return Ok(()); } - let mut nb = [0u8; 8]; - sock.read_exact(&mut nb).await?; - let number = u64::from_le_bytes(nb); + let mut num = [0u8; 8]; + sock.read_exact(&mut num).await?; + let number = u64::from_le_bytes(num); + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000; let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy()); - match fs::read(&path).await { + + trace!(%addr, number, %path, "hlfs: req"); + if let Err(e) = ensure_parent_dirs(&path).await { + warn!(%addr, %path, "hlfs: mkdirs failed: {e}"); + } + + match fs::read(&path) { Ok(data) => { - debug!("hlfs: found path [{path}]"); - let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); - b.put_u8(0x02); - put_u64(&mut b, number); + let mut b = BytesMut::with_capacity(1 + 4 + data.len()); + b.put_u8(0x03); put_u32(&mut b, data.len() as u32); b.extend_from_slice(&data); - sock.write_all(&b).await?; - Ok(()) + let _ = sock.write_all(&b).await; } Err(e) if e.kind() == io::ErrorKind::NotFound => { - let mut b = [0u8; 9]; - b[0] = 0x03; - b[1..9].copy_from_slice(&number.to_le_bytes()); - sock.write_all(&b).await?; - Ok(()) - } - Err(_) => { - let mut b = BytesMut::with_capacity(5); - b.put_u8(0x04); - put_u32(&mut b, busy_ms); + let mut b = BytesMut::with_capacity(1); + b.put_u8(0x06); // not found let _ = sock.write_all(&b).await; - Err(HlfsError::Io(io::Error::new(io::ErrorKind::Other, "fs error"))) + } + Err(e) => { + warn!(%addr, %path, "hlfs: read error: {e}"); + let _ = sock.shutdown().await; } } + Ok(()) } /// Backfiller: ask client per missing block; rotate peers every block. @@ -359,134 +299,27 @@ impl Backfiller { let s = (n / 1_000) * 1_000; let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); - if fs::try_exists(&path).await? { + if Path::new(&path).exists() { + trace!(block = number, "hlfs: already have"); return Ok(None); - } else { - ensure_parent_dirs(&path).await?; } - - debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { - Ok(bytes) => { - debug!(block = number, "hlfs: YAY! got block from client"); - let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); - ensure_parent_dirs(&tmp).await?; - - debug!(block = number, path=%tmp, "hlfs: writing file"); - fs::write(&tmp, &bytes).await?; - debug!(block = number, from=%tmp, to=%path, "hlfs: moving file"); - fs::rename(&tmp, &path).await?; - info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); - Ok(Some(bytes.len())) + Err(HlfsError::NotFound) => Ok(None), + Err(HlfsError::Busy(ms)) => { + tokio::time::sleep(Duration::from_millis(ms as u64)).await; + Ok(None) } - Err(e) => { - debug!(block=number, error=%e, "hlfs: fetch failed"); - Err(e) + Err(e) => Err(e), + Ok(data) => { + if let Err(e) = ensure_parent_dirs(&path).await { + warn!(%path, "hlfs: mkdirs failed: {e}"); + } + if let Err(e) = fs::write(&path, &data) { + warn!(%path, "hlfs: write failed: {e}"); + return Ok(None); + } + Ok(Some(data.len())) } } } } - -#[cfg(test)] -mod tests { - use super::*; - use rand::{rngs::StdRng, Rng, SeedableRng}; - fn sample(n: u64) -> Vec { - vec![((n as usize) % 251) as u8; 3072] - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn serve_and_fetch_rr() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - for n in 0..100u64 { - fs::write(dir.path().join(format!("{n}.rlp")), sample(n)).await.unwrap(); - } - let s1 = Server::new("127.0.0.1:9597".parse().unwrap(), dir.path()).with_limits(64, 20); - let s2 = Server::new("127.0.0.1:9598".parse().unwrap(), dir.path()).with_limits(64, 20); - tokio::spawn(async move { - let _ = s1.run().await; - }); - tokio::spawn(async move { - let _ = s2.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let client = - Client::new(vec!["127.0.0.1:9597".parse().unwrap(), "127.0.0.1:9598".parse().unwrap()]) - .with_timeout(Duration::from_secs(1)); - let a = client.get_block(10, 0).await.unwrap(); - let b = client.get_block(11, 1).await.unwrap(); - assert_eq!(a.len(), 3072); - assert_eq!(b.len(), 3072); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn backfill_only_when_older_than_threshold() { - reth_tracing::init_test_tracing(); - let src = tempfile::tempdir().unwrap(); - let dst = tempfile::tempdir().unwrap(); - fs::write(src.path().join("5.rlp"), sample(5)).await.unwrap(); - let srv = Server::new("127.0.0.1:9599".parse().unwrap(), src.path()); - tokio::spawn(async move { - let _ = srv.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let bf = Backfiller::new( - Client::new(vec!["127.0.0.1:9599".parse().unwrap()]), - dst.path(), - 5_000, - ); - let got = bf.fetch_if_missing(5, 10_000, 0).await.unwrap(); - assert_eq!(got, Some(3072)); - let skip = bf.fetch_if_missing(9_999, 10_000, 1).await.unwrap(); - assert_eq!(skip, None); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn busy_and_notfound_rotate() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - fs::write(dir.path().join("7.rlp"), sample(7)).await.unwrap(); - let s_busy = Server::new("127.0.0.1:9601".parse().unwrap(), dir.path()).with_limits(0, 10); - let s_ok = Server::new("127.0.0.1:9602".parse().unwrap(), dir.path()); - tokio::spawn(async move { - let _ = s_busy.run().await; - }); - tokio::spawn(async move { - let _ = s_ok.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let c = - Client::new(vec!["127.0.0.1:9601".parse().unwrap(), "127.0.0.1:9602".parse().unwrap()]); - let b = c.get_block(7, 0).await.unwrap(); - assert_eq!(b.len(), 3072); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn all_peers_return_not_found() { - reth_tracing::init_test_tracing(); - let dir = tempfile::tempdir().unwrap(); - // Don't create the block file, so all servers will return NotFound - - let s1 = Server::new("127.0.0.1:9603".parse().unwrap(), dir.path()); - let s2 = Server::new("127.0.0.1:9604".parse().unwrap(), dir.path()); - tokio::spawn(async move { - let _ = s1.run().await; - }); - tokio::spawn(async move { - let _ = s2.run().await; - }); - tokio::time::sleep(Duration::from_millis(50)).await; - - let client = - Client::new(vec!["127.0.0.1:9603".parse().unwrap(), "127.0.0.1:9604".parse().unwrap()]) - .with_timeout(Duration::from_secs(1)); - - // Request a block that doesn't exist on any peer - let result = client.get_block(999, 0).await; - assert!(matches!(result, Err(HlfsError::NotFound))); - } -}