diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 33997550e..7228e18ed 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -282,15 +282,23 @@ impl TestnetHandle { return } + let mut streams = Vec::with_capacity(self.peers.len()); + let mut num_sessions = Vec::with_capacity(self.peers.len()); for (idx, handle) in self.peers.iter().enumerate().take(self.peers.len() - 1) { - let mut events = NetworkEventStream::new(handle.event_listener()); + streams.push(NetworkEventStream::new(handle.event_listener())); + let mut num = 0; for idx in (idx + 1)..self.peers.len() { let neighbour = &self.peers[idx]; handle.network.add_peer(*neighbour.peer_id(), neighbour.local_addr()); - let connected = events.next_session_established().await.unwrap(); - assert_eq!(connected, *neighbour.peer_id()); + num += 1; } + num_sessions.push(num); } + let fut = streams + .into_iter() + .zip(num_sessions) + .map(|(mut stream, num)| async move { stream.take_session_established(num).await }); + futures::future::join_all(fut).await; } } @@ -566,6 +574,27 @@ impl NetworkEventStream { None } + /// Awaits the next `num` events for an established session + pub async fn take_session_established(&mut self, mut num: usize) -> Vec { + if num == 0 { + return Vec::new() + } + let mut peers = Vec::with_capacity(num); + while let Some(ev) = self.inner.next().await { + match ev { + NetworkEvent::SessionEstablished { peer_id, .. } => { + peers.push(peer_id); + num -= 1; + if num == 0 { + return peers + } + } + _ => continue, + } + } + peers + } + /// Ensures that the first two events are a [`NetworkEvent::PeerAdded`] and /// [`NetworkEvent::SessionEstablished`], returning the [`PeerId`] of the established /// session.