active block sharing

This commit is contained in:
Nicholas Wehr
2025-08-11 22:16:30 -07:00
parent 0d1c239a07
commit ec281f9cc7
3 changed files with 37 additions and 14 deletions

View File

@ -29,6 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, info}; use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock}; use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token; use crate::spot_meta::erc20_contract_to_spot_token;
/// Poll interval when tailing an *open* hourly file. /// Poll interval when tailing an *open* hourly file.
@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>, pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache, pub precompiles_cache: PrecompilesCache,
pub hlfs: Option<ShareBlocks>,
} }
#[derive(Deserialize)] #[derive(Deserialize)]
@ -155,9 +157,20 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await { if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]"); info!("Returning locally synced block for @ Height [{height}]");
return Some(block); return Some(block);
} else {
self.try_collect_s3_block(height)
} }
if let Some(hlfs) = &self.hlfs {
let u_cache = self.local_blocks_cache.lock().await;
let head = u_cache.keys().next_back().copied().unwrap_or(0);
if hlfs.try_fetch_one(height, head).await.ok().flatten().is_some() {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning HLFS-fetched block @[{height}]");
return Some(block);
}
}
}
self.try_collect_s3_block(height)
} }
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> { pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {

View File

@ -92,15 +92,23 @@ fn main() {
.await?; .await?;
// start HLFS (serve + peer-backed backfill) using the node's network // start HLFS (serve + peer-backed backfill) using the node's network
if ext_args.hlfs.share_blocks { let hlfs = if ext_args.hlfs.share_blocks {
let net = handle.node.network.clone(); // returns a FullNetwork (NetworkHandle under the hood) let net = handle.node.network.clone();
// keep handle alive for the whole process Some(
let _hlfs = crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net).await?; .await?,
} )
} else {
None
};
let ingest = let ingest = BlockIngest {
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; ingest_dir,
local_ingest_dir,
local_blocks_cache,
precompiles_cache,
hlfs,
};
ingest.run(handle.node).await.unwrap(); ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await handle.node_exit_future.await
}, },

View File

@ -240,19 +240,21 @@ impl Backfiller {
head: u64, head: u64,
rr_index: usize, rr_index: usize,
) -> Result<Option<usize>, HlfsError> { ) -> Result<Option<usize>, HlfsError> {
if number + self.hist_threshold > head { if head >= self.hist_threshold && number + self.hist_threshold > head {
return Ok(None); return Ok(None);
} }
let path = self.root.join(format!("{number}.rlp")); let f = ((number - 1) / 1_000_000) * 1_000_000;
let s = ((number - 1) / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if fs::try_exists(&path).await? { if fs::try_exists(&path).await? {
return Ok(None); return Ok(None);
} }
match self.client.get_block(number, rr_index).await { match self.client.get_block(number, rr_index).await {
Ok(bytes) => { Ok(bytes) => {
let tmp = self.root.join(format!("{number}.rlp.part")); let tmp = format!("{}/{f}/{s}/{number}.rlp.lz4.part", self.root.to_string_lossy());
fs::write(&tmp, &bytes).await?; fs::write(&tmp, &bytes).await?;
fs::rename(&tmp, &path).await?; fs::rename(&tmp, &path).await?;
info!(block=number, bytes=bytes.len(), path=%path.display(), "hlfs: wrote"); info!(block=number, bytes=bytes.len(), path=%path, "hlfs: wrote");
Ok(Some(bytes.len())) Ok(Some(bytes.len()))
} }
Err(e) => { Err(e) => {