7 Commits

SHA1 Message Date
0180711ae4 perf: Constrain memory size again, add log 2025-07-31 01:56:39 -04:00
a766ee0236 feat: Support range-based backfill for hl-node ingestion 2025-07-31 01:51:22 -04:00
46c9d4cbf9 fix: Fix path parser 2025-07-31 01:00:28 -04:00
78ae5643b1 fix: Should use fallback when there is no hl-node files 2025-07-30 23:32:25 -04:00
4e59ee62dc fix: Reduce fallback usage in hl-node ingestion 2025-07-30 23:29:18 -04:00
c34ee01b70 fix: Do not use LruMap (LruMap does not support backfill) 2025-07-30 22:20:21 -04:00
9fcc04e889 fix: Use correct cutoff block number 2025-07-30 21:27:55 -04:00
5 changed files with 243 additions and 154 deletions
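The thread running through these commits: an LruMap alone cannot answer backfill queries, because evicted blocks are simply gone. The fix pairs a small LRU for recently tailed blocks with a rangemap of block-number ranges pointing at the hourly file that contains them, so older blocks can be reloaded on demand. A standalone sketch of the range-tracking idea (block numbers and paths below are made up for illustration):

    use rangemap::RangeInclusiveMap;
    use std::path::PathBuf;

    fn main() {
        // One entry per hourly file instead of one entry per block:
        let mut ranges: RangeInclusiveMap<u64, PathBuf> = RangeInclusiveMap::new();
        ranges.insert(9_735_000..=9_738_659, PathBuf::from("hourly/20250729/22"));
        ranges.insert(9_738_660..=9_742_319, PathBuf::from("hourly/20250729/23"));

        // A point lookup tells us which file to rescan for a given height.
        assert_eq!(ranges.get(&9_735_058), Some(&PathBuf::from("hourly/20250729/22")));

        // An LruMap keyed by height would have to keep every backfilled block
        // resident to answer the same query, which is what the fix avoids.
    }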

Cargo.lock (generated)

@@ -6504,6 +6504,12 @@ dependencies = [
  "rand_core 0.9.3",
 ]
 
+[[package]]
+name = "rangemap"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223"
+
 [[package]]
 name = "ratatui"
 version = "0.29.0"
@@ -9308,6 +9314,7 @@ dependencies = [
  "lz4_flex",
  "once_cell",
  "parking_lot",
+ "rangemap",
  "rayon",
  "reth",
  "reth-basic-payload-builder",
@@ -9341,6 +9348,7 @@ dependencies = [
  "reth-rpc",
  "reth-rpc-engine-api",
  "reth-rpc-eth-api",
+ "reth-stages-types",
  "reth-tracing",
  "reth-transaction-pool",
  "reth-trie-common",

Cargo.toml

@@ -61,6 +61,7 @@ reth-trie-common = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
 reth-trie-db = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
 reth-codecs = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
 reth-transaction-pool = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
+reth-stages-types = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
 revm = { version = "26.0.1" }
 
 # alloy dependencies
@@ -111,6 +112,7 @@ aws-sdk-s3 = "1.93.0"
 aws-config = "1.8.0"
 rayon = "1.7"
 time = "0.3.41"
+rangemap = "=1.6.0"
 
 [target.'cfg(unix)'.dependencies]
@@ -161,3 +163,6 @@ client = [
     "jsonrpsee/async-client",
     "reth-rpc-eth-api/client",
 ]
+
+[profile.test]
+inherits = "release"


@@ -24,6 +24,8 @@ use reth_eth_wire::{BasicNetworkPrimitives, NewBlock, NewBlockPayload};
 use reth_ethereum_primitives::PooledTransactionVariant;
 use reth_network::{NetworkConfig, NetworkHandle, NetworkManager};
 use reth_network_api::PeersInfo;
+use reth_provider::StageCheckpointReader;
+use reth_stages_types::StageId;
 use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot, Mutex};
 use tracing::info;
@@ -235,8 +237,13 @@ where
         let chain_spec = ctx.chain_spec();
         info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
 
+        let next_block_number =
+            ctx.provider().get_stage_checkpoint(StageId::Finish)?.unwrap_or_default().block_number
+                + 1;
+
         ctx.task_executor().spawn_critical("pseudo peer", async move {
-            let block_source = block_source_config.create_cached_block_source().await;
+            let block_source =
+                block_source_config.create_cached_block_source(next_block_number).await;
             start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source)
                 .await
                 .unwrap();
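The interesting piece of this hunk is how next_block_number is derived: reth's Finish stage checkpoint records the highest block the node has fully processed, so the pseudo peer resumes one block past it. A minimal sketch of that logic, using only the reth APIs imported in the hunk above (the helper name resume_height is hypothetical):

    use reth_provider::StageCheckpointReader;
    use reth_stages_types::StageId;

    // Illustrative helper, not part of the diff: where should ingestion resume?
    fn resume_height(provider: &impl StageCheckpointReader) -> eyre::Result<u64> {
        // `get_stage_checkpoint` returns Ok(None) on a fresh database; the default
        // StageCheckpoint has block_number == 0, so a fresh node resumes at block 1.
        let checkpoint = provider.get_stage_checkpoint(StageId::Finish)?;
        Ok(checkpoint.unwrap_or_default().block_number + 1)
    }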


@@ -72,6 +72,7 @@ impl BlockSourceConfig {
     pub async fn create_block_source_from_node(
         &self,
+        next_block_number: u64,
         fallback_block_source: BlockSourceBoxed,
     ) -> BlockSourceBoxed {
         let Some(block_source_from_node) = self.block_source_from_node.as_ref() else {
@@ -82,14 +83,15 @@ impl BlockSourceConfig {
             HlNodeBlockSource::new(
                 fallback_block_source,
                 PathBuf::from(block_source_from_node.clone()),
+                next_block_number,
             )
             .await,
         ))
     }
 
-    pub async fn create_cached_block_source(&self) -> BlockSourceBoxed {
+    pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed {
         let block_source = self.create_block_source().await;
-        let block_source = self.create_block_source_from_node(block_source).await;
+        let block_source = self.create_block_source_from_node(next_block_number, block_source).await;
         Arc::new(Box::new(CachedBlockSource::new(block_source)))
     }
 }
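This change threads the resume height through the three-layer chain built by create_cached_block_source: the S3/local fallback is wrapped by HlNodeBlockSource, which is wrapped by CachedBlockSource. Only the hl-node layer consumes the height, since it bounds what the backfill must scan. A simplified, synchronous sketch of that layering (the real BlockSource trait is async and boxed; every name below is a stand-in):

    use std::sync::Arc;

    trait BlockSource {
        fn collect_block(&self, height: u64) -> Option<String>;
    }

    struct S3Source; // stand-in for the fallback (S3 or local ingest dir)
    impl BlockSource for S3Source {
        fn collect_block(&self, h: u64) -> Option<String> { Some(format!("s3:{h}")) }
    }

    struct HlNodeSource {
        next_block_number: u64,
        fallback: Arc<dyn BlockSource>,
    }
    impl BlockSource for HlNodeSource {
        fn collect_block(&self, h: u64) -> Option<String> {
            // Heights below the resume point are never in the hl-node cache,
            // so they go straight to the fallback.
            if h >= self.next_block_number { Some(format!("hl-node:{h}")) }
            else { self.fallback.collect_block(h) }
        }
    }

    fn main() {
        let source = HlNodeSource { next_block_number: 100, fallback: Arc::new(S3Source) };
        assert_eq!(source.collect_block(99).as_deref(), Some("s3:99"));
        assert_eq!(source.collect_block(100).as_deref(), Some("hl-node:100"));
    }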


@@ -1,14 +1,16 @@
 use std::{
+    fs::File,
     io::{BufRead, BufReader, Read, Seek, SeekFrom},
+    ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 
-use eyre::{Context, ContextCompat};
 use futures::future::BoxFuture;
+use rangemap::RangeInclusiveMap;
 use reth_network::cache::LruMap;
 use serde::Deserialize;
-use time::{format_description, Duration, OffsetDateTime};
+use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
 use tokio::sync::Mutex;
 use tracing::{info, warn};
@@ -16,38 +18,54 @@ use crate::node::types::{BlockAndReceipts, EvmBlock};
 use super::{BlockSource, BlockSourceBoxed};
 
+/// Poll interval when tailing an *open* hourly file.
 const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
 
+/// Subdirectory that contains day folders (inside `local_ingest_dir`).
 const HOURLY_SUBDIR: &str = "hourly";
 
-/// Maximum number of blocks to cache blocks from hl-node.
-/// In normal situation, 0~1 blocks will be cached.
-const CACHE_SIZE: u32 = 1000;
-
-type LocalBlocksCache = Arc<Mutex<LruMap<u64, BlockAndReceipts>>>;
+#[derive(Debug)]
+pub struct LocalBlocksCache {
+    cache: LruMap<u64, BlockAndReceipts>,
+    // Lightweight range map to track the ranges of blocks in the local ingest directory
+    ranges: RangeInclusiveMap<u64, PathBuf>,
+}
+
+impl LocalBlocksCache {
+    // 3660 blocks per hour
+    const CACHE_SIZE: u32 = 8000;
+
+    fn new() -> Self {
+        Self {
+            cache: LruMap::new(Self::CACHE_SIZE),
+            ranges: RangeInclusiveMap::new(),
+        }
+    }
+
+    fn load_scan_result(&mut self, scan_result: ScanResult) {
+        for blk in scan_result.new_blocks {
+            let EvmBlock::Reth115(b) = &blk.block;
+            self.cache.insert(b.header.header.number, blk);
+        }
+        for range in scan_result.new_block_ranges {
+            self.ranges.insert(range, scan_result.path.clone());
+        }
+    }
+}
 
 /// Block source that monitors the local ingest directory for the HL node.
+///
+/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3.
+/// This block source caches blocks from the HL node to minimize latency,
+/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed.
+///
+/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7
+///
+/// [ref]: https://github.com/hyperliquid-dex/node
 #[derive(Debug, Clone)]
 pub struct HlNodeBlockSource {
     pub fallback: BlockSourceBoxed,
     pub local_ingest_dir: PathBuf,
-    pub local_blocks_cache: LocalBlocksCache, // height → block
+    pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
 }
 
 #[derive(Deserialize)]
 struct LocalBlockAndReceipts(String, BlockAndReceipts);
 
 struct ScanResult {
+    path: PathBuf,
     next_expected_height: u64,
     new_blocks: Vec<BlockAndReceipts>,
+    new_block_ranges: Vec<RangeInclusive<u64>>,
 }
 
 fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
@@ -60,56 +78,64 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
 }
 
 fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
-    // info!(
-    //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
-    //     path, start_height, last_line
-    // );
-    let file = std::fs::File::open(path).expect("Failed to open hour file path");
+    let file = File::open(path).expect("Failed to open hour file path");
     let reader = BufReader::new(file);
-    let mut new_blocks = Vec::<BlockAndReceipts>::new();
+    let mut new_blocks = Vec::new();
     let mut last_height = start_height;
     let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
     let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
+    let mut block_ranges: Vec<RangeInclusive<u64>> = Vec::new();
+    let mut current_range: Option<(u64, u64)> = None;
 
     for (line_idx, line) in lines.iter().enumerate().skip(skip) {
-        // Safety check ensuring efficiency
-        if line_idx < *last_line {
-            continue;
-        }
-        if line.trim().is_empty() {
+        if line_idx < *last_line || line.trim().is_empty() {
             continue;
         }
-        let Ok((parsed_block, height)) = line_to_evm_block(&line) else {
-            warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
-            continue;
-        };
-        if height < start_height {
-            continue;
+        match line_to_evm_block(line) {
+            Ok((parsed_block, height)) => {
+                if height >= start_height {
+                    last_height = last_height.max(height);
+                    new_blocks.push(parsed_block);
+                    *last_line = line_idx;
+                }
+                if matches!(current_range, Some((_, end)) if end + 1 == height) {
+                    current_range = Some((current_range.unwrap().0, height));
+                } else {
+                    if let Some((start, end)) = current_range.take() {
+                        block_ranges.push(start..=end);
+                    }
+                    current_range = Some((height, height));
+                }
+            }
+            Err(_) => {
+                warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
+                continue;
+            }
         }
-        last_height = last_height.max(height);
-        new_blocks.push(parsed_block);
-        *last_line = line_idx;
     }
-    ScanResult { next_expected_height: last_height + 1, new_blocks }
-}
-
-fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
-    OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000)
-        .expect("timestamp out of range")
+    if let Some((start, end)) = current_range.take() {
+        block_ranges.push(start..=end);
+    }
+    ScanResult {
+        path: path.to_path_buf(),
+        next_expected_height: last_height + 1,
+        new_blocks,
+        new_block_ranges: block_ranges,
+    }
 }
 
 fn date_from_datetime(dt: OffsetDateTime) -> String {
-    dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap()
+    dt.format(&format_description!("[year][month][day]")).unwrap()
 }
 
 impl BlockSource for HlNodeBlockSource {
     fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
         Box::pin(async move {
+            // Not a one liner (using .or) to include logs
             if let Some(block) = self.try_collect_local_block(height).await {
                 info!("Returning locally synced block for @ Height [{height}]");
                 Ok(block)
@@ -119,8 +145,28 @@ impl BlockSource for HlNodeBlockSource {
         })
     }
 
-    fn find_latest_block_number(&self) -> futures::future::BoxFuture<Option<u64>> {
-        self.fallback.find_latest_block_number()
+    fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
+        Box::pin(async move {
+            let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else {
+                warn!(
+                    "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
+                    self.local_ingest_dir
+                );
+                return self.fallback.find_latest_block_number().await;
+            };
+            let mut file = File::open(&dir).expect("Failed to open hour file path");
+            let last_line = read_last_complete_line(&mut file);
+            let Ok((_, height)) = line_to_evm_block(&last_line) else {
+                warn!(
+                    "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
+                    file
+                );
+                return self.fallback.find_latest_block_number().await;
+            };
+            info!("Latest block number: {} with path {}", height, dir.display());
+            Some(height)
+        })
     }
 
     fn recommended_chunk_size(&self) -> u64 {
@@ -128,36 +174,31 @@
     }
 }
 
-fn to_hourly(dt: OffsetDateTime) -> Result<OffsetDateTime, time::error::ComponentRange> {
-    dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0)
-}
-
-fn read_last_line(path: &Path) -> String {
+fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
     const CHUNK_SIZE: u64 = 4096;
-    let mut file = std::fs::File::open(path).expect("Failed to open hour file path");
     let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
-    let mut pos = file.seek(SeekFrom::End(0)).unwrap();
+    let mut pos = read.seek(SeekFrom::End(0)).unwrap();
     let mut last_line: Vec<u8> = Vec::new();
 
-    // Read backwards in chunks until we find a newline or reach the start
     while pos > 0 {
         let read_size = std::cmp::min(pos, CHUNK_SIZE);
         buf.resize(read_size as usize, 0);
-        file.seek(SeekFrom::Start(pos - read_size)).unwrap();
-        file.read_exact(&mut buf).unwrap();
+        read.seek(SeekFrom::Start(pos - read_size)).unwrap();
+        read.read_exact(&mut buf).unwrap();
         last_line = [buf.clone(), last_line].concat();
 
-        // Remove trailing newline
         if last_line.ends_with(b"\n") {
             last_line.pop();
         }
 
         if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
-            // Found a newline, so the last line starts after this
-            let start = idx + 1;
-            return String::from_utf8(last_line[start..].to_vec()).unwrap();
+            let candidate = &last_line[idx + 1..];
+            if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() {
+                return String::from_utf8(candidate.to_vec()).unwrap();
+            }
+            last_line.truncate(idx);
        }
 
         if pos < read_size {
@@ -166,90 +207,98 @@ fn read_last_line(path: &Path) -> String {
         pos -= read_size;
     }
 
+    // There are 0~1 lines in the entire file
     String::from_utf8(last_line).unwrap()
 }
 
 impl HlNodeBlockSource {
     async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
         let mut u_cache = self.local_blocks_cache.lock().await;
-        u_cache.remove(&height)
+        if let Some(block) = u_cache.cache.remove(&height) {
+            return Some(block);
+        }
+        let Some(path) = u_cache.ranges.get(&height).cloned() else {
+            return None;
+        };
+        info!("Loading block data from {:?}", path);
+        u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height));
+        u_cache.cache.get(&height).cloned()
+    }
+
+    fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
+        let dt_part = path.parent()?.file_name()?.to_str()?;
+        let hour_part = path.file_name()?.to_str()?;
+        let hour: u8 = hour_part.parse().ok()?;
+        Some(OffsetDateTime::new_utc(
+            Date::parse(&format!("{dt_part}"), &format_description!("[year][month][day]")).ok()?,
+            Time::from_hms(hour, 0, 0).ok()?,
+        ))
+    }
+
+    fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
+        let dir = root.join(HOURLY_SUBDIR);
+        let mut files = Vec::new();
+        for entry in std::fs::read_dir(dir).ok()? {
+            let file = entry.ok()?.path();
+            let subfiles: Vec<_> = std::fs::read_dir(&file)
+                .ok()?
+                .filter_map(|f| f.ok().map(|f| f.path()))
+                .filter(|p| Self::datetime_from_path(p).is_some())
+                .collect();
+            files.extend(subfiles);
+        }
+        files.sort();
+        Some(files)
+    }
+
+    fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
+        Self::all_hourly_files(root)?.last().cloned()
     }
 
     async fn try_backfill_local_blocks(
         root: &Path,
-        cache: &LocalBlocksCache,
-        mut next_height: u64,
+        cache: &Arc<Mutex<LocalBlocksCache>>,
+        cutoff_height: u64,
     ) -> eyre::Result<()> {
-        fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> {
-            // Validate and returns sort key for hourly/<isoformat>/<0-23>
-            let file_name = f.file_name()?.to_str()?;
-            let Ok(file_name_num) = file_name.parse::<u64>() else {
-                warn!("Failed to parse file name: {:?}", f);
-                return None;
-            };
-            // Check if filename is numeric and 0..24
-            if !(0..=24).contains(&file_name_num) {
-                return None;
-            }
-            Some((file_name_num, f))
-        }
-
         let mut u_cache = cache.lock().await;
-        // We assume that ISO format is sorted properly using naive string sort
-        let hourly_subdir = root.join(HOURLY_SUBDIR);
-        let mut files: Vec<_> = std::fs::read_dir(hourly_subdir)
-            .context("Failed to read hourly subdir")?
-            .filter_map(|f| f.ok().map(|f| f.path()))
-            .collect();
-        files.sort();
-
-        for file in files {
-            let mut subfiles: Vec<_> = file
-                .read_dir()
-                .context("Failed to read hourly subdir")?
-                .filter_map(|f| f.ok().map(|f| f.path()))
-                .filter_map(parse_file_name)
-                .collect();
-            subfiles.sort();
-
-            for (_, subfile) in subfiles {
-                // Fast path: check the last line of the file
-                let last_line = read_last_line(&subfile);
-                if let Ok((_, height)) = line_to_evm_block(&last_line) {
-                    if height < next_height {
-                        continue;
-                    }
-                } else {
-                    warn!("Failed to parse last line of file, fallback to slow path: {:?}", file);
-                }
-
-                let ScanResult { next_expected_height, new_blocks } =
-                    scan_hour_file(&file, &mut 0, next_height);
-                for blk in new_blocks {
-                    let EvmBlock::Reth115(b) = &blk.block;
-                    u_cache.insert(b.header.header.number, blk);
-                }
-                next_height = next_expected_height;
-            }
+        for subfile in Self::all_hourly_files(root).unwrap_or_default() {
+            let mut file = File::open(&subfile).expect("Failed to open hour file path");
+            let last_line = read_last_complete_line(&mut file);
+            if let Ok((_, height)) = line_to_evm_block(&last_line) {
+                if height < cutoff_height {
+                    continue;
+                }
+            } else {
+                warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
+            }
+
+            let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height);
+            // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
+            scan_result.new_blocks.clear();
+            u_cache.load_scan_result(scan_result);
         }
-        info!("Backfilled {} blocks", u_cache.len());
+        info!("Backfilled {} blocks", u_cache.cache.len());
         Ok(())
     }
 
-    async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
+    async fn start_local_ingest_loop(&self, current_head: u64) {
         let root = self.local_ingest_dir.to_owned();
         let cache = self.local_blocks_cache.clone();
         tokio::spawn(async move {
+            // hl-node backfill is for fast path; do not exit when it fails
+            let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await;
             let mut next_height = current_head;
-            let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap();
+            // Wait for the first hourly file to be created
+            let mut dt = loop {
+                if let Some(latest_file) = Self::find_latest_hourly_file(&root) {
+                    break Self::datetime_from_path(&latest_file).unwrap();
+                }
+                tokio::time::sleep(TAIL_INTERVAL).await;
+            };
             let mut hour = dt.hour();
             let mut day_str = date_from_datetime(dt);
@@ -261,26 +310,15 @@
             let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
 
             if hour_file.exists() {
-                let ScanResult { next_expected_height, new_blocks } =
-                    scan_hour_file(&hour_file, &mut last_line, next_height);
-
-                if !new_blocks.is_empty() {
-                    let mut u_cache = cache.lock().await;
-                    for blk in new_blocks {
-                        let EvmBlock::Reth115(b) = &blk.block;
-                        u_cache.insert(b.header.header.number, blk);
-                    }
-                    next_height = next_expected_height;
-                }
+                let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
+                next_height = scan_result.next_expected_height;
+                let mut u_cache = cache.lock().await;
+                u_cache.load_scan_result(scan_result);
             }
 
-            // Decide whether the *current* hour file is closed (past) or
-            // still live. If it's in the past by > 1 h, move to next hour;
-            // otherwise, keep tailing the same file.
             let now = OffsetDateTime::now_utc();
-            // println!("Date Current {:?}", dt);
-            // println!("Now Current {:?}", now);
             if dt + Duration::HOUR < now {
                 dt += Duration::HOUR;
                 hour = dt.hour();
@@ -298,29 +336,58 @@
         });
     }
 
-    pub(crate) async fn run(&self) -> eyre::Result<()> {
-        let latest_block_number = self
-            .fallback
-            .find_latest_block_number()
-            .await
-            .context("Failed to find latest block number")?;
-
-        let EvmBlock::Reth115(latest_block) =
-            self.fallback.collect_block(latest_block_number).await?.block;
-        let latest_block_ts = latest_block.header.header.timestamp;
-        self.start_local_ingest_loop(latest_block_number, latest_block_ts).await;
+    pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
+        let _ = Self::try_backfill_local_blocks(
+            &self.local_ingest_dir,
+            &self.local_blocks_cache,
+            next_block_number,
+        )
+        .await;
+        self.start_local_ingest_loop(next_block_number).await;
         Ok(())
     }
 
-    pub async fn new(fallback: BlockSourceBoxed, local_ingest_dir: PathBuf) -> Self {
+    pub async fn new(
+        fallback: BlockSourceBoxed,
+        local_ingest_dir: PathBuf,
+        next_block_number: u64,
+    ) -> Self {
         let block_source = HlNodeBlockSource {
             fallback,
             local_ingest_dir,
-            local_blocks_cache: Arc::new(Mutex::new(LruMap::new(CACHE_SIZE))),
+            local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
         };
-        block_source.run().await.unwrap();
+        block_source.run(next_block_number).await.unwrap();
         block_source
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_datetime_from_path() {
+        let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
+        let dt = HlNodeBlockSource::datetime_from_path(path).unwrap();
+        println!("{:?}", dt);
+    }
+
+    #[tokio::test]
+    async fn test_backfill() {
+        let test_path = Path::new("/root/evm_block_and_receipts");
+        if !test_path.exists() {
+            return;
+        }
+        let cache = Arc::new(Mutex::new(LocalBlocksCache::new()));
+        HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap();
+        let u_cache = cache.lock().await;
+        println!("{:?}", u_cache.ranges);
+        assert_eq!(
+            u_cache.ranges.get(&9735058),
+            Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
+        );
+    }
+}
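The renamed read_last_complete_line is now generic over Read + Seek and rejects a trailing line that does not parse as a block, so a partially written final line (the hl-node appends while the reader scans) falls back to the previous complete line. A standalone sketch of the backwards chunked scan with the block validation left out, runnable against an in-memory Cursor (illustrative only, not the PR's code):

    use std::io::{Cursor, Read, Seek, SeekFrom};

    // Sketch of the backwards chunked scan: return the text after the final newline.
    fn last_line<R: Read + Seek>(read: &mut R) -> String {
        const CHUNK_SIZE: u64 = 4096;
        let mut pos = read.seek(SeekFrom::End(0)).unwrap();
        let mut tail: Vec<u8> = Vec::new();
        while pos > 0 {
            let read_size = pos.min(CHUNK_SIZE);
            let mut buf = vec![0u8; read_size as usize];
            read.seek(SeekFrom::Start(pos - read_size)).unwrap();
            read.read_exact(&mut buf).unwrap();
            tail = [buf, tail].concat();
            // A complete file ends with '\n'; drop it before searching.
            if tail.ends_with(b"\n") {
                tail.pop();
            }
            if let Some(idx) = tail.iter().rposition(|&b| b == b'\n') {
                return String::from_utf8(tail[idx + 1..].to_vec()).unwrap();
            }
            pos -= read_size;
        }
        // Zero or one line in the entire file.
        String::from_utf8(tail).unwrap()
    }

    fn main() {
        let mut data = Cursor::new(b"line 1\nline 2\nline 3\n".to_vec());
        assert_eq!(last_line(&mut data), "line 3");
    }

In the PR's version, the rposition hit is returned only when line_to_evm_block succeeds; otherwise the candidate is truncated and the scan keeps walking backwards until a complete, parseable line is found.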