@ -1,14 +1,16 @@
use std ::{
fs ::File ,
io ::{ BufRead , BufReader , Read , Seek , SeekFrom } ,
ops ::RangeInclusive ,
path ::{ Path , PathBuf } ,
sync ::Arc ,
} ;
use eyre ::{ Context , ContextCompat } ;
use futures ::future ::BoxFuture ;
use reth_network ::cache ::LruMap ;
use rangemap ::RangeInclusiveMap ;
use serde ::Deserialize ;
use time ::{ format_description , Duration , OffsetDateTime } ;
use time ::{ macros :: format_description, Date , Duration , OffsetDateTime , Time };
use tokio ::sync ::Mutex ;
use tracing ::{ info , warn } ;
@ -16,38 +18,54 @@ use crate::node::types::{BlockAndReceipts, EvmBlock};
use super ::{ BlockSource , BlockSourceBoxed } ;
/// Poll interval when tailing an *open* hourly file.
const TAIL_INTERVAL : std ::time ::Duration = std ::time ::Duration ::from_millis ( 25 ) ;
/// Sub‑ directory that contains day folders (inside `local_ingest_dir`).
const HOURLY_SUBDIR : & str = " hourly " ;
/// Maximum number of blocks to cache blocks from hl-node.
/// In normal situation, 0~1 blocks will be cached.
const CACHE_SIZE : u32 = 1000 ;
type LocalBlocksCache = Arc < Mutex < LruMap < u64 , BlockAndReceipts > > > ;
#[ derive(Debug) ]
pub struct LocalBlocksCache {
cache : LruMap < u64 , BlockAndReceipts > ,
// Lightweight range map to track the ranges of blocks in the local ingest directory
ranges : RangeInclusiveMap < u64 , PathBuf > ,
}
impl LocalBlocksCache {
// 3660 blocks per hour
const CACHE_SIZE : u32 = 8000 ;
fn new ( ) -> Self {
Self {
cache : LruMap ::new ( Self ::CACHE_SIZE ) ,
ranges : RangeInclusiveMap ::new ( ) ,
}
}
fn load_scan_result ( & mut self , scan_result : ScanResult ) {
for blk in scan_result . new_blocks {
let EvmBlock ::Reth115 ( b ) = & blk . block ;
self . cache . insert ( b . header . header . number , blk ) ;
}
for range in scan_result . new_block_ranges {
self . ranges . insert ( range , scan_result . path . clone ( ) ) ;
}
}
}
/// Block source that monitors the local ingest directory for the HL node.
///
/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3.
/// This block source caches blocks from the HL node to minimize latency,
/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed.
///
/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7
///
/// [ref]: https://github.com/hyperliquid-dex/node
#[ derive(Debug, Clone) ]
pub struct HlNodeBlockSource {
pub fallback : BlockSourceBoxed ,
pub local_ingest_dir : PathBuf ,
pub local_blocks_cache : LocalBlocksCache , // height → block
pub local_blocks_cache : Arc < Mutex < LocalBlocksCache> > , // height → block
}
#[ derive(Deserialize) ]
struct LocalBlockAndReceipts ( String , BlockAndReceipts ) ;
struct ScanResult {
path : PathBuf ,
next_expected_height : u64 ,
new_blocks : Vec < BlockAndReceipts > ,
new_block_ranges : Vec < RangeInclusive < u64 > > ,
}
fn line_to_evm_block ( line : & str ) -> serde_json ::Result < ( BlockAndReceipts , u64 ) > {
@ -60,56 +78,64 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
}
fn scan_hour_file ( path : & Path , last_line : & mut usize , start_height : u64 ) -> ScanResult {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std ::fs ::File ::open ( path ) . expect ( " Failed to open hour file path " ) ;
let file = File ::open ( path ) . expect ( " Failed to open hour file path " ) ;
let reader = BufReader ::new ( file ) ;
let mut new_blocks = Vec ::< BlockAndReceipts > ::new ( ) ;
let mut new_blocks = Vec ::new ( ) ;
let mut last_height = start_height ;
let lines : Vec < String > = reader . lines ( ) . collect ::< Result < _ , _ > > ( ) . unwrap ( ) ;
let skip = if * last_line = = 0 { 0 } else { * last_line - 1 } ;
let mut block_ranges : Vec < RangeInclusive < u64 > > = Vec ::new ( ) ;
let mut current_range : Option < ( u64 , u64 ) > = None ;
for ( line_idx , line ) in lines . iter ( ) . enumerate ( ) . skip ( skip ) {
// Safety check ensuring efficiency
if line_idx < * last_line {
continue ;
}
if line . trim ( ) . is_empty ( ) {
continue ;
}
let Ok ( ( parsed_block , height ) ) = line_to_evm_block ( & line ) else {
warn! ( " Failed to parse line: {}... " , line . get ( 0 .. 50 ) . unwrap_or ( line ) ) ;
continue ;
} ;
if height < start_height {
if line_idx < * last_line | | line . trim ( ) . is_empty ( ) {
continue ;
}
match line_to_evm_block ( line ) {
Ok ( ( parsed_block , height ) ) = > {
if height > = start_height {
last_height = last_height . max ( height ) ;
new_blocks . push ( parsed_block ) ;
* last_line = line_idx ;
}
if matches! ( current_range , Some ( ( _ , end ) ) if end + 1 = = height ) {
current_range = Some ( ( current_range . unwrap ( ) . 0 , height ) ) ;
} else {
if let Some ( ( start , end ) ) = current_range . take ( ) {
block_ranges . push ( start ..= end ) ;
}
current_range = Some ( ( height , height ) ) ;
}
}
Err ( _ ) = > {
warn! ( " Failed to parse line: {}... " , line . get ( 0 .. 50 ) . unwrap_or ( line ) ) ;
continue ;
}
}
}
ScanResult { next_expected_height : last_height + 1 , new_blocks }
}
if let Some ( ( start , end ) ) = current_range . take ( ) {
block_ranges . push ( start ..= end ) ;
}
fn datetime_from_timestamp ( ts_sec : u64 ) -> OffsetDateTime {
OffsetDateTime ::from_unix_timestamp_nanos ( ( ts_sec as i128 ) * 1_000 * 1_000_000 )
. expect ( " timestamp out of range " )
ScanResult {
path : path . to_path_buf ( ) ,
next_expected_height : last_height + 1 ,
new_blocks ,
new_block_ranges : block_ranges ,
}
}
fn date_from_datetime ( dt : OffsetDateTime ) -> String {
dt . format ( & format_description ::parse ( " [year][month][day] " ) . unwrap ( ) ). unwrap ( )
dt . format ( & format_description! ( " [year][month][day] " ) ) . unwrap ( )
}
impl BlockSource for HlNodeBlockSource {
fn collect_block ( & self , height : u64 ) -> BoxFuture < eyre ::Result < BlockAndReceipts > > {
Box ::pin ( async move {
// Not a one liner (using .or) to include logs
if let Some ( block ) = self . try_collect_local_block ( height ) . await {
info! ( " Returning locally synced block for @ Height [{height}] " ) ;
Ok ( block )
@ -119,8 +145,28 @@ impl BlockSource for HlNodeBlockSource {
} )
}
fn find_latest_block_number ( & self ) -> futures ::future :: BoxFuture< Option < u64 > > {
self . fallback . find_latest_block_number ( )
fn find_latest_block_number ( & self ) -> BoxFuture < Option < u64 > > {
Box ::pin ( async move {
let Some ( dir ) = Self ::find_latest_hourly_file ( & self . local_ingest_dir ) else {
warn! (
" No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir " ,
self . local_ingest_dir
) ;
return self . fallback . find_latest_block_number ( ) . await ;
} ;
let mut file = File ::open ( & dir ) . expect ( " Failed to open hour file path " ) ;
let last_line = read_last_complete_line ( & mut file ) ;
let Ok ( ( _ , height ) ) = line_to_evm_block ( & last_line ) else {
warn! (
" Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir " ,
file
) ;
return self . fallback . find_latest_block_number ( ) . await ;
} ;
info! ( " Latest block number: {} with path {} " , height , dir . display ( ) ) ;
Some ( height )
} )
}
fn recommended_chunk_size ( & self ) -> u64 {
@ -128,36 +174,31 @@ impl BlockSource for HlNodeBlockSource {
}
}
fn to_hourly ( dt : OffsetDateTime ) -> Result < OffsetDateTime , time ::error ::ComponentRange > {
dt . replace_minute ( 0 ) ? . replace_second ( 0 ) ? . replace_nanosecond ( 0 )
}
fn read_last_line ( path : & Path ) -> String {
fn read_last_complete_line < R : Read + Seek > ( read : & mut R ) -> String {
const CHUNK_SIZE : u64 = 4096 ;
let mut file = std ::fs ::File ::open ( path ) . expect ( " Failed to open hour file path " ) ;
let mut buf = Vec ::with_capacity ( CHUNK_SIZE as usize ) ;
let mut pos = file . seek ( SeekFrom ::End ( 0 ) ) . unwrap ( ) ;
let mut pos = read . seek ( SeekFrom ::End ( 0 ) ) . unwrap ( ) ;
let mut last_line : Vec < u8 > = Vec ::new ( ) ;
// Read backwards in chunks until we find a newline or reach the start
while pos > 0 {
let read_size = std ::cmp ::min ( pos , CHUNK_SIZE ) ;
buf . resize ( read_size as usize , 0 ) ;
file . seek ( SeekFrom ::Start ( pos - read_size ) ) . unwrap ( ) ;
file . read_exact ( & mut buf ) . unwrap ( ) ;
read . seek ( SeekFrom ::Start ( pos - read_size ) ) . unwrap ( ) ;
read . read_exact ( & mut buf ) . unwrap ( ) ;
last_line = [ buf . clone ( ) , last_line ] . concat ( ) ;
// Remove trailing newline
if last_line . ends_with ( b " \n " ) {
last_line . pop ( ) ;
}
if let Some ( idx ) = last_line . iter ( ) . rposition ( | & b | b = = b '\n' ) {
// Found a newline, so the last line starts after this
let start = idx + 1 ;
return String ::from_utf8 ( last_line [ start .. ] . to_vec ( ) ) . unwrap ( ) ;
let candidate = & last_ line[ idx + 1 .. ] ;
if line_to_evm_block ( & String ::from_utf8 ( candidate . to_vec ( ) ) . unwrap ( ) ) . is_ok ( ) {
return String ::from_utf8 ( candidate . to_vec ( ) ) . unwrap ( ) ;
}
last_line . truncate ( idx ) ;
}
if pos < read_size {
@ -166,90 +207,98 @@ fn read_last_line(path: &Path) -> String {
pos - = read_size ;
}
// There is 0~1 lines in the entire file
String ::from_utf8 ( last_line ) . unwrap ( )
}
impl HlNodeBlockSource {
async fn try_collect_local_block ( & self , height : u64 ) -> Option < BlockAndReceipts > {
let mut u_cache = self . local_blocks_cache . lock ( ) . await ;
u_ cache. remove ( & height )
if let Some ( block ) = u_cache . cache . remove ( & height ) {
return Some ( block ) ;
}
let Some ( path ) = u_cache . ranges . get ( & height ) . cloned ( ) else {
return None ;
} ;
info! ( " Loading block data from {:?} " , path ) ;
u_cache . load_scan_result ( scan_hour_file ( & path , & mut 0 , height ) ) ;
u_cache . cache . get ( & height ) . cloned ( )
}
fn datetime_from_path ( path : & Path ) -> Option < OffsetDateTime > {
let dt_part = path . parent ( ) ? . file_name ( ) ? . to_str ( ) ? ;
let hour_part = path . file_name ( ) ? . to_str ( ) ? ;
let hour : u8 = hour_part . parse ( ) . ok ( ) ? ;
Some ( OffsetDateTime ::new_utc (
Date ::parse ( & format! ( " {dt_part} " ) , & format_description! ( " [year][month][day] " ) ) . ok ( ) ? ,
Time ::from_hms ( hour , 0 , 0 ) . ok ( ) ? ,
) )
}
fn all_hourly_files ( root : & Path ) -> Option < Vec < PathBuf > > {
let dir = root . join ( HOURLY_SUBDIR ) ;
let mut files = Vec ::new ( ) ;
for entry in std ::fs ::read_dir ( dir ) . ok ( ) ? {
let file = entry . ok ( ) ? . path ( ) ;
let subfiles : Vec < _ > = std ::fs ::read_dir ( & file )
. ok ( ) ?
. filter_map ( | f | f . ok ( ) . map ( | f | f . path ( ) ) )
. filter ( | p | Self ::datetime_from_path ( p ) . is_some ( ) )
. collect ( ) ;
files . extend ( subfiles ) ;
}
files . sort ( ) ;
Some ( files )
}
fn find_latest_hourly_file ( root : & Path ) -> Option < PathBuf > {
Self ::all_hourly_files ( root ) ? . last ( ) . cloned ( )
}
async fn try_backfill_local_blocks (
root : & Path ,
cache : & LocalBlocksCache ,
mut next _height : u64 ,
cache : & Arc < Mutex < LocalBlocksCache> > ,
cutoff _height : u64 ,
) -> eyre ::Result < ( ) > {
fn parse_file_name ( f : PathBuf ) -> Option < ( u64 , PathBuf ) > {
// Validate and returns sort key for hourly/<isoformat>/<0-23>
let file_name = f . file_name ( ) ? . to_str ( ) ? ;
let Ok ( file_name_num ) = file_name . parse ::< u64 > ( ) else {
warn! ( " Failed to parse file name: {:?} " , f ) ;
return None ;
} ;
// Check if filename is numeric and 0..24
if ! ( 0 ..= 24 ) . contains ( & file_name_num ) {
return None ;
}
Some ( ( file_name_num , f ) )
}
let mut u_cache = cache . lock ( ) . await ;
// We assume that ISO format is sorted properly using naive string sort
let hourly_subdir = root . join ( HOURLY_SUBDIR ) ;
let mut files : Vec < _ > = std ::fs ::read_dir ( hourly_subdir )
. context ( " Failed to read hourly subdir " ) ?
. filter_map ( | f | f . ok ( ) . map ( | f | f . path ( ) ) )
. collect ( ) ;
files . sort ( ) ;
for file in files {
let mut subfiles : Vec < _ > = file
. read_dir ( )
. context ( " Failed to read hourly subdir " ) ?
. filter_map ( | f | f . ok ( ) . map ( | f | f . path ( ) ) )
. filter_map ( parse_file_name )
. collect ( ) ;
subfiles . sort ( ) ;
for ( _ , subfile ) in subfiles {
// Fast path: check the last line of the file
let last_line = read_last_line ( & subfile ) ;
for subfile in Self ::all_hourly_files ( root ) . unwrap_or_default ( ) {
let mut file = File ::open ( & subfile ) . expect ( " Failed to open hour file path " ) ;
let last_line = read_last_complete_line ( & mut file ) ;
if let Ok ( ( _ , height ) ) = line_to_evm_block ( & last_line ) {
if height < next _height {
if height < cutoff _height {
continue ;
}
} else {
warn! ( " Failed to parse last line of file, fallback to slow path: {:?} " , file ) ;
warn! ( " Failed to parse last line of file, fallback to slow path: {:?} " , sub file) ;
}
let S canR esult { next_expected_height , new_blocks } =
scan_hour_file ( & file , & mut 0 , next_height ) ;
for blk in new_blocks {
let EvmBlock ::Reth115 ( b ) = & blk . block ;
u_cache . insert ( b . header . header . number , blk ) ;
let mut s can_r esult = scan_hour_file ( & subfile , & mut 0 , cutoff_height ) ;
// Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
scan_result . new_blocks . clear ( ) ;
u_cache . load_scan_result ( scan_result ) ;
}
next_height = next_expected_height ;
}
}
info! ( " Backfilled {} blocks " , u_cache . len ( ) ) ;
info! ( " Backfilled {} blocks " , u_cache . cache . len ( ) ) ;
Ok ( ( ) )
}
async fn start_local_ingest_loop ( & self , current_head : u64 , current_ts : u64 ) {
async fn start_local_ingest_loop ( & self , current_head : u64 ) {
let root = self . local_ingest_dir . to_owned ( ) ;
let cache = self . local_blocks_cache . clone ( ) ;
tokio ::spawn ( async move {
// hl-node backfill is for fast path; do not exit when it fails
let _ = Self ::try_backfill_local_blocks ( & root , & cache , current_head ) . await ;
let mut next_height = current_head ;
let mut dt = to_hourly ( datetime_from_timestamp ( current_ts ) ) . unwrap ( ) ;
// Wait for the first hourly file to be created
let mut dt = loop {
if let Some ( latest_file ) = Self ::find_latest_hourly_file ( & root ) {
break Self ::datetime_from_path ( & latest_file ) . unwrap ( ) ;
}
tokio ::time ::sleep ( TAIL_INTERVAL ) . await ;
} ;
let mut hour = dt . hour ( ) ;
let mut day_str = date_from_datetime ( dt ) ;
@ -261,26 +310,15 @@ impl HlNodeBlockSource {
let hour_file = root . join ( HOURLY_SUBDIR ) . join ( & day_str ) . join ( format! ( " {hour} " ) ) ;
if hour_file . exists ( ) {
let S canR esult { next_expected_height , new_blocks } =
scan_hour_file ( & hour_file , & mut last_line , next _height ) ;
if ! new_blocks . is_empty ( ) {
let s can_r esult = scan_hour_file ( & hour_file , & mut last_line , next_height ) ;
next_height = scan_result . next_expected _height ;
let mut u_cache = cache . lock ( ) . await ;
for blk in new_blocks {
let EvmBlock ::Reth115 ( b ) = & blk . block ;
u_cache . insert ( b . header . header . number , blk ) ;
}
next_height = next_expected_height ;
}
u_cache . load_scan_result ( scan_result ) ;
}
// Decide whether the *current* hour file is closed (past) or
// still live. If it’ s in the past by > 1 h, move to next hour;
// otherwise, keep tailing the same file.
let now = OffsetDateTime ::now_utc ( ) ;
// println!("Date Current {:?}", dt);
// println!("Now Current {:?}", now);
if dt + Duration ::HOUR < now {
dt + = Duration ::HOUR ;
hour = dt . hour ( ) ;
@ -298,29 +336,58 @@ impl HlNodeBlockSource {
} ) ;
}
pub ( crate ) async fn run ( & self ) -> eyre ::Result < ( ) > {
let latest_block_number = s elf
. fallback
. find_latest_block_number ( )
. await
. context ( " Failed to find latest block number " ) ? ;
pub ( crate ) async fn run ( & self , next_block_number : u64 ) -> eyre ::Result < ( ) > {
let _ = S elf::try_backfill_local_blocks (
& self . local_ingest_dir ,
& self . local_blocks_cache ,
next_block_number ,
)
. await ;
let EvmBlock ::Reth115 ( latest_block ) =
self . fallback . collect_block ( latest_block_number ) . await ? . block ;
let latest_block_ts = latest_block . header . header . timestamp ;
self . start_local_ingest_loop ( latest_block_number , latest_block_ts ) . await ;
self . start_local_ingest_loop ( next_block_number ) . await ;
Ok ( ( ) )
}
pub async fn new ( fallback : BlockSourceBoxed , local_ingest_dir : PathBuf ) -> Self {
pub async fn new (
fallback : BlockSourceBoxed ,
local_ingest_dir : PathBuf ,
next_block_number : u64 ,
) -> Self {
let block_source = HlNodeBlockSource {
fallback ,
local_ingest_dir ,
local_blocks_cache : Arc ::new ( Mutex ::new ( LruMap ::new ( CACHE_SIZE ) ) ) ,
local_blocks_cache : Arc ::new ( Mutex ::new ( LocalBlocksCache ::new ( ) ) ) ,
} ;
block_source . run ( ) . await . unwrap ( ) ;
block_source . run ( next_block_number ) . await . unwrap ( ) ;
block_source
}
}
#[ cfg(test) ]
mod tests {
use super ::* ;
#[ test ]
fn test_datetime_from_path ( ) {
let path = Path ::new ( " /home/username/hl/data/evm_block_and_receipts/hourly/20250731/4 " ) ;
let dt = HlNodeBlockSource ::datetime_from_path ( path ) . unwrap ( ) ;
println! ( " {:?} " , dt ) ;
}
#[ tokio::test ]
async fn test_backfill ( ) {
let test_path = Path ::new ( " /root/evm_block_and_receipts " ) ;
if ! test_path . exists ( ) {
return ;
}
let cache = Arc ::new ( Mutex ::new ( LocalBlocksCache ::new ( ) ) ) ;
HlNodeBlockSource ::try_backfill_local_blocks ( & test_path , & cache , 1000000 ) . await . unwrap ( ) ;
let u_cache = cache . lock ( ) . await ;
println! ( " {:?} " , u_cache . ranges ) ;
assert_eq! (
u_cache . ranges . get ( & 9735058 ) ,
Some ( & test_path . join ( HOURLY_SUBDIR ) . join ( " 20250729 " ) . join ( " 22 " ) )
) ;
}
}