From 821846f671ef1f80861467625380eba020cc882a Mon Sep 17 00:00:00 2001 From: Nicholas Wehr Date: Mon, 11 Aug 2025 14:13:55 -0700 Subject: [PATCH] added alternative block share --- Cargo.lock | 16 ++ Cargo.toml | 2 + bin/reth/Cargo.toml | 4 +- bin/reth/src/main.rs | 14 ++ bin/reth/src/share_blocks.rs | 129 +++++++++++++ crates/net/hlfs/Cargo.toml | 25 +++ crates/net/hlfs/src/lib.rs | 343 +++++++++++++++++++++++++++++++++++ 7 files changed, 532 insertions(+), 1 deletion(-) create mode 100644 bin/reth/src/share_blocks.rs create mode 100644 crates/net/hlfs/Cargo.toml create mode 100644 crates/net/hlfs/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 6222c9589..4bdb36957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6659,6 +6659,7 @@ dependencies = [ "clap", "eyre", "futures", + "futures-util", "jsonrpsee", "jsonrpsee-core", "lz4_flex", @@ -6687,6 +6688,7 @@ dependencies = [ "reth-execution-types", "reth-exex", "reth-fs-util", + "reth-hlfs", "reth-hyperliquid-types", "reth-network", "reth-network-api", @@ -7966,6 +7968,20 @@ dependencies = [ "thiserror 2.0.11", ] +[[package]] +name = "reth-hlfs" +version = "1.2.0" +dependencies = [ + "bytes", + "parking_lot", + "rand 0.8.5", + "reth-tracing", + "tempfile", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "reth-hyperliquid-types" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index cc7c842fc..5c9ea2d6e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ members = [ "crates/net/ecies/", "crates/net/eth-wire-types", "crates/net/eth-wire/", + "crates/net/hlfs/", "crates/net/nat/", "crates/net/network-api/", "crates/net/network-types/", @@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" } reth-exex-test-utils = { path = "crates/exex/test-utils" } reth-exex-types = { path = "crates/exex/types" } reth-fs-util = { path = "crates/fs-util" } +reth-hlfs = { path = "crates/net/hlfs" } reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" } reth-ipc = { path = "crates/rpc/ipc" } reth-libmdbx = { path = "crates/storage/libmdbx-rs" } diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml index 0457f4586..775db8f82 100644 --- a/bin/reth/Cargo.toml +++ b/bin/reth/Cargo.toml @@ -35,6 +35,7 @@ reth-cli-runner.workspace = true reth-cli-commands.workspace = true reth-cli-util.workspace = true reth-consensus-common.workspace = true +reth-hlfs.workspace = true reth-rpc-builder.workspace = true reth-rpc.workspace = true reth-rpc-types-compat.workspace = true @@ -81,8 +82,9 @@ tracing.workspace = true serde_json.workspace = true # async -tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] } +tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] } futures.workspace = true +futures-util.workspace = true # time time = { workspace = true } diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs index a19ff491a..d65506fe0 100644 --- a/bin/reth/src/main.rs +++ b/bin/reth/src/main.rs @@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne mod block_ingest; mod call_forwarder; mod serialized; +mod share_blocks; mod spot_meta; mod tx_forwarder; @@ -18,6 +19,7 @@ use reth::cli::Cli; use reth_ethereum_cli::chainspec::EthereumChainSpecParser; use reth_hyperliquid_types::PrecompilesCache; use reth_node_ethereum::EthereumNode; +use share_blocks::ShareBlocksArgs; use tokio::sync::Mutex; use tracing::info; use tx_forwarder::EthForwarderApiServer; @@ -40,6 +42,10 @@ struct HyperliquidExtArgs { /// 3. filters out logs and transactions from subscription. #[arg(long, default_value = "false")] pub hl_node_compliant: bool, + + /// Enable hlfs to backfill archive blocks + #[command(flatten)] + pub hlfs: ShareBlocksArgs, } fn main() { @@ -85,6 +91,14 @@ fn main() { .launch() .await?; + // start HLFS (serve + peer-backed backfill) using the node's network + if ext_args.hlfs.share_blocks { + let net = handle.node.network.clone(); // returns a FullNetwork (NetworkHandle under the hood) + // keep handle alive for the whole process + let _hlfs = + share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net).await?; + } + let ingest = BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache }; ingest.run(handle.node).await.unwrap(); diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs new file mode 100644 index 000000000..28de1920f --- /dev/null +++ b/bin/reth/src/share_blocks.rs @@ -0,0 +1,129 @@ +use clap::Args; +use once_cell::sync::Lazy; +use reth_hlfs::{Backfiller, Client, Server}; +use reth_network_api::{events::NetworkEvent, FullNetwork}; +use std::{ + collections::HashSet, + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; +use tokio::{task::JoinHandle, time::timeout}; +use tracing::{debug, info, warn}; + +// use futures_util::StreamExt; +use futures_util::stream::StreamExt; + +static RR: Lazy = Lazy::new(|| AtomicUsize::new(0)); + +#[derive(Args, Clone, Debug)] +pub struct ShareBlocksArgs { + #[arg(long, default_value_t = false)] + pub share_blocks: bool, + #[arg(long, default_value = "0.0.0.0")] + pub share_blocks_host: String, + #[arg(long, default_value_t = 9595)] + pub share_blocks_port: u16, + #[arg(long, default_value = "evm-blocks")] + pub archive_dir: PathBuf, + #[arg(long, default_value_t = 5_000)] + pub hist_threshold: u64, +} + +pub struct ShareBlocks { + pub backfiller: Backfiller, + _server: JoinHandle<()>, + _autodetect: JoinHandle<()>, +} + +impl ShareBlocks { + pub async fn start_with_network(args: &ShareBlocksArgs, network: Net) -> eyre::Result + where + Net: FullNetwork + Clone + 'static, + { + let host: IpAddr = args + .share_blocks_host + .parse() + .map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?; + let bind: SocketAddr = (host, args.share_blocks_port).into(); + + let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50); + let _server = tokio::spawn(async move { + if let Err(e) = srv.run().await { + warn!(error=%e, "hlfs: server exited"); + } + }); + + let client = Client::new(Vec::new()).with_timeout(Duration::from_secs(5)); + let bf = Backfiller::new(client, &args.archive_dir, args.hist_threshold); + + let _autodetect = spawn_autodetect(network, args.share_blocks_port, bf.clone()); + + info!(%bind, dir=%args.archive_dir.display(), hist_threshold=%args.hist_threshold, "hlfs: enabled (reth peers)"); + Ok(Self { backfiller: bf, _server, _autodetect }) + } + + pub async fn try_fetch_one(&self, block: u64, head: u64) -> eyre::Result> { + let rr = RR.fetch_add(1, Ordering::Relaxed); + self.backfiller.fetch_if_missing(block, head, rr).await.map_err(|e| eyre::eyre!(e)) + // <- fix: HlfsError -> eyre::Report + } +} + +fn spawn_autodetect(network: Net, hlfs_port: u16, backfiller: Backfiller) -> JoinHandle<()> +where + Net: FullNetwork + Clone + 'static, +{ + let good: Arc>> = + Arc::new(tokio::sync::Mutex::new(HashSet::new())); + + tokio::spawn(async move { + let mut events = network.event_listener(); + loop { + match events.next().await { + Some(NetworkEvent::ActivePeerSession { info, .. }) => { + let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port); + if probe_hlfs(addr).await { + let mut g = good.lock().await; + if g.insert(addr) { + let v: Vec<_> = g.iter().copied().collect(); + backfiller.set_peers(v.clone()); + info!(%addr, total=v.len(), "hlfs: peer added"); + } + } else { + debug!(%addr, "hlfs: peer has no HLFS"); + } + } + Some(_) => {} + None => { + warn!("hlfs: network event stream ended"); + break; + } + } + } + }) +} + +async fn probe_hlfs(addr: SocketAddr) -> bool { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::net::TcpStream; + + let res = timeout(Duration::from_secs(2), async { + if let Ok(mut s) = TcpStream::connect(addr).await { + let mut msg = [0u8; 9]; + msg[0] = 0x01; + let _ = s.write_all(&msg).await; + let mut op = [0u8; 1]; + if s.read_exact(&mut op).await.is_ok() { + return matches!(op[0], 0x02 | 0x03 | 0x04); + } + } + false + }) + .await; + matches!(res, Ok(true)) +} diff --git a/crates/net/hlfs/Cargo.toml b/crates/net/hlfs/Cargo.toml new file mode 100644 index 000000000..37b803d4b --- /dev/null +++ b/crates/net/hlfs/Cargo.toml @@ -0,0 +1,25 @@ +# crates/net/hlfs/Cargo.toml +[package] +name = "reth-hlfs" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true +description = "Implementation of archive block downloader" +authors = ["@wwwehr"] + + +[dependencies] +tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] } +bytes.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tracing.workspace = true +reth-tracing.workspace = true + +[dev-dependencies] +rand.workspace = true +tempfile.workspace = true + diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs new file mode 100644 index 000000000..f67fbee74 --- /dev/null +++ b/crates/net/hlfs/src/lib.rs @@ -0,0 +1,343 @@ +//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block). + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use parking_lot::Mutex; +use reth_tracing::tracing::trace; +use std::{ + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; +use thiserror::Error; +use tokio::{ + fs, + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + time::{sleep, timeout}, +}; +use tracing::{debug, info, warn}; + +#[derive(Debug, Error)] +pub enum HlfsError { + #[error("io: {0}")] + Io(#[from] io::Error), + #[error("timeout")] + Timeout, + #[error("not found")] + NotFound, + #[error("busy {0:?}")] + Busy(Duration), + #[error("protocol")] + Proto, +} + +#[inline] +fn put_u64(b: &mut BytesMut, v: u64) { + b.put_u64_le(v) +} +#[inline] +fn put_u32(b: &mut BytesMut, v: u32) { + b.put_u32_le(v) +} + +/// Client: tries each peer once; rotates starting index per call. +#[derive(Clone)] +pub struct Client { + peers: Arc>>, + timeout: Duration, + backoff_ms: u32, +} +impl Client { + pub fn new(peers: Vec) -> Self { + Self { peers: Arc::new(Mutex::new(peers)), timeout: Duration::from_secs(5), backoff_ms: 50 } + } + pub fn update_peers(&self, peers: Vec) { + *self.peers.lock() = peers; + } + pub fn with_timeout(mut self, d: Duration) -> Self { + self.timeout = d; + self + } + pub async fn get_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { + let peers = self.peers.lock().clone(); + if peers.is_empty() { + debug!(block = number, "hlfs: no peers"); + return Err(HlfsError::Timeout); + } + for t in 0..peers.len() { + let i = (rr_index + t) % peers.len(); + let addr = peers[i]; + trace!(block=number, %addr, "hlfs: try"); + match timeout(self.timeout, fetch_once(addr, number)).await { + Err(_) => { + debug!(block=number, %addr, "hlfs: timeout"); + continue; + } + Ok(Err(HlfsError::Busy(d))) => { + trace!(block=number, %addr, delay_ms=?d, "hlfs: busy"); + sleep(d.min(Duration::from_millis(self.backoff_ms as u64))).await; + continue; + } + Ok(Err(HlfsError::NotFound)) => { + trace!(block=number, %addr, "hlfs: not found"); + continue; + } + Ok(Err(e)) => { + debug!(block=number, %addr, error=%e, "hlfs: error"); + continue; + } + Ok(Ok(bytes)) => { + info!(block=number, %addr, bytes=bytes.len(), "hlfs: fetched"); + return Ok(bytes); + } + } + } + Err(HlfsError::Timeout) + } +} +async fn fetch_once(addr: SocketAddr, number: u64) -> Result, HlfsError> { + let mut s = TcpStream::connect(addr).await?; + let mut buf = BytesMut::with_capacity(9); + buf.put_u8(0x01); + put_u64(&mut buf, number); + s.write_all(&buf).await?; + let mut op = [0u8; 1]; + s.read_exact(&mut op).await?; + match op[0] { + 0x02 => { + let mut meta = [0u8; 12]; + s.read_exact(&mut meta).await?; + let mut m = Bytes::from(meta.to_vec()); + let _n = m.get_u64_le(); + let len = m.get_u32_le() as usize; + let mut data = vec![0u8; len]; + s.read_exact(&mut data).await?; + Ok(data) + } + 0x03 => { + let mut _n = [0u8; 8]; + let _ = s.read_exact(&mut _n).await; + Err(HlfsError::NotFound) + } + 0x04 => { + let mut d = [0u8; 4]; + s.read_exact(&mut d).await?; + Err(HlfsError::Busy(Duration::from_millis(u32::from_le_bytes(d) as u64))) + } + _ => Err(HlfsError::Proto), + } +} + +/// Server: serves `{root}/{number}.rlp`. +pub struct Server { + bind: SocketAddr, + root: PathBuf, + max_conns: usize, + inflight: Arc>, + busy_retry_ms: u32, +} +impl Server { + pub fn new(bind: SocketAddr, root: impl Into) -> Self { + Self { + bind, + root: root.into(), + max_conns: 512, + inflight: Arc::new(Mutex::new(0)), + busy_retry_ms: 100, + } + } + pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self { + self.max_conns = max_conns; + self.busy_retry_ms = busy_retry_ms; + self + } + pub async fn run(self) -> Result<(), HlfsError> { + fs::create_dir_all(&self.root).await.ok(); + info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening"); + let lst = TcpListener::bind(self.bind).await?; + loop { + let (mut sock, addr) = lst.accept().await?; + if *self.inflight.lock() >= self.max_conns { + let mut b = BytesMut::with_capacity(5); + b.put_u8(0x04); + put_u32(&mut b, self.busy_retry_ms); + let _ = sock.write_all(&b).await; + continue; + } + *self.inflight.lock() += 1; + let root = self.root.clone(); + let inflight = self.inflight.clone(); + let busy = self.busy_retry_ms; + tokio::spawn(async move { + let _ = handle_conn(&mut sock, &root, busy, addr).await; + *inflight.lock() -= 1; + }); + } + } +} +async fn handle_conn( + sock: &mut TcpStream, + root: &Path, + busy_ms: u32, + addr: SocketAddr, +) -> Result<(), HlfsError> { + let mut op = [0u8; 1]; + sock.read_exact(&mut op).await?; + if op[0] != 0x01 { + warn!(%addr, "hlfs: bad op"); + return Err(HlfsError::Proto); + } + let mut nb = [0u8; 8]; + sock.read_exact(&mut nb).await?; + let number = u64::from_le_bytes(nb); + let path = root.join(format!("{number}.rlp")); + match fs::read(&path).await { + Ok(data) => { + let mut b = BytesMut::with_capacity(1 + 8 + 4 + data.len()); + b.put_u8(0x02); + put_u64(&mut b, number); + put_u32(&mut b, data.len() as u32); + b.extend_from_slice(&data); + sock.write_all(&b).await?; + Ok(()) + } + Err(e) if e.kind() == io::ErrorKind::NotFound => { + let mut b = [0u8; 9]; + b[0] = 0x03; + b[1..9].copy_from_slice(&number.to_le_bytes()); + sock.write_all(&b).await?; + Ok(()) + } + Err(_) => { + let mut b = BytesMut::with_capacity(5); + b.put_u8(0x04); + put_u32(&mut b, busy_ms); + let _ = sock.write_all(&b).await; + Err(HlfsError::Io(io::Error::new(io::ErrorKind::Other, "fs error"))) + } + } +} + +/// Backfiller: ask client per missing block; rotate peers every block. +#[derive(Clone)] +pub struct Backfiller { + client: Client, + root: PathBuf, + hist_threshold: u64, +} +impl Backfiller { + pub fn new(client: Client, root: impl Into, hist_threshold: u64) -> Self { + Self { client, root: root.into(), hist_threshold } + } + pub fn set_peers(&self, peers: Vec) { + self.client.update_peers(peers); + } + pub async fn fetch_if_missing( + &self, + number: u64, + head: u64, + rr_index: usize, + ) -> Result, HlfsError> { + if number + self.hist_threshold > head { + return Ok(None); + } + let path = self.root.join(format!("{number}.rlp")); + if fs::try_exists(&path).await? { + return Ok(None); + } + match self.client.get_block(number, rr_index).await { + Ok(bytes) => { + let tmp = self.root.join(format!("{number}.rlp.part")); + fs::write(&tmp, &bytes).await?; + fs::rename(&tmp, &path).await?; + info!(block=number, bytes=bytes.len(), path=%path.display(), "hlfs: wrote"); + Ok(Some(bytes.len())) + } + Err(e) => { + debug!(block=number, error=%e, "hlfs: fetch failed"); + Err(e) + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use rand::{rngs::StdRng, Rng, SeedableRng}; + fn sample(n: u64) -> Vec { + vec![((n as usize) % 251) as u8; 3072] + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn serve_and_fetch_rr() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + for n in 0..100u64 { + fs::write(dir.path().join(format!("{n}.rlp")), sample(n)).await.unwrap(); + } + let s1 = Server::new("127.0.0.1:9597".parse().unwrap(), dir.path()).with_limits(64, 20); + let s2 = Server::new("127.0.0.1:9598".parse().unwrap(), dir.path()).with_limits(64, 20); + tokio::spawn(async move { + let _ = s1.run().await; + }); + tokio::spawn(async move { + let _ = s2.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let client = + Client::new(vec!["127.0.0.1:9597".parse().unwrap(), "127.0.0.1:9598".parse().unwrap()]) + .with_timeout(Duration::from_secs(1)); + let a = client.get_block(10, 0).await.unwrap(); + let b = client.get_block(11, 1).await.unwrap(); + assert_eq!(a.len(), 3072); + assert_eq!(b.len(), 3072); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn backfill_only_when_older_than_threshold() { + reth_tracing::init_test_tracing(); + let src = tempfile::tempdir().unwrap(); + let dst = tempfile::tempdir().unwrap(); + fs::write(src.path().join("5.rlp"), sample(5)).await.unwrap(); + let srv = Server::new("127.0.0.1:9599".parse().unwrap(), src.path()); + tokio::spawn(async move { + let _ = srv.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let bf = Backfiller::new( + Client::new(vec!["127.0.0.1:9599".parse().unwrap()]), + dst.path(), + 5_000, + ); + let got = bf.fetch_if_missing(5, 10_000, 0).await.unwrap(); + assert_eq!(got, Some(3072)); + let skip = bf.fetch_if_missing(9_999, 10_000, 1).await.unwrap(); + assert_eq!(skip, None); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn busy_and_notfound_rotate() { + reth_tracing::init_test_tracing(); + let dir = tempfile::tempdir().unwrap(); + fs::write(dir.path().join("7.rlp"), sample(7)).await.unwrap(); + let s_busy = Server::new("127.0.0.1:9601".parse().unwrap(), dir.path()).with_limits(0, 10); + let s_ok = Server::new("127.0.0.1:9602".parse().unwrap(), dir.path()); + tokio::spawn(async move { + let _ = s_busy.run().await; + }); + tokio::spawn(async move { + let _ = s_ok.run().await; + }); + tokio::time::sleep(Duration::from_millis(50)).await; + + let c = + Client::new(vec!["127.0.0.1:9601".parse().unwrap(), "127.0.0.1:9602".parse().unwrap()]); + let b = c.get_block(7, 0).await.unwrap(); + assert_eq!(b.len(), 3072); + } +}