diff --git a/Cargo.lock b/Cargo.lock index 0703dc6b5..01933c1dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3385,6 +3385,7 @@ dependencies = [ "reth-primitives", "reth-rlp", "reth-rlp-derive", + "reth-tracing", "reth-transaction-pool", "secp256k1", "thiserror", diff --git a/crates/net/network/Cargo.toml b/crates/net/network/Cargo.toml index 396c6421e..021ab4fe2 100644 --- a/crates/net/network/Cargo.toml +++ b/crates/net/network/Cargo.toml @@ -46,6 +46,7 @@ secp256k1 = { version = "0.24", features = [ [dev-dependencies] # reth reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } +reth-tracing = { path = "../../tracing" } rand = "0.8" diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index db9e84a84..af7a80709 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -7,42 +7,52 @@ use std::collections::HashSet; #[tokio::test(flavor = "multi_thread")] async fn test_establish_connections() { - let net = Testnet::create(3).await; + reth_tracing::init_tracing(); - net.for_each(|peer| assert_eq!(0, peer.num_peers())); + for _ in 0..10 { + let net = Testnet::create(3).await; - let mut handles = net.handles(); - let handle0 = handles.next().unwrap(); - let handle1 = handles.next().unwrap(); - let handle2 = handles.next().unwrap(); + net.for_each(|peer| assert_eq!(0, peer.num_peers())); - drop(handles); - let handle = net.spawn(); + let mut handles = net.handles(); + let handle0 = handles.next().unwrap(); + let handle1 = handles.next().unwrap(); + let handle2 = handles.next().unwrap(); - let listener0 = handle0.event_listener(); + drop(handles); + let handle = net.spawn(); - handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); - handle0.add_peer(*handle2.peer_id(), handle2.local_addr()); + let listener0 = handle0.event_listener(); - let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]); + let mut listener1 = handle1.event_listener(); + let mut listener2 = handle2.event_listener(); - let mut established = listener0.take(2); - while let Some(ev) = established.next().await { - match ev { - NetworkEvent::SessionClosed { .. } => { - panic!("unexpected event") - } - NetworkEvent::SessionEstablished { peer_id, .. } => { - assert!(expected_connections.remove(&peer_id)) + handle0.add_peer(*handle1.peer_id(), handle1.local_addr()); + handle0.add_peer(*handle2.peer_id(), handle2.local_addr()); + + let mut expected_connections = HashSet::from([*handle1.peer_id(), *handle2.peer_id()]); + + // wait for all initiator connections + let mut established = listener0.take(2); + while let Some(ev) = established.next().await { + match ev { + NetworkEvent::SessionClosed { .. } => { + panic!("unexpected event") + } + NetworkEvent::SessionEstablished { peer_id, .. } => { + assert!(expected_connections.remove(&peer_id)) + } } } + assert!(expected_connections.is_empty()); + + // also await the established session on both target + futures::future::join(listener1.next(), listener2.next()).await; + + let net = handle.terminate().await; + + assert_eq!(net.peers()[0].num_peers(), 2); + assert_eq!(net.peers()[1].num_peers(), 1); + assert_eq!(net.peers()[2].num_peers(), 1); } - - assert!(expected_connections.is_empty()); - - let net = handle.terminate().await; - - assert_eq!(net.peers()[0].num_peers(), 2); - assert_eq!(net.peers()[1].num_peers(), 1); - assert_eq!(net.peers()[2].num_peers(), 1); }