mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Compare commits
5 Commits
7df70e9078
...
e60bc3f09a
| Author | SHA1 | Date | |
|---|---|---|---|
| e60bc3f09a | |||
| c7d1f61817 | |||
| 1a50bdfe12 | |||
| e72d7df2eb | |||
| 4e88d19747 |
@ -1,11 +1,12 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::{
|
||||||
use std::io::{BufRead, BufReader};
|
collections::BTreeMap,
|
||||||
use std::path::{Path, PathBuf};
|
io::{BufRead, BufReader},
|
||||||
use std::sync::Arc;
|
path::{Path, PathBuf},
|
||||||
|
sync::Arc,
|
||||||
|
};
|
||||||
|
|
||||||
use alloy_consensus::{BlockBody, BlockHeader, Transaction};
|
use alloy_consensus::{BlockBody, BlockHeader, Transaction};
|
||||||
use alloy_primitives::TxKind;
|
use alloy_primitives::{Address, PrimitiveSignature, TxKind, B256, U256};
|
||||||
use alloy_primitives::{Address, PrimitiveSignature, B256, U256};
|
|
||||||
use alloy_rpc_types::engine::{
|
use alloy_rpc_types::engine::{
|
||||||
ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
|
ExecutionPayloadEnvelopeV3, ForkchoiceState, PayloadAttributes, PayloadStatusEnum,
|
||||||
};
|
};
|
||||||
@ -14,9 +15,7 @@ use reth::network::PeersHandleProvider;
|
|||||||
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
use reth_chainspec::{EthChainSpec, EthereumHardforks};
|
||||||
use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
|
use reth_hyperliquid_types::{PrecompileData, PrecompilesCache};
|
||||||
use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
|
use reth_node_api::{Block, FullNodeComponents, PayloadTypes};
|
||||||
use reth_node_builder::EngineTypes;
|
use reth_node_builder::{rpc::RethRpcAddOns, EngineTypes, FullNode, NodeTypesWithEngine};
|
||||||
use reth_node_builder::NodeTypesWithEngine;
|
|
||||||
use reth_node_builder::{rpc::RethRpcAddOns, FullNode};
|
|
||||||
use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
|
use reth_payload_builder::{EthBuiltPayload, EthPayloadBuilderAttributes, PayloadId};
|
||||||
use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
|
use reth_primitives::{Transaction as TypedTransaction, TransactionSigned};
|
||||||
use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
|
use reth_provider::{BlockHashReader, BlockReader, StageCheckpointReader};
|
||||||
@ -28,8 +27,10 @@ use time::{format_description, Duration, OffsetDateTime};
|
|||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
|
|
||||||
use crate::serialized::{BlockAndReceipts, EvmBlock};
|
use crate::{
|
||||||
use crate::spot_meta::erc20_contract_to_spot_token;
|
serialized::{BlockAndReceipts, EvmBlock},
|
||||||
|
spot_meta::erc20_contract_to_spot_token,
|
||||||
|
};
|
||||||
|
|
||||||
/// Poll interval when tailing an *open* hourly file.
|
/// Poll interval when tailing an *open* hourly file.
|
||||||
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
|
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
|
||||||
@ -51,7 +52,11 @@ struct ScanResult {
|
|||||||
new_blocks: Vec<BlockAndReceipts>,
|
new_blocks: Vec<BlockAndReceipts>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Result<ScanResult, Box<dyn std::error::Error>> {
|
fn scan_hour_file(
|
||||||
|
path: &Path,
|
||||||
|
last_line: &mut usize,
|
||||||
|
start_height: u64,
|
||||||
|
) -> Result<ScanResult, Box<dyn std::error::Error + Send + Sync>> {
|
||||||
// info!(
|
// info!(
|
||||||
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
|
// "Scanning hour block file @ {:?} for height [{:?}] | Last Line {:?}",
|
||||||
// path, start_height, last_line
|
// path, start_height, last_line
|
||||||
@ -62,7 +67,8 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, start_height: u64) -> Resu
|
|||||||
|
|
||||||
let mut new_blocks = Vec::<BlockAndReceipts>::new();
|
let mut new_blocks = Vec::<BlockAndReceipts>::new();
|
||||||
let mut last_height = start_height;
|
let mut last_height = start_height;
|
||||||
let lines: Vec<String> = reader.lines()
|
let lines: Vec<String> = reader
|
||||||
|
.lines()
|
||||||
.collect::<Result<Vec<_>, _>>()
|
.collect::<Result<Vec<_>, _>>()
|
||||||
.map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
|
.map_err(|e| format!("Failed to read lines from file {}: {}", path.display(), e))?;
|
||||||
let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
|
let skip = if *last_line == 0 { 0 } else { (last_line.clone()) - 1 };
|
||||||
@ -131,7 +137,8 @@ async fn submit_payload<Engine: PayloadTypes + EngineTypes>(
|
|||||||
engine_api_client,
|
engine_api_client,
|
||||||
envelope.execution_payload,
|
envelope.execution_payload,
|
||||||
versioned_hashes,
|
versioned_hashes,
|
||||||
payload_builder_attributes.parent_beacon_block_root
|
payload_builder_attributes
|
||||||
|
.parent_beacon_block_root
|
||||||
.ok_or("Missing required parent_beacon_block_root")?,
|
.ok_or("Missing required parent_beacon_block_root")?,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
@ -178,12 +185,12 @@ impl BlockIngest {
|
|||||||
e
|
e
|
||||||
})
|
})
|
||||||
.ok()?;
|
.ok()?;
|
||||||
|
|
||||||
if blocks.is_empty() {
|
if blocks.is_empty() {
|
||||||
tracing::error!("Deserialized empty blocks vector for height {}", height);
|
tracing::error!("Deserialized empty blocks vector for height {}", height);
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Returning s3 synced block for @ Height [{height}]");
|
info!("Returning s3 synced block for @ Height [{height}]");
|
||||||
Some(blocks[0].clone())
|
Some(blocks[0].clone())
|
||||||
}
|
}
|
||||||
@ -221,27 +228,31 @@ impl BlockIngest {
|
|||||||
match scan_result {
|
match scan_result {
|
||||||
Ok(ScanResult { next_expected_height, new_blocks }) => {
|
Ok(ScanResult { next_expected_height, new_blocks }) => {
|
||||||
if !new_blocks.is_empty() {
|
if !new_blocks.is_empty() {
|
||||||
let mut u_cache = cache.lock().await;
|
let mut u_cache = cache.lock().await;
|
||||||
let mut u_pre_cache = precompiles_cache.lock();
|
let mut u_pre_cache = precompiles_cache.lock();
|
||||||
for blk in new_blocks {
|
for blk in new_blocks {
|
||||||
let precompiles = PrecompileData {
|
let precompiles = PrecompileData {
|
||||||
precompiles: blk.read_precompile_calls.clone(),
|
precompiles: blk.read_precompile_calls.clone(),
|
||||||
highest_precompile_address: blk.highest_precompile_address,
|
highest_precompile_address: blk.highest_precompile_address,
|
||||||
};
|
};
|
||||||
let h = match &blk.block {
|
let h = match &blk.block {
|
||||||
EvmBlock::Reth115(b) => {
|
EvmBlock::Reth115(b) => {
|
||||||
let block_number = b.header().number() as u64;
|
let block_number = b.header().number() as u64;
|
||||||
block_number
|
block_number
|
||||||
|
}
|
||||||
|
};
|
||||||
|
u_cache.insert(h, blk);
|
||||||
|
u_pre_cache.insert(h, precompiles);
|
||||||
}
|
}
|
||||||
};
|
next_height = next_expected_height;
|
||||||
u_cache.insert(h, blk);
|
}
|
||||||
u_pre_cache.insert(h, precompiles);
|
|
||||||
}
|
|
||||||
next_height = next_expected_height;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("Failed to scan hour file {}: {}", hour_file.display(), e);
|
tracing::error!(
|
||||||
|
"Failed to scan hour file {}: {}",
|
||||||
|
hour_file.display(),
|
||||||
|
e
|
||||||
|
);
|
||||||
// Continue processing but skip this file
|
// Continue processing but skip this file
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -415,9 +426,11 @@ impl BlockIngest {
|
|||||||
},
|
},
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
.await {
|
.await
|
||||||
|
{
|
||||||
tracing::error!("Failed to update fork choice for block {}: {}", height, e);
|
tracing::error!("Failed to update fork choice for block {}: {}", height, e);
|
||||||
// Continue processing but log the failure - don't panic the entire blockchain
|
// Continue processing but log the failure - don't panic the entire
|
||||||
|
// blockchain
|
||||||
}
|
}
|
||||||
previous_timestamp = current_timestamp;
|
previous_timestamp = current_timestamp;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -149,7 +149,7 @@ impl<N: NetworkPrimitives> NetworkHandle<N> {
|
|||||||
pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
|
pub async fn transactions_handle(&self) -> Option<TransactionsHandle<N>> {
|
||||||
let (tx, rx) = oneshot::channel();
|
let (tx, rx) = oneshot::channel();
|
||||||
let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
|
let _ = self.manager().send(NetworkHandleMessage::GetTransactionsHandle(tx));
|
||||||
rx.await.unwrap()
|
rx.await.ok().flatten()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send message to gracefully shutdown node.
|
/// Send message to gracefully shutdown node.
|
||||||
@ -266,7 +266,8 @@ impl<N: NetworkPrimitives> PeersInfo for NetworkHandle<N> {
|
|||||||
builder.udp6(local_node_record.udp_port);
|
builder.udp6(local_node_record.udp_port);
|
||||||
builder.tcp6(local_node_record.tcp_port);
|
builder.tcp6(local_node_record.tcp_port);
|
||||||
}
|
}
|
||||||
builder.build(&self.inner.secret_key).expect("valid enr")
|
builder.build(&self.inner.secret_key)
|
||||||
|
.expect("ENR builder should always succeed with valid IP and ports")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -647,8 +647,11 @@ impl PeersManager {
|
|||||||
|
|
||||||
// remove peer if it has been marked for removal
|
// remove peer if it has been marked for removal
|
||||||
if remove_peer {
|
if remove_peer {
|
||||||
let (peer_id, _) = self.peers.remove_entry(peer_id).expect("peer must exist");
|
if let Some((peer_id, _)) = self.peers.remove_entry(peer_id) {
|
||||||
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
|
self.queued_actions.push_back(PeerAction::PeerRemoved(peer_id));
|
||||||
|
} else {
|
||||||
|
tracing::warn!(target: "net::peers", "Attempted to remove non-existent peer: {:?}", peer_id);
|
||||||
|
}
|
||||||
} else if let Some(backoff_until) = backoff_until {
|
} else if let Some(backoff_until) = backoff_until {
|
||||||
// otherwise, backoff the peer if marked as such
|
// otherwise, backoff the peer if marked as such
|
||||||
self.backoff_peer_until(*peer_id, backoff_until);
|
self.backoff_peer_until(*peer_id, backoff_until);
|
||||||
|
|||||||
@ -391,7 +391,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
|
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
|
||||||
self.poll_terminate_message(cx).expect("message is set")
|
self.poll_terminate_message(cx).unwrap_or(Poll::Ready(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Report back that this session has been closed due to an error
|
/// Report back that this session has been closed due to an error
|
||||||
@ -402,7 +402,7 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
|
|||||||
error,
|
error,
|
||||||
};
|
};
|
||||||
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
|
self.terminate_message = Some((self.to_session_manager.inner().clone(), msg));
|
||||||
self.poll_terminate_message(cx).expect("message is set")
|
self.poll_terminate_message(cx).unwrap_or(Poll::Ready(()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Starts the disconnect process
|
/// Starts the disconnect process
|
||||||
|
|||||||
@ -2278,16 +2278,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider> StateWriter
|
|||||||
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
|
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
|
||||||
for num in block_body.tx_num_range() {
|
for num in block_body.tx_num_range() {
|
||||||
if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
|
if receipts_iter.peek().is_some_and(|(n, _)| *n == num) {
|
||||||
// Safe to unwrap here since we just peeked and confirmed it exists
|
block_receipts.push(receipts_iter.next().unwrap().1);
|
||||||
// However, for maximum safety in database operations, we still handle it
|
|
||||||
if let Some((_, receipt)) = receipts_iter.next() {
|
|
||||||
block_receipts.push(receipt);
|
|
||||||
} else {
|
|
||||||
// This should never happen based on peek(), but handle gracefully
|
|
||||||
return Err(ProviderError::Database(reth_db::DatabaseError::Other(
|
|
||||||
"Receipt iterator state mismatch during state reconstruction".into()
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
receipts.push(block_receipts);
|
receipts.push(block_receipts);
|
||||||
@ -3079,13 +3070,9 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypesForProvider + 'static> BlockWrite
|
|||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Safe to unwrap after empty check, but use defensive programming for critical DB ops
|
let first_number = blocks.first().unwrap().number();
|
||||||
let first_number = blocks.first()
|
|
||||||
.expect("Blocks vector guaranteed non-empty after length check")
|
|
||||||
.number();
|
|
||||||
|
|
||||||
let last = blocks.last()
|
let last = blocks.last().unwrap();
|
||||||
.expect("Blocks vector guaranteed non-empty after length check");
|
|
||||||
let last_block_number = last.number();
|
let last_block_number = last.number();
|
||||||
|
|
||||||
let mut durations_recorder = metrics::DurationsRecorder::default();
|
let mut durations_recorder = metrics::DurationsRecorder::default();
|
||||||
@ -3215,4 +3202,4 @@ impl<TX: DbTx + 'static, N: NodeTypes + 'static> DBProvider for DatabaseProvider
|
|||||||
fn prune_modes_ref(&self) -> &PruneModes {
|
fn prune_modes_ref(&self) -> &PruneModes {
|
||||||
self.prune_modes_ref()
|
self.prune_modes_ref()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -171,4 +171,4 @@ fn range_size_hint(range: &impl RangeBounds<u64>) -> Option<usize> {
|
|||||||
Bound::Unbounded => return None,
|
Bound::Unbounded => return None,
|
||||||
};
|
};
|
||||||
end.checked_sub(start).map(|x| x as _)
|
end.checked_sub(start).map(|x| x as _)
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user