feat: Support testnet sync

- Add testnet S3 bucket
- Use testnet RPC properly
- Use testnet chainspec on pseudo peer
This commit is contained in:
sprites0
2025-08-22 10:40:33 -04:00
parent 7daf203bc2
commit b6d5031865
7 changed files with 46 additions and 37 deletions

View File

@ -152,4 +152,12 @@ impl HlChainSpec {
_ => unreachable!("Unreachable since ChainSpecParser won't return other chains"), _ => unreachable!("Unreachable since ChainSpecParser won't return other chains"),
} }
} }
pub fn official_s3_bucket(self) -> &'static str {
match self.inner.chain().id() {
MAINNET_CHAIN_ID => "hl-mainnet-evm-blocks",
TESTNET_CHAIN_ID => "hl-testnet-evm-blocks",
_ => unreachable!("Unreachable since ChainSpecParser won't return other chains"),
}
}
} }

View File

@ -245,7 +245,7 @@ where
ctx.task_executor().spawn_critical("pseudo peer", async move { ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source = let block_source =
block_source_config.create_cached_block_source(next_block_number).await; block_source_config.create_cached_block_source((&*chain_spec).clone(), next_block_number).await;
start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source) start_pseudo_peer(chain_spec, local_node_record.to_string(), block_source)
.await .await
.unwrap(); .unwrap();

View File

@ -7,7 +7,7 @@ use alloy_rlp::{Decodable, Encodable, RlpDecodable, RlpEncodable};
use bytes::BufMut; use bytes::BufMut;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::{chainspec::MAINNET_CHAIN_ID, HlBlock}; use crate::HlBlock;
pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>); pub type ReadPrecompileCall = (Address, Vec<(ReadPrecompileInput, ReadPrecompileResult)>);
@ -50,13 +50,13 @@ pub struct BlockAndReceipts {
} }
impl BlockAndReceipts { impl BlockAndReceipts {
pub fn to_reth_block(self) -> HlBlock { pub fn to_reth_block(self, chain_id: u64) -> HlBlock {
let EvmBlock::Reth115(block) = self.block; let EvmBlock::Reth115(block) = self.block;
block.to_reth_block( block.to_reth_block(
self.read_precompile_calls.clone(), self.read_precompile_calls.clone(),
self.highest_precompile_address, self.highest_precompile_address,
self.system_txs.clone(), self.system_txs.clone(),
MAINNET_CHAIN_ID, chain_id,
) )
} }

View File

@ -1,8 +1,7 @@
use super::{ use crate::chainspec::HlChainSpec;
consts::DEFAULT_S3_BUCKET,
sources::{ use super::sources::{
BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, LocalBlockSource, S3BlockSource, BlockSourceBoxed, CachedBlockSource, HlNodeBlockSource, LocalBlockSource, S3BlockSource,
},
}; };
use aws_config::BehaviorVersion; use aws_config::BehaviorVersion;
use std::{env::home_dir, path::PathBuf, sync::Arc}; use std::{env::home_dir, path::PathBuf, sync::Arc};
@ -15,16 +14,14 @@ pub struct BlockSourceConfig {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum BlockSourceType { pub enum BlockSourceType {
S3Default,
S3 { bucket: String }, S3 { bucket: String },
Local { path: PathBuf }, Local { path: PathBuf },
} }
impl BlockSourceConfig { impl BlockSourceConfig {
pub async fn s3_default() -> Self { pub async fn s3_default() -> Self {
Self { Self { source_type: BlockSourceType::S3Default, block_source_from_node: None }
source_type: BlockSourceType::S3 { bucket: DEFAULT_S3_BUCKET.to_string() },
block_source_from_node: None,
}
} }
pub async fn s3(bucket: String) -> Self { pub async fn s3(bucket: String) -> Self {
@ -53,17 +50,10 @@ impl BlockSourceConfig {
self self
} }
pub async fn create_block_source(&self) -> BlockSourceBoxed { pub async fn create_block_source(&self, chain_spec: HlChainSpec) -> BlockSourceBoxed {
match &self.source_type { match &self.source_type {
BlockSourceType::S3 { bucket } => { BlockSourceType::S3Default => s3_block_source(chain_spec.official_s3_bucket()).await,
let client = aws_sdk_s3::Client::new( BlockSourceType::S3 { bucket } => s3_block_source(bucket).await,
&aws_config::defaults(BehaviorVersion::latest())
.region("ap-northeast-1")
.load()
.await,
);
Arc::new(Box::new(S3BlockSource::new(client, bucket.clone())))
}
BlockSourceType::Local { path } => { BlockSourceType::Local { path } => {
Arc::new(Box::new(LocalBlockSource::new(path.clone()))) Arc::new(Box::new(LocalBlockSource::new(path.clone())))
} }
@ -89,10 +79,21 @@ impl BlockSourceConfig {
)) ))
} }
pub async fn create_cached_block_source(&self, next_block_number: u64) -> BlockSourceBoxed { pub async fn create_cached_block_source(
let block_source = self.create_block_source().await; &self,
chain_spec: HlChainSpec,
next_block_number: u64,
) -> BlockSourceBoxed {
let block_source = self.create_block_source(chain_spec).await;
let block_source = let block_source =
self.create_block_source_from_node(next_block_number, block_source).await; self.create_block_source_from_node(next_block_number, block_source).await;
Arc::new(Box::new(CachedBlockSource::new(block_source))) Arc::new(Box::new(CachedBlockSource::new(block_source)))
} }
} }
async fn s3_block_source(bucket: impl AsRef<str>) -> BlockSourceBoxed {
let client = aws_sdk_s3::Client::new(
&aws_config::defaults(BehaviorVersion::latest()).region("ap-northeast-1").load().await,
);
Arc::new(Box::new(S3BlockSource::new(client, bucket.as_ref().to_string())))
}

View File

@ -1,2 +1 @@
pub const MAX_CONCURRENCY: usize = 100; pub const MAX_CONCURRENCY: usize = 100;
pub const DEFAULT_S3_BUCKET: &str = "hl-mainnet-evm-blocks";

View File

@ -1,9 +1,5 @@
use super::service::{BlockHashCache, BlockPoller}; use super::service::{BlockHashCache, BlockPoller};
use crate::{ use crate::{chainspec::HlChainSpec, node::network::HlNetworkPrimitives, HlPrimitives};
chainspec::{parser::chain_value_parser, HlChainSpec},
node::network::HlNetworkPrimitives,
HlPrimitives,
};
use reth_network::{ use reth_network::{
config::{rng_secret_key, SecretKey}, config::{rng_secret_key, SecretKey},
NetworkConfig, NetworkManager, PeersConfig, NetworkConfig, NetworkManager, PeersConfig,
@ -72,13 +68,15 @@ impl NetworkBuilder {
.peer_config(self.peer_config) .peer_config(self.peer_config)
.discovery_port(self.discovery_port) .discovery_port(self.discovery_port)
.listener_port(self.listener_port); .listener_port(self.listener_port);
let chain_id = self.chain_spec.inner.chain().id();
let (block_poller, start_tx) = BlockPoller::new_suspended(block_source, blockhash_cache); let (block_poller, start_tx) =
BlockPoller::new_suspended(chain_id, block_source, blockhash_cache);
let config = builder.block_import(Box::new(block_poller)).build(Arc::new(NoopProvider::< let config = builder.block_import(Box::new(block_poller)).build(Arc::new(NoopProvider::<
HlChainSpec, HlChainSpec,
HlPrimitives, HlPrimitives,
>::new( >::new(
chain_value_parser("mainnet").unwrap(), self.chain_spec.into(),
))); )));
let network = NetworkManager::new(config).await.map_err(|e| eyre::eyre!(e))?; let network = NetworkManager::new(config).await.map_err(|e| eyre::eyre!(e))?;

View File

@ -42,6 +42,7 @@ pub fn new_blockhash_cache() -> BlockHashCache {
/// A block poller that polls blocks from `BlockSource` and sends them to the `block_tx` /// A block poller that polls blocks from `BlockSource` and sends them to the `block_tx`
#[derive(Debug)] #[derive(Debug)]
pub struct BlockPoller { pub struct BlockPoller {
chain_id: u64,
block_rx: mpsc::Receiver<(u64, BlockAndReceipts)>, block_rx: mpsc::Receiver<(u64, BlockAndReceipts)>,
task: JoinHandle<eyre::Result<()>>, task: JoinHandle<eyre::Result<()>>,
blockhash_cache: BlockHashCache, blockhash_cache: BlockHashCache,
@ -51,6 +52,7 @@ impl BlockPoller {
const POLL_INTERVAL: Duration = Duration::from_millis(25); const POLL_INTERVAL: Duration = Duration::from_millis(25);
pub fn new_suspended<BS: BlockSource>( pub fn new_suspended<BS: BlockSource>(
chain_id: u64,
block_source: BS, block_source: BS,
blockhash_cache: BlockHashCache, blockhash_cache: BlockHashCache,
) -> (Self, mpsc::Sender<()>) { ) -> (Self, mpsc::Sender<()>) {
@ -59,7 +61,7 @@ impl BlockPoller {
let (block_tx, block_rx) = mpsc::channel(100); let (block_tx, block_rx) = mpsc::channel(100);
let block_tx_clone = block_tx.clone(); let block_tx_clone = block_tx.clone();
let task = tokio::spawn(Self::task(start_rx, block_source, block_tx_clone)); let task = tokio::spawn(Self::task(start_rx, block_source, block_tx_clone));
(Self { block_rx, task, blockhash_cache: blockhash_cache.clone() }, start_tx) (Self { chain_id, block_rx, task, blockhash_cache: blockhash_cache.clone() }, start_tx)
} }
#[allow(unused)] #[allow(unused)]
@ -98,7 +100,7 @@ impl BlockImport<HlNewBlock> for BlockPoller {
match Pin::new(&mut self.block_rx).poll_recv(_cx) { match Pin::new(&mut self.block_rx).poll_recv(_cx) {
Poll::Ready(Some((number, block))) => { Poll::Ready(Some((number, block))) => {
debug!("Polled block: {}", number); debug!("Polled block: {}", number);
let reth_block = block.to_reth_block(); let reth_block = block.to_reth_block(self.chain_id);
let hash = reth_block.header.hash_slow(); let hash = reth_block.header.hash_slow();
self.blockhash_cache.write().insert(hash, number); self.blockhash_cache.write().insert(hash, number);
let td = U128::from(reth_block.header.difficulty); let td = U128::from(reth_block.header.difficulty);
@ -167,6 +169,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
&mut self, &mut self,
eth_req: IncomingEthRequest<HlNetworkPrimitives>, eth_req: IncomingEthRequest<HlNetworkPrimitives>,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
let chain_id = self.chain_spec.inner.chain().id();
match eth_req { match eth_req {
IncomingEthRequest::GetBlockHeaders { IncomingEthRequest::GetBlockHeaders {
peer_id: _, peer_id: _,
@ -189,7 +192,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
} }
} }
.into_par_iter() .into_par_iter()
.map(|block| block.to_reth_block().header.clone()) .map(|block| block.to_reth_block(chain_id).header.clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _ = response.send(Ok(BlockHeaders(block_headers))); let _ = response.send(Ok(BlockHeaders(block_headers)));
@ -207,7 +210,7 @@ impl<BS: BlockSource> PseudoPeer<BS> {
.collect_blocks(numbers) .collect_blocks(numbers)
.await .await
.into_iter() .into_iter()
.map(|block| block.to_reth_block().body) .map(|block| block.to_reth_block(chain_id).body)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _ = response.send(Ok(BlockBodies(block_bodies))); let _ = response.send(Ok(BlockBodies(block_bodies)));