From e2045a195ca60b40f1daaf46aa5227d79b5164ec Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Wed, 13 Aug 2025 00:57:16 +0000 Subject: [PATCH] debug block ingest workflow --- bin/reth/src/block_ingest.rs | 7 +++---- crates/net/hlfs/src/lib.rs | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index e5ce0f033..455b5eae6 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -150,7 +150,7 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { } impl BlockIngest { - pub(crate) async fn collect_block(&self, height: u64) -> Option { + pub(crate) async fn collect_block(&self, head: u64, height: u64) -> Option { // info!("Attempting to collect block @ height [{height}]"); // Not a one liner (using .or) to include logs @@ -160,8 +160,7 @@ impl BlockIngest { } if let Some(hlfs) = &self.hlfs { - let u_cache = self.local_blocks_cache.lock().await; - let head = u_cache.keys().next_back().copied().unwrap_or(0); + //info!("!! HEIGHT [{height}] :: HEAD [{head}]"); if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning HLFS-fetched block @[{height}]"); @@ -299,7 +298,7 @@ impl BlockIngest { self.start_local_ingest_loop(height, current_block_timestamp).await; loop { - let Some(original_block) = self.collect_block(height).await else { + let Some(original_block) = self.collect_block(head, height).await else { tokio::time::sleep(std::time::Duration::from_millis(25)).await; continue; }; diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index bd6419a4c..bd940b9f5 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -192,9 +192,13 @@ async fn handle_conn( let mut nb = [0u8; 8]; sock.read_exact(&mut nb).await?; let number = u64::from_le_bytes(nb); - let path = root.join(format!("{number}.rlp")); + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 + let f = (n / 1_000_000) * 1_000_000; + let s = (n / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy()); match fs::read(&path).await { Ok(data) => { + debug!("hlfs: found path [{path}]"); let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); b.put_u8(0x02); put_u64(&mut b, number); @@ -241,17 +245,21 @@ impl Backfiller { rr_index: usize, ) -> Result, HlfsError> { if head >= self.hist_threshold && number + self.hist_threshold > head { + //debug!(block=number, "hlfs: skip"); return Ok(None); } - let f = ((number - 1) / 1_000_000) * 1_000_000; - let s = ((number - 1) / 1_000) * 1_000; + let n = number.saturating_sub(1); // 0 -> 0, others -> number-1 + let f = (n / 1_000_000) * 1_000_000; + let s = (n / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); } + debug!(block = number, "hlfs: going to get_block from client"); match self.client.get_block(number, rr_index).await { Ok(bytes) => { - let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy()); + let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy()); fs::write(&tmp, &bytes).await?; fs::rename(&tmp, &path).await?; info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote");