Merge pull request #2 from sprites0/feat/hl-node-compliant

feat: Add --hl-node-compliant
This commit is contained in:
sprites0
2025-07-04 22:23:31 -04:00
committed by GitHub
9 changed files with 430 additions and 332 deletions

319
Cargo.lock generated
View File

@ -349,34 +349,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "alloy-op-evm"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "588a87b77b30452991151667522d2f2f724cec9c2ec6602e4187bc97f66d8095"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-evm",
"alloy-op-hardforks",
"alloy-primitives",
"auto_impl",
"op-alloy-consensus",
"op-revm",
"revm",
]
[[package]]
name = "alloy-op-hardforks"
version = "0.2.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a9a510692bef4871797062ca09ec7873c45dc68c7f3f72291165320f53606a3"
dependencies = [
"alloy-chains",
"alloy-hardforks",
"auto_impl",
]
[[package]] [[package]]
name = "alloy-primitives" name = "alloy-primitives"
version = "1.2.1" version = "1.2.1"
@ -5831,10 +5803,8 @@ checksum = "a8719d9b783b29cfa1cf8d591b894805786b9ab4940adc700a57fd0d5b721cf5"
dependencies = [ dependencies = [
"alloy-consensus", "alloy-consensus",
"alloy-eips", "alloy-eips",
"alloy-network",
"alloy-primitives", "alloy-primitives",
"alloy-rlp", "alloy-rlp",
"alloy-rpc-types-eth",
"alloy-serde", "alloy-serde",
"arbitrary", "arbitrary",
"derive_more", "derive_more",
@ -5843,57 +5813,6 @@ dependencies = [
"thiserror 2.0.12", "thiserror 2.0.12",
] ]
[[package]]
name = "op-alloy-flz"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a79f352fc3893dcd670172e615afef993a41798a1d3fc0db88a3e60ef2e70ecc"
[[package]]
name = "op-alloy-network"
version = "0.18.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "839a7a1826dc1d38fdf9c6d30d1f4ed8182c63816c97054e5815206f1ebf08c7"
dependencies = [
"alloy-consensus",
"alloy-network",
"alloy-primitives",
"alloy-provider",
"alloy-rpc-types-eth",
"alloy-signer",
"op-alloy-consensus",
"op-alloy-rpc-types",
]
[[package]]
name = "op-alloy-rpc-jsonrpsee"
version = "0.18.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b9d3de5348e2b34366413412f1f1534dc6b10d2cf6e8e1d97c451749c0c81c0"
dependencies = [
"alloy-primitives",
"jsonrpsee",
]
[[package]]
name = "op-alloy-rpc-types"
version = "0.18.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9640f9e78751e13963762a4a44c846e9ec7974b130c29a51706f40503fe49152"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-network-primitives",
"alloy-primitives",
"alloy-rpc-types-eth",
"alloy-serde",
"derive_more",
"op-alloy-consensus",
"serde",
"serde_json",
"thiserror 2.0.12",
]
[[package]] [[package]]
name = "op-alloy-rpc-types-engine" name = "op-alloy-rpc-types-engine"
version = "0.18.9" version = "0.18.9"
@ -5905,12 +5824,10 @@ dependencies = [
"alloy-primitives", "alloy-primitives",
"alloy-rlp", "alloy-rlp",
"alloy-rpc-types-engine", "alloy-rpc-types-engine",
"alloy-serde",
"derive_more", "derive_more",
"ethereum_ssz", "ethereum_ssz",
"ethereum_ssz_derive", "ethereum_ssz_derive",
"op-alloy-consensus", "op-alloy-consensus",
"serde",
"snap", "snap",
"thiserror 2.0.12", "thiserror 2.0.12",
] ]
@ -8419,128 +8336,6 @@ dependencies = [
"reth-trie-db", "reth-trie-db",
] ]
[[package]]
name = "reth-optimism-chainspec"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-chains",
"alloy-consensus",
"alloy-eips",
"alloy-genesis",
"alloy-hardforks",
"alloy-primitives",
"derive_more",
"op-alloy-rpc-types",
"reth-chainspec",
"reth-ethereum-forks",
"reth-network-peers",
"reth-optimism-forks",
"reth-optimism-primitives",
"reth-primitives-traits",
"serde_json",
]
[[package]]
name = "reth-optimism-consensus"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"alloy-trie",
"op-alloy-consensus",
"reth-chainspec",
"reth-consensus",
"reth-consensus-common",
"reth-execution-types",
"reth-optimism-forks",
"reth-optimism-primitives",
"reth-primitives-traits",
"reth-storage-api",
"reth-storage-errors",
"reth-trie-common",
"revm",
"thiserror 2.0.12",
"tracing",
]
[[package]]
name = "reth-optimism-evm"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-evm",
"alloy-op-evm",
"alloy-primitives",
"op-alloy-consensus",
"op-revm",
"reth-chainspec",
"reth-evm",
"reth-execution-errors",
"reth-execution-types",
"reth-optimism-chainspec",
"reth-optimism-consensus",
"reth-optimism-forks",
"reth-optimism-primitives",
"reth-primitives-traits",
"revm",
"thiserror 2.0.12",
]
[[package]]
name = "reth-optimism-forks"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-op-hardforks",
"alloy-primitives",
"once_cell",
"reth-ethereum-forks",
]
[[package]]
name = "reth-optimism-payload-builder"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-primitives",
"alloy-rlp",
"alloy-rpc-types-debug",
"alloy-rpc-types-engine",
"derive_more",
"op-alloy-consensus",
"op-alloy-rpc-types-engine",
"reth-basic-payload-builder",
"reth-chain-state",
"reth-chainspec",
"reth-evm",
"reth-execution-types",
"reth-optimism-evm",
"reth-optimism-forks",
"reth-optimism-primitives",
"reth-optimism-txpool",
"reth-payload-builder",
"reth-payload-builder-primitives",
"reth-payload-primitives",
"reth-payload-util",
"reth-payload-validator",
"reth-primitives-traits",
"reth-revm",
"reth-storage-api",
"reth-transaction-pool",
"revm",
"serde",
"sha2 0.10.9",
"thiserror 2.0.12",
"tracing",
]
[[package]] [[package]]
name = "reth-optimism-primitives" name = "reth-optimism-primitives"
version = "1.5.0" version = "1.5.0"
@ -8552,7 +8347,6 @@ dependencies = [
"alloy-rlp", "alloy-rlp",
"arbitrary", "arbitrary",
"bytes", "bytes",
"modular-bitfield",
"op-alloy-consensus", "op-alloy-consensus",
"reth-codecs", "reth-codecs",
"reth-primitives-traits", "reth-primitives-traits",
@ -8561,102 +8355,6 @@ dependencies = [
"serde_with", "serde_with",
] ]
[[package]]
name = "reth-optimism-rpc"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-json-rpc",
"alloy-primitives",
"alloy-rpc-client",
"alloy-rpc-types-debug",
"alloy-rpc-types-engine",
"alloy-rpc-types-eth",
"alloy-transport",
"alloy-transport-http",
"async-trait",
"derive_more",
"eyre",
"jsonrpsee",
"jsonrpsee-core",
"jsonrpsee-types",
"metrics",
"op-alloy-consensus",
"op-alloy-network",
"op-alloy-rpc-jsonrpsee",
"op-alloy-rpc-types",
"op-alloy-rpc-types-engine",
"op-revm",
"parking_lot",
"reqwest",
"reth-chain-state",
"reth-chainspec",
"reth-evm",
"reth-metrics",
"reth-network-api",
"reth-node-api",
"reth-node-builder",
"reth-optimism-evm",
"reth-optimism-forks",
"reth-optimism-payload-builder",
"reth-optimism-primitives",
"reth-optimism-txpool",
"reth-primitives-traits",
"reth-rpc",
"reth-rpc-api",
"reth-rpc-engine-api",
"reth-rpc-eth-api",
"reth-rpc-eth-types",
"reth-rpc-server-types",
"reth-storage-api",
"reth-tasks",
"reth-transaction-pool",
"revm",
"serde_json",
"thiserror 2.0.12",
"tokio",
"tower",
"tracing",
]
[[package]]
name = "reth-optimism-txpool"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-eips",
"alloy-json-rpc",
"alloy-primitives",
"alloy-rpc-client",
"alloy-rpc-types-eth",
"alloy-serde",
"c-kzg",
"derive_more",
"futures-util",
"metrics",
"op-alloy-consensus",
"op-alloy-flz",
"op-alloy-rpc-types",
"op-revm",
"parking_lot",
"reth-chain-state",
"reth-chainspec",
"reth-metrics",
"reth-optimism-evm",
"reth-optimism-forks",
"reth-optimism-primitives",
"reth-primitives-traits",
"reth-storage-api",
"reth-transaction-pool",
"serde",
"thiserror 2.0.12",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "reth-payload-builder" name = "reth-payload-builder"
version = "1.5.0" version = "1.5.0"
@ -8708,16 +8406,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "reth-payload-util"
version = "1.5.0"
source = "git+https://github.com/sprites0/reth?rev=fc754e5983f055365325dc9a04632d5ba2c4a8bc#fc754e5983f055365325dc9a04632d5ba2c4a8bc"
dependencies = [
"alloy-consensus",
"alloy-primitives",
"reth-transaction-pool",
]
[[package]] [[package]]
name = "reth-payload-validator" name = "reth-payload-validator"
version = "1.5.0" version = "1.5.0"
@ -9074,13 +8762,8 @@ dependencies = [
"alloy-primitives", "alloy-primitives",
"alloy-rpc-types-eth", "alloy-rpc-types-eth",
"jsonrpsee-types", "jsonrpsee-types",
"op-alloy-consensus",
"op-alloy-rpc-types",
"op-revm",
"reth-evm", "reth-evm",
"reth-optimism-primitives",
"reth-primitives-traits", "reth-primitives-traits",
"reth-storage-api",
"revm-context", "revm-context",
"thiserror 2.0.12", "thiserror 2.0.12",
] ]
@ -9649,12 +9332,12 @@ dependencies = [
"reth-network-peers", "reth-network-peers",
"reth-node-core", "reth-node-core",
"reth-node-ethereum", "reth-node-ethereum",
"reth-optimism-rpc",
"reth-payload-primitives", "reth-payload-primitives",
"reth-primitives", "reth-primitives",
"reth-primitives-traits", "reth-primitives-traits",
"reth-provider", "reth-provider",
"reth-revm", "reth-revm",
"reth-rpc",
"reth-rpc-engine-api", "reth-rpc-engine-api",
"reth-rpc-eth-api", "reth-rpc-eth-api",
"reth-tracing", "reth-tracing",

View File

@ -36,11 +36,11 @@ reth-network-p2p = { git = "https://github.com/sprites0/reth", rev = "fc754e5983
reth-network-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-network-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-node-ethereum = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-node-ethereum = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-network-peers = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-network-peers = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-optimism-rpc = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-payload-primitives = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-payload-primitives = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-primitives = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-primitives = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-primitives-traits = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-primitives-traits = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-provider = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc", features = ["test-utils"] } reth-provider = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc", features = ["test-utils"] }
reth-rpc = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-rpc-eth-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-rpc-eth-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-rpc-engine-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-rpc-engine-api = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }
reth-tracing = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" } reth-tracing = { git = "https://github.com/sprites0/reth", rev = "fc754e5983f055365325dc9a04632d5ba2c4a8bc" }

302
src/hl_node_compliance.rs Normal file
View File

@ -0,0 +1,302 @@
/// We need to override the following methods:
/// Filter:
/// - eth_getLogs
/// - eth_subscribe
///
/// Block (handled by HlEthApi already):
/// - eth_getBlockByNumber/eth_getBlockByHash
/// - eth_getBlockReceipts
use crate::HlBlock;
use alloy_rpc_types::{
pubsub::{Params, SubscriptionKind},
Filter, FilterChanges, FilterId, Log, PendingTransactionFilterKind,
};
use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink};
use jsonrpsee_core::{async_trait, RpcResult};
use jsonrpsee_types::ErrorObject;
use reth::{
api::FullNodeComponents, builder::rpc::RpcContext, rpc::result::internal_rpc_err,
tasks::TaskSpawner,
};
use reth_network::NetworkInfo;
use reth_primitives::NodePrimitives;
use reth_primitives_traits::SignedTransaction as _;
use reth_provider::{BlockIdReader, BlockReader, TransactionsProvider};
use reth_rpc::{EthFilter, EthPubSub};
use reth_rpc_eth_api::{
EthApiServer, EthFilterApiServer, EthPubSubApiServer, FullEthApiTypes, RpcBlock, RpcHeader,
RpcNodeCore, RpcNodeCoreExt, RpcReceipt, RpcTransaction, RpcTxReq,
};
use serde::Serialize;
use std::{collections::HashSet, sync::Arc};
use tokio_stream::{Stream, StreamExt};
use tracing::{info, trace};
pub trait EthWrapper:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes
+ RpcNodeCoreExt<
Provider: BlockIdReader + BlockReader<Block = HlBlock>,
Primitives: NodePrimitives<
SignedTx = <<Self as RpcNodeCore>::Provider as TransactionsProvider>::Transaction,
>,
Network: NetworkInfo,
> + 'static
{
}
impl <
T:
EthApiServer<
RpcTxReq<Self::NetworkTypes>,
RpcTransaction<Self::NetworkTypes>,
RpcBlock<Self::NetworkTypes>,
RpcReceipt<Self::NetworkTypes>,
RpcHeader<Self::NetworkTypes>,
> + FullEthApiTypes
+ RpcNodeCoreExt<
Provider: BlockIdReader + BlockReader<Block = HlBlock>,
Primitives: NodePrimitives<
SignedTx = <<Self as RpcNodeCore>::Provider as TransactionsProvider>::Transaction,
>,
Network: NetworkInfo,
> + 'static
> EthWrapper for T {
}
pub struct HlNodeFilterHttp<Eth: EthWrapper> {
filter: Arc<EthFilter<Eth>>,
provider: Arc<Eth::Provider>,
}
impl<Eth: EthWrapper> HlNodeFilterHttp<Eth> {
pub fn new(filter: Arc<EthFilter<Eth>>, provider: Arc<Eth::Provider>) -> Self {
Self { filter, provider }
}
}
#[async_trait]
impl<Eth: EthWrapper> EthFilterApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterHttp<Eth>
{
/// Handler for `eth_newFilter`
async fn new_filter(&self, filter: Filter) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newFilter");
self.filter.new_filter(filter).await
}
/// Handler for `eth_newBlockFilter`
async fn new_block_filter(&self) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newBlockFilter");
self.filter.new_block_filter().await
}
/// Handler for `eth_newPendingTransactionFilter`
async fn new_pending_transaction_filter(
&self,
kind: Option<PendingTransactionFilterKind>,
) -> RpcResult<FilterId> {
trace!(target: "rpc::eth", "Serving eth_newPendingTransactionFilter");
self.filter.new_pending_transaction_filter(kind).await
}
/// Handler for `eth_getFilterChanges`
async fn filter_changes(
&self,
id: FilterId,
) -> RpcResult<FilterChanges<RpcTransaction<Eth::NetworkTypes>>> {
trace!(target: "rpc::eth", "Serving eth_getFilterChanges");
self.filter.filter_changes(id).await.map_err(ErrorObject::from)
}
/// Returns an array of all logs matching filter with given id.
///
/// Returns an error if no matching log filter exists.
///
/// Handler for `eth_getFilterLogs`
async fn filter_logs(&self, id: FilterId) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getFilterLogs");
self.filter.filter_logs(id).await.map_err(ErrorObject::from)
}
/// Handler for `eth_uninstallFilter`
async fn uninstall_filter(&self, id: FilterId) -> RpcResult<bool> {
trace!(target: "rpc::eth", "Serving eth_uninstallFilter");
self.filter.uninstall_filter(id).await
}
/// Returns logs matching given filter object.
///
/// Handler for `eth_getLogs`
async fn logs(&self, filter: Filter) -> RpcResult<Vec<Log>> {
trace!(target: "rpc::eth", "Serving eth_getLogs");
let mut logs = self.filter.logs(filter).await?;
let block_numbers: HashSet<_> = logs.iter().map(|log| log.block_number.unwrap()).collect();
info!("block_numbers: {:?}", block_numbers);
let system_tx_hashes: HashSet<_> = block_numbers
.into_iter()
.flat_map(|block_number| {
let block = self.provider.block_by_number(block_number).unwrap().unwrap();
let transactions = block.body.transactions().collect::<Vec<_>>();
transactions
.iter()
.filter(|tx| tx.is_system_transaction())
.map(|tx| *tx.tx_hash())
.collect::<Vec<_>>()
})
.collect();
logs.retain(|log| !system_tx_hashes.contains(&log.transaction_hash.unwrap()));
Ok(logs)
}
}
pub struct HlNodeFilterWs<Eth: EthWrapper> {
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
}
impl<Eth: EthWrapper> HlNodeFilterWs<Eth> {
pub fn new(
pubsub: Arc<EthPubSub<Eth>>,
provider: Arc<Eth::Provider>,
subscription_task_spawner: Box<dyn TaskSpawner + 'static>,
) -> Self {
Self { pubsub, provider, subscription_task_spawner }
}
}
#[async_trait]
impl<Eth: EthWrapper> EthPubSubApiServer<RpcTransaction<Eth::NetworkTypes>>
for HlNodeFilterWs<Eth>
{
/// Handler for `eth_subscribe`
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: SubscriptionKind,
params: Option<Params>,
) -> jsonrpsee::core::SubscriptionResult {
let sink = pending.accept().await?;
let pubsub = self.pubsub.clone();
let provider = self.provider.clone();
self.subscription_task_spawner.spawn(Box::pin(async move {
if kind == SubscriptionKind::Logs {
// if no params are provided, used default filter params
let filter = match params {
Some(Params::Logs(filter)) => *filter,
Some(Params::Bool(_)) => {
return;
}
_ => Default::default(),
};
let _ = pipe_from_stream(
sink,
pubsub
.log_stream(filter)
.filter(|log| not_from_system_tx::<Eth>(log, &provider)),
)
.await;
} else {
let _ = pubsub.handle_accepted(sink, kind, params).await;
};
}));
Ok(())
}
}
fn not_from_system_tx<Eth: EthWrapper>(log: &Log, provider: &Eth::Provider) -> bool {
let block = provider.block_by_number(log.block_number.unwrap()).unwrap().unwrap();
let transactions = block.body.transactions().collect::<Vec<_>>();
!transactions
.iter()
.filter(|tx| tx.is_system_transaction())
.map(|tx| *tx.tx_hash()).any(|tx_hash| tx_hash == log.transaction_hash.unwrap())
}
/// Helper to convert a serde error into an [`ErrorObject`]
#[derive(Debug, thiserror::Error)]
#[error("Failed to serialize subscription item: {0}")]
pub struct SubscriptionSerializeError(#[from] serde_json::Error);
impl SubscriptionSerializeError {
const fn new(err: serde_json::Error) -> Self {
Self(err)
}
}
impl From<SubscriptionSerializeError> for ErrorObject<'static> {
fn from(value: SubscriptionSerializeError) -> Self {
internal_rpc_err(value.to_string())
}
}
async fn pipe_from_stream<T, St>(
sink: SubscriptionSink,
mut stream: St,
) -> Result<(), ErrorObject<'static>>
where
St: Stream<Item = T> + Unpin,
T: Serialize,
{
loop {
tokio::select! {
_ = sink.closed() => {
// connection dropped
break Ok(())
},
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => {
// stream ended
break Ok(())
},
};
let msg = SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item
).map_err(SubscriptionSerializeError::new)?;
if sink.send(msg).await.is_err() {
break Ok(());
}
}
}
}
}
pub fn install_hl_node_compliance<Node, EthApi>(
ctx: RpcContext<Node, EthApi>,
) -> Result<(), eyre::Error>
where
Node: FullNodeComponents,
Node::Provider: BlockIdReader + BlockReader<Block = crate::HlBlock>,
EthApi: EthWrapper,
{
ctx.modules.replace_configured(
HlNodeFilterHttp::new(
Arc::new(ctx.registry.eth_handlers().filter.clone()),
Arc::new(ctx.registry.eth_api().provider().clone()),
)
.into_rpc(),
)?;
ctx.modules.replace_configured(
HlNodeFilterWs::new(
Arc::new(ctx.registry.eth_handlers().pubsub.clone()),
Arc::new(ctx.registry.eth_api().provider().clone()),
Box::new(ctx.node().task_executor().clone()),
)
.into_rpc(),
)?;
Ok(())
}

View File

@ -2,6 +2,7 @@ pub mod chainspec;
pub mod consensus; pub mod consensus;
mod evm; mod evm;
mod hardforks; mod hardforks;
pub mod hl_node_compliance;
pub mod node; pub mod node;
pub mod pseudo_peer; pub mod pseudo_peer;
pub mod tx_forwarder; pub mod tx_forwarder;

View File

@ -2,6 +2,7 @@ use clap::Parser;
use reth::builder::NodeHandle; use reth::builder::NodeHandle;
use reth_hl::{ use reth_hl::{
chainspec::parser::HlChainSpecParser, chainspec::parser::HlChainSpecParser,
hl_node_compliance::install_hl_node_compliance,
node::{ node::{
cli::{Cli, HlNodeArgs}, cli::{Cli, HlNodeArgs},
storage::tables::Tables, storage::tables::Tables,
@ -26,10 +27,11 @@ fn main() -> eyre::Result<()> {
Cli::<HlChainSpecParser, HlNodeArgs>::parse().run(|builder, ext| async move { Cli::<HlChainSpecParser, HlNodeArgs>::parse().run(|builder, ext| async move {
builder.builder.database.create_tables_for::<Tables>()?; builder.builder.database.create_tables_for::<Tables>()?;
let (node, engine_handle_tx) = HlNode::new(ext.block_source_args.parse().await?); let (node, engine_handle_tx) =
HlNode::new(ext.block_source_args.parse().await?, ext.hl_node_compliant);
let NodeHandle { node, node_exit_future: exit_future } = builder let NodeHandle { node, node_exit_future: exit_future } = builder
.node(node) .node(node)
.extend_rpc_modules(|ctx| { .extend_rpc_modules(move |ctx| {
let upstream_rpc_url = ext.upstream_rpc_url; let upstream_rpc_url = ext.upstream_rpc_url;
if let Some(upstream_rpc_url) = upstream_rpc_url { if let Some(upstream_rpc_url) = upstream_rpc_url {
ctx.modules.replace_configured( ctx.modules.replace_configured(
@ -38,6 +40,11 @@ fn main() -> eyre::Result<()> {
info!("Transaction forwarding enabled"); info!("Transaction forwarding enabled");
} }
if ext.hl_node_compliant {
install_hl_node_compliance(ctx)?;
}
Ok(()) Ok(())
}) })
.launch() .launch()

View File

@ -32,8 +32,18 @@ pub struct HlNodeArgs {
#[command(flatten)] #[command(flatten)]
pub block_source_args: BlockSourceArgs, pub block_source_args: BlockSourceArgs,
/// Upstream RPC URL to forward incoming transactions.
#[arg(long, env = "UPSTREAM_RPC_URL")] #[arg(long, env = "UPSTREAM_RPC_URL")]
pub upstream_rpc_url: Option<String>, pub upstream_rpc_url: Option<String>,
/// Enable hl-node compliant mode.
///
/// This option
/// 1. filters out system transactions from block transaction list.
/// 2. filters out logs that are not from the block's transactions.
/// 3. filters out logs and transactions from subscription.
#[arg(long, env = "HL_NODE_COMPLIANT")]
pub hl_node_compliant: bool,
} }
/// The main reth_hl cli interface. /// The main reth_hl cli interface.

View File

@ -51,14 +51,23 @@ pub struct HlNode {
engine_handle_rx: engine_handle_rx:
Arc<Mutex<Option<oneshot::Receiver<BeaconConsensusEngineHandle<HlPayloadTypes>>>>>, Arc<Mutex<Option<oneshot::Receiver<BeaconConsensusEngineHandle<HlPayloadTypes>>>>>,
block_source_config: BlockSourceConfig, block_source_config: BlockSourceConfig,
hl_node_compliant: bool,
} }
impl HlNode { impl HlNode {
pub fn new( pub fn new(
block_source_config: BlockSourceConfig, block_source_config: BlockSourceConfig,
hl_node_compliant: bool,
) -> (Self, oneshot::Sender<BeaconConsensusEngineHandle<HlPayloadTypes>>) { ) -> (Self, oneshot::Sender<BeaconConsensusEngineHandle<HlPayloadTypes>>) {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
(Self { engine_handle_rx: Arc::new(Mutex::new(Some(rx))), block_source_config }, tx) (
Self {
engine_handle_rx: Arc::new(Mutex::new(Some(rx))),
block_source_config,
hl_node_compliant,
},
tx,
)
} }
} }
@ -121,7 +130,12 @@ where
} }
fn add_ons(&self) -> Self::AddOns { fn add_ons(&self) -> Self::AddOns {
HlNodeAddOns::default() HlNodeAddOns::new(
HlEthApiBuilder { hl_node_compliant: self.hl_node_compliant },
Default::default(),
Default::default(),
Default::default(),
)
} }
} }

View File

@ -1,9 +1,12 @@
use std::{future::Future, sync::Arc};
use crate::{ use crate::{
chainspec::HlChainSpec, chainspec::HlChainSpec,
node::{ node::{
primitives::TransactionSigned, primitives::TransactionSigned,
rpc::{HlEthApi, HlNodeCore}, rpc::{HlEthApi, HlNodeCore},
}, },
HlBlock,
}; };
use alloy_consensus::{BlockHeader, ReceiptEnvelope, TxType}; use alloy_consensus::{BlockHeader, ReceiptEnvelope, TxType};
use alloy_primitives::B256; use alloy_primitives::B256;
@ -23,11 +26,11 @@ use reth::{
}; };
use reth_chainspec::{EthChainSpec, EthereumHardforks}; use reth_chainspec::{EthChainSpec, EthereumHardforks};
use reth_evm::{ConfigureEvm, NextBlockEnvAttributes}; use reth_evm::{ConfigureEvm, NextBlockEnvAttributes};
use reth_primitives::NodePrimitives; use reth_primitives::{NodePrimitives, SealedBlock};
use reth_primitives_traits::{BlockBody as _, SignedTransaction as _}; use reth_primitives_traits::{BlockBody as _, RecoveredBlock, SignedTransaction as _};
use reth_provider::{ use reth_provider::{
BlockReader, ChainSpecProvider, HeaderProvider, ProviderBlock, ProviderReceipt, ProviderTx, BlockIdReader, BlockReader, ChainSpecProvider, HeaderProvider, ProviderBlock, ProviderReceipt,
StateProviderFactory, ProviderTx, StateProviderFactory,
}; };
use reth_rpc_eth_api::{ use reth_rpc_eth_api::{
helpers::{EthBlocks, LoadBlock, LoadPendingBlock, LoadReceipt, SpawnBlocking}, helpers::{EthBlocks, LoadBlock, LoadPendingBlock, LoadReceipt, SpawnBlocking},
@ -35,6 +38,10 @@ use reth_rpc_eth_api::{
FromEthApiError, RpcConvert, RpcNodeCore, RpcNodeCoreExt, RpcReceipt, FromEthApiError, RpcConvert, RpcNodeCore, RpcNodeCoreExt, RpcReceipt,
}; };
fn is_system_tx(tx: &TransactionSigned) -> bool {
tx.is_system_transaction()
}
impl<N> EthBlocks for HlEthApi<N> impl<N> EthBlocks for HlEthApi<N>
where where
Self: LoadBlock< Self: LoadBlock<
@ -64,6 +71,7 @@ where
.transactions() .transactions()
.iter() .iter()
.zip(receipts.iter()) .zip(receipts.iter())
.filter(|(tx, _)| !is_system_tx(tx))
.enumerate() .enumerate()
.map(|(idx, (tx, receipt))| { .map(|(idx, (tx, receipt))| {
let meta = TransactionMeta { let meta = TransactionMeta {
@ -101,9 +109,70 @@ where
Pool: TransactionPool< Pool: TransactionPool<
Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>, Transaction: PoolTransaction<Consensus = ProviderTx<Self::Provider>>,
>, >,
>, > + RpcNodeCore<Provider: BlockReader<Block = crate::HlBlock>>,
N: HlNodeCore, N: HlNodeCore,
{ {
fn recovered_block(
&self,
block_id: BlockId,
) -> impl Future<
Output = Result<
Option<Arc<RecoveredBlock<<Self::Provider as BlockReader>::Block>>>,
Self::Error,
>,
> + Send {
let hl_node_compliant = self.hl_node_compliant;
async move {
// Copy of LoadBlock::recovered_block, but with --hl-node-compliant support
if block_id.is_pending() {
return Ok(None);
}
let block_hash = match self
.provider()
.block_hash_for_id(block_id)
.map_err(Self::Error::from_eth_err)?
{
Some(block_hash) => block_hash,
None => return Ok(None),
};
let recovered_block = self
.cache()
.get_recovered_block(block_hash)
.await
.map_err(Self::Error::from_eth_err)?;
if let Some(recovered_block) = recovered_block {
let recovered_block = if hl_node_compliant {
filter_if_hl_node_compliant(&recovered_block)
} else {
(*recovered_block).clone()
};
return Ok(Some(std::sync::Arc::new(recovered_block)));
}
Ok(None)
}
}
}
fn filter_if_hl_node_compliant(
recovered_block: &RecoveredBlock<HlBlock>,
) -> RecoveredBlock<HlBlock> {
let sealed_block = recovered_block.sealed_block();
let transactions = sealed_block.body().transactions();
let to_skip = transactions
.iter()
.position(|tx| !tx.is_system_transaction())
.unwrap_or(transactions.len());
let mut new_block: HlBlock = sealed_block.clone_block();
new_block.body.transactions.drain(..to_skip);
let new_sealed_block = SealedBlock::new_unchecked(new_block, sealed_block.hash());
let new_senders = recovered_block.senders()[to_skip..].to_vec();
RecoveredBlock::new_sealed(new_sealed_block, new_senders)
} }
impl<N> LoadPendingBlock for HlEthApi<N> impl<N> LoadPendingBlock for HlEthApi<N>

View File

@ -9,7 +9,7 @@ use reth::{
primitives::EthereumHardforks, primitives::EthereumHardforks,
providers::ChainSpecProvider, providers::ChainSpecProvider,
rpc::{ rpc::{
eth::{DevSigner, FullEthApiServer}, eth::{core::EthApiInner, DevSigner, FullEthApiServer},
server_types::eth::{EthApiError, EthStateCache, FeeHistoryCache, GasPriceOracle}, server_types::eth::{EthApiError, EthStateCache, FeeHistoryCache, GasPriceOracle},
}, },
tasks::{ tasks::{
@ -34,8 +34,6 @@ use reth_rpc_eth_api::{
}; };
use std::{fmt, sync::Arc}; use std::{fmt, sync::Arc};
use reth_optimism_rpc::eth::EthApiNodeBackend;
mod block; mod block;
mod call; mod call;
pub mod engine_api; pub mod engine_api;
@ -45,6 +43,14 @@ mod transaction;
pub trait HlNodeCore: RpcNodeCore<Provider: BlockReader> {} pub trait HlNodeCore: RpcNodeCore<Provider: BlockReader> {}
impl<T> HlNodeCore for T where T: RpcNodeCore<Provider: BlockReader> {} impl<T> HlNodeCore for T where T: RpcNodeCore<Provider: BlockReader> {}
/// Adapter for [`EthApiInner`], which holds all the data required to serve core `eth_` API.
pub type EthApiNodeBackend<N> = EthApiInner<
<N as RpcNodeCore>::Provider,
<N as RpcNodeCore>::Pool,
<N as RpcNodeCore>::Network,
<N as RpcNodeCore>::Evm,
>;
/// Container type `HlEthApi` /// Container type `HlEthApi`
#[allow(missing_debug_implementations)] #[allow(missing_debug_implementations)]
pub(crate) struct HlEthApiInner<N: HlNodeCore> { pub(crate) struct HlEthApiInner<N: HlNodeCore> {
@ -58,6 +64,8 @@ pub struct HlEthApi<N: HlNodeCore> {
pub(crate) inner: Arc<HlEthApiInner<N>>, pub(crate) inner: Arc<HlEthApiInner<N>>,
/// Converter for RPC types. /// Converter for RPC types.
tx_resp_builder: RpcConverter<Ethereum, N::Evm, EthApiError, ()>, tx_resp_builder: RpcConverter<Ethereum, N::Evm, EthApiError, ()>,
/// Whether the node is in HL node compliant mode.
pub(crate) hl_node_compliant: bool,
} }
impl<N: HlNodeCore> fmt::Debug for HlEthApi<N> { impl<N: HlNodeCore> fmt::Debug for HlEthApi<N> {
@ -252,7 +260,10 @@ where
/// Builds [`HlEthApi`] for HL. /// Builds [`HlEthApi`] for HL.
#[derive(Debug, Default)] #[derive(Debug, Default)]
#[non_exhaustive] #[non_exhaustive]
pub struct HlEthApiBuilder; pub struct HlEthApiBuilder {
/// Whether the node is in HL node compliant mode.
pub(crate) hl_node_compliant: bool,
}
impl<N> EthApiBuilder<N> for HlEthApiBuilder impl<N> EthApiBuilder<N> for HlEthApiBuilder
where where
@ -280,6 +291,7 @@ where
Ok(HlEthApi { Ok(HlEthApi {
inner: Arc::new(HlEthApiInner { eth_api }), inner: Arc::new(HlEthApiInner { eth_api }),
tx_resp_builder: Default::default(), tx_resp_builder: Default::default(),
hl_node_compliant: self.hl_node_compliant,
}) })
} }
} }