mirror of https://github.com/hl-archive-node/nanoreth.git, synced 2025-12-06 10:59:55 +00:00

fix: Reduce fallback usage in hl-node ingestion
@@ -1,11 +1,11 @@
 use std::{
     collections::HashMap,
+    fs::File,
     io::{BufRead, BufReader, Read, Seek, SeekFrom},
     path::{Path, PathBuf},
     sync::Arc,
 };
 
-use eyre::Context;
 use futures::future::BoxFuture;
 use serde::Deserialize;
 use time::{format_description, Duration, OffsetDateTime};
@@ -16,22 +16,12 @@ use crate::node::types::{BlockAndReceipts, EvmBlock};
 
 use super::{BlockSource, BlockSourceBoxed};
 
-/// Poll interval when tailing an *open* hourly file.
 const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
-/// Sub‑directory that contains day folders (inside `local_ingest_dir`).
 const HOURLY_SUBDIR: &str = "hourly";
 
 type LocalBlocksCache = Arc<Mutex<HashMap<u64, BlockAndReceipts>>>;
 
 /// Block source that monitors the local ingest directory for the HL node.
-///
-/// In certain situations, the [hl-node][ref] may offer lower latency compared to S3.
-/// This block source caches blocks from the HL node to minimize latency,
-/// while still falling back to [super::LocalBlockSource] or [super::S3BlockSource] when needed.
-///
-/// Originally introduced in https://github.com/hl-archive-node/nanoreth/pull/7
-///
-/// [ref]: https://github.com/hyperliquid-dex/node
 #[derive(Debug, Clone)]
 pub struct HlNodeBlockSource {
     pub fallback: BlockSourceBoxed,
@@ -57,46 +47,34 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
 }
 
 fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
-    // info!(
-    //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
-    //     path, start_height, last_line
-    // );
-    let file = std::fs::File::open(path).expect("Failed to open hour file path");
+    let file = File::open(path).expect("Failed to open hour file path");
     let reader = BufReader::new(file);
 
-    let mut new_blocks = Vec::<BlockAndReceipts>::new();
+    let mut new_blocks = Vec::new();
     let mut last_height = start_height;
     let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
     let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
 
     for (line_idx, line) in lines.iter().enumerate().skip(skip) {
-        // Safety check ensuring efficiency
-        if line_idx < *last_line {
-            continue;
-        }
-        if line.trim().is_empty() {
+        if line_idx < *last_line || line.trim().is_empty() {
             continue;
         }
 
-        let Ok((parsed_block, height)) = line_to_evm_block(line) else {
-            warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
-            continue;
-        };
-        if height < start_height {
-            continue;
-        }
-
-        last_height = last_height.max(height);
-        new_blocks.push(parsed_block);
-        *last_line = line_idx;
+        match line_to_evm_block(line) {
+            Ok((parsed_block, height)) if height >= start_height => {
+                last_height = last_height.max(height);
+                new_blocks.push(parsed_block);
+                *last_line = line_idx;
+            }
+            Ok(_) => continue,
+            Err(_) => {
+                warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
+                continue;
+            }
+        }
     }
 
     ScanResult { next_expected_height: last_height + 1, new_blocks }
 }
 
-fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
-    OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000)
-        .expect("timestamp out of range")
-}
-
 fn date_from_datetime(dt: OffsetDateTime) -> String {
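
Note: the rewrite folds the two guard `if`s into one condition and replaces the `let`-`else` plus range check with a single `match`, giving each outcome (in-range block, stale height, unparsable line) an explicit arm. Below is a standalone sketch of that parse-and-filter shape over newline-delimited JSON; `Record` and its `height` field are hypothetical stand-ins for the `(BlockAndReceipts, u64)` pair that `line_to_evm_block` returns, assuming `serde` (derive feature) and `serde_json` as the file already uses.

use serde::Deserialize;

// Stand-in for the block payload parsed out of each hourly-file line.
#[derive(Debug, Deserialize)]
struct Record {
    height: u64,
}

fn scan(input: &str, start_height: u64) -> (u64, Vec<Record>) {
    let mut last_height = start_height;
    let mut kept = Vec::new();
    for line in input.lines() {
        if line.trim().is_empty() {
            continue;
        }
        match serde_json::from_str::<Record>(line) {
            Ok(rec) if rec.height >= start_height => {
                last_height = last_height.max(rec.height);
                kept.push(rec);
            }
            Ok(_) => continue,  // complete record, but below start_height
            Err(_) => continue, // half-written or corrupt line
        }
    }
    (last_height + 1, kept) // "next expected height", like ScanResult
}

fn main() {
    let input = "{\"height\":9}\n\n{\"height\":10}\nnot json\n{\"height\":11}";
    let (next, kept) = scan(input, 10);
    assert_eq!((next, kept.len()), (12, 2));
    println!("next expected height: {next}, kept {} records", kept.len());
}
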
@@ -106,7 +84,6 @@ fn date_from_datetime(dt: OffsetDateTime) -> String {
 impl BlockSource for HlNodeBlockSource {
     fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
         Box::pin(async move {
-            // Not a one liner (using .or) to include logs
             if let Some(block) = self.try_collect_local_block(height).await {
                 info!("Returning locally synced block for @ Height [{height}]");
                 Ok(block)
@@ -116,8 +93,24 @@ impl BlockSource for HlNodeBlockSource {
         })
     }
 
-    fn find_latest_block_number(&self) -> futures::future::BoxFuture<Option<u64>> {
-        self.fallback.find_latest_block_number()
+    fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
+        Box::pin(async move {
+            let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else {
+                warn!(
+                    "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
+                    self.local_ingest_dir
+                );
+                return None;
+            };
+            let mut file = File::open(&dir).expect("Failed to open hour file path");
+            let last_line = read_last_complete_line(&mut file);
+            let Ok((_, height)) = line_to_evm_block(&last_line) else {
+                return None;
+            };
+
+            info!("Latest block number: {} with path {}", height, dir.display());
+            Some(height)
+        })
     }
 
     fn recommended_chunk_size(&self) -> u64 {
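
Note: `find_latest_block_number` no longer delegates to the fallback source; it now inspects the newest file under `<root>/hourly/<day>/<hour>` directly. A sketch of that selection under the same assumed layout follows (`latest_hourly_file` is illustrative, not the commit's helper). One design point worth flagging: the sketch sorts hour names numerically, since a plain string sort ranks "9" after "10"; the removed `parse_file_name` helper sorted numerically, while the new `all_hourly_files` relies on a plain path sort.

use std::fs;
use std::path::{Path, PathBuf};

// Sketch: pick the newest file under <root>/hourly/<YYYYMMDD>/<hour>.
fn latest_hourly_file(root: &Path) -> Option<PathBuf> {
    let mut days: Vec<PathBuf> = fs::read_dir(root.join("hourly"))
        .ok()?
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .collect();
    days.sort(); // zero-padded YYYYMMDD folders sort correctly as strings
    let newest_day = days.last()?;
    let mut hours: Vec<(u64, PathBuf)> = fs::read_dir(newest_day)
        .ok()?
        .filter_map(|entry| entry.ok().map(|e| e.path()))
        .filter_map(|p| Some((p.file_name()?.to_str()?.parse().ok()?, p)))
        .collect();
    hours.sort(); // numeric sort: 2 < 10, unlike the string order "10" < "2"
    Some(hours.last()?.1.clone())
}

fn main() -> std::io::Result<()> {
    let root = std::env::temp_dir().join("hourly_demo");
    fs::create_dir_all(root.join("hourly/20250101"))?;
    fs::write(root.join("hourly/20250101/9"), "")?;
    fs::write(root.join("hourly/20250101/10"), "")?;
    // Numeric hour ordering picks hour 10, not hour 9.
    assert_eq!(latest_hourly_file(&root), Some(root.join("hourly/20250101/10")));
    println!("{:?}", latest_hourly_file(&root));
    Ok(())
}
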
@@ -125,36 +118,31 @@ impl BlockSource for HlNodeBlockSource {
     }
 }
 
-fn to_hourly(dt: OffsetDateTime) -> Result<OffsetDateTime, time::error::ComponentRange> {
-    dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0)
-}
-
-fn read_last_line(path: &Path) -> String {
+fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
     const CHUNK_SIZE: u64 = 4096;
-    let mut file = std::fs::File::open(path).expect("Failed to open hour file path");
     let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
-    let mut pos = file.seek(SeekFrom::End(0)).unwrap();
+    let mut pos = read.seek(SeekFrom::End(0)).unwrap();
     let mut last_line: Vec<u8> = Vec::new();
 
-    // Read backwards in chunks until we find a newline or reach the start
     while pos > 0 {
         let read_size = std::cmp::min(pos, CHUNK_SIZE);
         buf.resize(read_size as usize, 0);
 
-        file.seek(SeekFrom::Start(pos - read_size)).unwrap();
-        file.read_exact(&mut buf).unwrap();
+        read.seek(SeekFrom::Start(pos - read_size)).unwrap();
+        read.read_exact(&mut buf).unwrap();
 
         last_line = [buf.clone(), last_line].concat();
 
-        // Remove trailing newline
         if last_line.ends_with(b"\n") {
             last_line.pop();
         }
 
         if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
-            // Found a newline, so the last line starts after this
-            let start = idx + 1;
-            return String::from_utf8(last_line[start..].to_vec()).unwrap();
+            let candidate = &last_line[idx + 1..];
+            if line_to_evm_block(&String::from_utf8(candidate.to_vec()).unwrap()).is_ok() {
+                return String::from_utf8(candidate.to_vec()).unwrap();
+            }
+            last_line.truncate(idx);
         }
 
         if pos < read_size {
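
Note: `read_last_complete_line` generalizes the old `read_last_line` in two ways: it takes any `Read + Seek` source instead of opening a path (so it can run against an in-memory buffer), and it keeps walking backwards past a trailing partial line until a candidate actually parses, which matters when tailing an hourly file that is still being written. A self-contained sketch of that backwards scan, with a caller-supplied `is_valid` predicate standing in for the `line_to_evm_block(..).is_ok()` check (names here are illustrative, not the commit's):

use std::io::{Cursor, Read, Seek, SeekFrom};

// Scan a Read + Seek source backwards in chunks and return the last line
// accepted by `is_valid`, skipping trailing lines that fail validation.
fn last_valid_line<R: Read + Seek>(src: &mut R, is_valid: impl Fn(&str) -> bool) -> Option<String> {
    const CHUNK: u64 = 4096;
    let mut pos = src.seek(SeekFrom::End(0)).ok()?;
    let mut tail: Vec<u8> = Vec::new();
    while pos > 0 {
        let n = pos.min(CHUNK);
        let mut buf = vec![0u8; n as usize];
        src.seek(SeekFrom::Start(pos - n)).ok()?;
        src.read_exact(&mut buf).ok()?;
        tail = [buf, tail].concat(); // prepend the earlier chunk
        if tail.ends_with(b"\n") {
            tail.pop(); // ignore the file's final newline
        }
        // Pop candidate lines off the back until one validates.
        while let Some(idx) = tail.iter().rposition(|&b| b == b'\n') {
            let cand = String::from_utf8(tail[idx + 1..].to_vec()).ok()?;
            if is_valid(&cand) {
                return Some(cand);
            }
            tail.truncate(idx);
        }
        pos -= n;
    }
    // Zero or one line left in the whole source.
    let cand = String::from_utf8(tail).ok()?;
    is_valid(&cand).then_some(cand)
}

fn main() {
    // "c" plays the role of a half-flushed record; the scan falls back to "b".
    let mut data = Cursor::new(b"a\nb\nc".to_vec());
    let last = last_valid_line(&mut data, |line| line != "c");
    assert_eq!(last.as_deref(), Some("b"));
    println!("last valid line: {last:?}");
}
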
@@ -163,7 +151,6 @@ fn read_last_line(path: &Path) -> String {
         pos -= read_size;
     }
 
-    // There is 0~1 lines in the entire file
     String::from_utf8(last_line).unwrap()
 }
 
@@ -173,57 +160,52 @@ impl HlNodeBlockSource {
         u_cache.remove(&height)
     }
 
+    fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
+        let dt_part = path.parent()?.parent()?.file_name()?.to_str()?;
+        let hour_part = path.file_name()?.to_str()?;
+        let dt = OffsetDateTime::parse(
+            dt_part,
+            &format_description::parse("[year][month][day]").unwrap(),
+        )
+        .ok()?;
+        let hour: u8 = hour_part.parse().ok()?;
+        let dt = dt.replace_hour(hour).ok()?;
+        Some(dt)
+    }
+
+    fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
+        let dir = root.join(HOURLY_SUBDIR);
+        let mut files = Vec::new();
+        for entry in std::fs::read_dir(dir).ok()? {
+            let file = entry.ok()?.path();
+            let subfiles: Vec<_> =
+                std::fs::read_dir(&file).ok()?.filter_map(|f| f.ok().map(|f| f.path())).collect();
+            files.extend(subfiles);
+        }
+        files.sort();
+        Some(files)
+    }
+
+    fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
+        Self::all_hourly_files(root)?.last().cloned()
+    }
+
     async fn try_backfill_local_blocks(
         root: &Path,
         cache: &LocalBlocksCache,
         mut next_height: u64,
     ) -> eyre::Result<()> {
-        fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> {
-            // Validate and returns sort key for hourly/<isoformat>/<0-23>
-            let file_name = f.file_name()?.to_str()?;
-            let Ok(file_name_num) = file_name.parse::<u64>() else {
-                warn!("Failed to parse file name: {:?}", f);
-                return None;
-            };
-
-            // Check if filename is numeric and 0..24
-            if !(0..=24).contains(&file_name_num) {
-                return None;
-            }
-            Some((file_name_num, f))
-        }
-
         let mut u_cache = cache.lock().await;
 
-        // We assume that ISO format is sorted properly using naive string sort
-        let hourly_subdir = root.join(HOURLY_SUBDIR);
-        let mut files: Vec<_> = std::fs::read_dir(hourly_subdir)
-            .context("Failed to read hourly subdir")?
-            .filter_map(|f| f.ok().map(|f| f.path()))
-            .collect();
-        files.sort();
-
-        for file in files {
-            let mut subfiles: Vec<_> = file
-                .read_dir()
-                .context("Failed to read hourly subdir")?
-                .filter_map(|f| f.ok().map(|f| f.path()))
-                .filter_map(parse_file_name)
-                .collect();
-            subfiles.sort();
-
-            for (_, subfile) in subfiles {
-                // Fast path: check the last line of the file
-                let last_line = read_last_line(&subfile);
+        for subfile in Self::all_hourly_files(root).unwrap_or_default() {
+            let mut file = File::open(&subfile).expect("Failed to open hour file path");
+            let last_line = read_last_complete_line(&mut file);
             if let Ok((_, height)) = line_to_evm_block(&last_line) {
                 if height < next_height {
                     continue;
                 }
             } else {
-                warn!(
-                    "Failed to parse last line of file, fallback to slow path: {:?}",
-                    subfile
-                );
+                warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
             }
 
             let ScanResult { next_expected_height, new_blocks } =
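
Note: `datetime_from_path` recovers the hour timestamp from a file's position in the directory tree rather than from a fallback block's header, which is what lets `run()` drop its `collect_block` call. A minimal sketch of the same path-to-timestamp idea, assuming the `<root>/hourly/<YYYYMMDD>/<hour>` layout that `all_hourly_files` walks; it takes the day from the file's immediate parent folder and uses `Date::parse` plus `assume_utc()` rather than the commit's exact `OffsetDateTime::parse` call:

use std::path::Path;
use time::{format_description, Date, OffsetDateTime};

// Sketch: turn "<root>/hourly/<YYYYMMDD>/<hour>" into an hour-aligned UTC time.
fn datetime_from_hourly_path(path: &Path) -> Option<OffsetDateTime> {
    let day = path.parent()?.file_name()?.to_str()?; // e.g. "20250101"
    let hour: u8 = path.file_name()?.to_str()?.parse().ok()?; // "0".."23"
    let format = format_description::parse("[year][month][day]").ok()?;
    let date = Date::parse(day, &format).ok()?;
    Some(date.with_hms(hour, 0, 0).ok()?.assume_utc())
}

fn main() {
    let dt = datetime_from_hourly_path(Path::new("/data/hourly/20250101/5")).unwrap();
    assert_eq!((dt.day(), dt.hour()), (1, 5));
    println!("{dt}");
}
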
@@ -234,22 +216,25 @@ impl HlNodeBlockSource {
             }
             next_height = next_expected_height;
         }
-        }
 
         info!("Backfilled {} blocks", u_cache.len());
 
         Ok(())
     }
 
-    async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
+    async fn start_local_ingest_loop(&self, current_head: u64) {
         let root = self.local_ingest_dir.to_owned();
         let cache = self.local_blocks_cache.clone();
 
         tokio::spawn(async move {
-            // hl-node backfill is for fast path; do not exit when it fails
-            let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await;
-
             let mut next_height = current_head;
-            let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap();
+            // Wait for the first hourly file to be created
+            let mut dt = loop {
+                if let Some(latest_file) = Self::find_latest_hourly_file(&root) {
+                    break Self::datetime_from_path(&latest_file).unwrap();
+                }
+                tokio::time::sleep(TAIL_INTERVAL).await;
+            };
+
             let mut hour = dt.hour();
             let mut day_str = date_from_datetime(dt);
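
Note: since the ingest loop no longer seeds its clock from a fallback block timestamp, it has to wait for the first hourly file to appear before it can derive `dt`. The `loop`/`break`-with-value shape generalizes to any poll-until-ready situation; a minimal sketch, where `wait_for` and its arguments are illustrative (assuming tokio with the `rt` and `macros` features):

use std::time::Duration;

// Poll `find` until it yields Some, sleeping `interval` between attempts,
// like the commit's wait for Self::find_latest_hourly_file.
async fn wait_for<T>(mut find: impl FnMut() -> Option<T>, interval: Duration) -> T {
    loop {
        if let Some(value) = find() {
            break value;
        }
        tokio::time::sleep(interval).await;
    }
}

#[tokio::main]
async fn main() {
    let mut polls = 0;
    let found = wait_for(
        || {
            polls += 1;
            (polls >= 3).then_some(polls) // the "file" appears on the third poll
        },
        Duration::from_millis(25), // same cadence as TAIL_INTERVAL
    )
    .await;
    assert_eq!(found, 3);
    println!("ready after {found} polls");
}
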
@@ -273,14 +258,8 @@ impl HlNodeBlockSource {
                 }
             }
 
-            // Decide whether the *current* hour file is closed (past) or
-            // still live. If it’s in the past by > 1 h, move to next hour;
-            // otherwise, keep tailing the same file.
             let now = OffsetDateTime::now_utc();
 
-            // println!("Date Current {:?}", dt);
-            // println!("Now Current {:?}", now);
-
             if dt + Duration::HOUR < now {
                 dt += Duration::HOUR;
                 hour = dt.hour();
@@ -299,12 +278,14 @@ impl HlNodeBlockSource {
     }
 
     pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
-        let EvmBlock::Reth115(latest_block) =
-            self.fallback.collect_block(next_block_number).await?.block;
-
-        let latest_block_ts = latest_block.header.header.timestamp;
-        self.start_local_ingest_loop(next_block_number, latest_block_ts).await;
+        let _ = Self::try_backfill_local_blocks(
+            &self.local_ingest_dir,
+            &self.local_blocks_cache,
+            next_block_number,
+        )
+        .await;
+        self.start_local_ingest_loop(next_block_number).await;
 
         Ok(())
     }
 