11 Commits

Author SHA1 Message Date
1908e9f414 Merge pull request #40 from sentioxyz/node-builder
fix: correct ingest local blocks
2025-08-24 18:13:41 +09:00
65cdc27b51 fix: line_to_evm_block don't hold equivalent semantic after refactor 2025-08-24 16:46:45 +08:00
4f430487d6 refactor: Move RPC addons to addons/ 2025-08-24 01:18:52 -04:00
19f35a6b54 chore: clippy, fmt 2025-08-24 01:15:36 -04:00
d61020e996 refactor: Split files for block sources
By claude code
2025-08-24 01:14:33 -04:00
657df240f4 fix: Avoid unnecessarily exposing pseudo peer 2025-08-23 22:17:03 -04:00
73a34a4bc1 chore: clippy 2025-08-23 22:17:03 -04:00
d8eef6305b remove: Reduce unnecessary LoC 2025-08-23 22:17:03 -04:00
bae68ef8db refactor: Reduce unnecessary LoC
By claude code
2025-08-23 04:21:23 -04:00
f576dddfa6 remove: Remove unused code 2025-08-23 03:10:05 -04:00
894ebcbfa5 Merge pull request #36 from hl-archive-node/fix/support-new-api
fix: Support new reth API
2025-08-23 01:51:36 +09:00
39 changed files with 1080 additions and 1489 deletions

View File

@ -1,3 +1,12 @@
//! Overrides for RPC methods to post-filter system transactions and logs.
//!
//! System transactions are always at the beginning of the block,
//! so we can use the transaction index to determine if the log is from a system transaction,
//! and if it is, we can exclude it.
//!
//! For non-system transactions, we can just return the log as is, and the client will
//! adjust the transaction index accordingly.
use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_json_rpc::RpcObject;
@ -9,13 +18,10 @@ use alloy_rpc_types::{
use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
use jsonrpsee_core::{async_trait, RpcResult};
use jsonrpsee_types::ErrorObject;
use reth::{
api::FullNodeComponents, builder::rpc::RpcContext, rpc::result::internal_rpc_err,
tasks::TaskSpawner,
};
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
use reth_primitives_traits::{BlockBody as _, SignedTransaction};
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc::{eth::pubsub::SubscriptionSerializeError, EthFilter, EthPubSub};
use reth_rpc_eth_api::{
helpers::{EthBlocks, EthTransactions, LoadReceipt},
transaction::ConvertReceiptInput,
@ -25,12 +31,9 @@ use reth_rpc_eth_api::{
use serde::Serialize;
use std::{borrow::Cow, marker::PhantomData, sync::Arc};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, trace, Instrument};
use tracing::{trace, Instrument};
use crate::{
node::primitives::{HlPrimitives, TransactionSigned},
HlBlock,
};
use crate::{node::primitives::HlPrimitives, HlBlock};
pub trait EthWrapper:
EthApiServer<
@ -48,7 +51,7 @@ pub trait EthWrapper:
{
}
impl<
impl<T> EthWrapper for T where
T: EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
@ -60,8 +63,7 @@ impl<
+ EthBlocks
+ EthTransactions
+ LoadReceipt
+ 'static,
> EthWrapper for T
+ 'static
{
}
@ -80,19 +82,16 @@ impl<Eth: EthWrapper> HlNodeFilterHttp<Eth> {
impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterHttp<Eth>
{
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newFilter");
self.filter.new_filter(filter).await
}
/// Handler for `eth_newBlockFilter`
async fn new_block_filter(&self) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
self.filter.new_block_filter().await
}
/// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
@ -101,7 +100,6 @@ impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
self.filter.new_pending_transaction_filter(kind).await
}
/// Handler for `eth_getFilterChanges`
async fn filter_changes(
&self,
id: FilterId,
@ -110,31 +108,20 @@ impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
self.filter.filter_changes(id).await.map_err(ErrorObject::from)
}
/// Returns an array of all logs matching filter with given id.
///
/// Returns an error if no matching log filter exists.
///
/// Handler for `eth_getFilterLogs`
async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
self.filter.filter_logs(id).await.map_err(ErrorObject::from)
}
/// Handler for `eth_uninstallFilter`
async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
self.filter.uninstall_filter(id).await
}
/// Returns logs matching given filter object.
///
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
let logs = EthFilterApiServer::logs(&*self.filter, filter).await?;
let provider = self.provider.clone();
Ok(logs.into_iter().filter_map(|log| adjust_log::<Eth>(log, &provider)).collect())
Ok(logs.into_iter().filter_map(|log| adjust_log::<Eth>(log, &self.provider)).collect())
}
}
@ -158,7 +145,6 @@ impl<Eth: EthWrapper> HlNodeFilterWs<Eth> {
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterWs<Eth>
{
/// Handler for `eth_subscribe`
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
@ -166,16 +152,12 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let pubsub = self.pubsub.clone();
let provider = self.provider.clone();
let (pubsub, provider) = (self.pubsub.clone(), self.provider.clone());
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::Logs {
// if no params are provided, used default filter params
let filter = match params {
Some(Params::Logs(filter)) => *filter,
Some(Params::Bool(_)) => {
return;
}
Some(Params::Logs(f)) => *f,
Some(Params::Bool(_)) => return,
_ => Default::default(),
};
let _ = pipe_from_stream(
@ -185,93 +167,42 @@ impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
.await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
};
}
}));
Ok(())
}
}
fn adjust_log<Eth: EthWrapper>(mut log: Log, provider: &Eth::Provider) -> Option<Log> {
let transaction_index = log.transaction_index?;
let log_index = log.log_index?;
let (tx_idx, log_idx) = (log.transaction_index?, log.log_index?);
let receipts = provider.receipts_by_block(log.block_number?.into()).unwrap()?;
// System transactions are always at the beginning of the block,
// so we can use the transaction index to determine if the log is from a system transaction,
// and if it is, we can exclude it.
//
// For non-system transactions, we can just return the log as is, and the client will
// adjust the transaction index accordingly.
let mut system_tx_count = 0u64;
let mut system_tx_logs_count = 0u64;
let (mut sys_tx_count, mut sys_log_count) = (0u64, 0u64);
for receipt in receipts {
let is_system_tx = receipt.cumulative_gas_used() == 0;
if is_system_tx {
system_tx_count += 1;
system_tx_logs_count += receipt.logs().len() as u64;
if receipt.cumulative_gas_used() == 0 {
sys_tx_count += 1;
sys_log_count += receipt.logs().len() as u64;
}
}
if system_tx_count > transaction_index {
if sys_tx_count > tx_idx {
return None;
}
log.transaction_index = Some(transaction_index - system_tx_count);
log.log_index = Some(log_index - system_tx_logs_count);
log.transaction_index = Some(tx_idx - sys_tx_count);
log.log_index = Some(log_idx - sys_log_count);
Some(log)
}
/// Helper to convert a serde error into an [`ErrorObject`]
#[derive(Debug, thiserror::Error)]
#[error("Failed to serialize subscription item: {0}")]
pub struct SubscriptionSerializeError(#[from] serde_json::Error);
impl SubscriptionSerializeError {
const fn new(err: serde_json::Error) -> Self {
Self(err)
}
}
impl From<SubscriptionSerializeError> for ErrorObject<'static> {
fn from(value: SubscriptionSerializeError) -> Self {
internal_rpc_err(value.to_string())
}
}
async fn pipe_from_stream<T, St>(
async fn pipe_from_stream<T: Serialize, St: Stream<Item = T> + Unpin>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>>
where
St: Stream<Item = T> + Unpin,
T: Serialize,
{
) -> Result<(), ErrorObject<'static>> {
loop {
tokio::select! {
_ = sink.closed() => {
// connection dropped
break Ok(())
},
_ = sink.closed() => break Ok(()),
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => {
// stream ended
break Ok(())
},
};
let msg = SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item
).map_err(SubscriptionSerializeError::new)?;
if sink.send(msg).await.is_err() {
break Ok(());
}
let Some(item) = maybe_item else { break Ok(()) };
let msg = SubscriptionMessage::new(sink.method_name(), sink.subscription_id(), &item)
.map_err(SubscriptionSerializeError::from)?;
if sink.send(msg).await.is_err() { break Ok(()); }
}
}
}
@ -321,10 +252,6 @@ macro_rules! engine_span {
};
}
fn is_system_tx(tx: &TransactionSigned) -> bool {
tx.is_system_transaction()
}
fn adjust_block<Eth: EthWrapper>(
recovered_block: &RpcBlock<Eth::NetworkTypes>,
eth_api: &Eth,
@ -410,8 +337,8 @@ async fn adjust_transaction_receipt<Eth: EthWrapper>(
) -> Result<Option<RpcReceipt<Eth::NetworkTypes>>, Eth::Error> {
match eth_api.load_transaction_and_receipt(tx_hash).await? {
Some((_, meta, _)) => {
// LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again doesn't hurt performance much
info!("block hash: {:?}", meta.block_hash);
// LoadReceipt::block_transaction_receipt loads the block again, so loading blocks again
// doesn't hurt performance much
let Some((system_tx_count, block_receipts)) =
adjust_block_receipts(meta.block_hash.into(), eth_api).await?
else {
@ -423,10 +350,12 @@ async fn adjust_transaction_receipt<Eth: EthWrapper>(
}
}
// This function assumes that `block_id` is already validated by the caller.
fn system_tx_count_for_block<Eth: EthWrapper>(eth_api: &Eth, block_id: BlockId) -> usize {
let provider = eth_api.provider();
let block = provider.block_by_id(block_id).unwrap().unwrap();
let system_tx_count = block.body.transactions().iter().filter(|tx| is_system_tx(tx)).count();
let system_tx_count =
block.body.transactions().iter().filter(|tx| tx.is_system_transaction()).count();
system_tx_count
}
@ -464,8 +393,9 @@ where
let res =
self.eth_api.block_transaction_count_by_hash(hash).instrument(engine_span!()).await?;
Ok(res.map(|count| {
count
- U256::from(system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into())))
let sys_tx_count =
system_tx_count_for_block(&*self.eth_api, BlockId::Hash(hash.into()));
count - U256::from(sys_tx_count)
}))
}

3
src/addons/mod.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod call_forwarder;
pub mod hl_node_compliance;
pub mod tx_forwarder;

View File

@ -37,7 +37,7 @@ impl EthForwarderExt {
Self { client }
}
fn from_client_error(e: ClientError, internal_error_prefix: &str) -> ErrorObject {
fn from_client_error(e: ClientError, internal_error_prefix: &str) -> ErrorObject<'static> {
match e {
ClientError::Call(e) => e,
_ => ErrorObject::owned(

View File

@ -7,7 +7,6 @@ use std::sync::LazyLock;
static GENESIS_HASH: B256 =
b256!("d8fcc13b6a195b88b7b2da3722ff6cad767b13a8c1e9ffb1c73aa9d216d895f0");
/// Dev hardforks
pub static HL_HARDFORKS: LazyLock<ChainHardforks> = LazyLock::new(|| {
ChainHardforks::new(vec![
(EthereumHardfork::Frontier.boxed(), ForkCondition::Block(0)),

View File

@ -1,8 +1,7 @@
//! Chain specification for HyperEVM.
pub mod hl;
pub mod parser;
use crate::hardforks::{hl::HlHardfork, HlHardforks};
use crate::hardforks::HlHardforks;
use alloy_consensus::Header;
use alloy_eips::eip7840::BlobParams;
use alloy_genesis::Genesis;
@ -13,15 +12,13 @@ use reth_chainspec::{
};
use reth_discv4::NodeRecord;
use reth_evm::eth::spec::EthExecutorSpec;
use std::{fmt::Display, sync::Arc};
use std::fmt::Display;
pub const MAINNET_CHAIN_ID: u64 = 999;
pub const TESTNET_CHAIN_ID: u64 = 998;
/// Hl chain spec type.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct HlChainSpec {
/// [`ChainSpec`].
pub inner: ChainSpec,
}
@ -117,11 +114,7 @@ impl EthereumHardforks for HlChainSpec {
}
}
impl HlHardforks for HlChainSpec {
fn hl_fork_activation(&self, fork: HlHardfork) -> ForkCondition {
self.fork(fork)
}
}
impl HlHardforks for HlChainSpec {}
impl EthExecutorSpec for HlChainSpec {
fn deposit_contract_address(&self) -> Option<Address> {
@ -135,12 +128,6 @@ impl From<HlChainSpec> for ChainSpec {
}
}
impl HlHardforks for Arc<HlChainSpec> {
fn hl_fork_activation(&self, fork: HlHardfork) -> ForkCondition {
self.as_ref().hl_fork_activation(fork)
}
}
impl HlChainSpec {
pub const MAINNET_RPC_URL: &str = "https://rpc.hyperliquid.xyz/evm";
pub const TESTNET_RPC_URL: &str = "https://rpc.hyperliquid-testnet.xyz/evm";

View File

@ -1 +0,0 @@

View File

@ -1,4 +1,3 @@
pub mod api;
mod handler;
pub mod spec;
pub mod transaction;

View File

@ -13,88 +13,5 @@ hardfork!(
HlHardfork {
/// Initial version
V1,
/// block.number bugfix
V2,
/// gas mismatch bugfix
V3,
}
);
impl HlHardfork {
/// Retrieves the activation block for the specified hardfork on the given chain.
pub fn activation_block<H: Hardfork>(self, fork: H, chain: Chain) -> Option<u64> {
if chain == Chain::from_named(NamedChain::Hyperliquid) {
return Self::hl_mainnet_activation_block(fork);
}
None
}
/// Retrieves the activation timestamp for the specified hardfork on the given chain.
pub fn activation_timestamp<H: Hardfork>(self, fork: H, chain: Chain) -> Option<u64> {
None
}
/// Retrieves the activation block for the specified hardfork on the HyperLiquid mainnet.
pub fn hl_mainnet_activation_block<H: Hardfork>(fork: H) -> Option<u64> {
match_hardfork(
fork,
|fork| match fork {
EthereumHardfork::Frontier |
EthereumHardfork::Homestead |
EthereumHardfork::Tangerine |
EthereumHardfork::SpuriousDragon |
EthereumHardfork::Byzantium |
EthereumHardfork::Constantinople |
EthereumHardfork::Petersburg |
EthereumHardfork::Istanbul |
EthereumHardfork::MuirGlacier |
EthereumHardfork::Berlin |
EthereumHardfork::London |
EthereumHardfork::Shanghai |
EthereumHardfork::Cancun => Some(0),
_ => None,
},
|fork| match fork {
Self::V1 | Self::V2 | Self::V3 => Some(0),
_ => None,
},
)
}
/// Hl mainnet list of hardforks.
pub fn hl_mainnet() -> ChainHardforks {
ChainHardforks::new(vec![
(EthereumHardfork::Frontier.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Homestead.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Tangerine.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::SpuriousDragon.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Byzantium.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Constantinople.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Petersburg.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Istanbul.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::MuirGlacier.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Berlin.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::London.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Shanghai.boxed(), ForkCondition::Block(0)),
(EthereumHardfork::Cancun.boxed(), ForkCondition::Block(0)),
(Self::V1.boxed(), ForkCondition::Block(0)),
(Self::V2.boxed(), ForkCondition::Block(0)),
(Self::V3.boxed(), ForkCondition::Block(0)),
])
}
}
/// Match helper method since it's not possible to match on `dyn Hardfork`
fn match_hardfork<H, HF, HHF>(fork: H, hardfork_fn: HF, hl_hardfork_fn: HHF) -> Option<u64>
where
H: Hardfork,
HF: Fn(&EthereumHardfork) -> Option<u64>,
HHF: Fn(&HlHardfork) -> Option<u64>,
{
let fork: &dyn Any = &fork;
if let Some(fork) = fork.downcast_ref::<EthereumHardfork>() {
return hardfork_fn(fork);
}
fork.downcast_ref::<HlHardfork>().and_then(hl_hardfork_fn)
}

View File

@ -1,13 +1,14 @@
//! Hard forks of hl protocol.
//! Hard forks of HyperEVM.
#![allow(unused)]
use hl::HlHardfork;
use reth_chainspec::{EthereumHardforks, ForkCondition};
pub mod hl;
use hl::HlHardfork;
use reth_chainspec::{EthereumHardforks, ForkCondition};
use std::sync::Arc;
/// Extends [`EthereumHardforks`] with hl helper methods.
pub trait HlHardforks: EthereumHardforks {
/// Retrieves [`ForkCondition`] by an [`HlHardfork`]. If `fork` is not present, returns
/// [`ForkCondition::Never`].
fn hl_fork_activation(&self, fork: HlHardfork) -> ForkCondition;
}
///
/// Currently a placeholder for future use.
pub trait HlHardforks: EthereumHardforks {}
impl<T: HlHardforks> HlHardforks for Arc<T> {}

View File

@ -1,11 +1,9 @@
pub mod call_forwarder;
pub mod addons;
pub mod chainspec;
pub mod consensus;
mod evm;
mod hardforks;
pub mod hl_node_compliance;
pub mod node;
pub mod pseudo_peer;
pub mod tx_forwarder;
pub use node::primitives::{HlBlock, HlBlockBody, HlPrimitives};

View File

@ -4,15 +4,17 @@ use clap::Parser;
use reth::builder::{NodeBuilder, NodeHandle, WithLaunchContext};
use reth_db::DatabaseEnv;
use reth_hl::{
addons::{
call_forwarder::{self, CallForwarderApiServer},
chainspec::{parser::HlChainSpecParser, HlChainSpec},
hl_node_compliance::install_hl_node_compliance,
tx_forwarder::{self, EthForwarderApiServer},
},
chainspec::{parser::HlChainSpecParser, HlChainSpec},
node::{
cli::{Cli, HlNodeArgs},
storage::tables::Tables,
HlNode,
},
tx_forwarder::{self, EthForwarderApiServer},
};
use tracing::info;

View File

@ -1,22 +1,9 @@
use std::sync::Arc;
use crate::{
node::{rpc::engine_api::payload::HlPayloadTypes, HlNode},
HlBlock, HlPrimitives,
};
use crate::{HlBlock, HlPrimitives};
use alloy_eips::eip7685::Requests;
use alloy_primitives::U256;
use reth::{
api::FullNodeTypes,
builder::{components::PayloadServiceBuilder, BuilderContext},
payload::{PayloadBuilderHandle, PayloadServiceCommand},
transaction_pool::TransactionPool,
};
use reth_evm::ConfigureEvm;
use reth_payload_primitives::BuiltPayload;
use reth_primitives::SealedBlock;
use tokio::sync::{broadcast, mpsc};
use tracing::warn;
use std::sync::Arc;
/// Built payload for Hl. This is similar to [`EthBuiltPayload`] but without sidecars as those
/// included into [`HlBlock`].
@ -45,73 +32,3 @@ impl BuiltPayload for HlBuiltPayload {
self.requests.clone()
}
}
#[derive(Debug, Clone, Copy, Default)]
#[non_exhaustive]
pub struct HlPayloadServiceBuilder;
impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for HlPayloadServiceBuilder
where
Node: FullNodeTypes<Types = HlNode>,
Pool: TransactionPool,
Evm: ConfigureEvm,
{
async fn spawn_payload_builder_service(
self,
ctx: &BuilderContext<Node>,
_pool: Pool,
_evm_config: Evm,
) -> eyre::Result<PayloadBuilderHandle<HlPayloadTypes>> {
let (tx, mut rx) = mpsc::unbounded_channel();
ctx.task_executor().spawn_critical("payload builder", async move {
let mut subscriptions = Vec::new();
while let Some(message) = rx.recv().await {
match message {
PayloadServiceCommand::Subscribe(tx) => {
let (events_tx, events_rx) = broadcast::channel(100);
// Retain senders to make sure that channels are not getting closed
subscriptions.push(events_tx);
let _ = tx.send(events_rx);
}
message => warn!(?message, "Noop payload service received a message"),
}
}
});
Ok(PayloadBuilderHandle::new(tx))
}
}
// impl From<EthBuiltPayload> for HlBuiltPayload {
// fn from(value: EthBuiltPayload) -> Self {
// let EthBuiltPayload { id, block, fees, sidecars, requests } = value;
// HlBuiltPayload {
// id,
// block: block.into(),
// fees,
// requests,
// }
// }
// }
// pub struct HlPayloadBuilder<Inner> {
// inner: Inner,
// }
// impl<Inner> PayloadBuilder for HlPayloadBuilder<Inner>
// where
// Inner: PayloadBuilder<BuiltPayload = EthBuiltPayload>,
// {
// type Attributes = Inner::Attributes;
// type BuiltPayload = HlBuiltPayload;
// type Error = Inner::Error;
// fn try_build(
// &self,
// args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
// ) -> Result<BuildOutcome<Self::BuiltPayload>, PayloadBuilderError> {
// let outcome = self.inner.try_build(args)?;
// }
// }

View File

@ -71,10 +71,10 @@ where
let timestamp = evm_env.block_env.timestamp.saturating_to();
// Filter out system tx receipts
let transactions_for_root: Vec<TransactionSigned> =
transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect::<Vec<_>>();
let receipts_for_root: Vec<Receipt> =
receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect::<Vec<_>>();
let transactions_for_root: Vec<_> =
transactions.iter().filter(|t| !is_system_transaction(t)).cloned().collect();
let receipts_for_root: Vec<_> =
receipts.iter().filter(|r| r.cumulative_gas_used() != 0).cloned().collect();
let transactions_root = proofs::calculate_transaction_root(&transactions_for_root);
let receipts_root = Receipt::calculate_receipt_root_no_memo(&receipts_for_root);
@ -295,7 +295,6 @@ where
// configure evm env based on parent block
let mut cfg_env =
CfgEnv::new().with_chain_id(self.chain_spec().chain().id()).with_spec(spec);
if let Some(blob_params) = &blob_params {
cfg_env.set_max_blobs_per_tx(blob_params.max_blobs_per_tx);
}
@ -376,10 +375,6 @@ where
block: &'a SealedBlock<BlockTy<Self::Primitives>>,
) -> ExecutionCtxFor<'a, Self> {
let block_body = block.body();
let extras = HlExtras {
read_precompile_calls: block_body.read_precompile_calls.clone(),
highest_precompile_address: block_body.highest_precompile_address,
};
HlBlockExecutionCtx {
ctx: EthBlockExecutionCtx {
parent_hash: block.header().parent_hash,
@ -387,7 +382,10 @@ where
ommers: &block.body().ommers,
withdrawals: block.body().withdrawals.as_ref().map(Cow::Borrowed),
},
extras,
extras: HlExtras {
read_precompile_calls: block_body.read_precompile_calls.clone(),
highest_precompile_address: block_body.highest_precompile_address,
},
}
}
@ -403,8 +401,7 @@ where
ommers: &[],
withdrawals: attributes.withdrawals.map(Cow::Owned),
},
// TODO: hacky, double check if this is correct
extras: HlExtras::default(),
extras: HlExtras::default(), // TODO: hacky, double check if this is correct
}
}
}
@ -416,10 +413,6 @@ impl ConfigureEngineEvm<HlExecutionData> for HlEvmConfig {
fn context_for_payload<'a>(&self, payload: &'a HlExecutionData) -> ExecutionCtxFor<'a, Self> {
let block = &payload.0;
let extras = HlExtras {
read_precompile_calls: block.body.read_precompile_calls.clone(),
highest_precompile_address: block.body.highest_precompile_address,
};
HlBlockExecutionCtx {
ctx: EthBlockExecutionCtx {
parent_hash: block.header.parent_hash,
@ -427,7 +420,10 @@ impl ConfigureEngineEvm<HlExecutionData> for HlEvmConfig {
ommers: &block.body.ommers,
withdrawals: block.body.withdrawals.as_ref().map(Cow::Borrowed),
},
extras,
extras: HlExtras {
read_precompile_calls: block.body.read_precompile_calls.clone(),
highest_precompile_address: block.body.highest_precompile_address,
},
}
}

View File

@ -165,7 +165,6 @@ where
type EVM = HlEvmConfig;
async fn build_evm(self, ctx: &BuilderContext<Node>) -> eyre::Result<Self::EVM> {
let evm_config = HlEvmConfig::hl(ctx.chain_spec());
Ok(evm_config)
Ok(HlEvmConfig::hl(ctx.chain_spec()))
}
}

View File

@ -15,12 +15,15 @@ use crate::{
pseudo_peer::BlockSourceConfig,
};
use consensus::HlConsensusBuilder;
use engine::HlPayloadServiceBuilder;
use evm::HlExecutorBuilder;
use network::HlNetworkBuilder;
use reth::{
api::{FullNodeTypes, NodeTypes},
builder::{components::ComponentsBuilder, rpc::RpcAddOns, Node, NodeAdapter},
builder::{
components::{ComponentsBuilder, NoopPayloadServiceBuilder},
rpc::RpcAddOns,
Node, NodeAdapter,
},
};
use reth_engine_primitives::ConsensusEngineHandle;
use std::{marker::PhantomData, sync::Arc};
@ -65,7 +68,7 @@ impl HlNode {
) -> ComponentsBuilder<
Node,
HlPoolBuilder,
HlPayloadServiceBuilder,
NoopPayloadServiceBuilder,
HlNetworkBuilder,
HlExecutorBuilder,
HlConsensusBuilder,
@ -77,7 +80,7 @@ impl HlNode {
.node_types::<Node>()
.pool(HlPoolBuilder)
.executor(HlExecutorBuilder::default())
.payload(HlPayloadServiceBuilder::default())
.payload(NoopPayloadServiceBuilder::default())
.network(HlNetworkBuilder {
engine_handle_rx: self.engine_handle_rx.clone(),
block_source_config: self.block_source_config.clone(),
@ -100,7 +103,7 @@ where
type ComponentsBuilder = ComponentsBuilder<
N,
HlPoolBuilder,
HlPayloadServiceBuilder,
NoopPayloadServiceBuilder,
HlNetworkBuilder,
HlExecutorBuilder,
HlConsensusBuilder,

View File

@ -89,7 +89,6 @@ where
/// Process a new payload and return the outcome
fn new_payload(&self, block: BlockMsg, peer_id: PeerId) -> ImportFut {
let engine = self.engine.clone();
Box::pin(async move {
let sealed_block = block.block.0.block.clone().seal();
let payload = HlPayloadTypes::block_to_payload(sealed_block);
@ -107,7 +106,7 @@ where
.into(),
_ => None,
},
Err(err) => None,
Err(_) => None,
}
})
}
@ -117,15 +116,10 @@ where
let engine = self.engine.clone();
let consensus = self.consensus.clone();
let sealed_block = block.block.0.block.clone().seal();
let hash = sealed_block.hash();
let number = sealed_block.number();
let (hash, number) = (sealed_block.hash(), sealed_block.number());
Box::pin(async move {
let (head_block_hash, current_hash) = match consensus.canonical_head(hash, number) {
Ok(hash) => hash,
Err(_) => return None,
};
let (head_block_hash, _) = consensus.canonical_head(hash, number).ok()?;
let state = ForkchoiceState {
head_block_hash,
safe_block_hash: head_block_hash,
@ -146,18 +140,15 @@ where
.into(),
_ => None,
},
Err(err) => None,
Err(_) => None,
}
})
}
/// Add a new block import task to the pending imports
fn on_new_block(&mut self, block: BlockMsg, peer_id: PeerId) {
let payload_fut = self.new_payload(block.clone(), peer_id);
self.pending_imports.push(payload_fut);
let fcu_fut = self.update_fork_choice(block, peer_id);
self.pending_imports.push(fcu_fut);
self.pending_imports.push(self.new_payload(block.clone(), peer_id));
self.pending_imports.push(self.update_fork_choice(block, peer_id));
}
}
@ -176,34 +167,16 @@ where
}
// Process completed imports and send events to network
while let Poll::Ready(Some(outcome)) = this.pending_imports.poll_next_unpin(cx) {
if let Some(outcome) = outcome {
while let Poll::Ready(Some(Some(outcome))) = this.pending_imports.poll_next_unpin(cx) {
if let Err(e) = this.to_network.send(BlockImportEvent::Outcome(outcome)) {
return Poll::Ready(Err(Box::new(e)));
}
}
}
Poll::Pending
}
}
pub(crate) fn collect_block(height: u64) -> Option<BlockAndReceipts> {
let ingest_dir = "/home/user/personal/evm-blocks";
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{ingest_dir}/{f}/{s}/{height}.rmp.lz4");
if std::path::Path::new(&path).exists() {
let file = std::fs::File::open(path).unwrap();
let file = std::io::BufReader::new(file);
let mut decoder = lz4_flex::frame::FrameDecoder::new(file);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
Some(blocks[0].clone())
} else {
None
}
}
#[cfg(test)]
mod tests {
use crate::chainspec::hl::hl_mainnet;
@ -277,15 +250,12 @@ mod tests {
fn chain_info(&self) -> Result<ChainInfo, ProviderError> {
unimplemented!()
}
fn best_block_number(&self) -> Result<u64, ProviderError> {
Ok(0)
}
fn last_block_number(&self) -> Result<u64, ProviderError> {
Ok(0)
}
fn block_number(&self, _hash: B256) -> Result<Option<u64>, ProviderError> {
Ok(None)
}
@ -295,7 +265,6 @@ mod tests {
fn block_hash(&self, _number: u64) -> Result<Option<B256>, ProviderError> {
Ok(Some(B256::ZERO))
}
fn canonical_hashes_range(
&self,
_start: u64,
@ -315,14 +284,12 @@ mod tests {
fn both_valid() -> Self {
Self { new_payload: PayloadStatusEnum::Valid, fcu: PayloadStatusEnum::Valid }
}
fn invalid_new_payload() -> Self {
Self {
new_payload: PayloadStatusEnum::Invalid { validation_error: "test error".into() },
fcu: PayloadStatusEnum::Valid,
}
}
fn invalid_fcu() -> Self {
Self {
new_payload: PayloadStatusEnum::Valid,
@ -342,19 +309,15 @@ mod tests {
let consensus = Arc::new(HlConsensus { provider: MockProvider });
let (to_engine, from_engine) = mpsc::unbounded_channel();
let engine_handle = ConsensusEngineHandle::new(to_engine);
handle_engine_msg(from_engine, responses).await;
let (to_import, from_network) = mpsc::unbounded_channel();
let (to_network, import_outcome) = mpsc::unbounded_channel();
let handle = ImportHandle::new(to_import, import_outcome);
let service = ImportService::new(consensus, engine_handle, from_network, to_network);
tokio::spawn(Box::pin(async move {
service.await.unwrap();
}));
Self { handle }
}

View File

@ -12,7 +12,6 @@ use crate::{
HlBlock,
};
use alloy_rlp::{Decodable, Encodable};
// use handshake::HlHandshake;
use reth::{
api::{FullNodeTypes, TxTy},
builder::{components::NetworkBuilder, BuilderContext},
@ -69,32 +68,22 @@ mod rlp {
impl<'a> From<&'a HlNewBlock> for HlNewBlockHelper<'a> {
fn from(value: &'a HlNewBlock) -> Self {
let HlNewBlock(NewBlock {
block:
HlBlock {
header,
body:
HlBlockBody {
inner: BlockBody { transactions, ommers, withdrawals },
sidecars,
read_precompile_calls,
highest_precompile_address,
},
},
td,
}) = value;
let b = &value.0.block;
Self {
block: BlockHelper {
header: Cow::Borrowed(header),
transactions: Cow::Borrowed(transactions),
ommers: Cow::Borrowed(ommers),
withdrawals: withdrawals.as_ref().map(Cow::Borrowed),
header: Cow::Borrowed(&b.header),
transactions: Cow::Borrowed(&b.body.inner.transactions),
ommers: Cow::Borrowed(&b.body.inner.ommers),
withdrawals: b.body.inner.withdrawals.as_ref().map(Cow::Borrowed),
},
td: *td,
sidecars: sidecars.as_ref().map(Cow::Borrowed),
read_precompile_calls: read_precompile_calls.as_ref().map(Cow::Borrowed),
highest_precompile_address: highest_precompile_address.as_ref().map(Cow::Borrowed),
td: value.0.td,
sidecars: b.body.sidecars.as_ref().map(Cow::Borrowed),
read_precompile_calls: b.body.read_precompile_calls.as_ref().map(Cow::Borrowed),
highest_precompile_address: b
.body
.highest_precompile_address
.as_ref()
.map(Cow::Borrowed),
}
}
}
@ -111,30 +100,24 @@ mod rlp {
impl Decodable for HlNewBlock {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let HlNewBlockHelper {
block: BlockHelper { header, transactions, ommers, withdrawals },
td,
sidecars,
read_precompile_calls,
highest_precompile_address,
} = HlNewBlockHelper::decode(buf)?;
let h = HlNewBlockHelper::decode(buf)?;
Ok(HlNewBlock(NewBlock {
block: HlBlock {
header: header.into_owned(),
header: h.block.header.into_owned(),
body: HlBlockBody {
inner: BlockBody {
transactions: transactions.into_owned(),
ommers: ommers.into_owned(),
withdrawals: withdrawals.map(|w| w.into_owned()),
transactions: h.block.transactions.into_owned(),
ommers: h.block.ommers.into_owned(),
withdrawals: h.block.withdrawals.map(|w| w.into_owned()),
},
sidecars: sidecars.map(|s| s.into_owned()),
read_precompile_calls: read_precompile_calls.map(|s| s.into_owned()),
highest_precompile_address: highest_precompile_address
sidecars: h.sidecars.map(|s| s.into_owned()),
read_precompile_calls: h.read_precompile_calls.map(|s| s.into_owned()),
highest_precompile_address: h
.highest_precompile_address
.map(|s| s.into_owned()),
},
},
td,
td: h.td,
}))
}
}
@ -172,41 +155,32 @@ impl HlNetworkBuilder {
where
Node: FullNodeTypes<Types = HlNode>,
{
let Self { engine_handle_rx, .. } = self;
let network_builder = ctx.network_config_builder()?;
let (to_import, from_network) = mpsc::unbounded_channel();
let (to_network, import_outcome) = mpsc::unbounded_channel();
let handle = ImportHandle::new(to_import, import_outcome);
let consensus = Arc::new(HlConsensus { provider: ctx.provider().clone() });
ctx.task_executor().spawn_critical("block import", async move {
let handle = engine_handle_rx
let handle = self
.engine_handle_rx
.lock()
.await
.take()
.expect("node should only be launched once")
.await
.unwrap();
ImportService::new(consensus, handle, from_network, to_network).await.unwrap();
});
let network_builder = network_builder
Ok(ctx.build_network_config(
ctx.network_config_builder()?
.disable_dns_discovery()
.disable_nat()
.boot_nodes(boot_nodes())
.set_head(ctx.head())
.with_pow()
.block_import(Box::new(HlBlockImport::new(handle)));
// .discovery(discv4)
// .eth_rlpx_handshake(Arc::new(HlHandshake::default()));
let network_config = ctx.build_network_config(network_builder);
Ok(network_config)
.block_import(Box::new(HlBlockImport::new(handle))),
))
}
}
@ -229,11 +203,9 @@ where
pool: Pool,
) -> eyre::Result<Self::Network> {
let block_source_config = self.block_source_config.clone();
let network_config = self.network_config(ctx)?;
let network = NetworkManager::builder(network_config).await?;
let handle = ctx.start_network(network, pool);
let handle =
ctx.start_network(NetworkManager::builder(self.network_config(ctx)?).await?, pool);
let local_node_record = handle.local_node_record();
let chain_spec = ctx.chain_spec();
info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
let next_block_number = ctx
@ -243,10 +215,15 @@ where
.block_number +
1;
let chain_spec = ctx.chain_spec();
ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source =
block_source_config.create_cached_block_source((&*chain_spec).clone(), next_block_number).await;
start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source)
start_pseudo_peer(
chain_spec.clone(),
local_node_record.to_string(),
block_source_config
.create_cached_block_source((*chain_spec).clone(), next_block_number)
.await,
)
.await
.unwrap();
});

View File

@ -68,19 +68,15 @@ impl BlockBodyTrait for HlBlockBody {
fn transactions(&self) -> &[Self::Transaction] {
BlockBodyTrait::transactions(&self.inner)
}
fn into_ethereum_body(self) -> BlockBody {
self.inner
}
fn into_transactions(self) -> Vec<Self::Transaction> {
self.inner.into_transactions()
}
fn withdrawals(&self) -> Option<&alloy_rpc_types::Withdrawals> {
self.inner.withdrawals()
}
fn ommers(&self) -> Option<&[Self::OmmerHeader]> {
self.inner.ommers()
}
@ -116,15 +112,12 @@ impl Block for HlBlock {
fn new(header: Self::Header, body: Self::Body) -> Self {
Self { header, body }
}
fn header(&self) -> &Self::Header {
&self.header
}
fn body(&self) -> &Self::Body {
&self.body
}
fn split(self) -> (Self::Header, Self::Body) {
(self.header, self.body)
}
@ -179,7 +172,6 @@ mod rlp {
read_precompile_calls,
highest_precompile_address,
} = value;
Self {
transactions: Cow::Borrowed(transactions),
ommers: Cow::Borrowed(ommers),
@ -203,7 +195,6 @@ mod rlp {
highest_precompile_address,
},
} = value;
Self {
header: Cow::Borrowed(header),
transactions: Cow::Borrowed(transactions),
@ -220,7 +211,6 @@ mod rlp {
fn encode(&self, out: &mut dyn bytes::BufMut) {
BlockBodyHelper::from(self).encode(out);
}
fn length(&self) -> usize {
BlockBodyHelper::from(self).length()
}
@ -253,7 +243,6 @@ mod rlp {
fn encode(&self, out: &mut dyn bytes::BufMut) {
BlockHelper::from(self).encode(out);
}
fn length(&self) -> usize {
BlockHelper::from(self).length()
}

View File

@ -114,11 +114,6 @@ impl reth_codecs::Compact for TransactionSigned {
}
}
pub fn convert_recovered(value: Recovered<TransactionSigned>) -> Recovered<InnerType> {
let (tx, signer) = value.into_parts();
Recovered::new_unchecked(tx.into_inner(), signer)
}
impl FromRecoveredTx<TransactionSigned> for TxEnv {
fn from_recovered_tx(tx: &TransactionSigned, sender: Address) -> Self {
TxEnv::from_recovered_tx(&tx.inner(), sender)
@ -192,20 +187,6 @@ impl SerdeBincodeCompat for TransactionSigned {
pub type BlockBody = alloy_consensus::BlockBody<TransactionSigned>;
impl From<TransactionSigned> for EthereumTxEnvelope<TxEip4844> {
fn from(value: TransactionSigned) -> Self {
value.into_inner()
}
}
impl TryFrom<TransactionSigned> for EthereumTxEnvelope<TxEip4844WithSidecar> {
type Error = <InnerType as TryInto<EthereumTxEnvelope<TxEip4844WithSidecar>>>::Error;
fn try_from(value: TransactionSigned) -> Result<Self, Self::Error> {
value.into_inner().try_into()
}
}
impl TryFrom<TransactionSigned>
for EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarVariant>>
{

View File

@ -36,7 +36,7 @@ where
}
}
/// Validator for Optimism engine API.
/// Validator for HyperEVM engine API.
#[derive(Debug, Clone)]
pub struct HlPayloadValidator {
inner: HlExecutionPayloadValidator<HlChainSpec>,
@ -123,7 +123,7 @@ where
return Err(PayloadError::BlockHash {
execution: sealed_block.hash(),
consensus: expected_hash,
})?;
});
}
Ok(sealed_block)

View File

@ -1 +0,0 @@
pub const MAX_CONCURRENCY: usize = 100;

View File

@ -1,36 +0,0 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PseudoPeerError {
#[error("Block source error: {0}")]
BlockSource(String),
#[error("Network error: {0}")]
Network(#[from] reth_network::error::NetworkError),
#[error("Configuration error: {0}")]
Config(String),
#[error("AWS S3 error: {0}")]
S3(#[from] aws_sdk_s3::Error),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] rmp_serde::encode::Error),
#[error("Deserialization error: {0}")]
Deserialization(#[from] rmp_serde::decode::Error),
#[error("Compression error: {0}")]
Compression(String),
}
impl From<eyre::Error> for PseudoPeerError {
fn from(err: eyre::Error) -> Self {
PseudoPeerError::Config(err.to_string())
}
}
pub type Result<T> = std::result::Result<T, PseudoPeerError>;

View File

@ -5,33 +5,27 @@
pub mod cli;
pub mod config;
pub mod consts;
pub mod error;
pub mod network;
pub mod service;
pub mod sources;
#[cfg(test)]
mod tests;
pub mod utils;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::info;
pub use cli::*;
pub use config::*;
pub use error::*;
pub use network::*;
pub use service::*;
pub use sources::*;
#[cfg(test)]
mod tests;
use tokio::sync::mpsc;
use tracing::info;
/// Re-export commonly used types
pub mod prelude {
pub use super::{
config::BlockSourceConfig,
error::{PseudoPeerError, Result},
service::{BlockPoller, PseudoPeer},
sources::{BlockSource, CachedBlockSource, LocalBlockSource, S3BlockSource},
};

View File

@ -6,7 +6,11 @@ use reth_network::{
};
use reth_network_peers::TrustedPeer;
use reth_provider::test_utils::NoopProvider;
use std::{str::FromStr, sync::Arc};
use std::{
net::{Ipv4Addr, SocketAddr},
str::FromStr,
sync::Arc,
};
use tokio::sync::mpsc;
pub struct NetworkBuilder {
@ -32,27 +36,11 @@ impl Default for NetworkBuilder {
}
impl NetworkBuilder {
pub fn with_secret(mut self, secret: SecretKey) -> Self {
self.secret = secret;
self
}
pub fn with_peer_config(mut self, peer_config: PeersConfig) -> Self {
self.peer_config = peer_config;
self
}
pub fn with_boot_nodes(mut self, boot_nodes: Vec<TrustedPeer>) -> Self {
self.boot_nodes = boot_nodes;
self
}
pub fn with_ports(mut self, discovery_port: u16, listener_port: u16) -> Self {
self.discovery_port = discovery_port;
self.listener_port = listener_port;
self
}
pub fn with_chain_spec(mut self, chain_spec: HlChainSpec) -> Self {
self.chain_spec = chain_spec;
self
@ -66,8 +54,8 @@ impl NetworkBuilder {
let builder = NetworkConfig::<(), HlNetworkPrimitives>::builder(self.secret)
.boot_nodes(self.boot_nodes)
.peer_config(self.peer_config)
.discovery_port(self.discovery_port)
.listener_port(self.listener_port);
.discovery_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.discovery_port))
.listener_addr(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), self.listener_port));
let chain_id = self.chain_spec.inner.chain().id();
let (block_poller, start_tx) =

View File

@ -77,20 +77,20 @@ impl BlockPoller {
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
info!("Starting block poller");
let latest_block_number = block_source
let mut next_block_number = block_source
.find_latest_block_number()
.await
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
let mut next_block_number = latest_block_number;
loop {
let Ok(block) = block_source.collect_block(next_block_number).await else {
tokio::time::sleep(Self::POLL_INTERVAL).await;
continue;
};
match block_source.collect_block(next_block_number).await {
Ok(block) => {
block_tx_clone.send((next_block_number, block)).await?;
next_block_number += 1;
}
Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await,
}
}
}
}
@ -111,8 +111,7 @@ impl BlockImport<HlNewBlock> for BlockPoller {
},
}))
}
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending,
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
}
@ -157,12 +156,11 @@ impl<BS: BlockSource> PseudoPeer<BS> {
block_numbers: impl IntoIterator<Item = u64>,
) -> Vec<BlockAndReceipts> {
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
let blocks = futures::stream::iter(block_numbers)
futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await.unwrap())
.buffered(self.block_source.recommended_chunk_size() as usize)
.collect::<Vec<_>>()
.await;
blocks
.await
}
pub async fn process_eth_request(
@ -179,7 +177,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
debug!(
"GetBlockHeaders request: {start_block:?}, {limit:?}, {skip:?}, {direction:?}"
);
let number = match start_block {
HashOrNumber::Hash(hash) => self.hash_to_block_number(hash).await,
HashOrNumber::Number(number) => number,
@ -215,12 +212,8 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let _ = response.send(Ok(BlockBodies(block_bodies)));
}
IncomingEthRequest::GetNodeData { .. } => {
debug!("GetNodeData request: {eth_req:?}");
}
eth_req => {
debug!("New eth protocol request: {eth_req:?}");
}
IncomingEthRequest::GetNodeData { .. } => debug!("GetNodeData request: {eth_req:?}"),
eth_req => debug!("New eth protocol request: {eth_req:?}"),
}
Ok(())
}
@ -251,7 +244,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
// This is tricky because Raw EVM files (BlockSource) does not have hash to number mapping
// so we can either enumerate all blocks to get hash to number mapping, or fallback to an
// official RPC. The latter is much easier but has 300/day rate limit.
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee_core::client::ClientT;
@ -259,7 +251,6 @@ impl<BS: BlockSource> PseudoPeer<BS> {
let client =
HttpClientBuilder::default().build(self.chain_spec.official_rpc_url()).unwrap();
let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?;
debug!("From official RPC: {:?} for {hash:?}", target_block.header.number);
self.cache_blocks([(hash, target_block.header.number)]);
Ok(target_block.header.number)
@ -272,10 +263,11 @@ impl<BS: BlockSource> PseudoPeer<BS> {
if self.if_hit_then_warm_around.lock().unwrap().contains(&block_number) {
self.warm_cache_around_blocks(block_number, self.warm_cache_size).await;
}
return Some(block_number);
}
Some(block_number)
} else {
None
}
}
/// Backfill the cache with blocks to find the target hash
async fn backfill_cache_for_hash(
@ -319,10 +311,11 @@ impl<BS: BlockSource> PseudoPeer<BS> {
async fn warm_cache_around_blocks(&mut self, block_number: u64, chunk_size: u64) {
let start = std::cmp::max(block_number.saturating_sub(chunk_size), 1);
let end = std::cmp::min(block_number + chunk_size, self.known_latest_block_number);
self.if_hit_then_warm_around.lock().unwrap().insert(start);
self.if_hit_then_warm_around.lock().unwrap().insert(end);
{
let mut guard = self.if_hit_then_warm_around.lock().unwrap();
guard.insert(start);
guard.insert(end);
}
const IMPOSSIBLE_HASH: B256 = B256::ZERO;
let _ = self.try_block_range_for_hash(start, end, IMPOSSIBLE_HASH).await;
}
@ -348,15 +341,12 @@ impl<BS: BlockSource> PseudoPeer<BS> {
}
debug!("Backfilling from {start_number} to {end_number}");
// Collect blocks and cache them
let blocks = self.collect_blocks(uncached_block_numbers).await;
let block_map: HashMap<B256, u64> =
blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
let maybe_block_number = block_map.get(&target_hash).copied();
self.cache_blocks(block_map);
Ok(maybe_block_number)
}

View File

@ -0,0 +1,44 @@
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts;
use futures::{future::BoxFuture, FutureExt};
use reth_network::cache::LruMap;
use std::sync::{Arc, RwLock};
/// Block source wrapper that caches blocks in memory
#[derive(Debug, Clone)]
pub struct CachedBlockSource {
block_source: BlockSourceBoxed,
cache: Arc<RwLock<LruMap<u64, BlockAndReceipts>>>,
}
impl CachedBlockSource {
const CACHE_LIMIT: u32 = 100000;
pub fn new(block_source: BlockSourceBoxed) -> Self {
Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) }
}
}
impl BlockSource for CachedBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let block_source = self.block_source.clone();
let cache = self.cache.clone();
async move {
if let Some(block) = cache.write().unwrap().get(&height) {
return Ok(block.clone());
}
let block = block_source.collect_block(height).await?;
cache.write().unwrap().insert(height, block.clone());
Ok(block)
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
self.block_source.find_latest_block_number()
}
fn recommended_chunk_size(&self) -> u64 {
self.block_source.recommended_chunk_size()
}
}

View File

@ -1,635 +0,0 @@
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::{BlockAndReceipts, EvmBlock};
use futures::future::BoxFuture;
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::{BufRead, BufReader, Read, Seek, SeekFrom},
ops::RangeInclusive,
path::{Path, PathBuf},
sync::Arc,
};
use time::{macros::format_description, Date, Duration, OffsetDateTime, Time};
use tokio::sync::Mutex;
use tracing::{info, warn};
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly";
#[derive(Debug)]
pub struct LocalBlocksCache {
cache: LruMap<u64, BlockAndReceipts>,
// Lightweight range map to track the ranges of blocks in the local ingest directory
ranges: RangeInclusiveMap<u64, PathBuf>,
}
impl LocalBlocksCache {
// 3660 blocks per hour
const CACHE_SIZE: u32 = 8000;
fn new() -> Self {
Self { cache: LruMap::new(Self::CACHE_SIZE), ranges: RangeInclusiveMap::new() }
}
fn load_scan_result(&mut self, scan_result: ScanResult) {
for blk in scan_result.new_blocks {
let EvmBlock::Reth115(b) = &blk.block;
self.cache.insert(b.header.header.number, blk);
}
for range in scan_result.new_block_ranges {
self.ranges.insert(range, scan_result.path.clone());
}
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
struct LocalBlockAndReceipts(String, BlockAndReceipts);
struct ScanResult {
path: PathBuf,
next_expected_height: u64,
new_blocks: Vec<BlockAndReceipts>,
new_block_ranges: Vec<RangeInclusive<u64>>,
}
struct ScanOptions {
start_height: u64,
only_load_ranges: bool,
}
fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
let LocalBlockAndReceipts(_block_timestamp, parsed_block): LocalBlockAndReceipts =
serde_json::from_str(line)?;
let height = match &parsed_block.block {
EvmBlock::Reth115(b) => b.header.header.number,
};
Ok((parsed_block, height))
}
fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
let file = File::open(path).expect("Failed to open hour file path");
let reader = BufReader::new(file);
let ScanOptions { start_height, only_load_ranges } = options;
let mut new_blocks = Vec::new();
let mut last_height = start_height;
let lines: Vec<String> = reader.lines().collect::<Result<_, _>>().unwrap();
let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
let mut block_ranges = Vec::new();
let mut current_range: Option<(u64, u64)> = None;
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
if line_idx < *last_line || line.trim().is_empty() {
continue;
}
match line_to_evm_block(line) {
Ok((parsed_block, height)) => {
if height >= start_height {
last_height = last_height.max(height);
if !only_load_ranges {
new_blocks.push(parsed_block);
}
*last_line = line_idx;
}
match current_range {
Some((start, end)) if end + 1 == height => {
current_range = Some((start, height));
}
_ => {
if let Some((start, end)) = current_range.take() {
block_ranges.push(start..=end);
}
current_range = Some((height, height));
}
}
}
Err(_) => {
warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line));
continue;
}
}
}
if let Some((start, end)) = current_range {
block_ranges.push(start..=end);
}
ScanResult {
path: path.to_path_buf(),
next_expected_height: last_height + 1,
new_blocks,
new_block_ranges: block_ranges,
}
}
fn date_from_datetime(dt: OffsetDateTime) -> String {
dt.format(&format_description!("[year][month][day]")).unwrap()
}
/// Block source that monitors the local ingest directory for the HL node.
#[derive(Debug, Clone)]
pub struct HlNodeBlockSource {
pub fallback: BlockSourceBoxed,
pub local_ingest_dir: PathBuf,
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>, // height → block
// for rate limiting requests to fallback
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
}
impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
Box::pin(async move {
let now = OffsetDateTime::now_utc();
if let Some(block) = self.try_collect_local_block(height).await {
self.update_last_fetch(height, now).await;
return Ok(block);
}
if let Some((last_height, last_poll_time)) = *self.last_local_fetch.lock().await {
let more_recent = last_height < height;
let too_soon = now - last_poll_time < Self::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
if more_recent && too_soon {
return Err(eyre::eyre!(
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
));
}
}
let block = self.fallback.collect_block(height).await?;
self.update_last_fetch(height, now).await;
Ok(block)
})
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
Box::pin(async move {
let Some(dir) = Self::find_latest_hourly_file(&self.local_ingest_dir) else {
warn!(
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
self.local_ingest_dir
);
return self.fallback.find_latest_block_number().await;
};
let mut file = File::open(&dir).expect("Failed to open hour file path");
if let Some((_, height)) = read_last_complete_line(&mut file) {
info!("Latest block number: {} with path {}", height, dir.display());
Some(height)
} else {
warn!(
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
file
);
self.fallback.find_latest_block_number().await
}
})
}
fn recommended_chunk_size(&self) -> u64 {
self.fallback.recommended_chunk_size()
}
}
fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
const CHUNK_SIZE: u64 = 50000;
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
let mut pos = read.seek(SeekFrom::End(0)).unwrap();
let mut last_line = Vec::new();
while pos > 0 {
let read_size = std::cmp::min(pos, CHUNK_SIZE);
buf.resize(read_size as usize, 0);
read.seek(SeekFrom::Start(pos - read_size)).unwrap();
read.read_exact(&mut buf).unwrap();
last_line = [buf.clone(), last_line].concat();
if last_line.ends_with(b"\n") {
last_line.pop();
}
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
let candidate = &last_line[idx + 1..];
if let Ok((evm_block, height)) = line_to_evm_block(str::from_utf8(candidate).unwrap()) {
return Some((evm_block, height));
}
// Incomplete line; truncate and continue
last_line.truncate(idx);
}
if pos < read_size {
break;
}
pos -= read_size;
}
line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
}
impl HlNodeBlockSource {
/// [HlNodeBlockSource] picks the faster one between local ingest directory and s3/ingest-dir.
/// But if we immediately fallback to s3/ingest-dir, in case of S3, it may cause unnecessary
/// requests to S3 while it'll return 404.
///
/// To avoid unnecessary fallback, we set a short threshold period.
/// This threshold is several times longer than the expected block time, reducing redundant
/// fallback attempts.
pub(crate) const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
async fn update_last_fetch(&self, height: u64, now: OffsetDateTime) {
let mut last_fetch = self.last_local_fetch.lock().await;
if let Some((last_height, _)) = *last_fetch {
if last_height >= height {
return;
}
}
*last_fetch = Some((height, now));
}
async fn try_collect_local_block(&self, height: u64) -> Option<BlockAndReceipts> {
let mut u_cache = self.local_blocks_cache.lock().await;
if let Some(block) = u_cache.cache.remove(&height) {
return Some(block);
}
let path = u_cache.ranges.get(&height).cloned()?;
info!("Loading block data from {:?}", path);
u_cache.load_scan_result(scan_hour_file(
&path,
&mut 0,
ScanOptions { start_height: 0, only_load_ranges: false },
));
u_cache.cache.get(&height).cloned()
}
fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
let dt_part = path.parent()?.file_name()?.to_str()?;
let hour_part = path.file_name()?.to_str()?;
let hour: u8 = hour_part.parse().ok()?;
Some(OffsetDateTime::new_utc(
Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
Time::from_hms(hour, 0, 0).ok()?,
))
}
fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
let dir = root.join(HOURLY_SUBDIR);
let mut files = Vec::new();
for entry in std::fs::read_dir(dir).ok()? {
let file = entry.ok()?.path();
let subfiles: Vec<_> = std::fs::read_dir(&file)
.ok()?
.filter_map(|f| f.ok().map(|f| f.path()))
.filter(|p| Self::datetime_from_path(p).is_some())
.collect();
files.extend(subfiles);
}
files.sort();
Some(files)
}
fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
Self::all_hourly_files(root)?.last().cloned()
}
async fn try_backfill_local_blocks(
root: &Path,
cache: &Arc<Mutex<LocalBlocksCache>>,
cutoff_height: u64,
) -> eyre::Result<()> {
let mut u_cache = cache.lock().await;
for subfile in Self::all_hourly_files(root).unwrap_or_default() {
let mut file = File::open(&subfile).expect("Failed to open hour file path");
if let Some((_, height)) = read_last_complete_line(&mut file) {
if height < cutoff_height {
continue;
}
} else {
warn!("Failed to parse last line of file, fallback to slow path: {:?}", subfile);
}
let mut scan_result = scan_hour_file(
&subfile,
&mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
);
// Only store the block ranges for now; actual block data will be loaded lazily later to
// optimize memory usage
scan_result.new_blocks.clear();
u_cache.load_scan_result(scan_result);
}
if u_cache.ranges.is_empty() {
warn!("No ranges found in {:?}", root);
} else {
let (min, _) = u_cache.ranges.first_range_value().unwrap();
let (max, _) = u_cache.ranges.last_range_value().unwrap();
info!(
"Populated {} ranges (min: {}, max: {})",
u_cache.ranges.len(),
min.start(),
max.end()
);
}
Ok(())
}
async fn start_local_ingest_loop(&self, current_head: u64) {
let root = self.local_ingest_dir.to_owned();
let cache = self.local_blocks_cache.clone();
tokio::spawn(async move {
let mut next_height = current_head;
// Wait for the first hourly file to be created
let mut dt = loop {
if let Some(latest_file) = Self::find_latest_hourly_file(&root) {
break Self::datetime_from_path(&latest_file).unwrap();
}
tokio::time::sleep(TAIL_INTERVAL).await;
};
let mut hour = dt.hour();
let mut day_str = date_from_datetime(dt);
let mut last_line = 0;
info!("Starting local ingest loop from height: {:?}", current_head);
loop {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let scan_result = scan_hour_file(
&hour_file,
&mut last_line,
ScanOptions { start_height: next_height, only_load_ranges: false },
);
next_height = scan_result.next_expected_height;
let mut u_cache = cache.lock().await;
u_cache.load_scan_result(scan_result);
}
let now = OffsetDateTime::now_utc();
if dt + Duration::HOUR < now {
dt += Duration::HOUR;
hour = dt.hour();
day_str = date_from_datetime(dt);
last_line = 0;
info!(
"Moving to a new file. {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
);
continue;
}
tokio::time::sleep(TAIL_INTERVAL).await;
}
});
}
pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
let _ = Self::try_backfill_local_blocks(
&self.local_ingest_dir,
&self.local_blocks_cache,
next_block_number,
)
.await;
self.start_local_ingest_loop(next_block_number).await;
Ok(())
}
pub async fn new(
fallback: BlockSourceBoxed,
local_ingest_dir: PathBuf,
next_block_number: u64,
) -> Self {
let block_source = HlNodeBlockSource {
fallback,
local_ingest_dir,
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new())),
last_local_fetch: Arc::new(Mutex::new(None)),
};
block_source.run(next_block_number).await.unwrap();
block_source
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
node::types::{reth_compat, ReadPrecompileCalls},
pseudo_peer::sources::LocalBlockSource,
};
use alloy_consensus::{BlockBody, Header};
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use std::{io::Write, time::Duration};
#[test]
fn test_datetime_from_path() {
let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
let dt = HlNodeBlockSource::datetime_from_path(path).unwrap();
println!("{dt:?}");
}
#[tokio::test]
async fn test_backfill() {
let test_path = Path::new("/root/evm_block_and_receipts");
if !test_path.exists() {
return;
}
let cache = Arc::new(Mutex::new(LocalBlocksCache::new()));
HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap();
let u_cache = cache.lock().await;
println!("{:?}", u_cache.ranges);
assert_eq!(
u_cache.ranges.get(&9735058),
Some(&test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
);
}
fn scan_result_from_single_block(block: BlockAndReceipts) -> ScanResult {
let height = match &block.block {
EvmBlock::Reth115(b) => b.header.header.number,
};
ScanResult {
path: PathBuf::from("/nonexistent-block"),
next_expected_height: height + 1,
new_blocks: vec![block],
new_block_ranges: vec![height..=height],
}
}
fn empty_block(
number: u64,
timestamp: u64,
extra_data: &'static [u8],
) -> LocalBlockAndReceipts {
let extra_data = Bytes::from_static(extra_data);
let res = BlockAndReceipts {
block: EvmBlock::Reth115(reth_compat::SealedBlock {
header: reth_compat::SealedHeader {
header: Header {
parent_hash: B256::ZERO,
ommers_hash: B256::ZERO,
beneficiary: Address::ZERO,
state_root: B256::ZERO,
transactions_root: B256::ZERO,
receipts_root: B256::ZERO,
logs_bloom: Bloom::ZERO,
difficulty: U256::ZERO,
number,
gas_limit: 0,
gas_used: 0,
timestamp,
extra_data,
mix_hash: B256::ZERO,
nonce: B64::ZERO,
base_fee_per_gas: None,
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
},
hash: B256::ZERO,
},
body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
}),
receipts: vec![],
system_txs: vec![],
read_precompile_calls: ReadPrecompileCalls(vec![]),
highest_precompile_address: None,
};
LocalBlockAndReceipts(timestamp.to_string(), res)
}
fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, File)> {
let now = OffsetDateTime::now_utc();
let day_str = date_from_datetime(now);
let hour = now.hour();
let temp_dir = tempfile::tempdir()?;
let path = temp_dir.path().join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
std::fs::create_dir_all(path.parent().unwrap())?;
Ok((temp_dir, File::create(path)?))
}
struct BlockSourceHierarchy {
block_source: HlNodeBlockSource,
_temp_dir: tempfile::TempDir,
file1: File,
current_block: LocalBlockAndReceipts,
future_block_hl_node: LocalBlockAndReceipts,
future_block_fallback: LocalBlockAndReceipts,
}
async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
// Setup fallback block source
let block_source_fallback = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
PathBuf::from("/nonexistent"),
1000000,
)
.await;
let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");
let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;
let block_source = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
temp_dir1.path().to_path_buf(),
1000000,
)
.await;
block_source_fallback
.local_blocks_cache
.lock()
.await
.load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));
Ok(BlockSourceHierarchy {
block_source,
_temp_dir: temp_dir1,
file1,
current_block: block_hl_node_0,
future_block_hl_node: block_hl_node_1,
future_block_fallback: block_fallback_1,
})
}
#[tokio::test]
async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
let hierarchy = setup_block_source_hierarchy().await?;
let BlockSourceHierarchy {
block_source,
current_block,
future_block_hl_node,
mut file1,
..
} = hierarchy;
let block = block_source.collect_block(1000000).await.unwrap();
assert_eq!(block, current_block.1);
let block = block_source.collect_block(1000001).await;
assert!(block.is_err());
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
tokio::time::sleep(Duration::from_millis(100)).await;
let block = block_source.collect_block(1000001).await.unwrap();
assert_eq!(block, future_block_hl_node.1);
Ok(())
}
#[tokio::test]
async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
let hierarchy = setup_block_source_hierarchy().await?;
let BlockSourceHierarchy {
block_source,
current_block,
future_block_fallback,
mut file1,
..
} = hierarchy;
let block = block_source.collect_block(1000000).await.unwrap();
assert_eq!(block, current_block.1);
tokio::time::sleep(HlNodeBlockSource::MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs())
.await;
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
let block = block_source.collect_block(1000001).await.unwrap();
assert_eq!(block, future_block_fallback.1);
Ok(())
}
}

View File

@ -0,0 +1,51 @@
use super::scan::ScanResult;
use crate::node::types::{BlockAndReceipts, EvmBlock};
use rangemap::RangeInclusiveMap;
use reth_network::cache::LruMap;
use std::path::{Path, PathBuf};
use tracing::{info, warn};
#[derive(Debug)]
pub struct LocalBlocksCache {
cache: LruMap<u64, BlockAndReceipts>,
ranges: RangeInclusiveMap<u64, PathBuf>,
}
impl LocalBlocksCache {
pub fn new(cache_size: u32) -> Self {
Self { cache: LruMap::new(cache_size), ranges: RangeInclusiveMap::new() }
}
pub fn load_scan_result(&mut self, scan_result: ScanResult) {
for blk in scan_result.new_blocks {
let EvmBlock::Reth115(b) = &blk.block;
self.cache.insert(b.header.header.number, blk);
}
for range in scan_result.new_block_ranges {
self.ranges.insert(range, scan_result.path.clone());
}
}
pub fn get_block(&mut self, height: u64) -> Option<BlockAndReceipts> {
self.cache.remove(&height)
}
pub fn get_path_for_height(&self, height: u64) -> Option<PathBuf> {
self.ranges.get(&height).cloned()
}
pub fn log_range_summary(&self, root: &Path) {
if self.ranges.is_empty() {
warn!("No ranges found in {:?}", root);
} else {
let (min, max) =
(self.ranges.first_range_value().unwrap(), self.ranges.last_range_value().unwrap());
info!(
"Populated {} ranges (min: {}, max: {})",
self.ranges.len(),
min.0.start(),
max.0.end()
);
}
}
}

View File

@ -0,0 +1,67 @@
use super::{scan::Scanner, time_utils::TimeUtils, HOURLY_SUBDIR};
use crate::node::types::BlockAndReceipts;
use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::{Path, PathBuf},
};
pub struct FileOperations;
impl FileOperations {
pub fn all_hourly_files(root: &Path) -> Option<Vec<PathBuf>> {
let mut files = Vec::new();
for entry in std::fs::read_dir(root.join(HOURLY_SUBDIR)).ok()? {
let dir = entry.ok()?.path();
if let Ok(subentries) = std::fs::read_dir(&dir) {
files.extend(
subentries
.filter_map(|f| f.ok().map(|f| f.path()))
.filter(|p| TimeUtils::datetime_from_path(p).is_some()),
);
}
}
files.sort();
Some(files)
}
pub fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
Self::all_hourly_files(root)?.into_iter().last()
}
pub fn read_last_block_from_file(path: &Path) -> Option<(BlockAndReceipts, u64)> {
let mut file = File::open(path).ok()?;
Self::read_last_complete_line(&mut file)
}
fn read_last_complete_line<R: Read + Seek>(read: &mut R) -> Option<(BlockAndReceipts, u64)> {
const CHUNK_SIZE: u64 = 50000;
let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
let mut pos = read.seek(SeekFrom::End(0)).unwrap();
let mut last_line = Vec::new();
while pos > 0 {
let read_size = pos.min(CHUNK_SIZE);
buf.resize(read_size as usize, 0);
read.seek(SeekFrom::Start(pos - read_size)).unwrap();
read.read_exact(&mut buf).unwrap();
last_line = [buf.clone(), last_line].concat();
if last_line.ends_with(b"\n") {
last_line.pop();
}
if let Some(idx) = last_line.iter().rposition(|&b| b == b'\n') {
let candidate = &last_line[idx + 1..];
if let Ok(result) = Scanner::line_to_evm_block(str::from_utf8(candidate).unwrap()) {
return Some(result);
}
last_line.truncate(idx);
}
if pos < read_size {
break;
}
pos -= read_size;
}
Scanner::line_to_evm_block(&String::from_utf8(last_line).unwrap()).ok()
}
}

View File

@ -0,0 +1,227 @@
mod cache;
mod file_ops;
mod scan;
#[cfg(test)]
mod tests;
mod time_utils;
use self::{
cache::LocalBlocksCache,
file_ops::FileOperations,
scan::{ScanOptions, Scanner},
time_utils::TimeUtils,
};
use super::{BlockSource, BlockSourceBoxed};
use crate::node::types::BlockAndReceipts;
use futures::future::BoxFuture;
use serde::{Deserialize, Serialize};
use std::{
path::{Path, PathBuf},
sync::Arc,
};
use time::{Duration, OffsetDateTime};
use tokio::sync::Mutex;
use tracing::{info, warn};
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
const HOURLY_SUBDIR: &str = "hourly";
const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
#[derive(Serialize, Deserialize, Debug, Clone)]
pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts);
/// Block source that monitors the local ingest directory for the HL node.
#[derive(Debug, Clone)]
pub struct HlNodeBlockSource {
pub fallback: BlockSourceBoxed,
pub local_ingest_dir: PathBuf,
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
}
impl BlockSource for HlNodeBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let fallback = self.fallback.clone();
let local_blocks_cache = self.local_blocks_cache.clone();
let last_local_fetch = self.last_local_fetch.clone();
Box::pin(async move {
let now = OffsetDateTime::now_utc();
if let Some(block) = Self::try_collect_local_block(local_blocks_cache, height).await {
Self::update_last_fetch(last_local_fetch, height, now).await;
return Ok(block);
}
if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
let more_recent = last_height < height;
let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
if more_recent && too_soon {
return Err(eyre::eyre!(
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
));
}
}
let block = fallback.collect_block(height).await?;
Self::update_last_fetch(last_local_fetch, height, now).await;
Ok(block)
})
}
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
let fallback = self.fallback.clone();
let local_ingest_dir = self.local_ingest_dir.clone();
Box::pin(async move {
let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else {
warn!(
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
local_ingest_dir
);
return fallback.find_latest_block_number().await;
};
match FileOperations::read_last_block_from_file(&dir) {
Some((_, height)) => {
info!("Latest block number: {} with path {}", height, dir.display());
Some(height)
}
None => {
warn!(
"Failed to parse the hl-node hourly file at {:?}; fallback to s3/ingest-dir",
dir
);
fallback.find_latest_block_number().await
}
}
})
}
fn recommended_chunk_size(&self) -> u64 {
self.fallback.recommended_chunk_size()
}
}
impl HlNodeBlockSource {
async fn update_last_fetch(
last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
height: u64,
now: OffsetDateTime,
) {
let mut last_fetch = last_local_fetch.lock().await;
if last_fetch.is_none_or(|(h, _)| h < height) {
*last_fetch = Some((height, now));
}
}
async fn try_collect_local_block(
local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
height: u64,
) -> Option<BlockAndReceipts> {
let mut u_cache = local_blocks_cache.lock().await;
if let Some(block) = u_cache.get_block(height) {
return Some(block);
}
let path = u_cache.get_path_for_height(height)?;
info!("Loading block data from {:?}", path);
let scan_result = Scanner::scan_hour_file(
&path,
&mut 0,
ScanOptions { start_height: 0, only_load_ranges: false },
);
u_cache.load_scan_result(scan_result);
u_cache.get_block(height)
}
async fn try_backfill_local_blocks(
root: &Path,
cache: &Arc<Mutex<LocalBlocksCache>>,
cutoff_height: u64,
) -> eyre::Result<()> {
let mut u_cache = cache.lock().await;
for subfile in FileOperations::all_hourly_files(root).unwrap_or_default() {
if let Some((_, height)) = FileOperations::read_last_block_from_file(&subfile) {
if height < cutoff_height {
continue;
}
} else {
warn!("Failed to parse last line of file: {:?}", subfile);
}
let mut scan_result = Scanner::scan_hour_file(
&subfile,
&mut 0,
ScanOptions { start_height: cutoff_height, only_load_ranges: true },
);
scan_result.new_blocks.clear(); // Only store ranges, load data lazily
u_cache.load_scan_result(scan_result);
}
u_cache.log_range_summary(root);
Ok(())
}
async fn start_local_ingest_loop(&self, current_head: u64) {
let root = self.local_ingest_dir.to_owned();
let cache = self.local_blocks_cache.clone();
tokio::spawn(async move {
let mut next_height = current_head;
let mut dt = loop {
if let Some(f) = FileOperations::find_latest_hourly_file(&root) {
break TimeUtils::datetime_from_path(&f).unwrap();
}
tokio::time::sleep(TAIL_INTERVAL).await;
};
let (mut hour, mut day_str, mut last_line) =
(dt.hour(), TimeUtils::date_from_datetime(dt), 0);
info!("Starting local ingest loop from height: {}", current_head);
loop {
let hour_file = root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"));
if hour_file.exists() {
let scan_result = Scanner::scan_hour_file(
&hour_file,
&mut last_line,
ScanOptions { start_height: next_height, only_load_ranges: false },
);
next_height = scan_result.next_expected_height;
cache.lock().await.load_scan_result(scan_result);
}
let now = OffsetDateTime::now_utc();
if dt + Duration::HOUR < now {
dt += Duration::HOUR;
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
info!(
"Moving to new file: {:?}",
root.join(HOURLY_SUBDIR).join(&day_str).join(format!("{hour}"))
);
continue;
}
tokio::time::sleep(TAIL_INTERVAL).await;
}
});
}
pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
let _ = Self::try_backfill_local_blocks(
&self.local_ingest_dir,
&self.local_blocks_cache,
next_block_number,
)
.await;
self.start_local_ingest_loop(next_block_number).await;
Ok(())
}
pub async fn new(
fallback: BlockSourceBoxed,
local_ingest_dir: PathBuf,
next_block_number: u64,
) -> Self {
let block_source = Self {
fallback,
local_ingest_dir,
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
last_local_fetch: Arc::new(Mutex::new(None)),
};
block_source.run(next_block_number).await.unwrap();
block_source
}
}

View File

@ -0,0 +1,91 @@
use crate::node::types::{BlockAndReceipts, EvmBlock};
use serde::{Deserialize, Serialize};
use std::{
fs::File,
io::{BufRead, BufReader},
ops::RangeInclusive,
path::{Path, PathBuf},
};
use tracing::warn;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LocalBlockAndReceipts(pub String, pub BlockAndReceipts);
pub struct ScanResult {
pub path: PathBuf,
pub next_expected_height: u64,
pub new_blocks: Vec<BlockAndReceipts>,
pub new_block_ranges: Vec<RangeInclusive<u64>>,
}
pub struct ScanOptions {
pub start_height: u64,
pub only_load_ranges: bool,
}
pub struct Scanner;
impl Scanner {
pub fn line_to_evm_block(line: &str) -> serde_json::Result<(BlockAndReceipts, u64)> {
let LocalBlockAndReceipts(_, parsed_block): LocalBlockAndReceipts =
serde_json::from_str(line)?;
let height = match &parsed_block.block {
EvmBlock::Reth115(b) => b.header.header.number,
};
Ok((parsed_block, height))
}
pub fn scan_hour_file(path: &Path, last_line: &mut usize, options: ScanOptions) -> ScanResult {
let lines: Vec<String> =
BufReader::new(File::open(path).expect("Failed to open hour file"))
.lines()
.collect::<Result<_, _>>()
.unwrap();
let skip = if *last_line == 0 { 0 } else { *last_line - 1 };
let mut new_blocks = Vec::new();
let mut last_height = options.start_height;
let mut block_ranges = Vec::new();
let mut current_range: Option<(u64, u64)> = None;
for (line_idx, line) in lines.iter().enumerate().skip(skip) {
if line_idx < *last_line || line.trim().is_empty() {
continue;
}
match Self::line_to_evm_block(line) {
Ok((parsed_block, height)) => {
if height >= options.start_height {
last_height = last_height.max(height);
if !options.only_load_ranges {
new_blocks.push(parsed_block);
}
*last_line = line_idx;
}
match current_range {
Some((start, end)) if end + 1 == height => {
current_range = Some((start, height))
}
_ => {
if let Some((start, end)) = current_range.take() {
block_ranges.push(start..=end);
}
current_range = Some((height, height));
}
}
}
Err(_) => warn!("Failed to parse line: {}...", line.get(0..50).unwrap_or(line)),
}
}
if let Some((start, end)) = current_range {
block_ranges.push(start..=end);
}
ScanResult {
path: path.to_path_buf(),
next_expected_height: last_height + 1,
new_blocks,
new_block_ranges: block_ranges,
}
}
}

View File

@ -0,0 +1,187 @@
use super::*;
use crate::{
node::types::{reth_compat, ReadPrecompileCalls},
pseudo_peer::sources::LocalBlockSource,
};
use alloy_consensus::{BlockBody, Header};
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
use std::{io::Write, time::Duration as StdDuration};
#[test]
fn test_datetime_from_path() {
let path = Path::new("/home/username/hl/data/evm_block_and_receipts/hourly/20250731/4");
let dt = TimeUtils::datetime_from_path(path).unwrap();
println!("{dt:?}");
}
#[tokio::test]
async fn test_backfill() {
let test_path = Path::new("/root/evm_block_and_receipts");
if !test_path.exists() {
return;
}
let cache = Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE)));
HlNodeBlockSource::try_backfill_local_blocks(test_path, &cache, 1000000).await.unwrap();
let u_cache = cache.lock().await;
assert_eq!(
u_cache.get_path_for_height(9735058),
Some(test_path.join(HOURLY_SUBDIR).join("20250729").join("22"))
);
}
fn scan_result_from_single_block(block: BlockAndReceipts) -> scan::ScanResult {
use crate::node::types::EvmBlock;
let height = match &block.block {
EvmBlock::Reth115(b) => b.header.header.number,
};
scan::ScanResult {
path: PathBuf::from("/nonexistent-block"),
next_expected_height: height + 1,
new_blocks: vec![block],
new_block_ranges: vec![height..=height],
}
}
fn empty_block(number: u64, timestamp: u64, extra_data: &'static [u8]) -> LocalBlockAndReceipts {
use crate::node::types::EvmBlock;
LocalBlockAndReceipts(
timestamp.to_string(),
BlockAndReceipts {
block: EvmBlock::Reth115(reth_compat::SealedBlock {
header: reth_compat::SealedHeader {
header: Header {
parent_hash: B256::ZERO,
ommers_hash: B256::ZERO,
beneficiary: Address::ZERO,
state_root: B256::ZERO,
transactions_root: B256::ZERO,
receipts_root: B256::ZERO,
logs_bloom: Bloom::ZERO,
difficulty: U256::ZERO,
number,
gas_limit: 0,
gas_used: 0,
timestamp,
extra_data: Bytes::from_static(extra_data),
mix_hash: B256::ZERO,
nonce: B64::ZERO,
base_fee_per_gas: None,
withdrawals_root: None,
blob_gas_used: None,
excess_blob_gas: None,
parent_beacon_block_root: None,
requests_hash: None,
},
hash: B256::ZERO,
},
body: BlockBody { transactions: vec![], ommers: vec![], withdrawals: None },
}),
receipts: vec![],
system_txs: vec![],
read_precompile_calls: ReadPrecompileCalls(vec![]),
highest_precompile_address: None,
},
)
}
fn setup_temp_dir_and_file() -> eyre::Result<(tempfile::TempDir, std::fs::File)> {
let now = OffsetDateTime::now_utc();
let temp_dir = tempfile::tempdir()?;
let path = temp_dir
.path()
.join(HOURLY_SUBDIR)
.join(TimeUtils::date_from_datetime(now))
.join(format!("{}", now.hour()));
std::fs::create_dir_all(path.parent().unwrap())?;
Ok((temp_dir, std::fs::File::create(path)?))
}
struct BlockSourceHierarchy {
block_source: HlNodeBlockSource,
_temp_dir: tempfile::TempDir,
file1: std::fs::File,
current_block: LocalBlockAndReceipts,
future_block_hl_node: LocalBlockAndReceipts,
future_block_fallback: LocalBlockAndReceipts,
}
async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
// Setup fallback block source
let block_source_fallback = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
PathBuf::from("/nonexistent"),
1000000,
)
.await;
let block_hl_node_0 = empty_block(1000000, 1722633600, b"hl-node");
let block_hl_node_1 = empty_block(1000001, 1722633600, b"hl-node");
let block_fallback_1 = empty_block(1000001, 1722633600, b"fallback");
let (temp_dir1, mut file1) = setup_temp_dir_and_file()?;
writeln!(&mut file1, "{}", serde_json::to_string(&block_hl_node_0)?)?;
let block_source = HlNodeBlockSource::new(
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
temp_dir1.path().to_path_buf(),
1000000,
)
.await;
block_source_fallback
.local_blocks_cache
.lock()
.await
.load_scan_result(scan_result_from_single_block(block_fallback_1.1.clone()));
Ok(BlockSourceHierarchy {
block_source,
_temp_dir: temp_dir1,
file1,
current_block: block_hl_node_0,
future_block_hl_node: block_hl_node_1,
future_block_fallback: block_fallback_1,
})
}
#[tokio::test]
async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
let hierarchy = setup_block_source_hierarchy().await?;
let BlockSourceHierarchy {
block_source, current_block, future_block_hl_node, mut file1, ..
} = hierarchy;
let block = block_source.collect_block(1000000).await.unwrap();
assert_eq!(block, current_block.1);
let block = block_source.collect_block(1000001).await;
assert!(block.is_err());
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
tokio::time::sleep(StdDuration::from_millis(100)).await;
let block = block_source.collect_block(1000001).await.unwrap();
assert_eq!(block, future_block_hl_node.1);
Ok(())
}
#[tokio::test]
async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
let hierarchy = setup_block_source_hierarchy().await?;
let BlockSourceHierarchy {
block_source, current_block, future_block_fallback, mut file1, ..
} = hierarchy;
let block = block_source.collect_block(1000000).await.unwrap();
assert_eq!(block, current_block.1);
tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await;
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
let block = block_source.collect_block(1000001).await.unwrap();
assert_eq!(block, future_block_fallback.1);
Ok(())
}

View File

@ -0,0 +1,19 @@
use std::path::Path;
use time::{macros::format_description, Date, OffsetDateTime, Time};
pub struct TimeUtils;
impl TimeUtils {
pub fn datetime_from_path(path: &Path) -> Option<OffsetDateTime> {
let (dt_part, hour_part) =
(path.parent()?.file_name()?.to_str()?, path.file_name()?.to_str()?);
Some(OffsetDateTime::new_utc(
Date::parse(dt_part, &format_description!("[year][month][day]")).ok()?,
Time::from_hms(hour_part.parse().ok()?, 0, 0).ok()?,
))
}
pub fn date_from_datetime(dt: OffsetDateTime) -> String {
dt.format(&format_description!("[year][month][day]")).unwrap()
}
}

View File

@ -0,0 +1,64 @@
use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts;
use eyre::Context;
use futures::{future::BoxFuture, FutureExt};
use std::path::PathBuf;
use tracing::info;
/// Block source that reads blocks from local filesystem (--ingest-dir)
#[derive(Debug, Clone)]
pub struct LocalBlockSource {
dir: PathBuf,
}
impl LocalBlockSource {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() }
}
async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
let files = std::fs::read_dir(&dir).unwrap().collect::<Vec<_>>();
let files = files
.into_iter()
.filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir)
.map(|entry| entry.unwrap().path().to_string_lossy().to_string())
.collect::<Vec<_>>();
utils::name_with_largest_number(&files, is_dir)
}
}
impl BlockSource for LocalBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let dir = self.dir.clone();
async move {
let path = dir.join(utils::rmp_path(height));
let file = tokio::fs::read(&path)
.await
.wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
let dir = self.dir.clone();
async move {
let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
let (_, second_level) =
Self::pick_path_with_highest_number(dir.join(first_level), true).await?;
let (block_number, third_level) =
Self::pick_path_with_highest_number(dir.join(second_level), false).await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
1000
}
}

View File

@ -1,228 +1,41 @@
use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer;
use eyre::Context;
use futures::{future::BoxFuture, FutureExt};
use reth_network::cache::LruMap;
use std::{
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::info;
use futures::future::BoxFuture;
use std::sync::Arc;
// Module declarations
mod cached;
mod hl_node;
pub use hl_node::HlNodeBlockSource;
mod local;
mod s3;
mod utils;
// Public exports
pub use cached::CachedBlockSource;
pub use hl_node::HlNodeBlockSource;
pub use local::LocalBlockSource;
pub use s3::S3BlockSource;
/// Trait for block sources that can retrieve blocks from various sources
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>>;
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>>;
/// Retrieves a block at the specified height
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
/// Finds the latest block number available from this source
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>>;
/// Returns the recommended chunk size for batch operations
fn recommended_chunk_size(&self) -> u64;
}
/// Type alias for a boxed block source
pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> {
let mut files = files
.iter()
.filter_map(|file_raw| {
let file = file_raw.strip_suffix("/").unwrap_or(file_raw).split("/").last().unwrap();
let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
})
.collect::<Vec<_>>();
if files.is_empty() {
return None;
}
files.sort_by_key(|(number, _)| *number);
files.last().cloned()
}
#[derive(Debug, Clone)]
pub struct S3BlockSource {
client: aws_sdk_s3::Client,
bucket: String,
}
impl S3BlockSource {
pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
Self { client, bucket }
}
async fn pick_path_with_highest_number(
client: aws_sdk_s3::Client,
bucket: String,
dir: String,
is_dir: bool,
) -> Option<(u64, String)> {
let request = client
.list_objects()
.bucket(&bucket)
.prefix(dir)
.delimiter("/")
.request_payer(RequestPayer::Requester);
let response = request.send().await.ok()?;
let files: Vec<String> = if is_dir {
response
.common_prefixes
.unwrap()
.iter()
.map(|object| object.prefix.as_ref().unwrap().to_string())
.collect()
} else {
response
.contents
.unwrap()
.iter()
.map(|object| object.key.as_ref().unwrap().to_string())
.collect()
};
name_with_largest_number(&files, is_dir)
}
}
impl BlockSource for S3BlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let path = rmp_path(height);
let request = client
.get_object()
.request_payer(RequestPayer::Requester)
.bucket(&bucket)
.key(path);
let response = request.send().await?;
let bytes = response.body.collect().await?.into_bytes();
let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let (_, first_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
"".to_string(),
true,
)
.await?;
let (_, second_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
first_level,
true,
)
.await?;
let (block_number, third_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
second_level,
false,
)
.await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
1000
}
}
impl BlockSource for LocalBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let dir = self.dir.clone();
async move {
let path = dir.join(rmp_path(height));
let file = tokio::fs::read(&path)
.await
.wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
let dir = self.dir.clone();
async move {
let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
let (_, second_level) =
Self::pick_path_with_highest_number(dir.join(first_level), true).await?;
let (block_number, third_level) =
Self::pick_path_with_highest_number(dir.join(second_level), false).await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
1000
}
}
#[derive(Debug, Clone)]
pub struct LocalBlockSource {
dir: PathBuf,
}
impl LocalBlockSource {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() }
}
fn name_with_largest_number_static(files: &[String], is_dir: bool) -> Option<(u64, String)> {
let mut files = files
.iter()
.filter_map(|file_raw| {
let file = file_raw.strip_suffix("/").unwrap_or(file_raw);
let file = file.split("/").last().unwrap();
let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
})
.collect::<Vec<_>>();
if files.is_empty() {
return None;
}
files.sort_by_key(|(number, _)| *number);
files.last().map(|(number, file)| (*number, file.to_string()))
}
async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
let files = std::fs::read_dir(&dir).unwrap().collect::<Vec<_>>();
let files = files
.into_iter()
.filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir)
.map(|entry| entry.unwrap().path().to_string_lossy().to_string())
.collect::<Vec<_>>();
Self::name_with_largest_number_static(&files, is_dir)
}
}
fn rmp_path(height: u64) -> String {
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{f}/{s}/{height}.rmp.lz4");
path
}
impl BlockSource for BlockSourceBoxed {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
self.as_ref().collect_block(height)
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
self.as_ref().find_latest_block_number()
}
@ -230,40 +43,3 @@ impl BlockSource for BlockSourceBoxed {
self.as_ref().recommended_chunk_size()
}
}
#[derive(Debug, Clone)]
pub struct CachedBlockSource {
block_source: BlockSourceBoxed,
cache: Arc<RwLock<LruMap<u64, BlockAndReceipts>>>,
}
impl CachedBlockSource {
const CACHE_LIMIT: u32 = 100000;
pub fn new(block_source: BlockSourceBoxed) -> Self {
Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) }
}
}
impl BlockSource for CachedBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let block_source = self.block_source.clone();
let cache = self.cache.clone();
async move {
if let Some(block) = cache.write().unwrap().get(&height) {
return Ok(block.clone());
}
let block = block_source.collect_block(height).await?;
cache.write().unwrap().insert(height, block.clone());
Ok(block)
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
self.block_source.find_latest_block_number()
}
fn recommended_chunk_size(&self) -> u64 {
self.block_source.recommended_chunk_size()
}
}

View File

@ -0,0 +1,90 @@
use super::{utils, BlockSource};
use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer;
use futures::{future::BoxFuture, FutureExt};
use std::sync::Arc;
use tracing::info;
/// Block source that reads blocks from S3 (--s3)
#[derive(Debug, Clone)]
pub struct S3BlockSource {
client: Arc<aws_sdk_s3::Client>,
bucket: String,
}
impl S3BlockSource {
pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
Self { client: client.into(), bucket }
}
async fn pick_path_with_highest_number(
client: &aws_sdk_s3::Client,
bucket: &str,
dir: &str,
is_dir: bool,
) -> Option<(u64, String)> {
let request = client
.list_objects()
.bucket(bucket)
.prefix(dir)
.delimiter("/")
.request_payer(RequestPayer::Requester);
let response = request.send().await.ok()?;
let files: Vec<String> = if is_dir {
response
.common_prefixes?
.iter()
.map(|object| object.prefix.as_ref().unwrap().to_string())
.collect()
} else {
response
.contents?
.iter()
.map(|object| object.key.as_ref().unwrap().to_string())
.collect()
};
utils::name_with_largest_number(&files, is_dir)
}
}
impl BlockSource for S3BlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let path = utils::rmp_path(height);
let request = client
.get_object()
.request_payer(RequestPayer::Requester)
.bucket(&bucket)
.key(path);
let response = request.send().await?;
let bytes = response.body.collect().await?.into_bytes();
let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let (_, first_level) =
Self::pick_path_with_highest_number(&client, &bucket, "", true).await?;
let (_, second_level) =
Self::pick_path_with_highest_number(&client, &bucket, &first_level, true).await?;
let (block_number, third_level) =
Self::pick_path_with_highest_number(&client, &bucket, &second_level, false).await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
1000
}
}

View File

@ -0,0 +1,26 @@
//! Shared utilities for block sources
/// Finds the file/directory with the largest number in its name from a list of files
pub fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> {
let mut files = files
.iter()
.filter_map(|file_raw| {
let file = file_raw.strip_suffix("/").unwrap_or(file_raw);
let file = file.split("/").last().unwrap();
let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
})
.collect::<Vec<_>>();
if files.is_empty() {
return None;
}
files.sort_by_key(|(number, _)| *number);
files.last().cloned()
}
/// Generates the RMP file path for a given block height
pub fn rmp_path(height: u64) -> String {
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
format!("{f}/{s}/{height}.rmp.lz4")
}

View File

@ -17,14 +17,3 @@ async fn test_block_source_config_local() {
matches!(config.source_type, BlockSourceType::Local { path } if path == Path::new("/test/path"))
);
}
#[test]
fn test_error_types() {
let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
let benchmark_error: PseudoPeerError = io_error.into();
match benchmark_error {
PseudoPeerError::Io(_) => (),
_ => panic!("Expected Io error"),
}
}