mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 02:49:55 +00:00
Merge pull request #82 from hl-archive-node/fix/local-reader
fix(local-ingest-dir): Use more robust resumption for hl-node line reader, fix block number increment for reading files
This commit is contained in:
@ -82,11 +82,12 @@ where
|
|||||||
let mut tx_env = self.create_txn_env(&evm_env, request, &mut db)?;
|
let mut tx_env = self.create_txn_env(&evm_env, request, &mut db)?;
|
||||||
|
|
||||||
let mut is_basic_transfer = false;
|
let mut is_basic_transfer = false;
|
||||||
if tx_env.input().is_empty()
|
if tx_env.input().is_empty() &&
|
||||||
&& let TxKind::Call(to) = tx_env.kind()
|
let TxKind::Call(to) = tx_env.kind() &&
|
||||||
&& let Ok(code) = db.db.account_code(&to) {
|
let Ok(code) = db.db.account_code(&to)
|
||||||
is_basic_transfer = code.map(|code| code.is_empty()).unwrap_or(true);
|
{
|
||||||
}
|
is_basic_transfer = code.map(|code| code.is_empty()).unwrap_or(true);
|
||||||
|
}
|
||||||
|
|
||||||
if tx_env.gas_price() > 0 {
|
if tx_env.gas_price() > 0 {
|
||||||
highest_gas_limit =
|
highest_gas_limit =
|
||||||
@ -105,10 +106,11 @@ where
|
|||||||
let mut min_tx_env = tx_env.clone();
|
let mut min_tx_env = tx_env.clone();
|
||||||
min_tx_env.set_gas_limit(MIN_TRANSACTION_GAS);
|
min_tx_env.set_gas_limit(MIN_TRANSACTION_GAS);
|
||||||
|
|
||||||
if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err)
|
if let Ok(res) = evm.transact(min_tx_env).map_err(Self::Error::from_evm_err) &&
|
||||||
&& res.result.is_success() {
|
res.result.is_success()
|
||||||
return Ok(U256::from(MIN_TRANSACTION_GAS));
|
{
|
||||||
}
|
return Ok(U256::from(MIN_TRANSACTION_GAS));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trace!(target: "rpc::eth::estimate", ?tx_env, gas_limit = tx_env.gas_limit(), is_basic_transfer, "Starting gas estimation");
|
trace!(target: "rpc::eth::estimate", ?tx_env, gas_limit = tx_env.gas_limit(), is_basic_transfer, "Starting gas estimation");
|
||||||
|
|||||||
@ -81,10 +81,11 @@ impl BlockPoller {
|
|||||||
.await
|
.await
|
||||||
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
|
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
|
||||||
|
|
||||||
if let Some(debug_cutoff_height) = debug_cutoff_height
|
if let Some(debug_cutoff_height) = debug_cutoff_height &&
|
||||||
&& next_block_number > debug_cutoff_height {
|
next_block_number > debug_cutoff_height
|
||||||
next_block_number = debug_cutoff_height;
|
{
|
||||||
}
|
next_block_number = debug_cutoff_height;
|
||||||
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
match block_source.collect_block(next_block_number).await {
|
match block_source.collect_block(next_block_number).await {
|
||||||
|
|||||||
@ -27,7 +27,7 @@ impl LocalBlocksCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_block(&mut self, height: u64) -> Option<BlockAndReceipts> {
|
pub fn get_block(&mut self, height: u64) -> Option<BlockAndReceipts> {
|
||||||
self.cache.remove(&height)
|
self.cache.get(&height).cloned()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_path_for_height(&self, height: u64) -> Option<PathBuf> {
|
pub fn get_path_for_height(&self, height: u64) -> Option<PathBuf> {
|
||||||
|
|||||||
@ -8,7 +8,7 @@ mod time_utils;
|
|||||||
use self::{
|
use self::{
|
||||||
cache::LocalBlocksCache,
|
cache::LocalBlocksCache,
|
||||||
file_ops::FileOperations,
|
file_ops::FileOperations,
|
||||||
scan::{ScanOptions, Scanner},
|
scan::{LineStream, ScanOptions, Scanner},
|
||||||
time_utils::TimeUtils,
|
time_utils::TimeUtils,
|
||||||
};
|
};
|
||||||
use super::{BlockSource, BlockSourceBoxed};
|
use super::{BlockSource, BlockSourceBoxed};
|
||||||
@ -52,6 +52,8 @@ pub struct HlNodeBlockSourceMetrics {
|
|||||||
pub fetched_from_hl_node: Counter,
|
pub fetched_from_hl_node: Counter,
|
||||||
/// How many times the HL node block source is fetched from the fallback
|
/// How many times the HL node block source is fetched from the fallback
|
||||||
pub fetched_from_fallback: Counter,
|
pub fetched_from_fallback: Counter,
|
||||||
|
/// How many times `try_collect_local_block` was faster than ingest loop
|
||||||
|
pub file_read_triggered: Counter,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for HlNodeBlockSource {
|
impl BlockSource for HlNodeBlockSource {
|
||||||
@ -64,7 +66,9 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
|
|
||||||
if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
|
if let Some(block) =
|
||||||
|
Self::try_collect_local_block(&metrics, local_blocks_cache, height).await
|
||||||
|
{
|
||||||
Self::update_last_fetch(last_local_fetch, height, now).await;
|
Self::update_last_fetch(last_local_fetch, height, now).await;
|
||||||
metrics.fetched_from_hl_node.increment(1);
|
metrics.fetched_from_hl_node.increment(1);
|
||||||
return Ok(block);
|
return Ok(block);
|
||||||
@ -120,6 +124,28 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct CurrentFile {
|
||||||
|
path: PathBuf,
|
||||||
|
line_stream: Option<LineStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CurrentFile {
|
||||||
|
pub fn from_datetime(dt: OffsetDateTime, root: &Path) -> Self {
|
||||||
|
let (hour, day_str) = (dt.hour(), TimeUtils::date_from_datetime(dt));
|
||||||
|
let path = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{}", hour));
|
||||||
|
Self { path, line_stream: None }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn open(&mut self) -> eyre::Result<()> {
|
||||||
|
if self.line_stream.is_some() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.line_stream = Some(LineStream::from_path(&self.path)?);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl HlNodeBlockSource {
|
impl HlNodeBlockSource {
|
||||||
async fn update_last_fetch(
|
async fn update_last_fetch(
|
||||||
last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
||||||
@ -133,6 +159,7 @@ impl HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn try_collect_local_block(
|
async fn try_collect_local_block(
|
||||||
|
metrics: &HlNodeBlockSourceMetrics,
|
||||||
local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
||||||
height: u64,
|
height: u64,
|
||||||
) -> Option<BlockAndReceipts> {
|
) -> Option<BlockAndReceipts> {
|
||||||
@ -142,9 +169,10 @@ impl HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
let path = u_cache.get_path_for_height(height)?;
|
let path = u_cache.get_path_for_height(height)?;
|
||||||
info!("Loading block data from {:?}", path);
|
info!("Loading block data from {:?}", path);
|
||||||
|
metrics.file_read_triggered.increment(1);
|
||||||
|
let mut line_stream = LineStream::from_path(&path).ok()?;
|
||||||
let scan_result = Scanner::scan_hour_file(
|
let scan_result = Scanner::scan_hour_file(
|
||||||
&path,
|
&mut line_stream,
|
||||||
&mut 0,
|
|
||||||
ScanOptions { start_height: 0, only_load_ranges: false },
|
ScanOptions { start_height: 0, only_load_ranges: false },
|
||||||
);
|
);
|
||||||
u_cache.load_scan_result(scan_result);
|
u_cache.load_scan_result(scan_result);
|
||||||
@ -165,9 +193,10 @@ impl HlNodeBlockSource {
|
|||||||
} else {
|
} else {
|
||||||
warn!("Failed to parse last line of file: {:?}", subfile);
|
warn!("Failed to parse last line of file: {:?}", subfile);
|
||||||
}
|
}
|
||||||
|
let mut line_stream =
|
||||||
|
LineStream::from_path(&subfile).expect("Failed to open line stream");
|
||||||
let mut scan_result = Scanner::scan_hour_file(
|
let mut scan_result = Scanner::scan_hour_file(
|
||||||
&subfile,
|
&mut line_stream,
|
||||||
&mut 0,
|
|
||||||
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
|
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
|
||||||
);
|
);
|
||||||
scan_result.new_blocks.clear(); // Only store ranges, load data lazily
|
scan_result.new_blocks.clear(); // Only store ranges, load data lazily
|
||||||
@ -188,15 +217,13 @@ impl HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
tokio::time::sleep(TAIL_INTERVAL).await;
|
tokio::time::sleep(TAIL_INTERVAL).await;
|
||||||
};
|
};
|
||||||
let (mut hour, mut day_str, mut last_line) =
|
let mut current_file = CurrentFile::from_datetime(dt, &root);
|
||||||
(dt.hour(), TimeUtils::date_from_datetime(dt), 0);
|
|
||||||
info!("Starting local ingest loop from height: {}", current_head);
|
info!("Starting local ingest loop from height: {}", current_head);
|
||||||
loop {
|
loop {
|
||||||
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
|
let _ = current_file.open();
|
||||||
if hour_file.exists() {
|
if let Some(line_stream) = &mut current_file.line_stream {
|
||||||
let scan_result = Scanner::scan_hour_file(
|
let scan_result = Scanner::scan_hour_file(
|
||||||
&hour_file,
|
line_stream,
|
||||||
&mut last_line,
|
|
||||||
ScanOptions { start_height: next_height, only_load_ranges: false },
|
ScanOptions { start_height: next_height, only_load_ranges: false },
|
||||||
);
|
);
|
||||||
next_height = scan_result.next_expected_height;
|
next_height = scan_result.next_expected_height;
|
||||||
@ -205,11 +232,8 @@ impl HlNodeBlockSource {
|
|||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
if dt + ONE_HOUR < now {
|
if dt + ONE_HOUR < now {
|
||||||
dt += ONE_HOUR;
|
dt += ONE_HOUR;
|
||||||
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
|
current_file = CurrentFile::from_datetime(dt, &root);
|
||||||
info!(
|
info!("Moving to new file: {:?}", current_file.path);
|
||||||
"Moving to new file: {:?}",
|
|
||||||
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
|
|
||||||
);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
tokio::time::sleep(TAIL_INTERVAL).await;
|
tokio::time::sleep(TAIL_INTERVAL).await;
|
||||||
|
|||||||
@ -2,7 +2,7 @@ use crate::node::types::{BlockAndReceipts, EvmBlock};
|
|||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::{
|
use std::{
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{BufRead, BufReader},
|
io::{BufRead, BufReader, Seek, SeekFrom},
|
||||||
ops::RangeInclusive,
|
ops::RangeInclusive,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
};
|
};
|
||||||
@ -25,6 +25,57 @@ pub struct ScanOptions {
|
|||||||
|
|
||||||
pub struct Scanner;
|
pub struct Scanner;
|
||||||
|
|
||||||
|
/// Stream for sequentially reading lines from a file.
|
||||||
|
///
|
||||||
|
/// This struct allows sequential iteration over lines over [Self::next] method.
|
||||||
|
/// It is resilient to cases where the line producer process is interrupted while writing:
|
||||||
|
/// - If a line is incomplete but still ends with a line ending, it is skipped: later, the fallback
|
||||||
|
/// block source will be used to retrieve the missing block.
|
||||||
|
/// - If a line does not end with a newline (i.e., the write was incomplete), the method returns
|
||||||
|
/// `None` to break out of the loop and avoid reading partial data.
|
||||||
|
/// - If a temporary I/O error occurs, the stream exits the loop without rewinding the cursor, which
|
||||||
|
/// will result in skipping ahead to the next unread bytes.
|
||||||
|
pub struct LineStream {
|
||||||
|
path: PathBuf,
|
||||||
|
reader: BufReader<File>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl LineStream {
|
||||||
|
pub fn from_path(path: &Path) -> std::io::Result<Self> {
|
||||||
|
let reader = BufReader::with_capacity(1024 * 1024, File::open(path)?);
|
||||||
|
Ok(Self { path: path.to_path_buf(), reader })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn next(&mut self) -> Option<String> {
|
||||||
|
let mut line_buffer = vec![];
|
||||||
|
let Ok(size) = self.reader.read_until(b'\n', &mut line_buffer) else {
|
||||||
|
// Temporary I/O error; restart the loop
|
||||||
|
return None;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Now cursor is right after the end of the line
|
||||||
|
// On UTF-8 error, skip the line
|
||||||
|
let Ok(mut line) = String::from_utf8(line_buffer) else {
|
||||||
|
return Some(String::new());
|
||||||
|
};
|
||||||
|
|
||||||
|
// If line is not completed yet, return None so that we can break the loop
|
||||||
|
if line.ends_with('\n') {
|
||||||
|
if line.ends_with('\r') {
|
||||||
|
line.pop();
|
||||||
|
}
|
||||||
|
line.pop();
|
||||||
|
return Some(line);
|
||||||
|
}
|
||||||
|
|
||||||
|
// info!("Line is not completed yet: {}", line);
|
||||||
|
if size != 0 {
|
||||||
|
self.reader.seek(SeekFrom::Current(-(size as i64))).unwrap();
|
||||||
|
}
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Scanner {
|
impl Scanner {
|
||||||
pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
|
pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
|
||||||
let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts =
|
let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts =
|
||||||
@ -35,31 +86,20 @@ impl Scanner {
|
|||||||
Ok((parsed_block, height))
|
Ok((parsed_block, height))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
|
pub fn scan_hour_file(line_stream: &mut LineStream, options: ScanOptions) -> ScanResult {
|
||||||
let lines: Vec<String> =
|
|
||||||
BufReader::new(File::open(path).expect("Failed to open hour file"))
|
|
||||||
.lines()
|
|
||||||
.collect::<Result<_, _>>()
|
|
||||||
.unwrap();
|
|
||||||
let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
|
|
||||||
let mut new_blocks = Vec::new();
|
let mut new_blocks = Vec::new();
|
||||||
let mut last_height = options.start_height;
|
let mut last_height = options.start_height;
|
||||||
let mut block_ranges = Vec::new();
|
let mut block_ranges = Vec::new();
|
||||||
let mut current_range: Option<(u64, u64)> = None;
|
let mut current_range: Option<(u64, u64)> = None;
|
||||||
|
|
||||||
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
|
while let Some(line) = line_stream.next() {
|
||||||
if line_idx < *last_line || line.trim().is_empty() {
|
match Self::line_to_evm_block(&line) {
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
match Self::line_to_evm_block(line) {
|
|
||||||
Ok((parsed_block, height)) => {
|
Ok((parsed_block, height)) => {
|
||||||
if height >= options.start_height {
|
if height >= options.start_height {
|
||||||
last_height = last_height.max(height);
|
last_height = last_height.max(height);
|
||||||
if !options.only_load_ranges {
|
if !options.only_load_ranges {
|
||||||
new_blocks.push(parsed_block);
|
new_blocks.push(parsed_block);
|
||||||
}
|
}
|
||||||
*last_line = line_idx;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
match current_range {
|
match current_range {
|
||||||
@ -74,16 +114,17 @@ impl Scanner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
|
Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(&line)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some((start, end)) = current_range {
|
if let Some((start, end)) = current_range {
|
||||||
block_ranges.push(start..=end);
|
block_ranges.push(start..=end);
|
||||||
}
|
}
|
||||||
|
|
||||||
ScanResult {
|
ScanResult {
|
||||||
path: path.to_path_buf(),
|
path: line_stream.path.clone(),
|
||||||
next_expected_height: last_height + 1,
|
next_expected_height: last_height + current_range.is_some() as u64,
|
||||||
new_blocks,
|
new_blocks,
|
||||||
new_block_ranges: block_ranges,
|
new_block_ranges: block_ranges,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user