This commit is contained in:
Nicholas Wehr
2025-08-14 04:59:16 +00:00
parent e2045a195c
commit ec417f9bf4
2 changed files with 34 additions and 4 deletions

View File

@ -42,6 +42,14 @@ fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent).await
} else {
Ok(())
}
}
/// Client: tries each peer once; rotates starting index per call.
#[derive(Clone)]
pub struct Client {
@ -62,6 +70,7 @@ impl Client {
}
pub async fn get_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
debug!(block = number, "hlfs: no peers");
return Err(HlfsError::Timeout);
@ -69,10 +78,11 @@ impl Client {
for t in 0..peers.len() {
let i = (rr_index + t) % peers.len();
let addr = peers[i];
trace!(block=number, %addr, "hlfs: try");
debug!(block=number, %addr, "hlfs: try");
let start = std::time::Instant::now();
match timeout(self.timeout, fetch_once(addr, number)).await {
Err(_) => {
debug!(block=number, %addr, "hlfs: timeout");
debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout");
continue;
}
Ok(Err(HlfsError::Busy(d))) => {
@ -98,6 +108,7 @@ impl Client {
}
}
async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError> {
debug!(%addr, "hlfs: connect");
let mut s = TcpStream::connect(addr).await?;
let mut buf = BytesMut::with_capacity(9);
buf.put_u8(0x01);
@ -255,12 +266,20 @@ impl Backfiller {
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if fs::try_exists(&path).await? {
return Ok(None);
} else {
ensure_parent_dirs(&path).await?;
}
debug!(block = number, "hlfs: going to get_block from client");
match self.client.get_block(number, rr_index).await {
Ok(bytes) => {
debug!(block = number, "hlfs: YAY! got block from client");
let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy());
ensure_parent_dirs(&tmp).await?;
debug!(block = number, path=%tmp, "hlfs: writing file");
fs::write(&tmp, &bytes).await?;
debug!(block = number, from=%tmp, to=%path, "hlfs: moving file");
fs::rename(&tmp, &path).await?;
info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote");
Ok(Some(bytes.len()))