diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs index 03e67ea0e..e5ce0f033 100644 --- a/bin/reth/src/block_ingest.rs +++ b/bin/reth/src/block_ingest.rs @@ -29,6 +29,7 @@ use tokio::sync::Mutex; use tracing::{debug, info}; use crate::serialized::{BlockAndReceipts, EvmBlock}; +use crate::share_blocks::ShareBlocks; use crate::spot_meta::erc20_contract_to_spot_token; /// Poll interval when tailing an *open* hourly file. @@ -41,6 +42,7 @@ pub(crate) struct BlockIngest { pub local_ingest_dir: Option, pub local_blocks_cache: Arc>>, // height → block pub precompiles_cache: PrecompilesCache, + pub hlfs: Option, } #[derive(Deserialize)] @@ -155,9 +157,20 @@ impl BlockIngest { if let Some(block) = self.try_collect_local_block(height).await { info!("Returning locally synced block for @ Height [{height}]"); return Some(block); - } else { - self.try_collect_s3_block(height) } + + if let Some(hlfs) = &self.hlfs { + let u_cache = self.local_blocks_cache.lock().await; + let head = u_cache.keys().next_back().copied().unwrap_or(0); + if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() { + if let Some(block) = self.try_collect_local_block(height).await { + info!("Returning HLFS-fetched block @[{height}]"); + return Some(block); + } + } + } + + self.try_collect_s3_block(height) } pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option { diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index d65506fe0..b53231b43 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -92,15 +92,23 @@ fn main() { .await?; // start HLFS (serve + peer-backed backfill) using the node's network - if ext_args.hlfs.share_blocks { - let net = handle.node.network.clone(); // returns a FullNetwork (NetworkHandle under the hood) - // keep handle alive for the whole process - let _hlfs = - share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net).await?; - } + let hlfs = if ext_args.hlfs.share_blocks { + let net = handle.node.network.clone(); + Some( + crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net) + .await?, + ) + } else { + None + }; - let ingest = - BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; + let ingest = BlockIngest { + ingest_dir, + local_ingest_dir, + local_blocks_cache, + precompiles_cache, + hlfs, + }; ingest.run(handle.node).await.unwrap(); handle.node_exit_future.await }, diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs index f67fbee74..bd6419a4c 100644 --- a/crates/net/hlfs/src/lib.rs +++ b/crates/net/hlfs/src/lib.rs @@ -240,19 +240,21 @@ impl Backfiller { head: u64, rr_index: usize, ) -> Result, HlfsError> { - if number + self.hist_threshold > head { + if head >= self.hist_threshold && number + self.hist_threshold > head { return Ok(None); } - let path = self.root.join(format!("{number}.rlp")); + let f = ((number - 1) / 1_000_000) * 1_000_000; + let s = ((number - 1) / 1_000) * 1_000; + let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy()); if fs::try_exists(&path).await? { return Ok(None); } match self.client.get_block(number, rr_index).await { Ok(bytes) => { - let tmp = self.root.join(format!("{number}.rlp.part")); + let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy()); fs::write(&tmp, &bytes).await?; fs::rename(&tmp, &path).await?; - info!(block=number, bytes=bytes.len(), path=%path.display(), "hlfs: wrote"); + info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote"); Ok(Some(bytes.len())) } Err(e) => {