This commit is contained in:
Nicholas Wehr
2025-08-21 22:18:42 -07:00
parent 5f2955caa2
commit 2a653857aa
3 changed files with 95 additions and 69 deletions

View File

@ -4,7 +4,9 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs, io,
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
@ -19,6 +21,13 @@ use tokio::{
type Result<T, E = HlfsError> = std::result::Result<T, E>;
// Single-byte opcodes that prefix every message exchanged between client and server.

/// Request for one block: the opcode byte is followed by the `u64` block number.
pub const OP_REQ_BLOCK: u8 = 0x01;
/// Successful block response: the opcode byte is followed by a 4-byte payload length
/// and then the block bytes.
pub const OP_RES_BLOCK: u8 = 0x02;
/// NOTE(review): not referenced by the code visible here; presumably asks a peer for
/// its highest available block number — confirm against the handler.
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
/// NOTE(review): not referenced by the code visible here; presumably the reply to
/// `OP_REQ_MAX_BLOCK` — confirm the payload layout against the handler.
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
/// Server is at its connection limit: the opcode byte is followed by a `u32`
/// retry delay in milliseconds.
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
/// Requested block does not exist on the server; sent as the opcode byte alone.
pub const OP_ERR_NOT_FOUND: u8 = 0x06;
#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
@ -53,23 +62,45 @@ async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
}
/// Client: tries each peer once; rotates starting index per call
///
/// A peer's socket address paired with a block number recorded for that peer.
///
/// Identity is the address alone: the `PartialEq`, `Eq` and `Hash` impls below all
/// ignore `max_block`, so two records for the same address compare equal and
/// collapse to a single entry in hashed collections.
#[derive(Clone, Copy, Debug)]
pub struct PeerRecord {
    /// Address of the peer; the only field that takes part in equality and hashing.
    pub addr: SocketAddr,
    /// Block number stored alongside the peer — presumably the highest block the
    /// peer can serve (not established by the code visible here; confirm).
    pub max_block: u64,
}

impl PartialEq for PeerRecord {
    /// Two records are equal when their addresses match, whatever `max_block` holds.
    fn eq(&self, other: &Self) -> bool {
        let (mine, theirs) = (&self.addr, &other.addr);
        mine == theirs
    }
}

// Address equality is a total equivalence relation, so `Eq` holds as well.
impl Eq for PeerRecord {}

impl Hash for PeerRecord {
    /// Feeds only the address into the hasher, keeping `Hash` consistent with `Eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        Hash::hash(&self.addr, state);
    }
}
#[derive(Clone)]
pub struct Client {
peers: Arc<Mutex<Vec<SocketAddr>>>,
root: PathBuf,
peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
max_block: u64,
}
impl Client {
pub fn new(peers: Vec<SocketAddr>) -> Self {
Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3) }
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
Self { root, peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3), max_block: n }
}
pub fn update_peers(&self, peers: Vec<SocketAddr>) {
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn get_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
@ -80,26 +111,26 @@ impl Client {
let mut last_busy: Option<u32> = None;
while let Some(i) = all.next() {
let addr = peers[i];
trace!(%addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr)).await {
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(0x01); // GET
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr, "hlfs: write err: {e}");
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
let mut op = [0u8; 1];
if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
debug!(%addr, "hlfs: read op timeout {e:?}");
debug!(%addr.addr, "hlfs: read op timeout {e:?}");
continue;
}
let op = op[0];
match op {
0x03 => {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
@ -108,13 +139,13 @@ impl Client {
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
0x04 => {
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
0x06 => {
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
@ -197,7 +228,7 @@ impl Server {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(0x04);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
@ -222,18 +253,10 @@ async fn handle_conn(
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != 0x01 && op[0] != 0x02 {
if op[0] != OP_REQ_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}
if op[0] == 0x02 {
let mut b = BytesMut::with_capacity(1 + 8 + 4);
b.put_u8(0x05);
put_u64(&mut b, max_block);
put_u32(&mut b, busy_ms);
let _ = sock.write_all(&b).await;
return Ok(());
}
let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
@ -252,14 +275,14 @@ async fn handle_conn(
match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(0x03);
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(0x06); // not found
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
@ -275,25 +298,19 @@ async fn handle_conn(
pub struct Backfiller {
client: Client,
root: PathBuf,
hist_threshold: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>, hist_threshold: u64) -> Self {
Self { client, root: root.into(), hist_threshold }
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into() }
}
pub fn set_peers(&self, peers: Vec<SocketAddr>) {
pub fn set_peers(&self, peers: Vec<PeerRecord>) {
self.client.update_peers(peers);
}
pub async fn fetch_if_missing(
&self,
number: u64,
head: u64,
rr_index: usize,
) -> Result<Option<usize>, HlfsError> {
if head >= self.hist_threshold && number + self.hist_threshold > head {
//debug!(block=number, "hlfs: skip");
return Ok(None);
}
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
@ -303,7 +320,7 @@ impl Backfiller {
trace!(block = number, "hlfs: already have");
return Ok(None);
}
match self.client.get_block(number, rr_index).await {
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;