diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 5281e5e6d..d3136bf73 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,11 +1,11 @@ use std::{ collections::HashMap, + fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, path::{Path, PathBuf}, sync::Arc, }; -use eyre::Context; use futures::future::BoxFuture; use serde::Deserialize; use time::{format_description, Duration, OffsetDateTime}; @@ -16,22 +16,12 @@ use crate::node::types::{BlockAndReceipts, EvmBlock}; use super::{BlockSource, BlockSourceBoxed}; -/// Poll interval when tailing an *open* hourly file. const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); -/// Sub‑directory that contains day folders (inside `local_ingest_dir`). const HOURLY_SUBDIR: &str = "hourly"; type LocalBlocksCache = Arc>>; /// Block source that monitors the local ingest directory for the HL node. -/// -/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3. -/// This block source caches blocks from the HL node to minimize latency, -/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed. -/// -/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7 -/// -/// [ref]: https://github.com/hyperliquid-dex/node #[derive(Debug, Clone)] pub struct HlNodeBlockSource { pub fallback: BlockSourceBoxed, @@ -57,48 +47,36 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> } fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult { - // info!( - // "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}", - // path, start_height, last_line - // ); - let file = std::fs::File::open(path).expect("Failed to open hour file path"); + let file = File::open(path).expect("Failed to open hour file path"); let reader = BufReader::new(file); - let mut new_blocks = Vec::::new(); + let mut new_blocks = Vec::new(); let mut last_height = start_height; let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; for (line_idx, line) in lines.iter().enumerate().skip(skip) { - // Safety check ensuring efficiency - if line_idx < *last_line { - continue; - } - if line.trim().is_empty() { + if line_idx < *last_line || line.trim().is_empty() { continue; } - let Ok((parsed_block, height)) = line_to_evm_block(line) else { - warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); - continue; - }; - if height < start_height { - continue; + match line_to_evm_block(line) { + Ok((parsed_block, height)) if height >= start_height => { + last_height = last_height.max(height); + new_blocks.push(parsed_block); + *last_line = line_idx; + } + Ok(_) => continue, + Err(_) => { + warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); + continue; + } } - - last_height = last_height.max(height); - new_blocks.push(parsed_block); - *last_line = line_idx; } ScanResult { next_expected_height: last_height + 1, new_blocks } } -fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime { - OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000) - .expect("timestamp out of range") -} - fn date_from_datetime(dt: OffsetDateTime) -> String { dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap() } @@ -106,7 +84,6 @@ fn date_from_datetime(dt: OffsetDateTime) -> String { impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { - // Not a one liner (using .or) to include logs if let Some(block) = self.try_collect_local_block(height).await { info!("Returning locally synced block for @ Height [{height}]"); Ok(block) @@ -116,8 +93,24 @@ impl BlockSource for HlNodeBlockSource { }) } - fn find_latest_block_number(&self) -> futures::future::BoxFuture> { - self.fallback.find_latest_block_number() + fn find_latest_block_number(&self) -> BoxFuture> { + Box::pin(async move { + let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else { + warn!( + "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", + self.local_ingest_dir + ); + return None; + }; + let mut file = File::open(&dir).expect("Failed to open hour file path"); + let last_line = read_last_complete_line(&mut file); + let Ok((_, height)) = line_to_evm_block(&last_line) else { + return None; + }; + + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + }) } fn recommended_chunk_size(&self) -> u64 { @@ -125,36 +118,31 @@ impl BlockSource for HlNodeBlockSource { } } -fn to_hourly(dt: OffsetDateTime) -> Result { - dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0) -} - -fn read_last_line(path: &Path) -> String { +fn read_last_complete_line(read: &mut R) -> String { const CHUNK_SIZE: u64 = 4096; - let mut file = std::fs::File::open(path).expect("Failed to open hour file path"); let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); - let mut pos = file.seek(SeekFrom::End(0)).unwrap(); + let mut pos = read.seek(SeekFrom::End(0)).unwrap(); let mut last_line: Vec = Vec::new(); - // Read backwards in chunks until we find a newline or reach the start while pos > 0 { let read_size = std::cmp::min(pos, CHUNK_SIZE); buf.resize(read_size as usize, 0); - file.seek(SeekFrom::Start(pos - read_size)).unwrap(); - file.read_exact(&mut buf).unwrap(); + read.seek(SeekFrom::Start(pos - read_size)).unwrap(); + read.read_exact(&mut buf).unwrap(); last_line = [buf.clone(), last_line].concat(); - // Remove trailing newline if last_line.ends_with(b"\n") { last_line.pop(); } if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { - // Found a newline, so the last line starts after this - let start = idx + 1; - return String::from_utf8(last_line[start..].to_vec()).unwrap(); + let candidate = &last_line[idx + 1..]; + if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() { + return String::from_utf8(candidate.to_vec()).unwrap(); + } + last_line.truncate(idx); } if pos < read_size { @@ -163,7 +151,6 @@ fn read_last_line(path: &Path) -> String { pos -= read_size; } - // There is 0~1 lines in the entire file String::from_utf8(last_line).unwrap() } @@ -173,83 +160,81 @@ impl HlNodeBlockSource { u_cache.remove(&height) } + fn datetime_from_path(path: &Path) -> Option { + let dt_part = path.parent()?.parent()?.file_name()?.to_str()?; + let hour_part = path.file_name()?.to_str()?; + let dt = OffsetDateTime::parse( + dt_part, + &format_description::parse("[year][month][day]").unwrap(), + ) + .ok()?; + let hour: u8 = hour_part.parse().ok()?; + let dt = dt.replace_hour(hour).ok()?; + Some(dt) + } + + fn all_hourly_files(root: &Path) -> Option> { + let dir = root.join(HOURLY_SUBDIR); + let mut files = Vec::new(); + for entry in std::fs::read_dir(dir).ok()? { + let file = entry.ok()?.path(); + let subfiles: Vec<_> = + std::fs::read_dir(&file).ok()?.filter_map(|f| f.ok().map(|f| f.path())).collect(); + files.extend(subfiles); + } + files.sort(); + Some(files) + } + + fn find_latest_hourly_file(root: &Path) -> Option { + Self::all_hourly_files(root)?.last().cloned() + } + async fn try_backfill_local_blocks( root: &Path, cache: &LocalBlocksCache, mut next_height: u64, ) -> eyre::Result<()> { - fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> { - // Validate and returns sort key for hourly//<0-23> - let file_name = f.file_name()?.to_str()?; - let Ok(file_name_num) = file_name.parse::() else { - warn!("Failed to parse file name: {:?}", f); - return None; - }; - - // Check if filename is numeric and 0..24 - if !(0..=24).contains(&file_name_num) { - return None; - } - Some((file_name_num, f)) - } - let mut u_cache = cache.lock().await; - // We assume that ISO format is sorted properly using naive string sort - let hourly_subdir = root.join(HOURLY_SUBDIR); - let mut files: Vec<_> = std::fs::read_dir(hourly_subdir) - .context("Failed to read hourly subdir")? - .filter_map(|f| f.ok().map(|f| f.path())) - .collect(); - files.sort(); - - for file in files { - let mut subfiles: Vec<_> = file - .read_dir() - .context("Failed to read hourly subdir")? - .filter_map(|f| f.ok().map(|f| f.path())) - .filter_map(parse_file_name) - .collect(); - subfiles.sort(); - - for (_, subfile) in subfiles { - // Fast path: check the last line of the file - let last_line = read_last_line(&subfile); - if let Ok((_, height)) = line_to_evm_block(&last_line) { - if height < next_height { - continue; - } - } else { - warn!( - "Failed to parse last line of file, fallback to slow path: {:?}", - subfile - ); + for subfile in Self::all_hourly_files(root).unwrap_or_default() { + let mut file = File::open(&subfile).expect("Failed to open hour file path"); + let last_line = read_last_complete_line(&mut file); + if let Ok((_, height)) = line_to_evm_block(&last_line) { + if height < next_height { + continue; } - - let ScanResult { next_expected_height, new_blocks } = - scan_hour_file(&subfile, &mut 0, next_height); - for blk in new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - u_cache.insert(b.header.header.number, blk); - } - next_height = next_expected_height; + } else { + warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); } + + let ScanResult { next_expected_height, new_blocks } = + scan_hour_file(&subfile, &mut 0, next_height); + for blk in new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + u_cache.insert(b.header.header.number, blk); + } + next_height = next_expected_height; } info!("Backfilled {} blocks", u_cache.len()); Ok(()) } - async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) { + async fn start_local_ingest_loop(&self, current_head: u64) { let root = self.local_ingest_dir.to_owned(); let cache = self.local_blocks_cache.clone(); tokio::spawn(async move { - // hl-node backfill is for fast path; do not exit when it fails - let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await; - let mut next_height = current_head; - let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap(); + + // Wait for the first hourly file to be created + let mut dt = loop { + if let Some(latest_file) = Self::find_latest_hourly_file(&root) { + break Self::datetime_from_path(&latest_file).unwrap(); + } + tokio::time::sleep(TAIL_INTERVAL).await; + }; let mut hour = dt.hour(); let mut day_str = date_from_datetime(dt); @@ -273,14 +258,8 @@ impl HlNodeBlockSource { } } - // Decide whether the *current* hour file is closed (past) or - // still live. If it’s in the past by > 1 h, move to next hour; - // otherwise, keep tailing the same file. let now = OffsetDateTime::now_utc(); - // println!("Date Current {:?}", dt); - // println!("Now Current {:?}", now); - if dt + Duration::HOUR < now { dt += Duration::HOUR; hour = dt.hour(); @@ -299,12 +278,14 @@ impl HlNodeBlockSource { } pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { - let EvmBlock::Reth115(latest_block) = - self.fallback.collect_block(next_block_number).await?.block; + let _ = Self::try_backfill_local_blocks( + &self.local_ingest_dir, + &self.local_blocks_cache, + next_block_number, + ) + .await; - let latest_block_ts = latest_block.header.header.timestamp; - - self.start_local_ingest_loop(next_block_number, latest_block_ts).await; + self.start_local_ingest_loop(next_block_number).await; Ok(()) }