Local block sync (#7)

* update: logs

* update: more logging

* update: rename local ingest dir args

* update: fix build

* update: directory path

* update: logs

* update: log ts

* update: fetch last block

* update: time formatting

* update: handle seconds

* update: more logs

* fix: provided args

* update: logs

* fix: build

* update: indefinite wait

* update: run the right loop

* update: remove offset

* update: scan impl

* update: log exists

* update: collect s3 blocks

* update: change the file

* update: logs

* fix: deserialization

* fix: build

* update: remove block

* update: add logs

* update: logs

* update: logs

* update: dates

* update: ignore older blocks

* update: hook up to sync

* fix: build

* fix: build

* update: logs

* update: logs

* update: start height cond

* update: height

* update: loggy

* update: cond

* update: cond

* update: cond

* update: logs

* update: fix height issues

* update: logs

* only collect s3

* update: log block

* update: log both blocks

* update: return s3 block

* update: use local block

* update: blocks

* update: remove logs

* update: logs

* update: remove warns and logs

* update: collection log

* update: logs

* update: logs

* update: scan through heights when registering evm

* update: add local ingest dir to other evm factory

* fix: build

* update: add cli cmd

* update: remove additional arg

* update: change where local ingest dir comes from

* fix: receipts

* update: deser format

* update: fix build

* update: logs

* update: logs

* update: logs

* update: logs

* update: share precompiles with engine

* update: insert precompiles

* update: change sync dir

* update: logs

* update: logs

* update: logs

* update: fix build

* update: pipe builder context through

* update: untracked

* update: pass through context

* fix: build

* fix: build

* update: logs

* update: logs

* update: logs

* update: fix cache passthrough

* update: remove logs

* update: logs

* update: hour rollover

* update: zero out hour

* update: hour sync

* update: cleanup code and speedup sync

* update: speedup sync

* update: remove logs

* update: speed up sync

* update: speed up sync

* update: ingest in reverse

* fix: iter rev

* update: break line loop early

* update: remove break

* update: iteration speed

* update: fix build

* update: slow down tail ival

* update: logs

* update: skip last line

* update: remove log

* update: height

* update: logs

* update: return logs

* update: disable attempt logs

* update: tail interval

* update: cleanup logs

* update: add iter skip

* update: fix build

* update: skip -1

* fix: skip

* fix: build

* update: build

* fix: build

* update: logs

* update: log idx

* update: skip after enumerate

* update: cleanup

* update: more cleanup

* update: refactor BuilderSharedState to HyperliquidSharedState

* update: more cleanup

* update: cleanup and refactor collect_local_block

* update: error msg

* update: readme

* update: typo

* update: file log

* fix: typo build

* update: debug log
Authored by arb00r on 2025-06-26 04:15:58 +10:00, committed by GitHub
parent 138377fc62
commit 28f6c1e6be
21 changed files with 375 additions and 47 deletions

View File

@@ -84,6 +84,9 @@ serde_json.workspace = true
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
futures.workspace = true
# time
time = { workspace = true }
# misc
aquamarine.workspace = true
eyre.workspace = true

View File

@@ -1,4 +1,6 @@
-use std::path::PathBuf;
use std::collections::BTreeMap;
use std::io::{BufRead, BufReader};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use alloy_consensus::{BlockBody, BlockHeader, Transaction};
@@ -10,22 +12,90 @@ use alloy_rpc_types::engine::{
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
use reth::network::PeersHandleProvider;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
-use reth_node_api::{FullNodeComponents, PayloadTypes};
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
use reth_node_builder::EngineTypes;
use reth_node_builder::NodeTypesWithEngine;
use reth_node_builder::{rpc::RethRpcAddOns, FullNode};
use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
-use reth_provider::{BlockHashReader, StageCheckpointReader};
use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
use reth_rpc_api::EngineApiClient;
use reth_rpc_layer::AuthClientService;
use reth_stages::StageId;
use serde::Deserialize;
use time::{format_description, Duration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::spot_meta::erc20_contract_to_spot_token;
-pub(crate) struct BlockIngest(pub PathBuf);
/// Poll interval when tailing an *open* hourly file.
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
/// Subdirectory that contains day folders (inside `local_ingest_dir`).
const HOURLY_SUBDIR: &str = "hourly";
pub(crate) struct BlockIngest {
pub ingest_dir: PathBuf,
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
}
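// Each line of an hourly ingest file is a JSON tuple: a timestamp string followed by the block-and-receipts payload.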
#[derive(Deserialize)]
struct LocalBlockAndReceipts(String, BlockAndReceipts);
struct ScanResult {
next_expected_height: u64,
new_blocks: Vec<BlockAndReceipts>,
}
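/// Scans an hourly file from `last_line` onward, collecting blocks at or above `start_height`;
/// advances `last_line` so subsequent scans skip already-ingested lines.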
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> ScanResult {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std::fs::File::open(path).expect("Failed to open hour file path");
let reader = BufReader::new(file);
let mut new_blocks = Vec::<BlockAndReceipts>::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let skip = last_line.saturating_sub(1);
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
// Skip any lines already processed in a previous scan
if line_idx < *last_line {
continue;
}
if line.trim().is_empty() {
continue;
}
let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
serde_json::from_str(&line).expect("Failed to parse local block and receipts");
let height = match &parsed_block.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
// Another check to ensure not returning an older block
if block_number < start_height {
continue;
}
block_number
}
};
// println!("Iterating block height {:?} | Line {}", height, line_idx);
if height >= start_height {
last_height = last_height.max(height);
new_blocks.push(parsed_block);
*last_line = line_idx;
}
}
ScanResult { next_expected_height: last_height + 1, new_blocks }
}
async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
engine_api_client: &HttpClient<AuthClientService<HttpBackend>>,
@@ -53,22 +123,118 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
Ok(submission.latest_valid_hash.unwrap_or_default())
}
fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
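// Seconds → milliseconds (× 1_000) → nanoseconds (× 1_000_000).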
OffsetDateTime::from_unix_timestamp_nanos((ts_sec as i128) * 1_000 * 1_000_000)
.expect("timestamp out of range")
}
fn date_from_datetime(dt: OffsetDateTime) -> String {
dt.format(&format_description::parse("[year][month][day]").unwrap()).unwrap()
}
impl BlockIngest {
-pub(crate) fn collect_block(&self, height: u64) -> Option<BlockAndReceipts> {
pub(crate) async fn collect_block(&self, height: u64) -> Option<BlockAndReceipts> {
// info!("Attempting to collect block @ height [{height}]");
// Not a one-liner (using `.or`) so we can log which source served the block
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
} else {
self.try_collect_s3_block(height)
}
}
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
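// S3 archives are bucketed by height: e.g. height 1_234_567 maps to {ingest_dir}/1000000/1234000/1234567.rmp.lz4.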
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.0.to_string_lossy());
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", self.ingest_dir.to_string_lossy());
if std::path::Path::new(&path).exists() {
let file = std::fs::File::open(path).unwrap();
let file = std::io::BufReader::new(file);
let mut decoder = lz4_flex::frame::FrameDecoder::new(file);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
info!("Returning s3 synced block for @ Height [{height}]");
Some(blocks[0].clone())
} else {
None
}
}
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
let mut u_cache = self.local_blocks_cache.lock().await;
u_cache.remove(&height)
}
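/// Spawns a background task that tails the hourly ingest files, inserting each parsed block
/// into `local_blocks_cache` (and its precompile calls into `precompiles_cache`) keyed by height.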
async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
let Some(root) = &self.local_ingest_dir else { return }; // nothing to do
let root = root.to_owned();
let cache = self.local_blocks_cache.clone();
let precompiles_cache = self.precompiles_cache.clone();
tokio::spawn(async move {
let mut next_height = current_head;
let mut dt = datetime_from_timestamp(current_ts)
.replace_minute(0)
.unwrap()
.replace_second(0)
.unwrap()
.replace_nanosecond(0)
.unwrap();
let mut hour = dt.hour();
let mut day_str = date_from_datetime(dt);
let mut last_line = 0;
loop {
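// Hour files live at {root}/hourly/{YYYYMMDD}/{hour}.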
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let ScanResult { next_expected_height, new_blocks } =
scan_hour_file(&hour_file, &mut last_line, next_height);
if !new_blocks.is_empty() {
let mut u_cache = cache.lock().await;
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = blk.read_precompile_calls.clone();
let h = match &blk.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
block_number
}
};
u_cache.insert(h, blk);
u_pre_cache.insert(h, precompiles);
}
next_height = next_expected_height;
}
}
// Decide whether the *current* hour file is closed (past) or
// still live. If it's more than 1 h in the past, move to the next hour;
// otherwise, keep tailing the same file.
let now = OffsetDateTime::now_utc();
// println!("Date Current {:?}", dt);
// println!("Now Current {:?}", now);
if dt + Duration::HOUR < now {
dt += Duration::HOUR;
hour = dt.hour();
day_str = date_from_datetime(dt);
last_line = 0;
info!(
"Moving to a new file. {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
);
continue;
}
tokio::time::sleep(TAIL_INTERVAL).await;
}
});
}
pub(crate) async fn run<Node, Engine, AddOns>(
&self,
node: FullNode<Node, AddOns>,
@@ -96,9 +262,19 @@ impl BlockIngest {
let engine_api = node.auth_server_handle().http_client();
let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
let current_block_timestamp: u64 = provider
.block_by_number(head)
.expect("Failed to fetch current block in db")
.expect("Block does not exist")
.into_header()
.timestamp();
info!("Current height {height}, timestamp {current_block_timestamp}");
self.start_local_ingest_loop(height, current_block_timestamp).await;
loop {
-let Some(original_block) = self.collect_block(height) else {
-tokio::time::sleep(std::time::Duration::from_millis(200)).await;
let Some(original_block) = self.collect_block(height).await else {
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
continue;
};
let EvmBlock::Reth115(mut block) = original_block.block;
@@ -111,6 +287,7 @@ impl BlockIngest {
let BlockBody { transactions, ommers, withdrawals } =
std::mem::take(block.body_mut());
let mut system_txs = vec![];
for transaction in original_block.system_txs {
let TypedTransaction::Legacy(tx) = &transaction.tx else {
panic!("Unexpected transaction type");
@@ -180,10 +357,12 @@ impl BlockIngest {
PayloadStatusEnum::Valid,
)
.await?;
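// Throttle fork-choice updates: only every 100th block, or when >100 ms have elapsed since the last update.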
let current_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
if height % 100 == 0 || current_timestamp - previous_timestamp > 100 {
EngineApiClient::<Engine>::fork_choice_updated_v2(
&engine_api,

View File

@@ -9,12 +9,16 @@ mod serialized;
mod spot_meta;
mod tx_forwarder;
use std::{collections::BTreeMap, path::PathBuf, sync::Arc};
use block_ingest::BlockIngest;
use call_forwarder::CallForwarderApiServer;
use clap::{Args, Parser};
use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@@ -37,12 +41,17 @@ fn main() {
std::env::set_var("RUST_BACKTRACE", "1");
}
let precompiles_cache = PrecompilesCache::new(parking_lot::Mutex::new(BTreeMap::new()));
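// Shared with the node's EVM via `.add_precompiles_cache` below.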
let local_blocks_cache = Arc::new(Mutex::new(BTreeMap::new()));
if let Err(err) = Cli::<EthereumChainSpecParser, HyperliquidExtArgs>::parse().run(
|builder, ext_args| async move {
let ingest_dir = builder.config().ingest_dir.clone().expect("ingest dir not set");
let local_ingest_dir = builder.config().local_ingest_dir.clone();
info!(target: "reth::cli", "Launching node");
let handle = builder
.node(EthereumNode::default())
.add_precompiles_cache(precompiles_cache.clone())
.extend_rpc_modules(move |ctx| {
let upstream_rpc_url = ext_args.upstream_rpc_url;
ctx.modules.replace_configured(
@@ -62,7 +71,8 @@ fn main() {
.launch()
.await?;
-let ingest = BlockIngest(ingest_dir);
let ingest =
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},