mirror of
https://github.com/hl-archive-node/nanoreth.git
synced 2025-12-06 10:59:55 +00:00
feat(network): move test helpers to test-utils mod (#901)
This commit is contained in:
@ -1,10 +1,9 @@
|
||||
//! Connection tests
|
||||
|
||||
use super::testnet::Testnet;
|
||||
use crate::{NetworkEventStream, PeerConfig};
|
||||
use enr::{k256::ecdsa::SigningKey, Enr, EnrPublicKey};
|
||||
use ethers_core::utils::Geth;
|
||||
|
||||
use ethers_providers::{Http, Middleware, Provider};
|
||||
|
||||
use futures::StreamExt;
|
||||
use reth_discv4::{bootnodes::mainnet_nodes, Discv4Config};
|
||||
use reth_eth_wire::DisconnectReason;
|
||||
@ -13,7 +12,12 @@ use reth_interfaces::{
|
||||
sync::{SyncState, SyncStateUpdater},
|
||||
};
|
||||
use reth_net_common::ban_list::BanList;
|
||||
use reth_network::{NetworkConfig, NetworkEvent, NetworkManager, PeersConfig};
|
||||
use reth_network::{
|
||||
test_utils::{
|
||||
enr_to_peer_id, unused_tcp_udp, NetworkEventStream, PeerConfig, Testnet, GETH_TIMEOUT,
|
||||
},
|
||||
NetworkConfig, NetworkEvent, NetworkManager, PeersConfig,
|
||||
};
|
||||
use reth_network_api::{NetworkInfo, PeersInfo};
|
||||
use reth_primitives::{HeadersDirection, NodeRecord, PeerId};
|
||||
use reth_provider::test_utils::NoopProvider;
|
||||
@ -22,17 +26,6 @@ use secp256k1::SecretKey;
|
||||
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
|
||||
use tokio::task;
|
||||
|
||||
// The timeout for tests that create a GethInstance
|
||||
const GETH_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Obtains a PeerId from an ENR. In this case, the PeerId represents the public key contained in
|
||||
/// the ENR.
|
||||
fn enr_to_peer_id(enr: Enr<SigningKey>) -> PeerId {
|
||||
// In the following tests, methods which accept a public key expect it to contain the public
|
||||
// key in its 64-byte encoded (uncompressed) form.
|
||||
enr.public_key().encode_uncompressed().into()
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_establish_connections() {
|
||||
reth_tracing::init_test_tracing();
|
||||
@ -337,11 +330,10 @@ async fn test_incoming_node_id_blacklist() {
|
||||
let ban_list = BanList::new(vec![geth_peer_id], HashSet::new());
|
||||
let peer_config = PeersConfig::default().with_ban_list(ban_list);
|
||||
|
||||
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30303);
|
||||
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30304);
|
||||
let (reth_p2p, reth_disc) = unused_tcp_udp();
|
||||
let config = NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key)
|
||||
.listener_addr(reth_p2p_socket)
|
||||
.discovery_addr(reth_disc_socket)
|
||||
.listener_addr(reth_p2p)
|
||||
.discovery_addr(reth_disc)
|
||||
.peer_config(peer_config)
|
||||
.build();
|
||||
|
||||
@ -353,7 +345,7 @@ async fn test_incoming_node_id_blacklist() {
|
||||
tokio::task::spawn(network);
|
||||
|
||||
// make geth connect to us
|
||||
let our_enode = NodeRecord::new(reth_p2p_socket, *handle.peer_id());
|
||||
let our_enode = NodeRecord::new(reth_p2p, *handle.peer_id());
|
||||
|
||||
provider.add_peer(our_enode.to_string()).await.unwrap();
|
||||
|
||||
@ -387,11 +379,10 @@ async fn test_incoming_connect_with_single_geth() {
|
||||
// get the peer id we should be expecting
|
||||
let geth_peer_id = enr_to_peer_id(provider.node_info().await.unwrap().enr);
|
||||
|
||||
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30305);
|
||||
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30306);
|
||||
let (reth_p2p, reth_disc) = unused_tcp_udp();
|
||||
let config = NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key)
|
||||
.listener_addr(reth_p2p_socket)
|
||||
.discovery_addr(reth_disc_socket)
|
||||
.listener_addr(reth_p2p)
|
||||
.discovery_addr(reth_disc)
|
||||
.build();
|
||||
|
||||
let network = NetworkManager::new(config).await.unwrap();
|
||||
@ -403,7 +394,7 @@ async fn test_incoming_connect_with_single_geth() {
|
||||
let mut event_stream = NetworkEventStream::new(events);
|
||||
|
||||
// make geth connect to us
|
||||
let our_enode = NodeRecord::new(reth_p2p_socket, *handle.peer_id());
|
||||
let our_enode = NodeRecord::new(reth_p2p, *handle.peer_id());
|
||||
|
||||
provider.add_peer(our_enode.to_string()).await.unwrap();
|
||||
|
||||
@ -422,11 +413,10 @@ async fn test_outgoing_connect_with_single_geth() {
|
||||
tokio::time::timeout(GETH_TIMEOUT, async move {
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
|
||||
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30307);
|
||||
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30308);
|
||||
let (reth_p2p, reth_disc) = unused_tcp_udp();
|
||||
let config = NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key)
|
||||
.listener_addr(reth_p2p_socket)
|
||||
.discovery_addr(reth_disc_socket)
|
||||
.listener_addr(reth_p2p)
|
||||
.discovery_addr(reth_disc)
|
||||
.build();
|
||||
let network = NetworkManager::new(config).await.unwrap();
|
||||
|
||||
@ -448,8 +438,7 @@ async fn test_outgoing_connect_with_single_geth() {
|
||||
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
|
||||
|
||||
// get the peer id we should be expecting
|
||||
let geth_peer_id: PeerId =
|
||||
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
|
||||
let geth_peer_id: PeerId = enr_to_peer_id(provider.node_info().await.unwrap().enr);
|
||||
|
||||
// add geth as a peer then wait for a `SessionEstablished` event
|
||||
handle.add_peer(geth_peer_id, geth_socket);
|
||||
@ -469,11 +458,10 @@ async fn test_geth_disconnect() {
|
||||
tokio::time::timeout(GETH_TIMEOUT, async move {
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
|
||||
let reth_p2p_socket = SocketAddr::new([127, 0, 0, 1].into(), 30309);
|
||||
let reth_disc_socket = SocketAddr::new([127, 0, 0, 1].into(), 30310);
|
||||
let (reth_p2p, reth_disc) = unused_tcp_udp();
|
||||
let config = NetworkConfig::builder(Arc::new(NoopProvider::default()), secret_key)
|
||||
.listener_addr(reth_p2p_socket)
|
||||
.discovery_addr(reth_disc_socket)
|
||||
.listener_addr(reth_p2p)
|
||||
.discovery_addr(reth_disc)
|
||||
.build();
|
||||
let network = NetworkManager::new(config).await.unwrap();
|
||||
|
||||
@ -494,8 +482,7 @@ async fn test_geth_disconnect() {
|
||||
let provider = Provider::<Http>::try_from(format!("http://{geth_endpoint}")).unwrap();
|
||||
|
||||
// get the peer id we should be expecting
|
||||
let geth_peer_id: PeerId =
|
||||
provider.node_info().await.unwrap().enr.public_key().encode_uncompressed().into();
|
||||
let geth_peer_id: PeerId = enr_to_peer_id(provider.node_info().await.unwrap().enr);
|
||||
|
||||
// add geth as a peer then wait for `PeerAdded` and `SessionEstablished` events.
|
||||
handle.add_peer(geth_peer_id, geth_socket);
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
mod connect;
|
||||
mod requests;
|
||||
mod testnet;
|
||||
pub use testnet::*;
|
||||
|
||||
fn main() {}
|
||||
|
||||
@ -1,13 +1,12 @@
|
||||
//! Tests for eth related requests
|
||||
|
||||
use super::testnet::Testnet;
|
||||
use crate::NetworkEventStream;
|
||||
use rand::Rng;
|
||||
use reth_eth_wire::BlockBody;
|
||||
use reth_interfaces::p2p::{
|
||||
bodies::client::BodiesClient,
|
||||
headers::client::{HeadersClient, HeadersRequest},
|
||||
};
|
||||
use reth_network::test_utils::{NetworkEventStream, Testnet};
|
||||
use reth_network_api::NetworkInfo;
|
||||
use reth_primitives::{
|
||||
Block, Bytes, Header, HeadersDirection, Signature, Transaction, TransactionKind,
|
||||
|
||||
@ -1,321 +0,0 @@
|
||||
//! A network implementation for testing purposes.
|
||||
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use pin_project::pin_project;
|
||||
use reth_eth_wire::DisconnectReason;
|
||||
use reth_network::{
|
||||
error::NetworkError, eth_requests::EthRequestHandler, NetworkConfig, NetworkEvent,
|
||||
NetworkHandle, NetworkManager,
|
||||
};
|
||||
use reth_primitives::PeerId;
|
||||
use reth_provider::{test_utils::NoopProvider, BlockProvider, HeaderProvider};
|
||||
use secp256k1::SecretKey;
|
||||
use std::{
|
||||
fmt,
|
||||
future::Future,
|
||||
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
task::{Context, Poll},
|
||||
};
|
||||
use tokio::{
|
||||
sync::{mpsc::unbounded_channel, oneshot},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tokio_stream::wrappers::UnboundedReceiverStream;
|
||||
|
||||
/// A test network consisting of multiple peers.
|
||||
#[derive(Default)]
|
||||
pub struct Testnet<C> {
|
||||
/// All running peers in the network.
|
||||
peers: Vec<Peer<C>>,
|
||||
}
|
||||
|
||||
// === impl Testnet ===
|
||||
|
||||
impl<C> Testnet<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
/// Same as [`Self::try_create_with`] but panics on error
|
||||
pub async fn create_with(num_peers: usize, provider: Arc<C>) -> Self {
|
||||
Self::try_create_with(num_peers, provider).await.unwrap()
|
||||
}
|
||||
|
||||
/// Creates a new [`Testnet`] with the given number of peers and the provider.
|
||||
pub async fn try_create_with(num_peers: usize, provider: Arc<C>) -> Result<Self, NetworkError> {
|
||||
let mut this = Self { peers: Vec::with_capacity(num_peers) };
|
||||
for _ in 0..num_peers {
|
||||
let config = PeerConfig::new(Arc::clone(&provider));
|
||||
this.add_peer_with_config(config).await?;
|
||||
}
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
pub fn peers_mut(&mut self) -> &mut [Peer<C>] {
|
||||
&mut self.peers
|
||||
}
|
||||
|
||||
pub fn peers(&self) -> &[Peer<C>] {
|
||||
&self.peers
|
||||
}
|
||||
|
||||
pub fn peers_iter_mut(&mut self) -> impl Iterator<Item = &mut Peer<C>> + '_ {
|
||||
self.peers.iter_mut()
|
||||
}
|
||||
|
||||
pub fn peers_iter(&self) -> impl Iterator<Item = &Peer<C>> + '_ {
|
||||
self.peers.iter()
|
||||
}
|
||||
|
||||
pub async fn extend_peer_with_config(
|
||||
&mut self,
|
||||
configs: impl IntoIterator<Item = PeerConfig<C>>,
|
||||
) -> Result<(), NetworkError> {
|
||||
for config in configs {
|
||||
self.add_peer_with_config(config).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_peer_with_config(
|
||||
&mut self,
|
||||
config: PeerConfig<C>,
|
||||
) -> Result<(), NetworkError> {
|
||||
let PeerConfig { config, client, secret_key } = config;
|
||||
|
||||
let network = NetworkManager::new(config).await?;
|
||||
let peer = Peer { network, client, secret_key, request_handler: None };
|
||||
self.peers.push(peer);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Returns all handles to the networks
|
||||
pub fn handles(&self) -> impl Iterator<Item = NetworkHandle> + '_ {
|
||||
self.peers.iter().map(|p| p.handle())
|
||||
}
|
||||
|
||||
/// Apply a closure on each peer
|
||||
pub fn for_each<F>(&self, f: F)
|
||||
where
|
||||
F: Fn(&Peer<C>),
|
||||
{
|
||||
self.peers.iter().for_each(f)
|
||||
}
|
||||
|
||||
/// Apply a closure on each peer
|
||||
pub fn for_each_mut<F>(&mut self, f: F)
|
||||
where
|
||||
F: FnMut(&mut Peer<C>),
|
||||
{
|
||||
self.peers.iter_mut().for_each(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Testnet<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider + 'static,
|
||||
{
|
||||
/// Spawns the testnet to a separate task
|
||||
pub fn spawn(self) -> TestnetHandle<C> {
|
||||
let (tx, rx) = oneshot::channel::<oneshot::Sender<Self>>();
|
||||
let mut net = self;
|
||||
let handle = tokio::task::spawn(async move {
|
||||
let mut tx = None;
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = &mut net => { break}
|
||||
inc = rx => {
|
||||
tx = inc.ok();
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(tx) = tx {
|
||||
let _ = tx.send(net);
|
||||
}
|
||||
});
|
||||
|
||||
TestnetHandle { _handle: handle, terminate: tx }
|
||||
}
|
||||
}
|
||||
|
||||
impl Testnet<NoopProvider> {
|
||||
/// Same as [`Self::try_create`] but panics on error
|
||||
pub async fn create(num_peers: usize) -> Self {
|
||||
Self::try_create(num_peers).await.unwrap()
|
||||
}
|
||||
|
||||
/// Creates a new [`Testnet`] with the given number of peers
|
||||
pub async fn try_create(num_peers: usize) -> Result<Self, NetworkError> {
|
||||
let mut this = Testnet::default();
|
||||
for _ in 0..num_peers {
|
||||
this.add_peer().await?;
|
||||
}
|
||||
Ok(this)
|
||||
}
|
||||
|
||||
pub async fn add_peer(&mut self) -> Result<(), NetworkError> {
|
||||
self.add_peer_with_config(Default::default()).await
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> fmt::Debug for Testnet<C> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Testnet {{}}").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Future for Testnet<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.get_mut();
|
||||
for peer in this.peers.iter_mut() {
|
||||
let _ = peer.poll_unpin(cx);
|
||||
}
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TestnetHandle<C> {
|
||||
_handle: JoinHandle<()>,
|
||||
terminate: oneshot::Sender<oneshot::Sender<Testnet<C>>>,
|
||||
}
|
||||
|
||||
// === impl TestnetHandle ===
|
||||
|
||||
impl<C> TestnetHandle<C> {
|
||||
/// Terminates the task and returns the [`Testnet`] back.
|
||||
pub async fn terminate(self) -> Testnet<C> {
|
||||
let (tx, rx) = oneshot::channel();
|
||||
self.terminate.send(tx).unwrap();
|
||||
rx.await.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
#[pin_project]
|
||||
pub struct Peer<C> {
|
||||
#[pin]
|
||||
network: NetworkManager<C>,
|
||||
#[pin]
|
||||
request_handler: Option<EthRequestHandler<C>>,
|
||||
client: Arc<C>,
|
||||
secret_key: SecretKey,
|
||||
}
|
||||
|
||||
// === impl Peer ===
|
||||
|
||||
impl<C> Peer<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
pub fn num_peers(&self) -> usize {
|
||||
self.network.num_connected_peers()
|
||||
}
|
||||
|
||||
/// The address that listens for incoming connections.
|
||||
pub fn local_addr(&self) -> SocketAddr {
|
||||
self.network.local_addr()
|
||||
}
|
||||
|
||||
pub fn handle(&self) -> NetworkHandle {
|
||||
self.network.handle().clone()
|
||||
}
|
||||
|
||||
/// Set a new request handler that's connected tot the peer's network
|
||||
pub fn install_request_handler(&mut self) {
|
||||
let (tx, rx) = unbounded_channel();
|
||||
self.network.set_eth_request_handler(tx);
|
||||
let peers = self.network.peers_handle();
|
||||
let request_handler = EthRequestHandler::new(Arc::clone(&self.client), peers, rx);
|
||||
self.request_handler = Some(request_handler);
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Future for Peer<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
type Output = ();
|
||||
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let this = self.project();
|
||||
|
||||
if let Some(request) = this.request_handler.as_pin_mut() {
|
||||
let _ = request.poll(cx);
|
||||
}
|
||||
|
||||
this.network.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct PeerConfig<C = NoopProvider> {
|
||||
config: NetworkConfig<C>,
|
||||
client: Arc<C>,
|
||||
secret_key: SecretKey,
|
||||
}
|
||||
|
||||
// === impl PeerConfig ===
|
||||
|
||||
impl<C> PeerConfig<C>
|
||||
where
|
||||
C: BlockProvider + HeaderProvider,
|
||||
{
|
||||
pub fn new(client: Arc<C>) -> Self {
|
||||
let secret_key = SecretKey::new(&mut rand::thread_rng());
|
||||
Self::with_secret_key(client, secret_key)
|
||||
}
|
||||
|
||||
pub fn with_secret_key(client: Arc<C>, secret_key: SecretKey) -> Self {
|
||||
let config = NetworkConfig::builder(Arc::clone(&client), secret_key)
|
||||
.listener_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
|
||||
.discovery_addr(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)))
|
||||
.build();
|
||||
Self { config, client, secret_key }
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PeerConfig {
|
||||
fn default() -> Self {
|
||||
Self::new(Arc::new(NoopProvider::default()))
|
||||
}
|
||||
}
|
||||
|
||||
/// A helper type to await network events
|
||||
///
|
||||
/// This makes it easier to await established connections
|
||||
pub struct NetworkEventStream {
|
||||
inner: UnboundedReceiverStream<NetworkEvent>,
|
||||
}
|
||||
|
||||
// === impl NetworkEventStream ===
|
||||
|
||||
impl NetworkEventStream {
|
||||
pub fn new(inner: UnboundedReceiverStream<NetworkEvent>) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
|
||||
pub async fn next_session_closed(&mut self) -> Option<(PeerId, Option<DisconnectReason>)> {
|
||||
while let Some(ev) = self.inner.next().await {
|
||||
match ev {
|
||||
NetworkEvent::SessionClosed { peer_id, reason } => return Some((peer_id, reason)),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
/// Awaits the next event for an established session
|
||||
pub async fn next_session_established(&mut self) -> Option<PeerId> {
|
||||
while let Some(ev) = self.inner.next().await {
|
||||
match ev {
|
||||
NetworkEvent::SessionEstablished { peer_id, .. } => return Some(peer_id),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user