fix: race condition concurrent in and out from same peer (#7317)

This commit is contained in:
Matthias Seitz
2024-03-25 12:19:19 +01:00
committed by GitHub
parent 10854e5983
commit b58cca7f91
2 changed files with 63 additions and 3 deletions

View File

@ -40,8 +40,13 @@ pub enum ReputationChangeKind {
}
impl ReputationChangeKind {
/// Returns true if the reputation change is a reset.
/// Returns true if the reputation change is a [ReputationChangeKind::Reset].
pub fn is_reset(&self) -> bool {
matches!(self, Self::Reset)
}
/// Returns true if the reputation change is [ReputationChangeKind::Dropped].
pub fn is_dropped(&self) -> bool {
matches!(self, Self::Dropped)
}
}

View File

@ -312,6 +312,12 @@ impl PeersManager {
self.queued_actions.push_back(PeerAction::DisconnectBannedIncoming { peer_id });
return
}
// it might be the case that we're also trying to connect to this peer at the same
// time, so we need to adjust the state here
if peer.state.is_pending_out() {
self.connection_info.decr_state(peer.state);
}
peer.state = PeerConnectionState::In;
is_trusted = is_trusted || peer.is_trusted();
@ -533,6 +539,16 @@ impl PeersManager {
peer_id: &PeerId,
err: &io::Error,
) {
// there's a race condition where we accepted an incoming connection while we were trying to
// connect to the same peer at the same time. if the outgoing connection failed
// after the incoming connection was accepted, we can ignore this error
if let Some(peer) = self.peers.get(peer_id) {
if peer.state.is_incoming() {
// we already have an active connection to the peer, so we can ignore this error
return
}
}
self.on_connection_failure(remote_addr, peer_id, err, ReputationChangeKind::FailedToConnect)
}
@ -623,8 +639,8 @@ impl PeersManager {
self.connection_info.decr_pending_in();
}
Direction::Outgoing(_) => {
// need to decrement the outgoing pending counter
self.connection_info.decr_pending_out();
// cleanup is handled when the incoming active session is activated in
// `on_incoming_session_established`
}
}
}
@ -1168,6 +1184,12 @@ impl PeerConnectionState {
fn is_unconnected(&self) -> bool {
matches!(self, PeerConnectionState::Idle)
}
/// Returns true if there's currently an outbound dial to that peer.
#[inline]
fn is_pending_out(&self) -> bool {
matches!(self, PeerConnectionState::PendingOut)
}
}
/// Commands the [`PeersManager`] listens for.
@ -2688,6 +2710,39 @@ mod tests {
assert_eq!(peers.connection_info.num_outbound, 0);
}
#[tokio::test]
async fn test_already_connected_incoming_outgoing_connection_error() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.add_peer(peer_id, addr, None);
match event!(peers) {
PeerAction::PeerAdded(_) => {}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { .. } => {}
_ => unreachable!(),
}
peers.on_incoming_session_established(peer_id, addr);
peers.on_outgoing_connection_failure(
&addr,
&peer_id,
&io::Error::new(io::ErrorKind::ConnectionRefused, ""),
);
assert_eq!(peers.peers.get(&peer_id).unwrap().state, PeerConnectionState::In);
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_out, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_outbound, 0);
}
#[tokio::test]
async fn test_max_concurrent_dials() {
let config = PeersConfig::default();