feat: Pseudo peer and staged sync

For simplicity, we use with_pow() plus a pseudo peer that connects to reth itself, so that it can support:

1. StateFetcher via NetworkState
2. Block announcement (which requires with_pow())

For block announcement, alternatives were to use ImportService as before, or to call the engine API. But for simplicity, for now we just publish from the pseudo peer as in pre-PoS networking, hence with_pow().
This commit is contained in:
sprites0
2025-06-27 19:38:53 +00:00
parent 2c6e989ad0
commit ba8dfc4d96
18 changed files with 2112 additions and 130 deletions
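
For reference, a minimal usage sketch of the new pseudo-peer API (an editor's illustration assembled from the functions this commit adds, not part of the diff; the path and enode are placeholders):

use reth_hl::pseudo_peer::{start_pseudo_peer, BlockSourceConfig};

#[tokio::main]
async fn main() -> eyre::Result<()> {
    // Read blocks from a local directory of <millions>/<thousands>/<height>.rmp.lz4 files.
    let source = BlockSourceConfig::local("/path/to/evm-blocks".to_string())
        .create_cached_block_source()
        .await;
    // Connect to the destination reth node and serve its eth requests.
    start_pseudo_peer("enode://<pubkey>@127.0.0.1:30303".to_string(), source).await
}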

1126
Cargo.lock generated

File diff suppressed because it is too large

Cargo.toml

@@ -94,6 +94,9 @@ tracing = "0.1"
rmp-serde = "1.0.0"
lz4_flex = "0.11.3"
ureq = "3.0.12"
aws-sdk-s3 = "1.93.0"
aws-config = "1.8.0"
rayon = "1.10.0"
[target.'cfg(unix)'.dependencies]
@@ -138,4 +141,4 @@ client = [
"jsonrpsee/client",
"jsonrpsee/async-client",
"reth-rpc-eth-api/client",
]
]

src/lib.rs

@@ -3,4 +3,6 @@ pub mod consensus;
mod evm;
mod hardforks;
pub mod node;
pub mod pseudo_peer;
pub use node::primitives::{HlBlock, HlBlockBody, HlPrimitives};

src/main.rs

@@ -1,8 +1,8 @@
use clap::{Args, Parser};
use clap::Parser;
use reth::builder::NodeHandle;
use reth_hl::{
chainspec::parser::HlChainSpecParser,
node::{cli::Cli, storage::tables::Tables, HlNode},
node::{cli::{Cli, HlNodeArgs}, storage::tables::Tables, HlNode},
};
// We use jemalloc for performance reasons
@@ -10,11 +10,6 @@ use reth_hl::{
#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// No Additional arguments
#[derive(Debug, Clone, Copy, Default, Args)]
#[non_exhaustive]
struct NoArgs;
fn main() -> eyre::Result<()> {
reth_cli_util::sigsegv_handler::install();
@@ -23,9 +18,9 @@ fn main() -> eyre::Result<()> {
std::env::set_var("RUST_BACKTRACE", "1");
}
Cli::<HlChainSpecParser, NoArgs>::parse().run(|builder, _| async move {
Cli::<HlChainSpecParser, HlNodeArgs>::parse().run(|builder, ext| async move {
builder.builder.database.create_tables_for::<Tables>()?;
let (node, engine_handle_tx) = HlNode::new();
let (node, engine_handle_tx) = HlNode::new(ext.block_source_args.parse().await?);
let NodeHandle { node, node_exit_future: exit_future } =
builder.node(node).launch().await?;

src/node/cli.rs

@@ -3,8 +3,9 @@ use crate::{
node::{
consensus::HlConsensus, evm::config::HlEvmConfig, network::HlNetworkPrimitives, HlNode,
},
pseudo_peer::BlockSourceArgs,
};
use clap::Parser;
use clap::{Args, Parser};
use reth::{
args::LogArgs,
builder::{NodeBuilder, WithLaunchContext},
@@ -15,7 +16,7 @@ use reth::{
};
use reth_chainspec::EthChainSpec;
use reth_cli::chainspec::ChainSpecParser;
use reth_cli_commands::{launcher::FnLauncher, node::NoArgs};
use reth_cli_commands::launcher::FnLauncher;
use reth_db::DatabaseEnv;
use reth_tracing::FileWorkerGuard;
use std::{
@@ -25,12 +26,20 @@ use std::{
};
use tracing::info;
#[derive(Debug, Clone, Args)]
#[non_exhaustive]
pub struct HlNodeArgs {
#[command(flatten)]
pub block_source_args: BlockSourceArgs,
}
/// The main reth_hl cli interface.
///
/// This is the entrypoint to the executable.
#[derive(Debug, Parser)]
#[command(author, version = SHORT_VERSION, long_version = LONG_VERSION, about = "Reth", long_about = None)]
pub struct Cli<Spec: ChainSpecParser = HlChainSpecParser, Ext: clap::Args + fmt::Debug = NoArgs> {
pub struct Cli<Spec: ChainSpecParser = HlChainSpecParser, Ext: clap::Args + fmt::Debug = HlNodeArgs>
{
/// The command to run
#[command(subcommand)]
pub command: Commands<Spec, Ext>,

src/node/mod.rs

@@ -12,6 +12,7 @@ use crate::{
},
storage::HlStorage,
},
pseudo_peer::BlockSourceConfig,
};
use consensus::HlConsensusBuilder;
use engine::HlPayloadServiceBuilder;
@@ -49,12 +50,15 @@ pub type HlNodeAddOns<N> =
pub struct HlNode {
engine_handle_rx:
Arc<Mutex<Option<oneshot::Receiver<BeaconConsensusEngineHandle<HlPayloadTypes>>>>>,
block_source_config: BlockSourceConfig,
}
impl HlNode {
pub fn new() -> (Self, oneshot::Sender<BeaconConsensusEngineHandle<HlPayloadTypes>>) {
pub fn new(
block_source_config: BlockSourceConfig,
) -> (Self, oneshot::Sender<BeaconConsensusEngineHandle<HlPayloadTypes>>) {
let (tx, rx) = oneshot::channel();
(Self { engine_handle_rx: Arc::new(Mutex::new(Some(rx))) }, tx)
(Self { engine_handle_rx: Arc::new(Mutex::new(Some(rx))), block_source_config }, tx)
}
}
@@ -79,7 +83,10 @@ impl HlNode {
.pool(HlPoolBuilder)
.executor(HlExecutorBuilder::default())
.payload(HlPayloadServiceBuilder::default())
.network(HlNetworkBuilder { engine_handle_rx: self.engine_handle_rx.clone() })
.network(HlNetworkBuilder {
engine_handle_rx: self.engine_handle_rx.clone(),
block_source_config: self.block_source_config.clone(),
})
.consensus(HlConsensusBuilder::default())
}
}

src/node/network.rs

@@ -8,6 +8,7 @@ use crate::{
types::ReadPrecompileCalls,
HlNode,
},
pseudo_peer::{start_pseudo_peer, BlockSourceConfig},
HlBlock,
};
use alloy_rlp::{Decodable, Encodable};
@@ -149,6 +150,8 @@ pub type HlNetworkPrimitives =
pub struct HlNetworkBuilder {
pub(crate) engine_handle_rx:
Arc<Mutex<Option<oneshot::Receiver<BeaconConsensusEngineHandle<HlPayloadTypes>>>>>,
pub(crate) block_source_config: BlockSourceConfig,
}
impl HlNetworkBuilder {
@@ -162,7 +165,7 @@ impl HlNetworkBuilder {
where
Node: FullNodeTypes<Types = HlNode>,
{
let Self { engine_handle_rx } = self;
let Self { engine_handle_rx, .. } = self;
let network_builder = ctx.network_config_builder()?;
@@ -185,6 +188,8 @@ });
});
let network_builder = network_builder
.disable_dns_discovery()
.disable_nat()
.boot_nodes(boot_nodes())
.set_head(ctx.head())
.with_pow()
@@ -216,10 +221,17 @@ where
ctx: &BuilderContext<Node>,
pool: Pool,
) -> eyre::Result<Self::Network> {
let block_source_config = self.block_source_config.clone();
let network_config = self.network_config(ctx)?;
let network = NetworkManager::builder(network_config).await?;
let handle = ctx.start_network(network, pool);
info!(target: "reth::cli", enode=%handle.local_node_record(), "P2P networking initialized");
let local_node_record = handle.local_node_record();
info!(target: "reth::cli", enode=%local_node_record, "P2P networking initialized");
ctx.task_executor().spawn_critical("pseudo peer", async move {
let block_source = block_source_config.create_cached_block_source().await;
start_pseudo_peer(local_node_record.to_string(), block_source).await.unwrap();
});
Ok(handle)
}

54
src/pseudo_peer/cli.rs Normal file

@@ -0,0 +1,54 @@
use super::config::BlockSourceConfig;
use clap::{Args, Parser};
use reth_node_core::args::LogArgs;
#[derive(Debug, Clone, Args)]
pub struct BlockSourceArgs {
/// Block source to read blocks from.
/// Example: s3://hl-mainnet-evm-blocks
/// Example: /home/user/personal/evm-blocks
///
/// For S3, you can use environment variables like AWS_PROFILE, etc.
#[arg(long)]
block_source: Option<String>,
/// Shorthand for --block-source=s3://hl-mainnet-evm-blocks
#[arg(long = "s3", default_value_t = false)]
s3: bool,
}
impl BlockSourceArgs {
pub async fn parse(&self) -> eyre::Result<BlockSourceConfig> {
if self.s3 {
return Ok(BlockSourceConfig::s3_default().await);
}
let Some(value) = self.block_source.as_ref() else {
return Err(eyre::eyre!(
"You need to specify a block source e.g., --s3 or --block-source=/path/to/blocks"
));
};
let config = if let Some(bucket) = value.strip_prefix("s3://") {
BlockSourceConfig::s3(bucket.to_string()).await
} else {
BlockSourceConfig::local(value.to_string())
};
Ok(config)
}
}
#[derive(Debug, Parser)]
pub struct PseudoPeerCommand {
#[command(flatten)]
pub logs: LogArgs,
#[command(flatten)]
pub source: BlockSourceArgs,
/// Destination peer to connect to.
/// Example: enode://412...1a@0.0.0.0:30304
#[arg(long)]
pub destination_peer: String,
}

56
src/pseudo_peer/config.rs Normal file

@@ -0,0 +1,56 @@
use aws_config::BehaviorVersion;
use super::{
consts::DEFAULT_S3_BUCKET,
sources::{BlockSourceBoxed, CachedBlockSource, LocalBlockSource, S3BlockSource},
};
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct BlockSourceConfig {
pub source_type: BlockSourceType,
}
#[derive(Debug, Clone)]
pub enum BlockSourceType {
S3 { bucket: String },
Local { path: String },
}
impl BlockSourceConfig {
pub async fn s3_default() -> Self {
Self { source_type: BlockSourceType::S3 { bucket: DEFAULT_S3_BUCKET.to_string() } }
}
pub async fn s3(bucket: String) -> Self {
Self { source_type: BlockSourceType::S3 { bucket } }
}
pub fn local(path: String) -> Self {
Self { source_type: BlockSourceType::Local { path } }
}
pub async fn create_block_source(&self) -> BlockSourceBoxed {
match &self.source_type {
BlockSourceType::S3 { bucket } => {
let client = aws_sdk_s3::Client::new(
&aws_config::defaults(BehaviorVersion::latest())
.region("ap-northeast-1")
.load()
.await,
);
let block_source = S3BlockSource::new(client, bucket.clone());
Arc::new(Box::new(block_source))
}
BlockSourceType::Local { path } => {
let block_source = LocalBlockSource::new(path.clone());
Arc::new(Box::new(block_source))
}
}
}
pub async fn create_cached_block_source(&self) -> BlockSourceBoxed {
    let block_source = self.create_block_source().await;
    // Wrap the source so repeated fetches are served from the LRU cache.
    Arc::new(Box::new(CachedBlockSource::new(block_source)))
}
}

2
src/pseudo_peer/consts.rs Normal file

@@ -0,0 +1,2 @@
pub const MAX_CONCURRENCY: usize = 100;
pub const DEFAULT_S3_BUCKET: &str = "hl-mainnet-evm-blocks";

36
src/pseudo_peer/error.rs Normal file

@@ -0,0 +1,36 @@
use thiserror::Error;
#[derive(Error, Debug)]
pub enum PseudoPeerError {
#[error("Block source error: {0}")]
BlockSource(String),
#[error("Network error: {0}")]
Network(#[from] reth_network::error::NetworkError),
#[error("Configuration error: {0}")]
Config(String),
#[error("AWS S3 error: {0}")]
S3(#[from] aws_sdk_s3::Error),
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] rmp_serde::encode::Error),
#[error("Deserialization error: {0}")]
Deserialization(#[from] rmp_serde::decode::Error),
#[error("Compression error: {0}")]
Compression(String),
}
impl From<eyre::Error> for PseudoPeerError {
fn from(err: eyre::Error) -> Self {
PseudoPeerError::Config(err.to_string())
}
}
pub type Result<T> = std::result::Result<T, PseudoPeerError>;

16
src/pseudo_peer/main.rs Normal file

@@ -0,0 +1,16 @@
use clap::Parser;
use reth_hl::pseudo_peer::cli::PseudoPeerCommand;
#[tokio::main]
async fn main() -> eyre::Result<()> {
let cli = PseudoPeerCommand::parse();
cli.logs.init_tracing()?;
// Parse and create block source configuration
let block_source_config = cli.source.parse().await?;
let block_source = block_source_config.create_cached_block_source().await;
// Start the worker
reth_hl::pseudo_peer::start_pseudo_peer(cli.destination_peer, block_source).await?;
Ok(())
}

89
src/pseudo_peer/mod.rs Normal file

@@ -0,0 +1,89 @@
//! A pseudo peer library that ingests multiple block sources to reth
//!
//! This library exposes `start_pseudo_peer` so that reth-side NetworkState/StateFetcher
//! can fetch blocks and feed them to the sync stages
pub mod cli;
pub mod config;
pub mod consts;
pub mod error;
pub mod network;
pub mod service;
pub mod sources;
pub mod utils;
pub use cli::*;
pub use config::*;
pub use error::*;
pub use network::*;
pub use service::*;
pub use sources::*;
#[cfg(test)]
mod tests;
use tokio::sync::mpsc;
use tracing::info;
/// Re-export commonly used types
pub mod prelude {
pub use super::{
config::BlockSourceConfig,
error::{PseudoPeerError, Result},
service::{BlockPoller, PseudoPeer},
sources::{BlockSource, CachedBlockSource, LocalBlockSource, S3BlockSource},
};
}
use reth_network::{NetworkEvent, NetworkEventListenerProvider};
/// Main function that starts the network manager and processes eth requests
pub async fn start_pseudo_peer(
destination_peer: String,
block_source: BlockSourceBoxed,
) -> eyre::Result<()> {
let blockhash_cache = new_blockhash_cache();
// Create network manager
let (mut network, start_tx) = create_network_manager::<BlockSourceBoxed>(
destination_peer,
block_source.clone(),
blockhash_cache.clone(),
)
.await?;
// Create the channels for receiving eth messages
let (eth_tx, mut eth_rx) = mpsc::channel(32);
let (transaction_tx, mut transaction_rx) = mpsc::unbounded_channel();
network.set_eth_request_handler(eth_tx);
network.set_transactions(transaction_tx);
let network_handle = network.handle().clone();
let mut network_events = network_handle.event_listener();
info!("Starting network manager...");
let mut service = PseudoPeer::new(block_source, blockhash_cache.clone());
tokio::spawn(network);
let mut first = true;
// Main event loop
loop {
tokio::select! {
Some(event) = tokio_stream::StreamExt::next(&mut network_events) => {
info!("Network event: {:?}", event);
if matches!(event, NetworkEvent::ActivePeerSession { .. }) && first {
start_tx.send(()).await?;
first = false;
}
}
_ = transaction_rx.recv() => {}
Some(eth_req) = eth_rx.recv() => {
service.process_eth_request(eth_req).await?;
info!("Processed eth request");
}
}
}
}

91
src/pseudo_peer/network.rs Normal file

@@ -0,0 +1,91 @@
use super::service::{BlockHashCache, BlockPoller};
use crate::{
chainspec::{parser::chain_value_parser, HlChainSpec},
node::network::HlNetworkPrimitives,
HlPrimitives,
};
use reth_network::{
config::{rng_secret_key, SecretKey},
NetworkConfig, NetworkManager, PeersConfig,
};
use reth_network_peers::TrustedPeer;
use reth_provider::test_utils::NoopProvider;
use std::{str::FromStr, sync::Arc};
use tokio::sync::mpsc;
pub struct NetworkBuilder {
secret: SecretKey,
peer_config: PeersConfig,
boot_nodes: Vec<TrustedPeer>,
discovery_port: u16,
listener_port: u16,
}
impl Default for NetworkBuilder {
fn default() -> Self {
Self {
secret: rng_secret_key(),
peer_config: PeersConfig::default().with_max_outbound(1).with_max_inbound(1),
boot_nodes: vec![],
discovery_port: 0,
listener_port: 0,
}
}
}
impl NetworkBuilder {
pub fn with_secret(mut self, secret: SecretKey) -> Self {
self.secret = secret;
self
}
pub fn with_peer_config(mut self, peer_config: PeersConfig) -> Self {
self.peer_config = peer_config;
self
}
pub fn with_boot_nodes(mut self, boot_nodes: Vec<TrustedPeer>) -> Self {
self.boot_nodes = boot_nodes;
self
}
pub fn with_ports(mut self, discovery_port: u16, listener_port: u16) -> Self {
self.discovery_port = discovery_port;
self.listener_port = listener_port;
self
}
pub async fn build<BS>(
self,
block_source: Arc<Box<dyn super::sources::BlockSource>>,
blockhash_cache: BlockHashCache,
) -> eyre::Result<(NetworkManager<HlNetworkPrimitives>, mpsc::Sender<()>)> {
let builder = NetworkConfig::<(), HlNetworkPrimitives>::builder(self.secret)
.boot_nodes(self.boot_nodes)
.peer_config(self.peer_config)
.discovery_port(self.discovery_port)
.listener_port(self.listener_port);
let (block_poller, start_tx) = BlockPoller::new_suspended(block_source, blockhash_cache);
let config = builder.block_import(Box::new(block_poller)).build(Arc::new(NoopProvider::<
HlChainSpec,
HlPrimitives,
>::new(
chain_value_parser("mainnet").unwrap(),
)));
let network = NetworkManager::new(config).await.map_err(|e| eyre::eyre!(e))?;
Ok((network, start_tx))
}
}
pub async fn create_network_manager<BS>(
destination_peer: String,
block_source: Arc<Box<dyn super::sources::BlockSource>>,
blockhash_cache: BlockHashCache,
) -> eyre::Result<(NetworkManager<HlNetworkPrimitives>, mpsc::Sender<()>)> {
NetworkBuilder::default()
.with_boot_nodes(vec![TrustedPeer::from_str(&destination_peer).unwrap()])
.build::<BS>(block_source, blockhash_cache)
.await
}

373
src/pseudo_peer/service.rs Normal file

@@ -0,0 +1,373 @@
use super::{sources::BlockSource, utils::LruBiMap};
use crate::node::{
network::{HlNetworkPrimitives, HlNewBlock},
types::BlockAndReceipts,
};
use alloy_eips::HashOrNumber;
use alloy_primitives::{B256, U128};
use alloy_rpc_types::Block;
use futures::StreamExt as _;
use parking_lot::RwLock;
use rayon::prelude::*;
use reth_eth_wire::{
BlockBodies, BlockHeaders, GetBlockBodies, GetBlockHeaders, HeadersDirection, NewBlock,
};
use reth_network::{
eth_requests::IncomingEthRequest,
import::{BlockImport, BlockImportEvent, BlockValidation, NewBlockEvent},
message::NewBlockMessage,
};
use reth_network_peers::PeerId;
use std::{
collections::{HashMap, HashSet},
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
time::Duration,
};
use tokio::{sync::mpsc, task::JoinHandle};
use tracing::{debug, info};
/// A cache of block hashes to block numbers.
pub type BlockHashCache = Arc<RwLock<LruBiMap<B256, u64>>>;
const BLOCKHASH_CACHE_LIMIT: u32 = 1000000;
pub fn new_blockhash_cache() -> BlockHashCache {
Arc::new(RwLock::new(LruBiMap::new(BLOCKHASH_CACHE_LIMIT)))
}
/// A block poller that polls blocks from `BlockSource` and sends them to the `block_tx`
#[derive(Debug)]
pub struct BlockPoller {
block_rx: mpsc::Receiver<(u64, BlockAndReceipts)>,
task: JoinHandle<eyre::Result<()>>,
blockhash_cache: BlockHashCache,
}
impl BlockPoller {
const POLL_INTERVAL: Duration = Duration::from_millis(25);
pub fn new_suspended<BS: BlockSource>(
block_source: BS,
blockhash_cache: BlockHashCache,
) -> (Self, mpsc::Sender<()>) {
let block_source = Arc::new(block_source);
let (start_tx, start_rx) = mpsc::channel(1);
let (block_tx, block_rx) = mpsc::channel(100);
let block_tx_clone = block_tx.clone();
let task = tokio::spawn(Self::task(start_rx, block_source, block_tx_clone));
(Self { block_rx, task, blockhash_cache: blockhash_cache.clone() }, start_tx)
}
#[allow(unused)]
pub fn task_handle(&self) -> &JoinHandle<eyre::Result<()>> {
&self.task
}
async fn task<BS: BlockSource>(
mut start_rx: mpsc::Receiver<()>,
block_source: Arc<BS>,
block_tx_clone: mpsc::Sender<(u64, BlockAndReceipts)>,
) -> eyre::Result<()> {
start_rx.recv().await.ok_or(eyre::eyre!("Failed to receive start signal"))?;
info!("Starting block poller");
let latest_block_number = block_source
.find_latest_block_number()
.await
.ok_or(eyre::eyre!("Failed to find latest block number"))?;
let mut next_block_number = latest_block_number;
loop {
let Ok(block) = block_source.collect_block(next_block_number).await else {
tokio::time::sleep(Self::POLL_INTERVAL).await;
continue;
};
block_tx_clone.send((next_block_number, block)).await?;
next_block_number += 1;
}
}
}
impl BlockImport<HlNewBlock> for BlockPoller {
fn poll(&mut self, _cx: &mut Context<'_>) -> Poll<BlockImportEvent<HlNewBlock>> {
debug!("(receiver) Polling");
match Pin::new(&mut self.block_rx).poll_recv(_cx) {
Poll::Ready(Some((number, block))) => {
debug!("Polled block: {}", number);
let reth_block = block.to_reth_block();
let hash = reth_block.header.hash_slow();
self.blockhash_cache.write().insert(hash, number);
let td = U128::from(reth_block.header.difficulty);
Poll::Ready(BlockImportEvent::Announcement(BlockValidation::ValidHeader {
block: NewBlockMessage {
block: HlNewBlock(NewBlock { block: reth_block, td }).into(),
hash,
},
}))
}
Poll::Ready(None) => Poll::Pending,
Poll::Pending => Poll::Pending,
}
}
fn on_new_block(&mut self, _peer_id: PeerId, _incoming_block: NewBlockEvent<HlNewBlock>) {}
}
/// A pseudo peer that can process eth requests and feed blocks to reth
pub struct PseudoPeer<BS: BlockSource> {
block_source: BS,
blockhash_cache: BlockHashCache,
warm_cache_size: u64,
if_hit_then_warm_around: Arc<Mutex<HashSet<u64>>>,
/// This is used to avoid calling `find_latest_block_number` too often.
/// Only used for cache warmup.
known_latest_block_number: u64,
}
impl<BS: BlockSource> PseudoPeer<BS> {
pub fn new(block_source: BS, blockhash_cache: BlockHashCache) -> Self {
Self {
block_source,
blockhash_cache,
warm_cache_size: 1000, // reth default chunk size for GetBlockBodies
if_hit_then_warm_around: Arc::new(Mutex::new(HashSet::new())),
known_latest_block_number: 0,
}
}
async fn collect_block(&self, height: u64) -> eyre::Result<BlockAndReceipts> {
self.block_source.collect_block(height).await
}
async fn collect_blocks(
&self,
block_numbers: impl IntoIterator<Item = u64>,
) -> Vec<BlockAndReceipts> {
let block_numbers = block_numbers.into_iter().collect::<Vec<_>>();
let blocks = futures::stream::iter(block_numbers)
.map(async |number| self.collect_block(number).await.unwrap())
.buffered(self.block_source.recommended_chunk_size() as usize)
.collect::<Vec<_>>()
.await;
blocks
}
pub async fn process_eth_request(
&mut self,
eth_req: IncomingEthRequest<HlNetworkPrimitives>,
) -> eyre::Result<()> {
match eth_req {
IncomingEthRequest::GetBlockHeaders {
peer_id: _,
request: GetBlockHeaders { start_block, limit, skip, direction },
response,
} => {
info!(
"GetBlockHeaders request: {start_block:?}, {limit:?}, {skip:?}, {direction:?}"
);
let number = match start_block {
HashOrNumber::Hash(hash) => self.hash_to_block_number(hash).await,
HashOrNumber::Number(number) => number,
};
let block_headers = match direction {
HeadersDirection::Rising => self.collect_blocks(number..number + limit).await,
HeadersDirection::Falling => {
self.collect_blocks((number + 1 - limit..number + 1).rev()).await
}
}
.into_par_iter()
.map(|block| block.to_reth_block().header.clone())
.collect::<Vec<_>>();
let _ = response.send(Ok(BlockHeaders(block_headers)));
}
IncomingEthRequest::GetBlockBodies { peer_id: _, request, response } => {
let GetBlockBodies(hashes) = request;
info!("GetBlockBodies request: {}", hashes.len());
let mut numbers = Vec::new();
for hash in hashes {
numbers.push(self.hash_to_block_number(hash).await);
}
let block_bodies = self
.collect_blocks(numbers)
.await
.into_iter()
.map(|block| block.to_reth_block().body)
.collect::<Vec<_>>();
let _ = response.send(Ok(BlockBodies(block_bodies)));
}
IncomingEthRequest::GetNodeData { .. } => {
info!("GetNodeData request: {:?}", eth_req);
}
eth_req => {
info!("New eth protocol request: {:?}", eth_req);
}
}
Ok(())
}
async fn hash_to_block_number(&mut self, hash: B256) -> u64 {
// First, try to find the hash in our cache
if let Some(block_number) = self.try_get_cached_block_number(hash).await {
return block_number;
}
let latest = self.block_source.find_latest_block_number().await.unwrap();
self.known_latest_block_number = latest;
// These constants are quite arbitrary but work well in practice
const BACKFILL_RETRY_LIMIT: u64 = 10;
for _ in 0..BACKFILL_RETRY_LIMIT {
// If not found, backfill the cache and retry
if let Ok(Some(block_number)) = self.backfill_cache_for_hash(hash, latest).await {
return block_number;
}
}
panic!("Hash not found: {:?}", hash);
}
async fn fallback_to_official_rpc(&self, hash: B256) -> eyre::Result<u64> {
// This is tricky because raw EVM block files (BlockSource) have no hash-to-number
// mapping, so we can either enumerate all blocks to build that mapping, or fall back
// to an official RPC. The latter is much easier but has a 300/day rate limit.
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee_core::client::ClientT;
info!("Fallback to official RPC: {:?}", hash);
let client = HttpClientBuilder::default().build("https://rpc.hyperliquid.xyz/evm").unwrap();
let target_block: Block = client.request("eth_getBlockByHash", (hash, false)).await?;
info!("From official RPC: {:?} for {hash:?}", target_block.header.number);
self.cache_blocks([(hash, target_block.header.number)]);
Ok(target_block.header.number)
}
/// Try to get a block number from the cache for the given hash
async fn try_get_cached_block_number(&mut self, hash: B256) -> Option<u64> {
let maybe_block_number = self.blockhash_cache.read().get_by_left(&hash).copied();
if let Some(block_number) = maybe_block_number {
if self.if_hit_then_warm_around.lock().unwrap().contains(&block_number) {
self.warm_cache_around_blocks(block_number, self.warm_cache_size).await;
}
return Some(block_number);
}
None
}
/// Backfill the cache with blocks to find the target hash
async fn backfill_cache_for_hash(
&mut self,
target_hash: B256,
latest: u64,
) -> eyre::Result<Option<u64>> {
let chunk_size = self.block_source.recommended_chunk_size();
info!("Hash not found, backfilling... {:?}", target_hash);
const TRY_OFFICIAL_RPC_THRESHOLD: usize = 20;
for (iteration, end) in (1..=latest).rev().step_by(chunk_size as usize).enumerate() {
// Calculate the range to backfill
let start = std::cmp::max(end.saturating_sub(chunk_size), 1);
// Backfill this chunk
if let Ok(Some(block_number)) =
self.try_block_range_for_hash(start, end, target_hash).await
{
return Ok(Some(block_number));
}
// If still not found after enough chunks, fall back to an official RPC
if iteration >= TRY_OFFICIAL_RPC_THRESHOLD {
match self.fallback_to_official_rpc(target_hash).await {
Ok(block_number) => {
self.warm_cache_around_blocks(block_number, self.warm_cache_size).await;
return Ok(Some(block_number));
}
Err(e) => {
info!("Fallback to official RPC failed: {:?}", e);
}
}
}
}
info!("Hash not found: {:?}, retrying from the latest block...", target_hash);
Ok(None) // Not found
}
async fn warm_cache_around_blocks(&mut self, block_number: u64, chunk_size: u64) {
let start = std::cmp::max(block_number.saturating_sub(chunk_size), 1);
let end = std::cmp::min(block_number + chunk_size, self.known_latest_block_number);
self.if_hit_then_warm_around.lock().unwrap().insert(start);
self.if_hit_then_warm_around.lock().unwrap().insert(end);
const IMPOSSIBLE_HASH: B256 = B256::ZERO;
let _ = self.try_block_range_for_hash(start, end, IMPOSSIBLE_HASH).await;
}
/// Backfill a specific range of block numbers into the cache
async fn try_block_range_for_hash(
&mut self,
start_number: u64,
end_number: u64,
target_hash: B256,
) -> eyre::Result<Option<u64>> {
// Get block numbers that are already cached
let (cached_block_hashes, uncached_block_numbers) =
self.get_cached_block_hashes(start_number, end_number);
if let Some(&block_number) = cached_block_hashes.get(&target_hash) {
return Ok(Some(block_number));
}
if uncached_block_numbers.is_empty() {
info!("All blocks are cached, returning None");
return Ok(None);
}
info!("Backfilling from {} to {}", start_number, end_number);
// Collect blocks and cache them
let blocks = self.collect_blocks(uncached_block_numbers).await;
let block_map: HashMap<B256, u64> =
blocks.into_iter().map(|block| (block.hash(), block.number())).collect();
let maybe_block_number = block_map.get(&target_hash).copied();
self.cache_blocks(block_map);
Ok(maybe_block_number)
}
/// Get block numbers in the range that are already cached
fn get_cached_block_hashes(
&self,
start_number: u64,
end_number: u64,
) -> (HashMap<B256, u64>, Vec<u64>) {
let map = self.blockhash_cache.read();
let (cached, uncached): (Vec<u64>, Vec<u64>) =
(start_number..=end_number).partition(|number| map.get_by_right(number).is_some());
let cached_block_hashes = cached
.into_iter()
.filter_map(|number| map.get_by_right(&number).map(|&hash| (hash, number)))
.collect();
(cached_block_hashes, uncached)
}
/// Cache a collection of blocks in the hash-to-number mapping
fn cache_blocks(&self, blocks: impl IntoIterator<Item = (B256, u64)>) {
let mut map = self.blockhash_cache.write();
for (hash, number) in blocks {
map.insert(hash, number);
}
}
}

266
src/pseudo_peer/sources.rs Normal file

@@ -0,0 +1,266 @@
use crate::node::types::BlockAndReceipts;
use aws_sdk_s3::types::RequestPayer;
use eyre::Context;
use futures::{future::BoxFuture, FutureExt};
use reth_network::cache::LruMap;
use std::{
path::PathBuf,
sync::{Arc, RwLock},
};
use tracing::info;
pub trait BlockSource: Send + Sync + std::fmt::Debug + Unpin + 'static {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>>;
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>>;
fn recommended_chunk_size(&self) -> u64;
}
pub type BlockSourceBoxed = Arc<Box<dyn BlockSource>>;
fn name_with_largest_number(files: &[String], is_dir: bool) -> Option<(u64, String)> {
let mut files = files
.iter()
.filter_map(|file_raw| {
let file = file_raw.strip_suffix("/").unwrap_or(file_raw).split("/").last().unwrap();
let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
})
.collect::<Vec<_>>();
if files.is_empty() {
return None;
}
files.sort_by_key(|(number, _)| *number);
files.last().cloned()
}
#[derive(Debug, Clone)]
pub struct S3BlockSource {
client: aws_sdk_s3::Client,
bucket: String,
}
impl S3BlockSource {
pub fn new(client: aws_sdk_s3::Client, bucket: String) -> Self {
Self { client, bucket }
}
async fn pick_path_with_highest_number(
client: aws_sdk_s3::Client,
bucket: String,
dir: String,
is_dir: bool,
) -> Option<(u64, String)> {
let request = client
.list_objects()
.bucket(&bucket)
.prefix(dir)
.delimiter("/")
.request_payer(RequestPayer::Requester);
let response = request.send().await.ok()?;
let files: Vec<String> = if is_dir {
response
.common_prefixes
.unwrap()
.iter()
.map(|object| object.prefix.as_ref().unwrap().to_string())
.collect()
} else {
response
.contents
.unwrap()
.iter()
.map(|object| object.key.as_ref().unwrap().to_string())
.collect()
};
name_with_largest_number(&files, is_dir)
}
}
impl BlockSource for S3BlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let path = rmp_path(height);
let request = client
.get_object()
.request_payer(RequestPayer::Requester)
.bucket(&bucket)
.key(path);
let response = request.send().await?;
let bytes = response.body.collect().await?.into_bytes();
let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
let client = self.client.clone();
let bucket = self.bucket.clone();
async move {
let (_, first_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
"".to_string(),
true,
)
.await?;
let (_, second_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
first_level,
true,
)
.await?;
let (block_number, third_level) = Self::pick_path_with_highest_number(
client.clone(),
bucket.clone(),
second_level,
false,
)
.await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
100
}
}
impl BlockSource for LocalBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let dir = self.dir.clone();
async move {
let path = dir.join(rmp_path(height));
let file = tokio::fs::read(&path)
.await
.wrap_err_with(|| format!("Failed to read block from {path:?}"))?;
let mut decoder = lz4_flex::frame::FrameDecoder::new(&file[..]);
let blocks: Vec<BlockAndReceipts> = rmp_serde::from_read(&mut decoder)?;
Ok(blocks[0].clone())
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
let dir = self.dir.clone();
async move {
let (_, first_level) = Self::pick_path_with_highest_number(dir.clone(), true).await?;
let (_, second_level) =
Self::pick_path_with_highest_number(dir.join(first_level), true).await?;
let (block_number, third_level) =
Self::pick_path_with_highest_number(dir.join(second_level), false).await?;
info!("Latest block number: {} with path {}", block_number, third_level);
Some(block_number)
}
.boxed()
}
fn recommended_chunk_size(&self) -> u64 {
1000
}
}
#[derive(Debug, Clone)]
pub struct LocalBlockSource {
dir: PathBuf,
}
impl LocalBlockSource {
pub fn new(dir: impl Into<PathBuf>) -> Self {
Self { dir: dir.into() }
}
fn name_with_largest_number_static(files: &[String], is_dir: bool) -> Option<(u64, String)> {
let mut files = files
.iter()
.filter_map(|file_raw| {
let file = file_raw.strip_suffix("/").unwrap_or(file_raw);
let file = file.split("/").last().unwrap();
let stem = if is_dir { file } else { file.strip_suffix(".rmp.lz4")? };
stem.parse::<u64>().ok().map(|number| (number, file_raw.to_string()))
})
.collect::<Vec<_>>();
if files.is_empty() {
return None;
}
files.sort_by_key(|(number, _)| *number);
files.last().map(|(number, file)| (*number, file.to_string()))
}
async fn pick_path_with_highest_number(dir: PathBuf, is_dir: bool) -> Option<(u64, String)> {
let files = std::fs::read_dir(&dir).unwrap().collect::<Vec<_>>();
let files = files
.into_iter()
.filter(|path| path.as_ref().unwrap().path().is_dir() == is_dir)
.map(|entry| entry.unwrap().path().to_string_lossy().to_string())
.collect::<Vec<_>>();
Self::name_with_largest_number_static(&files, is_dir)
}
}
fn rmp_path(height: u64) -> String {
    // Heights are 1-based; blocks are bucketed by millions, then by thousands.
    let f = ((height - 1) / 1_000_000) * 1_000_000;
    let s = ((height - 1) / 1_000) * 1_000;
    format!("{f}/{s}/{height}.rmp.lz4")
}
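A quick sanity check of the path scheme (an editor's sketch, not part of the commit):

#[cfg(test)]
mod rmp_path_tests {
    use super::rmp_path;

    #[test]
    fn buckets_by_millions_then_thousands() {
        assert_eq!(rmp_path(1), "0/0/1.rmp.lz4");
        assert_eq!(rmp_path(1_000_000), "0/999000/1000000.rmp.lz4");
        assert_eq!(rmp_path(1_234_567), "1000000/1234000/1234567.rmp.lz4");
    }
}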
impl BlockSource for BlockSourceBoxed {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
self.as_ref().collect_block(height)
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
self.as_ref().find_latest_block_number()
}
fn recommended_chunk_size(&self) -> u64 {
self.as_ref().recommended_chunk_size()
}
}
#[derive(Debug, Clone)]
pub struct CachedBlockSource {
block_source: BlockSourceBoxed,
cache: Arc<RwLock<LruMap<u64, BlockAndReceipts>>>,
}
impl CachedBlockSource {
const CACHE_LIMIT: u32 = 100000;
pub fn new(block_source: BlockSourceBoxed) -> Self {
Self { block_source, cache: Arc::new(RwLock::new(LruMap::new(Self::CACHE_LIMIT))) }
}
}
impl BlockSource for CachedBlockSource {
fn collect_block(&self, height: u64) -> BoxFuture<eyre::Result<BlockAndReceipts>> {
let block_source = self.block_source.clone();
let cache = self.cache.clone();
async move {
if let Some(block) = cache.write().unwrap().get(&height) {
return Ok(block.clone());
}
let block = block_source.collect_block(height).await?;
cache.write().unwrap().insert(height, block.clone());
Ok(block)
}
.boxed()
}
fn find_latest_block_number(&self) -> BoxFuture<Option<u64>> {
self.block_source.find_latest_block_number()
}
fn recommended_chunk_size(&self) -> u64 {
self.block_source.recommended_chunk_size()
}
}

26
src/pseudo_peer/tests.rs Normal file

@@ -0,0 +1,26 @@
use crate::pseudo_peer::{prelude::*, BlockSourceType};
#[tokio::test]
async fn test_block_source_config_s3() {
let config = BlockSourceConfig::s3("test-bucket".to_string()).await;
assert!(
matches!(config.source_type, BlockSourceType::S3 { bucket } if bucket == "test-bucket")
);
}
#[tokio::test]
async fn test_block_source_config_local() {
let config = BlockSourceConfig::local("/test/path".to_string());
assert!(matches!(config.source_type, BlockSourceType::Local { path } if path == "/test/path"));
}
#[test]
fn test_error_types() {
let io_error = std::io::Error::new(std::io::ErrorKind::NotFound, "File not found");
let pseudo_peer_error: PseudoPeerError = io_error.into();
match pseudo_peer_error {
PseudoPeerError::Io(_) => (),
_ => panic!("Expected Io error"),
}
}

43
src/pseudo_peer/utils.rs Normal file

@@ -0,0 +1,43 @@
use std::{collections::HashMap, fmt::Debug, hash::Hash};
use reth_network::cache::LruCache;
/// A naive implementation of a bi-directional LRU cache.
#[derive(Debug)]
pub struct LruBiMap<K: Hash + Eq + Clone + Debug, V: Hash + Eq + Clone + Debug> {
left_to_right: HashMap<K, V>,
right_to_left: HashMap<V, K>,
lru_keys: LruCache<K>,
}
impl<K: Hash + Eq + Clone + Debug, V: Hash + Eq + Clone + Debug> LruBiMap<K, V> {
pub fn new(limit: u32) -> Self {
Self {
left_to_right: HashMap::new(),
right_to_left: HashMap::new(),
lru_keys: LruCache::new(limit),
}
}
pub fn insert(&mut self, key: K, value: V) {
if let (true, Some(evicted)) = self.lru_keys.insert_and_get_evicted(key.clone()) {
self.evict(&evicted);
}
self.left_to_right.insert(key.clone(), value.clone());
self.right_to_left.insert(value.clone(), key.clone());
}
pub fn get_by_left(&self, key: &K) -> Option<&V> {
self.left_to_right.get(key)
}
pub fn get_by_right(&self, value: &V) -> Option<&K> {
self.right_to_left.get(value)
}
fn evict(&mut self, key: &K) {
if let Some(value) = self.left_to_right.remove(key) {
self.right_to_left.remove(&value);
}
}
}
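
A brief usage sketch of the bi-map (an editor's addition, not part of the commit; the eviction assertions assume reth's LruCache drops the oldest key once the limit is exceeded):

#[cfg(test)]
mod tests {
    use super::LruBiMap;

    #[test]
    fn bidirectional_lookup_and_eviction() {
        let mut map = LruBiMap::new(2);
        map.insert("a", 1);
        map.insert("b", 2);
        assert_eq!(map.get_by_left(&"a"), Some(&1));
        assert_eq!(map.get_by_right(&2), Some(&"b"));
        // A third insert evicts the least recently used key ("a") from both directions.
        map.insert("c", 3);
        assert_eq!(map.get_by_left(&"a"), None);
        assert_eq!(map.get_by_right(&1), None);
    }
}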