refactor: Reduce unnecessary LoC

By claude code
This commit is contained in:
sprites0
2025-08-23 04:21:23 -04:00
parent f576dddfa6
commit bae68ef8db
7 changed files with 229 additions and 388 deletions

View File

@ -1,3 +1,12 @@
//! Overrides for RPC methods to post-filter system transactions and logs.
//!
//! System transactions are always at the beginning of the block,
//! so we can use the transaction index to determine if the log is from a system transaction,
//! and if it is, we can exclude it.
//!
//! For non-system transactions, we can just return the log as is, and the client will
//! adjust the transaction index accordingly.
use alloy_consensus::{transaction::TransactionMeta, TxReceipt}; use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_json_rpc::RpcObject; use alloy_json_rpc::RpcObject;
@ -48,20 +57,19 @@ pub trait EthWrapper:
{ {
} }
impl< impl<T> EthWrapper for T where
T: EthApiServer< T: EthApiServer<
RpcTxReq<Self::NetworkTypes>, RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>, RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>, RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>, RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>, RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes<Primitives = HlPrimitives> > + FullEthApiTypes<Primitives = HlPrimitives>
+ RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
+ EthBlocks + EthBlocks
+ EthTransactions + EthTransactions
+ LoadReceipt + LoadReceipt
+ 'static, + 'static
> EthWrapper for T
{ {
} }
@ -80,19 +88,16 @@ impl<Eth: EthWrapper> HlNodeFilterHttp<Eth> {
impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>> impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterHttp<Eth> for HlNodeFilterHttp<Eth>
{ {
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> { async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newFilter"); trace!(target: "rpc::eth", "Serving eth_newFilter");
self.filter.new_filter(filter).await self.filter.new_filter(filter).await
} }
/// Handler for `eth_newBlockFilter`
async fn new_block_filter(&self) -> RpcResult<FilterId> { async fn new_block_filter(&self) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newBlockFilter"); trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
self.filter.new_block_filter().await self.filter.new_block_filter().await
} }
/// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter( async fn new_pending_transaction_filter(
&self, &self,
kind: Option<PendingTransactionFilterKind>, kind: Option<PendingTransactionFilterKind>,
@ -101,7 +106,6 @@ impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
self.filter.new_pending_transaction_filter(kind).await self.filter.new_pending_transaction_filter(kind).await
} }
/// Handler for `eth_getFilterChanges`
async fn filter_changes( async fn filter_changes(
&self, &self,
id: FilterId, id: FilterId,
@ -110,31 +114,20 @@ impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
self.filter.filter_changes(id).await.map_err(ErrorObject::from) self.filter.filter_changes(id).await.map_err(ErrorObject::from)
} }
/// Returns an array of all logs matching filter with given id.
///
/// Returns an error if no matching log filter exists.
///
/// Handler for `eth_getFilterLogs`
async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> { async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getFilterLogs"); trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
self.filter.filter_logs(id).await.map_err(ErrorObject::from) self.filter.filter_logs(id).await.map_err(ErrorObject::from)
} }
/// Handler for `eth_uninstallFilter`
async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> { async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
trace!(target: "rpc::eth", "Serving eth_uninstallFilter"); trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
self.filter.uninstall_filter(id).await self.filter.uninstall_filter(id).await
} }
/// Returns logs matching given filter object.
///
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> { async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs"); trace!(target: "rpc::eth", "Serving eth_getLogs");
let logs = EthFilterApiServer::logs(&*self.filter, filter).await?; let logs = EthFilterApiServer::logs(&*self.filter, filter).await?;
let provider = self.provider.clone(); Ok(logs.into_iter().filter_map(|log| adjust_log::<Eth>(log, &self.provider)).collect())
Ok(logs.into_iter().filter_map(|log| adjust_log::<Eth>(log, &provider)).collect())
} }
} }
@ -158,7 +151,6 @@ impl<Eth: EthWrapper> HlNodeFilterWs<Eth> {
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterWs<Eth> for HlNodeFilterWs<Eth>
{ {
/// Handler for `eth_subscribe`
async fn subscribe( async fn subscribe(
&self, &self,
pending: PendingSubscriptionSink, pending: PendingSubscriptionSink,
@ -166,16 +158,12 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
params: Option<Params>, params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult { ) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?; let sink = pending.accept().await?;
let pubsub = self.pubsub.clone(); let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
let provider = self.provider.clone();
self.subscription_task_spawner.spawn(Box::pin(async move { self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::Logs { if kind == SubscriptionKind::Logs {
// if no params are provided, used default filter params
let filter = match params { let filter = match params {
Some(Params::Logs(filter)) => *filter, Some(Params::Logs(f)) => *f,
Some(Params::Bool(_)) => { Some(Params::Bool(_)) => return,
return;
}
_ => Default::default(), _ => Default::default(),
}; };
let _ = pipe_from_stream( let _ = pipe_from_stream(
@ -185,46 +173,30 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
.await; .await;
} else { } else {
let _ = pubsub.handle_accepted(sink, kind, params).await; let _ = pubsub.handle_accepted(sink, kind, params).await;
}; }
})); }));
Ok(()) Ok(())
} }
} }
fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option<Log> { fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option<Log> {
let transaction_index = log.transaction_index?; let (tx_idx, log_idx) = (log.transaction_index?, log.log_index?);
let log_index = log.log_index?;
let receipts = provider.receipts_by_block(log.block_number?.into()).unwrap()?; let receipts = provider.receipts_by_block(log.block_number?.into()).unwrap()?;
let (mut sys_tx_count, mut sys_log_count) = (0u64, 0u64);
// System transactions are always at the beginning of the block,
// so we can use the transaction index to determine if the log is from a system transaction,
// and if it is, we can exclude it.
//
// For non-system transactions, we can just return the log as is, and the client will
// adjust the transaction index accordingly.
let mut system_tx_count = 0u64;
let mut system_tx_logs_count = 0u64;
for receipt in receipts { for receipt in receipts {
let is_system_tx = receipt.cumulative_gas_used() == 0; if receipt.cumulative_gas_used() == 0 {
if is_system_tx { sys_tx_count += 1;
system_tx_count += 1; sys_log_count += receipt.logs().len() as u64;
system_tx_logs_count += receipt.logs().len() as u64;
} }
} }
if sys_tx_count > tx_idx {
if system_tx_count > transaction_index {
return None; return None;
} }
log.transaction_index = Some(tx_idx - sys_tx_count);
log.transaction_index = Some(transaction_index - system_tx_count); log.log_index = Some(log_idx - sys_log_count);
log.log_index = Some(log_index - system_tx_logs_count);
Some(log) Some(log)
} }
/// Helper to convert a serde error into an [`ErrorObject`]
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Failed to serialize subscription item: {0}")] #[error("Failed to serialize subscription item: {0}")]
pub struct SubscriptionSerializeError(#[from] serde_json::Error); pub struct SubscriptionSerializeError(#[from] serde_json::Error);
@ -241,37 +213,18 @@ impl From<SubscriptionSerializeError> for ErrorObject<'static> {
} }
} }
async fn pipe_from_stream<T, St>( async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink, sink: SubscriptionSink,
mut stream: St, mut stream: St,
) -> Result<(), ErrorObject<'static>> ) -> Result<(), ErrorObject<'static>> {
where
St: Stream<Item = T> + Unpin,
T: Serialize,
{
loop { loop {
tokio::select! { tokio::select! {
_ = sink.closed() => { _ = sink.closed() => break Ok(()),
// connection dropped
break Ok(())
},
maybe_item = stream.next() => { maybe_item = stream.next() => {
let item = match maybe_item { let Some(item) = maybe_item else { break Ok(()) };
Some(item) => item, let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
None => { .map_err(SubscriptionSerializeError::new)?;
// stream ended if sink.send(msg).await.is_err() { break Ok(()); }
break Ok(())
},
};
let msg = SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item
).map_err(SubscriptionSerializeError::new)?;
if sink.send(msg).await.is_err() {
break Ok(());
}
} }
} }
} }
@ -410,7 +363,8 @@ async fn adjust_transaction_receipt<Eth: EthWrapper>(
) -> Result<Option<RpcReceipt<Eth::NetworkTypes>>, Eth::Error> { ) -> Result<Option<RpcReceipt<Eth::NetworkTypes>>, Eth::Error> {
match eth_api.load_transaction_and_receipt(tx_hash).await? { match eth_api.load_transaction_and_receipt(tx_hash).await? {
Some((_, meta, _)) => { Some((_, meta, _)) => {
// LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again doesn't hurt performance much // LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again
// doesn't hurt performance much
info!("block hash: {:?}", meta.block_hash); info!("block hash: {:?}", meta.block_hash);
let Some((system_tx_count, block_receipts)) = let Some((system_tx_count, block_receipts)) =
adjust_block_receipts(meta.block_hash.into(), eth_api).await? adjust_block_receipts(meta.block_hash.into(), eth_api).await?
@ -464,8 +418,9 @@ where
let res = let res =
self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?; self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?;
Ok(res.map(|count| { Ok(res.map(|count| {
count let sys_tx_count =
- U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()))) system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()));
count - U256::from(sys_tx_count)
})) }))
} }

View File

@ -71,10 +71,10 @@ where
let timestamp = evm_env.block_env.timestamp.saturating_to(); let timestamp = evm_env.block_env.timestamp.saturating_to();
// Filter out system tx receipts // Filter out system tx receipts
let transactions_for_root: Vec<TransactionSigned> = let transactions_for_root: Vec<_> =
transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect::<Vec<_>>(); transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect();
let receipts_for_root: Vec<Receipt> = let receipts_for_root: Vec<_> =
receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect::<Vec<_>>(); receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect();
let transactions_root = proofs::calculate_transaction_root(&transactions_for_root); let transactions_root = proofs::calculate_transaction_root(&transactions_for_root);
let receipts_root = Receipt::calculate_receipt_root_no_memo(&receipts_for_root); let receipts_root = Receipt::calculate_receipt_root_no_memo(&receipts_for_root);
@ -295,7 +295,6 @@ where
// configure evm env based on parent block // configure evm env based on parent block
let mut cfg_env = let mut cfg_env =
CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec); CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
if let Some(blob_params) = &blob_params { if let Some(blob_params) = &blob_params {
cfg_env.set_max_blobs_per_tx(blob_params.max_blobs_per_tx); cfg_env.set_max_blobs_per_tx(blob_params.max_blobs_per_tx);
} }
@ -376,10 +375,6 @@ where
block: &'a SealedBlock<BlockTy<Self::Primitives>>, block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> ExecutionCtxFor<'a, Self> { ) -> ExecutionCtxFor<'a, Self> {
let block_body = block.body(); let block_body = block.body();
let extras = HlExtras {
read_precompile_calls: block_body.read_precompile_calls.clone(),
highest_precompile_address: block_body.highest_precompile_address,
};
HlBlockExecutionCtx { HlBlockExecutionCtx {
ctx: EthBlockExecutionCtx { ctx: EthBlockExecutionCtx {
parent_hash: block.header().parent_hash, parent_hash: block.header().parent_hash,
@ -387,7 +382,10 @@ where
ommers: &block.body().ommers, ommers: &block.body().ommers,
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed), withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
}, },
extras, extras: HlExtras {
read_precompile_calls: block_body.read_precompile_calls.clone(),
highest_precompile_address: block_body.highest_precompile_address,
},
} }
} }
@ -403,8 +401,7 @@ where
ommers: &[], ommers: &[],
withdrawals: attributes.withdrawals.map(Cow::Owned), withdrawals: attributes.withdrawals.map(Cow::Owned),
}, },
// TODO: hacky, double check if this is correct extras: HlExtras::default(), // TODO: hacky, double check if this is correct
extras: HlExtras::default(),
} }
} }
} }
@ -416,10 +413,6 @@ impl ConfigureEngineEvm<HlExecutionData> for HlEvmConfig {
fn context_for_payload<'a>(&self, payload: &'a HlExecutionData) -> ExecutionCtxFor<'a, Self> { fn context_for_payload<'a>(&self, payload: &'a HlExecutionData) -> ExecutionCtxFor<'a, Self> {
let block = &payload.0; let block = &payload.0;
let extras = HlExtras {
read_precompile_calls: block.body.read_precompile_calls.clone(),
highest_precompile_address: block.body.highest_precompile_address,
};
HlBlockExecutionCtx { HlBlockExecutionCtx {
ctx: EthBlockExecutionCtx { ctx: EthBlockExecutionCtx {
parent_hash: block.header.parent_hash, parent_hash: block.header.parent_hash,
@ -427,7 +420,10 @@ impl ConfigureEngineEvm<HlExecutionData> for HlEvmConfig {
ommers: &block.body.ommers, ommers: &block.body.ommers,
withdrawals: block.body.withdrawals.as_ref().map(Cow::Borrowed), withdrawals: block.body.withdrawals.as_ref().map(Cow::Borrowed),
}, },
extras, extras: HlExtras {
read_precompile_calls: block.body.read_precompile_calls.clone(),
highest_precompile_address: block.body.highest_precompile_address,
},
} }
} }

View File

@ -89,7 +89,6 @@ where
/// Process a new payload and return the outcome /// Process a new payload and return the outcome
fn new_payload(&self, block: BlockMsg, peer_id: PeerId) -> ImportFut { fn new_payload(&self, block: BlockMsg, peer_id: PeerId) -> ImportFut {
let engine = self.engine.clone(); let engine = self.engine.clone();
Box::pin(async move { Box::pin(async move {
let sealed_block = block.block.0.block.clone().seal(); let sealed_block = block.block.0.block.clone().seal();
let payload = HlPayloadTypes::block_to_payload(sealed_block); let payload = HlPayloadTypes::block_to_payload(sealed_block);
@ -107,7 +106,7 @@ where
.into(), .into(),
_ => None, _ => None,
}, },
Err(err) => None, Err(_) => None,
} }
}) })
} }
@ -117,15 +116,10 @@ where
let engine = self.engine.clone(); let engine = self.engine.clone();
let consensus = self.consensus.clone(); let consensus = self.consensus.clone();
let sealed_block = block.block.0.block.clone().seal(); let sealed_block = block.block.0.block.clone().seal();
let hash = sealed_block.hash(); let (hash, number) = (sealed_block.hash(), sealed_block.number());
let number = sealed_block.number();
Box::pin(async move { Box::pin(async move {
let (head_block_hash, current_hash) = match consensus.canonical_head(hash, number) { let (head_block_hash, _) = consensus.canonical_head(hash, number).ok()?;
Ok(hash) => hash,
Err(_) => return None,
};
let state = ForkchoiceState { let state = ForkchoiceState {
head_block_hash, head_block_hash,
safe_block_hash: head_block_hash, safe_block_hash: head_block_hash,
@ -146,18 +140,15 @@ where
.into(), .into(),
_ => None, _ => None,
}, },
Err(err) => None, Err(_) => None,
} }
}) })
} }
/// Add a new block import task to the pending imports /// Add a new block import task to the pending imports
fn on_new_block(&mut self, block: BlockMsg, peer_id: PeerId) { fn on_new_block(&mut self, block: BlockMsg, peer_id: PeerId) {
let payload_fut = self.new_payload(block.clone(), peer_id); self.pending_imports.push(self.new_payload(block.clone(), peer_id));
self.pending_imports.push(payload_fut); self.pending_imports.push(self.update_fork_choice(block, peer_id));
let fcu_fut = self.update_fork_choice(block, peer_id);
self.pending_imports.push(fcu_fut);
} }
} }
@ -176,11 +167,9 @@ where
} }
// Process completed imports and send events to network // Process completed imports and send events to network
while let Poll::Ready(Some(outcome)) = this.pending_imports.poll_next_unpin(cx) { while let Poll::Ready(Some(Some(outcome))) = this.pending_imports.poll_next_unpin(cx) {
if let Some(outcome) = outcome { if let Err(e) = this.to_network.send(BlockImportEvent::Outcome(outcome)) {
if let Err(e) = this.to_network.send(BlockImportEvent::Outcome(outcome)) { return Poll::Ready(Err(Box::new(e)));
return Poll::Ready(Err(Box::new(e)));
}
} }
} }
@ -261,15 +250,12 @@ mod tests {
fn chain_info(&self) -> Result<ChainInfo, ProviderError> { fn chain_info(&self) -> Result<ChainInfo, ProviderError> {
unimplemented!() unimplemented!()
} }
fn best_block_number(&self) -> Result<u64, ProviderError> { fn best_block_number(&self) -> Result<u64, ProviderError> {
Ok(0) Ok(0)
} }
fn last_block_number(&self) -> Result<u64, ProviderError> { fn last_block_number(&self) -> Result<u64, ProviderError> {
Ok(0) Ok(0)
} }
fn block_number(&self, _hash: B256) -> Result<Option<u64>, ProviderError> { fn block_number(&self, _hash: B256) -> Result<Option<u64>, ProviderError> {
Ok(None) Ok(None)
} }
@ -279,7 +265,6 @@ mod tests {
fn block_hash(&self, _number: u64) -> Result<Option<B256>, ProviderError> { fn block_hash(&self, _number: u64) -> Result<Option<B256>, ProviderError> {
Ok(Some(B256::ZERO)) Ok(Some(B256::ZERO))
} }
fn canonical_hashes_range( fn canonical_hashes_range(
&self, &self,
_start: u64, _start: u64,
@ -299,14 +284,12 @@ mod tests {
fn both_valid() -> Self { fn both_valid() -> Self {
Self { new_payload: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid } Self { new_payload: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid }
} }
fn invalid_new_payload() -> Self { fn invalid_new_payload() -> Self {
Self { Self {
new_payload: PayloadStatusEnum::Invalid { validation_error: "test error".into() }, new_payload: PayloadStatusEnum::Invalid { validation_error: "test error".into() },
fcu: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid,
} }
} }
fn invalid_fcu() -> Self { fn invalid_fcu() -> Self {
Self { Self {
new_payload: PayloadStatusEnum::Valid, new_payload: PayloadStatusEnum::Valid,
@ -326,19 +309,15 @@ mod tests {
let consensus = Arc::new(HlConsensus { provider: MockProvider }); let consensus = Arc::new(HlConsensus { provider: MockProvider });
let (to_engine, from_engine) = mpsc::unbounded_channel(); let (to_engine, from_engine) = mpsc::unbounded_channel();
let engine_handle = ConsensusEngineHandle::new(to_engine); let engine_handle = ConsensusEngineHandle::new(to_engine);
handle_engine_msg(from_engine, responses).await; handle_engine_msg(from_engine, responses).await;
let (to_import, from_network) = mpsc::unbounded_channel(); let (to_import, from_network) = mpsc::unbounded_channel();
let (to_network, import_outcome) = mpsc::unbounded_channel(); let (to_network, import_outcome) = mpsc::unbounded_channel();
let handle = ImportHandle::new(to_import, import_outcome); let handle = ImportHandle::new(to_import, import_outcome);
let service = ImportService::new(consensus, engine_handle, from_network, to_network); let service = ImportService::new(consensus, engine_handle, from_network, to_network);
tokio::spawn(Box::pin(async move { tokio::spawn(Box::pin(async move {
service.await.unwrap(); service.await.unwrap();
})); }));
Self { handle } Self { handle }
} }

View File

@ -69,32 +69,22 @@ mod rlp {
impl<'a> From<&'a HlNewBlock> for HlNewBlockHelper<'a> { impl<'a> From<&'a HlNewBlock> for HlNewBlockHelper<'a> {
fn from(value: &'a HlNewBlock) -> Self { fn from(value: &'a HlNewBlock) -> Self {
let HlNewBlock(NewBlock { let b = &value.0.block;
block:
HlBlock {
header,
body:
HlBlockBody {
inner: BlockBody { transactions, ommers, withdrawals },
sidecars,
read_precompile_calls,
highest_precompile_address,
},
},
td,
}) = value;
Self { Self {
block: BlockHelper { block: BlockHelper {
header: Cow::Borrowed(header), header: Cow::Borrowed(&b.header),
transactions: Cow::Borrowed(transactions), transactions: Cow::Borrowed(&b.body.inner.transactions),
ommers: Cow::Borrowed(ommers), ommers: Cow::Borrowed(&b.body.inner.ommers),
withdrawals: withdrawals.as_ref().map(Cow::Borrowed), withdrawals: b.body.inner.withdrawals.as_ref().map(Cow::Borrowed),
}, },
td: *td, td: value.0.td,
sidecars: sidecars.as_ref().map(Cow::Borrowed), sidecars: b.body.sidecars.as_ref().map(Cow::Borrowed),
read_precompile_calls: read_precompile_calls.as_ref().map(Cow::Borrowed), read_precompile_calls: b.body.read_precompile_calls.as_ref().map(Cow::Borrowed),
highest_precompile_address: highest_precompile_address.as_ref().map(Cow::Borrowed), highest_precompile_address: b
.body
.highest_precompile_address
.as_ref()
.map(Cow::Borrowed),
} }
} }
} }
@ -111,30 +101,24 @@ mod rlp {
impl Decodable for HlNewBlock { impl Decodable for HlNewBlock {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> { fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let HlNewBlockHelper { let h = HlNewBlockHelper::decode(buf)?;
block: BlockHelper { header, transactions, ommers, withdrawals },
td,
sidecars,
read_precompile_calls,
highest_precompile_address,
} = HlNewBlockHelper::decode(buf)?;
Ok(HlNewBlock(NewBlock { Ok(HlNewBlock(NewBlock {
block: HlBlock { block: HlBlock {
header: header.into_owned(), header: h.block.header.into_owned(),
body: HlBlockBody { body: HlBlockBody {
inner: BlockBody { inner: BlockBody {
transactions: transactions.into_owned(), transactions: h.block.transactions.into_owned(),
ommers: ommers.into_owned(), ommers: h.block.ommers.into_owned(),
withdrawals: withdrawals.map(|w| w.into_owned()), withdrawals: h.block.withdrawals.map(|w| w.into_owned()),
}, },
sidecars: sidecars.map(|s| s.into_owned()), sidecars: h.sidecars.map(|s| s.into_owned()),
read_precompile_calls: read_precompile_calls.map(|s| s.into_owned()), read_precompile_calls: h.read_precompile_calls.map(|s| s.into_owned()),
highest_precompile_address: highest_precompile_address highest_precompile_address: h
.highest_precompile_address
.map(|s| s.into_owned()), .map(|s| s.into_owned()),
}, },
}, },
td, td: h.td,
})) }))
} }
} }
@ -172,41 +156,32 @@ impl HlNetworkBuilder {
where where
Node: FullNodeTypes<Types = HlNode>, Node: FullNodeTypes<Types = HlNode>,
{ {
let Self { engine_handle_rx, .. } = self;
let network_builder = ctx.network_config_builder()?;
let (to_import, from_network) = mpsc::unbounded_channel(); let (to_import, from_network) = mpsc::unbounded_channel();
let (to_network, import_outcome) = mpsc::unbounded_channel(); let (to_network, import_outcome) = mpsc::unbounded_channel();
let handle = ImportHandle::new(to_import, import_outcome); let handle = ImportHandle::new(to_import, import_outcome);
let consensus = Arc::new(HlConsensus { provider: ctx.provider().clone() }); let consensus = Arc::new(HlConsensus { provider: ctx.provider().clone() });
ctx.task_executor().spawn_critical("block import", async move { ctx.task_executor().spawn_critical("block import", async move {
let handle = engine_handle_rx let handle = self
.engine_handle_rx
.lock() .lock()
.await .await
.take() .take()
.expect("node should only be launched once") .expect("node should only be launched once")
.await .await
.unwrap(); .unwrap();
ImportService::new(consensus, handle, from_network, to_network).await.unwrap(); ImportService::new(consensus, handle, from_network, to_network).await.unwrap();
}); });
let network_builder = network_builder Ok(ctx.build_network_config(
.disable_dns_discovery() ctx.network_config_builder()?
.disable_nat() .disable_dns_discovery()
.boot_nodes(boot_nodes()) .disable_nat()
.set_head(ctx.head()) .boot_nodes(boot_nodes())
.with_pow() .set_head(ctx.head())
.block_import(Box::new(HlBlockImport::new(handle))); .with_pow()
// .discovery(discv4) .block_import(Box::new(HlBlockImport::new(handle))),
// .eth_rlpx_handshake(Arc::new(HlHandshake::default())); ))
let network_config = ctx.build_network_config(network_builder);
Ok(network_config)
} }
} }
@ -229,11 +204,9 @@ where
pool: Pool, pool: Pool,
) -> eyre::Result<Self::Network> { ) -> eyre::Result<Self::Network> {
let block_source_config = self.block_source_config.clone(); let block_source_config = self.block_source_config.clone();
let network_config = self.network_config(ctx)?; let handle =
let network = NetworkManager::builder(network_config).await?; ctx.start_network(NetworkManager::builder(self.network_config(ctx)?).await?, pool);
let handle = ctx.start_network(network, pool);
let local_node_record = handle.local_node_record(); let local_node_record = handle.local_node_record();
let chain_spec = ctx.chain_spec();
info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized"); info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
let next_block_number = ctx let next_block_number = ctx
@ -243,12 +216,17 @@ where
.block_number + .block_number +
1; 1;
let chain_spec = ctx.chain_spec();
ctx.task_executor().spawn_critical("pseudo peer", async move { ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source = start_pseudo_peer(
block_source_config.create_cached_block_source((&*chain_spec).clone(), next_block_number).await; chain_spec.clone(),
start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) local_node_record.to_string(),
.await block_source_config
.unwrap(); .create_cached_block_source((*chain_spec).clone(), next_block_number)
.await,
)
.await
.unwrap();
}); });
Ok(handle) Ok(handle)

View File

@ -68,19 +68,15 @@ impl BlockBodyTrait for HlBlockBody {
fn transactions(&self) -> &[Self::Transaction] { fn transactions(&self) -> &[Self::Transaction] {
BlockBodyTrait::transactions(&self.inner) BlockBodyTrait::transactions(&self.inner)
} }
fn into_ethereum_body(self) -> BlockBody { fn into_ethereum_body(self) -> BlockBody {
self.inner self.inner
} }
fn into_transactions(self) -> Vec<Self::Transaction> { fn into_transactions(self) -> Vec<Self::Transaction> {
self.inner.into_transactions() self.inner.into_transactions()
} }
fn withdrawals(&self) -> Option<&alloy_rpc_types::Withdrawals> { fn withdrawals(&self) -> Option<&alloy_rpc_types::Withdrawals> {
self.inner.withdrawals() self.inner.withdrawals()
} }
fn ommers(&self) -> Option<&[Self::OmmerHeader]> { fn ommers(&self) -> Option<&[Self::OmmerHeader]> {
self.inner.ommers() self.inner.ommers()
} }
@ -116,15 +112,12 @@ impl Block for HlBlock {
fn new(header: Self::Header, body: Self::Body) -> Self { fn new(header: Self::Header, body: Self::Body) -> Self {
Self { header, body } Self { header, body }
} }
fn header(&self) -> &Self::Header { fn header(&self) -> &Self::Header {
&self.header &self.header
} }
fn body(&self) -> &Self::Body { fn body(&self) -> &Self::Body {
&self.body &self.body
} }
fn split(self) -> (Self::Header, Self::Body) { fn split(self) -> (Self::Header, Self::Body) {
(self.header, self.body) (self.header, self.body)
} }
@ -179,7 +172,6 @@ mod rlp {
read_precompile_calls, read_precompile_calls,
highest_precompile_address, highest_precompile_address,
} = value; } = value;
Self { Self {
transactions: Cow::Borrowed(transactions), transactions: Cow::Borrowed(transactions),
ommers: Cow::Borrowed(ommers), ommers: Cow::Borrowed(ommers),
@ -203,7 +195,6 @@ mod rlp {
highest_precompile_address, highest_precompile_address,
}, },
} = value; } = value;
Self { Self {
header: Cow::Borrowed(header), header: Cow::Borrowed(header),
transactions: Cow::Borrowed(transactions), transactions: Cow::Borrowed(transactions),
@ -220,7 +211,6 @@ mod rlp {
fn encode(&self, out: &mut dyn bytes::BufMut) { fn encode(&self, out: &mut dyn bytes::BufMut) {
BlockBodyHelper::from(self).encode(out); BlockBodyHelper::from(self).encode(out);
} }
fn length(&self) -> usize { fn length(&self) -> usize {
BlockBodyHelper::from(self).length() BlockBodyHelper::from(self).length()
} }
@ -253,7 +243,6 @@ mod rlp {
fn encode(&self, out: &mut dyn bytes::BufMut) { fn encode(&self, out: &mut dyn bytes::BufMut) {
BlockHelper::from(self).encode(out); BlockHelper::from(self).encode(out);
} }
fn length(&self) -> usize { fn length(&self) -> usize {
BlockHelper::from(self).length() BlockHelper::from(self).length()
} }

View File

@ -77,19 +77,19 @@ impl BlockPoller {
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?; start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
info!("Starting block poller"); info!("Starting block poller");
let latest_block_number = block_source let mut next_block_number = block_source
.find_latest_block_number() .find_latest_block_number()
.await .await
.ok_or(eyre::eyre!("Failed to find latest block number"))?; .ok_or(eyre::eyre!("Failed to find latest block number"))?;
let mut next_block_number = latest_block_number;
loop { loop {
let Ok(block) = block_source.collect_block(next_block_number).await else { match block_source.collect_block(next_block_number).await {
tokio::time::sleep(Self::POLL_INTERVAL).await; Ok(block) => {
continue; block_tx_clone.send((next_block_number, block)).await?;
}; next_block_number += 1;
block_tx_clone.send((next_block_number, block)).await?; }
next_block_number += 1; Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await,
}
} }
} }
} }
@ -111,8 +111,7 @@ impl BlockImport<HlNewBlock> for BlockPoller {
}, },
})) }))
} }
Poll::Ready(None) => Poll::Pending, Poll::Ready(None) | Poll::Pending => Poll::Pending,
Poll::Pending => Poll::Pending,
} }
} }
@ -157,12 +156,11 @@ impl<BS: BlockSource> PseudoPeer<BS> {
block_numbers: impl IntoIterator<Item = u64>, block_numbers: impl IntoIterator<Item = u64>,
) -> Vec<BlockAndReceipts> { ) -> Vec<BlockAndReceipts> {
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>(); let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
let blocks = futures::stream::iter(block_numbers) futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await.unwrap()) .map(async |number| self.collect_block(number).await.unwrap())
.buffered(self.block_source.recommended_chunk_size() as usize) .buffered(self.block_source.recommended_chunk_size() as usize)
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await; .await
blocks
} }
pub async fn process_eth_request( pub async fn process_eth_request(
@ -179,7 +177,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
debug!( debug!(
"GetBlockHeaders request: {start_block:?}, {limit:?}, {skip:?}, {direction:?}" "GetBlockHeaders request: {start_block:?}, {limit:?}, {skip:?}, {direction:?}"
); );
let number = match start_block { let number = match start_block {
HashOrNumber::Hash(hash) => self.hash_to_block_number(hash).await, HashOrNumber::Hash(hash) => self.hash_to_block_number(hash).await,
HashOrNumber::Number(number) => number, HashOrNumber::Number(number) => number,
@ -215,12 +212,8 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let _ = response.send(Ok(BlockBodies(block_bodies))); let _ = response.send(Ok(BlockBodies(block_bodies)));
} }
IncomingEthRequest::GetNodeData { .. } => { IncomingEthRequest::GetNodeData { .. } => debug!("GetNodeData request: {eth_req:?}"),
debug!("GetNodeData request: {eth_req:?}"); eth_req => debug!("New eth protocol request: {eth_req:?}"),
}
eth_req => {
debug!("New eth protocol request: {eth_req:?}");
}
} }
Ok(()) Ok(())
} }
@ -251,7 +244,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
// This is tricky because Raw EVM files (BlockSource) does not have hash to number mapping // This is tricky because Raw EVM files (BlockSource) does not have hash to number mapping
// so we can either enumerate all blocks to get hash to number mapping, or fallback to an // so we can either enumerate all blocks to get hash to number mapping, or fallback to an
// official RPC. The latter is much easier but has 300/day rate limit. // official RPC. The latter is much easier but has 300/day rate limit.
use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee_core::client::ClientT; use jsonrpsee_core::client::ClientT;
@ -259,7 +251,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let client = let client =
HttpClientBuilder::default().build(self.chain_spec.official_rpc_url()).unwrap(); HttpClientBuilder::default().build(self.chain_spec.official_rpc_url()).unwrap();
let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?; let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?;
debug!("From official RPC: {:?} for {hash:?}", target_block.header.number); debug!("From official RPC: {:?} for {hash:?}", target_block.header.number);
self.cache_blocks([(hash, target_block.header.number)]); self.cache_blocks([(hash, target_block.header.number)]);
Ok(target_block.header.number) Ok(target_block.header.number)
@ -272,9 +263,10 @@ impl<BS: BlockSource> PseudoPeer<BS> {
if self.if_hit_then_warm_around.lock().unwrap().contains(&block_number) { if self.if_hit_then_warm_around.lock().unwrap().contains(&block_number) {
self.warm_cache_around_blocks(block_number, self.warm_cache_size).await; self.warm_cache_around_blocks(block_number, self.warm_cache_size).await;
} }
return Some(block_number); Some(block_number)
} else {
None
} }
None
} }
/// Backfill the cache with blocks to find the target hash /// Backfill the cache with blocks to find the target hash
@ -319,10 +311,11 @@ impl<BS: BlockSource> PseudoPeer<BS> {
async fn warm_cache_around_blocks(&mut self, block_number: u64, chunk_size: u64) { async fn warm_cache_around_blocks(&mut self, block_number: u64, chunk_size: u64) {
let start = std::cmp::max(block_number.saturating_sub(chunk_size), 1); let start = std::cmp::max(block_number.saturating_sub(chunk_size), 1);
let end = std::cmp::min(block_number + chunk_size, self.known_latest_block_number); let end = std::cmp::min(block_number + chunk_size, self.known_latest_block_number);
{
self.if_hit_then_warm_around.lock().unwrap().insert(start); let mut guard = self.if_hit_then_warm_around.lock().unwrap();
self.if_hit_then_warm_around.lock().unwrap().insert(end); guard.insert(start);
guard.insert(end);
}
const IMPOSSIBLE_HASH: B256 = B256::ZERO; const IMPOSSIBLE_HASH: B256 = B256::ZERO;
let _ = self.try_block_range_for_hash(start, end, IMPOSSIBLE_HASH).await; let _ = self.try_block_range_for_hash(start, end, IMPOSSIBLE_HASH).await;
} }
@ -348,15 +341,12 @@ impl<BS: BlockSource> PseudoPeer<BS> {
} }
debug!("Backfilling from {start_number} to {end_number}"); debug!("Backfilling from {start_number} to {end_number}");
// Collect blocks and cache them // Collect blocks and cache them
let blocks = self.collect_blocks(uncached_block_numbers).await; let blocks = self.collect_blocks(uncached_block_numbers).await;
let block_map: HashMap<B256, u64> = let block_map: HashMap<B256, u64> =
blocks.into_iter().map(|block| (block.hash(), block.number())).collect(); blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
let maybe_block_number = block_map.get(&target_hash).copied(); let maybe_block_number = block_map.get(&target_hash).copied();
self.cache_blocks(block_map); self.cache_blocks(block_map);
Ok(maybe_block_number) Ok(maybe_block_number)
} }

View File

@ -26,8 +26,7 @@ pub struct LocalBlocksCache {
} }
impl LocalBlocksCache { impl LocalBlocksCache {
// 3660 blocks per hour const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
const CACHE_SIZE: u32 = 8000;
fn new() -> Self { fn new() -> Self {
Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() } Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
@ -60,8 +59,7 @@ struct ScanOptions {
} }
fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> { fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts = let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = serde_json::from_str(line)?;
serde_json::from_str(line)?;
let height = match &parsed_block.block { let height = match &parsed_block.block {
EvmBlock::Reth115(b) => b.header.header.number, EvmBlock::Reth115(b) => b.header.header.number,
}; };
@ -69,16 +67,13 @@ fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)>
} }
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult { fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
let file = File::open(path).expect("Failed to open hour file path"); let lines: Vec<String> = BufReader::new(File::open(path).expect("Failed to open hour file"))
let reader = BufReader::new(file); .lines()
.collect::<Result<_, _>>()
let ScanOptions { start_height, only_load_ranges } = options; .unwrap();
let mut new_blocks = Vec::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let skip = if *last_line == 0 { 0 } else { *last_line - 1 }; let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
let mut new_blocks = Vec::new();
let mut last_height = options.start_height;
let mut block_ranges = Vec::new(); let mut block_ranges = Vec::new();
let mut current_range: Option<(u64, u64)> = None; let mut current_range: Option<(u64, u64)> = None;
@ -88,18 +83,16 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
} }
match line_to_evm_block(line) { match line_to_evm_block(line) {
Ok((parsed_block, height)) => { Ok((parsed_block, height)) if height >= options.start_height => {
if height >= start_height { last_height = last_height.max(height);
last_height = last_height.max(height); if !options.only_load_ranges {
if !only_load_ranges { new_blocks.push(parsed_block);
new_blocks.push(parsed_block);
}
*last_line = line_idx;
} }
*last_line = line_idx;
match current_range { match current_range {
Some((start, end)) if end + 1 == height => { Some((start, end)) if end + 1 == height => {
current_range = Some((start, height)); current_range = Some((start, height))
} }
_ => { _ => {
if let Some((start, end)) = current_range.take() { if let Some((start, end)) = current_range.take() {
@ -109,17 +102,14 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
} }
} }
} }
Err(_) => { Ok(_) => {}
warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)); Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
continue;
}
} }
} }
if let Some((start, end)) = current_range { if let Some((start, end)) = current_range {
block_ranges.push(start..=end); block_ranges.push(start..=end);
} }
ScanResult { ScanResult {
path: path.to_path_buf(), path: path.to_path_buf(),
next_expected_height: last_height + 1, next_expected_height: last_height + 1,
@ -204,33 +194,27 @@ fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndRece
let mut last_line = Vec::new(); let mut last_line = Vec::new();
while pos > 0 { while pos > 0 {
let read_size = std::cmp::min(pos, CHUNK_SIZE); let read_size = pos.min(CHUNK_SIZE);
buf.resize(read_size as usize, 0); buf.resize(read_size as usize, 0);
read.seek(SeekFrom::Start(pos - read_size)).unwrap(); read.seek(SeekFrom::Start(pos - read_size)).unwrap();
read.read_exact(&mut buf).unwrap(); read.read_exact(&mut buf).unwrap();
last_line = [buf.clone(), last_line].concat(); last_line = [buf.clone(), last_line].concat();
if last_line.ends_with(b"\n") { if last_line.ends_with(b"\n") {
last_line.pop(); last_line.pop();
} }
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') { if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
let candidate = &last_line[idx + 1..]; let candidate = &last_line[idx + 1..];
if let Ok((evm_block, height)) = line_to_evm_block(str::from_utf8(candidate).unwrap()) { if let Ok(result) = line_to_evm_block(str::from_utf8(candidate).unwrap()) {
return Some((evm_block, height)); return Some(result);
} }
// Incomplete line; truncate and continue
last_line.truncate(idx); last_line.truncate(idx);
} }
if pos < read_size { if pos < read_size {
break; break;
} }
pos -= read_size; pos -= read_size;
} }
line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok() line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
} }
@ -246,12 +230,9 @@ impl HlNodeBlockSource {
async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) { async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
let mut last_fetch = self.last_local_fetch.lock().await; let mut last_fetch = self.last_local_fetch.lock().await;
if let Some((last_height, _)) = *last_fetch { if last_fetch.is_none_or(|(h, _)| h < height) {
if last_height >= height { *last_fetch = Some((height, now));
return;
}
} }
*last_fetch = Some((height, now));
} }
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> { async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
@ -259,9 +240,7 @@ impl HlNodeBlockSource {
if let Some(block) = u_cache.cache.remove(&height) { if let Some(block) = u_cache.cache.remove(&height) {
return Some(block); return Some(block);
} }
let path = u_cache.ranges.get(&height).cloned()?; let path = u_cache.ranges.get(&height).cloned()?;
info!("Loading block data from {:?}", path); info!("Loading block data from {:?}", path);
u_cache.load_scan_result(scan_hour_file( u_cache.load_scan_result(scan_hour_file(
&path, &path,
@ -272,36 +251,32 @@ impl HlNodeBlockSource {
} }
fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> { fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
let dt_part = path.parent()?.file_name()?.to_str()?; let (dt_part, hour_part) =
let hour_part = path.file_name()?.to_str()?; (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?);
let hour: u8 = hour_part.parse().ok()?;
Some(OffsetDateTime::new_utc( Some(OffsetDateTime::new_utc(
Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?, Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
Time::from_hms(hour, 0, 0).ok()?, Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?,
)) ))
} }
fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> { fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
let dir = root.join(HOURLY_SUBDIR);
let mut files = Vec::new(); let mut files = Vec::new();
for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? {
for entry in std::fs::read_dir(dir).ok()? { let dir = entry.ok()?.path();
let file = entry.ok()?.path(); if let Ok(subentries) = std::fs::read_dir(&dir) {
let subfiles: Vec<_> = std::fs::read_dir(&file) files.extend(
.ok()? subentries
.filter_map(|f| f.ok().map(|f| f.path())) .filter_map(|f| f.ok().map(|f| f.path()))
.filter(|p| Self::datetime_from_path(p).is_some()) .filter(|p| Self::datetime_from_path(p).is_some()),
.collect(); );
files.extend(subfiles); }
} }
files.sort(); files.sort();
Some(files) Some(files)
} }
fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> { fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
Self::all_hourly_files(root)?.last().cloned() Self::all_hourly_files(root)?.into_iter().last()
} }
async fn try_backfill_local_blocks( async fn try_backfill_local_blocks(
@ -310,69 +285,55 @@ impl HlNodeBlockSource {
cutoff_height: u64, cutoff_height: u64,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let mut u_cache = cache.lock().await; let mut u_cache = cache.lock().await;
for subfile in Self::all_hourly_files(root).unwrap_or_default() { for subfile in Self::all_hourly_files(root).unwrap_or_default() {
let mut file = File::open(&subfile).expect("Failed to open hour file path"); let mut file = File::open(&subfile).expect("Failed to open hour file");
if let Some((_, height)) = read_last_complete_line(&mut file) { if let Some((_, height)) = read_last_complete_line(&mut file) {
if height < cutoff_height { if height < cutoff_height {
continue; continue;
} }
} else { } else {
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile); warn!("Failed to parse last line of file: {:?}", subfile);
} }
let mut scan_result = scan_hour_file( let mut scan_result = scan_hour_file(
&subfile, &subfile,
&mut 0, &mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true }, ScanOptions { start_height: cutoff_height, only_load_ranges: true },
); );
// Only store the block ranges for now; actual block data will be loaded lazily later to scan_result.new_blocks.clear(); // Only store ranges, load data lazily
// optimize memory usage
scan_result.new_blocks.clear();
u_cache.load_scan_result(scan_result); u_cache.load_scan_result(scan_result);
} }
if u_cache.ranges.is_empty() { if u_cache.ranges.is_empty() {
warn!("No ranges found in {:?}", root); warn!("No ranges found in {:?}", root);
} else { } else {
let (min, _) = u_cache.ranges.first_range_value().unwrap(); let (min, max) = (
let (max, _) = u_cache.ranges.last_range_value().unwrap(); u_cache.ranges.first_range_value().unwrap(),
u_cache.ranges.last_range_value().unwrap(),
);
info!( info!(
"Populated {} ranges (min: {}, max: {})", "Populated {} ranges (min: {}, max: {})",
u_cache.ranges.len(), u_cache.ranges.len(),
min.start(), min.0.start(),
max.end() max.0.end()
); );
} }
Ok(()) Ok(())
} }
async fn start_local_ingest_loop(&self, current_head: u64) { async fn start_local_ingest_loop(&self, current_head: u64) {
let root = self.local_ingest_dir.to_owned(); let root = self.local_ingest_dir.to_owned();
let cache = self.local_blocks_cache.clone(); let cache = self.local_blocks_cache.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut next_height = current_head; let mut next_height = current_head;
// Wait for the first hourly file to be created
let mut dt = loop { let mut dt = loop {
if let Some(latest_file) = Self::find_latest_hourly_file(&root) { if let Some(f) = Self::find_latest_hourly_file(&root) {
break Self::datetime_from_path(&latest_file).unwrap(); break Self::datetime_from_path(&f).unwrap();
} }
tokio::time::sleep(TAIL_INTERVAL).await; tokio::time::sleep(TAIL_INTERVAL).await;
}; };
let (mut hour, mut day_str, mut last_line) = (dt.hour(), date_from_datetime(dt), 0);
let mut hour = dt.hour(); info!("Starting local ingest loop from height: {}", current_head);
let mut day_str = date_from_datetime(dt);
let mut last_line = 0;
info!("Starting local ingest loop from height: {:?}", current_head);
loop { loop {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() { if hour_file.exists() {
let scan_result = scan_hour_file( let scan_result = scan_hour_file(
&hour_file, &hour_file,
@ -380,25 +341,18 @@ impl HlNodeBlockSource {
ScanOptions { start_height: next_height, only_load_ranges: false }, ScanOptions { start_height: next_height, only_load_ranges: false },
); );
next_height = scan_result.next_expected_height; next_height = scan_result.next_expected_height;
cache.lock().await.load_scan_result(scan_result);
let mut u_cache = cache.lock().await;
u_cache.load_scan_result(scan_result);
} }
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
if dt + Duration::HOUR < now { if dt + Duration::HOUR < now {
dt += Duration::HOUR; dt += Duration::HOUR;
hour = dt.hour(); (hour, day_str, last_line) = (dt.hour(), date_from_datetime(dt), 0);
day_str = date_from_datetime(dt);
last_line = 0;
info!( info!(
"Moving to a new file. {:?}", "Moving to new file: {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")) root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
); );
continue; continue;
} }
tokio::time::sleep(TAIL_INTERVAL).await; tokio::time::sleep(TAIL_INTERVAL).await;
} }
}); });
@ -411,7 +365,6 @@ impl HlNodeBlockSource {
next_block_number, next_block_number,
) )
.await; .await;
self.start_local_ingest_loop(next_block_number).await; self.start_local_ingest_loop(next_block_number).await;
Ok(()) Ok(())
} }
@ -421,7 +374,7 @@ impl HlNodeBlockSource {
local_ingest_dir: PathBuf, local_ingest_dir: PathBuf,
next_block_number: u64, next_block_number: u64,
) -> Self { ) -> Self {
let block_source = HlNodeBlockSource { let block_source = Self {
fallback, fallback,
local_ingest_dir, local_ingest_dir,
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())), local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
@ -485,54 +438,55 @@ mod tests {
timestamp: u64, timestamp: u64,
extra_data: &'static [u8], extra_data: &'static [u8],
) -> LocalBlockAndReceipts { ) -> LocalBlockAndReceipts {
let extra_data = Bytes::from_static(extra_data); LocalBlockAndReceipts(
let res = BlockAndReceipts { timestamp.to_string(),
block: EvmBlock::Reth115(reth_compat::SealedBlock { BlockAndReceipts {
header: reth_compat::SealedHeader { block: EvmBlock::Reth115(reth_compat::SealedBlock {
header: Header { header: reth_compat::SealedHeader {
parent_hash: B256::ZERO, header: Header {
ommers_hash: B256::ZERO, parent_hash: B256::ZERO,
beneficiary: Address::ZERO, ommers_hash: B256::ZERO,
state_root: B256::ZERO, beneficiary: Address::ZERO,
transactions_root: B256::ZERO, state_root: B256::ZERO,
receipts_root: B256::ZERO, transactions_root: B256::ZERO,
logs_bloom: Bloom::ZERO, receipts_root: B256::ZERO,
difficulty: U256::ZERO, logs_bloom: Bloom::ZERO,
number, difficulty: U256::ZERO,
gas_limit: 0, number,
gas_used: 0, gas_limit: 0,
timestamp, gas_used: 0,
extra_data, timestamp,
mix_hash: B256::ZERO, extra_data: Bytes::from_static(extra_data),
nonce: B64::ZERO, mix_hash: B256::ZERO,
base_fee_per_gas: None, nonce: B64::ZERO,
withdrawals_root: None, base_fee_per_gas: None,
blob_gas_used: None, withdrawals_root: None,
excess_blob_gas: None, blob_gas_used: None,
parent_beacon_block_root: None, excess_blob_gas: None,
requests_hash: None, parent_beacon_block_root: None,
requests_hash: None,
},
hash: B256::ZERO,
}, },
hash: B256::ZERO, body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
}, }),
body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None }, receipts: vec![],
}), system_txs: vec![],
receipts: vec![], read_precompile_calls: ReadPrecompileCalls(vec![]),
system_txs: vec![], highest_precompile_address: None,
read_precompile_calls: ReadPrecompileCalls(vec![]), },
highest_precompile_address: None, )
};
LocalBlockAndReceipts(timestamp.to_string(), res)
} }
fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> { fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
let now = OffsetDateTime::now_utc(); let now = OffsetDateTime::now_utc();
let day_str = date_from_datetime(now);
let hour = now.hour();
let temp_dir = tempfile::tempdir()?; let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}")); let path = temp_dir
.path()
.join(HOURLY_SUBDIR)
.join(date_from_datetime(now))
.join(format!("{}", now.hour()));
std::fs::create_dir_all(path.parent().unwrap())?; std::fs::create_dir_all(path.parent().unwrap())?;
Ok((temp_dir, File::create(path)?)) Ok((temp_dir, File::create(path)?))
} }