From a47674ac30da7710a9557cdc123d907227a658a2 Mon Sep 17 00:00:00 2001
From: sprites0 <199826320+sprites0@users.noreply.github.com>
Date: Wed, 30 Jul 2025 04:05:52 +0000
Subject: [PATCH] feat: Add backfill support from local node

This currently does not support a huge amount of backfill logs within
the local node.
---
 src/pseudo_peer/sources/hl_node.rs | 153 +++++++++++++++++++++++++----
 1 file changed, 132 insertions(+), 21 deletions(-)

diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs
index 07cafe3b7..bc712265e 100644
--- a/src/pseudo_peer/sources/hl_node.rs
+++ b/src/pseudo_peer/sources/hl_node.rs
@@ -1,16 +1,16 @@
 use std::{
-    io::{BufRead, BufReader},
+    io::{BufRead, BufReader, Read, Seek, SeekFrom},
     path::{Path, PathBuf},
     sync::Arc,
 };
 
-use eyre::ContextCompat;
+use eyre::{Context, ContextCompat};
 use futures::future::BoxFuture;
 use reth_network::cache::LruMap;
 use serde::Deserialize;
 use time::{format_description, Duration, OffsetDateTime};
 use tokio::sync::Mutex;
-use tracing::info;
+use tracing::{info, warn};
 
 use crate::node::types::{BlockAndReceipts, EvmBlock};
 
@@ -24,6 +24,8 @@ const HOURLY_SUBDIR: &str = "hourly";
 /// In normal situation, 0~1 blocks will be cached.
 const CACHE_SIZE: u32 = 1000;
 
+type LocalBlocksCache = Arc<Mutex<LruMap<u64, BlockAndReceipts>>>;
+
 /// Block source that monitors the local ingest directory for the HL node.
 ///
 /// In certain situations, the [hl-node][ref] may offer lower latency compared to S3.
@@ -37,7 +39,7 @@ const CACHE_SIZE: u32 = 1000;
 pub struct HlNodeBlockSource {
     pub fallback: BlockSourceBoxed,
     pub local_ingest_dir: PathBuf,
-    pub local_blocks_cache: Arc<Mutex<LruMap<u64, BlockAndReceipts>>>, // height → block
+    pub local_blocks_cache: LocalBlocksCache, // height → block
 }
 
 #[derive(Deserialize)]
@@ -48,6 +50,15 @@ struct ScanResult {
     new_blocks: Vec<BlockAndReceipts>,
 }
 
+fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
+    let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
+        serde_json::from_str(line)?;
+    let height = match &parsed_block.block {
+        EvmBlock::Reth115(b) => b.header.header.number,
+    };
+    Ok((parsed_block, height))
+}
+
 fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
     // info!(
     //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
@@ -70,25 +81,17 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
             continue;
         }
 
-        let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
-            serde_json::from_str(line).expect("Failed to parse local block and receipts");
-
-        let height = match &parsed_block.block {
-            EvmBlock::Reth115(b) => {
-                let block_number = b.header.header.number;
-                // Another check to ensure not returning an older block
-                if block_number < start_height {
-                    continue;
-                }
-                block_number
-            }
+        let Ok((parsed_block, height)) = line_to_evm_block(&line) else {
+            warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
+            continue;
         };
-        // println!("Iterating block height {:?} | Line {}", height, line_idx);
-        if height >= start_height {
-            last_height = last_height.max(height);
-            new_blocks.push(parsed_block);
-            *last_line = line_idx;
+        if height < start_height {
+            continue;
         }
+
+        last_height = last_height.max(height);
+        new_blocks.push(parsed_block);
+        *last_line = line_idx;
     }
 
     ScanResult { next_expected_height: last_height + 1, new_blocks }
@@ -129,17 +132,125 @@ fn to_hourly(dt: OffsetDateTime) -> Result
+fn read_last_line(path: &Path) -> String {
+    const CHUNK_SIZE: u64 = 4096;
+    let mut file = std::fs::File::open(path).expect("Failed to open hour file path");
+    let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
+    let mut pos = file.seek(SeekFrom::End(0)).unwrap();
+    let mut last_line: Vec<u8> = Vec::new();
+
+    // Read backwards in chunks until we find a newline or reach the start
+    while pos > 0 {
+        let read_size = std::cmp::min(pos, CHUNK_SIZE);
+        buf.resize(read_size as usize, 0);
+
+        file.seek(SeekFrom::Start(pos - read_size)).unwrap();
+        file.read_exact(&mut buf).unwrap();
+
+        last_line = [buf.clone(), last_line].concat();
+
+        // Remove trailing newline
+        if last_line.ends_with(b"\n") {
+            last_line.pop();
+        }
+
+        if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
+            // Found a newline, so the last line starts after this
+            let start = idx + 1;
+            return String::from_utf8(last_line[start..].to_vec()).unwrap();
+        }
+
+        if pos < read_size {
+            break;
+        }
+        pos -= read_size;
+    }
+
+    // There are 0~1 lines in the entire file
+    String::from_utf8(last_line).unwrap()
+}
+
 impl HlNodeBlockSource {
     async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
         let mut u_cache = self.local_blocks_cache.lock().await;
         u_cache.remove(&height)
     }
 
+    async fn try_backfill_local_blocks(
+        root: &Path,
+        cache: &LocalBlocksCache,
+        mut next_height: u64,
+    ) -> eyre::Result<()> {
+        fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> {
+            // Validate and return the sort key for hourly/<date>/<0-23>
+            let file_name = f.file_name()?.to_str()?;
+            let Ok(file_name_num) = file_name.parse::<u64>() else {
+                warn!("Failed to parse file name: {:?}", f);
+                return None;
+            };
+
+            // Check if filename is numeric and 0..24
+            if !(0..=24).contains(&file_name_num) {
+                return None;
+            }
+            Some((file_name_num, f))
+        }
+
+        let mut u_cache = cache.lock().await;
+
+        // We assume that the ISO date format sorts properly under a naive string sort
+        let hourly_subdir = root.join(HOURLY_SUBDIR);
+        let mut files: Vec<_> = std::fs::read_dir(hourly_subdir)
+            .context("Failed to read hourly subdir")?
+            .filter_map(|f| f.ok().map(|f| f.path()))
+            .collect();
+        files.sort();
+
+        for file in files {
+            let mut subfiles: Vec<_> = file
+                .read_dir()
+                .context("Failed to read hourly subdir")?
+                .filter_map(|f| f.ok().map(|f| f.path()))
+                .filter_map(parse_file_name)
+                .collect();
+            subfiles.sort();
+
+            for (file_name_num, subfile) in subfiles {
+                if file_name_num < next_height {
+                    continue;
+                }
+
+                // Fast path: check the last line of the file
+                let last_line = read_last_line(&subfile);
+                if let Ok((_, height)) = line_to_evm_block(&last_line) {
+                    if height < next_height {
+                        continue;
+                    }
+                } else {
+                    warn!("Failed to parse last line of file, falling back to slow path: {:?}", subfile);
+                }
+
+                let ScanResult { next_expected_height, new_blocks } =
+                    scan_hour_file(&subfile, &mut 0, next_height);
+                for blk in new_blocks {
+                    let EvmBlock::Reth115(b) = &blk.block;
+                    u_cache.insert(b.header.header.number, blk);
+                }
+                next_height = next_expected_height;
+            }
+        }
+
+        Ok(())
+    }
+
     async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
         let root = self.local_ingest_dir.to_owned();
         let cache = self.local_blocks_cache.clone();
         tokio::spawn(async move {
+            // hl-node backfill is a fast path; do not exit when it fails
+            let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await;
+
             let mut next_height = current_head;
             let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap();