Mirror of https://github.com/hl-archive-node/nanoreth.git (synced 2025-12-06 10:59:55 +00:00)
refactor: Split files for block sources
By Claude Code
src/pseudo_peer/sources/cached.rs (new file, 44 lines)
@@ -0,0 +1,44 @@
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts;
use futures::{future::BoxFuture, FutureExt};
use reth_network::cache::LruMap;
use std::sync::{Arc, RwLock};

/// Block source wrapper that caches blocks in memory
#[derive(Debug, Clone)]
pub struct CachedBlockSource {
    block_source: BlockSourceBoxed,
    cache: Arc<RwLock<LruMap<u64, BlockAndReceipts>>>,
}

impl CachedBlockSource {
    const CACHE_LIMIT: u32 = 100000;

    pub fn new(block_source: BlockSourceBoxed) -> Self {
        Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) }
    }
}

impl BlockSource for CachedBlockSource {
    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
        let block_source = self.block_source.clone();
        let cache = self.cache.clone();
        async move {
            if let Some(block) = cache.write().unwrap().get(&height) {
                return Ok(block.clone());
            }
            let block = block_source.collect_block(height).await?;
            cache.write().unwrap().insert(height, block.clone());
            Ok(block)
        }
        .boxed()
    }

    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
        self.block_source.find_latest_block_number()
    }

    fn recommended_chunk_size(&self) -> u64 {
        self.block_source.recommended_chunk_size()
    }
}
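
A minimal usage sketch of the new wrapper (the inner LocalBlockSource and its path are illustrative, not part of this commit):

    // Wrap any block source; repeated collect_block calls for the same
    // height are then served from the in-memory LRU instead of the inner source.
    let inner = BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/data/ingest")));
    let cached = CachedBlockSource::new(inner);
    let first = cached.collect_block(42).await?; // fetched from the inner source, then cached
    let again = cached.collect_block(42).await?; // served from the cache
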
src/pseudo_peer/sources/hl_node.rs (deleted, 601 lines)
@@ -1,601 +0,0 @@
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::{BlockAndReceipts, EvmBlock};
use futures::future::BoxFuture;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use serde::{Deserialize, Serialize};
use std::{
    fs::File,
    io::{BufRead, BufReader, Read, Seek, SeekFrom},
    ops::RangeInclusive,
    path::{Path, PathBuf},
    sync::Arc,
};
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
use tokio::sync::Mutex;
use tracing::{info, warn};

const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly";

#[derive(Debug)]
pub struct LocalBlocksCache {
    cache: LruMap<u64, BlockAndReceipts>,
    // Lightweight range map to track the ranges of blocks in the local ingest directory
    ranges: RangeInclusiveMap<u64, PathBuf>,
}

impl LocalBlocksCache {
    const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour

    fn new() -> Self {
        Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
    }

    fn load_scan_result(&mut self, scan_result: ScanResult) {
        for blk in scan_result.new_blocks {
            let EvmBlock::Reth115(b) = &blk.block;
            self.cache.insert(b.header.header.number, blk);
        }
        for range in scan_result.new_block_ranges {
            self.ranges.insert(range, scan_result.path.clone());
        }
    }
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct LocalBlockAndReceipts(String, BlockAndReceipts);

struct ScanResult {
    path: PathBuf,
    next_expected_height: u64,
    new_blocks: Vec<BlockAndReceipts>,
    new_block_ranges: Vec<RangeInclusive<u64>>,
}

struct ScanOptions {
    start_height: u64,
    only_load_ranges: bool,
}

fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
    let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = serde_json::from_str(line)?;
    let height = match &parsed_block.block {
        EvmBlock::Reth115(b) => b.header.header.number,
    };
    Ok((parsed_block, height))
}

fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
    let lines: Vec<String> = BufReader::new(File::open(path).expect("Failed to open hour file"))
        .lines()
        .collect::<Result<_, _>>()
        .unwrap();
    let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
    let mut new_blocks = Vec::new();
    let mut last_height = options.start_height;
    let mut block_ranges = Vec::new();
    let mut current_range: Option<(u64, u64)> = None;

    for (line_idx, line) in lines.iter().enumerate().skip(skip) {
        if line_idx < *last_line || line.trim().is_empty() {
            continue;
        }

        match line_to_evm_block(line) {
            Ok((parsed_block, height)) if height >= options.start_height => {
                last_height = last_height.max(height);
                if !options.only_load_ranges {
                    new_blocks.push(parsed_block);
                }
                *last_line = line_idx;

                match current_range {
                    Some((start, end)) if end + 1 == height => {
                        current_range = Some((start, height))
                    }
                    _ => {
                        if let Some((start, end)) = current_range.take() {
                            block_ranges.push(start..=end);
                        }
                        current_range = Some((height, height));
                    }
                }
            }
            Ok(_) => {}
            Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
        }
    }

    if let Some((start, end)) = current_range {
        block_ranges.push(start..=end);
    }
    ScanResult {
        path: path.to_path_buf(),
        next_expected_height: last_height + 1,
        new_blocks,
        new_block_ranges: block_ranges,
    }
}

fn date_from_datetime(dt: OffsetDateTime) -> String {
    dt.format(&format_description!("[year][month][day]")).unwrap()
}

/// Block source that monitors the local ingest directory for the HL node.
#[derive(Debug, Clone)]
pub struct HlNodeBlockSource {
    pub fallback: BlockSourceBoxed,
    pub local_ingest_dir: PathBuf,
    pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
    // for rate limiting requests to fallback
    pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
}

impl BlockSource for HlNodeBlockSource {
    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
        let fallback = self.fallback.clone();
        let local_blocks_cache = self.local_blocks_cache.clone();
        let last_local_fetch = self.last_local_fetch.clone();
        Box::pin(async move {
            let now = OffsetDateTime::now_utc();

            if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
                Self::update_last_fetch(last_local_fetch, height, now).await;
                return Ok(block);
            }

            if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
                let more_recent = last_height < height;
                let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
                if more_recent && too_soon {
                    return Err(eyre::eyre!(
                        "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
                    ));
                }
            }

            let block = fallback.collect_block(height).await?;
            Self::update_last_fetch(last_local_fetch, height, now).await;
            Ok(block)
        })
    }

    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
        let fallback = self.fallback.clone();
        let local_ingest_dir = self.local_ingest_dir.clone();
        Box::pin(async move {
            let Some(dir) = Self::find_latest_hourly_file(&local_ingest_dir) else {
                warn!(
                    "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
                    local_ingest_dir
                );
                return fallback.find_latest_block_number().await;
            };

            let mut file = File::open(&dir).expect("Failed to open hour file path");
            if let Some((_, height)) = read_last_complete_line(&mut file) {
                info!("Latest block number: {} with path {}", height, dir.display());
                Some(height)
            } else {
                warn!(
                    "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
                    file
                );
                fallback.find_latest_block_number().await
            }
        })
    }

    fn recommended_chunk_size(&self) -> u64 {
        self.fallback.recommended_chunk_size()
    }
}

fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
    const CHUNK_SIZE: u64 = 50000;
    let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
    let mut pos = read.seek(SeekFrom::End(0)).unwrap();
    let mut last_line = Vec::new();

    while pos > 0 {
        let read_size = pos.min(CHUNK_SIZE);
        buf.resize(read_size as usize, 0);
        read.seek(SeekFrom::Start(pos - read_size)).unwrap();
        read.read_exact(&mut buf).unwrap();
        last_line = [buf.clone(), last_line].concat();
        if last_line.ends_with(b"\n") {
            last_line.pop();
        }

        if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
            let candidate = &last_line[idx + 1..];
            if let Ok(result) = line_to_evm_block(str::from_utf8(candidate).unwrap()) {
                return Some(result);
            }
            last_line.truncate(idx);
        }
        if pos < read_size {
            break;
        }
        pos -= read_size;
    }
    line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
}

impl HlNodeBlockSource {
    /// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir.
    /// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary
    /// requests to S3 while it'll return 404.
    ///
    /// To avoid unnecessary fallback, we set a short threshold period.
    /// This threshold is several times longer than the expected block time, reducing redundant
    /// fallback attempts.
    pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);

    async fn update_last_fetch(
        last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
        height: u64,
        now: OffsetDateTime,
    ) {
        let mut last_fetch = last_local_fetch.lock().await;
        if last_fetch.is_none_or(|(h, _)| h < height) {
            *last_fetch = Some((height, now));
        }
    }

    async fn try_collect_local_block(
        local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
        height: u64,
    ) -> Option<BlockAndReceipts> {
        let mut u_cache = local_blocks_cache.lock().await;
        if let Some(block) = u_cache.cache.remove(&height) {
            return Some(block);
        }
        let path = u_cache.ranges.get(&height).cloned()?;
        info!("Loading block data from {:?}", path);
        u_cache.load_scan_result(scan_hour_file(
            &path,
            &mut 0,
            ScanOptions { start_height: 0, only_load_ranges: false },
        ));
        u_cache.cache.get(&height).cloned()
    }

    fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
        let (dt_part, hour_part) =
            (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?);
        Some(OffsetDateTime::new_utc(
            Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
            Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?,
        ))
    }

    fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
        let mut files = Vec::new();
        for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? {
            let dir = entry.ok()?.path();
            if let Ok(subentries) = std::fs::read_dir(&dir) {
                files.extend(
                    subentries
                        .filter_map(|f| f.ok().map(|f| f.path()))
                        .filter(|p| Self::datetime_from_path(p).is_some()),
                );
            }
        }
        files.sort();
        Some(files)
    }

    fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
        Self::all_hourly_files(root)?.into_iter().last()
    }

    async fn try_backfill_local_blocks(
        root: &Path,
        cache: &Arc<Mutex<LocalBlocksCache>>,
        cutoff_height: u64,
    ) -> eyre::Result<()> {
        let mut u_cache = cache.lock().await;
        for subfile in Self::all_hourly_files(root).unwrap_or_default() {
            let mut file = File::open(&subfile).expect("Failed to open hour file");
            if let Some((_, height)) = read_last_complete_line(&mut file) {
                if height < cutoff_height {
                    continue;
                }
            } else {
                warn!("Failed to parse last line of file: {:?}", subfile);
            }
            let mut scan_result = scan_hour_file(
                &subfile,
                &mut 0,
                ScanOptions { start_height: cutoff_height, only_load_ranges: true },
            );
            scan_result.new_blocks.clear(); // Only store ranges, load data lazily
            u_cache.load_scan_result(scan_result);
        }
        if u_cache.ranges.is_empty() {
            warn!("No ranges found in {:?}", root);
        } else {
            let (min, max) = (
                u_cache.ranges.first_range_value().unwrap(),
                u_cache.ranges.last_range_value().unwrap(),
            );
            info!(
                "Populated {} ranges (min: {}, max: {})",
                u_cache.ranges.len(),
                min.0.start(),
                max.0.end()
            );
        }
        Ok(())
    }

    async fn start_local_ingest_loop(&self, current_head: u64) {
        let root = self.local_ingest_dir.to_owned();
        let cache = self.local_blocks_cache.clone();
        tokio::spawn(async move {
            let mut next_height = current_head;
            let mut dt = loop {
                if let Some(f) = Self::find_latest_hourly_file(&root) {
                    break Self::datetime_from_path(&f).unwrap();
                }
                tokio::time::sleep(TAIL_INTERVAL).await;
            };
            let (mut hour, mut day_str, mut last_line) = (dt.hour(), date_from_datetime(dt), 0);
            info!("Starting local ingest loop from height: {}", current_head);
            loop {
                let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
                if hour_file.exists() {
                    let scan_result = scan_hour_file(
                        &hour_file,
                        &mut last_line,
                        ScanOptions { start_height: next_height, only_load_ranges: false },
                    );
                    next_height = scan_result.next_expected_height;
                    cache.lock().await.load_scan_result(scan_result);
                }
                let now = OffsetDateTime::now_utc();
                if dt + Duration::HOUR < now {
                    dt += Duration::HOUR;
                    (hour, day_str, last_line) = (dt.hour(), date_from_datetime(dt), 0);
                    info!(
                        "Moving to new file: {:?}",
                        root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
                    );
                    continue;
                }
                tokio::time::sleep(TAIL_INTERVAL).await;
            }
        });
    }

    pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
        let _ = Self::try_backfill_local_blocks(
            &self.local_ingest_dir,
            &self.local_blocks_cache,
            next_block_number,
        )
        .await;
        self.start_local_ingest_loop(next_block_number).await;
        Ok(())
    }

    pub async fn new(
        fallback: BlockSourceBoxed,
        local_ingest_dir: PathBuf,
        next_block_number: u64,
    ) -> Self {
        let block_source = Self {
            fallback,
            local_ingest_dir,
            local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
            last_local_fetch: Arc::new(Mutex::new(None)),
        };
        block_source.run(next_block_number).await.unwrap();
        block_source
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        node::types::{reth_compat, ReadPrecompileCalls},
        pseudo_peer::sources::LocalBlockSource,
    };
    use alloy_consensus::{BlockBody, Header};
    use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
    use std::{io::Write, time::Duration};

    #[test]
    fn test_datetime_from_path() {
        let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
        let dt = HlNodeBlockSource::datetime_from_path(path).unwrap();
        println!("{dt:?}");
    }

    #[tokio::test]
    async fn test_backfill() {
        let test_path = Path::new("/root/evm_block_and_receipts");
        if !test_path.exists() {
            return;
        }

        let cache = Arc::new(Mutex::new(LocalBlocksCache::new()));
        HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap();

        let u_cache = cache.lock().await;
        println!("{:?}", u_cache.ranges);
        assert_eq!(
            u_cache.ranges.get(&9735058),
            Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
        );
    }

    fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult {
        let height = match &block.block {
            EvmBlock::Reth115(b) => b.header.header.number,
        };
        ScanResult {
            path: PathBuf::from("/nonexistent-block"),
            next_expected_height: height + 1,
            new_blocks: vec![block],
            new_block_ranges: vec![height..=height],
        }
    }

    fn empty_block(
        number: u64,
        timestamp: u64,
        extra_data: &'static [u8],
    ) -> LocalBlockAndReceipts {
        LocalBlockAndReceipts(
            timestamp.to_string(),
            BlockAndReceipts {
                block: EvmBlock::Reth115(reth_compat::SealedBlock {
                    header: reth_compat::SealedHeader {
                        header: Header {
                            parent_hash: B256::ZERO,
                            ommers_hash: B256::ZERO,
                            beneficiary: Address::ZERO,
                            state_root: B256::ZERO,
                            transactions_root: B256::ZERO,
                            receipts_root: B256::ZERO,
                            logs_bloom: Bloom::ZERO,
                            difficulty: U256::ZERO,
                            number,
                            gas_limit: 0,
                            gas_used: 0,
                            timestamp,
                            extra_data: Bytes::from_static(extra_data),
                            mix_hash: B256::ZERO,
                            nonce: B64::ZERO,
                            base_fee_per_gas: None,
                            withdrawals_root: None,
                            blob_gas_used: None,
                            excess_blob_gas: None,
                            parent_beacon_block_root: None,
                            requests_hash: None,
                        },
                        hash: B256::ZERO,
                    },
                    body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
                }),
                receipts: vec![],
                system_txs: vec![],
                read_precompile_calls: ReadPrecompileCalls(vec![]),
                highest_precompile_address: None,
            },
        )
    }

    fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
        let now = OffsetDateTime::now_utc();
        let temp_dir = tempfile::tempdir()?;
        let path = temp_dir
            .path()
            .join(HOURLY_SUBDIR)
            .join(date_from_datetime(now))
            .join(format!("{}", now.hour()));
        std::fs::create_dir_all(path.parent().unwrap())?;
        Ok((temp_dir, File::create(path)?))
    }

    struct BlockSourceHierarchy {
        block_source: HlNodeBlockSource,
        _temp_dir: tempfile::TempDir,
        file1: File,
        current_block: LocalBlockAndReceipts,
        future_block_hl_node: LocalBlockAndReceipts,
        future_block_fallback: LocalBlockAndReceipts,
    }

    async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
        // Setup fallback block source
        let block_source_fallback = HlNodeBlockSource::new(
            BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
            PathBuf::from("/nonexistent"),
            1000000,
        )
        .await;
        let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
        let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
        let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");

        let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
        writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;

        let block_source = HlNodeBlockSource::new(
            BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
            temp_dir1.path().to_path_buf(),
            1000000,
        )
        .await;

        block_source_fallback
            .local_blocks_cache
            .lock()
            .await
            .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));

        Ok(BlockSourceHierarchy {
            block_source,
            _temp_dir: temp_dir1,
            file1,
            current_block: block_hl_node_0,
            future_block_hl_node: block_hl_node_1,
            future_block_fallback: block_fallback_1,
        })
    }

    #[tokio::test]
    async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
        let hierarchy = setup_block_source_hierarchy().await?;
        let BlockSourceHierarchy {
            block_source,
            current_block,
            future_block_hl_node,
            mut file1,
            ..
        } = hierarchy;

        let block = block_source.collect_block(1000000).await.unwrap();
        assert_eq!(block, current_block.1);

        let block = block_source.collect_block(1000001).await;
        assert!(block.is_err());

        writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
        tokio::time::sleep(Duration::from_millis(100)).await;

        let block = block_source.collect_block(1000001).await.unwrap();
        assert_eq!(block, future_block_hl_node.1);

        Ok(())
    }

    #[tokio::test]
    async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
        let hierarchy = setup_block_source_hierarchy().await?;
        let BlockSourceHierarchy {
            block_source,
            current_block,
            future_block_fallback,
            mut file1,
            ..
        } = hierarchy;

        let block = block_source.collect_block(1000000).await.unwrap();
        assert_eq!(block, current_block.1);

        tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs())
            .await;

        writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
        let block = block_source.collect_block(1000001).await.unwrap();
        assert_eq!(block, future_block_fallback.1);

        Ok(())
    }
}
src/pseudo_peer/sources/hl_node/cache.rs (new file, 51 lines)
@@ -0,0 +1,51 @@
use super::scan::ScanResult;
use crate::node::types::{BlockAndReceipts, EvmBlock};
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use std::path::{Path, PathBuf};
use tracing::{info, warn};

#[derive(Debug)]
pub struct LocalBlocksCache {
    cache: LruMap<u64, BlockAndReceipts>,
    ranges: RangeInclusiveMap<u64, PathBuf>,
}

impl LocalBlocksCache {
    pub fn new(cache_size: u32) -> Self {
        Self { cache: LruMap::new(cache_size), ranges: RangeInclusiveMap::new() }
    }

    pub fn load_scan_result(&mut self, scan_result: ScanResult) {
        for blk in scan_result.new_blocks {
            let EvmBlock::Reth115(b) = &blk.block;
            self.cache.insert(b.header.header.number, blk);
        }
        for range in scan_result.new_block_ranges {
            self.ranges.insert(range, scan_result.path.clone());
        }
    }

    pub fn get_block(&mut self, height: u64) -> Option<BlockAndReceipts> {
        self.cache.remove(&height)
    }

    pub fn get_path_for_height(&self, height: u64) -> Option<PathBuf> {
        self.ranges.get(&height).cloned()
    }

    pub fn log_range_summary(&self, root: &Path) {
        if self.ranges.is_empty() {
            warn!("No ranges found in {:?}", root);
        } else {
            let (min, max) =
                (self.ranges.first_range_value().unwrap(), self.ranges.last_range_value().unwrap());
            info!(
                "Populated {} ranges (min: {}, max: {})",
                self.ranges.len(),
                min.0.start(),
                max.0.end()
            );
        }
    }
}
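
Note that get_block hands blocks out destructively (the LRU entry is removed), so on a miss the caller re-scans the hour file that the range map remembers for that height. A sketch of that round trip, mirroring try_collect_local_block in hl_node/mod.rs below (Scanner and ScanOptions are the ones defined in scan.rs):

    // On a cache miss, the range map tells us which hour file held the height;
    // re-scanning that file repopulates the LRU before the second lookup.
    fn lookup(cache: &mut LocalBlocksCache, height: u64) -> Option<BlockAndReceipts> {
        if let Some(block) = cache.get_block(height) {
            return Some(block);
        }
        let path = cache.get_path_for_height(height)?;
        let scan = Scanner::scan_hour_file(
            &path,
            &mut 0,
            ScanOptions { start_height: 0, only_load_ranges: false },
        );
        cache.load_scan_result(scan);
        cache.get_block(height)
    }
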
src/pseudo_peer/sources/hl_node/file_ops.rs (new file, 67 lines)
@@ -0,0 +1,67 @@
use super::{scan::Scanner, time_utils::TimeUtils, HOURLY_SUBDIR};
use crate::node::types::BlockAndReceipts;
use std::{
    fs::File,
    io::{Read, Seek, SeekFrom},
    path::{Path, PathBuf},
};

pub struct FileOperations;

impl FileOperations {
    pub fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
        let mut files = Vec::new();
        for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? {
            let dir = entry.ok()?.path();
            if let Ok(subentries) = std::fs::read_dir(&dir) {
                files.extend(
                    subentries
                        .filter_map(|f| f.ok().map(|f| f.path()))
                        .filter(|p| TimeUtils::datetime_from_path(p).is_some()),
                );
            }
        }
        files.sort();
        Some(files)
    }

    pub fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
        Self::all_hourly_files(root)?.into_iter().last()
    }

    pub fn read_last_block_from_file(path: &Path) -> Option<(BlockAndReceipts, u64)> {
        let mut file = File::open(path).ok()?;
        Self::read_last_complete_line(&mut file)
    }

    fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
        const CHUNK_SIZE: u64 = 50000;
        let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
        let mut pos = read.seek(SeekFrom::End(0)).unwrap();
        let mut last_line = Vec::new();

        while pos > 0 {
            let read_size = pos.min(CHUNK_SIZE);
            buf.resize(read_size as usize, 0);
            read.seek(SeekFrom::Start(pos - read_size)).unwrap();
            read.read_exact(&mut buf).unwrap();
            last_line = [buf.clone(), last_line].concat();
            if last_line.ends_with(b"\n") {
                last_line.pop();
            }

            if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
                let candidate = &last_line[idx + 1..];
                if let Ok(result) = Scanner::line_to_evm_block(str::from_utf8(candidate).unwrap()) {
                    return Some(result);
                }
                last_line.truncate(idx);
            }
            if pos < read_size {
                break;
            }
            pos -= read_size;
        }
        Scanner::line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
    }
}
src/pseudo_peer/sources/hl_node/mod.rs (new file, 227 lines)
@@ -0,0 +1,227 @@
mod cache;
mod file_ops;
mod scan;
#[cfg(test)]
mod tests;
mod time_utils;

use self::{
    cache::LocalBlocksCache,
    file_ops::FileOperations,
    scan::{ScanOptions, Scanner},
    time_utils::TimeUtils,
};
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::{
    path::{Path, PathBuf},
    sync::Arc,
};
use time::{Duration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{info, warn};

const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly";
const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);

#[derive(Serialize, Deserialize, Debug, Clone)]
pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts);

/// Block source that monitors the local ingest directory for the HL node.
#[derive(Debug, Clone)]
pub struct HlNodeBlockSource {
    pub fallback: BlockSourceBoxed,
    pub local_ingest_dir: PathBuf,
    pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
    pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
}

impl BlockSource for HlNodeBlockSource {
    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
        let fallback = self.fallback.clone();
        let local_blocks_cache = self.local_blocks_cache.clone();
        let last_local_fetch = self.last_local_fetch.clone();
        Box::pin(async move {
            let now = OffsetDateTime::now_utc();

            if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
                Self::update_last_fetch(last_local_fetch, height, now).await;
                return Ok(block);
            }

            if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
                let more_recent = last_height < height;
                let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
                if more_recent && too_soon {
                    return Err(eyre::eyre!(
                        "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
                    ));
                }
            }

            let block = fallback.collect_block(height).await?;
            Self::update_last_fetch(last_local_fetch, height, now).await;
            Ok(block)
        })
    }

    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
        let fallback = self.fallback.clone();
        let local_ingest_dir = self.local_ingest_dir.clone();
        Box::pin(async move {
            let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else {
                warn!(
                    "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
                    local_ingest_dir
                );
                return fallback.find_latest_block_number().await;
            };

            match FileOperations::read_last_block_from_file(&dir) {
                Some((_, height)) => {
                    info!("Latest block number: {} with path {}", height, dir.display());
                    Some(height)
                }
                None => {
                    warn!(
                        "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
                        dir
                    );
                    fallback.find_latest_block_number().await
                }
            }
        })
    }

    fn recommended_chunk_size(&self) -> u64 {
        self.fallback.recommended_chunk_size()
    }
}

impl HlNodeBlockSource {
    async fn update_last_fetch(
        last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
        height: u64,
        now: OffsetDateTime,
    ) {
        let mut last_fetch = last_local_fetch.lock().await;
        if last_fetch.is_none_or(|(h, _)| h < height) {
            *last_fetch = Some((height, now));
        }
    }

    async fn try_collect_local_block(
        local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
        height: u64,
    ) -> Option<BlockAndReceipts> {
        let mut u_cache = local_blocks_cache.lock().await;
        if let Some(block) = u_cache.get_block(height) {
            return Some(block);
        }
        let path = u_cache.get_path_for_height(height)?;
        info!("Loading block data from {:?}", path);
        let scan_result = Scanner::scan_hour_file(
            &path,
            &mut 0,
            ScanOptions { start_height: 0, only_load_ranges: false },
        );
        u_cache.load_scan_result(scan_result);
        u_cache.get_block(height)
    }

    async fn try_backfill_local_blocks(
        root: &Path,
        cache: &Arc<Mutex<LocalBlocksCache>>,
        cutoff_height: u64,
    ) -> eyre::Result<()> {
        let mut u_cache = cache.lock().await;
        for subfile in FileOperations::all_hourly_files(root).unwrap_or_default() {
            if let Some((_, height)) = FileOperations::read_last_block_from_file(&subfile) {
                if height < cutoff_height {
                    continue;
                }
            } else {
                warn!("Failed to parse last line of file: {:?}", subfile);
            }
            let mut scan_result = Scanner::scan_hour_file(
                &subfile,
                &mut 0,
                ScanOptions { start_height: cutoff_height, only_load_ranges: true },
            );
            scan_result.new_blocks.clear(); // Only store ranges, load data lazily
            u_cache.load_scan_result(scan_result);
        }
        u_cache.log_range_summary(root);
        Ok(())
    }

    async fn start_local_ingest_loop(&self, current_head: u64) {
        let root = self.local_ingest_dir.to_owned();
        let cache = self.local_blocks_cache.clone();
        tokio::spawn(async move {
            let mut next_height = current_head;
            let mut dt = loop {
                if let Some(f) = FileOperations::find_latest_hourly_file(&root) {
                    break TimeUtils::datetime_from_path(&f).unwrap();
                }
                tokio::time::sleep(TAIL_INTERVAL).await;
            };
            let (mut hour, mut day_str, mut last_line) =
                (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
            info!("Starting local ingest loop from height: {}", current_head);
            loop {
                let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
                if hour_file.exists() {
                    let scan_result = Scanner::scan_hour_file(
                        &hour_file,
                        &mut last_line,
                        ScanOptions { start_height: next_height, only_load_ranges: false },
                    );
                    next_height = scan_result.next_expected_height;
                    cache.lock().await.load_scan_result(scan_result);
                }
                let now = OffsetDateTime::now_utc();
                if dt + Duration::HOUR < now {
                    dt += Duration::HOUR;
                    (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
                    info!(
                        "Moving to new file: {:?}",
                        root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
                    );
                    continue;
                }
                tokio::time::sleep(TAIL_INTERVAL).await;
            }
        });
    }

    pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
        let _ = Self::try_backfill_local_blocks(
            &self.local_ingest_dir,
            &self.local_blocks_cache,
            next_block_number,
        )
        .await;
        self.start_local_ingest_loop(next_block_number).await;
        Ok(())
    }

    pub async fn new(
        fallback: BlockSourceBoxed,
        local_ingest_dir: PathBuf,
        next_block_number: u64,
    ) -> Self {
        let block_source = Self {
            fallback,
            local_ingest_dir,
            local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
            last_local_fetch: Arc::new(Mutex::new(None)),
        };
        block_source.run(next_block_number).await.unwrap();
        block_source
    }
}
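
A hedged construction sketch (the directory paths and the choice of a LocalBlockSource fallback are illustrative; any BlockSource works as the fallback):

    // Tail the hl-node ingest directory first; only hit the fallback when a
    // height is not available locally and the rate-limit window has passed.
    let fallback = BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/data/ingest")));
    let source = HlNodeBlockSource::new(
        fallback,
        PathBuf::from("/home/hl/data/evm_block_and_receipts"),
        1_000_000, // next block number to serve
    )
    .await;
    let block = source.collect_block(1_000_000).await?;
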
src/pseudo_peer/sources/hl_node/scan.rs (new file, 90 lines)
@@ -0,0 +1,90 @@
use crate::node::types::{BlockAndReceipts, EvmBlock};
use serde::{Deserialize, Serialize};
use std::{
    fs::File,
    io::{BufRead, BufReader},
    ops::RangeInclusive,
    path::{Path, PathBuf},
};
use tracing::warn;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LocalBlockAndReceipts(pub String, pub BlockAndReceipts);

pub struct ScanResult {
    pub path: PathBuf,
    pub next_expected_height: u64,
    pub new_blocks: Vec<BlockAndReceipts>,
    pub new_block_ranges: Vec<RangeInclusive<u64>>,
}

pub struct ScanOptions {
    pub start_height: u64,
    pub only_load_ranges: bool,
}

pub struct Scanner;

impl Scanner {
    pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
        let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts =
            serde_json::from_str(line)?;
        let height = match &parsed_block.block {
            EvmBlock::Reth115(b) => b.header.header.number,
        };
        Ok((parsed_block, height))
    }

    pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
        let lines: Vec<String> =
            BufReader::new(File::open(path).expect("Failed to open hour file"))
                .lines()
                .collect::<Result<_, _>>()
                .unwrap();
        let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
        let mut new_blocks = Vec::new();
        let mut last_height = options.start_height;
        let mut block_ranges = Vec::new();
        let mut current_range: Option<(u64, u64)> = None;

        for (line_idx, line) in lines.iter().enumerate().skip(skip) {
            if line_idx < *last_line || line.trim().is_empty() {
                continue;
            }

            match Self::line_to_evm_block(line) {
                Ok((parsed_block, height)) if height >= options.start_height => {
                    last_height = last_height.max(height);
                    if !options.only_load_ranges {
                        new_blocks.push(parsed_block);
                    }
                    *last_line = line_idx;

                    match current_range {
                        Some((start, end)) if end + 1 == height => {
                            current_range = Some((start, height))
                        }
                        _ => {
                            if let Some((start, end)) = current_range.take() {
                                block_ranges.push(start..=end);
                            }
                            current_range = Some((height, height));
                        }
                    }
                }
                Ok(_) => {}
                Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
            }
        }

        if let Some((start, end)) = current_range {
            block_ranges.push(start..=end);
        }
        ScanResult {
            path: path.to_path_buf(),
            next_expected_height: last_height + 1,
            new_blocks,
            new_block_ranges: block_ranges,
        }
    }
}
src/pseudo_peer/sources/hl_node/tests.rs (new file, 187 lines)
@@ -0,0 +1,187 @@
use super::*;
use crate::{
    node::types::{reth_compat, ReadPrecompileCalls},
    pseudo_peer::sources::LocalBlockSource,
};
use alloy_consensus::{BlockBody, Header};
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use std::{io::Write, time::Duration as StdDuration};

#[test]
fn test_datetime_from_path() {
    let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
    let dt = TimeUtils::datetime_from_path(path).unwrap();
    println!("{dt:?}");
}

#[tokio::test]
async fn test_backfill() {
    let test_path = Path::new("/root/evm_block_and_receipts");
    if !test_path.exists() {
        return;
    }

    let cache = Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE)));
    HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap();

    let u_cache = cache.lock().await;
    assert_eq!(
        u_cache.get_path_for_height(9735058),
        Some(test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
    );
}

fn scan_result_from_single_block(block: BlockAndReceipts) -> scan::ScanResult {
    use crate::node::types::EvmBlock;
    let height = match &block.block {
        EvmBlock::Reth115(b) => b.header.header.number,
    };
    scan::ScanResult {
        path: PathBuf::from("/nonexistent-block"),
        next_expected_height: height + 1,
        new_blocks: vec![block],
        new_block_ranges: vec![height..=height],
    }
}

fn empty_block(number: u64, timestamp: u64, extra_data: &'static [u8]) -> LocalBlockAndReceipts {
    use crate::node::types::EvmBlock;
    LocalBlockAndReceipts(
        timestamp.to_string(),
        BlockAndReceipts {
            block: EvmBlock::Reth115(reth_compat::SealedBlock {
                header: reth_compat::SealedHeader {
                    header: Header {
                        parent_hash: B256::ZERO,
                        ommers_hash: B256::ZERO,
                        beneficiary: Address::ZERO,
                        state_root: B256::ZERO,
                        transactions_root: B256::ZERO,
                        receipts_root: B256::ZERO,
                        logs_bloom: Bloom::ZERO,
                        difficulty: U256::ZERO,
                        number,
                        gas_limit: 0,
                        gas_used: 0,
                        timestamp,
                        extra_data: Bytes::from_static(extra_data),
                        mix_hash: B256::ZERO,
                        nonce: B64::ZERO,
                        base_fee_per_gas: None,
                        withdrawals_root: None,
                        blob_gas_used: None,
                        excess_blob_gas: None,
                        parent_beacon_block_root: None,
                        requests_hash: None,
                    },
                    hash: B256::ZERO,
                },
                body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
            }),
            receipts: vec![],
            system_txs: vec![],
            read_precompile_calls: ReadPrecompileCalls(vec![]),
            highest_precompile_address: None,
        },
    )
}

fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, std::fs::File)> {
    let now = OffsetDateTime::now_utc();
    let temp_dir = tempfile::tempdir()?;
    let path = temp_dir
        .path()
        .join(HOURLY_SUBDIR)
        .join(TimeUtils::date_from_datetime(now))
        .join(format!("{}", now.hour()));
    std::fs::create_dir_all(path.parent().unwrap())?;
    Ok((temp_dir, std::fs::File::create(path)?))
}

struct BlockSourceHierarchy {
    block_source: HlNodeBlockSource,
    _temp_dir: tempfile::TempDir,
    file1: std::fs::File,
    current_block: LocalBlockAndReceipts,
    future_block_hl_node: LocalBlockAndReceipts,
    future_block_fallback: LocalBlockAndReceipts,
}

async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
    // Setup fallback block source
    let block_source_fallback = HlNodeBlockSource::new(
        BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
        PathBuf::from("/nonexistent"),
        1000000,
    )
    .await;
    let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
    let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
    let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");

    let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
    writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;

    let block_source = HlNodeBlockSource::new(
        BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
        temp_dir1.path().to_path_buf(),
        1000000,
    )
    .await;

    block_source_fallback
        .local_blocks_cache
        .lock()
        .await
        .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));

    Ok(BlockSourceHierarchy {
        block_source,
        _temp_dir: temp_dir1,
        file1,
        current_block: block_hl_node_0,
        future_block_hl_node: block_hl_node_1,
        future_block_fallback: block_fallback_1,
    })
}

#[tokio::test]
async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
    let hierarchy = setup_block_source_hierarchy().await?;
    let BlockSourceHierarchy {
        block_source, current_block, future_block_hl_node, mut file1, ..
    } = hierarchy;

    let block = block_source.collect_block(1000000).await.unwrap();
    assert_eq!(block, current_block.1);

    let block = block_source.collect_block(1000001).await;
    assert!(block.is_err());

    writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
    tokio::time::sleep(StdDuration::from_millis(100)).await;

    let block = block_source.collect_block(1000001).await.unwrap();
    assert_eq!(block, future_block_hl_node.1);

    Ok(())
}

#[tokio::test]
async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
    let hierarchy = setup_block_source_hierarchy().await?;
    let BlockSourceHierarchy {
        block_source, current_block, future_block_fallback, mut file1, ..
    } = hierarchy;

    let block = block_source.collect_block(1000000).await.unwrap();
    assert_eq!(block, current_block.1);

    tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await;

    writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
    let block = block_source.collect_block(1000001).await.unwrap();
    assert_eq!(block, future_block_fallback.1);

    Ok(())
}
src/pseudo_peer/sources/hl_node/time_utils.rs (new file, 19 lines)
@@ -0,0 +1,19 @@
use std::path::Path;
use time::{macros::format_description, Date, OffsetDateTime, Time};

pub struct TimeUtils;

impl TimeUtils {
    pub fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
        let (dt_part, hour_part) =
            (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?);
        Some(OffsetDateTime::new_utc(
            Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
            Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?,
        ))
    }

    pub fn date_from_datetime(dt: OffsetDateTime) -> String {
        dt.format(&format_description!("[year][month][day]")).unwrap()
    }
}
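
A worked example of the path convention (the path mirrors the one used in the tests): the parent directory name is the UTC date and the file name is the hour, so

    let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
    let dt = TimeUtils::datetime_from_path(path).unwrap();
    // dt is 2025-07-31 04:00:00 UTC; the reverse mapping recovers the date directory:
    assert_eq!(TimeUtils::date_from_datetime(dt), "20250731");
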
src/pseudo_peer/sources/local.rs (new file, 64 lines)
@@ -0,0 +1,64 @@
use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts;
use eyre::Context;
use futures::{future::BoxFuture, FutureExt};
use std::path::PathBuf;
use tracing::info;

/// Block source that reads blocks from local filesystem (--ingest-dir)
#[derive(Debug, Clone)]
pub struct LocalBlockSource {
    dir: PathBuf,
}

impl LocalBlockSource {
    pub fn new(dir: impl Into<PathBuf>) -> Self {
        Self { dir: dir.into() }
    }

    async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
        let files = std::fs::read_dir(&dir).unwrap().collect::<Vec<_>>();
        let files = files
            .into_iter()
            .filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir)
            .map(|entry| entry.unwrap().path().to_string_lossy().to_string())
            .collect::<Vec<_>>();

        utils::name_with_largest_number(&files, is_dir)
    }
}

impl BlockSource for LocalBlockSource {
    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
        let dir = self.dir.clone();
        async move {
            let path = dir.join(utils::rmp_path(height));
            let file = tokio::fs::read(&path)
                .await
                .wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
            let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
            let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
            Ok(blocks[0].clone())
        }
        .boxed()
    }

    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
        let dir = self.dir.clone();
        async move {
            let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
            let (_, second_level) =
                Self::pick_path_with_highest_number(dir.join(first_level), true).await?;
            let (block_number, third_level) =
                Self::pick_path_with_highest_number(dir.join(second_level), false).await?;

            info!("Latest block number: {} with path {}", block_number, third_level);
            Some(block_number)
        }
        .boxed()
    }

    fn recommended_chunk_size(&self) -> u64 {
        1000
    }
}
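
find_latest_block_number walks two directory levels and then picks the highest-numbered file, which implies an ingest layout along these lines (an assumption inferred from the `.rmp.lz4` suffix handling in utils; the concrete numbers below are illustrative):

    // <ingest-dir>/<n>/<m>/<block_number>.rmp.lz4 -- all three components numeric
    let source = LocalBlockSource::new("/data/ingest"); // path illustrative
    let latest = source.find_latest_block_number().await; // largest n, then m, then file number
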
src/pseudo_peer/sources/mod.rs
@@ -1,206 +1,35 @@
 use crate::node::types::BlockAndReceipts;
-use aws_sdk_s3::types::RequestPayer;
-use eyre::Context;
-use futures::{future::BoxFuture, FutureExt};
-use reth_network::cache::LruMap;
-use std::{
-    path::PathBuf,
-    sync::{Arc, RwLock},
-};
-use tracing::info;
+use futures::future::BoxFuture;
+use std::sync::Arc;
 
+// Module declarations
+mod cached;
 mod hl_node;
-pub use hl_node::HlNodeBlockSource;
+mod local;
+mod s3;
+mod utils;
 
+// Public exports
+pub use cached::CachedBlockSource;
+pub use hl_node::HlNodeBlockSource;
+pub use local::LocalBlockSource;
+pub use s3::S3BlockSource;
+
+/// Trait for block sources that can retrieve blocks from various sources
 pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
+    /// Retrieves a block at the specified height
     fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
 
+    /// Finds the latest block number available from this source
     fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>>;
 
+    /// Returns the recommended chunk size for batch operations
     fn recommended_chunk_size(&self) -> u64;
 }
 
+/// Type alias for a boxed block source
 pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
 
-fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> {
-    let mut files = files
-        .iter()
-        .filter_map(|file_raw| {
-            let file = file_raw.strip_suffix("/").unwrap_or(file_raw);
-            let file = file.split("/").last().unwrap();
-            let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
-            stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
-        })
-        .collect::<Vec<_>>();
-    if files.is_empty() {
-        return None;
-    }
-    files.sort_by_key(|(number, _)| *number);
-    files.last().cloned()
-}
-
-#[derive(Debug, Clone)]
-pub struct S3BlockSource {
-    client: aws_sdk_s3::Client,
-    bucket: String,
-}
-
-impl S3BlockSource {
-    pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
-        Self { client, bucket }
-    }
-
-    async fn pick_path_with_highest_number(
-        client: aws_sdk_s3::Client,
-        bucket: String,
-        dir: String,
-        is_dir: bool,
-    ) -> Option<(u64, String)> {
-        let request = client
-            .list_objects()
-            .bucket(&bucket)
-            .prefix(dir)
-            .delimiter("/")
-            .request_payer(RequestPayer::Requester);
-        let response = request.send().await.ok()?;
-        let files: Vec<String> = if is_dir {
-            response
-                .common_prefixes
-                .unwrap()
-                .iter()
-                .map(|object| object.prefix.as_ref().unwrap().to_string())
-                .collect()
-        } else {
-            response
-                .contents
-                .unwrap()
-                .iter()
-                .map(|object| object.key.as_ref().unwrap().to_string())
-                .collect()
-        };
-        name_with_largest_number(&files, is_dir)
-    }
-}
-
-impl BlockSource for S3BlockSource {
-    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
-        let client = self.client.clone();
-        let bucket = self.bucket.clone();
-        async move {
-            let path = rmp_path(height);
-            let request = client
-                .get_object()
-                .request_payer(RequestPayer::Requester)
-                .bucket(&bucket)
-                .key(path);
-            let response = request.send().await?;
-            let bytes = response.body.collect().await?.into_bytes();
-            let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
-            let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
-            Ok(blocks[0].clone())
-        }
-        .boxed()
-    }
-
-    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
-        let client = self.client.clone();
-        let bucket = self.bucket.clone();
-        async move {
-            let (_, first_level) = Self::pick_path_with_highest_number(
-                client.clone(),
-                bucket.clone(),
-                "".to_string(),
-                true,
-            )
-            .await?;
-            let (_, second_level) = Self::pick_path_with_highest_number(
-                client.clone(),
-                bucket.clone(),
-                first_level,
-                true,
-            )
-            .await?;
-            let (block_number, third_level) = Self::pick_path_with_highest_number(
-                client.clone(),
-                bucket.clone(),
-                second_level,
-                false,
-            )
-            .await?;
-
-            info!("Latest block number: {} with path {}", block_number, third_level);
-            Some(block_number)
-        }
-        .boxed()
-    }
-
-    fn recommended_chunk_size(&self) -> u64 {
-        1000
-    }
-}
-
-impl BlockSource for LocalBlockSource {
-    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
-        let dir = self.dir.clone();
-        async move {
-            let path = dir.join(rmp_path(height));
-            let file = tokio::fs::read(&path)
-                .await
-                .wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
-            let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
-            let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
-            Ok(blocks[0].clone())
-        }
-        .boxed()
-    }
-
-    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
-        let dir = self.dir.clone();
-        async move {
-            let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
-            let (_, second_level) =
-                Self::pick_path_with_highest_number(dir.join(first_level), true).await?;
-            let (block_number, third_level) =
-                Self::pick_path_with_highest_number(dir.join(second_level), false).await?;
-
-            info!("Latest block number: {} with path {}", block_number, third_level);
-            Some(block_number)
-        }
-        .boxed()
-    }
-
-    fn recommended_chunk_size(&self) -> u64 {
-        1000
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct LocalBlockSource {
-    dir: PathBuf,
-}
-
-impl LocalBlockSource {
-    pub fn new(dir: impl Into<PathBuf>) -> Self {
-        Self { dir: dir.into() }
-    }
-
-    async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
-        let files = std::fs::read_dir(&dir).unwrap().collect::<Vec<_>>();
-        let files = files
-            .into_iter()
-            .filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir)
-            .map(|entry| entry.unwrap().path().to_string_lossy().to_string())
-            .collect::<Vec<_>>();
-
-        name_with_largest_number(&files, is_dir)
-    }
-}
-
-fn rmp_path(height: u64) -> String {
-    let f = ((height - 1) / 1_000_000) * 1_000_000;
-    let s = ((height - 1) / 1_000) * 1_000;
-    let path = format!("{f}/{s}/{height}.rmp.lz4");
-    path
-}
-
 impl BlockSource for BlockSourceBoxed {
     fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
         self.as_ref().collect_block(height)
@@ -214,40 +43,3 @@ impl BlockSource for BlockSourceBoxed {
         self.as_ref().recommended_chunk_size()
     }
 }
-
-#[derive(Debug, Clone)]
-pub struct CachedBlockSource {
-    block_source: BlockSourceBoxed,
-    cache: Arc<RwLock<LruMap<u64, BlockAndReceipts>>>,
-}
-
-impl CachedBlockSource {
-    const CACHE_LIMIT: u32 = 100000;
-    pub fn new(block_source: BlockSourceBoxed) -> Self {
-        Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) }
-    }
-}
-
-impl BlockSource for CachedBlockSource {
-    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
-        let block_source = self.block_source.clone();
-        let cache = self.cache.clone();
-        async move {
-            if let Some(block) = cache.write().unwrap().get(&height) {
-                return Ok(block.clone());
-            }
-            let block = block_source.collect_block(height).await?;
-            cache.write().unwrap().insert(height, block.clone());
-            Ok(block)
-        }
-        .boxed()
-    }
-
-    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
-        self.block_source.find_latest_block_number()
-    }
-
-    fn recommended_chunk_size(&self) -> u64 {
-        self.block_source.recommended_chunk_size()
-    }
-}
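For orientation, a minimal sketch of how the split-out pieces compose (assumed usage, not part of the commit): a concrete source is boxed behind the BlockSourceBoxed alias, and CachedBlockSource wraps the boxed source to memoize collect_block results. The helper name and directory argument below are placeholders.

use std::sync::Arc;

// Hypothetical helper: box a LocalBlockSource, then wrap it in the LRU cache.
fn cached_local_source(dir: &str) -> BlockSourceBoxed {
    let inner: BlockSourceBoxed = Arc::new(Box::new(LocalBlockSource::new(dir)));
    Arc::new(Box::new(CachedBlockSource::new(inner)))
}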
105
src/pseudo_peer/sources/s3.rs
Normal file
@@ -0,0 +1,105 @@
use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer;
use futures::{future::BoxFuture, FutureExt};
use std::sync::Arc;
use tracing::info;

/// Block source that reads blocks from S3 (--s3)
#[derive(Debug, Clone)]
pub struct S3BlockSource {
    client: Arc<aws_sdk_s3::Client>,
    bucket: String,
}

impl S3BlockSource {
    pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
        Self { client: client.into(), bucket }
    }

    async fn pick_path_with_highest_number(
        client: &aws_sdk_s3::Client,
        bucket: &str,
        dir: &str,
        is_dir: bool,
    ) -> Option<(u64, String)> {
        let request = client
            .list_objects()
            .bucket(bucket)
            .prefix(dir)
            .delimiter("/")
            .request_payer(RequestPayer::Requester);
        let response = request.send().await.ok()?;
        let files: Vec<String> = if is_dir {
            response
                .common_prefixes?
                .iter()
                .map(|object| object.prefix.as_ref().unwrap().to_string())
                .collect()
        } else {
            response
                .contents?
                .iter()
                .map(|object| object.key.as_ref().unwrap().to_string())
                .collect()
        };
        utils::name_with_largest_number(&files, is_dir)
    }
}

impl BlockSource for S3BlockSource {
    fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
        let client = self.client.clone();
        let bucket = self.bucket.clone();
        async move {
            let path = utils::rmp_path(height);
            let request = client
                .get_object()
                .request_payer(RequestPayer::Requester)
                .bucket(&bucket)
                .key(path);
            let response = request.send().await?;
            let bytes = response.body.collect().await?.into_bytes();
            let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
            let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
            Ok(blocks[0].clone())
        }
        .boxed()
    }

    fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
        let client = self.client.clone();
        let bucket = self.bucket.clone();
        async move {
            let (_, first_level) = Self::pick_path_with_highest_number(
                &client,
                &bucket,
                "",
                true,
            )
            .await?;
            let (_, second_level) = Self::pick_path_with_highest_number(
                &client,
                &bucket,
                &first_level,
                true,
            )
            .await?;
            let (block_number, third_level) = Self::pick_path_with_highest_number(
                &client,
                &bucket,
                &second_level,
                false,
            )
            .await?;

            info!("Latest block number: {} with path {}", block_number, third_level);
            Some(block_number)
        }
        .boxed()
    }

    fn recommended_chunk_size(&self) -> u64 {
        1000
    }
}
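A minimal wiring sketch for this source (assumed usage, not part of the commit): the bucket name is a placeholder, and default AWS credential resolution is assumed.

// Sketch only: build an S3 client, then ask the source for the chain tip.
async fn latest_from_s3() -> Option<u64> {
    let config = aws_config::load_from_env().await;
    let client = aws_sdk_s3::Client::new(&config);
    let source = S3BlockSource::new(client, "example-block-bucket".to_string());
    source.find_latest_block_number().await
}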
26
src/pseudo_peer/sources/utils.rs
Normal file
@@ -0,0 +1,26 @@
/// Shared utilities for block sources

/// Finds the file/directory with the largest number in its name from a list of files
pub fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> {
    let mut files = files
        .iter()
        .filter_map(|file_raw| {
            let file = file_raw.strip_suffix("/").unwrap_or(file_raw);
            let file = file.split("/").last().unwrap();
            let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
            stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
        })
        .collect::<Vec<_>>();
    if files.is_empty() {
        return None;
    }
    files.sort_by_key(|(number, _)| *number);
    files.last().cloned()
}

/// Generates the RMP file path for a given block height
pub fn rmp_path(height: u64) -> String {
    let f = ((height - 1) / 1_000_000) * 1_000_000;
    let s = ((height - 1) / 1_000) * 1_000;
    format!("{f}/{s}/{height}.rmp.lz4")
}
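The path scheme buckets each block by million and by thousand, and the (height - 1) offset keeps exact multiples in the lower bucket. A few hypothetical checks (not part of the commit) illustrate both helpers:

// rmp_path: height 1_234_567 falls in the 1_000_000 / 1_234_000 buckets.
assert_eq!(rmp_path(1_234_567), "1000000/1234000/1234567.rmp.lz4");
// The (height - 1) offset: an exact multiple stays in the lower bucket.
assert_eq!(rmp_path(1_000_000), "0/999000/1000000.rmp.lz4");
// name_with_largest_number picks the entry with the largest numeric stem.
assert_eq!(
    name_with_largest_number(&["100.rmp.lz4".into(), "900.rmp.lz4".into()], false),
    Some((900, "900.rmp.lz4".to_string()))
);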