mirror of https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat: Support range-based backfill for hl-node ingestion
Cargo.lock (generated) | 7 +++++++
Cargo.lock
@@ -6504,6 +6504,12 @@ dependencies = [
  "rand_core 0.9.3",
 ]
 
+[[package]]
+name = "rangemap"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f93e7e49bb0bf967717f7bd674458b3d6b0c5f48ec7e3038166026a69fc22223"
+
 [[package]]
 name = "ratatui"
 version = "0.29.0"
@@ -9308,6 +9314,7 @@ dependencies = [
  "lz4_flex",
  "once_cell",
  "parking_lot",
+ "rangemap",
  "rayon",
  "reth",
  "reth-basic-payload-builder",
Cargo.toml
@@ -112,6 +112,7 @@ aws-sdk-s3 = "1.93.0"
 aws-config = "1.8.0"
 rayon = "1.7"
 time = "0.3.41"
+rangemap = "=1.6.0"
 
 
 [target.'cfg(unix)'.dependencies]
@@ -162,3 +163,6 @@ client = [
     "jsonrpsee/async-client",
     "reth-rpc-eth-api/client",
 ]
+
+[profile.test]
+inherits = "release"
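
rangemap is pinned to exactly 1.6.0. For orientation, a minimal, self-contained sketch of the two behaviors the Rust diff below leans on: point lookup of a height inside an inclusive range, and coalescing of contiguous ranges that map to the same value. The paths imitate the hourly/<day>/<hour> layout used later; everything else is made up for the example.

use std::path::PathBuf;

use rangemap::RangeInclusiveMap;

fn main() {
    let mut index: RangeInclusiveMap<u64, PathBuf> = RangeInclusiveMap::new();

    // Map contiguous block-height ranges to the hour file that holds them.
    index.insert(100..=199, PathBuf::from("hourly/20250729/21"));
    index.insert(200..=299, PathBuf::from("hourly/20250729/22"));

    // Point lookup: which hour file contains height 250?
    assert_eq!(index.get(&250), Some(&PathBuf::from("hourly/20250729/22")));
    assert_eq!(index.get(&300), None);

    // Contiguous ranges with the same value coalesce into one entry.
    index.insert(300..=399, PathBuf::from("hourly/20250729/22"));
    assert_eq!(index.iter().count(), 2);
}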

hl-node block source (Rust)
@@ -2,11 +2,13 @@ use std::{
     collections::HashMap,
     fs::File,
     io::{BufRead, BufReader, Read, Seek, SeekFrom},
+    ops::RangeInclusive,
     path::{Path, PathBuf},
     sync::Arc,
 };
 
 use futures::future::BoxFuture;
+use rangemap::RangeInclusiveMap;
 use serde::Deserialize;
 use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
 use tokio::sync::Mutex;

@@ -19,22 +21,45 @@ use super::{BlockSource, BlockSourceBoxed};
 const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
 const HOURLY_SUBDIR: &str = "hourly";
 
-type LocalBlocksCache = Arc<Mutex<HashMap<u64, BlockAndReceipts>>>;
+#[derive(Debug)]
+pub struct LocalBlocksCache {
+    cache: HashMap<u64, BlockAndReceipts>,
+    // Lightweight range map to track the ranges of blocks in the local ingest directory
+    ranges: RangeInclusiveMap<u64, PathBuf>,
+}
+
+impl LocalBlocksCache {
+    fn new() -> Self {
+        Self { cache: HashMap::new(), ranges: RangeInclusiveMap::new() }
+    }
+
+    fn load_scan_result(&mut self, scan_result: ScanResult) {
+        for blk in scan_result.new_blocks {
+            let EvmBlock::Reth115(b) = &blk.block;
+            self.cache.insert(b.header.header.number, blk);
+        }
+        for range in scan_result.new_block_ranges {
+            self.ranges.insert(range, scan_result.path.clone());
+        }
+    }
+}
 
 /// Block source that monitors the local ingest directory for the HL node.
 #[derive(Debug, Clone)]
 pub struct HlNodeBlockSource {
     pub fallback: BlockSourceBoxed,
     pub local_ingest_dir: PathBuf,
-    pub local_blocks_cache: LocalBlocksCache, // height → block
+    pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
 }
 
 #[derive(Deserialize)]
 struct LocalBlockAndReceipts(String, BlockAndReceipts);
 
 struct ScanResult {
+    path: PathBuf,
     next_expected_height: u64,
     new_blocks: Vec<BlockAndReceipts>,
+    new_block_ranges: Vec<RangeInclusive<u64>>,
 }
 
 fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
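
Each hour-file line is decoded through the tuple struct LocalBlockAndReceipts(String, BlockAndReceipts), i.e. one two-element JSON array per line. A hedged, self-contained sketch of that decode shape; MiniBlock is a made-up stand-in for the project's BlockAndReceipts type, and the timestamp string is only an assumed example:

use serde::Deserialize;

// Stand-in for the project's BlockAndReceipts payload (hypothetical).
#[derive(Debug, Deserialize)]
struct MiniBlock {
    number: u64,
}

// Tuple structs deserialize from JSON arrays, as LocalBlockAndReceipts does.
#[derive(Debug, Deserialize)]
struct LocalLine(String, MiniBlock);

fn main() -> serde_json::Result<()> {
    let line = r#"["2025-07-29T22:05:11Z", {"number": 9735058}]"#;
    let LocalLine(timestamp, block) = serde_json::from_str(line)?;
    println!("{timestamp} -> block {}", block.number);
    Ok(())
}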

@@ -55,18 +80,30 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
     let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
     let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
 
+    let mut block_ranges: Vec<RangeInclusive<u64>> = Vec::new();
+    let mut current_range: Option<(u64, u64)> = None;
+
     for (line_idx, line) in lines.iter().enumerate().skip(skip) {
         if line_idx < *last_line || line.trim().is_empty() {
             continue;
         }
 
         match line_to_evm_block(line) {
-            Ok((parsed_block, height)) if height >= start_height => {
-                last_height = last_height.max(height);
-                new_blocks.push(parsed_block);
-                *last_line = line_idx;
+            Ok((parsed_block, height)) => {
+                if height >= start_height {
+                    last_height = last_height.max(height);
+                    new_blocks.push(parsed_block);
+                    *last_line = line_idx;
+                }
+                if matches!(current_range, Some((_, end)) if end + 1 == height) {
+                    current_range = Some((current_range.unwrap().0, height));
+                } else {
+                    if let Some((start, end)) = current_range.take() {
+                        block_ranges.push(start..=end);
+                    }
+                    current_range = Some((height, height));
+                }
             }
-            Ok(_) => continue,
             Err(_) => {
                 warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
                 continue;

@@ -74,7 +111,16 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
         }
     }
 
-    ScanResult { next_expected_height: last_height + 1, new_blocks }
+    if let Some((start, end)) = current_range.take() {
+        block_ranges.push(start..=end);
+    }
+
+    ScanResult {
+        path: path.to_path_buf(),
+        next_expected_height: last_height + 1,
+        new_blocks,
+        new_block_ranges: block_ranges,
+    }
 }
 
 fn date_from_datetime(dt: OffsetDateTime) -> String {
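
The current_range bookkeeping above is a plain run-length fold over heights in file order: extend the open run while heights stay consecutive, otherwise flush it and start a new one. Isolated into a hedged helper (not project code):

use std::ops::RangeInclusive;

// Fold consecutive heights into inclusive ranges, mirroring the
// current_range logic in scan_hour_file.
fn coalesce_heights(heights: impl IntoIterator<Item = u64>) -> Vec<RangeInclusive<u64>> {
    let mut ranges = Vec::new();
    let mut current: Option<(u64, u64)> = None;
    for h in heights {
        current = match current {
            // The next height extends the open run by exactly one.
            Some((start, end)) if end + 1 == h => Some((start, h)),
            // A gap: flush the open run and start a new one at h.
            Some((start, end)) => {
                ranges.push(start..=end);
                Some((h, h))
            }
            None => Some((h, h)),
        };
    }
    // Flush the final open run, as the end of scan_hour_file does.
    if let Some((start, end)) = current {
        ranges.push(start..=end);
    }
    ranges
}

fn main() {
    assert_eq!(coalesce_heights([1, 2, 3, 7, 8, 10]), vec![1..=3, 7..=8, 10..=10]);
}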

@@ -161,7 +207,16 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> String {
 impl HlNodeBlockSource {
     async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
         let mut u_cache = self.local_blocks_cache.lock().await;
-        u_cache.remove(&height)
+        if let Some(block) = u_cache.cache.remove(&height) {
+            return Some(block);
+        }
+
+        let Some(path) = u_cache.ranges.get(&height).cloned() else {
+            return None;
+        };
+
+        u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height));
+        u_cache.cache.get(&height).cloned()
     }
 
     fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
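
try_collect_local_block is now a two-tier lookup: consume the block from the in-memory map if present, otherwise ask the range index which hour file covers the height and rescan just that file. A self-contained sketch of the pattern with made-up stand-ins (String for the block payload, a Vec of pairs for the RangeInclusiveMap, and rescan for scan_hour_file + load_scan_result):

use std::collections::HashMap;
use std::ops::RangeInclusive;
use std::path::PathBuf;

struct Cache {
    blocks: HashMap<u64, String>,                // height -> block payload
    ranges: Vec<(RangeInclusive<u64>, PathBuf)>, // height range -> hour file
}

impl Cache {
    fn collect(
        &mut self,
        height: u64,
        rescan: impl Fn(&PathBuf) -> Vec<(u64, String)>,
    ) -> Option<String> {
        // Fast path: the block was already materialized.
        if let Some(block) = self.blocks.remove(&height) {
            return Some(block);
        }
        // Slow path: find the hour file whose range covers this height...
        let path = self
            .ranges
            .iter()
            .find(|(range, _)| range.contains(&height))
            .map(|(_, path)| path.clone())?;
        // ...and lazily materialize its blocks into the map.
        self.blocks.extend(rescan(&path));
        self.blocks.get(&height).cloned()
    }
}

fn main() {
    let mut cache = Cache {
        blocks: HashMap::new(),
        ranges: vec![(100..=199, PathBuf::from("hourly/20250729/22"))],
    };
    let block = cache.collect(150, |_path| vec![(150, "block 150".to_string())]);
    assert_eq!(block.as_deref(), Some("block 150"));
}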

@@ -197,8 +252,8 @@ impl HlNodeBlockSource {
 
     async fn try_backfill_local_blocks(
         root: &Path,
-        cache: &LocalBlocksCache,
-        mut next_height: u64,
+        cache: &Arc<Mutex<LocalBlocksCache>>,
+        cutoff_height: u64,
     ) -> eyre::Result<()> {
         let mut u_cache = cache.lock().await;
 
@@ -206,22 +261,19 @@ impl HlNodeBlockSource {
             let mut file = File::open(&subfile).expect("Failed to open hour file path");
             let last_line = read_last_complete_line(&mut file);
             if let Ok((_, height)) = line_to_evm_block(&last_line) {
-                if height < next_height {
+                if height < cutoff_height {
                     continue;
                 }
             } else {
                 warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
             }
 
-            let ScanResult { next_expected_height, new_blocks } =
-                scan_hour_file(&subfile, &mut 0, next_height);
-            for blk in new_blocks {
-                let EvmBlock::Reth115(b) = &blk.block;
-                u_cache.insert(b.header.header.number, blk);
-            }
-            next_height = next_expected_height;
+            let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height);
+            // Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
+            scan_result.new_blocks.clear();
+            u_cache.load_scan_result(scan_result);
         }
-        info!("Backfilled {} blocks", u_cache.len());
+        info!("Backfilled {} blocks", u_cache.cache.len());
 
         Ok(())
     }
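
The backfill fast path peeks only at the last complete line of each hour file, so whole files below the cutoff are skipped without a full parse, and even scanned files keep only their ranges (new_blocks.clear()) until a height is actually requested. read_last_complete_line itself is outside this diff; a sketch of one plausible shape, assuming UTF-8 content and that a line fits in a 64 KiB tail:

use std::io::{Read, Seek, SeekFrom};

// Hypothetical helper: return the last newline-terminated line of a file,
// so a half-written trailing line from a concurrent writer is ignored.
fn last_complete_line<R: Read + Seek>(read: &mut R) -> String {
    let len = read.seek(SeekFrom::End(0)).unwrap_or(0);
    let tail = len.min(64 * 1024); // assumption: the last line fits here
    if read.seek(SeekFrom::Start(len - tail)).is_err() {
        return String::new();
    }
    let mut buf = String::new();
    if read.read_to_string(&mut buf).is_err() {
        return String::new();
    }
    // Drop everything after the final newline: it may be incomplete.
    let complete = match buf.rfind('\n') {
        Some(pos) => &buf[..pos],
        None => return String::new(),
    };
    complete.rsplit('\n').next().unwrap_or("").to_string()
}

fn main() {
    let mut data = std::io::Cursor::new("a\nb\nhalf-written");
    assert_eq!(last_complete_line(&mut data), "b");
}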

@@ -251,16 +303,11 @@ impl HlNodeBlockSource {
                 let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
 
                 if hour_file.exists() {
-                    let ScanResult { next_expected_height, new_blocks } =
-                        scan_hour_file(&hour_file, &mut last_line, next_height);
-                    if !new_blocks.is_empty() {
+                    let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
+                    next_height = scan_result.next_expected_height;
+
                     let mut u_cache = cache.lock().await;
-                    for blk in new_blocks {
-                        let EvmBlock::Reth115(b) = &blk.block;
-                        u_cache.insert(b.header.header.number, blk);
-                    }
-                    next_height = next_expected_height;
-                }
+                    u_cache.load_scan_result(scan_result);
                 }
 
                 let now = OffsetDateTime::now_utc();

@@ -302,7 +349,7 @@ impl HlNodeBlockSource {
         let block_source = HlNodeBlockSource {
             fallback,
             local_ingest_dir,
-            local_blocks_cache: Arc::new(Mutex::new(HashMap::new())),
+            local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
         };
         block_source.run(next_block_number).await.unwrap();
         block_source

@@ -318,4 +365,22 @@ mod tests {
         let dt = HlNodeBlockSource::datetime_from_path(path).unwrap();
         println!("{:?}", dt);
     }
+
+    #[tokio::test]
+    async fn test_backfill() {
+        let test_path = Path::new("/root/evm_block_and_receipts");
+        if !test_path.exists() {
+            return;
+        }
+
+        let cache = Arc::new(Mutex::new(LocalBlocksCache::new()));
+        HlNodeBlockSource::try_backfill_local_blocks(&test_path, &cache, 1000000).await.unwrap();
+
+        let u_cache = cache.lock().await;
+        println!("{:?}", u_cache.ranges);
+        assert_eq!(
+            u_cache.ranges.get(&9735058),
+            Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
+        );
+    }
 }
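
The new test self-skips when the fixture directory /root/evm_block_and_receipts is absent, so it only exercises real hl-node output on a machine that has it; the assertion pins height 9735058 to the hourly/20250729/22 file. On such a machine it runs under the release-speed test profile added in Cargo.toml above:

cargo test test_backfill -- --nocapture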