From b58cca7f9108dab01976ba499cf8e70ff2e0085d Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 25 Mar 2024 12:19:19 +0100 Subject: [PATCH] fix: race condition concurrent in and out from same peer (#7317) --- crates/net/network-api/src/reputation.rs | 7 ++- crates/net/network/src/peers/manager.rs | 59 +++++++++++++++++++++++- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/crates/net/network-api/src/reputation.rs b/crates/net/network-api/src/reputation.rs index 1056192ee..718e0e9ba 100644 --- a/crates/net/network-api/src/reputation.rs +++ b/crates/net/network-api/src/reputation.rs @@ -40,8 +40,13 @@ pub enum ReputationChangeKind { } impl ReputationChangeKind { - /// Returns true if the reputation change is a reset. + /// Returns true if the reputation change is a [ReputationChangeKind::Reset]. pub fn is_reset(&self) -> bool { matches!(self, Self::Reset) } + + /// Returns true if the reputation change is [ReputationChangeKind::Dropped]. + pub fn is_dropped(&self) -> bool { + matches!(self, Self::Dropped) + } } diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 3245d65c7..cbf0ddc5f 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -312,6 +312,12 @@ impl PeersManager { self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id }); return } + // it might be the case that we're also trying to connect to this peer at the same + // time, so we need to adjust the state here + if peer.state.is_pending_out() { + self.connection_info.decr_state(peer.state); + } + peer.state = PeerConnectionState::In; is_trusted = is_trusted || peer.is_trusted(); @@ -533,6 +539,16 @@ impl PeersManager { peer_id: &PeerId, err: &io::Error, ) { + // there's a race condition where we accepted an incoming connection while we were trying to + // connect to the same peer at the same time. if the outgoing connection failed + // after the incoming connection was accepted, we can ignore this error + if let Some(peer) = self.peers.get(peer_id) { + if peer.state.is_incoming() { + // we already have an active connection to the peer, so we can ignore this error + return + } + } + self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect) } @@ -623,8 +639,8 @@ impl PeersManager { self.connection_info.decr_pending_in(); } Direction::Outgoing(_) => { - // need to decrement the outgoing pending counter - self.connection_info.decr_pending_out(); + // cleanup is handled when the incoming active session is activated in + // `on_incoming_session_established` } } } @@ -1168,6 +1184,12 @@ impl PeerConnectionState { fn is_unconnected(&self) -> bool { matches!(self, PeerConnectionState::Idle) } + + /// Returns true if there's currently an outbound dial to that peer. + #[inline] + fn is_pending_out(&self) -> bool { + matches!(self, PeerConnectionState::PendingOut) + } } /// Commands the [`PeersManager`] listens for. @@ -2688,6 +2710,39 @@ mod tests { assert_eq!(peers.connection_info.num_outbound, 0); } + #[tokio::test] + async fn test_already_connected_incoming_outgoing_connection_error() { + let peer_id = PeerId::random(); + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009); + let mut peers = PeersManager::default(); + + peers.on_incoming_pending_session(addr.ip()).unwrap(); + peers.add_peer(peer_id, addr, None); + + match event!(peers) { + PeerAction::PeerAdded(_) => {} + _ => unreachable!(), + } + + match event!(peers) { + PeerAction::Connect { .. } => {} + _ => unreachable!(), + } + + peers.on_incoming_session_established(peer_id, addr); + + peers.on_outgoing_connection_failure( + &addr, + &peer_id, + &io::Error::new(io::ErrorKind::ConnectionRefused, ""), + ); + assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In); + assert_eq!(peers.connection_info.num_inbound, 1); + assert_eq!(peers.connection_info.num_pending_out, 0); + assert_eq!(peers.connection_info.num_pending_in, 0); + assert_eq!(peers.connection_info.num_outbound, 0); + } + #[tokio::test] async fn test_max_concurrent_dials() { let config = PeersConfig::default();