mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
Compare commits
23 Commits
1908e9f414
...
nb-2025090
| Author | SHA1 | Date | |
|---|---|---|---|
| 3f08b0a4e6 | |||
| d7992ab8ff | |||
| b37a30fb37 | |||
| f6432498d8 | |||
| 772ff250ce | |||
| 5ee9053286 | |||
| 29e6972d58 | |||
| e87b9232cc | |||
| b004263f82 | |||
| 74e27b5ee2 | |||
| 09fcf0751f | |||
| 8f2eca4754 | |||
| 707b4fb709 | |||
| 62dd5a71b5 | |||
| 412c38a8cd | |||
| 796ea518bd | |||
| dd2c925af2 | |||
| 3ffd7bb351 | |||
| 52909eea3f | |||
| 0f9c2c5897 | |||
| ad4a8cd365 | |||
| 80506a7a43 | |||
| 2af312b628 |
@ -7,21 +7,22 @@
|
|||||||
//! For non-system transactions, we can just return the log as is, and the client will
|
//! For non-system transactions, we can just return the log as is, and the client will
|
||||||
//! adjust the transaction index accordingly.
|
//! adjust the transaction index accordingly.
|
||||||
|
|
||||||
use alloy_consensus::{transaction::TransactionMeta, TxReceipt};
|
use alloy_consensus::{transaction::TransactionMeta, BlockHeader, TxReceipt};
|
||||||
use alloy_eips::{BlockId, BlockNumberOrTag};
|
use alloy_eips::{BlockId, BlockNumberOrTag};
|
||||||
use alloy_json_rpc::RpcObject;
|
use alloy_json_rpc::RpcObject;
|
||||||
use alloy_primitives::{B256, U256};
|
use alloy_primitives::{B256, U256};
|
||||||
use alloy_rpc_types::{
|
use alloy_rpc_types::{
|
||||||
pubsub::{Params, SubscriptionKind},
|
pubsub::{Params, SubscriptionKind},
|
||||||
BlockTransactions, Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind,
|
BlockTransactions, Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind,
|
||||||
|
TransactionInfo,
|
||||||
};
|
};
|
||||||
use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
|
use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
|
||||||
use jsonrpsee_core::{async_trait, RpcResult};
|
use jsonrpsee_core::{async_trait, RpcResult};
|
||||||
use jsonrpsee_types::ErrorObject;
|
use jsonrpsee_types::{error::INTERNAL_ERROR_CODE, ErrorObject};
|
||||||
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
|
use reth::{api::FullNodeComponents, builder::rpc::RpcContext, tasks::TaskSpawner};
|
||||||
use reth_primitives_traits::{BlockBody as _, SignedTransaction};
|
use reth_primitives_traits::{BlockBody as _, SignedTransaction};
|
||||||
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
|
use reth_provider::{BlockIdReader, BlockReader, BlockReaderIdExt, ReceiptProvider};
|
||||||
use reth_rpc::{eth::pubsub::SubscriptionSerializeError, EthFilter, EthPubSub};
|
use reth_rpc::{eth::pubsub::SubscriptionSerializeError, EthFilter, EthPubSub, RpcTypes};
|
||||||
use reth_rpc_eth_api::{
|
use reth_rpc_eth_api::{
|
||||||
helpers::{EthBlocks, EthTransactions, LoadReceipt},
|
helpers::{EthBlocks, EthTransactions, LoadReceipt},
|
||||||
transaction::ConvertReceiptInput,
|
transaction::ConvertReceiptInput,
|
||||||
@ -42,8 +43,10 @@ pub trait EthWrapper:
|
|||||||
RpcBlock<Self::NetworkTypes>,
|
RpcBlock<Self::NetworkTypes>,
|
||||||
RpcReceipt<Self::NetworkTypes>,
|
RpcReceipt<Self::NetworkTypes>,
|
||||||
RpcHeader<Self::NetworkTypes>,
|
RpcHeader<Self::NetworkTypes>,
|
||||||
> + FullEthApiTypes<Primitives = HlPrimitives>
|
> + FullEthApiTypes<
|
||||||
+ RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
Primitives = HlPrimitives,
|
||||||
|
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
||||||
|
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
||||||
+ EthBlocks
|
+ EthBlocks
|
||||||
+ EthTransactions
|
+ EthTransactions
|
||||||
+ LoadReceipt
|
+ LoadReceipt
|
||||||
@ -58,8 +61,10 @@ impl<T> EthWrapper for T where
|
|||||||
RpcBlock<Self::NetworkTypes>,
|
RpcBlock<Self::NetworkTypes>,
|
||||||
RpcReceipt<Self::NetworkTypes>,
|
RpcReceipt<Self::NetworkTypes>,
|
||||||
RpcHeader<Self::NetworkTypes>,
|
RpcHeader<Self::NetworkTypes>,
|
||||||
> + FullEthApiTypes<Primitives = HlPrimitives>
|
> + FullEthApiTypes<
|
||||||
+ RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
Primitives = HlPrimitives,
|
||||||
|
NetworkTypes: RpcTypes<TransactionResponse = alloy_rpc_types_eth::Transaction>,
|
||||||
|
> + RpcNodeCoreExt<Provider: BlockReader<Block = HlBlock>>
|
||||||
+ EthBlocks
|
+ EthBlocks
|
||||||
+ EthTransactions
|
+ EthTransactions
|
||||||
+ LoadReceipt
|
+ LoadReceipt
|
||||||
@ -67,6 +72,217 @@ impl<T> EthWrapper for T where
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[rpc(server, namespace = "eth")]
|
||||||
|
#[async_trait]
|
||||||
|
pub trait EthSystemTransactionApi<T: RpcObject, R: RpcObject> {
|
||||||
|
#[method(name = "getEvmSystemTxsByBlockHash")]
|
||||||
|
async fn get_evm_system_txs_by_block_hash(&self, hash: B256) -> RpcResult<Option<Vec<T>>>;
|
||||||
|
|
||||||
|
#[method(name = "getEvmSystemTxsByBlockNumber")]
|
||||||
|
async fn get_evm_system_txs_by_block_number(
|
||||||
|
&self,
|
||||||
|
block_id: Option<BlockId>,
|
||||||
|
) -> RpcResult<Option<Vec<T>>>;
|
||||||
|
|
||||||
|
#[method(name = "getEvmSystemTxsReceiptsByBlockHash")]
|
||||||
|
async fn get_evm_system_txs_receipts_by_block_hash(
|
||||||
|
&self,
|
||||||
|
hash: B256,
|
||||||
|
) -> RpcResult<Option<Vec<R>>>;
|
||||||
|
|
||||||
|
#[method(name = "getEvmSystemTxsReceiptsByBlockNumber")]
|
||||||
|
async fn get_evm_system_txs_receipts_by_block_number(
|
||||||
|
&self,
|
||||||
|
block_id: Option<BlockId>,
|
||||||
|
) -> RpcResult<Option<Vec<R>>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct HlSystemTransactionExt<Eth: EthWrapper> {
|
||||||
|
eth_api: Eth,
|
||||||
|
_marker: PhantomData<Eth>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Eth: EthWrapper> HlSystemTransactionExt<Eth> {
|
||||||
|
pub fn new(eth_api: Eth) -> Self {
|
||||||
|
Self { eth_api, _marker: PhantomData }
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_system_txs_by_block_id(
|
||||||
|
&self,
|
||||||
|
block_id: BlockId,
|
||||||
|
) -> RpcResult<Option<Vec<RpcTransaction<Eth::NetworkTypes>>>>
|
||||||
|
where
|
||||||
|
jsonrpsee_types::ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
|
||||||
|
{
|
||||||
|
if let Some(block) = self.eth_api.recovered_block(block_id).await? {
|
||||||
|
let block_hash = block.hash();
|
||||||
|
let block_number = block.number();
|
||||||
|
let base_fee_per_gas = block.base_fee_per_gas();
|
||||||
|
let system_txs = block
|
||||||
|
.transactions_with_sender()
|
||||||
|
.enumerate()
|
||||||
|
.filter_map(|(index, (signer, tx))| {
|
||||||
|
if tx.is_system_transaction() {
|
||||||
|
let tx_info = TransactionInfo {
|
||||||
|
hash: Some(*tx.tx_hash()),
|
||||||
|
block_hash: Some(block_hash),
|
||||||
|
block_number: Some(block_number),
|
||||||
|
base_fee: base_fee_per_gas,
|
||||||
|
index: Some(index as u64),
|
||||||
|
};
|
||||||
|
self.eth_api
|
||||||
|
.tx_resp_builder()
|
||||||
|
.fill(tx.clone().with_signer(*signer), tx_info)
|
||||||
|
.ok()
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Ok(Some(system_txs))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_system_txs_receipts_by_block_id(
|
||||||
|
&self,
|
||||||
|
block_id: BlockId,
|
||||||
|
) -> RpcResult<Option<Vec<RpcReceipt<Eth::NetworkTypes>>>>
|
||||||
|
where
|
||||||
|
jsonrpsee_types::ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
|
||||||
|
{
|
||||||
|
if let Some((block, receipts)) =
|
||||||
|
EthBlocks::load_block_and_receipts(&self.eth_api, block_id).await?
|
||||||
|
{
|
||||||
|
let block_number = block.number;
|
||||||
|
let base_fee = block.base_fee_per_gas;
|
||||||
|
let block_hash = block.hash();
|
||||||
|
let excess_blob_gas = block.excess_blob_gas;
|
||||||
|
let timestamp = block.timestamp;
|
||||||
|
let mut gas_used = 0;
|
||||||
|
let mut next_log_index = 0;
|
||||||
|
|
||||||
|
let mut inputs = Vec::new();
|
||||||
|
for (idx, (tx, receipt)) in
|
||||||
|
block.transactions_recovered().zip(receipts.iter()).enumerate()
|
||||||
|
{
|
||||||
|
if receipt.cumulative_gas_used() != 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let meta = TransactionMeta {
|
||||||
|
tx_hash: *tx.tx_hash(),
|
||||||
|
index: idx as u64,
|
||||||
|
block_hash,
|
||||||
|
block_number,
|
||||||
|
base_fee,
|
||||||
|
excess_blob_gas,
|
||||||
|
timestamp,
|
||||||
|
};
|
||||||
|
|
||||||
|
let input = ConvertReceiptInput {
|
||||||
|
receipt: Cow::Borrowed(receipt),
|
||||||
|
tx,
|
||||||
|
gas_used: receipt.cumulative_gas_used() - gas_used,
|
||||||
|
next_log_index,
|
||||||
|
meta,
|
||||||
|
};
|
||||||
|
|
||||||
|
gas_used = receipt.cumulative_gas_used();
|
||||||
|
next_log_index += receipt.logs().len();
|
||||||
|
|
||||||
|
inputs.push(input);
|
||||||
|
}
|
||||||
|
|
||||||
|
let receipts = self.eth_api.tx_resp_builder().convert_receipts(inputs)?;
|
||||||
|
Ok(Some(receipts))
|
||||||
|
} else {
|
||||||
|
Ok(None)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<Eth: EthWrapper>
|
||||||
|
EthSystemTransactionApiServer<RpcTransaction<Eth::NetworkTypes>, RpcReceipt<Eth::NetworkTypes>>
|
||||||
|
for HlSystemTransactionExt<Eth>
|
||||||
|
where
|
||||||
|
jsonrpsee_types::ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
|
||||||
|
{
|
||||||
|
/// Returns the system transactions for a given block hash.
|
||||||
|
/// Semi-compliance with the `eth_getSystemTxsByBlockHash` RPC method introduced by hl-node.
|
||||||
|
/// https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/hyperevm/json-rpc
|
||||||
|
///
|
||||||
|
/// NOTE: Method name differs from hl-node because we retrieve transaction data from EVM
|
||||||
|
/// (signature recovery for 'from' address, EVM hash calculation) rather than HyperCore.
|
||||||
|
async fn get_evm_system_txs_by_block_hash(
|
||||||
|
&self,
|
||||||
|
hash: B256,
|
||||||
|
) -> RpcResult<Option<Vec<RpcTransaction<Eth::NetworkTypes>>>> {
|
||||||
|
trace!(target: "rpc::eth", ?hash, "Serving eth_getEvmSystemTxsByBlockHash");
|
||||||
|
match self.get_system_txs_by_block_id(BlockId::Hash(hash.into())).await {
|
||||||
|
Ok(txs) => Ok(txs),
|
||||||
|
// hl-node returns none if the block is not found
|
||||||
|
Err(_) => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the system transactions for a given block number, or the latest block if no block
|
||||||
|
/// number is provided. Semi-compliance with the `eth_getSystemTxsByBlockNumber` RPC method
|
||||||
|
/// introduced by hl-node. https://hyperliquid.gitbook.io/hyperliquid-docs/for-developers/hyperevm/json-rpc
|
||||||
|
///
|
||||||
|
/// NOTE: Method name differs from hl-node because we retrieve transaction data from EVM
|
||||||
|
/// (signature recovery for 'from' address, EVM hash calculation) rather than HyperCore.
|
||||||
|
async fn get_evm_system_txs_by_block_number(
|
||||||
|
&self,
|
||||||
|
id: Option<BlockId>,
|
||||||
|
) -> RpcResult<Option<Vec<RpcTransaction<Eth::NetworkTypes>>>> {
|
||||||
|
trace!(target: "rpc::eth", ?id, "Serving eth_getEvmSystemTxsByBlockNumber");
|
||||||
|
match self.get_system_txs_by_block_id(id.unwrap_or_default()).await? {
|
||||||
|
Some(txs) => Ok(Some(txs)),
|
||||||
|
None => {
|
||||||
|
// hl-node returns an error if the block is not found
|
||||||
|
Err(ErrorObject::owned(
|
||||||
|
INTERNAL_ERROR_CODE,
|
||||||
|
format!("invalid block height: {id:?}"),
|
||||||
|
Some(()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the receipts for the system transactions for a given block hash.
|
||||||
|
async fn get_evm_system_txs_receipts_by_block_hash(
|
||||||
|
&self,
|
||||||
|
hash: B256,
|
||||||
|
) -> RpcResult<Option<Vec<RpcReceipt<Eth::NetworkTypes>>>> {
|
||||||
|
trace!(target: "rpc::eth", ?hash, "Serving eth_getEvmSystemTxsReceiptsByBlockHash");
|
||||||
|
match self.get_system_txs_receipts_by_block_id(BlockId::Hash(hash.into())).await {
|
||||||
|
Ok(receipts) => Ok(receipts),
|
||||||
|
// hl-node returns none if the block is not found
|
||||||
|
Err(_) => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns the receipts for the system transactions for a given block number, or the latest
|
||||||
|
/// block if no block
|
||||||
|
async fn get_evm_system_txs_receipts_by_block_number(
|
||||||
|
&self,
|
||||||
|
block_id: Option<BlockId>,
|
||||||
|
) -> RpcResult<Option<Vec<RpcReceipt<Eth::NetworkTypes>>>> {
|
||||||
|
trace!(target: "rpc::eth", ?block_id, "Serving eth_getEvmSystemTxsReceiptsByBlockNumber");
|
||||||
|
match self.get_system_txs_receipts_by_block_id(block_id.unwrap_or_default()).await? {
|
||||||
|
Some(receipts) => Ok(Some(receipts)),
|
||||||
|
None => Err(ErrorObject::owned(
|
||||||
|
INTERNAL_ERROR_CODE,
|
||||||
|
format!("invalid block height: {block_id:?}"),
|
||||||
|
Some(()),
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct HlNodeFilterHttp<Eth: EthWrapper> {
|
pub struct HlNodeFilterHttp<Eth: EthWrapper> {
|
||||||
filter: Arc<EthFilter<Eth>>,
|
filter: Arc<EthFilter<Eth>>,
|
||||||
provider: Arc<Eth::Provider>,
|
provider: Arc<Eth::Provider>,
|
||||||
@ -142,8 +358,9 @@ impl<Eth: EthWrapper> HlNodeFilterWs<Eth> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
|
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>> for HlNodeFilterWs<Eth>
|
||||||
for HlNodeFilterWs<Eth>
|
where
|
||||||
|
jsonrpsee_types::error::ErrorObject<'static>: From<<Eth as EthApiTypes>::Error>,
|
||||||
{
|
{
|
||||||
async fn subscribe(
|
async fn subscribe(
|
||||||
&self,
|
&self,
|
||||||
@ -262,6 +479,11 @@ fn adjust_block<Eth: EthWrapper>(
|
|||||||
new_block.transactions = match new_block.transactions {
|
new_block.transactions = match new_block.transactions {
|
||||||
BlockTransactions::Full(mut transactions) => {
|
BlockTransactions::Full(mut transactions) => {
|
||||||
transactions.drain(..system_tx_count);
|
transactions.drain(..system_tx_count);
|
||||||
|
transactions.iter_mut().for_each(|tx| {
|
||||||
|
if let Some(idx) = &mut tx.transaction_index {
|
||||||
|
*idx -= system_tx_count as u64;
|
||||||
|
}
|
||||||
|
});
|
||||||
BlockTransactions::Full(transactions)
|
BlockTransactions::Full(transactions)
|
||||||
}
|
}
|
||||||
BlockTransactions::Hashes(mut hashes) => {
|
BlockTransactions::Hashes(mut hashes) => {
|
||||||
@ -437,7 +659,7 @@ where
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn install_hl_node_compliance<Node, EthApi>(
|
pub fn install_hl_node_compliance<Node, EthApi>(
|
||||||
ctx: RpcContext<Node, EthApi>,
|
ctx: &mut RpcContext<Node, EthApi>,
|
||||||
) -> Result<(), eyre::Error>
|
) -> Result<(), eyre::Error>
|
||||||
where
|
where
|
||||||
Node: FullNodeComponents,
|
Node: FullNodeComponents,
|
||||||
@ -464,5 +686,9 @@ where
|
|||||||
ctx.modules.replace_configured(
|
ctx.modules.replace_configured(
|
||||||
HlNodeBlockFilterHttp::new(Arc::new(ctx.registry.eth_api().clone())).into_rpc(),
|
HlNodeBlockFilterHttp::new(Arc::new(ctx.registry.eth_api().clone())).into_rpc(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
ctx.modules
|
||||||
|
.merge_configured(HlSystemTransactionExt::new(ctx.registry.eth_api().clone()).into_rpc())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -72,10 +72,6 @@ impl EthChainSpec for HlChainSpec {
|
|||||||
fn bootnodes(&self) -> Option<Vec<NodeRecord>> {
|
fn bootnodes(&self) -> Option<Vec<NodeRecord>> {
|
||||||
self.inner.bootnodes()
|
self.inner.bootnodes()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_optimism(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Hardforks for HlChainSpec {
|
impl Hardforks for HlChainSpec {
|
||||||
@ -102,12 +98,6 @@ impl Hardforks for HlChainSpec {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<ChainSpec> for HlChainSpec {
|
|
||||||
fn from(value: ChainSpec) -> Self {
|
|
||||||
Self { inner: value }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl EthereumHardforks for HlChainSpec {
|
impl EthereumHardforks for HlChainSpec {
|
||||||
fn ethereum_fork_activation(&self, fork: EthereumHardfork) -> ForkCondition {
|
fn ethereum_fork_activation(&self, fork: EthereumHardfork) -> ForkCondition {
|
||||||
self.inner.ethereum_fork_activation(fork)
|
self.inner.ethereum_fork_activation(fork)
|
||||||
@ -122,12 +112,6 @@ impl EthExecutorSpec for HlChainSpec {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<HlChainSpec> for ChainSpec {
|
|
||||||
fn from(value: HlChainSpec) -> Self {
|
|
||||||
value.inner
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl HlChainSpec {
|
impl HlChainSpec {
|
||||||
pub const MAINNET_RPC_URL: &str = "https://rpc.hyperliquid.xyz/evm";
|
pub const MAINNET_RPC_URL: &str = "https://rpc.hyperliquid.xyz/evm";
|
||||||
pub const TESTNET_RPC_URL: &str = "https://rpc.hyperliquid-testnet.xyz/evm";
|
pub const TESTNET_RPC_URL: &str = "https://rpc.hyperliquid-testnet.xyz/evm";
|
||||||
|
|||||||
@ -1,20 +1,15 @@
|
|||||||
use revm::primitives::hardfork::SpecId;
|
use revm::primitives::hardfork::SpecId;
|
||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
#[repr(u8)]
|
#[repr(u8)]
|
||||||
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default)]
|
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default)]
|
||||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
pub enum HlSpecId {
|
pub enum HlSpecId {
|
||||||
|
/// Placeholder for evm cancun fork
|
||||||
#[default]
|
#[default]
|
||||||
V1, // V1
|
V1,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HlSpecId {
|
impl HlSpecId {
|
||||||
pub const fn is_enabled_in(self, other: HlSpecId) -> bool {
|
|
||||||
other as u8 <= self as u8
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Converts the [`HlSpecId`] into a [`SpecId`].
|
|
||||||
pub const fn into_eth_spec(self) -> SpecId {
|
pub const fn into_eth_spec(self) -> SpecId {
|
||||||
match self {
|
match self {
|
||||||
Self::V1 => SpecId::CANCUN,
|
Self::V1 => SpecId::CANCUN,
|
||||||
@ -23,31 +18,8 @@ impl HlSpecId {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl From<HlSpecId> for SpecId {
|
impl From<HlSpecId> for SpecId {
|
||||||
|
/// Converts the [`HlSpecId`] into a [`SpecId`].
|
||||||
fn from(spec: HlSpecId) -> Self {
|
fn from(spec: HlSpecId) -> Self {
|
||||||
spec.into_eth_spec()
|
spec.into_eth_spec()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// String identifiers for HL hardforks
|
|
||||||
pub mod name {
|
|
||||||
pub const V1: &str = "V1";
|
|
||||||
}
|
|
||||||
|
|
||||||
impl FromStr for HlSpecId {
|
|
||||||
type Err = String;
|
|
||||||
|
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
||||||
Ok(match s {
|
|
||||||
name::V1 => Self::V1,
|
|
||||||
_ => return Err(format!("Unknown HL spec: {s}")),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<HlSpecId> for &'static str {
|
|
||||||
fn from(spec_id: HlSpecId) -> Self {
|
|
||||||
match spec_id {
|
|
||||||
HlSpecId::V1 => name::V1,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -124,12 +124,13 @@ impl FromRecoveredTx<TransactionSigned> for HlTxEnv<TxEnv> {
|
|||||||
|
|
||||||
impl FromTxWithEncoded<TransactionSigned> for HlTxEnv<TxEnv> {
|
impl FromTxWithEncoded<TransactionSigned> for HlTxEnv<TxEnv> {
|
||||||
fn from_encoded_tx(tx: &TransactionSigned, sender: Address, _encoded: Bytes) -> Self {
|
fn from_encoded_tx(tx: &TransactionSigned, sender: Address, _encoded: Bytes) -> Self {
|
||||||
|
use reth_primitives::Transaction;
|
||||||
let base = match tx.clone().into_inner().into_typed_transaction() {
|
let base = match tx.clone().into_inner().into_typed_transaction() {
|
||||||
reth_primitives::Transaction::Legacy(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
Transaction::Legacy(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
||||||
reth_primitives::Transaction::Eip2930(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
Transaction::Eip2930(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
||||||
reth_primitives::Transaction::Eip1559(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
Transaction::Eip1559(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
||||||
reth_primitives::Transaction::Eip4844(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
Transaction::Eip4844(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
||||||
reth_primitives::Transaction::Eip7702(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
Transaction::Eip7702(tx) => TxEnv::from_recovered_tx(&tx, sender),
|
||||||
};
|
};
|
||||||
|
|
||||||
Self { base }
|
Self { base }
|
||||||
|
|||||||
@ -39,7 +39,7 @@ fn main() -> eyre::Result<()> {
|
|||||||
let (node, engine_handle_tx) = HlNode::new(ext.block_source_args.parse().await?);
|
let (node, engine_handle_tx) = HlNode::new(ext.block_source_args.parse().await?);
|
||||||
let NodeHandle { node, node_exit_future: exit_future } = builder
|
let NodeHandle { node, node_exit_future: exit_future } = builder
|
||||||
.node(node)
|
.node(node)
|
||||||
.extend_rpc_modules(move |ctx| {
|
.extend_rpc_modules(move |mut ctx| {
|
||||||
let upstream_rpc_url =
|
let upstream_rpc_url =
|
||||||
ext.upstream_rpc_url.unwrap_or_else(|| default_upstream_rpc_url.to_owned());
|
ext.upstream_rpc_url.unwrap_or_else(|| default_upstream_rpc_url.to_owned());
|
||||||
|
|
||||||
@ -60,10 +60,15 @@ fn main() -> eyre::Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ext.hl_node_compliant {
|
if ext.hl_node_compliant {
|
||||||
install_hl_node_compliance(ctx)?;
|
install_hl_node_compliance(&mut ctx)?;
|
||||||
info!("hl-node compliant mode enabled");
|
info!("hl-node compliant mode enabled");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !ext.experimental_eth_get_proof {
|
||||||
|
ctx.modules.remove_method_from_configured("eth_getProof");
|
||||||
|
info!("eth_getProof is disabled by default");
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.apply(|builder| {
|
.apply(|builder| {
|
||||||
|
|||||||
@ -55,6 +55,24 @@ pub struct HlNodeArgs {
|
|||||||
/// This is useful when read precompile is needed for gas estimation.
|
/// This is useful when read precompile is needed for gas estimation.
|
||||||
#[arg(long, env = "FORWARD_CALL")]
|
#[arg(long, env = "FORWARD_CALL")]
|
||||||
pub forward_call: bool,
|
pub forward_call: bool,
|
||||||
|
|
||||||
|
/// Experimental: enables the eth_getProof RPC method.
|
||||||
|
///
|
||||||
|
/// Note: Due to the state root difference, trie updates* may not function correctly in all
|
||||||
|
/// scenarios. For example, incremental root updates are not possible, which can cause
|
||||||
|
/// eth_getProof to malfunction in some cases.
|
||||||
|
///
|
||||||
|
/// This limitation does not impact normal node functionality, except for state root (which is
|
||||||
|
/// unused) and eth_getProof. The archival state is maintained by block order, not by trie
|
||||||
|
/// updates. As a precaution, nanoreth disables eth_getProof by default to prevent
|
||||||
|
/// potential issues.
|
||||||
|
///
|
||||||
|
/// Use --experimental-eth-get-proof to forcibly enable eth_getProof, assuming trie updates are
|
||||||
|
/// working as intended. Enabling this by default will be tracked in #15.
|
||||||
|
///
|
||||||
|
/// * Refers to the Merkle trie used for eth_getProof and state root, not actual state values.
|
||||||
|
#[arg(long, env = "EXPERIMENTAL_ETH_GET_PROOF")]
|
||||||
|
pub experimental_eth_get_proof: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The main reth_hl cli interface.
|
/// The main reth_hl cli interface.
|
||||||
|
|||||||
@ -4,7 +4,7 @@ use crate::{
|
|||||||
hardforks::HlHardforks,
|
hardforks::HlHardforks,
|
||||||
node::{
|
node::{
|
||||||
primitives::TransactionSigned,
|
primitives::TransactionSigned,
|
||||||
types::{ReadPrecompileInput, ReadPrecompileResult},
|
types::{HlExtras, ReadPrecompileInput, ReadPrecompileResult},
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use alloy_consensus::{Transaction, TxReceipt};
|
use alloy_consensus::{Transaction, TxReceipt};
|
||||||
@ -102,7 +102,7 @@ where
|
|||||||
{
|
{
|
||||||
/// Creates a new HlBlockExecutor.
|
/// Creates a new HlBlockExecutor.
|
||||||
pub fn new(mut evm: EVM, ctx: HlBlockExecutionCtx<'a>, spec: Spec, receipt_builder: R) -> Self {
|
pub fn new(mut evm: EVM, ctx: HlBlockExecutionCtx<'a>, spec: Spec, receipt_builder: R) -> Self {
|
||||||
apply_precompiles(&mut evm, &ctx);
|
apply_precompiles(&mut evm, &ctx.extras);
|
||||||
Self { spec, evm, gas_used: 0, receipts: vec![], receipt_builder, ctx }
|
Self { spec, evm, gas_used: 0, receipts: vec![], receipt_builder, ctx }
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -155,7 +155,7 @@ where
|
|||||||
type Evm = E;
|
type Evm = E;
|
||||||
|
|
||||||
fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
|
fn apply_pre_execution_changes(&mut self) -> Result<(), BlockExecutionError> {
|
||||||
apply_precompiles(&mut self.evm, &self.ctx);
|
apply_precompiles(&mut self.evm, &self.ctx.extras);
|
||||||
self.deploy_corewriter_contract()?;
|
self.deploy_corewriter_contract()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -240,10 +240,9 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn apply_precompiles<'a, DB, EVM>(evm: &mut EVM, ctx: &HlBlockExecutionCtx<'a>)
|
pub fn apply_precompiles<EVM>(evm: &mut EVM, extras: &HlExtras)
|
||||||
where
|
where
|
||||||
EVM: Evm<DB = &'a mut State<DB>, Precompiles = PrecompilesMap>,
|
EVM: Evm<Precompiles = PrecompilesMap>,
|
||||||
DB: Database + 'a,
|
|
||||||
{
|
{
|
||||||
let block_number = evm.block().number;
|
let block_number = evm.block().number;
|
||||||
let precompiles_mut = evm.precompiles_mut();
|
let precompiles_mut = evm.precompiles_mut();
|
||||||
@ -255,9 +254,7 @@ where
|
|||||||
precompiles_mut.apply_precompile(&address, |_| None);
|
precompiles_mut.apply_precompile(&address, |_| None);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for (address, precompile) in
|
for (address, precompile) in extras.read_precompile_calls.clone().unwrap_or_default().0.iter() {
|
||||||
ctx.extras.read_precompile_calls.clone().unwrap_or_default().0.iter()
|
|
||||||
{
|
|
||||||
let precompile = precompile.clone();
|
let precompile = precompile.clone();
|
||||||
precompiles_mut.apply_precompile(address, |_| {
|
precompiles_mut.apply_precompile(address, |_| {
|
||||||
let precompiles_map: HashMap<ReadPrecompileInput, ReadPrecompileResult> =
|
let precompiles_map: HashMap<ReadPrecompileInput, ReadPrecompileResult> =
|
||||||
@ -271,7 +268,7 @@ where
|
|||||||
// NOTE: This is adapted from hyperliquid-dex/hyper-evm-sync#5
|
// NOTE: This is adapted from hyperliquid-dex/hyper-evm-sync#5
|
||||||
const WARM_PRECOMPILES_BLOCK_NUMBER: u64 = 8_197_684;
|
const WARM_PRECOMPILES_BLOCK_NUMBER: u64 = 8_197_684;
|
||||||
if block_number >= U256::from(WARM_PRECOMPILES_BLOCK_NUMBER) {
|
if block_number >= U256::from(WARM_PRECOMPILES_BLOCK_NUMBER) {
|
||||||
fill_all_precompiles(ctx, precompiles_mut);
|
fill_all_precompiles(extras, precompiles_mut);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -279,9 +276,9 @@ fn address_to_u64(address: Address) -> u64 {
|
|||||||
address.into_u256().try_into().unwrap()
|
address.into_u256().try_into().unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn fill_all_precompiles<'a>(ctx: &HlBlockExecutionCtx<'a>, precompiles_mut: &mut PrecompilesMap) {
|
fn fill_all_precompiles(extras: &HlExtras, precompiles_mut: &mut PrecompilesMap) {
|
||||||
let lowest_address = 0x800;
|
let lowest_address = 0x800;
|
||||||
let highest_address = ctx.extras.highest_precompile_address.map_or(0x80D, address_to_u64);
|
let highest_address = extras.highest_precompile_address.map_or(0x80D, address_to_u64);
|
||||||
for address in lowest_address..=highest_address {
|
for address in lowest_address..=highest_address {
|
||||||
let address = Address::from(U160::from(address));
|
let address = Address::from(U160::from(address));
|
||||||
precompiles_mut.apply_precompile(&address, |f| {
|
precompiles_mut.apply_precompile(&address, |f| {
|
||||||
|
|||||||
@ -32,6 +32,8 @@ mod factory;
|
|||||||
mod patch;
|
mod patch;
|
||||||
pub mod receipt_builder;
|
pub mod receipt_builder;
|
||||||
|
|
||||||
|
pub use executor::apply_precompiles;
|
||||||
|
|
||||||
/// HL EVM implementation.
|
/// HL EVM implementation.
|
||||||
///
|
///
|
||||||
/// This is a wrapper type around the `revm` evm with optional [`Inspector`] (tracing)
|
/// This is a wrapper type around the `revm` evm with optional [`Inspector`] (tracing)
|
||||||
|
|||||||
@ -212,10 +212,6 @@ impl From<EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarVariant>
|
|||||||
impl Compress for TransactionSigned {
|
impl Compress for TransactionSigned {
|
||||||
type Compressed = Vec<u8>;
|
type Compressed = Vec<u8>;
|
||||||
|
|
||||||
fn compress(self) -> Self::Compressed {
|
|
||||||
self.into_inner().compress()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(&self, buf: &mut B) {
|
fn compress_to_buf<B: bytes::BufMut + AsMut<[u8]>>(&self, buf: &mut B) {
|
||||||
self.inner().compress_to_buf(buf);
|
self.inner().compress_to_buf(buf);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
use crate::node::rpc::HlEthApi;
|
use crate::node::rpc::{HlEthApi, HlRpcNodeCore};
|
||||||
use reth::rpc::server_types::eth::{
|
use reth::rpc::server_types::eth::{
|
||||||
builder::config::PendingBlockKind, error::FromEvmError, EthApiError, PendingBlock,
|
builder::config::PendingBlockKind, error::FromEvmError, EthApiError, PendingBlock,
|
||||||
};
|
};
|
||||||
@ -6,12 +6,12 @@ use reth_rpc_eth_api::{
|
|||||||
helpers::{
|
helpers::{
|
||||||
pending_block::PendingEnvBuilder, EthBlocks, LoadBlock, LoadPendingBlock, LoadReceipt,
|
pending_block::PendingEnvBuilder, EthBlocks, LoadBlock, LoadPendingBlock, LoadReceipt,
|
||||||
},
|
},
|
||||||
RpcConvert, RpcNodeCore,
|
RpcConvert,
|
||||||
};
|
};
|
||||||
|
|
||||||
impl<N, Rpc> EthBlocks for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthBlocks for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
@ -19,7 +19,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> LoadBlock for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadBlock for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
@ -27,7 +27,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> LoadPendingBlock for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadPendingBlock for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
@ -49,7 +49,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> LoadReceipt for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadReceipt for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
|
|||||||
@ -1,14 +1,23 @@
|
|||||||
use super::HlEthApi;
|
use super::{HlEthApi, HlRpcNodeCore};
|
||||||
|
use crate::{node::evm::apply_precompiles, HlBlock};
|
||||||
|
use alloy_evm::Evm;
|
||||||
|
use alloy_primitives::B256;
|
||||||
use reth::rpc::server_types::eth::EthApiError;
|
use reth::rpc::server_types::eth::EthApiError;
|
||||||
use reth_evm::TxEnvFor;
|
use reth_evm::{ConfigureEvm, Database, EvmEnvFor, TxEnvFor};
|
||||||
|
use reth_primitives::{NodePrimitives, Recovered};
|
||||||
|
use reth_primitives_traits::SignedTransaction;
|
||||||
|
use reth_provider::{ProviderError, ProviderTx};
|
||||||
use reth_rpc_eth_api::{
|
use reth_rpc_eth_api::{
|
||||||
helpers::{estimate::EstimateCall, Call, EthCall},
|
helpers::{estimate::EstimateCall, Call, EthCall},
|
||||||
FromEvmError, RpcConvert, RpcNodeCore,
|
FromEvmError, RpcConvert, RpcNodeCore,
|
||||||
};
|
};
|
||||||
|
use revm::DatabaseCommit;
|
||||||
|
|
||||||
|
impl<N> HlRpcNodeCore for N where N: RpcNodeCore<Primitives: NodePrimitives<Block = HlBlock>> {}
|
||||||
|
|
||||||
impl<N, Rpc> EthCall for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthCall for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
||||||
{
|
{
|
||||||
@ -16,7 +25,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> EstimateCall for HlEthApi<N, Rpc>
|
impl<N, Rpc> EstimateCall for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
||||||
{
|
{
|
||||||
@ -24,7 +33,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> Call for HlEthApi<N, Rpc>
|
impl<N, Rpc> Call for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError, TxEnv = TxEnvFor<N::Evm>>,
|
||||||
{
|
{
|
||||||
@ -37,4 +46,35 @@ where
|
|||||||
fn max_simulate_blocks(&self) -> u64 {
|
fn max_simulate_blocks(&self) -> u64 {
|
||||||
self.inner.eth_api.max_simulate_blocks()
|
self.inner.eth_api.max_simulate_blocks()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn replay_transactions_until<'a, DB, I>(
|
||||||
|
&self,
|
||||||
|
db: &mut DB,
|
||||||
|
evm_env: EvmEnvFor<Self::Evm>,
|
||||||
|
transactions: I,
|
||||||
|
target_tx_hash: B256,
|
||||||
|
) -> Result<usize, Self::Error>
|
||||||
|
where
|
||||||
|
DB: Database<Error = ProviderError> + DatabaseCommit + core::fmt::Debug,
|
||||||
|
I: IntoIterator<Item = Recovered<&'a ProviderTx<Self::Provider>>>,
|
||||||
|
{
|
||||||
|
let block_number = evm_env.block_env().number;
|
||||||
|
let hl_extras = self.get_hl_extras(block_number.try_into().unwrap())?;
|
||||||
|
|
||||||
|
let mut evm = self.evm_config().evm_with_env(db, evm_env);
|
||||||
|
apply_precompiles(&mut evm, &hl_extras);
|
||||||
|
|
||||||
|
let mut index = 0;
|
||||||
|
for tx in transactions {
|
||||||
|
if *tx.tx_hash() == target_tx_hash {
|
||||||
|
// reached the target transaction
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let tx_env = self.evm_config().tx_env(tx);
|
||||||
|
evm.transact_commit(tx_env).map_err(Self::Error::from_evm_err)?;
|
||||||
|
index += 1;
|
||||||
|
}
|
||||||
|
Ok(index)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,3 +1,9 @@
|
|||||||
|
use crate::{
|
||||||
|
chainspec::HlChainSpec,
|
||||||
|
node::{evm::apply_precompiles, types::HlExtras},
|
||||||
|
HlBlock, HlPrimitives,
|
||||||
|
};
|
||||||
|
use alloy_evm::Evm;
|
||||||
use alloy_network::Ethereum;
|
use alloy_network::Ethereum;
|
||||||
use alloy_primitives::U256;
|
use alloy_primitives::U256;
|
||||||
use reth::{
|
use reth::{
|
||||||
@ -18,8 +24,9 @@ use reth::{
|
|||||||
TaskSpawner,
|
TaskSpawner,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use reth_evm::ConfigureEvm;
|
use reth_evm::{ConfigureEvm, Database, EvmEnvFor, HaltReasonFor, InspectorFor, TxEnvFor};
|
||||||
use reth_provider::{ChainSpecProvider, ProviderHeader, ProviderTx};
|
use reth_primitives::NodePrimitives;
|
||||||
|
use reth_provider::{BlockReader, ChainSpecProvider, ProviderError, ProviderHeader, ProviderTx};
|
||||||
use reth_rpc::RpcTypes;
|
use reth_rpc::RpcTypes;
|
||||||
use reth_rpc_eth_api::{
|
use reth_rpc_eth_api::{
|
||||||
helpers::{
|
helpers::{
|
||||||
@ -29,17 +36,18 @@ use reth_rpc_eth_api::{
|
|||||||
EthApiTypes, FromEvmError, RpcConvert, RpcConverter, RpcNodeCore, RpcNodeCoreExt,
|
EthApiTypes, FromEvmError, RpcConvert, RpcConverter, RpcNodeCore, RpcNodeCoreExt,
|
||||||
SignableTxRequest,
|
SignableTxRequest,
|
||||||
};
|
};
|
||||||
|
use revm::context::result::ResultAndState;
|
||||||
use std::{fmt, marker::PhantomData, sync::Arc};
|
use std::{fmt, marker::PhantomData, sync::Arc};
|
||||||
|
|
||||||
use crate::chainspec::HlChainSpec;
|
|
||||||
|
|
||||||
mod block;
|
mod block;
|
||||||
mod call;
|
mod call;
|
||||||
pub mod engine_api;
|
pub mod engine_api;
|
||||||
mod transaction;
|
mod transaction;
|
||||||
|
|
||||||
|
pub trait HlRpcNodeCore: RpcNodeCore<Primitives: NodePrimitives<Block = HlBlock>> {}
|
||||||
|
|
||||||
/// Container type `HlEthApi`
|
/// Container type `HlEthApi`
|
||||||
pub(crate) struct HlEthApiInner<N: RpcNodeCore, Rpc: RpcConvert> {
|
pub(crate) struct HlEthApiInner<N: HlRpcNodeCore, Rpc: RpcConvert> {
|
||||||
/// Gateway to node's core components.
|
/// Gateway to node's core components.
|
||||||
pub(crate) eth_api: EthApiInner<N, Rpc>,
|
pub(crate) eth_api: EthApiInner<N, Rpc>,
|
||||||
}
|
}
|
||||||
@ -48,14 +56,14 @@ type HlRpcConvert<N, NetworkT> =
|
|||||||
RpcConverter<NetworkT, <N as FullNodeComponents>::Evm, EthReceiptConverter<HlChainSpec>>;
|
RpcConverter<NetworkT, <N as FullNodeComponents>::Evm, EthReceiptConverter<HlChainSpec>>;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct HlEthApi<N: RpcNodeCore, Rpc: RpcConvert> {
|
pub struct HlEthApi<N: HlRpcNodeCore, Rpc: RpcConvert> {
|
||||||
/// Gateway to node's core components.
|
/// Gateway to node's core components.
|
||||||
pub(crate) inner: Arc<HlEthApiInner<N, Rpc>>,
|
pub(crate) inner: Arc<HlEthApiInner<N, Rpc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N, Rpc> fmt::Debug for HlEthApi<N, Rpc>
|
impl<N, Rpc> fmt::Debug for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
@ -65,7 +73,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> EthApiTypes for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthApiTypes for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
type Error = EthApiError;
|
type Error = EthApiError;
|
||||||
@ -79,7 +87,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> RpcNodeCore for HlEthApi<N, Rpc>
|
impl<N, Rpc> RpcNodeCore for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives>,
|
Rpc: RpcConvert<Primitives = N::Primitives>,
|
||||||
{
|
{
|
||||||
type Primitives = N::Primitives;
|
type Primitives = N::Primitives;
|
||||||
@ -111,7 +119,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> RpcNodeCoreExt for HlEthApi<N, Rpc>
|
impl<N, Rpc> RpcNodeCoreExt for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -122,7 +130,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> EthApiSpec for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthApiSpec for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
type Transaction = ProviderTx<Self::Provider>;
|
type Transaction = ProviderTx<Self::Provider>;
|
||||||
@ -141,7 +149,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> SpawnBlocking for HlEthApi<N, Rpc>
|
impl<N, Rpc> SpawnBlocking for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -162,7 +170,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> LoadFee for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadFee for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
@ -179,14 +187,14 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> LoadState for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadState for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N, Rpc> EthState for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthState for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
#[inline]
|
#[inline]
|
||||||
@ -197,7 +205,7 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> EthFees for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthFees for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
@ -205,15 +213,50 @@ where
|
|||||||
|
|
||||||
impl<N, Rpc> Trace for HlEthApi<N, Rpc>
|
impl<N, Rpc> Trace for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
EthApiError: FromEvmError<N::Evm>,
|
EthApiError: FromEvmError<N::Evm>,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
|
fn inspect<DB, I>(
|
||||||
|
&self,
|
||||||
|
db: DB,
|
||||||
|
evm_env: EvmEnvFor<Self::Evm>,
|
||||||
|
tx_env: TxEnvFor<Self::Evm>,
|
||||||
|
inspector: I,
|
||||||
|
) -> Result<ResultAndState<HaltReasonFor<Self::Evm>>, Self::Error>
|
||||||
|
where
|
||||||
|
DB: Database<Error = ProviderError>,
|
||||||
|
I: InspectorFor<Self::Evm, DB>,
|
||||||
|
{
|
||||||
|
let block_number = evm_env.block_env().number;
|
||||||
|
let hl_extras = self.get_hl_extras(block_number.try_into().unwrap())?;
|
||||||
|
|
||||||
|
let mut evm = self.evm_config().evm_with_env_and_inspector(db, evm_env, inspector);
|
||||||
|
apply_precompiles(&mut evm, &hl_extras);
|
||||||
|
evm.transact(tx_env).map_err(Self::Error::from_evm_err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<N, Rpc> HlEthApi<N, Rpc>
|
||||||
|
where
|
||||||
|
N: HlRpcNodeCore,
|
||||||
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
|
{
|
||||||
|
fn get_hl_extras(&self, block_number: u64) -> Result<HlExtras, ProviderError> {
|
||||||
|
Ok(self
|
||||||
|
.provider()
|
||||||
|
.block_by_number(block_number)?
|
||||||
|
.map(|block| HlExtras {
|
||||||
|
read_precompile_calls: block.body.read_precompile_calls.clone(),
|
||||||
|
highest_precompile_address: block.body.highest_precompile_address,
|
||||||
|
})
|
||||||
|
.unwrap_or_default())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N, Rpc> AddDevSigners for HlEthApi<N, Rpc>
|
impl<N, Rpc> AddDevSigners for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<
|
Rpc: RpcConvert<
|
||||||
Network: RpcTypes<TransactionRequest: SignableTxRequest<ProviderTx<N::Provider>>>,
|
Network: RpcTypes<TransactionRequest: SignableTxRequest<ProviderTx<N::Provider>>>,
|
||||||
>,
|
>,
|
||||||
@ -239,7 +282,7 @@ impl<NetworkT> Default for HlEthApiBuilder<NetworkT> {
|
|||||||
|
|
||||||
impl<N, NetworkT> EthApiBuilder<N> for HlEthApiBuilder<NetworkT>
|
impl<N, NetworkT> EthApiBuilder<N> for HlEthApiBuilder<NetworkT>
|
||||||
where
|
where
|
||||||
N: FullNodeComponents<Types: NodeTypes<ChainSpec = HlChainSpec>>
|
N: FullNodeComponents<Types: NodeTypes<ChainSpec = HlChainSpec, Primitives = HlPrimitives>>
|
||||||
+ RpcNodeCore<
|
+ RpcNodeCore<
|
||||||
Primitives = PrimitivesTy<N::Types>,
|
Primitives = PrimitivesTy<N::Types>,
|
||||||
Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>>,
|
Evm: ConfigureEvm<NextBlockEnvCtx: BuildPendingEnv<HeaderTy<N::Types>>>,
|
||||||
|
|||||||
@ -1,21 +1,21 @@
|
|||||||
use crate::node::rpc::HlEthApi;
|
use crate::node::rpc::{HlEthApi, HlRpcNodeCore};
|
||||||
use alloy_primitives::{Bytes, B256};
|
use alloy_primitives::{Bytes, B256};
|
||||||
use reth::rpc::server_types::eth::EthApiError;
|
use reth::rpc::server_types::eth::EthApiError;
|
||||||
use reth_rpc_eth_api::{
|
use reth_rpc_eth_api::{
|
||||||
helpers::{spec::SignersForRpc, EthTransactions, LoadTransaction},
|
helpers::{spec::SignersForRpc, EthTransactions, LoadTransaction},
|
||||||
RpcConvert, RpcNodeCore,
|
RpcConvert,
|
||||||
};
|
};
|
||||||
|
|
||||||
impl<N, Rpc> LoadTransaction for HlEthApi<N, Rpc>
|
impl<N, Rpc> LoadTransaction for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<N, Rpc> EthTransactions for HlEthApi<N, Rpc>
|
impl<N, Rpc> EthTransactions for HlEthApi<N, Rpc>
|
||||||
where
|
where
|
||||||
N: RpcNodeCore,
|
N: HlRpcNodeCore,
|
||||||
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
Rpc: RpcConvert<Primitives = N::Primitives, Error = EthApiError>,
|
||||||
{
|
{
|
||||||
fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
|
fn signers(&self) -> &SignersForRpc<Self::Provider, Self::NetworkTypes> {
|
||||||
|
|||||||
@ -1,3 +1,7 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use crate::pseudo_peer::HlNodeBlockSourceArgs;
|
||||||
|
|
||||||
use super::config::BlockSourceConfig;
|
use super::config::BlockSourceConfig;
|
||||||
use clap::{Args, Parser};
|
use clap::{Args, Parser};
|
||||||
use reth_node_core::args::LogArgs;
|
use reth_node_core::args::LogArgs;
|
||||||
@ -13,7 +17,7 @@ pub struct BlockSourceArgs {
|
|||||||
block_source: Option<String>,
|
block_source: Option<String>,
|
||||||
|
|
||||||
#[arg(long, alias = "local-ingest-dir")]
|
#[arg(long, alias = "local-ingest-dir")]
|
||||||
block_source_from_node: Option<String>,
|
local_ingest_dir: Option<String>,
|
||||||
|
|
||||||
/// Shorthand of --block-source=s3://hl-mainnet-evm-blocks
|
/// Shorthand of --block-source=s3://hl-mainnet-evm-blocks
|
||||||
#[arg(long, default_value_t = false)]
|
#[arg(long, default_value_t = false)]
|
||||||
@ -22,6 +26,19 @@ pub struct BlockSourceArgs {
|
|||||||
/// Shorthand of --block-source-from-node=~/hl/data/evm_blocks_and_receipts
|
/// Shorthand of --block-source-from-node=~/hl/data/evm_blocks_and_receipts
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
local: bool,
|
local: bool,
|
||||||
|
|
||||||
|
/// Interval for polling new blocks in S3 in milliseconds.
|
||||||
|
#[arg(id = "s3.polling-interval", long = "s3.polling-interval", default_value = "25")]
|
||||||
|
s3_polling_interval: u64,
|
||||||
|
|
||||||
|
/// Maximum allowed delay for the hl-node block source in milliseconds.
|
||||||
|
/// If this threshold is exceeded, the client falls back to other sources.
|
||||||
|
#[arg(
|
||||||
|
id = "local.fallback-threshold",
|
||||||
|
long = "local.fallback-threshold",
|
||||||
|
default_value = "5000"
|
||||||
|
)]
|
||||||
|
local_fallback_threshold: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSourceArgs {
|
impl BlockSourceArgs {
|
||||||
@ -33,7 +50,10 @@ impl BlockSourceArgs {
|
|||||||
|
|
||||||
async fn create_base_config(&self) -> eyre::Result<BlockSourceConfig> {
|
async fn create_base_config(&self) -> eyre::Result<BlockSourceConfig> {
|
||||||
if self.s3 {
|
if self.s3 {
|
||||||
return Ok(BlockSourceConfig::s3_default().await);
|
return Ok(BlockSourceConfig::s3_default(Duration::from_millis(
|
||||||
|
self.s3_polling_interval,
|
||||||
|
))
|
||||||
|
.await);
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.local {
|
if self.local {
|
||||||
@ -47,18 +67,25 @@ impl BlockSourceArgs {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if let Some(bucket) = value.strip_prefix("s3://") {
|
if let Some(bucket) = value.strip_prefix("s3://") {
|
||||||
Ok(BlockSourceConfig::s3(bucket.to_string()).await)
|
Ok(BlockSourceConfig::s3(
|
||||||
|
bucket.to_string(),
|
||||||
|
Duration::from_millis(self.s3_polling_interval),
|
||||||
|
)
|
||||||
|
.await)
|
||||||
} else {
|
} else {
|
||||||
Ok(BlockSourceConfig::local(value.into()))
|
Ok(BlockSourceConfig::local(value.into()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn apply_node_source_config(&self, config: BlockSourceConfig) -> BlockSourceConfig {
|
fn apply_node_source_config(&self, config: BlockSourceConfig) -> BlockSourceConfig {
|
||||||
let Some(block_source_from_node) = self.block_source_from_node.as_ref() else {
|
let Some(local_ingest_dir) = self.local_ingest_dir.as_ref() else {
|
||||||
return config;
|
return config;
|
||||||
};
|
};
|
||||||
|
|
||||||
config.with_block_source_from_node(block_source_from_node.to_string())
|
config.with_block_source_from_node(HlNodeBlockSourceArgs {
|
||||||
|
root: local_ingest_dir.into(),
|
||||||
|
fallback_threshold: Duration::from_millis(self.local_fallback_threshold),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,31 +1,38 @@
|
|||||||
use crate::chainspec::HlChainSpec;
|
use crate::chainspec::HlChainSpec;
|
||||||
|
|
||||||
use super::sources::{
|
use super::sources::{
|
||||||
BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, LocalBlockSource, S3BlockSource,
|
BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, HlNodeBlockSourceArgs,
|
||||||
|
LocalBlockSource, S3BlockSource,
|
||||||
};
|
};
|
||||||
use aws_config::BehaviorVersion;
|
use aws_config::BehaviorVersion;
|
||||||
use std::{env::home_dir, path::PathBuf, sync::Arc};
|
use std::{env::home_dir, path::PathBuf, sync::Arc, time::Duration};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct BlockSourceConfig {
|
pub struct BlockSourceConfig {
|
||||||
pub source_type: BlockSourceType,
|
pub source_type: BlockSourceType,
|
||||||
pub block_source_from_node: Option<String>,
|
pub block_source_from_node: Option<HlNodeBlockSourceArgs>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum BlockSourceType {
|
pub enum BlockSourceType {
|
||||||
S3Default,
|
S3Default { polling_interval: Duration },
|
||||||
S3 { bucket: String },
|
S3 { bucket: String, polling_interval: Duration },
|
||||||
Local { path: PathBuf },
|
Local { path: PathBuf },
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSourceConfig {
|
impl BlockSourceConfig {
|
||||||
pub async fn s3_default() -> Self {
|
pub async fn s3_default(polling_interval: Duration) -> Self {
|
||||||
Self { source_type: BlockSourceType::S3Default, block_source_from_node: None }
|
Self {
|
||||||
|
source_type: BlockSourceType::S3Default { polling_interval },
|
||||||
|
block_source_from_node: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn s3(bucket: String) -> Self {
|
pub async fn s3(bucket: String, polling_interval: Duration) -> Self {
|
||||||
Self { source_type: BlockSourceType::S3 { bucket }, block_source_from_node: None }
|
Self {
|
||||||
|
source_type: BlockSourceType::S3 { bucket, polling_interval },
|
||||||
|
block_source_from_node: None,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn local(path: PathBuf) -> Self {
|
pub fn local(path: PathBuf) -> Self {
|
||||||
@ -45,15 +52,22 @@ impl BlockSourceConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn with_block_source_from_node(mut self, block_source_from_node: String) -> Self {
|
pub fn with_block_source_from_node(
|
||||||
|
mut self,
|
||||||
|
block_source_from_node: HlNodeBlockSourceArgs,
|
||||||
|
) -> Self {
|
||||||
self.block_source_from_node = Some(block_source_from_node);
|
self.block_source_from_node = Some(block_source_from_node);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed {
|
pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed {
|
||||||
match &self.source_type {
|
match &self.source_type {
|
||||||
BlockSourceType::S3Default => s3_block_source(chain_spec.official_s3_bucket()).await,
|
BlockSourceType::S3Default { polling_interval } => {
|
||||||
BlockSourceType::S3 { bucket } => s3_block_source(bucket).await,
|
s3_block_source(chain_spec.official_s3_bucket(), *polling_interval).await
|
||||||
|
}
|
||||||
|
BlockSourceType::S3 { bucket, polling_interval } => {
|
||||||
|
s3_block_source(bucket, *polling_interval).await
|
||||||
|
}
|
||||||
BlockSourceType::Local { path } => {
|
BlockSourceType::Local { path } => {
|
||||||
Arc::new(Box::new(LocalBlockSource::new(path.clone())))
|
Arc::new(Box::new(LocalBlockSource::new(path.clone())))
|
||||||
}
|
}
|
||||||
@ -72,7 +86,7 @@ impl BlockSourceConfig {
|
|||||||
Arc::new(Box::new(
|
Arc::new(Box::new(
|
||||||
HlNodeBlockSource::new(
|
HlNodeBlockSource::new(
|
||||||
fallback_block_source,
|
fallback_block_source,
|
||||||
PathBuf::from(block_source_from_node.clone()),
|
block_source_from_node.clone(),
|
||||||
next_block_number,
|
next_block_number,
|
||||||
)
|
)
|
||||||
.await,
|
.await,
|
||||||
@ -91,9 +105,9 @@ impl BlockSourceConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn s3_block_source(bucket: impl AsRef<str>) -> BlockSourceBoxed {
|
async fn s3_block_source(bucket: impl AsRef<str>, polling_interval: Duration) -> BlockSourceBoxed {
|
||||||
let client = aws_sdk_s3::Client::new(
|
let client = aws_sdk_s3::Client::new(
|
||||||
&aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await,
|
&aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await,
|
||||||
);
|
);
|
||||||
Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string())))
|
Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string(), polling_interval)))
|
||||||
}
|
}
|
||||||
|
|||||||
@ -8,13 +8,11 @@ pub mod config;
|
|||||||
pub mod network;
|
pub mod network;
|
||||||
pub mod service;
|
pub mod service;
|
||||||
pub mod sources;
|
pub mod sources;
|
||||||
#[cfg(test)]
|
|
||||||
mod tests;
|
|
||||||
pub mod utils;
|
pub mod utils;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
use tracing::info;
|
use tracing::{error, info};
|
||||||
|
|
||||||
pub use cli::*;
|
pub use cli::*;
|
||||||
pub use config::*;
|
pub use config::*;
|
||||||
@ -80,8 +78,11 @@ pub async fn start_pseudo_peer(
|
|||||||
_ = transaction_rx.recv() => {}
|
_ = transaction_rx.recv() => {}
|
||||||
|
|
||||||
Some(eth_req) = eth_rx.recv() => {
|
Some(eth_req) = eth_rx.recv() => {
|
||||||
service.process_eth_request(eth_req).await?;
|
if let Err(e) = service.process_eth_request(eth_req).await {
|
||||||
info!("Processed eth request");
|
error!("Error processing eth request: {e:?}");
|
||||||
|
} else {
|
||||||
|
info!("Processed eth request");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,7 +26,6 @@ use std::{
|
|||||||
pin::Pin,
|
pin::Pin,
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
task::{Context, Poll},
|
task::{Context, Poll},
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
use tokio::{sync::mpsc, task::JoinHandle};
|
use tokio::{sync::mpsc, task::JoinHandle};
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, info};
|
||||||
@ -49,8 +48,6 @@ pub struct BlockPoller {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl BlockPoller {
|
impl BlockPoller {
|
||||||
const POLL_INTERVAL: Duration = Duration::from_millis(25);
|
|
||||||
|
|
||||||
pub fn new_suspended<BS: BlockSource>(
|
pub fn new_suspended<BS: BlockSource>(
|
||||||
chain_id: u64,
|
chain_id: u64,
|
||||||
block_source: BS,
|
block_source: BS,
|
||||||
@ -77,6 +74,7 @@ impl BlockPoller {
|
|||||||
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
|
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
|
||||||
info!("Starting block poller");
|
info!("Starting block poller");
|
||||||
|
|
||||||
|
let polling_interval = block_source.polling_interval();
|
||||||
let mut next_block_number = block_source
|
let mut next_block_number = block_source
|
||||||
.find_latest_block_number()
|
.find_latest_block_number()
|
||||||
.await
|
.await
|
||||||
@ -88,7 +86,7 @@ impl BlockPoller {
|
|||||||
block_tx_clone.send((next_block_number, block)).await?;
|
block_tx_clone.send((next_block_number, block)).await?;
|
||||||
next_block_number += 1;
|
next_block_number += 1;
|
||||||
}
|
}
|
||||||
Err(_) => tokio::time::sleep(Self::POLL_INTERVAL).await,
|
Err(_) => tokio::time::sleep(polling_interval).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -154,13 +152,14 @@ impl<BS: BlockSource> PseudoPeer<BS> {
|
|||||||
async fn collect_blocks(
|
async fn collect_blocks(
|
||||||
&self,
|
&self,
|
||||||
block_numbers: impl IntoIterator<Item = u64>,
|
block_numbers: impl IntoIterator<Item = u64>,
|
||||||
) -> Vec<BlockAndReceipts> {
|
) -> eyre::Result<Vec<BlockAndReceipts>> {
|
||||||
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
|
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
|
||||||
futures::stream::iter(block_numbers)
|
let res = futures::stream::iter(block_numbers)
|
||||||
.map(async |number| self.collect_block(number).await.unwrap())
|
.map(async |number| self.collect_block(number).await)
|
||||||
.buffered(self.block_source.recommended_chunk_size() as usize)
|
.buffered(self.block_source.recommended_chunk_size() as usize)
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
.await
|
.await;
|
||||||
|
res.into_iter().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_eth_request(
|
pub async fn process_eth_request(
|
||||||
@ -187,7 +186,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
|
|||||||
HeadersDirection::Falling => {
|
HeadersDirection::Falling => {
|
||||||
self.collect_blocks((number + 1 - limit..number + 1).rev()).await
|
self.collect_blocks((number + 1 - limit..number + 1).rev()).await
|
||||||
}
|
}
|
||||||
}
|
}?
|
||||||
.into_par_iter()
|
.into_par_iter()
|
||||||
.map(|block| block.to_reth_block(chain_id).header.clone())
|
.map(|block| block.to_reth_block(chain_id).header.clone())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
@ -205,7 +204,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
|
|||||||
|
|
||||||
let block_bodies = self
|
let block_bodies = self
|
||||||
.collect_blocks(numbers)
|
.collect_blocks(numbers)
|
||||||
.await
|
.await?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|block| block.to_reth_block(chain_id).body)
|
.map(|block| block.to_reth_block(chain_id).body)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
@ -342,7 +341,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
|
|||||||
|
|
||||||
debug!("Backfilling from {start_number} to {end_number}");
|
debug!("Backfilling from {start_number} to {end_number}");
|
||||||
// Collect blocks and cache them
|
// Collect blocks and cache them
|
||||||
let blocks = self.collect_blocks(uncached_block_numbers).await;
|
let blocks = self.collect_blocks(uncached_block_numbers).await?;
|
||||||
let block_map: HashMap<B256, u64> =
|
let block_map: HashMap<B256, u64> =
|
||||||
blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
|
blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
|
||||||
let maybe_block_number = block_map.get(&target_hash).copied();
|
let maybe_block_number = block_map.get(&target_hash).copied();
|
||||||
|
|||||||
@ -41,4 +41,8 @@ impl BlockSource for CachedBlockSource {
|
|||||||
fn recommended_chunk_size(&self) -> u64 {
|
fn recommended_chunk_size(&self) -> u64 {
|
||||||
self.block_source.recommended_chunk_size()
|
self.block_source.recommended_chunk_size()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn polling_interval(&self) -> std::time::Duration {
|
||||||
|
self.block_source.polling_interval()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,12 +17,12 @@ impl FileOperations {
|
|||||||
files.extend(
|
files.extend(
|
||||||
subentries
|
subentries
|
||||||
.filter_map(|f| f.ok().map(|f| f.path()))
|
.filter_map(|f| f.ok().map(|f| f.path()))
|
||||||
.filter(|p| TimeUtils::datetime_from_path(p).is_some()),
|
.filter_map(|p| TimeUtils::datetime_from_path(&p).map(|dt| (dt, p))),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
files.sort();
|
files.sort();
|
||||||
Some(files)
|
Some(files.into_iter().map(|(_, p)| p).collect())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
|
pub fn find_latest_hourly_file(root: &Path) -> Option<PathBuf> {
|
||||||
|
|||||||
@ -14,35 +14,39 @@ use self::{
|
|||||||
use super::{BlockSource, BlockSourceBoxed};
|
use super::{BlockSource, BlockSourceBoxed};
|
||||||
use crate::node::types::BlockAndReceipts;
|
use crate::node::types::BlockAndReceipts;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::{
|
use std::{
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
};
|
};
|
||||||
use time::{Duration, OffsetDateTime};
|
use time::OffsetDateTime;
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
const TAIL_INTERVAL: std::time::Duration = std::time::Duration::from_millis(25);
|
|
||||||
const HOURLY_SUBDIR: &str = "hourly";
|
const HOURLY_SUBDIR: &str = "hourly";
|
||||||
const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
|
const CACHE_SIZE: u32 = 8000; // 3660 blocks per hour
|
||||||
const MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK: Duration = Duration::milliseconds(5000);
|
const ONE_HOUR: Duration = Duration::from_secs(60 * 60);
|
||||||
|
const TAIL_INTERVAL: Duration = Duration::from_millis(25);
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub(crate) struct LocalBlockAndReceipts(String, BlockAndReceipts);
|
pub struct HlNodeBlockSourceArgs {
|
||||||
|
pub root: PathBuf,
|
||||||
|
pub fallback_threshold: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
/// Block source that monitors the local ingest directory for the HL node.
|
/// Block source that monitors the local ingest directory for the HL node.
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct HlNodeBlockSource {
|
pub struct HlNodeBlockSource {
|
||||||
pub fallback: BlockSourceBoxed,
|
pub fallback: BlockSourceBoxed,
|
||||||
pub local_ingest_dir: PathBuf,
|
|
||||||
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
pub local_blocks_cache: Arc<Mutex<LocalBlocksCache>>,
|
||||||
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
pub last_local_fetch: Arc<Mutex<Option<(u64, OffsetDateTime)>>>,
|
||||||
|
pub args: HlNodeBlockSourceArgs,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockSource for HlNodeBlockSource {
|
impl BlockSource for HlNodeBlockSource {
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
||||||
let fallback = self.fallback.clone();
|
let fallback = self.fallback.clone();
|
||||||
|
let args = self.args.clone();
|
||||||
let local_blocks_cache = self.local_blocks_cache.clone();
|
let local_blocks_cache = self.local_blocks_cache.clone();
|
||||||
let last_local_fetch = self.last_local_fetch.clone();
|
let last_local_fetch = self.last_local_fetch.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
@ -55,7 +59,7 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
|
|
||||||
if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
|
if let Some((last_height, last_poll_time)) = *last_local_fetch.lock().await {
|
||||||
let more_recent = last_height < height;
|
let more_recent = last_height < height;
|
||||||
let too_soon = now - last_poll_time < MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK;
|
let too_soon = now - last_poll_time < args.fallback_threshold;
|
||||||
if more_recent && too_soon {
|
if more_recent && too_soon {
|
||||||
return Err(eyre::eyre!(
|
return Err(eyre::eyre!(
|
||||||
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
|
"Not found locally; limiting polling rate before fallback so that hl-node has chance to catch up"
|
||||||
@ -71,12 +75,12 @@ impl BlockSource for HlNodeBlockSource {
|
|||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
||||||
let fallback = self.fallback.clone();
|
let fallback = self.fallback.clone();
|
||||||
let local_ingest_dir = self.local_ingest_dir.clone();
|
let args = self.args.clone();
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let Some(dir) = FileOperations::find_latest_hourly_file(&local_ingest_dir) else {
|
let Some(dir) = FileOperations::find_latest_hourly_file(&args.root) else {
|
||||||
warn!(
|
warn!(
|
||||||
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
|
"No EVM blocks from hl-node found at {:?}; fallback to s3/ingest-dir",
|
||||||
local_ingest_dir
|
args.root
|
||||||
);
|
);
|
||||||
return fallback.find_latest_block_number().await;
|
return fallback.find_latest_block_number().await;
|
||||||
};
|
};
|
||||||
@ -160,7 +164,7 @@ impl HlNodeBlockSource {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn start_local_ingest_loop(&self, current_head: u64) {
|
async fn start_local_ingest_loop(&self, current_head: u64) {
|
||||||
let root = self.local_ingest_dir.to_owned();
|
let root = self.args.root.to_owned();
|
||||||
let cache = self.local_blocks_cache.clone();
|
let cache = self.local_blocks_cache.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut next_height = current_head;
|
let mut next_height = current_head;
|
||||||
@ -185,8 +189,8 @@ impl HlNodeBlockSource {
|
|||||||
cache.lock().await.load_scan_result(scan_result);
|
cache.lock().await.load_scan_result(scan_result);
|
||||||
}
|
}
|
||||||
let now = OffsetDateTime::now_utc();
|
let now = OffsetDateTime::now_utc();
|
||||||
if dt + Duration::HOUR < now {
|
if dt + ONE_HOUR < now {
|
||||||
dt += Duration::HOUR;
|
dt += ONE_HOUR;
|
||||||
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
|
(hour, day_str, last_line) = (dt.hour(), TimeUtils::date_from_datetime(dt), 0);
|
||||||
info!(
|
info!(
|
||||||
"Moving to new file: {:?}",
|
"Moving to new file: {:?}",
|
||||||
@ -201,7 +205,7 @@ impl HlNodeBlockSource {
|
|||||||
|
|
||||||
pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
|
pub(crate) async fn run(&self, next_block_number: u64) -> eyre::Result<()> {
|
||||||
let _ = Self::try_backfill_local_blocks(
|
let _ = Self::try_backfill_local_blocks(
|
||||||
&self.local_ingest_dir,
|
&self.args.root,
|
||||||
&self.local_blocks_cache,
|
&self.local_blocks_cache,
|
||||||
next_block_number,
|
next_block_number,
|
||||||
)
|
)
|
||||||
@ -212,12 +216,12 @@ impl HlNodeBlockSource {
|
|||||||
|
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
fallback: BlockSourceBoxed,
|
fallback: BlockSourceBoxed,
|
||||||
local_ingest_dir: PathBuf,
|
args: HlNodeBlockSourceArgs,
|
||||||
next_block_number: u64,
|
next_block_number: u64,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let block_source = Self {
|
let block_source = Self {
|
||||||
fallback,
|
fallback,
|
||||||
local_ingest_dir,
|
args,
|
||||||
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
|
local_blocks_cache: Arc::new(Mutex::new(LocalBlocksCache::new(CACHE_SIZE))),
|
||||||
last_local_fetch: Arc::new(Mutex::new(None)),
|
last_local_fetch: Arc::new(Mutex::new(None)),
|
||||||
};
|
};
|
||||||
|
|||||||
@ -1,11 +1,13 @@
|
|||||||
use super::*;
|
use super::*;
|
||||||
use crate::{
|
use crate::{
|
||||||
node::types::{reth_compat, ReadPrecompileCalls},
|
node::types::{reth_compat, ReadPrecompileCalls},
|
||||||
pseudo_peer::sources::LocalBlockSource,
|
pseudo_peer::sources::{hl_node::scan::LocalBlockAndReceipts, LocalBlockSource},
|
||||||
};
|
};
|
||||||
use alloy_consensus::{BlockBody, Header};
|
use alloy_consensus::{BlockBody, Header};
|
||||||
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
|
use alloy_primitives::{Address, Bloom, Bytes, B256, B64, U256};
|
||||||
use std::{io::Write, time::Duration as StdDuration};
|
use std::{io::Write, time::Duration};
|
||||||
|
|
||||||
|
const DEFAULT_FALLBACK_THRESHOLD_FOR_TEST: Duration = Duration::from_millis(5000);
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_datetime_from_path() {
|
fn test_datetime_from_path() {
|
||||||
@ -111,7 +113,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
|
|||||||
// Setup fallback block source
|
// Setup fallback block source
|
||||||
let block_source_fallback = HlNodeBlockSource::new(
|
let block_source_fallback = HlNodeBlockSource::new(
|
||||||
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
|
BlockSourceBoxed::new(Box::new(LocalBlockSource::new("/nonexistent"))),
|
||||||
PathBuf::from("/nonexistent"),
|
HlNodeBlockSourceArgs {
|
||||||
|
root: { PathBuf::from("/nonexistent") },
|
||||||
|
fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST,
|
||||||
|
},
|
||||||
1000000,
|
1000000,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -124,7 +129,10 @@ async fn setup_block_source_hierarchy() -> eyre::Result<BlockSourceHierarchy> {
|
|||||||
|
|
||||||
let block_source = HlNodeBlockSource::new(
|
let block_source = HlNodeBlockSource::new(
|
||||||
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
|
BlockSourceBoxed::new(Box::new(block_source_fallback.clone())),
|
||||||
temp_dir1.path().to_path_buf(),
|
HlNodeBlockSourceArgs {
|
||||||
|
root: temp_dir1.path().to_path_buf(),
|
||||||
|
fallback_threshold: DEFAULT_FALLBACK_THRESHOLD_FOR_TEST,
|
||||||
|
},
|
||||||
1000000,
|
1000000,
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
@ -159,7 +167,7 @@ async fn test_update_last_fetch_no_fallback() -> eyre::Result<()> {
|
|||||||
assert!(block.is_err());
|
assert!(block.is_err());
|
||||||
|
|
||||||
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
|
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_hl_node)?)?;
|
||||||
tokio::time::sleep(StdDuration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||||
|
|
||||||
let block = block_source.collect_block(1000001).await.unwrap();
|
let block = block_source.collect_block(1000001).await.unwrap();
|
||||||
assert_eq!(block, future_block_hl_node.1);
|
assert_eq!(block, future_block_hl_node.1);
|
||||||
@ -177,7 +185,7 @@ async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
|
|||||||
let block = block_source.collect_block(1000000).await.unwrap();
|
let block = block_source.collect_block(1000000).await.unwrap();
|
||||||
assert_eq!(block, current_block.1);
|
assert_eq!(block, current_block.1);
|
||||||
|
|
||||||
tokio::time::sleep(MAX_ALLOWED_THRESHOLD_BEFORE_FALLBACK.unsigned_abs()).await;
|
tokio::time::sleep(DEFAULT_FALLBACK_THRESHOLD_FOR_TEST).await;
|
||||||
|
|
||||||
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
|
writeln!(&mut file1, "{}", serde_json::to_string(&future_block_fallback)?)?;
|
||||||
let block = block_source.collect_block(1000001).await.unwrap();
|
let block = block_source.collect_block(1000001).await.unwrap();
|
||||||
@ -185,3 +193,22 @@ async fn test_update_last_fetch_fallback() -> eyre::Result<()> {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_hourly_files_sort() -> eyre::Result<()> {
|
||||||
|
let temp_dir = tempfile::tempdir()?;
|
||||||
|
// create 20250826/9, 20250826/14
|
||||||
|
let targets = [("20250826", "9"), ("20250826", "14")];
|
||||||
|
for (date, hour) in targets {
|
||||||
|
let hourly_file = temp_dir.path().join(HOURLY_SUBDIR).join(date).join(hour);
|
||||||
|
let parent = hourly_file.parent().unwrap();
|
||||||
|
std::fs::create_dir_all(parent)?;
|
||||||
|
std::fs::File::create(hourly_file)?;
|
||||||
|
}
|
||||||
|
let files = FileOperations::all_hourly_files(temp_dir.path()).unwrap();
|
||||||
|
let file_names: Vec<_> =
|
||||||
|
files.into_iter().map(|p| p.file_name().unwrap().to_string_lossy().into_owned()).collect();
|
||||||
|
|
||||||
|
assert_eq!(file_names, ["9", "14"]);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
use crate::node::types::BlockAndReceipts;
|
use crate::node::types::BlockAndReceipts;
|
||||||
|
use auto_impl::auto_impl;
|
||||||
use futures::future::BoxFuture;
|
use futures::future::BoxFuture;
|
||||||
use std::sync::Arc;
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
// Module declarations
|
// Module declarations
|
||||||
mod cached;
|
mod cached;
|
||||||
@ -11,11 +12,14 @@ mod utils;
|
|||||||
|
|
||||||
// Public exports
|
// Public exports
|
||||||
pub use cached::CachedBlockSource;
|
pub use cached::CachedBlockSource;
|
||||||
pub use hl_node::HlNodeBlockSource;
|
pub use hl_node::{HlNodeBlockSource, HlNodeBlockSourceArgs};
|
||||||
pub use local::LocalBlockSource;
|
pub use local::LocalBlockSource;
|
||||||
pub use s3::S3BlockSource;
|
pub use s3::S3BlockSource;
|
||||||
|
|
||||||
|
const DEFAULT_POLLING_INTERVAL: Duration = Duration::from_millis(25);
|
||||||
|
|
||||||
/// Trait for block sources that can retrieve blocks from various sources
|
/// Trait for block sources that can retrieve blocks from various sources
|
||||||
|
#[auto_impl(&, &mut, Box, Arc)]
|
||||||
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
|
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
|
||||||
/// Retrieves a block at the specified height
|
/// Retrieves a block at the specified height
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
|
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>>;
|
||||||
@ -25,21 +29,12 @@ pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
|
|||||||
|
|
||||||
/// Returns the recommended chunk size for batch operations
|
/// Returns the recommended chunk size for batch operations
|
||||||
fn recommended_chunk_size(&self) -> u64;
|
fn recommended_chunk_size(&self) -> u64;
|
||||||
|
|
||||||
|
/// Returns the polling interval
|
||||||
|
fn polling_interval(&self) -> Duration {
|
||||||
|
DEFAULT_POLLING_INTERVAL
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Type alias for a boxed block source
|
/// Type alias for a boxed block source
|
||||||
pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
|
pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
|
||||||
|
|
||||||
impl BlockSource for BlockSourceBoxed {
|
|
||||||
fn collect_block(&self, height: u64) -> BoxFuture<'static, eyre::Result<BlockAndReceipts>> {
|
|
||||||
self.as_ref().collect_block(height)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn find_latest_block_number(&self) -> BoxFuture<'static, Option<u64>> {
|
|
||||||
self.as_ref().find_latest_block_number()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn recommended_chunk_size(&self) -> u64 {
|
|
||||||
self.as_ref().recommended_chunk_size()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@ -2,7 +2,7 @@ use super::{utils, BlockSource};
|
|||||||
use crate::node::types::BlockAndReceipts;
|
use crate::node::types::BlockAndReceipts;
|
||||||
use aws_sdk_s3::types::RequestPayer;
|
use aws_sdk_s3::types::RequestPayer;
|
||||||
use futures::{future::BoxFuture, FutureExt};
|
use futures::{future::BoxFuture, FutureExt};
|
||||||
use std::sync::Arc;
|
use std::{sync::Arc, time::Duration};
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
/// Block source that reads blocks from S3 (--s3)
|
/// Block source that reads blocks from S3 (--s3)
|
||||||
@ -10,11 +10,12 @@ use tracing::info;
|
|||||||
pub struct S3BlockSource {
|
pub struct S3BlockSource {
|
||||||
client: Arc<aws_sdk_s3::Client>,
|
client: Arc<aws_sdk_s3::Client>,
|
||||||
bucket: String,
|
bucket: String,
|
||||||
|
polling_interval: Duration,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl S3BlockSource {
|
impl S3BlockSource {
|
||||||
pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
|
pub fn new(client: aws_sdk_s3::Client, bucket: String, polling_interval: Duration) -> Self {
|
||||||
Self { client: client.into(), bucket }
|
Self { client: client.into(), bucket, polling_interval }
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn pick_path_with_highest_number(
|
async fn pick_path_with_highest_number(
|
||||||
@ -87,4 +88,8 @@ impl BlockSource for S3BlockSource {
|
|||||||
fn recommended_chunk_size(&self) -> u64 {
|
fn recommended_chunk_size(&self) -> u64 {
|
||||||
1000
|
1000
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn polling_interval(&self) -> Duration {
|
||||||
|
self.polling_interval
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,19 +0,0 @@
|
|||||||
use std::path::Path;
|
|
||||||
|
|
||||||
use crate::pseudo_peer::{prelude::*, BlockSourceType};
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_block_source_config_s3() {
|
|
||||||
let config = BlockSourceConfig::s3("test-bucket".to_string()).await;
|
|
||||||
assert!(
|
|
||||||
matches!(config.source_type, BlockSourceType::S3 { bucket } if bucket == "test-bucket")
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_block_source_config_local() {
|
|
||||||
let config = BlockSourceConfig::local("/test/path".into());
|
|
||||||
assert!(
|
|
||||||
matches!(config.source_type, BlockSourceType::Local { path } if path == Path::new("/test/path"))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user