Merge pull request #37 from wwwehr/feature/hlfs-evm-blocks

Feature: Archive Node File Sharing
sprites0
2025-08-30 00:00:08 +09:00
committed by GitHub
8 changed files with 615 additions and 5 deletions

View File

@@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
reth-cli-commands.workspace = true
reth-cli-util.workspace = true
reth-consensus-common.workspace = true
reth-hlfs.workspace = true
reth-rpc-builder.workspace = true
reth-rpc.workspace = true
reth-rpc-types-compat.workspace = true
@@ -81,8 +82,9 @@ tracing.workspace = true
serde_json.workspace = true
# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
futures.workspace = true
futures-util.workspace = true
# time
time = { workspace = true }
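The widened tokio feature set matches the new share_blocks module further down: net supplies the TcpStream used for the HLFS probe and listener, and fs presumably covers the async file access the archive server needs under --archive-dir (presumably, because the server itself lives in the reth-hlfs crate and is not shown in this diff).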

View File

@@ -29,6 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token;
/// Poll interval when tailing an *open* hourly file.
@@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
pub hlfs: Option<ShareBlocks>,
}
#[derive(Deserialize)]
@@ -155,9 +157,9 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
} else {
self.try_collect_s3_block(height)
}
self.try_collect_s3_block(height)
}
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
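The net effect of this hunk is that the S3 lookup becomes a plain fallthrough rather than an else arm: a local hit returns early, a local miss always continues to S3. A minimal sketch of the resulting shape, assuming the enclosing method (called collect_block here as a placeholder) is async and returns Option<BlockAndReceipts> like its helpers:

// Placeholder name; the real method sits inside impl BlockIngest per the hunk header above.
pub(crate) async fn collect_block(&self, height: u64) -> Option<BlockAndReceipts> {
    // Prefer a block that was ingested locally.
    if let Some(block) = self.try_collect_local_block(height).await {
        info!("Returning locally synced block for @ Height [{height}]");
        return Some(block);
    }
    // On a local miss, fall through to the S3-backed lookup.
    self.try_collect_s3_block(height)
}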

View File

@@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
mod block_ingest;
mod call_forwarder;
mod serialized;
mod share_blocks;
mod spot_meta;
mod tx_forwarder;
@@ -18,6 +19,7 @@ use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
use share_blocks::ShareBlocksArgs;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
/// 3. filters out logs and transactions from subscription.
#[arg(long, default_value = "false")]
pub hl_node_compliant: bool,
/// Enable hlfs to backfill archive blocks
#[command(flatten)]
pub hlfs: ShareBlocksArgs,
}
fn main() {
@@ -85,8 +91,24 @@ fn main() {
.launch()
.await?;
let ingest =
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
// start HLFS (serve + peer-backed backfill) using the node's network
let hlfs = if ext_args.hlfs.share_blocks {
let net = handle.node.network.clone();
Some(
crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
.await?,
)
} else {
None
};
let ingest = BlockIngest {
ingest_dir,
local_ingest_dir,
local_blocks_cache,
precompiles_cache,
hlfs,
};
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},

View File

@@ -0,0 +1,167 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};
// use futures_util::StreamExt;
use futures_util::stream::StreamExt;
#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}
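With these defaults, passing just --share-blocks is enough to serve the evm-blocks directory on 0.0.0.0:9595; host, port, and directory can each be overridden, e.g. a node started with --share-blocks --share-blocks-port 9595 --archive-dir evm-blocks (the flags are the ones declared above; the rest of the command line is whatever the node normally takes).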
pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}
impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();
let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});
let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}
fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));
tokio::spawn({
warn!("hlfs: backfiller started");
let backfiller = backfiller.clone();
async move {
loop {
let mut bf = backfiller.lock().await;
if bf.client.max_block < bf.max_block_seen {
let block = bf.client.max_block + 1;
let new_height = bf.fetch_if_missing(block).await.expect("new height");
bf.client.max_block = new_height.unwrap();
}
sleep(Duration::from_millis(50)).await;
}
}
});
tokio::spawn({
let backfiller = backfiller.clone();
async move {
let mut events = network.event_listener();
loop {
let mut bf = backfiller.lock().await;
match events.next().await {
Some(NetworkEvent::ActivePeerSession { info, .. }) => {
let ip = info.remote_addr.ip();
if ip.is_unspecified() {
debug!(%ip, "hlfs: skip unspecified");
continue;
}
if ip == self_ip {
debug!(%ip, "hlfs: skip self");
continue;
}
let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
let max_block = probe_hlfs(addr).await;
if max_block != 0 {
let mut g = good.lock().await;
if g.insert(PeerRecord { addr, max_block }) {
let v: Vec<_> = g.iter().copied().collect();
bf.set_peers(v.clone());
info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
}
} else {
debug!(%addr, "hlfs: peer has no HLFS");
}
}
Some(_) => {}
None => {
warn!("hlfs: network event stream ended");
break;
}
}
}
}
})
}
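Put briefly, discovery piggybacks on reth's own session events: every ActivePeerSession is probed once on the HLFS port at the peer's IP, self and unspecified addresses are skipped, and only peers that answer the max-block request with a non-zero height are added to the set handed to the backfiller.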
pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};
let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;
// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;
// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}
// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};
match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}
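probe_hlfs spells out the client half of the handshake: nine bytes out (one opcode plus eight zero bytes of padding), nine bytes back (one opcode plus an 8-byte little-endian max height). For orientation only, a responder matching that framing would look roughly like the sketch below; this is an assumption about the peer side, since the actual server lives in the reth-hlfs crate and is not part of this diff.

// Hypothetical responder, inferred solely from the framing probe_hlfs expects;
// the real server is implemented in reth-hlfs and is not shown in this commit.
// OP_REQ_MAX_BLOCK / OP_RES_MAX_BLOCK are the constants imported at the top of this module.
async fn answer_max_block(stream: &mut tokio::net::TcpStream, max_block: u64) -> std::io::Result<()> {
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    let mut req = [0u8; 9];
    stream.read_exact(&mut req).await?; // [opcode][8 ignored bytes]
    if req[0] == OP_REQ_MAX_BLOCK {
        let mut res = [0u8; 9];
        res[0] = OP_RES_MAX_BLOCK; // 1-byte opcode
        res[1..].copy_from_slice(&max_block.to_le_bytes()); // 8-byte little-endian height
        stream.write_all(&res).await?;
    }
    Ok(())
}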