From 4d17cde9f5950ff3b9f8eed393fd2d54a091a0b4 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Sat, 18 Jan 2025 20:29:54 +0100 Subject: [PATCH] fix(net): increment in counter after peer's state is marked as in (#13863) --- crates/net/network/src/peers.rs | 96 +++++++++++++++++++++++++++++++-- 1 file changed, 93 insertions(+), 3 deletions(-) diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index a7e981b58..a42a5660d 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -329,9 +329,6 @@ impl PeersManager { // start a new tick, so the peer is not immediately rewarded for the time since last tick self.tick(); - let has_in_capacity = self.connection_info.has_in_capacity(); - self.connection_info.inc_in(); - match self.peers.entry(peer_id) { Entry::Occupied(mut entry) => { let peer = entry.get_mut(); @@ -359,6 +356,10 @@ impl PeersManager { } } + let has_in_capacity = self.connection_info.has_in_capacity(); + // increment new incoming connection + self.connection_info.inc_in(); + // disconnect the peer if we don't have capacity for more inbound connections if !is_trusted && !has_in_capacity { self.queued_actions.push_back(PeerAction::Disconnect { @@ -2696,4 +2697,93 @@ mod tests { assert_eq!(record.tcp_addr(), socket_addr); assert_eq!(record.udp_addr(), socket_addr); } + + #[tokio::test] + async fn test_incoming_connection_from_banned() { + let peer = PeerId::random(); + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008); + let config = PeersConfig::test().with_max_inbound(3); + let mut peers = PeersManager::new(config); + peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None); + + match event!(peers) { + PeerAction::PeerAdded(peer_id) => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + match event!(peers) { + PeerAction::Connect { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + // simulate new connection drops with error + loop { + peers.on_active_session_dropped( + &socket_addr, + &peer, + &EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid( + reth_eth_wire::EthVersion::Eth68, + reth_eth_wire::EthMessageID::Status, + )), + ); + + if peers.peers.get(&peer).unwrap().is_banned() { + break; + } + + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + peers.on_incoming_session_established(peer, socket_addr); + + match event!(peers) { + PeerAction::Connect { peer_id, .. } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + } + + assert!(peers.peers.get(&peer).unwrap().is_banned()); + + // fill all incoming slots + for _ in 0..peers.connection_info.config.max_inbound { + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + peers.on_incoming_session_established(peer, socket_addr); + + match event!(peers) { + PeerAction::DisconnectBannedIncoming { peer_id } => { + assert_eq!(peer_id, peer); + } + _ => unreachable!(), + } + } + + poll_fn(|cx| { + assert!(peers.poll(cx).is_pending()); + Poll::Ready(()) + }) + .await; + + assert_eq!(peers.connection_info.num_inbound, 0); + + let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008); + + // Assert we can still accept new connections + assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok()); + assert_eq!(peers.connection_info.num_pending_in, 1); + + // the triggered DisconnectBannedIncoming will result in dropped connections, assert that + // connection info is updated via the peer's state which would be a noop here since the + // banned peer's state is idle + peers.on_active_session_gracefully_closed(peer); + assert_eq!(peers.connection_info.num_inbound, 0); + } }