From 6dfcb7a85f68afbed18919b0da793a953ce36b98 Mon Sep 17 00:00:00 2001 From: int88 <106391185+int88@users.noreply.github.com> Date: Fri, 7 Feb 2025 19:35:14 +0800 Subject: [PATCH] fix: increment pending out on manually triggered connect (#14257) Co-authored-by: Matthias Seitz --- crates/net/network/src/peers.rs | 13 +++++++ crates/net/network/src/test_utils/testnet.rs | 20 ++++++++++ crates/net/network/tests/it/connect.rs | 40 +++++++++++++++++++- 3 files changed, 72 insertions(+), 1 deletion(-) diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index 85d3f0cdb..0f213649a 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -789,6 +789,8 @@ impl PeersManager { } /// Connects a peer and its address with the given kind. + /// + /// Note: This is invoked ond demand via an external command received by the manager pub(crate) fn add_and_connect_kind( &mut self, peer_id: PeerId, @@ -807,6 +809,7 @@ impl PeersManager { peer.state = PeerConnectionState::PendingOut; peer.fork_id = fork_id; entry.insert(peer); + self.connection_info.inc_pending_out(); self.queued_actions .push_back(PeerAction::Connect { peer_id, remote_addr: addr.tcp() }); } @@ -2786,4 +2789,14 @@ mod tests { peers.on_active_session_gracefully_closed(peer); assert_eq!(peers.connection_info.num_inbound, 0); } + + #[tokio::test] + async fn test_add_pending_onnect() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let mut peers = PeersManager::default(); + peers.add_and_connect(peer, PeerAddr::from_tcp(socket_addr), None); + assert_eq!(peers.peers.get(&peer).unwrap().state, PeerConnectionState::PendingOut); + assert_eq!(peers.connection_info.num_pending_out, 1); + } } diff --git a/crates/net/network/src/test_utils/testnet.rs b/crates/net/network/src/test_utils/testnet.rs index 503bf6511..2c5b6f9eb 100644 --- a/crates/net/network/src/test_utils/testnet.rs +++ b/crates/net/network/src/test_utils/testnet.rs @@ -763,4 +763,24 @@ impl NetworkEventStream { _ => None, } } + + /// Awaits the next event for a peer added. + pub async fn peer_added(&mut self) -> Option { + let peer_id = match self.inner.next().await { + Some(NetworkEvent::Peer(PeerEvent::PeerAdded(peer_id))) => peer_id, + _ => return None, + }; + + Some(peer_id) + } + + /// Awaits the next event for a peer remvoed. + pub async fn peer_removed(&mut self) -> Option { + let peer_id = match self.inner.next().await { + Some(NetworkEvent::Peer(PeerEvent::PeerRemoved(peer_id))) => peer_id, + _ => return None, + }; + + Some(peer_id) + } } diff --git a/crates/net/network/tests/it/connect.rs b/crates/net/network/tests/it/connect.rs index b54f2400f..feac0cc51 100644 --- a/crates/net/network/tests/it/connect.rs +++ b/crates/net/network/tests/it/connect.rs @@ -4,7 +4,7 @@ use alloy_node_bindings::Geth; use alloy_primitives::map::HashSet; use alloy_provider::{ext::AdminApi, ProviderBuilder}; use futures::StreamExt; -use reth_chainspec::MAINNET; +use reth_chainspec::{MAINNET, SEPOLIA}; use reth_discv4::Discv4Config; use reth_eth_wire::{DisconnectReason, EthNetworkPrimitives, HeadersDirection}; use reth_net_banlist::BanList; @@ -856,3 +856,41 @@ async fn test_disconnect_then_connect() { let peer = listener0.next_session_established().await.unwrap(); assert_eq!(peer, *handle1.peer_id()); } + +#[tokio::test(flavor = "multi_thread")] +async fn test_connect_peer_in_different_network_should_fail() { + reth_tracing::init_test_tracing(); + + // peer in mainnet. + let peer = new_random_peer(10, vec![]).await; + let peer_handle = peer.handle().clone(); + tokio::task::spawn(peer); + + // peer in sepolia. + let secret_key = SecretKey::new(&mut rand::thread_rng()); + // If the remote disconnect first, then we would not get a fatal protocol error. So set + // max_backoff_count to 0 to speed up the removal of the peer. + let peers_config = PeersConfig::default().with_max_backoff_count(0); + let config = NetworkConfigBuilder::eth(secret_key) + .listener_port(0) + .disable_discovery() + .peer_config(peers_config) + .build_with_noop_provider(SEPOLIA.clone()); + + let network = NetworkManager::new(config).await.unwrap(); + let handle = network.handle().clone(); + tokio::task::spawn(network); + + // create networkeventstream to get the next session event easily. + let events = handle.event_listener(); + + let mut event_stream = NetworkEventStream::new(events); + + handle.add_peer(*peer_handle.peer_id(), peer_handle.local_addr()); + + let added_peer_id = event_stream.peer_added().await.unwrap(); + assert_eq!(added_peer_id, *peer_handle.peer_id()); + + let removed_peer_id = event_stream.peer_removed().await.unwrap(); + assert_eq!(removed_peer_id, *peer_handle.peer_id()); +}