From bfd61094eefeec8c3b41ef81680bf6dbc36162f3 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sun, 5 Oct 2025 09:57:36 +0000 Subject: [PATCH 1/5] chore: cargo fmt --- src/addons/hl_node_compliance.rs | 2 +- src/node/rpc/estimate.rs | 20 +++++++++++--------- src/pseudo_peer/service.rs | 9 +++++---- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/src/addons/hl_node_compliance.rs b/src/addons/hl_node_compliance.rs index 9c6169985..0c2f68142 100644 --- a/src/addons/hl_node_compliance.rs +++ b/src/addons/hl_node_compliance.rs @@ -579,7 +579,7 @@ async fn adjust_transaction_receipt( fn system_tx_count_for_block(eth_api: &Eth, block_id: BlockId) -> usize { let provider = eth_api.provider(); let block = provider.block_by_id(block_id).unwrap().unwrap(); - + block.body.transactions().iter().filter(|tx| tx.is_system_transaction()).count() } diff --git a/src/node/rpc/estimate.rs b/src/node/rpc/estimate.rs index 392deb08a..fae7c8772 100644 --- a/src/node/rpc/estimate.rs +++ b/src/node/rpc/estimate.rs @@ -82,11 +82,12 @@ where let mut tx_env = self.create_txn_env(&evm_env, request, &mut db)?; let mut is_basic_transfer = false; - if tx_env.input().is_empty() - && let TxKind::Call(to) = tx_env.kind() - && let Ok(code) = db.db.account_code(&to) { - is_basic_transfer = code.map(|code| code.is_empty()).unwrap_or(true); - } + if tx_env.input().is_empty() && + let TxKind::Call(to) = tx_env.kind() && + let Ok(code) = db.db.account_code(&to) + { + is_basic_transfer = code.map(|code| code.is_empty()).unwrap_or(true); + } if tx_env.gas_price() > 0 { highest_gas_limit = @@ -105,10 +106,11 @@ where let mut min_tx_env = tx_env.clone(); min_tx_env.set_gas_limit(MIN_TRANSACTION_GAS); - if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err) - && res.result.is_success() { - return Ok(U256::from(MIN_TRANSACTION_GAS)); - } + if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err) && + 
res.result.is_success() + { + return Ok(U256::from(MIN_TRANSACTION_GAS)); + } } trace!(target: "rpc::eth::estimate", ?tx_env, gas_limit = tx_env.gas_limit(), is_basic_transfer, "Starting gas estimation"); diff --git a/src/pseudo_peer/service.rs b/src/pseudo_peer/service.rs index 4d4c74397..d0324b6e7 100644 --- a/src/pseudo_peer/service.rs +++ b/src/pseudo_peer/service.rs @@ -81,10 +81,11 @@ impl BlockPoller { .await .ok_or(eyre::eyre!("Failed to find latest block number"))?; - if let Some(debug_cutoff_height) = debug_cutoff_height - && next_block_number > debug_cutoff_height { - next_block_number = debug_cutoff_height; - } + if let Some(debug_cutoff_height) = debug_cutoff_height && + next_block_number > debug_cutoff_height + { + next_block_number = debug_cutoff_height; + } loop { match block_source.collect_block(next_block_number).await { From 0fd4b7943f31447ad19f3c5f7080307ac1d5d66d Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sun, 5 Oct 2025 09:59:12 +0000 Subject: [PATCH 2/5] refactor: Use offsets instead of lines, wrap related structs in one --- src/pseudo_peer/sources/hl_node/mod.rs | 50 +++++++++++------ src/pseudo_peer/sources/hl_node/scan.rs | 75 +++++++++++++++++++------ 2 files changed, 92 insertions(+), 33 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs index 5af59bad0..65eeaf267 100644 --- a/src/pseudo_peer/sources/hl_node/mod.rs +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -8,7 +8,7 @@ mod time_utils; use self::{ cache::LocalBlocksCache, file_ops::FileOperations, - scan::{ScanOptions, Scanner}, + scan::{LineStream, ScanOptions, Scanner}, time_utils::TimeUtils, }; use super::{BlockSource, BlockSourceBoxed}; @@ -120,6 +120,28 @@ impl BlockSource for HlNodeBlockSource { } } +struct CurrentFile { + path: PathBuf, + line_stream: Option, +} + +impl CurrentFile { + pub fn from_datetime(dt: OffsetDateTime, root: &Path) -> Self { + let (hour, day_str) = 
(dt.hour(), TimeUtils::date_from_datetime(dt)); + let path = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{}", hour)); + Self { path, line_stream: None } + } + + pub fn open(&mut self) -> eyre::Result<()> { + if self.line_stream.is_some() { + return Ok(()); + } + + self.line_stream = Some(LineStream::from_path(&self.path)?); + Ok(()) + } +} + impl HlNodeBlockSource { async fn update_last_fetch( last_local_fetch: Arc>>, @@ -142,9 +164,9 @@ impl HlNodeBlockSource { } let path = u_cache.get_path_for_height(height)?; info!("Loading block data from {:?}", path); + let mut line_stream = LineStream::from_path(&path).ok()?; let scan_result = Scanner::scan_hour_file( - &path, - &mut 0, + &mut line_stream, ScanOptions { start_height: 0, only_load_ranges: false }, ); u_cache.load_scan_result(scan_result); @@ -165,9 +187,10 @@ impl HlNodeBlockSource { } else { warn!("Failed to parse last line of file: {:?}", subfile); } + let mut line_stream = + LineStream::from_path(&subfile).expect("Failed to open line stream"); let mut scan_result = Scanner::scan_hour_file( - &subfile, - &mut 0, + &mut line_stream, ScanOptions { start_height: cutoff_height, only_load_ranges: true }, ); scan_result.new_blocks.clear(); // Only store ranges, load data lazily @@ -188,15 +211,13 @@ impl HlNodeBlockSource { } tokio::time::sleep(TAIL_INTERVAL).await; }; - let (mut hour, mut day_str, mut last_line) = - (dt.hour(), TimeUtils::date_from_datetime(dt), 0); + let mut current_file = CurrentFile::from_datetime(dt, &root); info!("Starting local ingest loop from height: {}", current_head); loop { - let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); - if hour_file.exists() { + let _ = current_file.open(); + if let Some(line_stream) = &mut current_file.line_stream { let scan_result = Scanner::scan_hour_file( - &hour_file, - &mut last_line, + line_stream, ScanOptions { start_height: next_height, only_load_ranges: false }, ); next_height = scan_result.next_expected_height; 
@@ -205,11 +226,8 @@ impl HlNodeBlockSource { let now = OffsetDateTime::now_utc(); if dt + ONE_HOUR < now { dt += ONE_HOUR; - (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); - info!( - "Moving to new file: {:?}", - root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) - ); + current_file = CurrentFile::from_datetime(dt, &root); + info!("Moving to new file: {:?}", current_file.path); continue; } tokio::time::sleep(TAIL_INTERVAL).await; diff --git a/src/pseudo_peer/sources/hl_node/scan.rs b/src/pseudo_peer/sources/hl_node/scan.rs index 5a8fe43ce..7ae0c775b 100644 --- a/src/pseudo_peer/sources/hl_node/scan.rs +++ b/src/pseudo_peer/sources/hl_node/scan.rs @@ -2,7 +2,7 @@ use crate::node::types::{BlockAndReceipts, EvmBlock}; use serde::{Deserialize, Serialize}; use std::{ fs::File, - io::{BufRead, BufReader}, + io::{BufRead, BufReader, Seek, SeekFrom}, ops::RangeInclusive, path::{Path, PathBuf}, }; @@ -18,6 +18,7 @@ pub struct ScanResult { pub new_block_ranges: Vec>, } +#[derive(Debug, Clone)] pub struct ScanOptions { pub start_height: u64, pub only_load_ranges: bool, @@ -25,6 +26,57 @@ pub struct ScanOptions { pub struct Scanner; +/// Stream for sequentially reading lines from a file. +/// +/// This struct allows sequential iteration over lines over [Self::next] method. +/// It is resilient to cases where the line producer process is interrupted while writing: +/// - If a line is incomplete but still ends with a line ending, it is skipped: later, the fallback +/// block source will be used to retrieve the missing block. +/// - If a line does not end with a newline (i.e., the write was incomplete), the method returns +/// `None` to break out of the loop and avoid reading partial data. +/// - If a temporary I/O error occurs, the stream exits the loop without rewinding the cursor, which +/// will result in skipping ahead to the next unread bytes. 
+pub struct LineStream { + path: PathBuf, + reader: BufReader, +} + +impl LineStream { + pub fn from_path(path: &Path) -> std::io::Result { + let reader = BufReader::with_capacity(1024 * 1024, File::open(path)?); + Ok(Self { path: path.to_path_buf(), reader }) + } + + pub fn next(&mut self) -> Option { + let mut line_buffer = vec![]; + let Ok(size) = self.reader.read_until(b'\n', &mut line_buffer) else { + // Temporary I/O error; restart the loop + return None; + }; + + // Now cursor is right after the end of the line + // On UTF-8 error, skip the line + let Ok(mut line) = String::from_utf8(line_buffer) else { + return Some(String::new()); + }; + + // If line is not completed yet, return None so that we can break the loop + if line.ends_with('\n') { + if line.ends_with('\r') { + line.pop(); + } + line.pop(); + return Some(line); + } + + // info!("Line is not completed yet: {}", line); + if size != 0 { + self.reader.seek(SeekFrom::Current(-(size as i64))).unwrap(); + } + None + } +} + impl Scanner { pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = @@ -35,31 +87,20 @@ impl Scanner { Ok((parsed_block, height)) } - pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { - let lines: Vec = - BufReader::new(File::open(path).expect("Failed to open hour file")) - .lines() - .collect::>() - .unwrap(); - let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + pub fn scan_hour_file(line_stream: &mut LineStream, options: ScanOptions) -> ScanResult { let mut new_blocks = Vec::new(); let mut last_height = options.start_height; let mut block_ranges = Vec::new(); let mut current_range: Option<(u64, u64)> = None; - for (line_idx, line) in lines.iter().enumerate().skip(skip) { - if line_idx < *last_line || line.trim().is_empty() { - continue; - } - - match Self::line_to_evm_block(line) { + while let Some(line) = line_stream.next() { 
+ match Self::line_to_evm_block(&line) { Ok((parsed_block, height)) => { if height >= options.start_height { last_height = last_height.max(height); if !options.only_load_ranges { new_blocks.push(parsed_block); } - *last_line = line_idx; } match current_range { @@ -74,7 +115,7 @@ } } } - Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)), + Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(&line)), } } @@ -82,7 +123,7 @@ block_ranges.push(start..=end); } ScanResult { - path: path.to_path_buf(), + path: line_stream.path.clone(), next_expected_height: last_height + 1, new_blocks, new_block_ranges: block_ranges, From b8bae7cde975cfd2daba2c5162a72de2277bae5b Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sun, 5 Oct 2025 11:26:03 +0000 Subject: [PATCH 3/5] fix: Utilize LruMap better LruMap was introduced to allow getting the same block twice, so removing the item when getting the block doesn't make sense. 
--- src/pseudo_peer/sources/hl_node/cache.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pseudo_peer/sources/hl_node/cache.rs b/src/pseudo_peer/sources/hl_node/cache.rs index a484db3b1..08412147c 100644 --- a/src/pseudo_peer/sources/hl_node/cache.rs +++ b/src/pseudo_peer/sources/hl_node/cache.rs @@ -27,7 +27,7 @@ impl LocalBlocksCache { } pub fn get_block(&mut self, height: u64) -> Option { - self.cache.remove(&height) + self.cache.get(&height).cloned() } pub fn get_path_for_height(&self, height: u64) -> Option { From 12f366573e34fcbf79854ba3a613dd56def74d32 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sun, 5 Oct 2025 11:26:51 +0000 Subject: [PATCH 4/5] fix: Do not increase block counter when no block is read This made the ingest loop infinitely increase the block number --- src/pseudo_peer/sources/hl_node/scan.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pseudo_peer/sources/hl_node/scan.rs b/src/pseudo_peer/sources/hl_node/scan.rs index 7ae0c775b..574815dcc 100644 --- a/src/pseudo_peer/sources/hl_node/scan.rs +++ b/src/pseudo_peer/sources/hl_node/scan.rs @@ -18,7 +18,6 @@ pub struct ScanResult { pub new_block_ranges: Vec>, } -#[derive(Debug, Clone)] pub struct ScanOptions { pub start_height: u64, pub only_load_ranges: bool, @@ -122,9 +121,10 @@ impl Scanner { if let Some((start, end)) = current_range { block_ranges.push(start..=end); } + ScanResult { path: line_stream.path.clone(), - next_expected_height: last_height + 1, + next_expected_height: last_height + current_range.is_some() as u64, new_blocks, new_block_ranges: block_ranges, } From 4d83b687d4eb761d61475dee693c7c0e9fba7ca7 Mon Sep 17 00:00:00 2001 From: sprites0 <199826320+sprites0@users.noreply.github.com> Date: Sun, 5 Oct 2025 11:27:23 +0000 Subject: [PATCH 5/5] feat: Add metrics for file read triggered Usually, "Loading block data from ..." shouldn't be shown in logs at all. 
Add metrics to detect the file read. --- src/pseudo_peer/sources/hl_node/mod.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs index 65eeaf267..9b6c43774 100644 --- a/src/pseudo_peer/sources/hl_node/mod.rs +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -52,6 +52,8 @@ pub struct HlNodeBlockSourceMetrics { pub fetched_from_hl_node: Counter, /// How many times the HL node block source is fetched from the fallback pub fetched_from_fallback: Counter, + /// How many times `try_collect_local_block` was faster than ingest loop + pub file_read_triggered: Counter, } impl BlockSource for HlNodeBlockSource { @@ -64,7 +66,9 @@ impl BlockSource for HlNodeBlockSource { Box::pin(async move { let now = OffsetDateTime::now_utc(); - if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await { + if let Some(block) = + Self::try_collect_local_block(&metrics, local_blocks_cache, height).await + { Self::update_last_fetch(last_local_fetch, height, now).await; metrics.fetched_from_hl_node.increment(1); return Ok(block); @@ -155,6 +159,7 @@ impl HlNodeBlockSource { } async fn try_collect_local_block( + metrics: &HlNodeBlockSourceMetrics, local_blocks_cache: Arc>, height: u64, ) -> Option { @@ -164,6 +169,7 @@ impl HlNodeBlockSource { } let path = u_cache.get_path_for_height(height)?; info!("Loading block data from {:?}", path); + metrics.file_read_triggered.increment(1); let mut line_stream = LineStream::from_path(&path).ok()?; let scan_result = Scanner::scan_hour_file( &mut line_stream,