From 2a653857aa020f47836fd3238f9a759a318a8fe9 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 22:18:42 -0700 Subject: [PATCH] debug --- bin/reth/src/block_ingest.rs | 6 +-- bin/reth/src/share_blocks.rs | 69 ++++++++++++++++------------ crates/net/hlfs/src/lib.rs | 89 +++++++++++++++++++++--------------- 3 files changed, 95 insertions(+), 69 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 455b5eae6..c6b60a1f5 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -150,7 +150,7 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { } impl BlockIngest { - pub(crate) async fn collect_block(&self, head: u64, height: u64) -> Option { + pub(crate) async fn collect_block(&self, height: u64) -> Option { // info!("Attempting to collect block @ height [{height}]"); // Not a one liner (using .or) to include logs @@ -161,7 +161,7 @@ impl BlockIngest { if let Some(hlfs) = &self.hlfs { //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); - if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { + if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning HLFS-fetched block @[{height}]"); return Some(block); @@ -298,7 +298,7 @@ impl BlockIngest { self.start_local_ingest_loop(height, current_block_timestamp).await; loop { - let Some(original_block) = self.collect_block(head, height).await else { + let Some(original_block) = self.collect_block(head).await else { tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 3b0a6b9fd..075538c7f 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -1,5 +1,5 @@ use clap::Args; -use reth_hlfs::{Backfiller, Client, Server}; +use reth_hlfs::{Backfiller, Client, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK, PeerRecord}; use reth_network_api::{events::NetworkEvent, FullNetwork}; use std::{ collections::HashSet, @@ -24,8 +24,6 @@ pub(crate) struct ShareBlocksArgs { pub share_blocks_port: u16, #[arg(long, default_value = "evm-blocks")] pub archive_dir: PathBuf, - #[arg(long, default_value_t = 5_000)] - pub hist_threshold: u64, } pub(crate) struct ShareBlocks { @@ -55,19 +53,19 @@ impl ShareBlocks { } }); - let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); - let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); + let client = Client::new(&args.archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); + let bf = Backfiller::new(client, &args.archive_dir); let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); - info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); + info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)"); Ok(Self { _backfiller: bf, _server, _autodetect }) } #[allow(dead_code)] - pub(crate) async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { - let rr = (block as usize) ^ (head as usize); // deterministic round-robin seed - self._backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) + pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { + let rr = block as usize; + self._backfiller.fetch_if_missing(block, rr).await.map_err(|e| eyre::eyre!(e)) // <- fix: HlfsError -> eyre::Report } } @@ -81,8 +79,7 @@ fn spawn_autodetect( where Net: FullNetwork + Clone + 'static, { - let good: Arc>> = - Arc::new(tokio::sync::Mutex::new(HashSet::new())); + let good: Arc>> = Arc::new(tokio::sync::Mutex::new(HashSet::new())); tokio::spawn(async move { let mut events = network.event_listener(); @@ -90,7 +87,6 @@ where match events.next().await { Some(NetworkEvent::ActivePeerSession { info, .. }) => { let ip = info.remote_addr.ip(); - // skip unusable/self if ip.is_unspecified() { debug!(%ip, "hlfs: skip unspecified"); continue; @@ -100,9 +96,9 @@ where continue; } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); - if probe_hlfs(addr).await { + if let max_block = probe_hlfs(addr).await { let mut g = good.lock().await; - if g.insert(addr) { + if g.insert(PeerRecord { addr, max_block }) { let v: Vec<_> = g.iter().copied().collect(); backfiller.set_peers(v.clone()); info!(%addr, total=v.len(), "hlfs: peer added"); @@ -121,22 +117,35 @@ where }) } -async fn probe_hlfs(addr: SocketAddr) -> bool { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - use tokio::net::TcpStream; +pub async fn probe_hlfs(addr: SocketAddr) -> u64 { + use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, + }; - let res = timeout(Duration::from_secs(2), async { - if let Ok(mut s) = TcpStream::connect(addr).await { - let mut msg = [0u8; 9]; - msg[0] = 0x01; - let _ = s.write_all(&msg).await; - let mut op = [0u8; 1]; - if s.read_exact(&mut op).await.is_ok() { - return matches!(op[0], 0x02 | 0x03 | 0x04); - } + let fut = async { + let mut s = TcpStream::connect(addr).await.ok()?; + + // send [OP][8 zero bytes] + let mut msg = [0u8; 9]; + msg[0] = OP_REQ_MAX_BLOCK; + s.write_all(&msg).await.ok()?; + + // read 1-byte opcode + let mut op = [0u8; 1]; + s.read_exact(&mut op).await.ok()?; + if op[0] != OP_RES_MAX_BLOCK { + return None; } - false - }) - .await; - matches!(res, Ok(true)) + + // read 8-byte little-endian block number + let mut blk = [0u8; 8]; + s.read_exact(&mut blk).await.ok()?; + Some(u64::from_le_bytes(blk)) + }; + + match timeout(Duration::from_secs(2), fut).await { + Ok(Some(n)) => n, + _ => 0, + } } diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index c8d195282..45155180f 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -4,7 +4,9 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use parking_lot::Mutex; use reth_tracing::tracing::{debug, info, trace, warn}; use std::{ - fs, io, + fs, + hash::{Hash, Hasher}, + io, net::SocketAddr, path::{Path, PathBuf}, sync::Arc, @@ -19,6 +21,13 @@ use tokio::{ type Result = std::result::Result; +pub const OP_REQ_BLOCK: u8 = 0x01; +pub const OP_RES_BLOCK: u8 = 0x02; +pub const OP_REQ_MAX_BLOCK: u8 = 0x03; +pub const OP_RES_MAX_BLOCK: u8 = 0x04; +pub const OP_ERR_TOO_BUSY: u8 = 0x05; +pub const OP_ERR_NOT_FOUND: u8 = 0x06; + #[derive(Error, Debug)] pub enum HlfsError { #[error("io: {0}")] @@ -53,23 +62,45 @@ async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { } /// Client: tries each peer once; rotates starting index per call +#[derive(Debug, Copy, Clone)] +pub struct PeerRecord { + pub addr: SocketAddr, + pub max_block: u64, +} + +impl PartialEq for PeerRecord { + fn eq(&self, o: &Self) -> bool { + self.addr == o.addr + } +} +impl Eq for PeerRecord {} +impl Hash for PeerRecord { + fn hash(&self, s: &mut H) { + self.addr.hash(s); + } +} + #[derive(Clone)] pub struct Client { - peers: Arc>>, + root: PathBuf, + peers: Arc>>, timeout: Duration, + max_block: u64, } impl Client { - pub fn new(peers: Vec) -> Self { - Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3) } + pub fn new(root: impl Into, peers: Vec) -> Self { + let root: PathBuf = root.into(); + let n = find_max_number_file(&root).unwrap(); + Self { root, peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(3), max_block: n } } - pub fn update_peers(&self, peers: Vec) { + pub fn update_peers(&self, peers: Vec) { *self.peers.lock() = peers; } pub fn with_timeout(mut self, d: Duration) -> Self { self.timeout = d; self } - pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { + pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { let peers = self.peers.lock().clone(); debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { @@ -80,26 +111,26 @@ impl Client { let mut last_busy: Option = None; while let Some(i) = all.next() { let addr = peers[i]; - trace!(%addr, "hlfs: dialing"); - match timeout(self.timeout, TcpStream::connect(addr)).await { + trace!(%addr.addr, "hlfs: dialing"); + match timeout(self.timeout, TcpStream::connect(addr.addr)).await { Err(_) => continue, Ok(Err(_)) => continue, Ok(Ok(mut sock)) => { let mut req = BytesMut::with_capacity(1 + 8); - req.put_u8(0x01); // GET + req.put_u8(OP_REQ_BLOCK); put_u64(&mut req, number); if let Err(e) = sock.write_all(&req).await { - debug!(%addr, "hlfs: write err: {e}"); + debug!(%addr.addr, "hlfs: write err: {e}"); continue; } let mut op = [0u8; 1]; if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await { - debug!(%addr, "hlfs: read op timeout {e:?}"); + debug!(%addr.addr, "hlfs: read op timeout {e:?}"); continue; } let op = op[0]; match op { - 0x03 => { + OP_RES_BLOCK => { // DATA let mut len = [0u8; 4]; sock.read_exact(&mut len).await?; @@ -108,13 +139,13 @@ impl Client { sock.read_exact(&mut buf).await?; return Ok(buf); } - 0x04 => { + OP_ERR_TOO_BUSY => { let mut ms = [0u8; 4]; sock.read_exact(&mut ms).await?; last_busy = Some(u32::from_le_bytes(ms)); continue; } - 0x06 => { + OP_ERR_NOT_FOUND => { return Err(HlfsError::NotFound); } _ => { @@ -197,7 +228,7 @@ impl Server { let (mut sock, addr) = lst.accept().await?; if *self.inflight.lock() >= self.max_conns { let mut b = BytesMut::with_capacity(5); - b.put_u8(0x04); + b.put_u8(OP_ERR_TOO_BUSY); put_u32(&mut b, self.busy_retry_ms); let _ = sock.write_all(&b).await; continue; @@ -222,18 +253,10 @@ async fn handle_conn( ) -> Result<(), HlfsError> { let mut op = [0u8; 1]; sock.read_exact(&mut op).await?; - if op[0] != 0x01 && op[0] != 0x02 { + if op[0] != OP_REQ_BLOCK { warn!(%addr, "hlfs: bad op"); return Err(HlfsError::Proto); } - if op[0] == 0x02 { - let mut b = BytesMut::with_capacity(1 + 8 + 4); - b.put_u8(0x05); - put_u64(&mut b, max_block); - put_u32(&mut b, busy_ms); - let _ = sock.write_all(&b).await; - return Ok(()); - } let mut num = [0u8; 8]; sock.read_exact(&mut num).await?; @@ -252,14 +275,14 @@ async fn handle_conn( match fs::read(&path) { Ok(data) => { let mut b = BytesMut::with_capacity(1 + 4 + data.len()); - b.put_u8(0x03); + b.put_u8(OP_RES_BLOCK); put_u32(&mut b, data.len() as u32); b.extend_from_slice(&data); let _ = sock.write_all(&b).await; } Err(e) if e.kind() == io::ErrorKind::NotFound => { let mut b = BytesMut::with_capacity(1); - b.put_u8(0x06); // not found + b.put_u8(OP_ERR_NOT_FOUND); let _ = sock.write_all(&b).await; } Err(e) => { @@ -275,25 +298,19 @@ async fn handle_conn( pub struct Backfiller { client: Client, root: PathBuf, - hist_threshold: u64, } impl Backfiller { - pub fn new(client: Client, root: impl Into, hist_threshold: u64) -> Self { - Self { client, root: root.into(), hist_threshold } + pub fn new(client: Client, root: impl Into) -> Self { + Self { client, root: root.into() } } - pub fn set_peers(&self, peers: Vec) { + pub fn set_peers(&self, peers: Vec) { self.client.update_peers(peers); } pub async fn fetch_if_missing( &self, number: u64, - head: u64, rr_index: usize, ) -> Result, HlfsError> { - if head >= self.hist_threshold && number + self.hist_threshold > head { - //debug!(block=number, "hlfs: skip"); - return Ok(None); - } let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000; @@ -303,7 +320,7 @@ impl Backfiller { trace!(block = number, "hlfs: already have"); return Ok(None); } - match self.client.get_block(number, rr_index).await { + match self.client.wants_block(number, rr_index).await { Err(HlfsError::NotFound) => Ok(None), Err(HlfsError::Busy(ms)) => { tokio::time::sleep(Duration::from_millis(ms as u64)).await;