diff --git a/src/pseudo_peer/sources/cached.rs b/src/pseudo_peer/sources/cached.rs new file mode 100644 index 000000000..d8bfa5983 --- /dev/null +++ b/src/pseudo_peer/sources/cached.rs @@ -0,0 +1,44 @@ +use super::{BlockSource, BlockSourceBoxed}; +use crate::node::types::BlockAndReceipts; +use futures::{future::BoxFuture, FutureExt}; +use reth_network::cache::LruMap; +use std::sync::{Arc, RwLock}; + +/// Block source wrapper that caches blocks in memory +#[derive(Debug, Clone)] +pub struct CachedBlockSource { + block_source: BlockSourceBoxed, + cache: Arc>>, +} + +impl CachedBlockSource { + const CACHE_LIMIT: u32 = 100000; + + pub fn new(block_source: BlockSourceBoxed) -> Self { + Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) } + } +} + +impl BlockSource for CachedBlockSource { + fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { + let block_source = self.block_source.clone(); + let cache = self.cache.clone(); + async move { + if let Some(block) = cache.write().unwrap().get(&height) { + return Ok(block.clone()); + } + let block = block_source.collect_block(height).await?; + cache.write().unwrap().insert(height, block.clone()); + Ok(block) + } + .boxed() + } + + fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { + self.block_source.find_latest_block_number() + } + + fn recommended_chunk_size(&self) -> u64 { + self.block_source.recommended_chunk_size() + } +} diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs deleted file mode 100644 index 413051c0a..000000000 --- a/src/pseudo_peer/sources/hl_node.rs +++ /dev/null @@ -1,601 +0,0 @@ -use super::{BlockSource, BlockSourceBoxed}; -use crate::node::types::{BlockAndReceipts, EvmBlock}; -use futures::future::BoxFuture; -use rangemap::RangeInclusiveMap; -use reth_network::cache::LruMap; -use serde::{Deserialize, Serialize}; -use std::{ - fs::File, - io::{BufRead, BufReader, Read, Seek, SeekFrom}, - ops::RangeInclusive, - path::{Path, PathBuf}, - sync::Arc, -}; -use time::{macros::format_description, Date, Duration, OffsetDateTime, Time}; -use tokio::sync::Mutex; -use tracing::{info, warn}; - -const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); -const HOURLY_SUBDIR: &str = "hourly"; - -#[derive(Debug)] -pub struct LocalBlocksCache { - cache: LruMap, - // Lightweight range map to track the ranges of blocks in the local ingest directory - ranges: RangeInclusiveMap, -} - -impl LocalBlocksCache { - const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour - - fn new() -> Self { - Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() } - } - - fn load_scan_result(&mut self, scan_result: ScanResult) { - for blk in scan_result.new_blocks { - let EvmBlock::Reth115(b) = &blk.block; - self.cache.insert(b.header.header.number, blk); - } - for range in scan_result.new_block_ranges { - self.ranges.insert(range, scan_result.path.clone()); - } - } -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -struct LocalBlockAndReceipts(String, BlockAndReceipts); - -struct ScanResult { - path: PathBuf, - next_expected_height: u64, - new_blocks: Vec, - new_block_ranges: Vec>, -} - -struct ScanOptions { - start_height: u64, - only_load_ranges: bool, -} - -fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { - let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = serde_json::from_str(line)?; - let height = match &parsed_block.block { - EvmBlock::Reth115(b) => b.header.header.number, - }; - Ok((parsed_block, height)) -} - -fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { - let lines: Vec = BufReader::new(File::open(path).expect("Failed to open hour file")) - .lines() - .collect::>() - .unwrap(); - let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; - let mut new_blocks = Vec::new(); - let mut last_height = options.start_height; - let mut block_ranges = Vec::new(); - let mut current_range: Option<(u64, u64)> = None; - - for (line_idx, line) in lines.iter().enumerate().skip(skip) { - if line_idx < *last_line || line.trim().is_empty() { - continue; - } - - match line_to_evm_block(line) { - Ok((parsed_block, height)) if height >= options.start_height => { - last_height = last_height.max(height); - if !options.only_load_ranges { - new_blocks.push(parsed_block); - } - *last_line = line_idx; - - match current_range { - Some((start, end)) if end + 1 == height => { - current_range = Some((start, height)) - } - _ => { - if let Some((start, end)) = current_range.take() { - block_ranges.push(start..=end); - } - current_range = Some((height, height)); - } - } - } - Ok(_) => {} - Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)), - } - } - - if let Some((start, end)) = current_range { - block_ranges.push(start..=end); - } - ScanResult { - path: path.to_path_buf(), - next_expected_height: last_height + 1, - new_blocks, - new_block_ranges: block_ranges, - } -} - -fn date_from_datetime(dt: OffsetDateTime) -> String { - dt.format(&format_description!("[year][month][day]")).unwrap() -} - -/// Block source that monitors the local ingest directory for the HL node. -#[derive(Debug, Clone)] -pub struct HlNodeBlockSource { - pub fallback: BlockSourceBoxed, - pub local_ingest_dir: PathBuf, - pub local_blocks_cache: Arc>, // height → block - // for rate limiting requests to fallback - pub last_local_fetch: Arc>>, -} - -impl BlockSource for HlNodeBlockSource { - fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { - let fallback = self.fallback.clone(); - let local_blocks_cache = self.local_blocks_cache.clone(); - let last_local_fetch = self.last_local_fetch.clone(); - Box::pin(async move { - let now = OffsetDateTime::now_utc(); - - if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await { - Self::update_last_fetch(last_local_fetch, height, now).await; - return Ok(block); - } - - if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await { - let more_recent = last_height < height; - let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; - if more_recent && too_soon { - return Err(eyre::eyre!( - "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" - )); - } - } - - let block = fallback.collect_block(height).await?; - Self::update_last_fetch(last_local_fetch, height, now).await; - Ok(block) - }) - } - - fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { - let fallback = self.fallback.clone(); - let local_ingest_dir = self.local_ingest_dir.clone(); - Box::pin(async move { - let Some(dir) = Self::find_latest_hourly_file(&local_ingest_dir) else { - warn!( - "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", - local_ingest_dir - ); - return fallback.find_latest_block_number().await; - }; - - let mut file = File::open(&dir).expect("Failed to open hour file path"); - if let Some((_, height)) = read_last_complete_line(&mut file) { - info!("Latest block number: {} with path {}", height, dir.display()); - Some(height) - } else { - warn!( - "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", - file - ); - fallback.find_latest_block_number().await - } - }) - } - - fn recommended_chunk_size(&self) -> u64 { - self.fallback.recommended_chunk_size() - } -} - -fn read_last_complete_line(read: &mut R) -> Option<(BlockAndReceipts, u64)> { - const CHUNK_SIZE: u64 = 50000; - let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); - let mut pos = read.seek(SeekFrom::End(0)).unwrap(); - let mut last_line = Vec::new(); - - while pos > 0 { - let read_size = pos.min(CHUNK_SIZE); - buf.resize(read_size as usize, 0); - read.seek(SeekFrom::Start(pos - read_size)).unwrap(); - read.read_exact(&mut buf).unwrap(); - last_line = [buf.clone(), last_line].concat(); - if last_line.ends_with(b"\n") { - last_line.pop(); - } - - if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { - let candidate = &last_line[idx + 1..]; - if let Ok(result) = line_to_evm_block(str::from_utf8(candidate).unwrap()) { - return Some(result); - } - last_line.truncate(idx); - } - if pos < read_size { - break; - } - pos -= read_size; - } - line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok() -} - -impl HlNodeBlockSource { - /// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir. - /// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary - /// requests to S3 while it'll return 404. - /// - /// To avoid unnecessary fallback, we set a short threshold period. - /// This threshold is several times longer than the expected block time, reducing redundant - /// fallback attempts. - pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); - - async fn update_last_fetch( - last_local_fetch: Arc>>, - height: u64, - now: OffsetDateTime, - ) { - let mut last_fetch = last_local_fetch.lock().await; - if last_fetch.is_none_or(|(h, _)| h < height) { - *last_fetch = Some((height, now)); - } - } - - async fn try_collect_local_block( - local_blocks_cache: Arc>, - height: u64, - ) -> Option { - let mut u_cache = local_blocks_cache.lock().await; - if let Some(block) = u_cache.cache.remove(&height) { - return Some(block); - } - let path = u_cache.ranges.get(&height).cloned()?; - info!("Loading block data from {:?}", path); - u_cache.load_scan_result(scan_hour_file( - &path, - &mut 0, - ScanOptions { start_height: 0, only_load_ranges: false }, - )); - u_cache.cache.get(&height).cloned() - } - - fn datetime_from_path(path: &Path) -> Option { - let (dt_part, hour_part) = - (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?); - Some(OffsetDateTime::new_utc( - Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?, - Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?, - )) - } - - fn all_hourly_files(root: &Path) -> Option> { - let mut files = Vec::new(); - for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? { - let dir = entry.ok()?.path(); - if let Ok(subentries) = std::fs::read_dir(&dir) { - files.extend( - subentries - .filter_map(|f| f.ok().map(|f| f.path())) - .filter(|p| Self::datetime_from_path(p).is_some()), - ); - } - } - files.sort(); - Some(files) - } - - fn find_latest_hourly_file(root: &Path) -> Option { - Self::all_hourly_files(root)?.into_iter().last() - } - - async fn try_backfill_local_blocks( - root: &Path, - cache: &Arc>, - cutoff_height: u64, - ) -> eyre::Result<()> { - let mut u_cache = cache.lock().await; - for subfile in Self::all_hourly_files(root).unwrap_or_default() { - let mut file = File::open(&subfile).expect("Failed to open hour file"); - if let Some((_, height)) = read_last_complete_line(&mut file) { - if height < cutoff_height { - continue; - } - } else { - warn!("Failed to parse last line of file: {:?}", subfile); - } - let mut scan_result = scan_hour_file( - &subfile, - &mut 0, - ScanOptions { start_height: cutoff_height, only_load_ranges: true }, - ); - scan_result.new_blocks.clear(); // Only store ranges, load data lazily - u_cache.load_scan_result(scan_result); - } - if u_cache.ranges.is_empty() { - warn!("No ranges found in {:?}", root); - } else { - let (min, max) = ( - u_cache.ranges.first_range_value().unwrap(), - u_cache.ranges.last_range_value().unwrap(), - ); - info!( - "Populated {} ranges (min: {}, max: {})", - u_cache.ranges.len(), - min.0.start(), - max.0.end() - ); - } - Ok(()) - } - - async fn start_local_ingest_loop(&self, current_head: u64) { - let root = self.local_ingest_dir.to_owned(); - let cache = self.local_blocks_cache.clone(); - tokio::spawn(async move { - let mut next_height = current_head; - let mut dt = loop { - if let Some(f) = Self::find_latest_hourly_file(&root) { - break Self::datetime_from_path(&f).unwrap(); - } - tokio::time::sleep(TAIL_INTERVAL).await; - }; - let (mut hour, mut day_str, mut last_line) = (dt.hour(), date_from_datetime(dt), 0); - info!("Starting local ingest loop from height: {}", current_head); - loop { - let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); - if hour_file.exists() { - let scan_result = scan_hour_file( - &hour_file, - &mut last_line, - ScanOptions { start_height: next_height, only_load_ranges: false }, - ); - next_height = scan_result.next_expected_height; - cache.lock().await.load_scan_result(scan_result); - } - let now = OffsetDateTime::now_utc(); - if dt + Duration::HOUR < now { - dt += Duration::HOUR; - (hour, day_str, last_line) = (dt.hour(), date_from_datetime(dt), 0); - info!( - "Moving to new file: {:?}", - root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) - ); - continue; - } - tokio::time::sleep(TAIL_INTERVAL).await; - } - }); - } - - pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { - let _ = Self::try_backfill_local_blocks( - &self.local_ingest_dir, - &self.local_blocks_cache, - next_block_number, - ) - .await; - self.start_local_ingest_loop(next_block_number).await; - Ok(()) - } - - pub async fn new( - fallback: BlockSourceBoxed, - local_ingest_dir: PathBuf, - next_block_number: u64, - ) -> Self { - let block_source = Self { - fallback, - local_ingest_dir, - local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), - last_local_fetch: Arc::new(Mutex::new(None)), - }; - block_source.run(next_block_number).await.unwrap(); - block_source - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - node::types::{reth_compat, ReadPrecompileCalls}, - pseudo_peer::sources::LocalBlockSource, - }; - use alloy_consensus::{BlockBody, Header}; - use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; - use std::{io::Write, time::Duration}; - - #[test] - fn test_datetime_from_path() { - let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); - let dt = HlNodeBlockSource::datetime_from_path(path).unwrap(); - println!("{dt:?}"); - } - - #[tokio::test] - async fn test_backfill() { - let test_path = Path::new("/root/evm_block_and_receipts"); - if !test_path.exists() { - return; - } - - let cache = Arc::new(Mutex::new(LocalBlocksCache::new())); - HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap(); - - let u_cache = cache.lock().await; - println!("{:?}", u_cache.ranges); - assert_eq!( - u_cache.ranges.get(&9735058), - Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) - ); - } - - fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult { - let height = match &block.block { - EvmBlock::Reth115(b) => b.header.header.number, - }; - ScanResult { - path: PathBuf::from("/nonexistent-block"), - next_expected_height: height + 1, - new_blocks: vec![block], - new_block_ranges: vec![height..=height], - } - } - - fn empty_block( - number: u64, - timestamp: u64, - extra_data: &'static [u8], - ) -> LocalBlockAndReceipts { - LocalBlockAndReceipts( - timestamp.to_string(), - BlockAndReceipts { - block: EvmBlock::Reth115(reth_compat::SealedBlock { - header: reth_compat::SealedHeader { - header: Header { - parent_hash: B256::ZERO, - ommers_hash: B256::ZERO, - beneficiary: Address::ZERO, - state_root: B256::ZERO, - transactions_root: B256::ZERO, - receipts_root: B256::ZERO, - logs_bloom: Bloom::ZERO, - difficulty: U256::ZERO, - number, - gas_limit: 0, - gas_used: 0, - timestamp, - extra_data: Bytes::from_static(extra_data), - mix_hash: B256::ZERO, - nonce: B64::ZERO, - base_fee_per_gas: None, - withdrawals_root: None, - blob_gas_used: None, - excess_blob_gas: None, - parent_beacon_block_root: None, - requests_hash: None, - }, - hash: B256::ZERO, - }, - body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, - }), - receipts: vec![], - system_txs: vec![], - read_precompile_calls: ReadPrecompileCalls(vec![]), - highest_precompile_address: None, - }, - ) - } - - fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> { - let now = OffsetDateTime::now_utc(); - let temp_dir = tempfile::tempdir()?; - let path = temp_dir - .path() - .join(HOURLY_SUBDIR) - .join(date_from_datetime(now)) - .join(format!("{}", now.hour())); - std::fs::create_dir_all(path.parent().unwrap())?; - Ok((temp_dir, File::create(path)?)) - } - - struct BlockSourceHierarchy { - block_source: HlNodeBlockSource, - _temp_dir: tempfile::TempDir, - file1: File, - current_block: LocalBlockAndReceipts, - future_block_hl_node: LocalBlockAndReceipts, - future_block_fallback: LocalBlockAndReceipts, - } - - async fn setup_block_source_hierarchy() -> eyre::Result { - // Setup fallback block source - let block_source_fallback = HlNodeBlockSource::new( - BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), - PathBuf::from("/nonexistent"), - 1000000, - ) - .await; - let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node"); - let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node"); - let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback"); - - let (temp_dir1, mut file1) = setup_temp_dir_and_file()?; - writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?; - - let block_source = HlNodeBlockSource::new( - BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), - temp_dir1.path().to_path_buf(), - 1000000, - ) - .await; - - block_source_fallback - .local_blocks_cache - .lock() - .await - .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone())); - - Ok(BlockSourceHierarchy { - block_source, - _temp_dir: temp_dir1, - file1, - current_block: block_hl_node_0, - future_block_hl_node: block_hl_node_1, - future_block_fallback: block_fallback_1, - }) - } - - #[tokio::test] - async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> { - let hierarchy = setup_block_source_hierarchy().await?; - let BlockSourceHierarchy { - block_source, - current_block, - future_block_hl_node, - mut file1, - .. - } = hierarchy; - - let block = block_source.collect_block(1000000).await.unwrap(); - assert_eq!(block, current_block.1); - - let block = block_source.collect_block(1000001).await; - assert!(block.is_err()); - - writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; - tokio::time::sleep(Duration::from_millis(100)).await; - - let block = block_source.collect_block(1000001).await.unwrap(); - assert_eq!(block, future_block_hl_node.1); - - Ok(()) - } - - #[tokio::test] - async fn test_update_last_fetch_fallback() -> eyre::Result<()> { - let hierarchy = setup_block_source_hierarchy().await?; - let BlockSourceHierarchy { - block_source, - current_block, - future_block_fallback, - mut file1, - .. - } = hierarchy; - - let block = block_source.collect_block(1000000).await.unwrap(); - assert_eq!(block, current_block.1); - - tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()) - .await; - - writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; - let block = block_source.collect_block(1000001).await.unwrap(); - assert_eq!(block, future_block_fallback.1); - - Ok(()) - } -} diff --git a/src/pseudo_peer/sources/hl_node/cache.rs b/src/pseudo_peer/sources/hl_node/cache.rs new file mode 100644 index 000000000..a484db3b1 --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/cache.rs @@ -0,0 +1,51 @@ +use super::scan::ScanResult; +use crate::node::types::{BlockAndReceipts, EvmBlock}; +use rangemap::RangeInclusiveMap; +use reth_network::cache::LruMap; +use std::path::{Path, PathBuf}; +use tracing::{info, warn}; + +#[derive(Debug)] +pub struct LocalBlocksCache { + cache: LruMap, + ranges: RangeInclusiveMap, +} + +impl LocalBlocksCache { + pub fn new(cache_size: u32) -> Self { + Self { cache: LruMap::new(cache_size), ranges: RangeInclusiveMap::new() } + } + + pub fn load_scan_result(&mut self, scan_result: ScanResult) { + for blk in scan_result.new_blocks { + let EvmBlock::Reth115(b) = &blk.block; + self.cache.insert(b.header.header.number, blk); + } + for range in scan_result.new_block_ranges { + self.ranges.insert(range, scan_result.path.clone()); + } + } + + pub fn get_block(&mut self, height: u64) -> Option { + self.cache.remove(&height) + } + + pub fn get_path_for_height(&self, height: u64) -> Option { + self.ranges.get(&height).cloned() + } + + pub fn log_range_summary(&self, root: &Path) { + if self.ranges.is_empty() { + warn!("No ranges found in {:?}", root); + } else { + let (min, max) = + (self.ranges.first_range_value().unwrap(), self.ranges.last_range_value().unwrap()); + info!( + "Populated {} ranges (min: {}, max: {})", + self.ranges.len(), + min.0.start(), + max.0.end() + ); + } + } +} diff --git a/src/pseudo_peer/sources/hl_node/file_ops.rs b/src/pseudo_peer/sources/hl_node/file_ops.rs new file mode 100644 index 000000000..07852670d --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/file_ops.rs @@ -0,0 +1,67 @@ +use super::{scan::Scanner, time_utils::TimeUtils, HOURLY_SUBDIR}; +use crate::node::types::BlockAndReceipts; +use std::{ + fs::File, + io::{Read, Seek, SeekFrom}, + path::{Path, PathBuf}, +}; + +pub struct FileOperations; + +impl FileOperations { + pub fn all_hourly_files(root: &Path) -> Option> { + let mut files = Vec::new(); + for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? { + let dir = entry.ok()?.path(); + if let Ok(subentries) = std::fs::read_dir(&dir) { + files.extend( + subentries + .filter_map(|f| f.ok().map(|f| f.path())) + .filter(|p| TimeUtils::datetime_from_path(p).is_some()), + ); + } + } + files.sort(); + Some(files) + } + + pub fn find_latest_hourly_file(root: &Path) -> Option { + Self::all_hourly_files(root)?.into_iter().last() + } + + pub fn read_last_block_from_file(path: &Path) -> Option<(BlockAndReceipts, u64)> { + let mut file = File::open(path).ok()?; + Self::read_last_complete_line(&mut file) + } + + fn read_last_complete_line(read: &mut R) -> Option<(BlockAndReceipts, u64)> { + const CHUNK_SIZE: u64 = 50000; + let mut buf = Vec::with_capacity(CHUNK_SIZE as usize); + let mut pos = read.seek(SeekFrom::End(0)).unwrap(); + let mut last_line = Vec::new(); + + while pos > 0 { + let read_size = pos.min(CHUNK_SIZE); + buf.resize(read_size as usize, 0); + read.seek(SeekFrom::Start(pos - read_size)).unwrap(); + read.read_exact(&mut buf).unwrap(); + last_line = [buf.clone(), last_line].concat(); + if last_line.ends_with(b"\n") { + last_line.pop(); + } + + if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { + let candidate = &last_line[idx + 1..]; + if let Ok(result) = Scanner::line_to_evm_block(str::from_utf8(candidate).unwrap()) { + return Some(result); + } + last_line.truncate(idx); + } + if pos < read_size { + break; + } + pos -= read_size; + } + Scanner::line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok() + } +} diff --git a/src/pseudo_peer/sources/hl_node/mod.rs b/src/pseudo_peer/sources/hl_node/mod.rs new file mode 100644 index 000000000..c02cc9c79 --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/mod.rs @@ -0,0 +1,227 @@ +mod cache; +mod file_ops; +mod scan; +#[cfg(test)] +mod tests; +mod time_utils; + +use self::{ + cache::LocalBlocksCache, + file_ops::FileOperations, + scan::{ScanOptions, Scanner}, + time_utils::TimeUtils, +}; +use super::{BlockSource, BlockSourceBoxed}; +use crate::node::types::BlockAndReceipts; +use futures::future::BoxFuture; +use serde::{Deserialize, Serialize}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; +use time::{Duration, OffsetDateTime}; +use tokio::sync::Mutex; +use tracing::{info, warn}; + +const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25); +const HOURLY_SUBDIR: &str = "hourly"; +const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour +const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000); + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts); + +/// Block source that monitors the local ingest directory for the HL node. +#[derive(Debug, Clone)] +pub struct HlNodeBlockSource { + pub fallback: BlockSourceBoxed, + pub local_ingest_dir: PathBuf, + pub local_blocks_cache: Arc>, + pub last_local_fetch: Arc>>, +} + +impl BlockSource for HlNodeBlockSource { + fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { + let fallback = self.fallback.clone(); + let local_blocks_cache = self.local_blocks_cache.clone(); + let last_local_fetch = self.last_local_fetch.clone(); + Box::pin(async move { + let now = OffsetDateTime::now_utc(); + + if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await { + Self::update_last_fetch(last_local_fetch, height, now).await; + return Ok(block); + } + + if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await { + let more_recent = last_height < height; + let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK; + if more_recent && too_soon { + return Err(eyre::eyre!( + "Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up" + )); + } + } + + let block = fallback.collect_block(height).await?; + Self::update_last_fetch(last_local_fetch, height, now).await; + Ok(block) + }) + } + + fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { + let fallback = self.fallback.clone(); + let local_ingest_dir = self.local_ingest_dir.clone(); + Box::pin(async move { + let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else { + warn!( + "No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir", + local_ingest_dir + ); + return fallback.find_latest_block_number().await; + }; + + match FileOperations::read_last_block_from_file(&dir) { + Some((_, height)) => { + info!("Latest block number: {} with path {}", height, dir.display()); + Some(height) + } + None => { + warn!( + "Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir", + dir + ); + fallback.find_latest_block_number().await + } + } + }) + } + + fn recommended_chunk_size(&self) -> u64 { + self.fallback.recommended_chunk_size() + } +} + +impl HlNodeBlockSource { + async fn update_last_fetch( + last_local_fetch: Arc>>, + height: u64, + now: OffsetDateTime, + ) { + let mut last_fetch = last_local_fetch.lock().await; + if last_fetch.is_none_or(|(h, _)| h < height) { + *last_fetch = Some((height, now)); + } + } + + async fn try_collect_local_block( + local_blocks_cache: Arc>, + height: u64, + ) -> Option { + let mut u_cache = local_blocks_cache.lock().await; + if let Some(block) = u_cache.get_block(height) { + return Some(block); + } + let path = u_cache.get_path_for_height(height)?; + info!("Loading block data from {:?}", path); + let scan_result = Scanner::scan_hour_file( + &path, + &mut 0, + ScanOptions { start_height: 0, only_load_ranges: false }, + ); + u_cache.load_scan_result(scan_result); + u_cache.get_block(height) + } + + async fn try_backfill_local_blocks( + root: &Path, + cache: &Arc>, + cutoff_height: u64, + ) -> eyre::Result<()> { + let mut u_cache = cache.lock().await; + for subfile in FileOperations::all_hourly_files(root).unwrap_or_default() { + if let Some((_, height)) = FileOperations::read_last_block_from_file(&subfile) { + if height < cutoff_height { + continue; + } + } else { + warn!("Failed to parse last line of file: {:?}", subfile); + } + let mut scan_result = Scanner::scan_hour_file( + &subfile, + &mut 0, + ScanOptions { start_height: cutoff_height, only_load_ranges: true }, + ); + scan_result.new_blocks.clear(); // Only store ranges, load data lazily + u_cache.load_scan_result(scan_result); + } + u_cache.log_range_summary(root); + Ok(()) + } + + async fn start_local_ingest_loop(&self, current_head: u64) { + let root = self.local_ingest_dir.to_owned(); + let cache = self.local_blocks_cache.clone(); + tokio::spawn(async move { + let mut next_height = current_head; + let mut dt = loop { + if let Some(f) = FileOperations::find_latest_hourly_file(&root) { + break TimeUtils::datetime_from_path(&f).unwrap(); + } + tokio::time::sleep(TAIL_INTERVAL).await; + }; + let (mut hour, mut day_str, mut last_line) = + (dt.hour(), TimeUtils::date_from_datetime(dt), 0); + info!("Starting local ingest loop from height: {}", current_head); + loop { + let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); + if hour_file.exists() { + let scan_result = Scanner::scan_hour_file( + &hour_file, + &mut last_line, + ScanOptions { start_height: next_height, only_load_ranges: false }, + ); + next_height = scan_result.next_expected_height; + cache.lock().await.load_scan_result(scan_result); + } + let now = OffsetDateTime::now_utc(); + if dt + Duration::HOUR < now { + dt += Duration::HOUR; + (hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0); + info!( + "Moving to new file: {:?}", + root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) + ); + continue; + } + tokio::time::sleep(TAIL_INTERVAL).await; + } + }); + } + + pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> { + let _ = Self::try_backfill_local_blocks( + &self.local_ingest_dir, + &self.local_blocks_cache, + next_block_number, + ) + .await; + self.start_local_ingest_loop(next_block_number).await; + Ok(()) + } + + pub async fn new( + fallback: BlockSourceBoxed, + local_ingest_dir: PathBuf, + next_block_number: u64, + ) -> Self { + let block_source = Self { + fallback, + local_ingest_dir, + local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))), + last_local_fetch: Arc::new(Mutex::new(None)), + }; + block_source.run(next_block_number).await.unwrap(); + block_source + } +} diff --git a/src/pseudo_peer/sources/hl_node/scan.rs b/src/pseudo_peer/sources/hl_node/scan.rs new file mode 100644 index 000000000..f55d2428b --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/scan.rs @@ -0,0 +1,90 @@ +use crate::node::types::{BlockAndReceipts, EvmBlock}; +use serde::{Deserialize, Serialize}; +use std::{ + fs::File, + io::{BufRead, BufReader}, + ops::RangeInclusive, + path::{Path, PathBuf}, +}; +use tracing::warn; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct LocalBlockAndReceipts(pub String, pub BlockAndReceipts); + +pub struct ScanResult { + pub path: PathBuf, + pub next_expected_height: u64, + pub new_blocks: Vec, + pub new_block_ranges: Vec>, +} + +pub struct ScanOptions { + pub start_height: u64, + pub only_load_ranges: bool, +} + +pub struct Scanner; + +impl Scanner { + pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { + let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = + serde_json::from_str(line)?; + let height = match &parsed_block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + Ok((parsed_block, height)) + } + + pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { + let lines: Vec = + BufReader::new(File::open(path).expect("Failed to open hour file")) + .lines() + .collect::>() + .unwrap(); + let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; + let mut new_blocks = Vec::new(); + let mut last_height = options.start_height; + let mut block_ranges = Vec::new(); + let mut current_range: Option<(u64, u64)> = None; + + for (line_idx, line) in lines.iter().enumerate().skip(skip) { + if line_idx < *last_line || line.trim().is_empty() { + continue; + } + + match Self::line_to_evm_block(line) { + Ok((parsed_block, height)) if height >= options.start_height => { + last_height = last_height.max(height); + if !options.only_load_ranges { + new_blocks.push(parsed_block); + } + *last_line = line_idx; + + match current_range { + Some((start, end)) if end + 1 == height => { + current_range = Some((start, height)) + } + _ => { + if let Some((start, end)) = current_range.take() { + block_ranges.push(start..=end); + } + current_range = Some((height, height)); + } + } + } + Ok(_) => {} + Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)), + } + } + + if let Some((start, end)) = current_range { + block_ranges.push(start..=end); + } + ScanResult { + path: path.to_path_buf(), + next_expected_height: last_height + 1, + new_blocks, + new_block_ranges: block_ranges, + } + } +} diff --git a/src/pseudo_peer/sources/hl_node/tests.rs b/src/pseudo_peer/sources/hl_node/tests.rs new file mode 100644 index 000000000..4674ef63a --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/tests.rs @@ -0,0 +1,187 @@ +use super::*; +use crate::{ + node::types::{reth_compat, ReadPrecompileCalls}, + pseudo_peer::sources::LocalBlockSource, +}; +use alloy_consensus::{BlockBody, Header}; +use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256}; +use std::{io::Write, time::Duration as StdDuration}; + +#[test] +fn test_datetime_from_path() { + let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4"); + let dt = TimeUtils::datetime_from_path(path).unwrap(); + println!("{dt:?}"); +} + +#[tokio::test] +async fn test_backfill() { + let test_path = Path::new("/root/evm_block_and_receipts"); + if !test_path.exists() { + return; + } + + let cache = Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))); + HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap(); + + let u_cache = cache.lock().await; + assert_eq!( + u_cache.get_path_for_height(9735058), + Some(test_path.join(HOURLY_SUBDIR).join("20250729").join("22")) + ); +} + +fn scan_result_from_single_block(block: BlockAndReceipts) -> scan::ScanResult { + use crate::node::types::EvmBlock; + let height = match &block.block { + EvmBlock::Reth115(b) => b.header.header.number, + }; + scan::ScanResult { + path: PathBuf::from("/nonexistent-block"), + next_expected_height: height + 1, + new_blocks: vec![block], + new_block_ranges: vec![height..=height], + } +} + +fn empty_block(number: u64, timestamp: u64, extra_data: &'static [u8]) -> LocalBlockAndReceipts { + use crate::node::types::EvmBlock; + LocalBlockAndReceipts( + timestamp.to_string(), + BlockAndReceipts { + block: EvmBlock::Reth115(reth_compat::SealedBlock { + header: reth_compat::SealedHeader { + header: Header { + parent_hash: B256::ZERO, + ommers_hash: B256::ZERO, + beneficiary: Address::ZERO, + state_root: B256::ZERO, + transactions_root: B256::ZERO, + receipts_root: B256::ZERO, + logs_bloom: Bloom::ZERO, + difficulty: U256::ZERO, + number, + gas_limit: 0, + gas_used: 0, + timestamp, + extra_data: Bytes::from_static(extra_data), + mix_hash: B256::ZERO, + nonce: B64::ZERO, + base_fee_per_gas: None, + withdrawals_root: None, + blob_gas_used: None, + excess_blob_gas: None, + parent_beacon_block_root: None, + requests_hash: None, + }, + hash: B256::ZERO, + }, + body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, + }), + receipts: vec![], + system_txs: vec![], + read_precompile_calls: ReadPrecompileCalls(vec![]), + highest_precompile_address: None, + }, + ) +} + +fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, std::fs::File)> { + let now = OffsetDateTime::now_utc(); + let temp_dir = tempfile::tempdir()?; + let path = temp_dir + .path() + .join(HOURLY_SUBDIR) + .join(TimeUtils::date_from_datetime(now)) + .join(format!("{}", now.hour())); + std::fs::create_dir_all(path.parent().unwrap())?; + Ok((temp_dir, std::fs::File::create(path)?)) +} + +struct BlockSourceHierarchy { + block_source: HlNodeBlockSource, + _temp_dir: tempfile::TempDir, + file1: std::fs::File, + current_block: LocalBlockAndReceipts, + future_block_hl_node: LocalBlockAndReceipts, + future_block_fallback: LocalBlockAndReceipts, +} + +async fn setup_block_source_hierarchy() -> eyre::Result { + // Setup fallback block source + let block_source_fallback = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))), + PathBuf::from("/nonexistent"), + 1000000, + ) + .await; + let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node"); + let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node"); + let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback"); + + let (temp_dir1, mut file1) = setup_temp_dir_and_file()?; + writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?; + + let block_source = HlNodeBlockSource::new( + BlockSourceBoxed::new(Box::new(block_source_fallback.clone())), + temp_dir1.path().to_path_buf(), + 1000000, + ) + .await; + + block_source_fallback + .local_blocks_cache + .lock() + .await + .load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone())); + + Ok(BlockSourceHierarchy { + block_source, + _temp_dir: temp_dir1, + file1, + current_block: block_hl_node_0, + future_block_hl_node: block_hl_node_1, + future_block_fallback: block_fallback_1, + }) +} + +#[tokio::test] +async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, current_block, future_block_hl_node, mut file1, .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + let block = block_source.collect_block(1000001).await; + assert!(block.is_err()); + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?; + tokio::time::sleep(StdDuration::from_millis(100)).await; + + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_hl_node.1); + + Ok(()) +} + +#[tokio::test] +async fn test_update_last_fetch_fallback() -> eyre::Result<()> { + let hierarchy = setup_block_source_hierarchy().await?; + let BlockSourceHierarchy { + block_source, current_block, future_block_fallback, mut file1, .. + } = hierarchy; + + let block = block_source.collect_block(1000000).await.unwrap(); + assert_eq!(block, current_block.1); + + tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await; + + writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?; + let block = block_source.collect_block(1000001).await.unwrap(); + assert_eq!(block, future_block_fallback.1); + + Ok(()) +} diff --git a/src/pseudo_peer/sources/hl_node/time_utils.rs b/src/pseudo_peer/sources/hl_node/time_utils.rs new file mode 100644 index 000000000..136287b2a --- /dev/null +++ b/src/pseudo_peer/sources/hl_node/time_utils.rs @@ -0,0 +1,19 @@ +use std::path::Path; +use time::{macros::format_description, Date, OffsetDateTime, Time}; + +pub struct TimeUtils; + +impl TimeUtils { + pub fn datetime_from_path(path: &Path) -> Option { + let (dt_part, hour_part) = + (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?); + Some(OffsetDateTime::new_utc( + Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?, + Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?, + )) + } + + pub fn date_from_datetime(dt: OffsetDateTime) -> String { + dt.format(&format_description!("[year][month][day]")).unwrap() + } +} diff --git a/src/pseudo_peer/sources/local.rs b/src/pseudo_peer/sources/local.rs new file mode 100644 index 000000000..0c66b36ce --- /dev/null +++ b/src/pseudo_peer/sources/local.rs @@ -0,0 +1,64 @@ +use super::{utils, BlockSource}; +use crate::node::types::BlockAndReceipts; +use eyre::Context; +use futures::{future::BoxFuture, FutureExt}; +use std::path::PathBuf; +use tracing::info; + +/// Block source that reads blocks from local filesystem (--ingest-dir) +#[derive(Debug, Clone)] +pub struct LocalBlockSource { + dir: PathBuf, +} + +impl LocalBlockSource { + pub fn new(dir: impl Into) -> Self { + Self { dir: dir.into() } + } + + async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> { + let files = std::fs::read_dir(&dir).unwrap().collect::>(); + let files = files + .into_iter() + .filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir) + .map(|entry| entry.unwrap().path().to_string_lossy().to_string()) + .collect::>(); + + utils::name_with_largest_number(&files, is_dir) + } +} + +impl BlockSource for LocalBlockSource { + fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { + let dir = self.dir.clone(); + async move { + let path = dir.join(utils::rmp_path(height)); + let file = tokio::fs::read(&path) + .await + .wrap_err_with(|| format!("Failed to read block from {path:?}"))?; + let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); + let blocks: Vec = rmp_serde::from_read(&mut decoder)?; + Ok(blocks[0].clone()) + } + .boxed() + } + + fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { + let dir = self.dir.clone(); + async move { + let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?; + let (_, second_level) = + Self::pick_path_with_highest_number(dir.join(first_level), true).await?; + let (block_number, third_level) = + Self::pick_path_with_highest_number(dir.join(second_level), false).await?; + + info!("Latest block number: {} with path {}", block_number, third_level); + Some(block_number) + } + .boxed() + } + + fn recommended_chunk_size(&self) -> u64 { + 1000 + } +} diff --git a/src/pseudo_peer/sources/mod.rs b/src/pseudo_peer/sources/mod.rs index b7ba1ff39..274222a1f 100644 --- a/src/pseudo_peer/sources/mod.rs +++ b/src/pseudo_peer/sources/mod.rs @@ -1,206 +1,35 @@ use crate::node::types::BlockAndReceipts; -use aws_sdk_s3::types::RequestPayer; -use eyre::Context; -use futures::{future::BoxFuture, FutureExt}; -use reth_network::cache::LruMap; -use std::{ - path::PathBuf, - sync::{Arc, RwLock}, -}; -use tracing::info; +use futures::future::BoxFuture; +use std::sync::Arc; +// Module declarations +mod cached; mod hl_node; -pub use hl_node::HlNodeBlockSource; +mod local; +mod s3; +mod utils; +// Public exports +pub use cached::CachedBlockSource; +pub use hl_node::HlNodeBlockSource; +pub use local::LocalBlockSource; +pub use s3::S3BlockSource; + +/// Trait for block sources that can retrieve blocks from various sources pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static { + /// Retrieves a block at the specified height fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result>; + + /// Finds the latest block number available from this source fn find_latest_block_number(&self) -> BoxFuture<'static, Option>; + + /// Returns the recommended chunk size for batch operations fn recommended_chunk_size(&self) -> u64; } +/// Type alias for a boxed block source pub type BlockSourceBoxed = Arc>; -fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> { - let mut files = files - .iter() - .filter_map(|file_raw| { - let file = file_raw.strip_suffix("/").unwrap_or(file_raw); - let file = file.split("/").last().unwrap(); - let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? }; - stem.parse::().ok().map(|number| (number, file_raw.to_string())) - }) - .collect::>(); - if files.is_empty() { - return None; - } - files.sort_by_key(|(number, _)| *number); - files.last().cloned() -} - -#[derive(Debug, Clone)] -pub struct S3BlockSource { - client: aws_sdk_s3::Client, - bucket: String, -} - -impl S3BlockSource { - pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self { - Self { client, bucket } - } - - async fn pick_path_with_highest_number( - client: aws_sdk_s3::Client, - bucket: String, - dir: String, - is_dir: bool, - ) -> Option<(u64, String)> { - let request = client - .list_objects() - .bucket(&bucket) - .prefix(dir) - .delimiter("/") - .request_payer(RequestPayer::Requester); - let response = request.send().await.ok()?; - let files: Vec = if is_dir { - response - .common_prefixes - .unwrap() - .iter() - .map(|object| object.prefix.as_ref().unwrap().to_string()) - .collect() - } else { - response - .contents - .unwrap() - .iter() - .map(|object| object.key.as_ref().unwrap().to_string()) - .collect() - }; - name_with_largest_number(&files, is_dir) - } -} - -impl BlockSource for S3BlockSource { - fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { - let client = self.client.clone(); - let bucket = self.bucket.clone(); - async move { - let path = rmp_path(height); - let request = client - .get_object() - .request_payer(RequestPayer::Requester) - .bucket(&bucket) - .key(path); - let response = request.send().await?; - let bytes = response.body.collect().await?.into_bytes(); - let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]); - let blocks: Vec = rmp_serde::from_read(&mut decoder)?; - Ok(blocks[0].clone()) - } - .boxed() - } - - fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { - let client = self.client.clone(); - let bucket = self.bucket.clone(); - async move { - let (_, first_level) = Self::pick_path_with_highest_number( - client.clone(), - bucket.clone(), - "".to_string(), - true, - ) - .await?; - let (_, second_level) = Self::pick_path_with_highest_number( - client.clone(), - bucket.clone(), - first_level, - true, - ) - .await?; - let (block_number, third_level) = Self::pick_path_with_highest_number( - client.clone(), - bucket.clone(), - second_level, - false, - ) - .await?; - - info!("Latest block number: {} with path {}", block_number, third_level); - Some(block_number) - } - .boxed() - } - - fn recommended_chunk_size(&self) -> u64 { - 1000 - } -} - -impl BlockSource for LocalBlockSource { - fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { - let dir = self.dir.clone(); - async move { - let path = dir.join(rmp_path(height)); - let file = tokio::fs::read(&path) - .await - .wrap_err_with(|| format!("Failed to read block from {path:?}"))?; - let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]); - let blocks: Vec = rmp_serde::from_read(&mut decoder)?; - Ok(blocks[0].clone()) - } - .boxed() - } - - fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { - let dir = self.dir.clone(); - async move { - let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?; - let (_, second_level) = - Self::pick_path_with_highest_number(dir.join(first_level), true).await?; - let (block_number, third_level) = - Self::pick_path_with_highest_number(dir.join(second_level), false).await?; - - info!("Latest block number: {} with path {}", block_number, third_level); - Some(block_number) - } - .boxed() - } - - fn recommended_chunk_size(&self) -> u64 { - 1000 - } -} - -#[derive(Debug, Clone)] -pub struct LocalBlockSource { - dir: PathBuf, -} - -impl LocalBlockSource { - pub fn new(dir: impl Into) -> Self { - Self { dir: dir.into() } - } - - async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> { - let files = std::fs::read_dir(&dir).unwrap().collect::>(); - let files = files - .into_iter() - .filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir) - .map(|entry| entry.unwrap().path().to_string_lossy().to_string()) - .collect::>(); - - name_with_largest_number(&files, is_dir) - } -} - -fn rmp_path(height: u64) -> String { - let f = ((height - 1) / 1_000_000) * 1_000_000; - let s = ((height - 1) / 1_000) * 1_000; - let path = format!("{f}/{s}/{height}.rmp.lz4"); - path -} - impl BlockSource for BlockSourceBoxed { fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { self.as_ref().collect_block(height) @@ -214,40 +43,3 @@ impl BlockSource for BlockSourceBoxed { self.as_ref().recommended_chunk_size() } } - -#[derive(Debug, Clone)] -pub struct CachedBlockSource { - block_source: BlockSourceBoxed, - cache: Arc>>, -} - -impl CachedBlockSource { - const CACHE_LIMIT: u32 = 100000; - pub fn new(block_source: BlockSourceBoxed) -> Self { - Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) } - } -} - -impl BlockSource for CachedBlockSource { - fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { - let block_source = self.block_source.clone(); - let cache = self.cache.clone(); - async move { - if let Some(block) = cache.write().unwrap().get(&height) { - return Ok(block.clone()); - } - let block = block_source.collect_block(height).await?; - cache.write().unwrap().insert(height, block.clone()); - Ok(block) - } - .boxed() - } - - fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { - self.block_source.find_latest_block_number() - } - - fn recommended_chunk_size(&self) -> u64 { - self.block_source.recommended_chunk_size() - } -} diff --git a/src/pseudo_peer/sources/s3.rs b/src/pseudo_peer/sources/s3.rs new file mode 100644 index 000000000..1b225bba6 --- /dev/null +++ b/src/pseudo_peer/sources/s3.rs @@ -0,0 +1,105 @@ +use super::{utils, BlockSource}; +use crate::node::types::BlockAndReceipts; +use aws_sdk_s3::types::RequestPayer; +use futures::{future::BoxFuture, FutureExt}; +use std::sync::Arc; +use tracing::info; + +/// Block source that reads blocks from S3 (--s3) +#[derive(Debug, Clone)] +pub struct S3BlockSource { + client: Arc, + bucket: String, +} + +impl S3BlockSource { + pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self { + Self { client: client.into(), bucket } + } + + async fn pick_path_with_highest_number( + client: &aws_sdk_s3::Client, + bucket: &str, + dir: &str, + is_dir: bool, + ) -> Option<(u64, String)> { + let request = client + .list_objects() + .bucket(bucket) + .prefix(dir) + .delimiter("/") + .request_payer(RequestPayer::Requester); + let response = request.send().await.ok()?; + let files: Vec = if is_dir { + response + .common_prefixes? + .iter() + .map(|object| object.prefix.as_ref().unwrap().to_string()) + .collect() + } else { + response + .contents? + .iter() + .map(|object| object.key.as_ref().unwrap().to_string()) + .collect() + }; + utils::name_with_largest_number(&files, is_dir) + } +} + +impl BlockSource for S3BlockSource { + fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result> { + let client = self.client.clone(); + let bucket = self.bucket.clone(); + async move { + let path = utils::rmp_path(height); + let request = client + .get_object() + .request_payer(RequestPayer::Requester) + .bucket(&bucket) + .key(path); + let response = request.send().await?; + let bytes = response.body.collect().await?.into_bytes(); + let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]); + let blocks: Vec = rmp_serde::from_read(&mut decoder)?; + Ok(blocks[0].clone()) + } + .boxed() + } + + fn find_latest_block_number(&self) -> BoxFuture<'static, Option> { + let client = self.client.clone(); + let bucket = self.bucket.clone(); + async move { + let (_, first_level) = Self::pick_path_with_highest_number( + &client, + &bucket, + "", + true, + ) + .await?; + let (_, second_level) = Self::pick_path_with_highest_number( + &client, + &bucket, + &first_level, + true, + ) + .await?; + let (block_number, third_level) = Self::pick_path_with_highest_number( + &client, + &bucket, + &second_level, + false, + ) + .await?; + + info!("Latest block number: {} with path {}", block_number, third_level); + Some(block_number) + } + .boxed() + } + + fn recommended_chunk_size(&self) -> u64 { + 1000 + } +} diff --git a/src/pseudo_peer/sources/utils.rs b/src/pseudo_peer/sources/utils.rs new file mode 100644 index 000000000..17723992e --- /dev/null +++ b/src/pseudo_peer/sources/utils.rs @@ -0,0 +1,26 @@ +/// Shared utilities for block sources + +/// Finds the file/directory with the largest number in its name from a list of files +pub fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> { + let mut files = files + .iter() + .filter_map(|file_raw| { + let file = file_raw.strip_suffix("/").unwrap_or(file_raw); + let file = file.split("/").last().unwrap(); + let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? }; + stem.parse::().ok().map(|number| (number, file_raw.to_string())) + }) + .collect::>(); + if files.is_empty() { + return None; + } + files.sort_by_key(|(number, _)| *number); + files.last().cloned() +} + +/// Generates the RMP file path for a given block height +pub fn rmp_path(height: u64) -> String { + let f = ((height - 1) / 1_000_000) * 1_000_000; + let s = ((height - 1) / 1_000) * 1_000; + format!("{f}/{s}/{height}.rmp.lz4") +}