diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs index 49666ce6d..ba7454922 100644 --- a/bin/reth/src/share_blocks.rs +++ b/bin/reth/src/share_blocks.rs @@ -58,7 +58,7 @@ impl ShareBlocks { let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); - let _autodetect = spawn_autodetect(network, args.share_blocks_port, bf.clone()); + let _autodetect = spawn_autodetect(network, host, args.share_blocks_port, bf.clone()); info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); Ok(Self { _backfiller: bf, _server, _autodetect }) @@ -72,7 +72,12 @@ impl ShareBlocks { } } -fn spawn_autodetect(network: Net, hlfs_port: u16, backfiller: Backfiller) -> JoinHandle<()> +fn spawn_autodetect( + network: Net, + self_ip: IpAddr, + hlfs_port: u16, + backfiller: Backfiller, +) -> JoinHandle<()> where Net: FullNetwork + Clone + 'static, { @@ -84,6 +89,12 @@ where loop { match events.next().await { Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let ip = info.remote_addr.ip(); + // skip unusable/self + if ip.is_unspecified() || ip == self_ip { + debug!(%ip, "hlfs: skip self/unspecified"); + continue; + } let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); if probe_hlfs(addr).await { let mut g = good.lock().await; diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index bd940b9f5..f0a8bace6 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -42,6 +42,14 @@ fn put_u32(b: &mut BytesMut, v: u32) { b.put_u32_le(v) } +async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { + if let Some(parent) = Path::new(path).parent() { + fs::create_dir_all(parent).await + } else { + Ok(()) + } +} + /// Client: tries each peer once; rotates starting index per call. #[derive(Clone)] pub struct Client { @@ -62,6 +70,7 @@ impl Client { } pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { let peers = self.peers.lock().clone(); + debug!(peer_count = peers.len(), "hlfs: peers"); if peers.is_empty() { debug!(block = number, "hlfs: no peers"); return Err(HlfsError::Timeout); @@ -69,10 +78,11 @@ impl Client { for t in 0..peers.len() { let i = (rr_index + t) % peers.len(); let addr = peers[i]; - trace!(block=number, %addr, "hlfs: try"); + debug!(block=number, %addr, "hlfs: try"); + let start = std::time::Instant::now(); match timeout(self.timeout, fetch_once(addr, number)).await { Err(_) => { - debug!(block=number, %addr, "hlfs: timeout"); + debug!(elapsed=?start.elapsed(), %addr, block=number, "hlfs: timeout"); continue; } Ok(Err(HlfsError::Busy(d))) => { @@ -98,6 +108,7 @@ impl Client { } } async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { + debug!(%addr, "hlfs: connect"); let mut s = TcpStream::connect(addr).await?; let mut buf = BytesMut::with_capacity(9); buf.put_u8(0x01); @@ -255,12 +266,20 @@ impl Backfiller { let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); + } else { + ensure_parent_dirs(&path).await?; } + debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { Ok(bytes) => { + debug!(block = number, "hlfs: YAY! got block from client"); let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); + ensure_parent_dirs(&tmp).await?; + + debug!(block = number, path=%tmp, "hlfs: writing file"); fs::write(&tmp, &bytes).await?; + debug!(block = number, from=%tmp, to=%path, "hlfs: moving file"); fs::rename(&tmp, &path).await?; info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); Ok(Some(bytes.len()))