debug block ingest workflow

This commit is contained in:
Nicholas Wehr
2025-08-13 00:57:16 +00:00
parent ec281f9cc7
commit e2045a195c
2 changed files with 15 additions and 8 deletions

View File

@ -150,7 +150,7 @@ fn date_from_datetime(dt: OffsetDateTime) -> String {
} }
impl BlockIngest { impl BlockIngest {
pub(crate) async fn collect_block(&self, height: u64) -> Option<BlockAndReceipts> { pub(crate) async fn collect_block(&self, head: u64, height: u64) -> Option<BlockAndReceipts> {
// info!("Attempting to collect block @ height [{height}]"); // info!("Attempting to collect block @ height [{height}]");
// Not a one liner (using .or) to include logs // Not a one liner (using .or) to include logs
@ -160,8 +160,7 @@ impl BlockIngest {
} }
if let Some(hlfs) = &self.hlfs { if let Some(hlfs) = &self.hlfs {
let u_cache = self.local_blocks_cache.lock().await; //info!("!! HEIGHT [{height}] :: HEAD [{head}]");
let head = u_cache.keys().next_back().copied().unwrap_or(0);
if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() {
if let Some(block) = self.try_collect_local_block(height).await { if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning HLFS-fetched block @[{height}]"); info!("Returning HLFS-fetched block @[{height}]");
@ -299,7 +298,7 @@ impl BlockIngest {
self.start_local_ingest_loop(height, current_block_timestamp).await; self.start_local_ingest_loop(height, current_block_timestamp).await;
loop { loop {
let Some(original_block) = self.collect_block(height).await else { let Some(original_block) = self.collect_block(head, height).await else {
tokio::time::sleep(std::time::Duration::from_millis(25)).await; tokio::time::sleep(std::time::Duration::from_millis(25)).await;
continue; continue;
}; };

View File

@ -192,9 +192,13 @@ async fn handle_conn(
let mut nb = [0u8; 8]; let mut nb = [0u8; 8];
sock.read_exact(&mut nb).await?; sock.read_exact(&mut nb).await?;
let number = u64::from_le_bytes(nb); let number = u64::from_le_bytes(nb);
let path = root.join(format!("{number}.rlp")); let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
match fs::read(&path).await { match fs::read(&path).await {
Ok(data) => { Ok(data) => {
debug!("hlfs: found path [{path}]");
let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len());
b.put_u8(0x02); b.put_u8(0x02);
put_u64(&mut b, number); put_u64(&mut b, number);
@ -241,17 +245,21 @@ impl Backfiller {
rr_index: usize, rr_index: usize,
) -> Result<Option<usize>, HlfsError> { ) -> Result<Option<usize>, HlfsError> {
if head >= self.hist_threshold && number + self.hist_threshold > head { if head >= self.hist_threshold && number + self.hist_threshold > head {
//debug!(block=number, "hlfs: skip");
return Ok(None); return Ok(None);
} }
let f = ((number - 1) / 1_000_000) * 1_000_000; let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let s = ((number - 1) / 1_000) * 1_000; let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if fs::try_exists(&path).await? { if fs::try_exists(&path).await? {
return Ok(None); return Ok(None);
} }
debug!(block = number, "hlfs: going to get_block from client");
match self.client.get_block(number, rr_index).await { match self.client.get_block(number, rr_index).await {
Ok(bytes) => { Ok(bytes) => {
let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy()); let tmp = format!("{}/{f}/{s}/{number}.rmp.lz4.part", self.root.to_string_lossy());
fs::write(&tmp, &bytes).await?; fs::write(&tmp, &bytes).await?;
fs::rename(&tmp, &path).await?; fs::rename(&tmp, &path).await?;
info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote");