added new max block opcode

This commit is contained in:
Nicholas Wehr
2025-08-21 18:35:30 -07:00
parent 530447e637
commit 249e3194dd

View File

@ -4,7 +4,7 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex; use parking_lot::Mutex;
use reth_tracing::tracing::trace; use reth_tracing::tracing::trace;
use std::{ use std::{
io, fs, io,
net::SocketAddr, net::SocketAddr,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
@ -12,7 +12,6 @@ use std::{
}; };
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
fs,
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
time::{sleep, timeout}, time::{sleep, timeout},
@ -48,7 +47,7 @@ fn put_u32(b: &mut BytesMut, v: u32) {
async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() { if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent).await fs::create_dir_all(parent)
} else { } else {
Ok(()) Ok(())
} }
@ -127,6 +126,7 @@ impl Client {
} }
} }
} }
async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError> { async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError> {
debug!(%addr, "hlfs: connect"); debug!(%addr, "hlfs: connect");
let mut s = TcpStream::connect(addr).await?; let mut s = TcpStream::connect(addr).await?;
@ -163,6 +163,36 @@ async fn fetch_once(addr: SocketAddr, number: u64) -> Result<Vec<u8>, HlfsError>
} }
} }
fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}
fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |(m, _)| n > m) {
*best = Some((n, path.clone()));
}
}
}
}
}
Ok(())
}
let mut best = None;
walk(root, &mut best)?;
Ok(best.expect("cannot find block files"))
}
/// Server: serves `{root}/{number}.rlp`. /// Server: serves `{root}/{number}.rlp`.
pub struct Server { pub struct Server {
bind: SocketAddr, bind: SocketAddr,
@ -170,15 +200,19 @@ pub struct Server {
max_conns: usize, max_conns: usize,
inflight: Arc<Mutex<usize>>, inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32, busy_retry_ms: u32,
max_block: u64,
} }
impl Server { impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self { pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let n = find_max_number_file(&root.into()).unwrap();
Self { Self {
bind, bind,
root: root.into(), root: root.into(),
max_conns: 512, max_conns: 512,
inflight: Arc::new(Mutex::new(0)), inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100, busy_retry_ms: 100,
max_block: n.unwrap(),
} }
} }
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
@ -187,7 +221,7 @@ impl Server {
self self
} }
pub async fn run(self) -> Result<(), HlfsError> { pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).await.ok(); fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?; let lst = TcpListener::bind(self.bind).await?;
loop { loop {
@ -204,7 +238,7 @@ impl Server {
let inflight = self.inflight.clone(); let inflight = self.inflight.clone();
let busy = self.busy_retry_ms; let busy = self.busy_retry_ms;
tokio::spawn(async move { tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr).await; let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1; *inflight.lock() -= 1;
}); });
} }
@ -215,13 +249,21 @@ async fn handle_conn(
root: &Path, root: &Path,
busy_ms: u32, busy_ms: u32,
addr: SocketAddr, addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> { ) -> Result<(), HlfsError> {
let mut op = [0u8; 1]; let mut op = [0u8; 1];
sock.read_exact(&mut op).await?; sock.read_exact(&mut op).await?;
if op[0] != 0x01 { if op[0] != 0x01 || op[0] != 0x02 {
warn!(%addr, "hlfs: bad op"); warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto); return Err(HlfsError::Proto);
} }
if op[0] == 0x02 {
let mut b = BytesMut::with_capacity(1 + 8 + 4);
b.put_u8(0x05);
put_u64(&mut b, max_block);
return Ok(());
}
let mut nb = [0u8; 8]; let mut nb = [0u8; 8];
sock.read_exact(&mut nb).await?; sock.read_exact(&mut nb).await?;
let number = u64::from_le_bytes(nb); let number = u64::from_le_bytes(nb);