diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs index 7d44aca44..4373ea437 100644 --- a/src/pseudo_peer/sources/hl_node.rs +++ b/src/pseudo_peer/sources/hl_node.rs @@ -1,3 +1,9 @@ +use super::{BlockSource, BlockSourceBoxed}; +use crate::node::types::{BlockAndReceipts, EvmBlock}; +use futures::future::BoxFuture; +use rangemap::RangeInclusiveMap; +use reth_network::cache::LruMap; +use serde::{Deserialize, Serialize}; use std::{ fs::File, io::{BufRead, BufReader, Read, Seek, SeekFrom}, @@ -5,19 +11,10 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; - -use futures::future::BoxFuture; -use rangemap::RangeInclusiveMap; -use reth_network::cache::LruMap; -use serde::{Deserialize, Serialize}; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use tokio::sync::Mutex; use tracing::{info, warn}; -use crate::node::types::{BlockAndReceipts, EvmBlock}; - -use super::{BlockSource, BlockSourceBoxed}; - const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); const HOURLY_SUBDIR: &str = "hourly"; @@ -57,6 +54,11 @@ struct ScanResult { new_block_ranges: Vec>, } +struct ScanOptions { + start_height: u64, + only_load_ranges: bool, +} + fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = serde_json::from_str(line)?; @@ -66,11 +68,6 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> Ok((parsed_block, height)) } -struct ScanOptions { - start_height: u64, - only_load_ranges: bool, -} - fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { let file = File::open(path).expect("Failed to open hour file path"); let reader = BufReader::new(file); @@ -82,7 +79,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S let lines: Vec = reader.lines().collect::>().unwrap(); let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; - let mut block_ranges: Vec> = Vec::new(); + let mut block_ranges = Vec::new(); let mut current_range: Option<(u64, u64)> = None; for (line_idx, line) in lines.iter().enumerate().skip(skip) { @@ -99,13 +96,17 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S } *last_line = line_idx; } - if matches!(current_range, Some((_, end)) if end + 1 == height) { - current_range = Some((current_range.unwrap().0, height)); - } else { - if let Some((start, end)) = current_range.take() { - block_ranges.push(start..=end); + + match current_range { + Some((start, end)) if end + 1 == height => { + current_range = Some((start, height)); + } + _ => { + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + current_range = Some((height, height)); } - current_range = Some((height, height)); } } Err(_) => { @@ -115,7 +116,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S } } - if let Some((start, end)) = current_range.take() { + if let Some((start, end)) = current_range { block_ranges.push(start..=end); } @@ -144,26 +145,26 @@ impl BlockSource for HlNodeBlockSource { fn collect_block(&self, height: u64) -> BoxFuture> { Box::pin(async move { let now = OffsetDateTime::now_utc(); + if let Some(block) = self.try_collect_local_block(height).await { self.update_last_fetch(height, now).await; - Ok(block) - } else { - if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await { - let more_recent = last_height < height; - let too_soon = - now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; - if more_recent && too_soon { - return Err(eyre::eyre!( + return Ok(block); + } + + if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await { + let more_recent = last_height < height; + let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + if more_recent && too_soon { + return Err(eyre::eyre!( "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" )); - } } - - info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); - let block = self.fallback.collect_block(height).await?; - self.update_last_fetch(height, now).await; - Ok(block) } + + info!("Falling back to s3/ingest-dir for block @ Height [{height}]"); + let block = self.fallback.collect_block(height).await?; + self.update_last_fetch(height, now).await; + Ok(block) }) } @@ -176,6 +177,7 @@ impl BlockSource for HlNodeBlockSource { ); return self.fallback.find_latest_block_number().await; }; + let mut file = File::open(&dir).expect("Failed to open hour file path"); if let Some((_, height)) = read_last_complete_line(&mut file) { info!("Latest block number: {} with path {}", height, dir.display()); @@ -199,7 +201,7 @@ fn read_last_complete_line(read: &mut R) -> Option<(BlockAndRece const CHUNK_SIZE: u64 = 50000; let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); let mut pos = read.seek(SeekFrom::End(0)).unwrap(); - let mut last_line: Vec = Vec::new(); + let mut last_line = Vec::new(); while pos > 0 { let read_size = std::cmp::min(pos, CHUNK_SIZE); @@ -217,7 +219,7 @@ fn read_last_complete_line(read: &mut R) -> Option<(BlockAndRece if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { let candidate = &last_line[idx + 1..]; if let Ok((evm_block, height)) = - line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()) + line_to_evm_block(str::from_utf8(candidate).unwrap()) { return Some((evm_block, height)); } @@ -244,12 +246,13 @@ impl HlNodeBlockSource { pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) { - if let Some((last_height, _)) = *self.last_local_fetch.lock().await { + let mut last_fetch = self.last_local_fetch.lock().await; + if let Some((last_height, _)) = *last_fetch { if last_height >= height { return; } } - *self.last_local_fetch.lock().await = Some((height, now)); + *last_fetch = Some((height, now)); } async fn try_collect_local_block(&self, height: u64) -> Option { @@ -258,9 +261,7 @@ impl HlNodeBlockSource { return Some(block); } - let Some(path) = u_cache.ranges.get(&height).cloned() else { - return None; - }; + let path = u_cache.ranges.get(&height).cloned()?; info!("Loading block data from {:?}", path); u_cache.load_scan_result(scan_hour_file( @@ -277,7 +278,7 @@ impl HlNodeBlockSource { let hour: u8 = hour_part.parse().ok()?; Some(OffsetDateTime::new_utc( - Date::parse(&format!("{dt_part}"), &format_description!("[year][month][day]")).ok()?, + Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?, Time::from_hms(hour, 0, 0).ok()?, )) } @@ -285,6 +286,7 @@ impl HlNodeBlockSource { fn all_hourly_files(root: &Path) -> Option> { let dir = root.join(HOURLY_SUBDIR); let mut files = Vec::new(); + for entry in std::fs::read_dir(dir).ok()? { let file = entry.ok()?.path(); let subfiles: Vec<_> = std::fs::read_dir(&file) @@ -294,6 +296,7 @@ impl HlNodeBlockSource { .collect(); files.extend(subfiles); } + files.sort(); Some(files) } @@ -311,6 +314,7 @@ impl HlNodeBlockSource { for subfile in Self::all_hourly_files(root).unwrap_or_default() { let mut file = File::open(&subfile).expect("Failed to open hour file path"); + if let Some((_, height)) = read_last_complete_line(&mut file) { if height < cutoff_height { continue; @@ -430,17 +434,20 @@ impl HlNodeBlockSource { #[cfg(test)] mod tests { - use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; - use super::*; + use crate::node::types::reth_compat; use crate::node::types::ReadPrecompileCalls; use crate::pseudo_peer::sources::LocalBlockSource; + use alloy_consensus::{BlockBody, Header}; + use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; + use std::io::Write; + use std::time::Duration; #[test] fn test_datetime_from_path() { let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); - println!("{:?}", dt); + println!("{dt:?}"); } #[tokio::test] @@ -451,7 +458,7 @@ mod tests { } let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); - HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap(); + HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap(); let u_cache = cache.lock().await; println!("{:?}", u_cache.ranges); @@ -461,12 +468,6 @@ mod tests { ); } - use std::io::Write; - use std::time::Duration; - - use crate::node::types::reth_compat; - use alloy_consensus::{BlockBody, Header}; - fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult { let height = match &block.block { EvmBlock::Reth115(b) => b.header.header.number,