mirror of https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: Add backfill support from local node
This currently does not support backfilling a large amount of logs from the local node.
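The backfill walks the node's local ingest directory, whose hour files live at `hourly/<isoformat>/<0-23>` (per the `parse_file_name` comment in the diff below). As a quick orientation, here is a minimal sketch of resolving the hour file for a given UTC date and hour; the helper name, root path, and sample values are hypothetical:

```rust
use std::path::{Path, PathBuf};

/// Hypothetical helper: locate the hour file expected to hold blocks
/// written during `hour` of `date_iso`, assuming the
/// `hourly/<isoformat>/<0-23>` layout the backfill scans.
fn hour_file_for(root: &Path, date_iso: &str, hour: u8) -> PathBuf {
    assert!(hour < 24, "hour files are named 0..=23");
    root.join("hourly").join(date_iso).join(hour.to_string())
}

fn main() {
    // Example only: the ingest root is whatever `local_ingest_dir` points at.
    let p = hour_file_for(Path::new("/data/hl/ingest"), "2025-01-15", 7);
    assert_eq!(p, Path::new("/data/hl/ingest/hourly/2025-01-15/7"));
    println!("{}", p.display());
}
```

Each hour file is a sequence of newline-delimited JSON `LocalBlockAndReceipts` entries, which is why the scan below can resume from a saved line index.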
@@ -1,16 +1,16 @@
 use std::{
-    io::{BufRead, BufReader},
+    io::{BufRead, BufReader, Read, Seek, SeekFrom},
     path::{Path, PathBuf},
     sync::Arc,
 };
 
-use eyre::ContextCompat;
+use eyre::{Context, ContextCompat};
 use futures::future::BoxFuture;
 use reth_network::cache::LruMap;
 use serde::Deserialize;
 use time::{format_description, Duration, OffsetDateTime};
 use tokio::sync::Mutex;
-use tracing::info;
+use tracing::{info, warn};
 
 use crate::node::types::{BlockAndReceipts, EvmBlock};
 
@@ -24,6 +24,8 @@ const HOURLY_SUBDIR: &str = "hourly";
 /// In normal situation, 0~1 blocks will be cached.
 const CACHE_SIZE: u32 = 1000;
 
+type LocalBlocksCache = Arc<Mutex<LruMap<u64, BlockAndReceipts>>>;
+
 /// Block source that monitors the local ingest directory for the HL node.
 ///
 /// In certain situations, the [hl-node][ref] may offer lower latency compared to S3.
@@ -37,7 +39,7 @@ const CACHE_SIZE: u32 = 1000;
 pub struct HlNodeBlockSource {
     pub fallback: BlockSourceBoxed,
     pub local_ingest_dir: PathBuf,
-    pub local_blocks_cache: Arc<Mutex<LruMap<u64, BlockAndReceipts>>>, // height → block
+    pub local_blocks_cache: LocalBlocksCache, // height → block
 }
 
 #[derive(Deserialize)]
@@ -48,6 +50,15 @@ struct ScanResult {
     new_blocks: Vec<BlockAndReceipts>,
 }
 
+fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
+    let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
+        serde_json::from_str(line)?;
+    let height = match &parsed_block.block {
+        EvmBlock::Reth115(b) => b.header.header.number,
+    };
+    Ok((parsed_block, height))
+}
+
 fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
     // info!(
     //     "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
@@ -70,25 +81,17 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
             continue;
         }
 
-        let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
-            serde_json::from_str(line).expect("Failed to parse local block and receipts");
-
-        let height = match &parsed_block.block {
-            EvmBlock::Reth115(b) => {
-                let block_number = b.header.header.number;
-                // Another check to ensure not returning an older block
-                if block_number < start_height {
-                    continue;
-                }
-                block_number
-            }
+        let Ok((parsed_block, height)) = line_to_evm_block(&line) else {
+            warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
+            continue;
         };
-        // println!("Iterating block height {:?} | Line {}", height, line_idx);
-        if height >= start_height {
-            last_height = last_height.max(height);
-            new_blocks.push(parsed_block);
-            *last_line = line_idx;
+        if height < start_height {
+            continue;
         }
+
+        last_height = last_height.max(height);
+        new_blocks.push(parsed_block);
+        *last_line = line_idx;
     }
 
     ScanResult { next_expected_height: last_height + 1, new_blocks }
@@ -129,17 +132,125 @@ fn to_hourly(dt: OffsetDateTime) -> Result<OffsetDateTime, time::error::Componen
     dt.replace_minute(0)?.replace_second(0)?.replace_nanosecond(0)
 }
 
+fn read_last_line(path: &Path) -> String {
+    const CHUNK_SIZE: u64 = 4096;
+    let mut file = std::fs::File::open(path).expect("Failed to open hour file path");
+    let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
+    let mut pos = file.seek(SeekFrom::End(0)).unwrap();
+    let mut last_line: Vec<u8> = Vec::new();
+
+    // Read backwards in chunks until we find a newline or reach the start
+    while pos > 0 {
+        let read_size = std::cmp::min(pos, CHUNK_SIZE);
+        buf.resize(read_size as usize, 0);
+
+        file.seek(SeekFrom::Start(pos - read_size)).unwrap();
+        file.read_exact(&mut buf).unwrap();
+
+        last_line = [buf.clone(), last_line].concat();
+
+        // Remove trailing newline
+        if last_line.ends_with(b"\n") {
+            last_line.pop();
+        }
+
+        if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
+            // Found a newline, so the last line starts after this
+            let start = idx + 1;
+            return String::from_utf8(last_line[start..].to_vec()).unwrap();
+        }
+
+        if pos < read_size {
+            break;
+        }
+        pos -= read_size;
+    }
+
+    // There is 0~1 lines in the entire file
+    String::from_utf8(last_line).unwrap()
+}
+
 impl HlNodeBlockSource {
     async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
         let mut u_cache = self.local_blocks_cache.lock().await;
         u_cache.remove(&height)
     }
 
+    async fn try_backfill_local_blocks(
+        root: &Path,
+        cache: &LocalBlocksCache,
+        mut next_height: u64,
+    ) -> eyre::Result<()> {
+        fn parse_file_name(f: PathBuf) -> Option<(u64, PathBuf)> {
+            // Validate and returns sort key for hourly/<isoformat>/<0-23>
+            let file_name = f.file_name()?.to_str()?;
+            let Ok(file_name_num) = file_name.parse::<u64>() else {
+                warn!("Failed to parse file name: {:?}", f);
+                return None;
+            };
+
+            // Check if filename is numeric and 0..24
+            if !(0..=24).contains(&file_name_num) {
+                return None;
+            }
+            Some((file_name_num, f))
+        }
+
+        let mut u_cache = cache.lock().await;
+
+        // We assume that ISO format is sorted properly using naive string sort
+        let hourly_subdir = root.join(HOURLY_SUBDIR);
+        let mut files: Vec<_> = std::fs::read_dir(hourly_subdir)
+            .context("Failed to read hourly subdir")?
+            .filter_map(|f| f.ok().map(|f| f.path()))
+            .collect();
+        files.sort();
+
+        for file in files {
+            let mut subfiles: Vec<_> = file
+                .read_dir()
+                .context("Failed to read hourly subdir")?
+                .filter_map(|f| f.ok().map(|f| f.path()))
+                .filter_map(parse_file_name)
+                .collect();
+            subfiles.sort();
+
+            for (file_name_num, subfile) in subfiles {
+                if file_name_num < next_height {
+                    continue;
+                }
+
+                // Fast path: check the last line of the file
+                let last_line = read_last_line(&subfile);
+                if let Ok((_, height)) = line_to_evm_block(&last_line) {
+                    if height < next_height {
+                        continue;
+                    }
+                } else {
+                    warn!("Failed to parse last line of file, fallback to slow path: {:?}", file);
+                }
+
+                let ScanResult { next_expected_height, new_blocks } =
+                    scan_hour_file(&file, &mut 0, next_height);
+                for blk in new_blocks {
+                    let EvmBlock::Reth115(b) = &blk.block;
+                    u_cache.insert(b.header.header.number, blk);
+                }
+                next_height = next_expected_height;
+            }
+        }
+
+        Ok(())
+    }
+
     async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
         let root = self.local_ingest_dir.to_owned();
         let cache = self.local_blocks_cache.clone();
+
         tokio::spawn(async move {
+            // hl-node backfill is for fast path; do not exit when it fails
+            let _ = Self::try_backfill_local_blocks(&root, &cache, current_head).await;
+
             let mut next_height = current_head;
             let mut dt = to_hourly(datetime_from_timestamp(current_ts)).unwrap();
 
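For readers who want to experiment with the backwards scan in isolation, below is a self-contained sketch of the same chunked reverse-read idea used by `read_last_line` above; the tiny chunk size, temp-file name, and sample lines are illustrative only:

```rust
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::Path;

// Sketch of reading a file's last line by scanning backwards in fixed-size
// chunks, as in `read_last_line` above (error handling simplified).
fn last_line(path: &Path) -> std::io::Result<String> {
    const CHUNK_SIZE: u64 = 16; // tiny on purpose, to exercise the loop
    let mut file = std::fs::File::open(path)?;
    let mut pos = file.seek(SeekFrom::End(0))?;
    let mut tail: Vec<u8> = Vec::new();

    while pos > 0 {
        let read_size = pos.min(CHUNK_SIZE);
        let mut buf = vec![0u8; read_size as usize];
        file.seek(SeekFrom::Start(pos - read_size))?;
        file.read_exact(&mut buf)?;
        tail = [buf, tail].concat();

        // Strip the file's trailing newline (only relevant on the first pass).
        if tail.ends_with(b"\n") {
            tail.pop();
        }
        if let Some(idx) = tail.iter().rposition(|&b| b == b'\n') {
            // Everything after the last newline is the final line.
            return Ok(String::from_utf8_lossy(&tail[idx + 1..]).into_owned());
        }
        pos -= read_size;
    }
    // Zero or one line in the whole file.
    Ok(String::from_utf8_lossy(&tail).into_owned())
}

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("nanoreth_last_line_demo");
    std::fs::File::create(&path)?.write_all(b"{\"n\":1}\n{\"n\":2}\n{\"n\":3}\n")?;
    assert_eq!(last_line(&path)?, "{\"n\":3}");
    Ok(())
}
```

This is the fast path's cheap probe: parsing only the final line of an hour file is enough to decide whether the whole file predates `next_height` and can be skipped.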