fix(net): increment in counter after peer's state is marked as in (#13863)

This commit is contained in:
Matthias Seitz
2025-01-18 20:29:54 +01:00
committed by GitHub
parent 20a3a6a4cb
commit 4d17cde9f5

View File

@ -329,9 +329,6 @@ impl PeersManager {
// start a new tick, so the peer is not immediately rewarded for the time since last tick // start a new tick, so the peer is not immediately rewarded for the time since last tick
self.tick(); self.tick();
let has_in_capacity = self.connection_info.has_in_capacity();
self.connection_info.inc_in();
match self.peers.entry(peer_id) { match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => { Entry::Occupied(mut entry) => {
let peer = entry.get_mut(); let peer = entry.get_mut();
@ -359,6 +356,10 @@ impl PeersManager {
} }
} }
let has_in_capacity = self.connection_info.has_in_capacity();
// increment new incoming connection
self.connection_info.inc_in();
// disconnect the peer if we don't have capacity for more inbound connections // disconnect the peer if we don't have capacity for more inbound connections
if !is_trusted && !has_in_capacity { if !is_trusted && !has_in_capacity {
self.queued_actions.push_back(PeerAction::Disconnect { self.queued_actions.push_back(PeerAction::Disconnect {
@ -2696,4 +2697,93 @@ mod tests {
assert_eq!(record.tcp_addr(), socket_addr); assert_eq!(record.tcp_addr(), socket_addr);
assert_eq!(record.udp_addr(), socket_addr); assert_eq!(record.udp_addr(), socket_addr);
} }
#[tokio::test]
async fn test_incoming_connection_from_banned() {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let config = PeersConfig::test().with_max_inbound(3);
let mut peers = PeersManager::new(config);
peers.add_peer(peer, PeerAddr::from_tcp(socket_addr), None);
match event!(peers) {
PeerAction::PeerAdded(peer_id) => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
// simulate new connection drops with error
loop {
peers.on_active_session_dropped(
&socket_addr,
&peer,
&EthStreamError::InvalidMessage(reth_eth_wire::message::MessageError::Invalid(
reth_eth_wire::EthVersion::Eth68,
reth_eth_wire::EthMessageID::Status,
)),
);
if peers.peers.get(&peer).unwrap().is_banned() {
break;
}
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(peer, socket_addr);
match event!(peers) {
PeerAction::Connect { peer_id, .. } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
assert!(peers.peers.get(&peer).unwrap().is_banned());
// fill all incoming slots
for _ in 0..peers.connection_info.config.max_inbound {
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(peer, socket_addr);
match event!(peers) {
PeerAction::DisconnectBannedIncoming { peer_id } => {
assert_eq!(peer_id, peer);
}
_ => unreachable!(),
}
}
poll_fn(|cx| {
assert!(peers.poll(cx).is_pending());
Poll::Ready(())
})
.await;
assert_eq!(peers.connection_info.num_inbound, 0);
let new_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 3)), 8008);
// Assert we can still accept new connections
assert!(peers.on_incoming_pending_session(new_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
// the triggered DisconnectBannedIncoming will result in dropped connections, assert that
// connection info is updated via the peer's state which would be a noop here since the
// banned peer's state is idle
peers.on_active_session_gracefully_closed(peer);
assert_eq!(peers.connection_info.num_inbound, 0);
}
} }