refactor: Use offsets instead of lines, wrap related structs in one

This commit is contained in:
sprites0
2025-10-05 09:59:12 +00:00
parent bfd61094ee
commit 0fd4b7943f
2 changed files with 92 additions and 33 deletions

View File

@ -8,7 +8,7 @@ mod time_utils;
use self::{ use self::{
cache::LocalBlocksCache, cache::LocalBlocksCache,
file_ops::FileOperations, file_ops::FileOperations,
scan::{ScanOptions, Scanner}, scan::{LineStream, ScanOptions, Scanner},
time_utils::TimeUtils, time_utils::TimeUtils,
}; };
use super::{BlockSource, BlockSourceBoxed}; use super::{BlockSource, BlockSourceBoxed};
@ -120,6 +120,28 @@ impl BlockSource for HlNodeBlockSource {
} }
} }
struct CurrentFile {
path: PathBuf,
line_stream: Option<LineStream>,
}
impl CurrentFile {
pub fn from_datetime(dt: OffsetDateTime, root: &Path) -> Self {
let (hour, day_str) = (dt.hour(), TimeUtils::date_from_datetime(dt));
let path = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{}", hour));
Self { path, line_stream: None }
}
pub fn open(&mut self) -> eyre::Result<()> {
if self.line_stream.is_some() {
return Ok(());
}
self.line_stream = Some(LineStream::from_path(&self.path)?);
Ok(())
}
}
impl HlNodeBlockSource { impl HlNodeBlockSource {
async fn update_last_fetch( async fn update_last_fetch(
last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>, last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
@ -142,9 +164,9 @@ impl HlNodeBlockSource {
} }
let path = u_cache.get_path_for_height(height)?; let path = u_cache.get_path_for_height(height)?;
info!("Loading block data from {:?}", path); info!("Loading block data from {:?}", path);
let mut line_stream = LineStream::from_path(&path).ok()?;
let scan_result = Scanner::scan_hour_file( let scan_result = Scanner::scan_hour_file(
&path, &mut line_stream,
&mut 0,
ScanOptions { start_height: 0, only_load_ranges: false }, ScanOptions { start_height: 0, only_load_ranges: false },
); );
u_cache.load_scan_result(scan_result); u_cache.load_scan_result(scan_result);
@ -165,9 +187,10 @@ impl HlNodeBlockSource {
} else { } else {
warn!("Failed to parse last line of file: {:?}", subfile); warn!("Failed to parse last line of file: {:?}", subfile);
} }
let mut line_stream =
LineStream::from_path(&subfile).expect("Failed to open line stream");
let mut scan_result = Scanner::scan_hour_file( let mut scan_result = Scanner::scan_hour_file(
&subfile, &mut line_stream,
&mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true }, ScanOptions { start_height: cutoff_height, only_load_ranges: true },
); );
scan_result.new_blocks.clear(); // Only store ranges, load data lazily scan_result.new_blocks.clear(); // Only store ranges, load data lazily
@ -188,15 +211,13 @@ impl HlNodeBlockSource {
} }
tokio::time::sleep(TAIL_INTERVAL).await; tokio::time::sleep(TAIL_INTERVAL).await;
}; };
let (mut hour, mut day_str, mut last_line) = let mut current_file = CurrentFile::from_datetime(dt, &root);
(dt.hour(), TimeUtils::date_from_datetime(dt), 0);
info!("Starting local ingest loop from height: {}", current_head); info!("Starting local ingest loop from height: {}", current_head);
loop { loop {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); let _ = current_file.open();
if hour_file.exists() { if let Some(line_stream) = &mut current_file.line_stream {
let scan_result = Scanner::scan_hour_file( let scan_result = Scanner::scan_hour_file(
&hour_file, line_stream,
&mut last_line,
ScanOptions { start_height: next_height, only_load_ranges: false }, ScanOptions { start_height: next_height, only_load_ranges: false },
); );
next_height = scan_result.next_expected_height; next_height = scan_result.next_expected_height;
@ -205,11 +226,8 @@ impl HlNodeBlockSource {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
if dt + ONE_HOUR < now { if dt + ONE_HOUR < now {
dt += ONE_HOUR; dt += ONE_HOUR;
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); current_file = CurrentFile::from_datetime(dt, &root);
info!( info!("Moving to new file: {:?}", current_file.path);
"Moving to new file: {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
);
continue; continue;
} }
tokio::time::sleep(TAIL_INTERVAL).await; tokio::time::sleep(TAIL_INTERVAL).await;

View File

@ -2,7 +2,7 @@ use crate::node::types::{BlockAndReceipts, EvmBlock};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{ use std::{
fs::File, fs::File,
io::{BufRead, BufReader}, io::{BufRead, BufReader, Seek, SeekFrom},
ops::RangeInclusive, ops::RangeInclusive,
path::{Path, PathBuf}, path::{Path, PathBuf},
}; };
@ -18,6 +18,7 @@ pub struct ScanResult {
pub new_block_ranges: Vec<RangeInclusive<u64>>, pub new_block_ranges: Vec<RangeInclusive<u64>>,
} }
#[derive(Debug, Clone)]
pub struct ScanOptions { pub struct ScanOptions {
pub start_height: u64, pub start_height: u64,
pub only_load_ranges: bool, pub only_load_ranges: bool,
@ -25,6 +26,57 @@ pub struct ScanOptions {
pub struct Scanner; pub struct Scanner;
/// Stream for sequentially reading lines from a file.
///
/// This struct allows sequential iteration over lines over [Self::next] method.
/// It is resilient to cases where the line producer process is interrupted while writing:
/// - If a line is incomplete but still ends with a line ending, it is skipped: later, the fallback
/// block source will be used to retrieve the missing block.
/// - If a line does not end with a newline (i.e., the write was incomplete), the method returns
/// `None` to break out of the loop and avoid reading partial data.
/// - If a temporary I/O error occurs, the stream exits the loop without rewinding the cursor, which
/// will result in skipping ahead to the next unread bytes.
pub struct LineStream {
path: PathBuf,
reader: BufReader<File>,
}
impl LineStream {
pub fn from_path(path: &Path) -> std::io::Result<Self> {
let reader = BufReader::with_capacity(1024 * 1024, File::open(path)?);
Ok(Self { path: path.to_path_buf(), reader })
}
pub fn next(&mut self) -> Option<String> {
let mut line_buffer = vec![];
let Ok(size) = self.reader.read_until(b'\n', &mut line_buffer) else {
// Temporary I/O error; restart the loop
return None;
};
// Now cursor is right after the end of the line
// On UTF-8 error, skip the line
let Ok(mut line) = String::from_utf8(line_buffer) else {
return Some(String::new());
};
// If line is not completed yet, return None so that we can break the loop
if line.ends_with('\n') {
if line.ends_with('\r') {
line.pop();
}
line.pop();
return Some(line);
}
// info!("Line is not completed yet: {}", line);
if size != 0 {
self.reader.seek(SeekFrom::Current(-(size as i64))).unwrap();
}
None
}
}
impl Scanner { impl Scanner {
pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts =
@ -35,31 +87,20 @@ impl Scanner {
Ok((parsed_block, height)) Ok((parsed_block, height))
} }
pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { pub fn scan_hour_file(line_stream: &mut LineStream, options: ScanOptions) -> ScanResult {
let lines: Vec<String> =
BufReader::new(File::open(path).expect("Failed to open hour file"))
.lines()
.collect::<Result<_, _>>()
.unwrap();
let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
let mut new_blocks = Vec::new(); let mut new_blocks = Vec::new();
let mut last_height = options.start_height; let mut last_height = options.start_height;
let mut block_ranges = Vec::new(); let mut block_ranges = Vec::new();
let mut current_range: Option<(u64, u64)> = None; let mut current_range: Option<(u64, u64)> = None;
for (line_idx, line) in lines.iter().enumerate().skip(skip) { while let Some(line) = line_stream.next() {
if line_idx < *last_line || line.trim().is_empty() { match Self::line_to_evm_block(&line) {
continue;
}
match Self::line_to_evm_block(line) {
Ok((parsed_block, height)) => { Ok((parsed_block, height)) => {
if height >= options.start_height { if height >= options.start_height {
last_height = last_height.max(height); last_height = last_height.max(height);
if !options.only_load_ranges { if !options.only_load_ranges {
new_blocks.push(parsed_block); new_blocks.push(parsed_block);
} }
*last_line = line_idx;
} }
match current_range { match current_range {
@ -74,7 +115,7 @@ impl Scanner {
} }
} }
} }
Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)), Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(&line)),
} }
} }
@ -82,7 +123,7 @@ impl Scanner {
block_ranges.push(start..=end); block_ranges.push(start..=end);
} }
ScanResult { ScanResult {
path: path.to_path_buf(), path: line_stream.path.clone(),
next_expected_height: last_height + 1, next_expected_height: last_height + 1,
new_blocks, new_blocks,
new_block_ranges: block_ranges, new_block_ranges: block_ranges,