refactor: Code style, lint

This commit is contained in:
sprites0
2025-08-04 02:50:39 -04:00
parent a8df1fdaeb
commit bd9a0020e6

View File

@ -1,3 +1,9 @@
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::{BlockAndReceipts, EvmBlock};
use futures::future::BoxFuture;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use serde::{Deserialize, Serialize};
use std::{ use std::{
fs::File, fs::File,
io::{BufRead, BufReader, Read, Seek, SeekFrom}, io::{BufRead, BufReader, Read, Seek, SeekFrom},
@ -5,19 +11,10 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::Arc, sync::Arc,
}; };
use futures::future::BoxFuture;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use serde::{Deserialize, Serialize};
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tracing::{info, warn}; use tracing::{info, warn};
use crate::node::types::{BlockAndReceipts, EvmBlock};
use super::{BlockSource, BlockSourceBoxed};
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly"; const HOURLY_SUBDIR: &str = "hourly";
@ -57,6 +54,11 @@ struct ScanResult {
new_block_ranges: Vec<RangeInclusive<u64>>, new_block_ranges: Vec<RangeInclusive<u64>>,
} }
struct ScanOptions {
start_height: u64,
only_load_ranges: bool,
}
fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
serde_json::from_str(line)?; serde_json::from_str(line)?;
@ -66,11 +68,6 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
Ok((parsed_block, height)) Ok((parsed_block, height))
} }
struct ScanOptions {
start_height: u64,
only_load_ranges: bool,
}
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
let file = File::open(path).expect("Failed to open hour file path"); let file = File::open(path).expect("Failed to open hour file path");
let reader = BufReader::new(file); let reader = BufReader::new(file);
@ -82,7 +79,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap(); let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
let mut block_ranges: Vec<RangeInclusive<u64>> = Vec::new(); let mut block_ranges = Vec::new();
let mut current_range: Option<(u64, u64)> = None; let mut current_range: Option<(u64, u64)> = None;
for (line_idx, line) in lines.iter().enumerate().skip(skip) { for (line_idx, line) in lines.iter().enumerate().skip(skip) {
@ -99,13 +96,17 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
} }
*last_line = line_idx; *last_line = line_idx;
} }
if matches!(current_range, Some((_, end)) if end + 1 == height) {
current_range = Some((current_range.unwrap().0, height)); match current_range {
} else { Some((start, end)) if end + 1 == height => {
if let Some((start, end)) = current_range.take() { current_range = Some((start, height));
block_ranges.push(start..=end); }
_ => {
if let Some((start, end)) = current_range.take() {
block_ranges.push(start..=end);
}
current_range = Some((height, height));
} }
current_range = Some((height, height));
} }
} }
Err(_) => { Err(_) => {
@ -115,7 +116,7 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
} }
} }
if let Some((start, end)) = current_range.take() { if let Some((start, end)) = current_range {
block_ranges.push(start..=end); block_ranges.push(start..=end);
} }
@ -144,26 +145,26 @@ impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> { fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
Box::pin(async move { Box::pin(async move {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
if let Some(block) = self.try_collect_local_block(height).await { if let Some(block) = self.try_collect_local_block(height).await {
self.update_last_fetch(height, now).await; self.update_last_fetch(height, now).await;
Ok(block) return Ok(block);
} else { }
if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await {
let more_recent = last_height < height; if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await {
let too_soon = let more_recent = last_height < height;
now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
if more_recent && too_soon { if more_recent && too_soon {
return Err(eyre::eyre!( return Err(eyre::eyre!(
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
)); ));
}
} }
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
let block = self.fallback.collect_block(height).await?;
self.update_last_fetch(height, now).await;
Ok(block)
} }
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
let block = self.fallback.collect_block(height).await?;
self.update_last_fetch(height, now).await;
Ok(block)
}) })
} }
@ -176,6 +177,7 @@ impl BlockSource for HlNodeBlockSource {
); );
return self.fallback.find_latest_block_number().await; return self.fallback.find_latest_block_number().await;
}; };
let mut file = File::open(&dir).expect("Failed to open hour file path"); let mut file = File::open(&dir).expect("Failed to open hour file path");
if let Some((_, height)) = read_last_complete_line(&mut file) { if let Some((_, height)) = read_last_complete_line(&mut file) {
info!("Latest block number: {} with path {}", height, dir.display()); info!("Latest block number: {} with path {}", height, dir.display());
@ -199,7 +201,7 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndRece
const CHUNK_SIZE: u64 = 50000; const CHUNK_SIZE: u64 = 50000;
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
let mut pos = read.seek(SeekFrom::End(0)).unwrap(); let mut pos = read.seek(SeekFrom::End(0)).unwrap();
let mut last_line: Vec<u8> = Vec::new(); let mut last_line = Vec::new();
while pos > 0 { while pos > 0 {
let read_size = std::cmp::min(pos, CHUNK_SIZE); let read_size = std::cmp::min(pos, CHUNK_SIZE);
@ -217,7 +219,7 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndRece
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
let candidate = &last_line[idx + 1..]; let candidate = &last_line[idx + 1..];
if let Ok((evm_block, height)) = if let Ok((evm_block, height)) =
line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()) line_to_evm_block(str::from_utf8(candidate).unwrap())
{ {
return Some((evm_block, height)); return Some((evm_block, height));
} }
@ -244,12 +246,13 @@ impl HlNodeBlockSource {
pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) { async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
if let Some((last_height, _)) = *self.last_local_fetch.lock().await { let mut last_fetch = self.last_local_fetch.lock().await;
if let Some((last_height, _)) = *last_fetch {
if last_height >= height { if last_height >= height {
return; return;
} }
} }
*self.last_local_fetch.lock().await = Some((height, now)); *last_fetch = Some((height, now));
} }
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> { async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
@ -258,9 +261,7 @@ impl HlNodeBlockSource {
return Some(block); return Some(block);
} }
let Some(path) = u_cache.ranges.get(&height).cloned() else { let path = u_cache.ranges.get(&height).cloned()?;
return None;
};
info!("Loading block data from {:?}", path); info!("Loading block data from {:?}", path);
u_cache.load_scan_result(scan_hour_file( u_cache.load_scan_result(scan_hour_file(
@ -277,7 +278,7 @@ impl HlNodeBlockSource {
let hour: u8 = hour_part.parse().ok()?; let hour: u8 = hour_part.parse().ok()?;
Some(OffsetDateTime::new_utc( Some(OffsetDateTime::new_utc(
Date::parse(&format!("{dt_part}"), &format_description!("[year][month][day]")).ok()?, Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
Time::from_hms(hour, 0, 0).ok()?, Time::from_hms(hour, 0, 0).ok()?,
)) ))
} }
@ -285,6 +286,7 @@ impl HlNodeBlockSource {
fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> { fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
let dir = root.join(HOURLY_SUBDIR); let dir = root.join(HOURLY_SUBDIR);
let mut files = Vec::new(); let mut files = Vec::new();
for entry in std::fs::read_dir(dir).ok()? { for entry in std::fs::read_dir(dir).ok()? {
let file = entry.ok()?.path(); let file = entry.ok()?.path();
let subfiles: Vec<_> = std::fs::read_dir(&file) let subfiles: Vec<_> = std::fs::read_dir(&file)
@ -294,6 +296,7 @@ impl HlNodeBlockSource {
.collect(); .collect();
files.extend(subfiles); files.extend(subfiles);
} }
files.sort(); files.sort();
Some(files) Some(files)
} }
@ -311,6 +314,7 @@ impl HlNodeBlockSource {
for subfile in Self::all_hourly_files(root).unwrap_or_default() { for subfile in Self::all_hourly_files(root).unwrap_or_default() {
let mut file = File::open(&subfile).expect("Failed to open hour file path"); let mut file = File::open(&subfile).expect("Failed to open hour file path");
if let Some((_, height)) = read_last_complete_line(&mut file) { if let Some((_, height)) = read_last_complete_line(&mut file) {
if height < cutoff_height { if height < cutoff_height {
continue; continue;
@ -430,17 +434,20 @@ impl HlNodeBlockSource {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use super::*; use super::*;
use crate::node::types::reth_compat;
use crate::node::types::ReadPrecompileCalls; use crate::node::types::ReadPrecompileCalls;
use crate::pseudo_peer::sources::LocalBlockSource; use crate::pseudo_peer::sources::LocalBlockSource;
use alloy_consensus::{BlockBody, Header};
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use std::io::Write;
use std::time::Duration;
#[test] #[test]
fn test_datetime_from_path() { fn test_datetime_from_path() {
let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); let dt = HlNodeBlockSource::datetime_from_path(path).unwrap();
println!("{:?}", dt); println!("{dt:?}");
} }
#[tokio::test] #[tokio::test]
@ -451,7 +458,7 @@ mod tests {
} }
let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); let cache = Arc::new(Mutex::new(LocalBlocksCache::new()));
HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap(); HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap();
let u_cache = cache.lock().await; let u_cache = cache.lock().await;
println!("{:?}", u_cache.ranges); println!("{:?}", u_cache.ranges);
@ -461,12 +468,6 @@ mod tests {
); );
} }
use std::io::Write;
use std::time::Duration;
use crate::node::types::reth_compat;
use alloy_consensus::{BlockBody, Header};
fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult { fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult {
let height = match &block.block { let height = match &block.block {
EvmBlock::Reth115(b) => b.header.header.number, EvmBlock::Reth115(b) => b.header.header.number,