From cdb0f9e8a2cb59f3b7e5225c705814f23f5710e2 Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Thu, 21 Aug 2025 23:46:53 -0700 Subject: [PATCH] debug --- bin/reth/src/block_ingest.rs | 18 +++--- bin/reth/src/share_blocks.rs | 113 ++++++++++++++++++++--------------- crates/net/hlfs/src/lib.rs | 21 ++++--- 3 files changed, 89 insertions(+), 63 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index c6b60a1f5..74b82bbf0 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -159,15 +159,15 @@ impl BlockIngest { return Some(block); } - if let Some(hlfs) = &self.hlfs { - //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); - if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { - if let Some(block) = self.try_collect_local_block(height).await { - info!("Returning HLFS-fetched block @[{height}]"); - return Some(block); - } - } - } + // if let Some(hlfs) = &self.hlfs { + // //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); + // if hlfs.try_fetch_one(height).await.ok().flatten().is_some() { + // if let Some(block) = self.try_collect_local_block(height).await { + // info!("Returning HLFS-fetched block @[{height}]"); + // return Some(block); + // } + // } + // } self.try_collect_s3_block(height) } diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 7eeddba08..ede25305c 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -1,14 +1,16 @@ use clap::Args; -use reth_hlfs::{Backfiller, Client, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK, PeerRecord}; +use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK}; use reth_network_api::{events::NetworkEvent, FullNetwork}; use std::{ collections::HashSet, net::{IpAddr, SocketAddr}, path::PathBuf, sync::Arc, - time::Duration, }; -use tokio::{task::JoinHandle, time::timeout}; +use tokio::{ + task::JoinHandle, + time::{sleep, timeout, Duration}, +}; use tracing::{debug, info, warn}; // use futures_util::StreamExt; @@ -27,7 +29,6 @@ pub(crate) struct ShareBlocksArgs { } pub(crate) struct ShareBlocks { - pub(crate) _backfiller: Backfiller, _server: JoinHandle<()>, _autodetect: JoinHandle<()>, } @@ -53,65 +54,83 @@ impl ShareBlocks { } }); - let client = Client::new(&args.archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); - let bf = Backfiller::new(client, &args.archive_dir); - - let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); + let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone()); info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)"); - Ok(Self { _backfiller: bf, _server, _autodetect }) + Ok(Self { _server, _autodetect }) } - #[allow(dead_code)] - pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { - let rr = block as usize; - self._backfiller.fetch_if_missing(block, rr).await.map_err(|e| eyre::eyre!(e)) - // <- fix: HlfsError -> eyre::Report - } + // #[allow(dead_code)] + // pub(crate) async fn try_fetch_one(&self, block: u64) -> eyre::Result> { + // self._backfiller.fetch_if_missing(block).await.map_err(|e| eyre::eyre!(e)) + // } } fn spawn_autodetect( network: Net, self_ip: IpAddr, hlfs_port: u16, - backfiller: Backfiller, + archive_dir: PathBuf, ) -> JoinHandle<()> where Net: FullNetwork + Clone + 'static, { - let good: Arc>> = Arc::new(tokio::sync::Mutex::new(HashSet::new())); + let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5)); + let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir))); + let good: Arc>> = + Arc::new(tokio::sync::Mutex::new(HashSet::new())); - tokio::spawn(async move { - let mut events = network.event_listener(); - loop { - match events.next().await { - Some(NetworkEvent::ActivePeerSession { info, .. }) => { - let ip = info.remote_addr.ip(); - if ip.is_unspecified() { - debug!(%ip, "hlfs: skip unspecified"); - continue; - } - if ip == self_ip { - debug!(%ip, "hlfs: skip self"); - continue; - } - let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); - let max_block = probe_hlfs(addr).await; - if max_block != 0 { - let mut g = good.lock().await; - if g.insert(PeerRecord { addr, max_block }) { - let v: Vec<_> = g.iter().copied().collect(); - backfiller.set_peers(v.clone()); - info!(%addr, %max_block, total=v.len(), "hlfs: peer added"); - } - } else { - debug!(%addr, "hlfs: peer has no HLFS"); - } + tokio::spawn({ + let backfiller = backfiller.clone(); + async move { + loop { + let mut bf = backfiller.lock().await; + warn!("hlfs: backfiller started"); + if bf.client.max_block < bf.max_block_seen { + let block = bf.client.max_block + 1; + let _ = bf.fetch_if_missing(block).await; } - Some(_) => {} - None => { - warn!("hlfs: network event stream ended"); - break; + + sleep(Duration::from_secs(1)).await; + } + } + }); + + tokio::spawn({ + let backfiller = backfiller.clone(); + async move { + let mut events = network.event_listener(); + loop { + let mut bf = backfiller.lock().await; + match events.next().await { + Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let ip = info.remote_addr.ip(); + if ip.is_unspecified() { + debug!(%ip, "hlfs: skip unspecified"); + continue; + } + if ip == self_ip { + debug!(%ip, "hlfs: skip self"); + continue; + } + let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); + let max_block = probe_hlfs(addr).await; + if max_block != 0 { + let mut g = good.lock().await; + if g.insert(PeerRecord { addr, max_block }) { + let v: Vec<_> = g.iter().copied().collect(); + bf.set_peers(v.clone()); + info!(%addr, %max_block, total=v.len(), "hlfs: peer added"); + } + } else { + debug!(%addr, "hlfs: peer has no HLFS"); + } + } + Some(_) => {} + None => { + warn!("hlfs: network event stream ended"); + break; + } } } } diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index ddbbb8b2d..bbbf4403e 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -83,9 +83,9 @@ impl Hash for PeerRecord { #[derive(Clone)] pub struct Client { root: PathBuf, - peers: Arc>>, + pub peers: Arc>>, timeout: Duration, - max_block: u64, + pub max_block: u64, } impl Client { pub fn new(root: impl Into, peers: Vec) -> Self { @@ -310,21 +310,28 @@ async fn handle_conn( /// Backfiller: ask client per missing block; rotate peers every block. #[derive(Clone)] pub struct Backfiller { - client: Client, + pub client: Client, root: PathBuf, + pub max_block_seen: u64, } impl Backfiller { pub fn new(client: Client, root: impl Into) -> Self { - Self { client, root: root.into() } + Self { client, root: root.into(), max_block_seen: 0 } } - pub fn set_peers(&self, peers: Vec) { + pub fn set_peers(&mut self, peers: Vec) { self.client.update_peers(peers); + let _peers = self.client.peers.lock().clone(); + for p in _peers { + if p.max_block > self.max_block_seen { + self.max_block_seen = p.max_block + } + } } pub async fn fetch_if_missing( - &self, + &mut self, number: u64, - rr_index: usize, ) -> Result, HlfsError> { + let rr_index = number as usize; let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 let f = (n / 1_000_000) * 1_000_000; let s = (n / 1_000) * 1_000;