diff --git a/src/hl_node_compliance.rs b/src/hl_node_compliance.rs
index 9b6ab707e..c00333589 100644
--- a/src/hl_node_compliance.rs
+++ b/src/hl_node_compliance.rs
@@ -1,3 +1,12 @@
+//! Overrides for RPC methods to post-filter system transactions and logs.
+//!
+//! System transactions always sit at the beginning of a block, so a log's
+//! transaction index tells us whether it belongs to a system transaction;
+//! if it does, we exclude the log.
+//!
+//! For logs from non-system transactions, we shift the transaction and log
+//! indices down by the number of preceding system transactions (and their
+//! logs), so clients see the numbering they would get without system transactions.
+
 use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
 use alloy_eips::{BlockId, BlockNumberOrTag};
 use alloy_json_rpc::RpcObject;
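The index arithmetic described in the module doc, isolated into a runnable sketch. `MiniLog` and `MiniReceipt` are hypothetical stand-ins for the alloy types; the `cumulative_gas_used == 0` marker matches the heuristic `adjust_log` uses later in this file:

```rust
// Minimal sketch of the post-filtering described above (hypothetical types,
// not the alloy/reth ones). A receipt with cumulative_gas_used == 0 marks a
// system transaction, mirroring the heuristic used elsewhere in this diff.
struct MiniReceipt {
    cumulative_gas_used: u64,
    log_count: u64,
}

struct MiniLog {
    transaction_index: u64,
    log_index: u64,
}

/// Drops logs of system transactions and shifts the indices of the rest.
fn adjust(log: MiniLog, block_receipts: &[MiniReceipt]) -> Option<MiniLog> {
    let (mut sys_txs, mut sys_logs) = (0u64, 0u64);
    for r in block_receipts.iter().filter(|r| r.cumulative_gas_used == 0) {
        sys_txs += 1;
        sys_logs += r.log_count;
    }
    // System transactions occupy indices 0..sys_txs, so any log with a smaller
    // transaction index came from one of them and is excluded.
    if log.transaction_index < sys_txs {
        return None;
    }
    Some(MiniLog {
        transaction_index: log.transaction_index - sys_txs,
        log_index: log.log_index - sys_logs,
    })
}

fn main() {
    let receipts = vec![
        MiniReceipt { cumulative_gas_used: 0, log_count: 2 },
        MiniReceipt { cumulative_gas_used: 21000, log_count: 1 },
    ];
    let adjusted = adjust(MiniLog { transaction_index: 1, log_index: 2 }, &receipts).unwrap();
    // The first user transaction becomes index 0, its first log becomes log 0.
    assert_eq!((adjusted.transaction_index, adjusted.log_index), (0, 0));
}
```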
@@ -48,20 +57,19 @@ pub trait EthWrapper:
 {
 }
 
-impl<
-        T: EthApiServer<
-                RpcTxReq,
-                RpcTransaction,
-                RpcBlock,
-                RpcReceipt,
-                RpcHeader,
-            > + FullEthApiTypes
-            + RpcNodeCoreExt>
-            + EthBlocks
-            + EthTransactions
-            + LoadReceipt
-            + 'static,
-    > EthWrapper for T
+impl EthWrapper for T where
+    T: EthApiServer<
+            RpcTxReq,
+            RpcTransaction,
+            RpcBlock,
+            RpcReceipt,
+            RpcHeader,
+        > + FullEthApiTypes
+        + RpcNodeCoreExt>
+        + EthBlocks
+        + EthTransactions
+        + LoadReceipt
+        + 'static
 {
 }
 
@@ -80,19 +88,16 @@ impl HlNodeFilterHttp {
 impl EthFilterApiServer> for HlNodeFilterHttp {
-    /// Handler for `eth_newFilter`
     async fn new_filter(&self, filter: Filter) -> RpcResult {
         trace!(target: "rpc::eth", "Serving eth_newFilter");
         self.filter.new_filter(filter).await
     }
 
-    /// Handler for `eth_newBlockFilter`
     async fn new_block_filter(&self) -> RpcResult {
         trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
         self.filter.new_block_filter().await
     }
 
-    /// Handler for `eth_newPendingTransactionFilter`
     async fn new_pending_transaction_filter(
         &self,
         kind: Option,
@@ -101,7 +106,6 @@ impl EthFilterApiServer>
         self.filter.new_pending_transaction_filter(kind).await
     }
 
-    /// Handler for `eth_getFilterChanges`
     async fn filter_changes(
         &self,
         id: FilterId,
@@ -110,31 +114,20 @@ impl EthFilterApiServer>
         self.filter.filter_changes(id).await.map_err(ErrorObject::from)
     }
 
-    /// Returns an array of all logs matching filter with given id.
-    ///
-    /// Returns an error if no matching log filter exists.
-    ///
-    /// Handler for `eth_getFilterLogs`
     async fn filter_logs(&self, id: FilterId) -> RpcResult> {
         trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
         self.filter.filter_logs(id).await.map_err(ErrorObject::from)
     }
 
-    /// Handler for `eth_uninstallFilter`
     async fn uninstall_filter(&self, id: FilterId) -> RpcResult {
         trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
         self.filter.uninstall_filter(id).await
     }
 
-    /// Returns logs matching given filter object.
-    ///
-    /// Handler for `eth_getLogs`
     async fn logs(&self, filter: Filter) -> RpcResult> {
         trace!(target: "rpc::eth", "Serving eth_getLogs");
         let logs = EthFilterApiServer::logs(&*self.filter, filter).await?;
-        let provider = self.provider.clone();
-
-        Ok(logs.into_iter().filter_map(|log| adjust_log::(log, &provider)).collect())
+        Ok(logs.into_iter().filter_map(|log| adjust_log::(log, &self.provider)).collect())
     }
 }
 
@@ -158,7 +151,6 @@ impl HlNodeFilterWs {
 impl EthPubSubApiServer> for HlNodeFilterWs {
-    /// Handler for `eth_subscribe`
     async fn subscribe(
         &self,
         pending: PendingSubscriptionSink,
@@ -166,16 +158,12 @@ impl EthPubSubApiServer>
         params: Option,
     ) -> jsonrpsee::core::SubscriptionResult {
         let sink = pending.accept().await?;
-        let pubsub = self.pubsub.clone();
-        let provider = self.provider.clone();
+        let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
         self.subscription_task_spawner.spawn(Box::pin(async move {
             if kind == SubscriptionKind::Logs {
-                // if no params are provided, used default filter params
                 let filter = match params {
-                    Some(Params::Logs(filter)) => *filter,
-                    Some(Params::Bool(_)) => {
-                        return;
-                    }
+                    Some(Params::Logs(f)) => *f,
+                    Some(Params::Bool(_)) => return,
                     _ => Default::default(),
                 };
                 let _ = pipe_from_stream(
@@ -185,46 +173,30 @@ impl EthPubSubApiServer>
                 .await;
             } else {
                 let _ = pubsub.handle_accepted(sink, kind, params).await;
-            };
+            }
         }));
-
         Ok(())
     }
 }
 
 fn adjust_log(mut log: Log, provider: &Eth::Provider) -> Option {
-    let transaction_index = log.transaction_index?;
-    let log_index = log.log_index?;
-
+    let (tx_idx, log_idx) = (log.transaction_index?, log.log_index?);
     let receipts = provider.receipts_by_block(log.block_number?.into()).unwrap()?;
-
-    // System transactions are always at the beginning of the block,
-    // so we can use the transaction index to determine if the log is from a system transaction,
-    // and if it is, we can exclude it.
-    //
-    // For non-system transactions, we can just return the log as is, and the client will
-    // adjust the transaction index accordingly.
-    let mut system_tx_count = 0u64;
-    let mut system_tx_logs_count = 0u64;
-
+    let (mut sys_tx_count, mut sys_log_count) = (0u64, 0u64);
    for receipt in receipts {
-        let is_system_tx = receipt.cumulative_gas_used() == 0;
-        if is_system_tx {
-            system_tx_count += 1;
-            system_tx_logs_count += receipt.logs().len() as u64;
+        if receipt.cumulative_gas_used() == 0 {
+            sys_tx_count += 1;
+            sys_log_count += receipt.logs().len() as u64;
         }
     }
-
-    if system_tx_count > transaction_index {
+    if sys_tx_count > tx_idx {
        return None;
    }
-
-    log.transaction_index = Some(transaction_index - system_tx_count);
-    log.log_index = Some(log_index - system_tx_logs_count);
+    log.transaction_index = Some(tx_idx - sys_tx_count);
+    log.log_index = Some(log_idx - sys_log_count);
     Some(log)
 }
 
-/// Helper to convert a serde error into an [`ErrorObject`]
 #[derive(Debug, thiserror::Error)]
 #[error("Failed to serialize subscription item: {0}")]
 pub struct SubscriptionSerializeError(#[from] serde_json::Error);
@@ -241,37 +213,18 @@ impl From for ErrorObject<'static> {
     }
 }
 
-async fn pipe_from_stream(
+async fn pipe_from_stream + Unpin>(
     sink: SubscriptionSink,
     mut stream: St,
-) -> Result<(), ErrorObject<'static>>
-where
-    St: Stream + Unpin,
-    T: Serialize,
-{
+) -> Result<(), ErrorObject<'static>> {
     loop {
         tokio::select! {
-            _ = sink.closed() => {
-                // connection dropped
-                break Ok(())
-            },
+            _ = sink.closed() => break Ok(()),
            maybe_item = stream.next() => {
-                let item = match maybe_item {
-                    Some(item) => item,
-                    None => {
-                        // stream ended
-                        break Ok(())
-                    },
-                };
-                let msg = SubscriptionMessage::new(
-                    sink.method_name(),
-                    sink.subscription_id(),
-                    &item
-                ).map_err(SubscriptionSerializeError::new)?;
-
-                if sink.send(msg).await.is_err() {
-                    break Ok(());
-                }
+                let Some(item) = maybe_item else { break Ok(()) };
+                let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
+                    .map_err(SubscriptionSerializeError::new)?;
+                if sink.send(msg).await.is_err() { break Ok(()); }
            }
        }
    }
 }
 
@@ -410,7 +363,8 @@ async fn adjust_transaction_receipt(
 ) -> Result>, Eth::Error> {
     match eth_api.load_transaction_and_receipt(tx_hash).await? {
         Some((_, meta, _)) => {
-            // LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again doesn't hurt performance much
+            // LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again
+            // doesn't hurt performance much
             info!("block hash: {:?}", meta.block_hash);
             let Some((system_tx_count, block_receipts)) =
                 adjust_block_receipts(meta.block_hash.into(), eth_api).await?
@@ -464,8 +418,9 @@ where
         let res =
             self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?;
 
         Ok(res.map(|count| {
-            count -
-                U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into())))
+            let sys_tx_count =
+                system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()));
+            count - U256::from(sys_tx_count)
         }))
     }
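The `pipe_from_stream` select loop above is the usual forward-until-either-side-closes shape. The same shape as a self-contained sketch, with a plain `tokio` mpsc sender standing in for the jsonrpsee `SubscriptionSink` (assumed crates: tokio, tokio-stream, futures; the stand-in and its `closed()` semantics are approximations, not the real API):

```rust
use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

async fn pipe<T: std::fmt::Debug>(
    out: mpsc::Sender<String>, // stand-in for SubscriptionSink
    mut stream: impl futures::Stream<Item = T> + Unpin,
) {
    loop {
        tokio::select! {
            // The real sink exposes closed(); the mpsc sender's closed()
            // resolves once the receiving side is dropped.
            _ = out.closed() => break,
            maybe_item = stream.next() => {
                let Some(item) = maybe_item else { break }; // stream ended
                if out.send(format!("{item:?}")).await.is_err() { break }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (item_tx, item_rx) = mpsc::channel(8);
    let (out_tx, mut out_rx) = mpsc::channel(8);
    tokio::spawn(pipe(out_tx, ReceiverStream::new(item_rx)));
    item_tx.send(1u32).await.unwrap();
    drop(item_tx); // ends the stream, which ends the pipe
    while let Some(msg) = out_rx.recv().await {
        println!("forwarded: {msg}");
    }
}
```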
diff --git a/src/node/evm/config.rs b/src/node/evm/config.rs
index 8f7a5e892..22c670e6f 100644
--- a/src/node/evm/config.rs
+++ b/src/node/evm/config.rs
@@ -71,10 +71,10 @@ where
         let timestamp = evm_env.block_env.timestamp.saturating_to();
 
         // Filter out system tx receipts
-        let transactions_for_root: Vec =
-            transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect::>();
-        let receipts_for_root: Vec =
-            receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect::>();
+        let transactions_for_root: Vec<_> =
+            transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect();
+        let receipts_for_root: Vec<_> =
+            receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect();
 
         let transactions_root = proofs::calculate_transaction_root(&transactions_for_root);
         let receipts_root = Receipt::calculate_receipt_root_no_memo(&receipts_for_root);
@@ -295,7 +295,6 @@ where
         // configure evm env based on parent block
         let mut cfg_env =
             CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
-
         if let Some(blob_params) = &blob_params {
             cfg_env.set_max_blobs_per_tx(blob_params.max_blobs_per_tx);
         }
@@ -376,10 +375,6 @@ where
         block: &'a SealedBlock>,
     ) -> ExecutionCtxFor<'a, Self> {
         let block_body = block.body();
-        let extras = HlExtras {
-            read_precompile_calls: block_body.read_precompile_calls.clone(),
-            highest_precompile_address: block_body.highest_precompile_address,
-        };
         HlBlockExecutionCtx {
             ctx: EthBlockExecutionCtx {
                 parent_hash: block.header().parent_hash,
@@ -387,7 +382,10 @@ where
                 ommers: &block.body().ommers,
                 withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
             },
-            extras,
+            extras: HlExtras {
+                read_precompile_calls: block_body.read_precompile_calls.clone(),
+                highest_precompile_address: block_body.highest_precompile_address,
+            },
         }
     }
 
@@ -403,8 +401,7 @@ where
                 ommers: &[],
                 withdrawals: attributes.withdrawals.map(Cow::Owned),
             },
-            // TODO: hacky, double check if this is correct
-            extras: HlExtras::default(),
+            extras: HlExtras::default(), // TODO: hacky, double check if this is correct
         }
     }
 }
 
@@ -416,10 +413,6 @@ impl ConfigureEngineEvm for HlEvmConfig {
     fn context_for_payload<'a>(&self, payload: &'a HlExecutionData) -> ExecutionCtxFor<'a, Self> {
         let block = &payload.0;
-        let extras = HlExtras {
-            read_precompile_calls: block.body.read_precompile_calls.clone(),
-            highest_precompile_address: block.body.highest_precompile_address,
-        };
         HlBlockExecutionCtx {
             ctx: EthBlockExecutionCtx {
                 parent_hash: block.header.parent_hash,
@@ -427,7 +420,10 @@ impl ConfigureEngineEvm for HlEvmConfig {
                 ommers: &block.body.ommers,
                 withdrawals: block.body.withdrawals.as_ref().map(Cow::Borrowed),
             },
-            extras,
+            extras: HlExtras {
+                read_precompile_calls: block.body.read_precompile_calls.clone(),
+                highest_precompile_address: block.body.highest_precompile_address,
+            },
         }
     }
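The root calculation above filters transactions by `is_system_transaction` and receipts by `cumulative_gas_used() != 0`; the two predicates must drop exactly the same positions, or the recomputed tries would pair a transaction with the wrong receipt. A property-style sketch of that invariant with hypothetical stand-in types:

```rust
// Hypothetical stand-ins for the transaction and receipt types.
#[derive(Clone)]
struct Tx { is_system: bool }
#[derive(Clone)]
struct Rcpt { cumulative_gas_used: u64 }

fn filtered_pairs(txs: &[Tx], receipts: &[Rcpt]) -> (Vec<Tx>, Vec<Rcpt>) {
    let txs_kept: Vec<_> = txs.iter().filter(|t| !t.is_system).cloned().collect();
    let rcpts_kept: Vec<_> =
        receipts.iter().filter(|r| r.cumulative_gas_used != 0).cloned().collect();
    // Invariant: a transaction is system iff its receipt reports zero
    // cumulative gas, so both filters remove exactly the same indices.
    assert_eq!(txs_kept.len(), rcpts_kept.len(), "filters disagreed");
    (txs_kept, rcpts_kept)
}

fn main() {
    let txs = vec![Tx { is_system: true }, Tx { is_system: false }];
    let receipts =
        vec![Rcpt { cumulative_gas_used: 0 }, Rcpt { cumulative_gas_used: 21000 }];
    let (t, r) = filtered_pairs(&txs, &receipts);
    assert_eq!((t.len(), r.len()), (1, 1));
}
```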
diff --git a/src/node/network/block_import/service.rs b/src/node/network/block_import/service.rs
index c2bb2996b..44dd79575 100644
--- a/src/node/network/block_import/service.rs
+++ b/src/node/network/block_import/service.rs
@@ -89,7 +89,6 @@ where
     /// Process a new payload and return the outcome
     fn new_payload(&self, block: BlockMsg, peer_id: PeerId) -> ImportFut {
         let engine = self.engine.clone();
-
         Box::pin(async move {
             let sealed_block = block.block.0.block.clone().seal();
             let payload = HlPayloadTypes::block_to_payload(sealed_block);
@@ -107,7 +106,7 @@
                     .into(),
                     _ => None,
                 },
-                Err(err) => None,
+                Err(_) => None,
             }
         })
     }
 
@@ -117,15 +116,10 @@
         let engine = self.engine.clone();
         let consensus = self.consensus.clone();
         let sealed_block = block.block.0.block.clone().seal();
-        let hash = sealed_block.hash();
-        let number = sealed_block.number();
+        let (hash, number) = (sealed_block.hash(), sealed_block.number());
 
         Box::pin(async move {
-            let (head_block_hash, current_hash) = match consensus.canonical_head(hash, number) {
-                Ok(hash) => hash,
-                Err(_) => return None,
-            };
-
+            let (head_block_hash, _) = consensus.canonical_head(hash, number).ok()?;
             let state = ForkchoiceState {
                 head_block_hash,
                 safe_block_hash: head_block_hash,
@@ -146,18 +140,15 @@
                     .into(),
                     _ => None,
                 },
-                Err(err) => None,
+                Err(_) => None,
             }
         })
     }
 
     /// Add a new block import task to the pending imports
     fn on_new_block(&mut self, block: BlockMsg, peer_id: PeerId) {
-        let payload_fut = self.new_payload(block.clone(), peer_id);
-        self.pending_imports.push(payload_fut);
-
-        let fcu_fut = self.update_fork_choice(block, peer_id);
-        self.pending_imports.push(fcu_fut);
+        self.pending_imports.push(self.new_payload(block.clone(), peer_id));
+        self.pending_imports.push(self.update_fork_choice(block, peer_id));
     }
 }
 
@@ -176,11 +167,9 @@
         }
 
         // Process completed imports and send events to network
-        while let Poll::Ready(Some(outcome)) = this.pending_imports.poll_next_unpin(cx) {
-            if let Some(outcome) = outcome {
-                if let Err(e) = this.to_network.send(BlockImportEvent::Outcome(outcome)) {
-                    return Poll::Ready(Err(Box::new(e)));
-                }
+        while let Poll::Ready(Some(Some(outcome))) = this.pending_imports.poll_next_unpin(cx) {
+            if let Err(e) = this.to_network.send(BlockImportEvent::Outcome(outcome)) {
+                return Poll::Ready(Err(Box::new(e)));
             }
         }
 
@@ -261,15 +250,12 @@ mod tests {
         fn chain_info(&self) -> Result {
             unimplemented!()
         }
-
         fn best_block_number(&self) -> Result {
             Ok(0)
         }
-
         fn last_block_number(&self) -> Result {
             Ok(0)
         }
-
         fn block_number(&self, _hash: B256) -> Result, ProviderError> {
             Ok(None)
         }
@@ -279,7 +265,6 @@ mod tests {
         fn block_hash(&self, _number: u64) -> Result, ProviderError> {
             Ok(Some(B256::ZERO))
         }
-
         fn canonical_hashes_range(
             &self,
             _start: u64,
@@ -299,14 +284,12 @@ mod tests {
         fn both_valid() -> Self {
             Self { new_payload: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid }
         }
-
         fn invalid_new_payload() -> Self {
             Self {
                 new_payload: PayloadStatusEnum::Invalid { validation_error: "test error".into() },
                 fcu: PayloadStatusEnum::Valid,
             }
         }
-
         fn invalid_fcu() -> Self {
             Self {
                 new_payload: PayloadStatusEnum::Valid,
@@ -326,19 +309,15 @@ mod tests {
             let consensus = Arc::new(HlConsensus { provider: MockProvider });
             let (to_engine, from_engine) = mpsc::unbounded_channel();
             let engine_handle = ConsensusEngineHandle::new(to_engine);
-
             handle_engine_msg(from_engine, responses).await;
 
             let (to_import, from_network) = mpsc::unbounded_channel();
             let (to_network, import_outcome) = mpsc::unbounded_channel();
-
             let handle = ImportHandle::new(to_import, import_outcome);
-
             let service = ImportService::new(consensus, engine_handle, from_network, to_network);
 
             tokio::spawn(Box::pin(async move {
                 service.await.unwrap();
             }));
-
             Self { handle }
         }
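The `pending_imports` queue above is a `FuturesUnordered` of futures yielding optional outcomes, drained with the flattened `Poll::Ready(Some(Some(outcome)))` pattern. The same shape as a runnable sketch, with a plain `u64` standing in for the real import-outcome type:

```rust
use futures::stream::{FuturesUnordered, StreamExt};
use std::{future::Future, pin::Pin};

type ImportFut = Pin<Box<dyn Future<Output = Option<u64>> + Send>>;

#[tokio::main]
async fn main() {
    let mut pending: FuturesUnordered<ImportFut> = FuturesUnordered::new();
    // Each block enqueues work; a None outcome means "nothing to report".
    pending.push(Box::pin(async { Some(1) }));
    pending.push(Box::pin(async { None }));
    pending.push(Box::pin(async { Some(2) }));

    // StreamExt::next is the async analogue of the manual
    // `while let Poll::Ready(Some(Some(outcome))) = ...` in the service's poll.
    while let Some(maybe_outcome) = pending.next().await {
        if let Some(outcome) = maybe_outcome {
            println!("import outcome: {outcome}");
        }
    }
}
```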
diff --git a/src/node/network/mod.rs b/src/node/network/mod.rs
index a0079555f..0dbdcaa1f 100644
--- a/src/node/network/mod.rs
+++ b/src/node/network/mod.rs
@@ -69,32 +69,22 @@ mod rlp {
     impl<'a> From<&'a HlNewBlock> for HlNewBlockHelper<'a> {
         fn from(value: &'a HlNewBlock) -> Self {
-            let HlNewBlock(NewBlock {
-                block:
-                    HlBlock {
-                        header,
-                        body:
-                            HlBlockBody {
-                                inner: BlockBody { transactions, ommers, withdrawals },
-                                sidecars,
-                                read_precompile_calls,
-                                highest_precompile_address,
-                            },
-                    },
-                td,
-            }) = value;
-
+            let b = &value.0.block;
             Self {
                 block: BlockHelper {
-                    header: Cow::Borrowed(header),
-                    transactions: Cow::Borrowed(transactions),
-                    ommers: Cow::Borrowed(ommers),
-                    withdrawals: withdrawals.as_ref().map(Cow::Borrowed),
+                    header: Cow::Borrowed(&b.header),
+                    transactions: Cow::Borrowed(&b.body.inner.transactions),
+                    ommers: Cow::Borrowed(&b.body.inner.ommers),
+                    withdrawals: b.body.inner.withdrawals.as_ref().map(Cow::Borrowed),
                 },
-                td: *td,
-                sidecars: sidecars.as_ref().map(Cow::Borrowed),
-                read_precompile_calls: read_precompile_calls.as_ref().map(Cow::Borrowed),
-                highest_precompile_address: highest_precompile_address.as_ref().map(Cow::Borrowed),
+                td: value.0.td,
+                sidecars: b.body.sidecars.as_ref().map(Cow::Borrowed),
+                read_precompile_calls: b.body.read_precompile_calls.as_ref().map(Cow::Borrowed),
+                highest_precompile_address: b
+                    .body
+                    .highest_precompile_address
+                    .as_ref()
+                    .map(Cow::Borrowed),
             }
         }
     }
 
@@ -111,30 +101,24 @@ mod rlp {
     impl Decodable for HlNewBlock {
         fn decode(buf: &mut &[u8]) -> alloy_rlp::Result {
-            let HlNewBlockHelper {
-                block: BlockHelper { header, transactions, ommers, withdrawals },
-                td,
-                sidecars,
-                read_precompile_calls,
-                highest_precompile_address,
-            } = HlNewBlockHelper::decode(buf)?;
-
+            let h = HlNewBlockHelper::decode(buf)?;
             Ok(HlNewBlock(NewBlock {
                 block: HlBlock {
-                    header: header.into_owned(),
+                    header: h.block.header.into_owned(),
                     body: HlBlockBody {
                         inner: BlockBody {
-                            transactions: transactions.into_owned(),
-                            ommers: ommers.into_owned(),
-                            withdrawals: withdrawals.map(|w| w.into_owned()),
+                            transactions: h.block.transactions.into_owned(),
+                            ommers: h.block.ommers.into_owned(),
+                            withdrawals: h.block.withdrawals.map(|w| w.into_owned()),
                         },
-                        sidecars: sidecars.map(|s| s.into_owned()),
-                        read_precompile_calls: read_precompile_calls.map(|s| s.into_owned()),
-                        highest_precompile_address: highest_precompile_address
+                        sidecars: h.sidecars.map(|s| s.into_owned()),
+                        read_precompile_calls: h.read_precompile_calls.map(|s| s.into_owned()),
+                        highest_precompile_address: h
+                            .highest_precompile_address
                             .map(|s| s.into_owned()),
                     },
                 },
-                td,
+                td: h.td,
             }))
         }
     }
 
@@ -172,41 +156,32 @@ impl HlNetworkBuilder {
     where
         Node: FullNodeTypes,
     {
-        let Self { engine_handle_rx, .. } = self;
-
-        let network_builder = ctx.network_config_builder()?;
-
         let (to_import, from_network) = mpsc::unbounded_channel();
         let (to_network, import_outcome) = mpsc::unbounded_channel();
-
         let handle = ImportHandle::new(to_import, import_outcome);
         let consensus = Arc::new(HlConsensus { provider: ctx.provider().clone() });
 
         ctx.task_executor().spawn_critical("block import", async move {
-            let handle = engine_handle_rx
+            let handle = self
+                .engine_handle_rx
                 .lock()
                 .await
                 .take()
                 .expect("node should only be launched once")
                 .await
                 .unwrap();
-
             ImportService::new(consensus, handle, from_network, to_network).await.unwrap();
         });
 
-        let network_builder = network_builder
-            .disable_dns_discovery()
-            .disable_nat()
-            .boot_nodes(boot_nodes())
-            .set_head(ctx.head())
-            .with_pow()
-            .block_import(Box::new(HlBlockImport::new(handle)));
-        // .discovery(discv4)
-        // .eth_rlpx_handshake(Arc::new(HlHandshake::default()));
-
-        let network_config = ctx.build_network_config(network_builder);
-
-        Ok(network_config)
+        Ok(ctx.build_network_config(
+            ctx.network_config_builder()?
+                .disable_dns_discovery()
+                .disable_nat()
+                .boot_nodes(boot_nodes())
+                .set_head(ctx.head())
+                .with_pow()
+                .block_import(Box::new(HlBlockImport::new(handle))),
+        ))
     }
 }
 
@@ -229,11 +204,9 @@ where
         pool: Pool,
     ) -> eyre::Result {
         let block_source_config = self.block_source_config.clone();
-        let network_config = self.network_config(ctx)?;
-        let network = NetworkManager::builder(network_config).await?;
-        let handle = ctx.start_network(network, pool);
+        let handle =
+            ctx.start_network(NetworkManager::builder(self.network_config(ctx)?).await?, pool);
         let local_node_record = handle.local_node_record();
-        let chain_spec = ctx.chain_spec();
         info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
 
         let next_block_number = ctx
             .block_number
             + 1;
 
+        let chain_spec = ctx.chain_spec();
         ctx.task_executor().spawn_critical("pseudo peer", async move {
-            let block_source =
-                block_source_config.create_cached_block_source((&*chain_spec).clone(), next_block_number).await;
-            start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source)
-                .await
-                .unwrap();
+            start_pseudo_peer(
+                chain_spec.clone(),
+                local_node_record.to_string(),
+                block_source_config
+                    .create_cached_block_source((*chain_spec).clone(), next_block_number)
+                    .await,
+            )
+            .await
+            .unwrap();
         });
 
         Ok(handle)
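The RLP helper pattern in this file — borrow through `Cow` when encoding, take ownership with `into_owned()` when decoding — in a self-contained sketch. `serde_json` stands in for the `alloy_rlp` codec purely to keep the example runnable, and the struct names are hypothetical:

```rust
use serde::{Deserialize, Serialize};
use std::borrow::Cow;

#[derive(Debug, PartialEq)]
struct Block {
    header: String,
    transactions: Vec<String>,
}

#[derive(Serialize, Deserialize)]
struct BlockHelper<'a> {
    header: Cow<'a, str>,
    transactions: Cow<'a, [String]>,
}

impl<'a> From<&'a Block> for BlockHelper<'a> {
    // Encoding borrows: no clone of the header or the transaction list.
    fn from(b: &'a Block) -> Self {
        Self { header: Cow::Borrowed(&b.header), transactions: Cow::Borrowed(&b.transactions) }
    }
}

impl From<BlockHelper<'_>> for Block {
    // Decoding takes ownership via into_owned().
    fn from(h: BlockHelper<'_>) -> Self {
        Self { header: h.header.into_owned(), transactions: h.transactions.into_owned() }
    }
}

fn main() {
    let block = Block { header: "h".into(), transactions: vec!["tx".into()] };
    let bytes = serde_json::to_vec(&BlockHelper::from(&block)).unwrap();
    let decoded: Block = serde_json::from_slice::<BlockHelper<'_>>(&bytes).unwrap().into();
    assert_eq!(decoded, block);
}
```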
diff --git a/src/node/primitives/mod.rs b/src/node/primitives/mod.rs
index 360ac0440..c76efa70a 100644
--- a/src/node/primitives/mod.rs
+++ b/src/node/primitives/mod.rs
@@ -68,19 +68,15 @@ impl BlockBodyTrait for HlBlockBody {
     fn transactions(&self) -> &[Self::Transaction] {
         BlockBodyTrait::transactions(&self.inner)
     }
-
     fn into_ethereum_body(self) -> BlockBody {
         self.inner
     }
-
     fn into_transactions(self) -> Vec {
         self.inner.into_transactions()
     }
-
     fn withdrawals(&self) -> Option<&alloy_rpc_types::Withdrawals> {
         self.inner.withdrawals()
     }
-
     fn ommers(&self) -> Option<&[Self::OmmerHeader]> {
         self.inner.ommers()
     }
@@ -116,15 +112,12 @@ impl Block for HlBlock {
     fn new(header: Self::Header, body: Self::Body) -> Self {
         Self { header, body }
     }
-
     fn header(&self) -> &Self::Header {
         &self.header
     }
-
     fn body(&self) -> &Self::Body {
         &self.body
     }
-
     fn split(self) -> (Self::Header, Self::Body) {
         (self.header, self.body)
     }
@@ -179,7 +172,6 @@ mod rlp {
             read_precompile_calls,
             highest_precompile_address,
         } = value;
-
         Self {
             transactions: Cow::Borrowed(transactions),
             ommers: Cow::Borrowed(ommers),
@@ -203,7 +195,6 @@ mod rlp {
                 highest_precompile_address,
             },
         } = value;
-
         Self {
             header: Cow::Borrowed(header),
             transactions: Cow::Borrowed(transactions),
@@ -220,7 +211,6 @@ mod rlp {
         fn encode(&self, out: &mut dyn bytes::BufMut) {
             BlockBodyHelper::from(self).encode(out);
         }
-
         fn length(&self) -> usize {
             BlockBodyHelper::from(self).length()
         }
@@ -253,7 +243,6 @@ mod rlp {
         fn encode(&self, out: &mut dyn bytes::BufMut) {
             BlockHelper::from(self).encode(out);
         }
-
         fn length(&self) -> usize {
             BlockHelper::from(self).length()
         }
diff --git a/src/pseudo_peer/service.rs b/src/pseudo_peer/service.rs
index 5f4f7d159..ebf8a6533 100644
--- a/src/pseudo_peer/service.rs
+++ b/src/pseudo_peer/service.rs
@@ -77,19 +77,19 @@ impl BlockPoller {
             start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
             info!("Starting block poller");
 
-            let latest_block_number = block_source
+            let mut next_block_number = block_source
                 .find_latest_block_number()
                 .await
                 .ok_or(eyre::eyre!("Failed to find latest block number"))?;
-            let mut next_block_number = latest_block_number;
 
             loop {
-                let Ok(block) = block_source.collect_block(next_block_number).await else {
-                    tokio::time::sleep(Self::POLL_INTERVAL).await;
-                    continue;
-                };
-                block_tx_clone.send((next_block_number, block)).await?;
-                next_block_number += 1;
+                match block_source.collect_block(next_block_number).await {
+                    Ok(block) => {
+                        block_tx_clone.send((next_block_number, block)).await?;
+                        next_block_number += 1;
+                    }
+                    Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await,
+                }
             }
         }
     }
 
@@ -111,8 +111,7 @@ impl BlockImport for BlockPoller {
                     },
                 }))
             }
-            Poll::Ready(None) => Poll::Pending,
-            Poll::Pending => Poll::Pending,
+            Poll::Ready(None) | Poll::Pending => Poll::Pending,
         }
     }
 
@@ -157,12 +156,11 @@ impl PseudoPeer {
         block_numbers: impl IntoIterator,
     ) -> Vec {
         let block_numbers = block_numbers.into_iter().collect::>();
-        let blocks = futures::stream::iter(block_numbers)
+        futures::stream::iter(block_numbers)
             .map(async |number| self.collect_block(number).await.unwrap())
             .buffered(self.block_source.recommended_chunk_size() as usize)
             .collect::>()
-            .await;
-        blocks
+            .await
     }
 
     pub async fn process_eth_request(
@@ -179,7 +177,6 @@ impl PseudoPeer {
                 debug!(
                     "GetBlockHeaders request: {start_block:?}, {limit:?}, {skip:?}, {direction:?}"
                 );
-
                 let number = match start_block {
                     HashOrNumber::Hash(hash) => self.hash_to_block_number(hash).await,
                     HashOrNumber::Number(number) => number,
@@ -215,12 +212,8 @@ impl PseudoPeer {
                 let _ = response.send(Ok(BlockBodies(block_bodies)));
             }
-            IncomingEthRequest::GetNodeData { .. } => {
-                debug!("GetNodeData request: {eth_req:?}");
-            }
-            eth_req => {
-                debug!("New eth protocol request: {eth_req:?}");
-            }
+            IncomingEthRequest::GetNodeData { .. } => debug!("GetNodeData request: {eth_req:?}"),
+            eth_req => debug!("New eth protocol request: {eth_req:?}"),
         }
         Ok(())
     }
 
@@ -251,7 +244,6 @@ impl PseudoPeer {
         // This is tricky because Raw EVM files (BlockSource) does not have hash to number mapping
         // so we can either enumerate all blocks to get hash to number mapping, or fallback to an
        // official RPC. The latter is much easier but has 300/day rate limit.
-
        use jsonrpsee::http_client::HttpClientBuilder;
        use jsonrpsee_core::client::ClientT;
@@ -259,7 +251,6 @@ impl PseudoPeer {
        let client =
            HttpClientBuilder::default().build(self.chain_spec.official_rpc_url()).unwrap();
        let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?;
-
        debug!("From official RPC: {:?} for {hash:?}", target_block.header.number);
        self.cache_blocks([(hash, target_block.header.number)]);
        Ok(target_block.header.number)
    }
 
@@ -272,9 +263,10 @@ impl PseudoPeer {
             if self.if_hit_then_warm_around.lock().unwrap().contains(&block_number) {
                 self.warm_cache_around_blocks(block_number, self.warm_cache_size).await;
             }
-            return Some(block_number);
+            Some(block_number)
+        } else {
+            None
         }
-        None
     }
 
     /// Backfill the cache with blocks to find the target hash
@@ -319,10 +311,11 @@ impl PseudoPeer {
     async fn warm_cache_around_blocks(&mut self, block_number: u64, chunk_size: u64) {
         let start = std::cmp::max(block_number.saturating_sub(chunk_size), 1);
         let end = std::cmp::min(block_number + chunk_size, self.known_latest_block_number);
-
-        self.if_hit_then_warm_around.lock().unwrap().insert(start);
-        self.if_hit_then_warm_around.lock().unwrap().insert(end);
-
+        {
+            let mut guard = self.if_hit_then_warm_around.lock().unwrap();
+            guard.insert(start);
+            guard.insert(end);
+        }
         const IMPOSSIBLE_HASH: B256 = B256::ZERO;
         let _ = self.try_block_range_for_hash(start, end, IMPOSSIBLE_HASH).await;
     }
 
@@ -348,15 +341,12 @@ impl PseudoPeer {
         }
         debug!("Backfilling from {start_number} to {end_number}");
 
-        // Collect blocks and cache them
         let blocks = self.collect_blocks(uncached_block_numbers).await;
         let block_map: HashMap =
             blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
-
         let maybe_block_number = block_map.get(&target_hash).copied();
         self.cache_blocks(block_map);
-
         Ok(maybe_block_number)
     }
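`collect_blocks` above bounds fetch concurrency with `StreamExt::buffered`, which also preserves input order. The same idea with a stub fetch function (assumed crates: futures, tokio):

```rust
use futures::{stream, StreamExt};

async fn fetch_block(number: u64) -> String {
    // Stand-in for BlockSource::collect_block.
    format!("block #{number}")
}

#[tokio::main]
async fn main() {
    let numbers = 1u64..=8;
    let blocks: Vec<String> = stream::iter(numbers)
        .map(fetch_block) // create the futures lazily
        .buffered(4)      // at most 4 fetches in flight, output order preserved
        .collect()
        .await;
    assert_eq!(blocks.first().map(String::as_str), Some("block #1"));
}
```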
diff --git a/src/pseudo_peer/sources/hl_node.rs b/src/pseudo_peer/sources/hl_node.rs
index c0d523953..7d88b75cc 100644
--- a/src/pseudo_peer/sources/hl_node.rs
+++ b/src/pseudo_peer/sources/hl_node.rs
@@ -26,8 +26,7 @@ pub struct LocalBlocksCache {
 }
 
 impl LocalBlocksCache {
-    // 3660 blocks per hour
-    const CACHE_SIZE: u32 = 8000;
+    const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
 
     fn new() -> Self {
         Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
     }
@@ -60,8 +59,7 @@ struct ScanOptions {
 }
 
 fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
-    let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
-        serde_json::from_str(line)?;
+    let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts = serde_json::from_str(line)?;
     let height = match &parsed_block.block {
         EvmBlock::Reth115(b) => b.header.header.number,
     };
@@ -69,16 +67,13 @@
 }
 
 fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
-    let file = File::open(path).expect("Failed to open hour file path");
-    let reader = BufReader::new(file);
-
-    let ScanOptions { start_height, only_load_ranges } = options;
-
-    let mut new_blocks = Vec::new();
-    let mut last_height = start_height;
-    let lines: Vec = reader.lines().collect::>().unwrap();
+    let lines: Vec = BufReader::new(File::open(path).expect("Failed to open hour file"))
+        .lines()
+        .collect::>()
+        .unwrap();
     let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
-
+    let mut new_blocks = Vec::new();
+    let mut last_height = options.start_height;
     let mut block_ranges = Vec::new();
     let mut current_range: Option<(u64, u64)> = None;
 
@@ -88,18 +83,16 @@
         }
 
         match line_to_evm_block(line) {
-            Ok((parsed_block, height)) => {
-                if height >= start_height {
-                    last_height = last_height.max(height);
-                    if !only_load_ranges {
-                        new_blocks.push(parsed_block);
-                    }
-                    *last_line = line_idx;
+            Ok((parsed_block, height)) if height >= options.start_height => {
+                last_height = last_height.max(height);
+                if !options.only_load_ranges {
+                    new_blocks.push(parsed_block);
                 }
+                *last_line = line_idx;
 
                 match current_range {
                     Some((start, end)) if end + 1 == height => {
-                        current_range = Some((start, height));
+                        current_range = Some((start, height))
                     }
                     _ => {
                         if let Some((start, end)) = current_range.take() {
@@ -109,17 +102,14 @@ fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> S
                    }
                }
            }
-            Err(_) => {
-                warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
-                continue;
-            }
+            Ok(_) => {}
+            Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
        }
    }
 
     if let Some((start, end)) = current_range {
         block_ranges.push(start..=end);
     }
-
     ScanResult {
         path: path.to_path_buf(),
         next_expected_height: last_height + 1,
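The `current_range` bookkeeping in `scan_hour_file` folds consecutive heights into inclusive ranges. The same logic isolated into a pure function, which makes the extend-or-flush rule easy to test:

```rust
use std::ops::RangeInclusive;

fn to_ranges(heights: impl IntoIterator<Item = u64>) -> Vec<RangeInclusive<u64>> {
    let mut ranges = Vec::new();
    let mut current: Option<(u64, u64)> = None;
    for h in heights {
        current = match current {
            // Extends the open range by exactly one: keep growing it.
            Some((start, end)) if end + 1 == h => Some((start, h)),
            // Gap (or first item): close the open range and start a new one.
            other => {
                if let Some((start, end)) = other {
                    ranges.push(start..=end);
                }
                Some((h, h))
            }
        };
    }
    if let Some((start, end)) = current {
        ranges.push(start..=end);
    }
    ranges
}

fn main() {
    assert_eq!(to_ranges([1, 2, 3, 7, 8, 10]), vec![1..=3, 7..=8, 10..=10]);
}
```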
@@ -204,33 +194,27 @@ fn read_last_complete_line(read: &mut R) -> Option<(BlockAndRece
     let mut last_line = Vec::new();
 
     while pos > 0 {
-        let read_size = std::cmp::min(pos, CHUNK_SIZE);
+        let read_size = pos.min(CHUNK_SIZE);
         buf.resize(read_size as usize, 0);
-
         read.seek(SeekFrom::Start(pos - read_size)).unwrap();
         read.read_exact(&mut buf).unwrap();
-
         last_line = [buf.clone(), last_line].concat();
-
         if last_line.ends_with(b"\n") {
             last_line.pop();
         }
 
         if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
             let candidate = &last_line[idx + 1..];
-            if let Ok((evm_block, height)) = line_to_evm_block(str::from_utf8(candidate).unwrap()) {
-                return Some((evm_block, height));
+            if let Ok(result) = line_to_evm_block(str::from_utf8(candidate).unwrap()) {
+                return Some(result);
             }
-            // Incomplete line; truncate and continue
             last_line.truncate(idx);
         }
-
         if pos < read_size {
             break;
         }
         pos -= read_size;
     }
-
     line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
 }
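`read_last_complete_line` walks the file backwards chunk by chunk until a whole trailing line is assembled. A std-only sketch of that traversal (the real function additionally JSON-parses each candidate and keeps searching on parse failure):

```rust
use std::io::{Cursor, Read, Seek, SeekFrom};

const CHUNK_SIZE: u64 = 8; // tiny on purpose, to force multiple iterations

fn last_line<R: Read + Seek>(read: &mut R) -> Option<String> {
    let mut pos = read.seek(SeekFrom::End(0)).ok()?;
    let mut tail: Vec<u8> = Vec::new();
    while pos > 0 {
        let read_size = pos.min(CHUNK_SIZE);
        let mut buf = vec![0u8; read_size as usize];
        read.seek(SeekFrom::Start(pos - read_size)).ok()?;
        read.read_exact(&mut buf).ok()?;
        tail = [buf, tail].concat();
        if tail.ends_with(b"\n") {
            tail.pop(); // ignore the file's trailing newline itself
        }
        if let Some(idx) = tail.iter().rposition(|&b| b == b'\n') {
            // Everything after the last newline is a complete final line.
            return String::from_utf8(tail[idx + 1..].to_vec()).ok();
        }
        pos -= read_size;
    }
    // Single-line file: no newline was ever found.
    String::from_utf8(tail).ok()
}

fn main() {
    let mut file = Cursor::new(b"first line\nsecond line\nlast line\n".to_vec());
    assert_eq!(last_line(&mut file).as_deref(), Some("last line"));
}
```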
@@ -246,12 +230,9 @@ impl HlNodeBlockSource {
     async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
         let mut last_fetch = self.last_local_fetch.lock().await;
-        if let Some((last_height, _)) = *last_fetch {
-            if last_height >= height {
-                return;
-            }
+        if last_fetch.is_none_or(|(h, _)| h < height) {
+            *last_fetch = Some((height, now));
         }
-        *last_fetch = Some((height, now));
     }
 
     async fn try_collect_local_block(&self, height: u64) -> Option {
         let mut u_cache = self.local_blocks_cache.lock().await;
         if let Some(block) = u_cache.cache.remove(&height) {
             return Some(block);
         }
-
         let path = u_cache.ranges.get(&height).cloned()?;
-
         info!("Loading block data from {:?}", path);
         u_cache.load_scan_result(scan_hour_file(
             &path,
             &mut 0,
             ScanOptions { start_height: height, only_load_ranges: false },
         ))
     }
 
     fn datetime_from_path(path: &Path) -> Option {
-        let dt_part = path.parent()?.file_name()?.to_str()?;
-        let hour_part = path.file_name()?.to_str()?;
-
-        let hour: u8 = hour_part.parse().ok()?;
+        let (dt_part, hour_part) =
+            (path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?);
         Some(OffsetDateTime::new_utc(
             Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
-            Time::from_hms(hour, 0, 0).ok()?,
+            Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?,
         ))
     }
 
     fn all_hourly_files(root: &Path) -> Option> {
-        let dir = root.join(HOURLY_SUBDIR);
         let mut files = Vec::new();
-
-        for entry in std::fs::read_dir(dir).ok()? {
-            let file = entry.ok()?.path();
-            let subfiles: Vec<_> = std::fs::read_dir(&file)
-                .ok()?
-                .filter_map(|f| f.ok().map(|f| f.path()))
-                .filter(|p| Self::datetime_from_path(p).is_some())
-                .collect();
-            files.extend(subfiles);
+        for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? {
+            let dir = entry.ok()?.path();
+            if let Ok(subentries) = std::fs::read_dir(&dir) {
+                files.extend(
+                    subentries
+                        .filter_map(|f| f.ok().map(|f| f.path()))
+                        .filter(|p| Self::datetime_from_path(p).is_some()),
+                );
+            }
         }
-
         files.sort();
         Some(files)
     }
 
     fn find_latest_hourly_file(root: &Path) -> Option {
-        Self::all_hourly_files(root)?.last().cloned()
+        Self::all_hourly_files(root)?.into_iter().last()
     }
 
     async fn try_backfill_local_blocks(
@@ -310,69 +285,55 @@
         cutoff_height: u64,
     ) -> eyre::Result<()> {
         let mut u_cache = cache.lock().await;
-
         for subfile in Self::all_hourly_files(root).unwrap_or_default() {
-            let mut file = File::open(&subfile).expect("Failed to open hour file path");
-
+            let mut file = File::open(&subfile).expect("Failed to open hour file");
             if let Some((_, height)) = read_last_complete_line(&mut file) {
                 if height < cutoff_height {
                     continue;
                 }
             } else {
-                warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
+                warn!("Failed to parse last line of file: {:?}", subfile);
             }
-
             let mut scan_result = scan_hour_file(
                 &subfile,
                 &mut 0,
                 ScanOptions { start_height: cutoff_height, only_load_ranges: true },
             );
-            // Only store the block ranges for now; actual block data will be loaded lazily later to
-            // optimize memory usage
-            scan_result.new_blocks.clear();
+            scan_result.new_blocks.clear(); // Only store ranges, load data lazily
             u_cache.load_scan_result(scan_result);
         }
-
         if u_cache.ranges.is_empty() {
             warn!("No ranges found in {:?}", root);
         } else {
-            let (min, _) = u_cache.ranges.first_range_value().unwrap();
-            let (max, _) = u_cache.ranges.last_range_value().unwrap();
+            let (min, max) = (
+                u_cache.ranges.first_range_value().unwrap(),
+                u_cache.ranges.last_range_value().unwrap(),
+            );
             info!(
                 "Populated {} ranges (min: {}, max: {})",
                 u_cache.ranges.len(),
-                min.start(),
-                max.end()
+                min.0.start(),
+                max.0.end()
             );
         }
-
         Ok(())
     }
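The `ranges` field maps block heights to the hourly file that contains them. A minimal sketch of that lookup, assuming the rangemap crate that `RangeInclusiveMap`, `first_range_value`, and `last_range_value` suggest:

```rust
// Height -> hourly-file lookup (assumed: rangemap = "1").
use rangemap::RangeInclusiveMap;
use std::path::PathBuf;

fn main() {
    let mut ranges: RangeInclusiveMap<u64, PathBuf> = RangeInclusiveMap::new();
    // Each scanned hour file contributes the block ranges it contains.
    ranges.insert(100..=199, PathBuf::from("hourly/20240101/0"));
    ranges.insert(200..=299, PathBuf::from("hourly/20240101/1"));

    // Point lookup: which file holds block 250?
    assert_eq!(ranges.get(&250), Some(&PathBuf::from("hourly/20240101/1")));

    // first_range_value/last_range_value expose the overall covered span.
    let (first, _) = ranges.first_range_value().unwrap();
    let (last, _) = ranges.last_range_value().unwrap();
    assert_eq!((*first.start(), *last.end()), (100, 299));
}
```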
 
     async fn start_local_ingest_loop(&self, current_head: u64) {
         let root = self.local_ingest_dir.to_owned();
         let cache = self.local_blocks_cache.clone();
-
         tokio::spawn(async move {
             let mut next_height = current_head;
-
-            // Wait for the first hourly file to be created
             let mut dt = loop {
-                if let Some(latest_file) = Self::find_latest_hourly_file(&root) {
-                    break Self::datetime_from_path(&latest_file).unwrap();
+                if let Some(f) = Self::find_latest_hourly_file(&root) {
+                    break Self::datetime_from_path(&f).unwrap();
                 }
                 tokio::time::sleep(TAIL_INTERVAL).await;
             };
-
-            let mut hour = dt.hour();
-            let mut day_str = date_from_datetime(dt);
-            let mut last_line = 0;
-
-            info!("Starting local ingest loop from height: {:?}", current_head);
-
+            let (mut hour, mut day_str, mut last_line) = (dt.hour(), date_from_datetime(dt), 0);
+            info!("Starting local ingest loop from height: {}", current_head);
             loop {
                 let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
-
                 if hour_file.exists() {
                     let scan_result = scan_hour_file(
                         &hour_file,
                         &mut last_line,
                         ScanOptions { start_height: next_height, only_load_ranges: false },
                     );
                     next_height = scan_result.next_expected_height;
-
-                    let mut u_cache = cache.lock().await;
-                    u_cache.load_scan_result(scan_result);
+                    cache.lock().await.load_scan_result(scan_result);
                 }
-
                 let now = OffsetDateTime::now_utc();
-
                 if dt + Duration::HOUR < now {
                     dt += Duration::HOUR;
-                    hour = dt.hour();
-                    day_str = date_from_datetime(dt);
-                    last_line = 0;
+                    (hour, day_str, last_line) = (dt.hour(), date_from_datetime(dt), 0);
                     info!(
-                        "Moving to a new file. {:?}",
+                        "Moving to new file: {:?}",
                         root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
                     );
                     continue;
                 }
-
                 tokio::time::sleep(TAIL_INTERVAL).await;
             }
         });
     }
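The ingest loop's rollover rule — tail the current hour file until wall-clock time passes the hour, then move to the next path — isolated into a sketch (assumed: the time crate with the macros and formatting features, matching the `format_description!` usage above):

```rust
use time::{macros::format_description, Duration, OffsetDateTime};

// Hypothetical helper: builds the "<YYYYMMDD>/<hour>" suffix used by the
// hourly layout in this file.
fn hourly_rel_path(dt: OffsetDateTime) -> String {
    let day = dt.format(&format_description!("[year][month][day]")).unwrap();
    format!("{day}/{}", dt.hour())
}

fn main() {
    // Pretend we started tailing three hours ago.
    let mut dt = OffsetDateTime::now_utc() - Duration::hours(3);
    let now = OffsetDateTime::now_utc();
    // Catch up one hour at a time, exactly like `if dt + Duration::HOUR < now`.
    while dt + Duration::HOUR < now {
        dt += Duration::HOUR;
        println!("rolling over to {}", hourly_rel_path(dt));
    }
    // dt now points at the hour currently being written; keep tailing it.
}
```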
@@ -411,7 +365,6 @@ impl HlNodeBlockSource {
             next_block_number,
         )
         .await;
-
         self.start_local_ingest_loop(next_block_number).await;
         Ok(())
     }
@@ -421,7 +374,7 @@
         local_ingest_dir: PathBuf,
         next_block_number: u64,
     ) -> Self {
-        let block_source = HlNodeBlockSource {
+        let block_source = Self {
             fallback,
             local_ingest_dir,
             local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
@@ -485,54 +438,55 @@ mod tests {
         timestamp: u64,
         extra_data: &'static [u8],
     ) -> LocalBlockAndReceipts {
-        let extra_data = Bytes::from_static(extra_data);
-        let res = BlockAndReceipts {
-            block: EvmBlock::Reth115(reth_compat::SealedBlock {
-                header: reth_compat::SealedHeader {
-                    header: Header {
-                        parent_hash: B256::ZERO,
-                        ommers_hash: B256::ZERO,
-                        beneficiary: Address::ZERO,
-                        state_root: B256::ZERO,
-                        transactions_root: B256::ZERO,
-                        receipts_root: B256::ZERO,
-                        logs_bloom: Bloom::ZERO,
-                        difficulty: U256::ZERO,
-                        number,
-                        gas_limit: 0,
-                        gas_used: 0,
-                        timestamp,
-                        extra_data,
-                        mix_hash: B256::ZERO,
-                        nonce: B64::ZERO,
-                        base_fee_per_gas: None,
-                        withdrawals_root: None,
-                        blob_gas_used: None,
-                        excess_blob_gas: None,
-                        parent_beacon_block_root: None,
-                        requests_hash: None,
+        LocalBlockAndReceipts(
+            timestamp.to_string(),
+            BlockAndReceipts {
+                block: EvmBlock::Reth115(reth_compat::SealedBlock {
+                    header: reth_compat::SealedHeader {
+                        header: Header {
+                            parent_hash: B256::ZERO,
+                            ommers_hash: B256::ZERO,
+                            beneficiary: Address::ZERO,
+                            state_root: B256::ZERO,
+                            transactions_root: B256::ZERO,
+                            receipts_root: B256::ZERO,
+                            logs_bloom: Bloom::ZERO,
+                            difficulty: U256::ZERO,
+                            number,
+                            gas_limit: 0,
+                            gas_used: 0,
+                            timestamp,
+                            extra_data: Bytes::from_static(extra_data),
+                            mix_hash: B256::ZERO,
+                            nonce: B64::ZERO,
+                            base_fee_per_gas: None,
+                            withdrawals_root: None,
+                            blob_gas_used: None,
+                            excess_blob_gas: None,
+                            parent_beacon_block_root: None,
+                            requests_hash: None,
+                        },
+                        hash: B256::ZERO,
                     },
-                    hash: B256::ZERO,
-                },
-                body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
-            }),
-            receipts: vec![],
-            system_txs: vec![],
-            read_precompile_calls: ReadPrecompileCalls(vec![]),
-            highest_precompile_address: None,
-        };
-        LocalBlockAndReceipts(timestamp.to_string(), res)
+                    body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
+                }),
+                receipts: vec![],
+                system_txs: vec![],
+                read_precompile_calls: ReadPrecompileCalls(vec![]),
+                highest_precompile_address: None,
+            },
+        )
     }
 
     fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
         let now = OffsetDateTime::now_utc();
-        let day_str = date_from_datetime(now);
-        let hour = now.hour();
-        let temp_dir = tempfile::tempdir()?;
-        let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
+        let temp_dir = tempfile::tempdir()?;
+        let path = temp_dir
+            .path()
+            .join(HOURLY_SUBDIR)
+            .join(date_from_datetime(now))
+            .join(format!("{}", now.hour()));
         std::fs::create_dir_all(path.parent().unwrap())?;
-
         Ok((temp_dir, File::create(path)?))
     }