This commit is contained in:
Nicholas Wehr
2025-08-21 17:20:20 -07:00
parent 2ff606c32c
commit 530447e637

View File

@ -31,6 +31,10 @@ pub enum HlfsError {
Busy(Duration),
#[error("protocol")]
Proto,
#[error("no peers")]
NoPeers,
#[error("unknown")]
Unknown,
}
#[inline]
@ -72,9 +76,12 @@ impl Client {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
debug!(block = number, "hlfs: no peers");
return Err(HlfsError::Timeout);
return Err(HlfsError::NoPeers);
}
let mut all_not_found = true;
let mut any_timeout = false;
for t in 0..peers.len() {
let i = (rr_index + t) % peers.len();
let addr = peers[i];
@ -83,19 +90,24 @@ impl Client {
match timeout(self.timeout, fetch_once(addr, number)).await {
Err(_) => {
debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout");
any_timeout = true;
all_not_found = false;
continue;
}
Ok(Err(HlfsError::Busy(d))) => {
trace!(block=number, %addr, delay_ms=?d, "hlfs: busy");
sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await;
all_not_found = false;
continue;
}
Ok(Err(HlfsError::NotFound)) => {
trace!(block=number, %addr, "hlfs: not found");
// Keep all_not_found as true unless we see other errors
continue;
}
Ok(Err(e)) => {
debug!(block=number, %addr, error=%e, "hlfs: error");
all_not_found = false;
continue;
}
Ok(Ok(bytes)) => {
@ -104,18 +116,28 @@ impl Client {
}
}
}
Err(HlfsError::Timeout)
// Return the most specific error
if all_not_found {
Err(HlfsError::NotFound)
} else if any_timeout {
Err(HlfsError::Timeout)
} else {
Err(HlfsError::Unknown) // Fallback for other errors
}
}
}
async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError> {
debug!(%addr, "hlfs: connect");
let mut s = TcpStream::connect(addr).await?;
debug!(%addr, "hlfs: CONNECTED");
let mut buf = BytesMut::with_capacity(9);
buf.put_u8(0x01);
put_u64(&mut buf, number);
s.write_all(&buf).await?;
let mut op = [0u8; 1];
s.read_exact(&mut op).await?;
debug!(code=op[0], "hlfs: opcode");
match op[0] {
0x02 => {
let mut meta = [0u8; 12];
@ -369,4 +391,29 @@ mod tests {
let b = c.get_block(7, 0).await.unwrap();
assert_eq!(b.len(), 3072);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn all_peers_return_not_found() {
reth_tracing::init_test_tracing();
let dir = tempfile::tempdir().unwrap();
// Don't create the block file, so all servers will return NotFound
let s1 = Server::new("127.0.0.1:9603".parse().unwrap(), dir.path());
let s2 = Server::new("127.0.0.1:9604".parse().unwrap(), dir.path());
tokio::spawn(async move {
let _ = s1.run().await;
});
tokio::spawn(async move {
let _ = s2.run().await;
});
tokio::time::sleep(Duration::from_millis(50)).await;
let client =
Client::new(vec!["127.0.0.1:9603".parse().unwrap(), "127.0.0.1:9604".parse().unwrap()])
.with_timeout(Duration::from_secs(1));
// Request a block that doesn't exist on any peer
let result = client.get_block(999, 0).await;
assert!(matches!(result, Err(HlfsError::NotFound)));
}
}