This commit is contained in:
Nicholas Wehr
2025-08-21 19:03:21 -07:00
parent 249e3194dd
commit 022dfd1546

View File

@ -4,7 +4,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex; use parking_lot::Mutex;
use reth_tracing::tracing::trace; use reth_tracing::tracing::trace;
use std::{ use std::{
fs, io,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
@ -12,10 +11,11 @@ use std::{
}; };
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
io::{AsyncReadExt, AsyncWriteExt}, fs, io,
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
time::{sleep, timeout}, time::{sleep, timeout},
}; };
use tokio::fs::DirEntry;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -47,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) {
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() { if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent) fs::create_dir_all(parent).await
} else { } else {
Ok(()) Ok(())
} }
@ -163,34 +163,61 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError>
} }
} }
fn find_max_number_file(root: &Path) -> Result<u64> { fn parse_block_file(name: &str) -> Option<u64> {
fn parse_num(name: &str) -> Option<u64> { // expects "<number>.rmp.lz4"
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok() let (stem, ext) = name.rsplit_once('.')?;
if ext != "lz4" {
return None;
}
if !stem.ends_with(".rmp") {
return None;
}
let stem = stem.strip_suffix(".rmp")?;
stem.parse::<u64>().ok()
}
// Asynchronously find the largest block file under the 2-level shard layout:
// {root}/{floor_to_million}/{floor_to_thousand}/{number}.rmp.lz4
pub async fn find_max_block(root: &Path) -> io::Result<u64> {
let mut max_num: Option<u64> = None;
let mut top = fs::read_dir(root).await?;
while let Some(million_dir) = top.next_entry().await? {
if !million_dir.file_type().await?.is_dir() {
continue;
}
// Fast reject: top-level dir must parse to u64 (but we still scan if not—optional)
if million_dir.file_name().to_string_lossy().parse::<u64>().is_err() {
continue;
} }
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> { let mut mid = fs::read_dir(million_dir.path()).await?;
for entry in fs::read_dir(dir)? { while let Some(thousand_dir) = mid.next_entry().await? {
let entry = entry?; if !thousand_dir.file_type().await?.is_dir() {
let path = entry.path(); continue;
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |(m, _)| n > m) {
*best = Some((n, path.clone()));
} }
} // Optional reject again for dir-name parse to u64
} if thousand_dir.file_name().to_string_lossy().parse::<u64>().is_err() {
} continue;
}
Ok(())
} }
let mut best = None; let mut leaf = fs::read_dir(thousand_dir.path()).await?;
walk(root, &mut best)?; while let Some(ent) = leaf.next_entry().await? {
Ok(best.expect("cannot find block files")) if !ent.file_type().await?.is_file() {
continue;
}
if let Some(name) = ent.file_name().to_str() {
if let Some(n) = parse_block_file(name) {
if max_num.map_or(true, |m| n > m) {
max_num = Some(n);
}
}
}
}
}
}
max_num.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no block files found"))
} }
/// Server: serves `{root}/{number}.rlp`. /// Server: serves `{root}/{number}.rlp`.
@ -204,24 +231,28 @@ pub struct Server {
} }
impl Server { impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self { pub async fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Result<Self, HlfsError> {
let n = find_max_number_file(&root.into()).unwrap(); let root = root.into();
Self { fs::create_dir_all(&root).await?; // async, no unwrap/ok()
let max_block = find_max_block(&root).await?; // async discovery
Ok(Self {
bind, bind,
root: root.into(), root,
max_conns: 512, max_conns: 512,
inflight: Arc::new(Mutex::new(0)), inflight: Arc::new(parking_lot::Mutex::new(0)),
busy_retry_ms: 100, busy_retry_ms: 100,
max_block: n.unwrap(), max_block,
} })
} }
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns; self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms; self.busy_retry_ms = busy_retry_ms;
self self
} }
pub async fn run(self) -> Result<(), HlfsError> { pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok(); fs::create_dir_all(&self.root).await.ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?; let lst = TcpListener::bind(self.bind).await?;
loop { loop {
@ -253,7 +284,7 @@ async fn handle_conn(
) -> Result<(), HlfsError> { ) -> Result<(), HlfsError> {
let mut op = [0u8; 1]; let mut op = [0u8; 1];
sock.read_exact(&mut op).await?; sock.read_exact(&mut op).await?;
if op[0] != 0x01 || op[0] != 0x02 { if op[0] != 0x01 && op[0] != 0x02 {
warn!(%addr, "hlfs: bad op"); warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto); return Err(HlfsError::Proto);
} }