Mirror of https://github.com/hl-archive-node/nanoreth.git, synced 2025-12-06 10:59:55 +00:00
Compare commits
25 Commits (361248ccf1...630d86a4ba)

| SHA1 |
|---|
| 630d86a4ba |
| 8632f9d74f |
| 738567894a |
| 6f8d089298 |
| 98846d2710 |
| a74fbc59fe |
| a032ffceb3 |
| e8be4c2e82 |
| 06f7710777 |
| 5c3828382c |
| fae4b4c8f9 |
| cdb0f9e8a2 |
| 5114f76517 |
| 3bdc491aba |
| 2a653857aa |
| 5f2955caa2 |
| 022dfd1546 |
| 249e3194dd |
| 530447e637 |
| 2ff606c32c |
| ec417f9bf4 |
| e2045a195c |
| ec281f9cc7 |
| 0d1c239a07 |
| 821846f671 |
19 Cargo.lock generated
@@ -6659,6 +6659,7 @@ dependencies = [
"clap",
"eyre",
"futures",
"futures-util",
"jsonrpsee",
"jsonrpsee-core",
"lz4_flex",
@@ -6687,6 +6688,7 @@ dependencies = [
"reth-execution-types",
"reth-exex",
"reth-fs-util",
"reth-hlfs",
"reth-hyperliquid-types",
"reth-network",
"reth-network-api",
@@ -7966,14 +7968,30 @@ dependencies = [
"thiserror 2.0.11",
]

[[package]]
name = "reth-hlfs"
version = "1.2.0"
dependencies = [
"bytes",
"parking_lot",
"rand 0.8.5",
"reth-tracing",
"tempfile",
"thiserror 2.0.11",
"tokio",
"tracing",
]

[[package]]
name = "reth-hyperliquid-types"
version = "1.2.0"
dependencies = [
"alloy-primitives",
"alloy-rlp",
"clap",
"parking_lot",
"reth-cli-commands",
"rmp-serde",
"serde",
"tokio",
]
@@ -9059,6 +9077,7 @@ dependencies = [
"reth-evm",
"reth-execution-types",
"reth-fs-util",
"reth-hyperliquid-types",
"reth-metrics",
"reth-network-p2p",
"reth-nippy-jar",
@@ -54,6 +54,7 @@ members = [
"crates/net/ecies/",
"crates/net/eth-wire-types",
"crates/net/eth-wire/",
"crates/net/hlfs/",
"crates/net/nat/",
"crates/net/network-api/",
"crates/net/network-types/",
@@ -356,6 +357,7 @@ reth-exex = { path = "crates/exex/exex" }
reth-exex-test-utils = { path = "crates/exex/test-utils" }
reth-exex-types = { path = "crates/exex/types" }
reth-fs-util = { path = "crates/fs-util" }
reth-hlfs = { path = "crates/net/hlfs" }
reth-invalid-block-hooks = { path = "crates/engine/invalid-block-hooks" }
reth-ipc = { path = "crates/rpc/ipc" }
reth-libmdbx = { path = "crates/storage/libmdbx-rs" }
@@ -35,6 +35,7 @@ reth-cli-runner.workspace = true
reth-cli-commands.workspace = true
reth-cli-util.workspace = true
reth-consensus-common.workspace = true
reth-hlfs.workspace = true
reth-rpc-builder.workspace = true
reth-rpc.workspace = true
reth-rpc-types-compat.workspace = true
@@ -81,8 +82,9 @@ tracing.workspace = true
serde_json.workspace = true

# async
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread", "net", "fs"] }
futures.workspace = true
futures-util.workspace = true

# time
time = { workspace = true }
@@ -29,6 +29,7 @@ use tokio::sync::Mutex;
use tracing::{debug, info};

use crate::serialized::{BlockAndReceipts, EvmBlock};
use crate::share_blocks::ShareBlocks;
use crate::spot_meta::erc20_contract_to_spot_token;

/// Poll interval when tailing an *open* hourly file.
@@ -41,6 +42,7 @@ pub(crate) struct BlockIngest {
pub local_ingest_dir: Option<PathBuf>,
pub local_blocks_cache: Arc<Mutex<BTreeMap<u64, BlockAndReceipts>>>, // height → block
pub precompiles_cache: PrecompilesCache,
pub hlfs: Option<ShareBlocks>,
}

#[derive(Deserialize)]
@@ -155,9 +157,9 @@ impl BlockIngest {
if let Some(block) = self.try_collect_local_block(height).await {
info!("Returning locally synced block for @ Height [{height}]");
return Some(block);
} else {
self.try_collect_s3_block(height)
}

self.try_collect_s3_block(height)
}

pub(crate) fn try_collect_s3_block(&self, height: u64) -> Option<BlockAndReceipts> {
@@ -207,7 +209,7 @@ impl BlockIngest {
let mut u_pre_cache = precompiles_cache.lock();
for blk in new_blocks {
let precompiles = PrecompileData {
precompiles: blk.read_precompile_calls.clone(),
precompiles: blk.read_precompile_calls.0.clone(),
highest_precompile_address: blk.highest_precompile_address,
};
let h = match &blk.block {
@@ -6,6 +6,7 @@ static ALLOC: reth_cli_util::allocator::Allocator = reth_cli_util::allocator::ne
mod block_ingest;
mod call_forwarder;
mod serialized;
mod share_blocks;
mod spot_meta;
mod tx_forwarder;

@@ -18,6 +19,7 @@ use reth::cli::Cli;
use reth_ethereum_cli::chainspec::EthereumChainSpecParser;
use reth_hyperliquid_types::PrecompilesCache;
use reth_node_ethereum::EthereumNode;
use share_blocks::ShareBlocksArgs;
use tokio::sync::Mutex;
use tracing::info;
use tx_forwarder::EthForwarderApiServer;
@@ -40,6 +42,10 @@ struct HyperliquidExtArgs {
/// 3. filters out logs and transactions from subscription.
#[arg(long, default_value = "false")]
pub hl_node_compliant: bool,

/// Enable hlfs to backfill archive blocks
#[command(flatten)]
pub hlfs: ShareBlocksArgs,
}

fn main() {
@@ -85,8 +91,24 @@ fn main() {
.launch()
.await?;

let ingest =
BlockIngest { ingest_dir, local_ingest_dir, local_blocks_cache, precompiles_cache };
// start HLFS (serve + peer-backed backfill) using the node's network
let hlfs = if ext_args.hlfs.share_blocks {
let net = handle.node.network.clone();
Some(
crate::share_blocks::ShareBlocks::start_with_network(&ext_args.hlfs, net)
.await?,
)
} else {
None
};

let ingest = BlockIngest {
ingest_dir,
local_ingest_dir,
local_blocks_cache,
precompiles_cache,
hlfs,
};
ingest.run(handle.node).await.unwrap();
handle.node_exit_future.await
},
@@ -1,5 +1,5 @@
use alloy_primitives::{Address, Log};
use reth_hyperliquid_types::{ReadPrecompileInput, ReadPrecompileResult};
use reth_hyperliquid_types::{ReadPrecompileInput, ReadPrecompileResult, ReadPrecompileCalls};
use reth_primitives::{SealedBlock, Transaction};
use serde::{Deserialize, Serialize};

@@ -10,7 +10,7 @@ pub(crate) struct BlockAndReceipts {
#[serde(default)]
pub system_txs: Vec<SystemTx>,
#[serde(default)]
pub read_precompile_calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,
pub read_precompile_calls: ReadPrecompileCalls,
pub highest_precompile_address: Option<Address>,
}
167 bin/reth/src/share_blocks.rs Normal file
@@ -0,0 +1,167 @@
use clap::Args;
use reth_hlfs::{Backfiller, Client, PeerRecord, Server, OP_REQ_MAX_BLOCK, OP_RES_MAX_BLOCK};
use reth_network_api::{events::NetworkEvent, FullNetwork};
use std::{
collections::HashSet,
net::{IpAddr, SocketAddr},
path::PathBuf,
sync::Arc,
};
use tokio::{
task::JoinHandle,
time::{sleep, timeout, Duration},
};
use tracing::{debug, info, warn};

// use futures_util::StreamExt;
use futures_util::stream::StreamExt;

#[derive(Args, Clone, Debug)]
pub(crate) struct ShareBlocksArgs {
#[arg(long, default_value_t = false)]
pub share_blocks: bool,
#[arg(long, default_value = "0.0.0.0")]
pub share_blocks_host: String,
#[arg(long, default_value_t = 9595)]
pub share_blocks_port: u16,
#[arg(long, default_value = "evm-blocks")]
pub archive_dir: PathBuf,
}

pub(crate) struct ShareBlocks {
_server: JoinHandle<()>,
_autodetect: JoinHandle<()>,
}

impl ShareBlocks {
pub(crate) async fn start_with_network<Net>(
args: &ShareBlocksArgs,
network: Net,
) -> eyre::Result<Self>
where
Net: FullNetwork + Clone + 'static,
{
let host: IpAddr = args
.share_blocks_host
.parse()
.map_err(|e| eyre::eyre!("invalid --share-blocks-host: {e}"))?;
let bind: SocketAddr = (host, args.share_blocks_port).into();

let srv = Server::new(bind, &args.archive_dir).with_limits(512, 50);
let _server = tokio::spawn(async move {
if let Err(e) = srv.run().await {
warn!(error=%e, "hlfs: server exited");
}
});

let _autodetect =
spawn_autodetect(network, host, args.share_blocks_port, args.archive_dir.clone());

info!(%bind, dir=%args.archive_dir.display(), "hlfs: enabled (reth peers)");
Ok(Self { _server, _autodetect })
}
}

fn spawn_autodetect<Net>(
network: Net,
self_ip: IpAddr,
hlfs_port: u16,
archive_dir: PathBuf,
) -> JoinHandle<()>
where
Net: FullNetwork + Clone + 'static,
{
let client = Client::new(&archive_dir, Vec::new()).with_timeout(Duration::from_secs(5));
let backfiller = Arc::new(tokio::sync::Mutex::new(Backfiller::new(client, &archive_dir)));
let good: Arc<tokio::sync::Mutex<HashSet<PeerRecord>>> =
Arc::new(tokio::sync::Mutex::new(HashSet::new()));

tokio::spawn({
warn!("hlfs: backfiller started");
let backfiller = backfiller.clone();
async move {
loop {
let mut bf = backfiller.lock().await;
if bf.client.max_block < bf.max_block_seen {
let block = bf.client.max_block + 1;
let new_height = bf.fetch_if_missing(block).await.expect("new height");
bf.client.max_block = new_height.unwrap();
}
sleep(Duration::from_millis(50)).await;
}
}
});

tokio::spawn({
let backfiller = backfiller.clone();
async move {
let mut events = network.event_listener();
loop {
let mut bf = backfiller.lock().await;
match events.next().await {
Some(NetworkEvent::ActivePeerSession { info, .. }) => {
let ip = info.remote_addr.ip();
if ip.is_unspecified() {
debug!(%ip, "hlfs: skip unspecified");
continue;
}
if ip == self_ip {
debug!(%ip, "hlfs: skip self");
continue;
}
let addr = SocketAddr::new(info.remote_addr.ip(), hlfs_port);
let max_block = probe_hlfs(addr).await;
if max_block != 0 {
let mut g = good.lock().await;
if g.insert(PeerRecord { addr, max_block }) {
let v: Vec<_> = g.iter().copied().collect();
bf.set_peers(v.clone());
info!(%addr, %max_block, total=v.len(), "hlfs: peer added");
}
} else {
debug!(%addr, "hlfs: peer has no HLFS");
}
}
Some(_) => {}
None => {
warn!("hlfs: network event stream ended");
break;
}
}
}
}
})
}

pub(crate) async fn probe_hlfs(addr: SocketAddr) -> u64 {
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
};

let fut = async {
let mut s = TcpStream::connect(addr).await.ok()?;

// send [OP][8 zero bytes]
let mut msg = [0u8; 9];
msg[0] = OP_REQ_MAX_BLOCK;
s.write_all(&msg).await.ok()?;

// read 1-byte opcode
let mut op = [0u8; 1];
s.read_exact(&mut op).await.ok()?;
if op[0] != OP_RES_MAX_BLOCK {
return None;
}

// read 8-byte little-endian block number
let mut blk = [0u8; 8];
s.read_exact(&mut blk).await.ok()?;
Some(u64::from_le_bytes(blk))
};

match timeout(Duration::from_secs(2), fut).await {
Ok(Some(n)) => n,
_ => 0,
}
}
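For reference, a sketch (not part of the diff) of the HLFS request framing that `probe_hlfs` above and the `reth-hlfs` client further down both rely on; the opcode values are the constants exported by `reth_hlfs`, while `encode_request` is a hypothetical helper:

```rust
// Sketch only: HLFS requests are a 1-byte opcode followed by 8 bytes.
// OP_REQ_MAX_BLOCK (0x03) sends 8 zero bytes and expects
//   OP_RES_MAX_BLOCK (0x04) + u64 LE max block.
// OP_REQ_BLOCK (0x01) sends the block number as u64 LE and expects
//   OP_RES_BLOCK (0x02) + u32 LE length + payload,
//   or OP_ERR_TOO_BUSY (0x05) + u32 LE retry ms, or OP_ERR_NOT_FOUND (0x06).
fn encode_request(opcode: u8, block_number: u64) -> [u8; 9] {
    let mut msg = [0u8; 9];
    msg[0] = opcode;
    msg[1..].copy_from_slice(&block_number.to_le_bytes());
    msg
}
```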
@@ -30,7 +30,6 @@ use reth_evm::{ConfigureEvm, ConfigureEvmEnv, EvmEnv, EvmFactory, NextBlockEnvAt
use reth_hyperliquid_types::PrecompileData;
use reth_hyperliquid_types::{PrecompilesCache, ReadPrecompileInput, ReadPrecompileResult};
use reth_node_builder::HyperliquidSharedState;
use reth_primitives::SealedBlock;
use reth_primitives::TransactionSigned;
use reth_revm::context::result::{EVMError, HaltReason};
use reth_revm::handler::EthPrecompiles;
@@ -43,7 +42,6 @@ use reth_revm::{
specification::hardfork::SpecId,
};
use reth_revm::{Context, Inspector, MainContext};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;

@@ -195,17 +193,6 @@ impl ConfigureEvmEnv for EthEvmConfig {
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct BlockAndReceipts {
#[serde(default)]
pub read_precompile_calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,
pub highest_precompile_address: Option<Address>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum EvmBlock {
Reth115(SealedBlock),
}

/// Custom EVM configuration.
#[derive(Debug, Clone, Default)]
@@ -215,16 +202,6 @@ pub struct HyperliquidEvmFactory {
shared_state: Option<HyperliquidSharedState>,
}
pub(crate) fn collect_s3_block(ingest_path: PathBuf, height: u64) -> Option<BlockAndReceipts> {
let f = ((height - 1) / 1_000_000) * 1_000_000;
let s = ((height - 1) / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{height}.rmp.lz4", ingest_path.to_string_lossy());
let file = std::fs::read(path).ok()?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder).unwrap();
Some(blocks[0].clone())
}

pub(crate) fn get_locally_sourced_precompiles_for_height(
precompiles_cache: PrecompilesCache,
height: u64,
@@ -233,27 +210,6 @@ pub(crate) fn get_locally_sourced_precompiles_for_height(
u_cache.remove(&height)
}

pub(crate) fn collect_block(
ingest_path: PathBuf,
shared_state: Option<HyperliquidSharedState>,
height: u64,
) -> Option<BlockAndReceipts> {
// Attempt to source precompile from the cache that is shared the binary level with the block
// ingestor.
if let Some(shared_state) = shared_state {
if let Some(calls) =
get_locally_sourced_precompiles_for_height(shared_state.precompiles_cache, height)
{
return Some(BlockAndReceipts {
read_precompile_calls: calls.precompiles,
highest_precompile_address: calls.highest_precompile_address,
});
}
}
// Fallback to s3 always
collect_s3_block(ingest_path, height)
}
const WARM_PRECOMPILES_BLOCK_NUMBER: u64 = 8_197_684;

impl EvmFactory<EvmEnv> for HyperliquidEvmFactory {
@@ -265,28 +221,33 @@ impl EvmFactory<EvmEnv> for HyperliquidEvmFactory {
type Context<DB: Database> = EthEvmContext<DB>;

fn create_evm<DB: Database>(&self, db: DB, input: EvmEnv) -> Self::Evm<DB, NoOpInspector> {
let block = collect_block(
self.ingest_dir.clone().unwrap(),
self.shared_state.clone(),
input.block_env.number,
)
.expect("Failed to collect a submitted block. If sourcing locally, make sure your local hl-node is producing blocks.");
let mut cache: HashMap<_, _> = block
.read_precompile_calls
.into_iter()
.map(|(address, calls)| (address, HashMap::from_iter(calls.into_iter())))
.collect();
// Try to get precompile data from the shared state cache
// This avoids the overhead of loading block data on every EVM creation
let mut cache: HashMap<_, _> = HashMap::new();

if input.block_env.number >= WARM_PRECOMPILES_BLOCK_NUMBER {
let highest_precompile_address = block
.highest_precompile_address
.unwrap_or(address!("0x000000000000000000000000000000000000080d"));
for i in 0x800.. {
let address = Address::from(U160::from(i));
if address > highest_precompile_address {
break;
if let Some(ref shared_state) = self.shared_state {
if let Some(precompile_data) = get_locally_sourced_precompiles_for_height(
shared_state.precompiles_cache.clone(),
input.block_env.number,
) {
cache = precompile_data
.precompiles
.into_iter()
.map(|(address, calls)| (address, HashMap::from_iter(calls.into_iter())))
.collect();

if input.block_env.number >= WARM_PRECOMPILES_BLOCK_NUMBER {
let highest_precompile_address = precompile_data
.highest_precompile_address
.unwrap_or(address!("0x000000000000000000000000000000000000080d"));
for i in 0x800.. {
let address = Address::from(U160::from(i));
if address > highest_precompile_address {
break;
}
cache.entry(address).or_insert(HashMap::new());
}
}
cache.entry(address).or_insert(HashMap::new());
}
}
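The warm-up loop above (present in both the old and the new shape of create_evm) seeds an empty call map for every candidate precompile address from 0x...0800 up to the block's highest recorded precompile address, which defaults to 0x...080d (14 addresses). A minimal standalone sketch (not part of the diff) of the same range computation, assuming only alloy_primitives:

```rust
// Sketch only: materialize the warm precompile address range used above.
// With highest = 0x...080d this yields 14 addresses (0x...0800 through 0x...080d).
use alloy_primitives::{Address, U160};

fn warm_precompile_addresses(highest: Address) -> Vec<Address> {
    (0x800u64..)
        .map(|i| Address::from(U160::from(i)))
        .take_while(|addr| *addr <= highest)
        .collect()
}
```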
@@ -12,7 +9,7 @@ workspace = true

[dependencies]
alloy-primitives.workspace = true
alloy-rlp = { workspace = true }
serde.workspace = true
rmp-serde = "1.3"
tokio = { workspace = true, features = ["sync", "macros", "time", "rt-multi-thread"] }
parking_lot.workspace = true
@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};

use alloy_primitives::{Address, Bytes};
use alloy_rlp::{Decodable, Encodable};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};

@@ -10,7 +11,7 @@ pub struct ReadPrecompileInput {
pub gas_limit: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReadPrecompileResult {
Ok { gas_used: u64, bytes: Bytes },
OutOfGas,
@@ -18,6 +19,59 @@ pub enum ReadPrecompileResult {
UnexpectedError,
}
/// ReadPrecompileCalls represents a collection of precompile calls with their results
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ReadPrecompileCalls(pub Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>);

impl ReadPrecompileCalls {
/// Create an empty ReadPrecompileCalls
pub fn new() -> Self {
Self(Vec::new())
}

/// Create from a vector of precompile calls
pub fn from_vec(calls: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>) -> Self {
Self(calls)
}

/// Get the inner vector
pub fn into_inner(self) -> Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)> {
self.0
}

/// Serialize to bytes using MessagePack for database storage
pub fn to_db_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
rmp_serde::to_vec(&self.0)
}

/// Deserialize from bytes using MessagePack from database storage
pub fn from_db_bytes(bytes: &[u8]) -> Result<Self, rmp_serde::decode::Error> {
let data = rmp_serde::from_slice(bytes)?;
Ok(Self(data))
}
}

impl Encodable for ReadPrecompileCalls {
fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
// Encode as MessagePack bytes wrapped in RLP
let buf = self.to_db_bytes().unwrap_or_default();
buf.encode(out);
}

fn length(&self) -> usize {
let buf = self.to_db_bytes().unwrap_or_default();
buf.length()
}
}

impl Decodable for ReadPrecompileCalls {
fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
let bytes = Vec::<u8>::decode(buf)?;
Self::from_db_bytes(&bytes)
.map_err(|_| alloy_rlp::Error::Custom("Failed to decode ReadPrecompileCalls"))
}
}
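A quick usage sketch (not part of the diff) of the database encoding added above; it assumes only the ReadPrecompileCalls API shown here:

```rust
// Sketch only: MessagePack round-trip for ReadPrecompileCalls as stored in the DB.
use reth_hyperliquid_types::ReadPrecompileCalls;

fn roundtrip_example() -> Result<(), Box<dyn std::error::Error>> {
    let calls = ReadPrecompileCalls::new();       // empty collection
    let bytes = calls.to_db_bytes()?;             // MessagePack bytes used as the table value
    let decoded = ReadPrecompileCalls::from_db_bytes(&bytes)?;
    assert_eq!(decoded.into_inner().len(), 0);
    Ok(())
}
```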
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrecompileData {
pub precompiles: Vec<(Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>)>,
25 crates/net/hlfs/Cargo.toml Normal file
@@ -0,0 +1,25 @@
# crates/net/hlfs/Cargo.toml
[package]
name = "reth-hlfs"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true
description = "Implementation of archive block downloader"
authors = ["@wwwehr"]

[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread","macros","net","time","fs","sync","io-util"] }
bytes.workspace = true
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
reth-tracing.workspace = true

[dev-dependencies]
rand.workspace = true
tempfile.workspace = true
374 crates/net/hlfs/src/lib.rs Normal file
@@ -0,0 +1,374 @@
//! HLFS TCP micro-protocol for historical backfill (single-block, RR per block).

use bytes::{Buf, BufMut, Bytes, BytesMut};
use parking_lot::Mutex;
use reth_tracing::tracing::{debug, info, trace, warn};
use std::{
fs,
hash::{Hash, Hasher},
io,
net::SocketAddr,
path::{Path, PathBuf},
sync::Arc,
time::Duration,
};
use thiserror::Error;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
time::timeout,
};

type Result<T, E = HlfsError> = std::result::Result<T, E>;

pub const OP_REQ_BLOCK: u8 = 0x01;
pub const OP_RES_BLOCK: u8 = 0x02;
pub const OP_REQ_MAX_BLOCK: u8 = 0x03;
pub const OP_RES_MAX_BLOCK: u8 = 0x04;
pub const OP_ERR_TOO_BUSY: u8 = 0x05;
pub const OP_ERR_NOT_FOUND: u8 = 0x06;

#[derive(Error, Debug)]
pub enum HlfsError {
#[error("io: {0}")]
Io(#[from] io::Error),
#[error("proto")]
Proto,
#[error("no peers")]
NoPeers,
#[error("timeout")]
Timeout,
#[error("busy: retry_ms={0}")]
Busy(u32),
#[error("not found")]
NotFound,
}

#[inline]
fn put_u64(b: &mut BytesMut, v: u64) {
b.put_u64_le(v)
}
#[inline]
fn put_u32(b: &mut BytesMut, v: u32) {
b.put_u32_le(v)
}

async fn ensure_parent_dirs(path: &str) -> std::io::Result<()> {
if let Some(parent) = Path::new(path).parent() {
fs::create_dir_all(parent)
} else {
Ok(())
}
}

/// Client: tries each peer once; rotates starting index per call
#[derive(Debug, Copy, Clone)]
pub struct PeerRecord {
pub addr: SocketAddr,
pub max_block: u64,
}

impl PartialEq for PeerRecord {
fn eq(&self, o: &Self) -> bool {
self.addr == o.addr
}
}
impl Eq for PeerRecord {}
impl Hash for PeerRecord {
fn hash<H: Hasher>(&self, s: &mut H) {
self.addr.hash(s);
}
}

#[derive(Clone)]
pub struct Client {
root: PathBuf,
pub peers: Arc<Mutex<Vec<PeerRecord>>>,
timeout: Duration,
pub max_block: u64,
}
impl Client {
pub fn new(root: impl Into<PathBuf>, peers: Vec<PeerRecord>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
debug!(max_block = n, "hlfs: our archive");
Self {
root,
peers: Arc::new(Mutex::new(peers)),
timeout: Duration::from_secs(3),
max_block: n,
}
}
pub fn update_peers(&self, peers: Vec<PeerRecord>) {
*self.peers.lock() = peers;
}
pub fn with_timeout(mut self, d: Duration) -> Self {
self.timeout = d;
self
}
pub async fn wants_block(&self, number: u64, rr_index: usize) -> Result<Vec<u8>, HlfsError> {
let peers = self.peers.lock().clone();
debug!(peer_count = peers.len(), "hlfs: peers");
if peers.is_empty() {
return Err(HlfsError::NoPeers);
}

let mut all = (0..peers.len()).map(|i| (rr_index + i) % peers.len());
let mut last_busy: Option<u32> = None;
while let Some(i) = all.next() {
let addr = peers[i];
trace!(%addr.addr, "hlfs: dialing");
match timeout(self.timeout, TcpStream::connect(addr.addr)).await {
Err(_) => continue,
Ok(Err(_)) => continue,
Ok(Ok(mut sock)) => {
let mut req = BytesMut::with_capacity(1 + 8);
req.put_u8(OP_REQ_BLOCK);
put_u64(&mut req, number);
if let Err(e) = sock.write_all(&req).await {
debug!(%addr.addr, "hlfs: write err: {e}");
continue;
}
let mut op = [0u8; 1];
if let Err(e) = timeout(self.timeout, sock.read_exact(&mut op)).await {
debug!(%addr.addr, "hlfs: read op timeout {e:?}");
continue;
}
let op = op[0];
match op {
OP_RES_BLOCK => {
// DATA
let mut len = [0u8; 4];
sock.read_exact(&mut len).await?;
let len = u32::from_le_bytes(len) as usize;
let mut buf = vec![0u8; len];
sock.read_exact(&mut buf).await?;
return Ok(buf);
}
OP_ERR_TOO_BUSY => {
let mut ms = [0u8; 4];
sock.read_exact(&mut ms).await?;
last_busy = Some(u32::from_le_bytes(ms));
continue;
}
OP_ERR_NOT_FOUND => {
return Err(HlfsError::NotFound);
}
_ => {
continue;
}
}
}
}
}
if let Some(ms) = last_busy {
return Err(HlfsError::Busy(ms));
}
Err(HlfsError::NotFound)
}
}

fn find_max_number_file(root: &Path) -> Result<u64> {
fn parse_num(name: &str) -> Option<u64> {
name.strip_suffix(".rmp.lz4")?.parse::<u64>().ok()
}

fn walk(dir: &Path, best: &mut Option<u64>) -> io::Result<()> {
for entry in fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let ft = entry.file_type()?;
if ft.is_dir() {
walk(&path, best)?;
} else if ft.is_file() {
if let Some(name) = path.file_name().and_then(|s| s.to_str()) {
if let Some(n) = parse_num(name) {
if best.map_or(true, |b| n > b) {
*best = Some(n);
}
}
}
}
}
Ok(())
}

let mut best = Some(0);
let top: PathBuf = fs::read_dir(root)?
.filter_map(|e| e.ok())
.filter(|e| e.file_type().map(|t| t.is_dir()).unwrap_or(false))
.filter_map(|e| {
let name = e.file_name();
let s = name.to_str()?;
let n: u64 = s.parse().ok()?;
Some((n, e.path()))
})
.max_by_key(|(n, _)| *n)
.map(|(_, p)| p)
.ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no numeric top-level dirs"))?;

walk(&top, &mut best)?;
Ok(best.expect("cannot find block files"))
}

/// Server: serves `{root}/{number}.rlp`.
pub struct Server {
bind: SocketAddr,
root: PathBuf,
max_conns: usize,
inflight: Arc<Mutex<usize>>,
busy_retry_ms: u32,
max_block: u64,
}

impl Server {
pub fn new(bind: SocketAddr, root: impl Into<PathBuf>) -> Self {
let root: PathBuf = root.into();
let n = find_max_number_file(&root).unwrap();
Self {
bind,
root,
max_conns: 512,
inflight: Arc::new(Mutex::new(0)),
busy_retry_ms: 100,
max_block: n,
}
}
pub fn with_limits(mut self, max_conns: usize, busy_retry_ms: u32) -> Self {
self.max_conns = max_conns;
self.busy_retry_ms = busy_retry_ms;
self
}
pub async fn run(self) -> Result<(), HlfsError> {
fs::create_dir_all(&self.root).ok();
info!(%self.bind, root=%self.root.display(), max_conns=%self.max_conns, "hlfs: server listening");
let lst = TcpListener::bind(self.bind).await?;
loop {
let (mut sock, addr) = lst.accept().await?;
if *self.inflight.lock() >= self.max_conns {
let mut b = BytesMut::with_capacity(5);
b.put_u8(OP_ERR_TOO_BUSY);
put_u32(&mut b, self.busy_retry_ms);
let _ = sock.write_all(&b).await;
continue;
}
*self.inflight.lock() += 1;
let root = self.root.clone();
let inflight = self.inflight.clone();
let busy = self.busy_retry_ms;
tokio::spawn(async move {
let _ = handle_conn(&mut sock, &root, busy, addr, self.max_block).await;
*inflight.lock() -= 1;
});
}
}
}
async fn handle_conn(
sock: &mut TcpStream,
root: &Path,
busy_ms: u32,
addr: SocketAddr,
max_block: u64,
) -> Result<(), HlfsError> {
let mut op = [0u8; 1];
sock.read_exact(&mut op).await?;
if op[0] != OP_REQ_BLOCK && op[0] != OP_REQ_MAX_BLOCK {
warn!(%addr, "hlfs: bad op");
return Err(HlfsError::Proto);
}

if op[0] == OP_REQ_MAX_BLOCK {
let mut b = BytesMut::with_capacity(1 + 8);
b.put_u8(OP_RES_MAX_BLOCK);
put_u64(&mut b, max_block);
let _ = sock.write_all(&b).await;
return Ok(());
}

let mut num = [0u8; 8];
sock.read_exact(&mut num).await?;
let number = u64::from_le_bytes(num);

let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;
let path = format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy());

trace!(%addr, number, %path, "hlfs: req");
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%addr, %path, "hlfs: mkdirs failed: {e}");
}

match fs::read(&path) {
Ok(data) => {
let mut b = BytesMut::with_capacity(1 + 4 + data.len());
b.put_u8(OP_RES_BLOCK);
put_u32(&mut b, data.len() as u32);
b.extend_from_slice(&data);
let _ = sock.write_all(&b).await;
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
let mut b = BytesMut::with_capacity(1);
b.put_u8(OP_ERR_NOT_FOUND);
let _ = sock.write_all(&b).await;
}
Err(e) => {
warn!(%addr, %path, "hlfs: read error: {e}");
let _ = sock.shutdown().await;
}
}
Ok(())
}

/// Backfiller: ask client per missing block; rotate peers every block.
#[derive(Clone)]
pub struct Backfiller {
pub client: Client,
root: PathBuf,
pub max_block_seen: u64,
}
impl Backfiller {
pub fn new(client: Client, root: impl Into<PathBuf>) -> Self {
Self { client, root: root.into(), max_block_seen: 0 }
}
pub fn set_peers(&mut self, peers: Vec<PeerRecord>) {
self.client.update_peers(peers);
let _peers = self.client.peers.lock().clone();
for p in _peers {
if p.max_block > self.max_block_seen {
self.max_block_seen = p.max_block
}
}
}
pub async fn fetch_if_missing(&mut self, number: u64) -> Result<Option<u64>, HlfsError> {
let rr_index = number as usize;
let n = number.saturating_sub(1); // 0 -> 0, others -> number-1
let f = (n / 1_000_000) * 1_000_000;
let s = (n / 1_000) * 1_000;

let path = format!("{}/{f}/{s}/{number}.rmp.lz4", self.root.to_string_lossy());
if Path::new(&path).exists() {
debug!(block = number, "hlfs: already have");
return Ok(None);
}
match self.client.wants_block(number, rr_index).await {
Err(HlfsError::NotFound) => Ok(None),
Err(HlfsError::Busy(ms)) => {
tokio::time::sleep(Duration::from_millis(ms as u64)).await;
Ok(None)
}
Err(e) => Err(e),
Ok(data) => {
if let Err(e) = ensure_parent_dirs(&path).await {
warn!(%path, "hlfs: mkdirs failed: {e}");
}
if let Err(e) = fs::write(&path, &data) {
warn!(%path, "hlfs: write failed: {e}");
return Ok(None);
}
debug!(block = number, "hlfs: got block");
Ok(Some(number))
}
}
}
}
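For orientation (not part of the diff): handle_conn and Backfiller::fetch_if_missing above, as well as collect_s3_block in the EVM config hunk earlier, all resolve a block number to the same on-disk layout, grouping files by millions and by thousands. A worked sketch of that derivation:

```rust
// Sketch only: the archive layout used above. For block 8_197_684 under the
// default archive dir "evm-blocks":
//   n = 8_197_683, f = 8_000_000, s = 8_197_000
//   -> evm-blocks/8000000/8197000/8197684.rmp.lz4
fn archive_path(root: &std::path::Path, number: u64) -> String {
    let n = number.saturating_sub(1); // 0 -> 0, others -> number - 1
    let f = (n / 1_000_000) * 1_000_000;
    let s = (n / 1_000) * 1_000;
    format!("{}/{f}/{s}/{number}.rmp.lz4", root.to_string_lossy())
}
```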
@@ -525,6 +525,13 @@ tables {
type Key = ChainStateKey;
type Value = BlockNumber;
}

/// Stores precompile call data for each block.
/// Maps block number to serialized ReadPrecompileCalls data.
table BlockReadPrecompileCalls {
type Key = BlockNumber;
type Value = Vec<u8>;
}
}

/// Keys for the `ChainState` table.
@@ -33,6 +33,7 @@ reth-codecs.workspace = true
reth-evm.workspace = true
reth-chain-state.workspace = true
reth-node-types.workspace = true
reth-hyperliquid-types.workspace = true

# ethereum
alloy-eips.workspace = true
@@ -50,6 +50,9 @@ mod metrics;
mod chain;
pub use chain::*;

mod precompile;
pub use precompile::DatabasePrecompileCallsProvider;

/// A common provider that fetches data from a database or static file.
///
/// This provider implements most provider or provider factory traits.
68 crates/storage/provider/src/providers/database/precompile.rs Normal file
@@ -0,0 +1,68 @@
//! Database provider implementation for precompile calls storage

use crate::traits::PrecompileCallsProvider;
use alloy_primitives::BlockNumber;
use reth_db::cursor::DbCursorRW;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_hyperliquid_types::ReadPrecompileCalls;
use reth_storage_errors::provider::ProviderResult;

/// Implementation of PrecompileCallsProvider for database provider
pub trait DatabasePrecompileCallsProvider: Send + Sync {
/// Transaction type
type Tx: DbTx;

/// Get a reference to the transaction
fn tx_ref(&self) -> &Self::Tx;

/// Get a mutable reference to the transaction
fn tx_mut(&mut self) -> &mut Self::Tx;
}

impl<T> PrecompileCallsProvider for T
where
T: DatabasePrecompileCallsProvider,
T::Tx: DbTx,
{
fn insert_block_precompile_calls(
&self,
block_number: BlockNumber,
calls: ReadPrecompileCalls,
) -> ProviderResult<()> {
use reth_db_api::transaction::DbTxMut;

// For now, we'll store this as a placeholder - the actual implementation
// will require mutable transaction access which needs to be added to the trait
// This is a read-only implementation for now
Ok(())
}

fn block_precompile_calls(
&self,
block_number: BlockNumber,
) -> ProviderResult<Option<ReadPrecompileCalls>> {
use reth_db_api::tables::BlockReadPrecompileCalls;

let tx = self.tx_ref();

// Get from BlockReadPrecompileCalls table
if let Some(bytes) = tx.get::<BlockReadPrecompileCalls>(block_number)? {
let calls = ReadPrecompileCalls::from_db_bytes(&bytes)
.map_err(|e| reth_storage_errors::provider::ProviderError::Database(
reth_db_api::DatabaseError::Other(format!("Failed to deserialize precompile calls: {}", e))
))?;
Ok(Some(calls))
} else {
Ok(None)
}
}

fn remove_block_precompile_calls_above(
&self,
block_number: BlockNumber,
) -> ProviderResult<()> {
// For now, this is a no-op as it requires mutable transaction access
// The actual implementation will be added when we have mutable access
Ok(())
}
}
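insert_block_precompile_calls above is a placeholder for now; a hedged sketch (not part of the diff) of what the write path could look like once mutable transaction access is threaded through. write_block_precompile_calls is a hypothetical helper and the DbTxMut::put call is assumed from reth_db_api:

```rust
// Sketch only, assuming a DbTxMut handle is reachable from the provider.
use alloy_primitives::BlockNumber;
use reth_db_api::{tables::BlockReadPrecompileCalls, transaction::DbTxMut};
use reth_hyperliquid_types::ReadPrecompileCalls;

fn write_block_precompile_calls<Tx: DbTxMut>(
    tx: &Tx,
    block_number: BlockNumber,
    calls: &ReadPrecompileCalls,
) -> Result<(), Box<dyn std::error::Error>> {
    // Serialize with MessagePack (to_db_bytes) and store under the block number,
    // mirroring the read path in block_precompile_calls.
    let bytes = calls.to_db_bytes()?;
    tx.put::<BlockReadPrecompileCalls>(block_number, bytes)?;
    Ok(())
}
```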
@@ -19,3 +19,6 @@ pub use static_file_provider::StaticFileProviderFactory;

mod full;
pub use full::{FullProvider, FullRpcProvider};

mod precompile;
pub use precompile::PrecompileCallsProvider;
27 crates/storage/provider/src/traits/precompile.rs Normal file
@@ -0,0 +1,27 @@
//! Trait for storing and retrieving precompile call data

use alloy_primitives::BlockNumber;
use reth_hyperliquid_types::ReadPrecompileCalls;
use reth_storage_errors::provider::ProviderResult;

/// Provider trait for ReadPrecompileCalls storage operations
pub trait PrecompileCallsProvider: Send + Sync {
/// Insert ReadPrecompileCalls data for a block
fn insert_block_precompile_calls(
&self,
block_number: BlockNumber,
calls: ReadPrecompileCalls,
) -> ProviderResult<()>;

/// Get ReadPrecompileCalls data for a block
fn block_precompile_calls(
&self,
block_number: BlockNumber,
) -> ProviderResult<Option<ReadPrecompileCalls>>;

/// Remove ReadPrecompileCalls data for blocks above a certain number
fn remove_block_precompile_calls_above(
&self,
block_number: BlockNumber,
) -> ProviderResult<()>;
}
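And a small consumer-side sketch (not part of the diff) of the trait above; load_calls is a hypothetical helper, and the import path is assumed from the provider crate's re-exports:

```rust
// Sketch only: reading stored calls back through PrecompileCallsProvider.
use alloy_primitives::BlockNumber;
use reth_provider::PrecompileCallsProvider; // assumed re-export path
use reth_storage_errors::provider::ProviderResult;

fn load_calls<P: PrecompileCallsProvider>(provider: &P, block: BlockNumber) -> ProviderResult<()> {
    if let Some(calls) = provider.block_precompile_calls(block)? {
        // Each entry pairs a precompile address with its recorded (input, result) calls.
        println!("block {block}: {} precompile addresses", calls.0.len());
    }
    Ok(())
}
```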