mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Compare commits
4 Commits
0180711ae4
...
c0b3acf181
| Author | SHA1 | Date | |
|---|---|---|---|
| c0b3acf181 | |||
| 77158aa164 | |||
| 2d6b5e5cd2 | |||
| ff2e55b5a2 |
@ -7,8 +7,8 @@ use std::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use reth_network::cache::LruMap;
|
|
||||||
use rangemap::RangeInclusiveMap;
|
use rangemap::RangeInclusiveMap;
|
||||||
|
use reth_network::cache::LruMap;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
|
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
@ -33,10 +33,7 @@ impl LocalBlocksCache {
|
|||||||
const CACHE_SIZE: u32 = 8000;
|
const CACHE_SIZE: u32 = 8000;
|
||||||
|
|
||||||
fn new() -> Self {
|
fn new() -> Self {
|
||||||
Self {
|
Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
|
||||||
cache: LruMap::new(Self::CACHE_SIZE),
|
|
||||||
ranges: RangeInclusiveMap::new(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_scan_result(&mut self, scan_result: ScanResult) {
|
fn load_scan_result(&mut self, scan_result: ScanResult) {
|
||||||
@ -77,10 +74,17 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
|
|||||||
Ok((parsed_block, height))
|
Ok((parsed_block, height))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
|
struct ScanOptions {
|
||||||
|
start_height: u64,
|
||||||
|
only_load_ranges: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
|
||||||
let file = File::open(path).expect("Failed to open hour file path");
|
let file = File::open(path).expect("Failed to open hour file path");
|
||||||
let reader = BufReader::new(file);
|
let reader = BufReader::new(file);
|
||||||
|
|
||||||
|
let ScanOptions { start_height, only_load_ranges } = options;
|
||||||
|
|
||||||
let mut new_blocks = Vec::new();
|
let mut new_blocks = Vec::new();
|
||||||
let mut last_height = start_height;
|
let mut last_height = start_height;
|
||||||
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
|
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
|
||||||
@ -98,7 +102,9 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Scan
|
|||||||
Ok((parsed_block, height)) => {
|
Ok((parsed_block, height)) => {
|
||||||
if height >= start_height {
|
if height >= start_height {
|
||||||
last_height = last_height.max(height);
|
last_height = last_height.max(height);
|
||||||
|
if !only_load_ranges {
|
||||||
new_blocks.push(parsed_block);
|
new_blocks.push(parsed_block);
|
||||||
|
}
|
||||||
*last_line = line_idx;
|
*last_line = line_idx;
|
||||||
}
|
}
|
||||||
if matches!(current_range, Some((_, end)) if end + 1 == height) {
|
if matches!(current_range, Some((_, end)) if end + 1 == height) {
|
||||||
@ -137,9 +143,9 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
if let Some(block) = self.try_collect_local_block(height).await {
|
if let Some(block) = self.try_collect_local_block(height).await {
|
||||||
info!("Returning locally synced block for @ Height [{height}]");
|
|
||||||
Ok(block)
|
Ok(block)
|
||||||
} else {
|
} else {
|
||||||
|
info!("Falling back to s3/ingest-dir for block @ Height [{height}]");
|
||||||
self.fallback.collect_block(height).await
|
self.fallback.collect_block(height).await
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -222,7 +228,11 @@ impl HlNodeBlockSource {
|
|||||||
};
|
};
|
||||||
|
|
||||||
info!("Loading block data from {:?}", path);
|
info!("Loading block data from {:?}", path);
|
||||||
u_cache.load_scan_result(scan_hour_file(&path, &mut 0, height));
|
u_cache.load_scan_result(scan_hour_file(
|
||||||
|
&path,
|
||||||
|
&mut 0,
|
||||||
|
ScanOptions { start_height: 0, only_load_ranges: false },
|
||||||
|
));
|
||||||
u_cache.cache.get(&height).cloned()
|
u_cache.cache.get(&height).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -275,12 +285,28 @@ impl HlNodeBlockSource {
|
|||||||
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
|
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut scan_result = scan_hour_file(&subfile, &mut 0, cutoff_height);
|
let mut scan_result = scan_hour_file(
|
||||||
|
&subfile,
|
||||||
|
&mut 0,
|
||||||
|
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
|
||||||
|
);
|
||||||
// Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
|
// Only store the block ranges for now; actual block data will be loaded lazily later to optimize memory usage
|
||||||
scan_result.new_blocks.clear();
|
scan_result.new_blocks.clear();
|
||||||
u_cache.load_scan_result(scan_result);
|
u_cache.load_scan_result(scan_result);
|
||||||
}
|
}
|
||||||
info!("Backfilled {} blocks", u_cache.cache.len());
|
|
||||||
|
if u_cache.ranges.is_empty() {
|
||||||
|
warn!("No ranges found in {:?}", root);
|
||||||
|
} else {
|
||||||
|
let (min, _) = u_cache.ranges.first_range_value().unwrap();
|
||||||
|
let (max, _) = u_cache.ranges.last_range_value().unwrap();
|
||||||
|
info!(
|
||||||
|
"Populated {} ranges (min: {}, max: {})",
|
||||||
|
u_cache.ranges.len(),
|
||||||
|
min.start(),
|
||||||
|
max.end()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@ -310,7 +336,11 @@ impl HlNodeBlockSource {
|
|||||||
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
|
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
|
||||||
|
|
||||||
if hour_file.exists() {
|
if hour_file.exists() {
|
||||||
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
|
let scan_result = scan_hour_file(
|
||||||
|
&hour_file,
|
||||||
|
&mut last_line,
|
||||||
|
ScanOptions { start_height: next_height, only_load_ranges: false },
|
||||||
|
);
|
||||||
next_height = scan_result.next_expected_height;
|
next_height = scan_result.next_expected_height;
|
||||||
|
|
||||||
let mut u_cache = cache.lock().await;
|
let mut u_cache = cache.lock().await;
|
||||||
|
|||||||
Reference in New Issue
Block a user