// Source: nanoreth/bin/reth/src/block_ingest.rs
// Repository viewer snapshot: 2025-08-25 15:04:15 +05:30 (443 lines, 18 KiB, Rust).
//
// Viewer warning: this file contains ambiguous Unicode characters, i.e. characters that might be
// confused with other characters. If this is intentional, the warning can safely be ignored.

use std::{
collections::BTreeMap,
io::{BufRead, BufReader},
path::{Path, PathBuf},
sync::Arc,
};
use alloy_consensus::{BlockBody, BlockHeader, Transaction};
use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256};
use alloy_rpc_types::engine::{
ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
};
use jsonrpsee::http_client::{transport::HttpBackend, HttpClient};
use reth::network::PeersHandleProvider;
use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine};
use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
use reth_rpc_api::EngineApiClient;
use reth_rpc_layer::AuthClientService;
use reth_stages::StageId;
use serde::Deserialize;
use time::{format_description, Duration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{debug, info};
use crate::{
serialized::{BlockAndReceipts, EvmBlock},
spot_meta::erc20_contract_to_spot_token,
};
/// Poll interval when tailing an *open* hourly file.
///
/// The local ingest task sleeps for this long between two scans of the hourly file it is
/// currently following.
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
/// Subdirectory that contains day folders (inside `local_ingest_dir`).
///
/// Hourly files are looked up as `<local_ingest_dir>/hourly/<YYYYMMDD>/<hour>`, where `<hour>`
/// is the hour number printed without zero padding.
const HOURLY_SUBDIR: &str = "hourly";
/// Feeds externally produced blocks into the node through the engine API.
///
/// Blocks come from two places: per-height `*.rmp.lz4` files under `ingest_dir`, and
/// (optionally) hourly line-oriented files under `local_ingest_dir` that a background task
/// tails into `local_blocks_cache`.
pub(crate) struct BlockIngest {
/// Root directory of the per-height archive, laid out as
/// `<ingest_dir>/<millions bucket>/<thousands bucket>/<height>.rmp.lz4`.
pub ingest_dir: PathBuf,
/// Root directory of the hourly files; `None` disables the local ingest task.
pub local_ingest_dir: Option<PathBuf>,
/// Blocks read by the local ingest task, waiting to be consumed. Entries are removed when
/// they are collected for their height.
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height -> block
/// Receives one `PrecompileData` entry, keyed by block height, for every block the local
/// ingest task caches.
pub precompiles_cache: PrecompilesCache,
}
/// One line of an hourly file: a two-element tuple of a string and the block with its receipts.
///
/// The scanner binds the string to `_block_timestamp` and does not use it.
#[derive(Deserialize)]
struct LocalBlockAndReceipts(String, BlockAndReceipts);
/// Outcome of one pass over an hourly file.
struct ScanResult {
/// One more than the highest block height accepted by the scan. The ingest loop only reads
/// this value when `new_blocks` is non-empty.
next_expected_height: u64,
/// Blocks at or above the requested start height, in file order.
new_blocks: Vec<BlockAndReceipts>,
}
fn scan_hour_file(
path: &Path,
last_line: &mut usize,
start_height: u64,
) -> Result<ScanResult, Box<dyn std::error::Error + Send + Sync>> {
// info!(
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
// path, start_height, last_line
// );
let file = std::fs::File::open(path)
.map_err(|e| format!("Failed to open hour file path {}: {}", path.display(), e))?;
let reader = BufReader::new(file);
let mut new_blocks = Vec::<BlockAndReceipts>::new();
let mut last_height = start_height;
let lines: Vec<String> = reader
.lines()
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
// Safety check ensuring efficiency
if line_idx < *last_line {
continue;
}
if line.trim().is_empty() {
continue;
}
let (_block_timestamp, parsed_block) = match serde_json::from_str(&line) {
Ok(LocalBlockAndReceipts(_block_timestamp, parsed_block)) => {
(_block_timestamp, parsed_block)
}
Err(_) => {
// Possible scenarios:
let is_last_line = line_idx == lines.len() - 1;
if is_last_line {
// 1. It's not written fully yet - in this case, just wait for the next line
break;
} else {
// 2. hl-node previously terminated while writing the lines
// In this case, try to skip this line
continue;
}
}
};
let height = match &parsed_block.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
// Another check to ensure not returning an older block
if block_number < start_height {
continue;
}
block_number
}
};
// println!("Iterating block height {:?} | Line {}", height, line_idx);
if height >= start_height {
last_height = last_height.max(height);
new_blocks.push(parsed_block);
*last_line = line_idx;
}
}
Ok(ScanResult { next_expected_height: last_height + 1, new_blocks })
}
/// Submits a built payload to the engine API via `new_payload_v3`.
///
/// # Parameters
/// * `engine_api_client` - authenticated engine API client.
/// * `payload` - payload to submit; its blob versioned hashes are sent alongside it.
/// * `payload_builder_attributes` - only `parent_beacon_block_root` is read from it.
/// * `expected_status` - status the engine must report for the call to succeed.
///
/// # Returns
/// The `latest_valid_hash` reported by the engine, or the default hash when it reports none.
///
/// # Errors
/// Fails when `parent_beacon_block_root` is missing, when the RPC call fails, or when the
/// reported status differs from `expected_status`. The last case is returned as an error
/// rather than a panic, so the caller decides how to react.
async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
    engine_api_client: &HttpClient<AuthClientService<HttpBackend>>,
    payload: EthBuiltPayload,
    payload_builder_attributes: EthPayloadBuilderAttributes,
    expected_status: PayloadStatusEnum,
) -> Result<B256, Box<dyn std::error::Error>> {
    let parent_beacon_block_root = payload_builder_attributes
        .parent_beacon_block_root
        .ok_or("Missing required parent_beacon_block_root")?;

    // Collected before the payload is consumed by the envelope conversion below.
    let versioned_hashes =
        payload.block().blob_versioned_hashes_iter().copied().collect::<Vec<_>>();
    let envelope: ExecutionPayloadEnvelopeV3 = payload.into();

    // submit payload to engine api
    let submission = EngineApiClient::<Engine>::new_payload_v3(
        engine_api_client,
        envelope.execution_payload,
        versioned_hashes,
        parent_beacon_block_root,
    )
    .await?;

    if submission.status.as_str() != expected_status.as_str() {
        return Err(format!(
            "Unexpected payload status: expected {}, got {:?}",
            expected_status.as_str(),
            submission.status
        )
        .into());
    }
    Ok(submission.latest_valid_hash.unwrap_or_default())
}
/// Converts a Unix timestamp given in whole seconds into an [`OffsetDateTime`].
///
/// # Panics
/// Panics when the timestamp is outside the range `OffsetDateTime` can represent.
fn datetime_from_timestamp(ts_sec: u64) -> OffsetDateTime {
    const NANOS_PER_SECOND: i128 = 1_000_000_000;
    let unix_nanos = i128::from(ts_sec) * NANOS_PER_SECOND;
    OffsetDateTime::from_unix_timestamp_nanos(unix_nanos).expect("timestamp out of range")
}
/// Renders the date part of `dt` as `[year][month][day]` with no separators.
///
/// # Panics
/// Panics when the format description cannot be parsed or when formatting fails.
fn date_from_datetime(dt: OffsetDateTime) -> String {
    // The description is a constant, so parsing it is expected to always succeed.
    let compact_date =
        format_description::parse("[year][month][day]").expect("Valid format string");
    dt.format(&compact_date)
        .expect("DateTime formatting should always succeed with valid format")
}
impl BlockIngest {
/// Fetches the block for `height`, trying the local cache first and the archive second.
///
/// Returns `None` when neither source has the block yet.
pub(crate) async fn collect_block(&self, height: u64) -> Option<BlockAndReceipts> {
    // Written as a `match` rather than `.or(..)` so the local hit can be logged.
    match self.try_collect_local_block(height).await {
        Some(local_block) => {
            info!("Returning locally synced block for @ Height [{height}]");
            Some(local_block)
        }
        None => self.try_collect_s3_block(height),
    }
}
/// Loads the block for `height` from the per-height archive under `ingest_dir`.
///
/// The file is expected at `<ingest_dir>/<f>/<s>/<height>.rmp.lz4`, where `f` and `s` are
/// `height - 1` rounded down to a multiple of 1 000 000 and 1 000 respectively. Its content is
/// an LZ4 frame wrapping a MessagePack-encoded list of blocks; the first entry is returned.
///
/// Returns `None` when the file cannot be read (typically because it does not exist yet),
/// when it cannot be decoded, or when the decoded list is empty. The last two cases are
/// logged as errors.
pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
    // `saturating_sub` keeps height 0 from underflowing; it then maps to the 0/0 bucket.
    let bucket_base = height.saturating_sub(1);
    let f = (bucket_base / 1_000_000) * 1_000_000;
    let s = (bucket_base / 1_000) * 1_000;
    // Joined as path components so a non-UTF-8 `ingest_dir` is passed through untouched.
    let path = self
        .ingest_dir
        .join(f.to_string())
        .join(s.to_string())
        .join(format!("{height}.rmp.lz4"));

    let file = std::fs::read(path).ok()?;
    let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
    let blocks: Vec<BlockAndReceipts> = match rmp_serde::from_read(&mut decoder) {
        Ok(blocks) => blocks,
        Err(e) => {
            tracing::error!("Failed to deserialize block data for height {}: {}", height, e);
            return None;
        }
    };

    // Move the first block out instead of cloning it from a vector that is dropped right after.
    let Some(block) = blocks.into_iter().next() else {
        tracing::error!("Deserialized empty blocks vector for height {}", height);
        return None;
    };
    info!("Returning s3 synced block for @ Height [{height}]");
    Some(block)
}
/// Takes the block for `height` out of the local cache, if the ingest task has stored it.
///
/// The entry is removed, so each cached block can be collected only once.
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
    self.local_blocks_cache.lock().await.remove(&height)
}
async fn start_local_ingest_loop(&self, current_head: u64, current_ts: u64) {
let Some(root) = &self.local_ingest_dir else { return }; // nothing to do
let root = root.to_owned();
let cache = self.local_blocks_cache.clone();
let precompiles_cache = self.precompiles_cache.clone();
tokio::spawn(async move {
let mut next_height = current_head;
let mut dt = datetime_from_timestamp(current_ts)
.replace_minute(0)
.expect("Valid minute replacement")
.replace_second(0)
.expect("Valid second replacement")
.replace_nanosecond(0)
.expect("Valid nanosecond replacement");
let mut hour = dt.hour();
let mut day_str = date_from_datetime(dt);
let mut last_line = 0;
loop {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let scan_result = scan_hour_file(&hour_file, &mut last_line, next_height);
match scan_result {
Ok(ScanResult { next_expected_height, new_blocks }) => {
if !new_blocks.is_empty() {
let mut u_cache = cache.lock().await;
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = PrecompileData {
precompiles: blk.read_precompile_calls.clone(),
highest_precompile_address: blk.highest_precompile_address,
};
let h = match &blk.block {
EvmBlock::Reth115(b) => {
let block_number = b.header().number() as u64;
block_number
}
};
u_cache.insert(h, blk);
u_pre_cache.insert(h, precompiles);
}
next_height = next_expected_height;
}
}
Err(e) => {
tracing::error!(
"Failed to scan hour file {}: {}",
hour_file.display(),
e
);
// Continue processing but skip this file
}
}
}
// Decide whether the *current* hour file is closed (past) or
// still live. If its in the past by > 1 h, move to next hour;
// otherwise, keep tailing the same file.
let now = OffsetDateTime::now_utc();
// println!("Date Current {:?}", dt);
// println!("Now Current {:?}", now);
if dt + Duration::HOUR < now {
dt += Duration::HOUR;
hour = dt.hour();
day_str = date_from_datetime(dt);
last_line = 0;
info!(
"Moving to a new file. {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
);
continue;
}
tokio::time::sleep(TAIL_INTERVAL).await;
}
});
}
/// Main ingest loop: replays collected blocks into the node through the engine API.
///
/// Start-up:
/// 1. Reads the `Finish` stage checkpoint and resumes at the block after it.
/// 2. Loads the ERC-20 contract to spot token mapping for the node's chain id.
/// 3. Starts the local ingest task at the timestamp of the current head block, raised to
///    `MINIMUM_TIMESTAMP` if it is lower.
///
/// For every height, in order:
/// 1. Waits, polling every 25 ms, until `collect_block` yields the block.
/// 2. Wraps each system transaction in a `TransactionSigned` with a synthetic signature and
///    places them in front of the block's own transactions.
/// 3. Submits the block with `new_payload_v3`, requiring a `Valid` status.
/// 4. Sends a fork choice update when the height is a multiple of 100 or more than 100 ms
///    have passed since the previous one. A failed update is logged and does not stop the loop.
///
/// # Errors
/// Returns on provider errors, when the head block is missing from the database, when the spot
/// mapping cannot be fetched, when a system transaction is not a legacy call, when sender
/// recovery fails, or when payload submission fails. The loop has no other exit.
pub(crate) async fn run<Node, Engine, AddOns>(
    &self,
    node: FullNode<Node, AddOns>,
) -> Result<(), Box<dyn std::error::Error>>
where
    Node: FullNodeComponents,
    AddOns: RethRpcAddOns<Node>,
    Engine: EngineTypes,
    Node::Types: NodeTypesWithEngine<ChainSpec: EthereumHardforks, Engine = Engine>,
    Node::Network: PeersHandleProvider,
    Engine::ExecutionPayloadEnvelopeV3: From<Engine::BuiltPayload>,
    Engine::ExecutionPayloadEnvelopeV4: From<Engine::BuiltPayload>,
{
    let provider = &node.provider;
    let checkpoint = provider.get_stage_checkpoint(StageId::Finish)?;
    let head = checkpoint.unwrap_or_default().block_number;
    let genesis_hash = node.chain_spec().genesis_hash();

    let mut height = head + 1;
    let mut previous_hash = provider.block_hash(head)?.unwrap_or(genesis_hash);
    // Monotonic clock: unlike wall-clock time it cannot step backwards, so the elapsed-time
    // check below can neither underflow nor be thrown off by a system clock adjustment.
    let mut last_fork_choice_update = std::time::Instant::now();

    let engine_api = node.auth_server_handle().http_client();
    let mut evm_map = erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;

    // Lower bound for the timestamp used to pick the first hourly file.
    // NOTE(review): presumably the earliest time for which hourly files exist - confirm.
    const MINIMUM_TIMESTAMP: u64 = 1739849780;
    let current_block_timestamp: u64 = provider
        .block_by_number(head)
        .map_err(|e| format!("Database error fetching block {}: {}", head, e))?
        .ok_or_else(|| format!("Block {} does not exist in database", head))?
        .into_header()
        .timestamp();
    let current_block_timestamp = current_block_timestamp.max(MINIMUM_TIMESTAMP);

    info!("Current height {height}, timestamp {current_block_timestamp}");
    self.start_local_ingest_loop(height, current_block_timestamp).await;

    loop {
        let Some(original_block) = self.collect_block(height).await else {
            // Not available from either source yet; poll again shortly.
            tokio::time::sleep(std::time::Duration::from_millis(25)).await;
            continue;
        };
        let EvmBlock::Reth115(mut block) = original_block.block;
        debug!(target: "reth::cli", ?block, "Built new payload");

        let timestamp = block.header().timestamp();
        // Taken before the body is rewritten below.
        let block_hash = block.clone().try_recover()?.hash();

        // Rebuild the body with the system transactions in front of the regular ones.
        let BlockBody { transactions, ommers, withdrawals } = std::mem::take(block.body_mut());
        let mut txs = vec![];
        for transaction in original_block.system_txs {
            let TypedTransaction::Legacy(tx) = &transaction.tx else {
                return Err(
                    format!("Unexpected transaction type in block {height}").into()
                );
            };
            let TxKind::Call(to) = tx.to else {
                return Err(
                    format!("Unexpected contract creation in block {height}").into()
                );
            };
            // The `s` component of the synthetic signature: 1 for calls without input,
            // otherwise the value the spot mapping provides for the called contract.
            let s = if tx.input().is_empty() {
                U256::from(0x1)
            } else {
                // NOTE(review): retries without delay or limit until the contract shows up
                // in the mapping; consider a backoff if the lookup is rate limited.
                loop {
                    if let Some(spot) = evm_map.get(&to) {
                        break spot.to_s();
                    }
                    info!(
                        "Contract not found: {:?} from spot mapping, fetching again...",
                        to
                    );
                    evm_map =
                        erc20_contract_to_spot_token(node.chain_spec().chain_id()).await?;
                }
            };
            let signature = PrimitiveSignature::new(
                // from anvil
                U256::from(0x1),
                s,
                true,
            );
            txs.push(TransactionSigned::new(transaction.tx, signature, Default::default()));
        }
        txs.extend(transactions);
        *block.body_mut() = BlockBody { transactions: txs, ommers, withdrawals };

        let total_fees = U256::ZERO;
        let payload = EthBuiltPayload::new(
            PayloadId::new(height.to_be_bytes()),
            Arc::new(block),
            total_fees,
            None,
        );
        let attributes = EthPayloadBuilderAttributes::new(
            B256::ZERO,
            PayloadAttributes {
                timestamp,
                prev_randao: B256::ZERO,
                suggested_fee_recipient: Address::ZERO,
                withdrawals: Some(vec![]),
                parent_beacon_block_root: Some(B256::ZERO),
            },
        );
        submit_payload::<Engine>(&engine_api, payload, attributes, PayloadStatusEnum::Valid)
            .await?;

        // Fork choice updates are throttled: every 100th block, or when more than 100 ms
        // have passed since the last one.
        let now = std::time::Instant::now();
        if height % 100 == 0 || now.duration_since(last_fork_choice_update).as_millis() > 100 {
            if let Err(e) = EngineApiClient::<Engine>::fork_choice_updated_v2(
                &engine_api,
                ForkchoiceState {
                    head_block_hash: block_hash,
                    safe_block_hash: previous_hash,
                    finalized_block_hash: previous_hash,
                },
                None,
            )
            .await
            {
                // Keep ingesting; a failed update is logged instead of aborting the loop.
                tracing::error!("Failed to update fork choice for block {}: {}", height, e);
            }
            last_fork_choice_update = now;
        }

        previous_hash = block_hash;
        height += 1;
    }
}
}