diff --git a/Cargo.lock b/Cargo.lock
index 6222c9589..4bdb36957 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6659,6 +6659,7 @@ dependencies = [
  "clap",
  "eyre",
  "futures",
+ "futures-util",
  "jsonrpsee",
  "jsonrpsee-core",
  "lz4_flex",
@@ -6687,6 +6688,7 @@ dependencies = [
  "reth-execution-types",
  "reth-exex",
  "reth-fs-util",
+ "reth-hlfs",
  "reth-hyperliquid-types",
  "reth-network",
  "reth-network-api",
@@ -7966,6 +7968,20 @@ dependencies = [
  "thiserror 2.0.11",
 ]
 
+[[package]]
+name = "reth-hlfs"
+version = "1.2.0"
+dependencies = [
+ "bytes",
+ "parking_lot",
+ "rand 0.8.5",
+ "reth-tracing",
+ "tempfile",
+ "thiserror 2.0.11",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "reth-hyperliquid-types"
 version = "1.2.0"
diff --git a/Cargo.toml b/Cargo.toml
index cc7c842fc..5c9ea2d6e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -54,6 +54,7 @@ members = [
     "crates/net/ecies/",
     "crates/net/eth-wire-types",
     "crates/net/eth-wire/",
+    "crates/net/hlfs/",
     "crates/net/nat/",
     "crates/net/network-api/",
     "crates/net/network-types/",
@@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" }
 reth-exex-test-utils = { path = "crates/exex/test-utils" }
 reth-exex-types = { path = "crates/exex/types" }
 reth-fs-util = { path = "crates/fs-util" }
+reth-hlfs = { path = "crates/net/hlfs" }
 reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
 reth-ipc = { path = "crates/rpc/ipc" }
 reth-libmdbx = { path = "crates/storage/libmdbx-rs" }
diff --git a/bin/reth/Cargo.toml b/bin/reth/Cargo.toml
index 0457f4586..775db8f82 100644
--- a/bin/reth/Cargo.toml
+++ b/bin/reth/Cargo.toml
@@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
 reth-cli-commands.workspace = true
 reth-cli-util.workspace = true
 reth-consensus-common.workspace = true
+reth-hlfs.workspace = true
 reth-rpc-builder.workspace = true
 reth-rpc.workspace = true
 reth-rpc-types-compat.workspace = true
@@ -81,8 +82,9 @@ tracing.workspace = true
 serde_json.workspace = true
 
 # async
-tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
+tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
 futures.workspace = true
+futures-util.workspace = true
 
 # time
 time = { workspace = true }
diff --git a/bin/reth/src/block_ingest.rs b/bin/reth/src/block_ingest.rs
index a259cb29e..218610495 100644
--- a/bin/reth/src/block_ingest.rs
+++ b/bin/reth/src/block_ingest.rs
@@ -29,6 +29,7 @@ use tokio::sync::Mutex;
 use tracing::{debug, info};
 
 use crate::serialized::{BlockAndReceipts, EvmBlock};
+use crate::share_blocks::ShareBlocks;
 use crate::spot_meta::erc20_contract_to_spot_token;
 
 /// Poll interval when tailing an *open* hourly file.
@@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
     pub local_ingest_dir: Option<PathBuf>,
     pub local_blocks_cache: Arc<Mutex<HashMap<u64, BlockAndReceipts>>>, // height → block
     pub precompiles_cache: PrecompilesCache,
+    pub hlfs: Option<ShareBlocks>,
 }
 
 #[derive(Deserialize)]
@@ -155,9 +157,9 @@ impl BlockIngest {
         if let Some(block) = self.try_collect_local_block(height).await {
             info!("Returning locally synced block for @ Height [{height}]");
             return Some(block);
-        } else {
-            self.try_collect_s3_block(height)
         }
+
+        self.try_collect_s3_block(height)
     }
 
     pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
diff --git a/bin/reth/src/main.rs b/bin/reth/src/main.rs
index a19ff491a..b53231b43 100644
--- a/bin/reth/src/main.rs
+++ b/bin/reth/src/main.rs
@@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
 mod block_ingest;
 mod call_forwarder;
 mod serialized;
+mod share_blocks;
 mod spot_meta;
 mod tx_forwarder;
 
@@ -18,6 +19,7 @@ use reth::cli::Cli;
 use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
 use reth_hyperliquid_types::PrecompilesCache;
 use reth_node_ethereum::EthereumNode;
+use share_blocks::ShareBlocksArgs;
 use tokio::sync::Mutex;
 use tracing::info;
 use tx_forwarder::EthForwarderApiServer;
@@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
     /// 3. filters out logs and transactions from subscription.
     #[arg(long, default_value = "false")]
     pub hl_node_compliant: bool,
+
+    /// Enable HLFS to backfill archive blocks from peers.
+    #[command(flatten)]
+    pub hlfs: ShareBlocksArgs,
 }
 
 fn main() {
@@ -85,8 +91,24 @@ fn main() {
                 .launch()
                 .await?;
 
-            let ingest =
-                BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
+            // start HLFS (serve + peer-backed backfill) using the node's network
+            let hlfs = if ext_args.hlfs.share_blocks {
+                let net = handle.node.network.clone();
+                Some(
+                    crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
+                        .await?,
+                )
+            } else {
+                None
+            };
+
+            let ingest = BlockIngest {
+                ingest_dir,
+                local_ingest_dir,
+                local_blocks_cache,
+                precompiles_cache,
+                hlfs,
+            };
             ingest.run(handle.node).await.unwrap();
             handle.node_exit_future.await
         },
diff --git a/bin/reth/src/share_blocks.rs b/bin/reth/src/share_blocks.rs
new file mode 100644
index 000000000..4e5188c12
--- /dev/null
+++ b/bin/reth/src/share_blocks.rs
@@ -0,0 +1,167 @@
+use clap::Args;
+use futures_util::stream::StreamExt;
+use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
+use reth_network_api::{events::NetworkEvent, FullNetwork};
+use std::{
+    collections::HashSet,
+    net::{IpAddr, SocketAddr},
+    path::PathBuf,
+    sync::Arc,
+};
+use tokio::{
+    task::JoinHandle,
+    time::{sleep, timeout, Duration},
+};
+use tracing::{debug, info, warn};
+
+#[derive(Args, Clone, Debug)]
+pub(crate) struct ShareBlocksArgs {
+    /// Serve local archive blocks to peers and backfill missing ones from them.
+    #[arg(long, default_value_t = false)]
+    pub share_blocks: bool,
+    /// Address the HLFS server binds to.
+    #[arg(long, default_value = "0.0.0.0")]
+    pub share_blocks_host: String,
+    /// TCP port for the HLFS server (and for probing peers).
+    #[arg(long, default_value_t = 9595)]
+    pub share_blocks_port: u16,
+    /// Directory holding the archive block files.
+    #[arg(long, default_value = "evm-blocks")]
+    pub archive_dir: PathBuf,
+}
+
+pub(crate) struct ShareBlocks {
+    _server: JoinHandle<()>,
+    _autodetect: JoinHandle<()>,
+}
+
+impl ShareBlocks {
+    pub(crate) async fn start_with_network<Net>(
+        args: &ShareBlocksArgs,
+        network: Net,
+    ) -> eyre::Result<Self>
+    where
+        Net: FullNetwork + Clone + 'static,
+    {
+        let host: IpAddr = args
+            .share_blocks_host
+            .parse()
+            .map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
+        let bind: SocketAddr = (host, args.share_blocks_port).into();
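+
+        // Two tasks back this handle: a TCP server that serves blocks out of
+        // `archive_dir`, and an autodetect loop that probes reth peers on the
+        // HLFS port and feeds responsive ones to the backfiller.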
+        let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
+        let _server = tokio::spawn(async move {
+            if let Err(e) = srv.run().await {
+                warn!(error=%e, "hlfs: server exited");
+            }
+        });
+
+        let _autodetect =
+            spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());
+
+        info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
+        Ok(Self { _server, _autodetect })
+    }
+}
+
+fn spawn_autodetect<Net>(
+    network: Net,
+    self_ip: IpAddr,
+    hlfs_port: u16,
+    archive_dir: PathBuf,
+) -> JoinHandle<()>
+where
+    Net: FullNetwork + Clone + 'static,
+{
+    let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
+    let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
+    let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
+        Arc::new(tokio::sync::Mutex::new(HashSet::new()));
+
+    tokio::spawn({
+        let backfiller = backfiller.clone();
+        async move {
+            info!("hlfs: backfiller started");
+            loop {
+                {
+                    let mut bf = backfiller.lock().await;
+                    if bf.client.max_block < bf.max_block_seen {
+                        let block = bf.client.max_block + 1;
+                        match bf.fetch_if_missing(block).await {
+                            Ok(Some(n)) => bf.client.max_block = n,
+                            // Already on disk (or skipped): count the height as done.
+                            Ok(None) => bf.client.max_block = block,
+                            Err(e) => warn!(error=%e, block, "hlfs: backfill failed"),
+                        }
+                    }
+                }
+                // Drop the lock before sleeping so peer updates are never blocked.
+                sleep(Duration::from_millis(50)).await;
+            }
+        }
+    });
+
+    tokio::spawn({
+        let backfiller = backfiller.clone();
+        async move {
+            let mut events = network.event_listener();
+            loop {
+                match events.next().await {
+                    Some(NetworkEvent::ActivePeerSession { info, .. }) => {
+                        let ip = info.remote_addr.ip();
+                        if ip.is_unspecified() {
+                            debug!(%ip, "hlfs: skip unspecified");
+                            continue;
+                        }
+                        if ip == self_ip {
+                            debug!(%ip, "hlfs: skip self");
+                            continue;
+                        }
+                        let addr = SocketAddr::new(ip, hlfs_port);
+                        let max_block = probe_hlfs(addr).await;
+                        if max_block != 0 {
+                            let mut g = good.lock().await;
+                            if g.insert(PeerRecord { addr, max_block }) {
+                                let v: Vec<_> = g.iter().copied().collect();
+                                // Lock the backfiller only for the update so the backfill
+                                // loop is not blocked while we wait for network events.
+                                backfiller.lock().await.set_peers(v.clone());
+                                info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
+                            }
+                        } else {
+                            debug!(%addr, "hlfs: peer has no HLFS");
+                        }
+                    }
+                    Some(_) => {}
+                    None => {
+                        warn!("hlfs: network event stream ended");
+                        break;
+                    }
+                }
+            }
+        }
+    })
+}
+
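+/// Probe `addr` for an HLFS endpoint: send `OP_REQ_MAX_BLOCK` plus 8 padding
+/// bytes, then expect `OP_RES_MAX_BLOCK` followed by a little-endian `u64`.
+/// Returns that max block height, or 0 if the peer does not speak HLFS or does
+/// not answer within 2 seconds.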
["rt-multi-thread","macros","net","time","fs","sync","io-util"] } +bytes.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tracing.workspace = true +reth-tracing.workspace = true + +[dev-dependencies] +rand.workspace = true +tempfile.workspace = true + diff --git a/crates/net/hlfs/src/lib.rs b/crates/net/hlfs/src/lib.rs new file mode 100644 index 000000000..d9490f159 --- /dev/null +++ b/crates/net/hlfs/src/lib.rs @@ -0,0 +1,374 @@ +//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block). + +use bytes::{Buf, BufMut, Bytes, BytesMut}; +use parking_lot::Mutex; +use reth_tracing::tracing::{debug, info, trace, warn}; +use std::{ + fs, + hash::{Hash, Hasher}, + io, + net::SocketAddr, + path::{Path, PathBuf}, + sync::Arc, + time::Duration, +}; +use thiserror::Error; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::{TcpListener, TcpStream}, + time::timeout, +}; + +type Result = std::result::Result; + +pub const OP_REQ_BLOCK: u8 = 0x01; +pub const OP_RES_BLOCK: u8 = 0x02; +pub const OP_REQ_MAX_BLOCK: u8 = 0x03; +pub const OP_RES_MAX_BLOCK: u8 = 0x04; +pub const OP_ERR_TOO_BUSY: u8 = 0x05; +pub const OP_ERR_NOT_FOUND: u8 = 0x06; + +#[derive(Error, Debug)] +pub enum HlfsError { + #[error("io: {0}")] + Io(#[from] io::Error), + #[error("proto")] + Proto, + #[error("no peers")] + NoPeers, + #[error("timeout")] + Timeout, + #[error("busy: retry_ms={0}")] + Busy(u32), + #[error("not found")] + NotFound, +} + +#[inline] +fn put_u64(b: &mut BytesMut, v: u64) { + b.put_u64_le(v) +} +#[inline] +fn put_u32(b: &mut BytesMut, v: u32) { + b.put_u32_le(v) +} + +async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> { + if let Some(parent) = Path::new(path).parent() { + fs::create_dir_all(parent) + } else { + Ok(()) + } +} + +/// Client: tries each peer once; rotates starting index per call +#[derive(Debug, Copy, Clone)] +pub struct PeerRecord { + pub addr: SocketAddr, + pub max_block: u64, +} + +impl PartialEq for PeerRecord { + fn eq(&self, o: &Self) -> bool { + self.addr == o.addr + } +} +impl Eq for PeerRecord {} +impl Hash for PeerRecord { + fn hash(&self, s: &mut H) { + self.addr.hash(s); + } +} + +#[derive(Clone)] +pub struct Client { + root: PathBuf, + pub peers: Arc>>, + timeout: Duration, + pub max_block: u64, +} +impl Client { + pub fn new(root: impl Into, peers: Vec) -> Self { + let root: PathBuf = root.into(); + let n = find_max_number_file(&root).unwrap(); + debug!(max_block = n, "hlfs: our archive"); + Self { + root, + peers: Arc::new(Mutex::new(peers)), + timeout: Duration::from_secs(3), + max_block: n, + } + } + pub fn update_peers(&self, peers: Vec) { + *self.peers.lock() = peers; + } + pub fn with_timeout(mut self, d: Duration) -> Self { + self.timeout = d; + self + } + pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result, HlfsError> { + let peers = self.peers.lock().clone(); + debug!(peer_count = peers.len(), "hlfs: peers"); + if peers.is_empty() { + return Err(HlfsError::NoPeers); + } + + let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len()); + let mut last_busy: Option = None; + while let Some(i) = all.next() { + let addr = peers[i]; + trace!(%addr.addr, "hlfs: dialing"); + match timeout(self.timeout, TcpStream::connect(addr.addr)).await { + Err(_) => continue, + Ok(Err(_)) => continue, + Ok(Ok(mut sock)) => { + let mut req = BytesMut::with_capacity(1 + 8); + req.put_u8(OP_REQ_BLOCK); + put_u64(&mut req, number); + if let Err(e) = sock.write_all(&req).await { + debug!(%addr.addr, "hlfs: write 
err: {e}"); + continue; + } + let mut op = [0u8; 1]; + if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await { + debug!(%addr.addr, "hlfs: read op timeout {e:?}"); + continue; + } + let op = op[0]; + match op { + OP_RES_BLOCK => { + // DATA + let mut len = [0u8; 4]; + sock.read_exact(&mut len).await?; + let len = u32::from_le_bytes(len) as usize; + let mut buf = vec![0u8; len]; + sock.read_exact(&mut buf).await?; + return Ok(buf); + } + OP_ERR_TOO_BUSY => { + let mut ms = [0u8; 4]; + sock.read_exact(&mut ms).await?; + last_busy = Some(u32::from_le_bytes(ms)); + continue; + } + OP_ERR_NOT_FOUND => { + return Err(HlfsError::NotFound); + } + _ => { + continue; + } + } + } + } + } + if let Some(ms) = last_busy { + return Err(HlfsError::Busy(ms)); + } + Err(HlfsError::NotFound) + } +} + +fn find_max_number_file(root: &Path) -> Result { + fn parse_num(name: &str) -> Option { + name.strip_suffix(".rmp.lz4")?.parse::().ok() + } + + fn walk(dir: &Path, best: &mut Option) -> io::Result<()> { + for entry in fs::read_dir(dir)? { + let entry = entry?; + let path = entry.path(); + let ft = entry.file_type()?; + if ft.is_dir() { + walk(&path, best)?; + } else if ft.is_file() { + if let Some(name) = path.file_name().and_then(|s| s.to_str()) { + if let Some(n) = parse_num(name) { + if best.map_or(true, |b| n > b) { + *best = Some(n); + } + } + } + } + } + Ok(()) + } + + let mut best = Some(0); + let top: PathBuf = fs::read_dir(root)? + .filter_map(|e| e.ok()) + .filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false)) + .filter_map(|e| { + let name = e.file_name(); + let s = name.to_str()?; + let n: u64 = s.parse().ok()?; + Some((n, e.path())) + }) + .max_by_key(|(n, _)| *n) + .map(|(_, p)| p) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?; + + walk(&top, &mut best)?; + Ok(best.expect("cannot find block files")) +} + +/// Server: serves `{root}/{number}.rlp`. 
+pub struct Server {
+    bind: SocketAddr,
+    root: PathBuf,
+    max_conns: usize,
+    inflight: Arc<Mutex<usize>>,
+    busy_retry_ms: u32,
+    max_block: u64,
+}
+
+impl Server {
+    pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
+        let root: PathBuf = root.into();
+        // An empty or missing archive simply advertises block 0.
+        let n = find_max_number_file(&root).unwrap_or(0);
+        Self {
+            bind,
+            root,
+            max_conns: 512,
+            inflight: Arc::new(Mutex::new(0)),
+            busy_retry_ms: 100,
+            max_block: n,
+        }
+    }
+    pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
+        self.max_conns = max_conns;
+        self.busy_retry_ms = busy_retry_ms;
+        self
+    }
+    pub async fn run(self) -> Result<(), HlfsError> {
+        fs::create_dir_all(&self.root).ok();
+        info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
+        let lst = TcpListener::bind(self.bind).await?;
+        loop {
+            let (mut sock, addr) = lst.accept().await?;
+            if *self.inflight.lock() >= self.max_conns {
+                let mut b = BytesMut::with_capacity(5);
+                b.put_u8(OP_ERR_TOO_BUSY);
+                put_u32(&mut b, self.busy_retry_ms);
+                let _ = sock.write_all(&b).await;
+                continue;
+            }
+            *self.inflight.lock() += 1;
+            let root = self.root.clone();
+            let inflight = self.inflight.clone();
+            let busy = self.busy_retry_ms;
+            // Copy out before the spawn so the async block does not capture `self`.
+            let max_block = self.max_block;
+            tokio::spawn(async move {
+                let _ = handle_conn(&mut sock, &root, busy, addr, max_block).await;
+                *inflight.lock() -= 1;
+            });
+        }
+    }
+}
+async fn handle_conn(
+    sock: &mut TcpStream,
+    root: &Path,
+    _busy_ms: u32,
+    addr: SocketAddr,
+    max_block: u64,
+) -> Result<(), HlfsError> {
+    let mut op = [0u8; 1];
+    sock.read_exact(&mut op).await?;
+    if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
+        warn!(%addr, "hlfs: bad op");
+        return Err(HlfsError::Proto);
+    }
+
+    if op[0] == OP_REQ_MAX_BLOCK {
+        let mut b = BytesMut::with_capacity(1 + 8);
+        b.put_u8(OP_RES_MAX_BLOCK);
+        put_u64(&mut b, max_block);
+        let _ = sock.write_all(&b).await;
+        return Ok(());
+    }
+
+    let mut num = [0u8; 8];
+    sock.read_exact(&mut num).await?;
+    let number = u64::from_le_bytes(num);
+
+    let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
+    let f = (n / 1_000_000) * 1_000_000;
+    let s = (n / 1_000) * 1_000;
+    let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());
+
+    trace!(%addr, number, %path, "hlfs: req");
+    if let Err(e) = ensure_parent_dirs(&path).await {
+        warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
+    }
+
+    match fs::read(&path) {
+        Ok(data) => {
+            let mut b = BytesMut::with_capacity(1 + 4 + data.len());
+            b.put_u8(OP_RES_BLOCK);
+            put_u32(&mut b, data.len() as u32);
+            b.extend_from_slice(&data);
+            let _ = sock.write_all(&b).await;
+        }
+        Err(e) if e.kind() == io::ErrorKind::NotFound => {
+            let mut b = BytesMut::with_capacity(1);
+            b.put_u8(OP_ERR_NOT_FOUND);
+            let _ = sock.write_all(&b).await;
+        }
+        Err(e) => {
+            warn!(%addr, %path, "hlfs: read error: {e}");
+            let _ = sock.shutdown().await;
+        }
+    }
+    Ok(())
+}
+
+/// Backfiller: asks the client for each missing block, rotating peers per block.
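+///
+/// A minimal usage sketch (mirroring how `share_blocks.rs` drives it; `peers` is
+/// a `Vec<PeerRecord>` discovered elsewhere):
+///
+/// ```ignore
+/// let client = Client::new("evm-blocks", Vec::new());
+/// let mut backfiller = Backfiller::new(client, "evm-blocks");
+/// backfiller.set_peers(peers);
+/// if let Some(height) = backfiller.fetch_if_missing(42).await? {
+///     println!("fetched block {height}");
+/// }
+/// ```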
+#[derive(Clone)]
+pub struct Backfiller {
+    pub client: Client,
+    root: PathBuf,
+    pub max_block_seen: u64,
+}
+impl Backfiller {
+    pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
+        Self { client, root: root.into(), max_block_seen: 0 }
+    }
+    pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
+        self.client.update_peers(peers);
+        let peers = self.client.peers.lock().clone();
+        for p in peers {
+            if p.max_block > self.max_block_seen {
+                self.max_block_seen = p.max_block;
+            }
+        }
+    }
+    pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
+        let rr_index = number as usize;
+        let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
+        let f = (n / 1_000_000) * 1_000_000;
+        let s = (n / 1_000) * 1_000;
+
+        let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
+        if Path::new(&path).exists() {
+            debug!(block = number, "hlfs: already have");
+            return Ok(None);
+        }
+        match self.client.wants_block(number, rr_index).await {
+            Err(HlfsError::NotFound) => Ok(None),
+            Err(HlfsError::Busy(ms)) => {
+                tokio::time::sleep(Duration::from_millis(ms as u64)).await;
+                Ok(None)
+            }
+            Err(e) => Err(e),
+            Ok(data) => {
+                if let Err(e) = ensure_parent_dirs(&path).await {
+                    warn!(%path, "hlfs: mkdirs failed: {e}");
+                }
+                if let Err(e) = fs::write(&path, &data) {
+                    warn!(%path, "hlfs: write failed: {e}");
+                    return Ok(None);
+                }
+                debug!(block = number, "hlfs: got block");
+                Ok(Some(number))
+            }
+        }
+    }
+}
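With the crate wired in, sharing and backfill are opt-in via the new flags. A hedged example invocation (assuming the usual `reth node` entrypoint; `--share-blocks-host` and `--archive-dir` can be omitted to use the defaults `0.0.0.0` and `evm-blocks`):

    reth node --share-blocks --share-blocks-port 9595 --archive-dir evm-blocks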