fix: remove peers after incoming connection closed (#2245)

This commit is contained in:
Matthias Seitz
2023-04-14 17:42:49 +02:00
committed by GitHub
parent 46d7dcba89
commit 3779a225fb
2 changed files with 76 additions and 20 deletions

View File

@ -658,7 +658,7 @@ where
this.swarm
.state_mut()
.peers_mut()
.on_active_inbound_session(peer_id, remote_addr);
.on_incoming_session_established(peer_id, remote_addr);
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,

View File

@ -177,6 +177,18 @@ impl PeersManager {
self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id))
}
/// Returns the number of currently active inbound connections.
#[inline]
pub(crate) fn num_inbound_connections(&self) -> usize {
self.connection_info.num_inbound
}
/// Returns the number of currently active outbound connections.
#[inline]
pub(crate) fn num_outbound_connections(&self) -> usize {
self.connection_info.num_outbound
}
/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
@ -207,18 +219,6 @@ impl PeersManager {
self.connection_info.decr_in()
}
/// Returns the number of currently active inbound connections.
#[inline]
pub(crate) fn num_inbound_connections(&self) -> usize {
self.connection_info.num_inbound
}
/// Returns the number of currently active outbound connections.
#[inline]
pub(crate) fn num_outbound_connections(&self) -> usize {
self.connection_info.num_outbound
}
/// Invoked when a pending session was closed.
pub(crate) fn on_incoming_pending_session_dropped(
&mut self,
@ -243,7 +243,7 @@ impl PeersManager {
///
/// If the reputation of the peer is below the `BANNED_REPUTATION` threshold, a disconnect will
/// be scheduled.
pub(crate) fn on_active_inbound_session(&mut self, peer_id: PeerId, addr: SocketAddr) {
pub(crate) fn on_incoming_session_established(&mut self, peer_id: PeerId, addr: SocketAddr) {
// we only need to check the peer id here as the ip address will have been checked at
// on_inbound_pending_session
if self.ban_list.is_banned_peer(&peer_id) {
@ -264,7 +264,11 @@ impl PeersManager {
value.state = PeerConnectionState::In;
}
Entry::Vacant(entry) => {
entry.insert(Peer::with_state(addr, PeerConnectionState::In));
// peer is missing in the table, we add it but mark it as to be removed after
// disconnect, because we only know the outgoing port
let mut peer = Peer::with_state(addr, PeerConnectionState::In);
peer.remove_after_disconnect = true;
entry.insert(peer);
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
}
}
@ -529,10 +533,18 @@ impl PeersManager {
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let node = entry.get_mut();
node.kind = kind;
node.addr = addr;
node.fork_id = fork_id;
let peer = entry.get_mut();
peer.kind = kind;
peer.fork_id = fork_id;
peer.addr = addr;
if peer.state.is_incoming() {
// now that we have an actual discovered address, for that peer and not just the
// ip of the incoming connection, we don't need to remove the peer after
// disconnecting, See `on_incoming_session_established`
peer.remove_after_disconnect = false;
}
return
}
Entry::Vacant(entry) => {
@ -914,6 +926,12 @@ impl PeerConnectionState {
}
}
/// Returns true if this is an active incoming connection.
#[inline]
fn is_incoming(&self) -> bool {
matches!(self, PeerConnectionState::In)
}
/// Returns whether we're currently connected with this peer
#[inline]
fn is_connected(&self) -> bool {
@ -1800,7 +1818,7 @@ mod test {
let ban_list = BanList::new(vec![given_peer_id], HashSet::new());
let config = PeersConfig::default().with_ban_list(ban_list);
let mut peer_manager = PeersManager::new(config);
peer_manager.on_active_inbound_session(given_peer_id, socket_addr);
peer_manager.on_incoming_session_established(given_peer_id, socket_addr);
let Some(PeerAction::DisconnectBannedIncoming { peer_id }) = peer_manager.queued_actions.pop_front() else { panic!() };
@ -1953,4 +1971,42 @@ mod test {
// tick applied
assert!(peer_manager.peers.get_mut(&peer_id).unwrap().reputation >= DEFAULT_REPUTATION);
}
#[tokio::test]
async fn test_remove_incoming_after_disconnect() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.on_incoming_session_established(peer_id, addr);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::In);
assert!(peer.remove_after_disconnect);
peers.on_active_session_gracefully_closed(peer_id);
assert!(peers.peers.get(&peer_id).is_none())
}
#[tokio::test]
async fn test_keep_incoming_after_disconnect_if_discovered() {
let peer_id = PeerId::random();
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8009);
let mut peers = PeersManager::default();
peers.on_incoming_pending_session(addr.ip()).unwrap();
peers.on_incoming_session_established(peer_id, addr);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::In);
assert!(peer.remove_after_disconnect);
// trigger discovery manually while the peer is still connected
peers.add_peer(peer_id, addr, None);
peers.on_active_session_gracefully_closed(peer_id);
let peer = peers.peers.get(&peer_id).unwrap();
assert_eq!(peer.state, PeerConnectionState::Idle);
assert!(!peer.remove_after_disconnect);
}
}