From 530447e637c83cc18c48dba7594834ca3c9a9680 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 17:20:20 -0700 Subject: [PATCH] debug --- crates/net/hlfs/src/lib.rs | 53 +++++++++++++++++++++++++++++++++++--- 1 file changed, 50 insertions(+), 3 deletions(-) diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index f0a8bace6..e5b318f42 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -31,6 +31,10 @@ pub enum HlfsError { Busy(Duration), #[error("protocol")] Proto, + #[error("no peers")] + NoPeers, + #[error("unknown")] + Unknown, } #[inline] @@ -72,9 +76,12 @@ impl Client { let peers = self.peers.lock().clone(); debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { - debug!(block = number, "hlfs: no peers"); - return Err(HlfsError::Timeout); + return Err(HlfsError::NoPeers); } + + let mut all_not_found = true; + let mut any_timeout = false; + for t in 0..peers.len() { let i = (rr_index + t) % peers.len(); let addr = peers[i]; @@ -83,19 +90,24 @@ impl Client { match timeout(self.timeout, fetch_once(addr, number)).await { Err(_) => { debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); + any_timeout = true; + all_not_found = false; continue; } Ok(Err(HlfsError::Busy(d))) => { trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; + all_not_found = false; continue; } Ok(Err(HlfsError::NotFound)) => { trace!(block=number, %addr, "hlfs: not found"); + // Keep all_not_found as true unless we see other errors continue; } Ok(Err(e)) => { debug!(block=number, %addr, error=%e, "hlfs: error"); + all_not_found = false; continue; } Ok(Ok(bytes)) => { @@ -104,18 +116,28 @@ impl Client { } } } - Err(HlfsError::Timeout) + + // Return the most specific error + if all_not_found { + Err(HlfsError::NotFound) + } else if any_timeout { + Err(HlfsError::Timeout) + } else { + Err(HlfsError::Unknown) // Fallback for other errors + } } } async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; + debug!(%addr, "hlfs: CONNECTED"); let mut buf = BytesMut::with_capacity(9); buf.put_u8(0x01); put_u64(&mut buf, number); s.write_all(&buf).await?; let mut op = [0u8; 1]; s.read_exact(&mut op).await?; + debug!(code=op[0], "hlfs: opcode"); match op[0] { 0x02 => { let mut meta = [0u8; 12]; @@ -369,4 +391,29 @@ mod tests { let b = c.get_block(7, 0).await.unwrap(); assert_eq!(b.len(), 3072); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn all_peers_return_not_found() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + // Don't create the block file, so all servers will return NotFound + + let s1 = Server::new("127.0.0.1:9603".parse().unwrap(), dir.path()); + let s2 = Server::new("127.0.0.1:9604".parse().unwrap(), dir.path()); + tokio::spawn(async move { + let _ = s1.run().await; + }); + tokio::spawn(async move { + let _ = s2.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let client = + Client::new(vec!["127.0.0.1:9603".parse().unwrap(), "127.0.0.1:9604".parse().unwrap()]) + .with_timeout(Duration::from_secs(1)); + + // Request a block that doesn't exist on any peer + let result = client.get_block(999, 0).await; + assert!(matches!(result, Err(HlfsError::NotFound))); + } }