diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs index 5af59bad0..65eeaf267 100644 --- a/src/pseudo_peer/sources/hl_node/mod.rs +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -8,7 +8,7 @@ mod time_utils; use self::{ cache::LocalBlocksCache, file_ops::FileOperations, - scan::{ScanOptions, Scanner}, + scan::{LineStream, ScanOptions, Scanner}, time_utils::TimeUtils, }; use super::{BlockSource, BlockSourceBoxed}; @@ -120,6 +120,28 @@ impl BlockSource for HlNodeBlockSource { } } +struct CurrentFile { + path: PathBuf, + line_stream: Option, +} + +impl CurrentFile { + pub fn from_datetime(dt: OffsetDateTime, root: &Path) -> Self { + let (hour, day_str) = (dt.hour(), TimeUtils::date_from_datetime(dt)); + let path = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{}", hour)); + Self { path, line_stream: None } + } + + pub fn open(&mut self) -> eyre::Result<()> { + if self.line_stream.is_some() { + return Ok(()); + } + + self.line_stream = Some(LineStream::from_path(&self.path)?); + Ok(()) + } +} + impl HlNodeBlockSource { async fn update_last_fetch( last_local_fetch: Arc>>, @@ -142,9 +164,9 @@ impl HlNodeBlockSource { } let path = u_cache.get_path_for_height(height)?; info!("Loading block data from {:?}", path); + let mut line_stream = LineStream::from_path(&path).ok()?; let scan_result = Scanner::scan_hour_file( - &path, - &mut 0, + &mut line_stream, ScanOptions { start_height: 0, only_load_ranges: false }, ); u_cache.load_scan_result(scan_result); @@ -165,9 +187,10 @@ impl HlNodeBlockSource { } else { warn!("Failed to parse last line of file: {:?}", subfile); } + let mut line_stream = + LineStream::from_path(&subfile).expect("Failed to open line stream"); let mut scan_result = Scanner::scan_hour_file( - &subfile, - &mut 0, + &mut line_stream, ScanOptions { start_height: cutoff_height, only_load_ranges: true }, ); scan_result.new_blocks.clear(); // Only store ranges, load data lazily @@ -188,15 +211,13 @@ impl HlNodeBlockSource { } tokio::time::sleep(TAIL_INTERVAL).await; }; - let (mut hour, mut day_str, mut last_line) = - (dt.hour(), TimeUtils::date_from_datetime(dt), 0); + let mut current_file = CurrentFile::from_datetime(dt, &root); info!("Starting local ingest loop from height: {}", current_head); loop { - let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); - if hour_file.exists() { + let _ = current_file.open(); + if let Some(line_stream) = &mut current_file.line_stream { let scan_result = Scanner::scan_hour_file( - &hour_file, - &mut last_line, + line_stream, ScanOptions { start_height: next_height, only_load_ranges: false }, ); next_height = scan_result.next_expected_height; @@ -205,11 +226,8 @@ impl HlNodeBlockSource { let now = OffsetDateTime::now_utc(); if dt + ONE_HOUR < now { dt += ONE_HOUR; - (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); - info!( - "Moving to new file: {:?}", - root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) - ); + current_file = CurrentFile::from_datetime(dt, &root); + info!("Moving to new file: {:?}", current_file.path); continue; } tokio::time::sleep(TAIL_INTERVAL).await; diff --git a/src/pseudo_peer/sources/hl_node/scan.rs b/src/pseudo_peer/sources/hl_node/scan.rs index 5a8fe43ce..7ae0c775b 100644 --- a/src/pseudo_peer/sources/hl_node/scan.rs +++ b/src/pseudo_peer/sources/hl_node/scan.rs @@ -2,7 +2,7 @@ use crate::node::types::{BlockAndReceipts, EvmBlock}; use serde::{Deserialize, Serialize}; use std::{ fs::File, - io::{BufRead, BufReader}, + io::{BufRead, BufReader, Seek, SeekFrom}, ops::RangeInclusive, path::{Path, PathBuf}, }; @@ -18,6 +18,7 @@ pub struct ScanResult { pub new_block_ranges: Vec>, } +#[derive(Debug, Clone)] pub struct ScanOptions { pub start_height: u64, pub only_load_ranges: bool, @@ -25,6 +26,57 @@ pub struct ScanOptions { pub struct Scanner; +/// Stream for sequentially reading lines from a file. +/// +/// This struct allows sequential iteration over lines over [Self::next] method. +/// It is resilient to cases where the line producer process is interrupted while writing: +/// - If a line is incomplete but still ends with a line ending, it is skipped: later, the fallback +/// block source will be used to retrieve the missing block. +/// - If a line does not end with a newline (i.e., the write was incomplete), the method returns +/// `None` to break out of the loop and avoid reading partial data. +/// - If a temporary I/O error occurs, the stream exits the loop without rewinding the cursor, which +/// will result in skipping ahead to the next unread bytes. +pub struct LineStream { + path: PathBuf, + reader: BufReader, +} + +impl LineStream { + pub fn from_path(path: &Path) -> std::io::Result { + let reader = BufReader::with_capacity(1024 * 1024, File::open(path)?); + Ok(Self { path: path.to_path_buf(), reader }) + } + + pub fn next(&mut self) -> Option { + let mut line_buffer = vec![]; + let Ok(size) = self.reader.read_until(b'\n', &mut line_buffer) else { + // Temporary I/O error; restart the loop + return None; + }; + + // Now cursor is right after the end of the line + // On UTF-8 error, skip the line + let Ok(mut line) = String::from_utf8(line_buffer) else { + return Some(String::new()); + }; + + // If line is not completed yet, return None so that we can break the loop + if line.ends_with('\n') { + if line.ends_with('\r') { + line.pop(); + } + line.pop(); + return Some(line); + } + + // info!("Line is not completed yet: {}", line); + if size != 0 { + self.reader.seek(SeekFrom::Current(-(size as i64))).unwrap(); + } + None + } +} + impl Scanner { pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = @@ -35,31 +87,20 @@ impl Scanner { Ok((parsed_block, height)) } - pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { - let lines: Vec = - BufReader::new(File::open(path).expect("Failed to open hour file")) - .lines() - .collect::>() - .unwrap(); - let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + pub fn scan_hour_file(line_stream: &mut LineStream, options: ScanOptions) -> ScanResult { let mut new_blocks = Vec::new(); let mut last_height = options.start_height; let mut block_ranges = Vec::new(); let mut current_range: Option<(u64, u64)> = None; - for (line_idx, line) in lines.iter().enumerate().skip(skip) { - if line_idx < *last_line || line.trim().is_empty() { - continue; - } - - match Self::line_to_evm_block(line) { + while let Some(line) = line_stream.next() { + match Self::line_to_evm_block(&line) { Ok((parsed_block, height)) => { if height >= options.start_height { last_height = last_height.max(height); if !options.only_load_ranges { new_blocks.push(parsed_block); } - *last_line = line_idx; } match current_range { @@ -74,7 +115,7 @@ impl Scanner { } } } - Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)), + Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(&line)), } } @@ -82,7 +123,7 @@ impl Scanner { block_ranges.push(start..=end); } ScanResult { - path: path.to_path_buf(), + path: line_stream.path.clone(), next_expected_height: last_height + 1, new_blocks, new_block_ranges: block_ranges,