Track PendingIn state separately (#7163)

This commit is contained in:
Abner Zheng
2024-03-16 02:30:53 +08:00
committed by GitHub
parent 94954593ef
commit 4e1c56f8d0

View File

@ -221,19 +221,19 @@ impl PeersManager {
if self.ban_list.is_banned_ip(&addr) {
return Err(InboundConnectionError::IpBanned)
}
self.connection_info.inc_in();
self.connection_info.inc_pending_in();
Ok(())
}
/// Invoked when a previous call to [Self::on_incoming_pending_session] succeeded but it was
/// rejected.
pub(crate) fn on_incoming_pending_session_rejected_internally(&mut self) {
self.connection_info.decr_in();
self.connection_info.decr_pending_in();
}
/// Invoked when a pending session was closed.
pub(crate) fn on_incoming_pending_session_gracefully_closed(&mut self) {
self.connection_info.decr_in()
self.connection_info.decr_pending_in()
}
/// Invoked when a pending session was closed.
@ -251,7 +251,7 @@ impl PeersManager {
}
}
self.connection_info.decr_in()
self.connection_info.decr_pending_in();
}
/// Called when a new _incoming_ active session was established to the given peer.
@ -271,6 +271,10 @@ impl PeersManager {
// start a new tick, so the peer is not immediately rewarded for the time since last tick
self.tick();
let has_in_capacity = self.connection_info.has_in_capacity();
self.connection_info.decr_pending_in();
self.connection_info.inc_in();
match self.peers.entry(peer_id) {
Entry::Occupied(mut entry) => {
let value = entry.get_mut();
@ -281,7 +285,7 @@ impl PeersManager {
value.state = PeerConnectionState::In;
// if a peer is not trusted and we don't have capacity for more inbound connections,
// disconnecting the peer
if !value.is_trusted() && !self.connection_info.has_in_capacity() {
if !value.is_trusted() && !has_in_capacity {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
@ -297,7 +301,7 @@ impl PeersManager {
self.queued_actions.push_back(PeerAction::PeerAdded(peer_id));
// disconnect the peer if we don't have capacity for more inbound connections
if !self.connection_info.has_in_capacity() {
if !has_in_capacity {
self.queued_actions.push_back(PeerAction::Disconnect {
peer_id,
reason: Some(DisconnectReason::TooManyPeers),
@ -586,7 +590,7 @@ impl PeersManager {
match direction {
Direction::Incoming => {
// need to decrement the ingoing counter
self.connection_info.decr_in();
self.connection_info.decr_pending_in();
}
Direction::Outgoing(_) => {
// need to decrement the outgoing counter
@ -763,7 +767,7 @@ impl PeersManager {
PeerAction::Connect { peer_id, remote_addr: peer.addr }
};
self.connection_info.inc_pendingout();
self.connection_info.inc_pending_out();
self.queued_actions.push_back(action);
}
@ -865,10 +869,13 @@ pub struct ConnectionInfo {
num_outbound: usize,
/// Counter for pending outbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_pendingout: usize,
num_pending_out: usize,
/// Counter for currently occupied slots for active inbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_inbound: usize,
/// Counter for pending inbound connections.
#[cfg_attr(feature = "serde", serde(skip))]
num_pending_in: usize,
/// Maximum allowed outbound connections.
max_outbound: usize,
/// Maximum allowed inbound connections.
@ -883,13 +890,13 @@ pub struct ConnectionInfo {
impl ConnectionInfo {
/// Returns `true` if there's still capacity for a new outgoing connection.
fn has_out_capacity(&self) -> bool {
self.num_pendingout < self.max_concurrent_outbound_dials &&
self.num_pending_out < self.max_concurrent_outbound_dials &&
self.num_outbound < self.max_outbound
}
/// Returns `true` if there's still capacity for a new incoming connection.
fn has_in_capacity(&self) -> bool {
self.num_inbound <= self.max_inbound
self.num_inbound < self.max_inbound
}
fn decr_state(&mut self, state: PeerConnectionState) {
@ -897,7 +904,7 @@ impl ConnectionInfo {
PeerConnectionState::Idle => {}
PeerConnectionState::DisconnectingIn | PeerConnectionState::In => self.decr_in(),
PeerConnectionState::DisconnectingOut | PeerConnectionState::Out => self.decr_out(),
PeerConnectionState::PendingOut => self.decr_pendingout(),
PeerConnectionState::PendingOut => self.decr_pending_out(),
}
}
@ -909,20 +916,28 @@ impl ConnectionInfo {
self.num_outbound += 1;
}
fn inc_pendingout(&mut self) {
self.num_pendingout += 1;
fn inc_pending_out(&mut self) {
self.num_pending_out += 1;
}
fn inc_in(&mut self) {
self.num_inbound += 1;
}
fn inc_pending_in(&mut self) {
self.num_pending_in += 1;
}
fn decr_in(&mut self) {
self.num_inbound -= 1;
}
fn decr_pendingout(&mut self) {
self.num_pendingout -= 1;
fn decr_pending_out(&mut self) {
self.num_pending_out -= 1;
}
fn decr_pending_in(&mut self) {
self.num_pending_in -= 1;
}
}
@ -934,7 +949,8 @@ impl Default for ConnectionInfo {
max_outbound: DEFAULT_MAX_COUNT_PEERS_OUTBOUND as usize,
max_inbound: DEFAULT_MAX_COUNT_PEERS_INBOUND as usize,
max_concurrent_outbound_dials: DEFAULT_MAX_COUNT_CONCURRENT_OUTBOUND_DIALS,
num_pendingout: 0,
num_pending_out: 0,
num_pending_in: 0,
}
}
}
@ -1908,9 +1924,9 @@ mod tests {
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_pending_session_rejected_internally();
assert_eq!(peers.connection_info.num_inbound, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
}
#[tokio::test]
@ -1919,9 +1935,9 @@ mod tests {
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_in, 1);
peers.on_incoming_pending_session_gracefully_closed();
assert_eq!(peers.connection_info.num_inbound, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
}
#[tokio::test]
@ -1932,7 +1948,7 @@ mod tests {
let mut peers = PeersManager::new(config);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_inbound, 1);
assert_eq!(peers.connection_info.num_pending_in, 1);
let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer,
@ -1940,7 +1956,7 @@ mod tests {
));
peers.on_incoming_pending_session_dropped(socket_addr, &err);
assert_eq!(peers.connection_info.num_inbound, 0);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert!(peers.ban_list.is_banned_ip(&socket_addr.ip()));
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
@ -2007,20 +2023,32 @@ mod tests {
let peer = PeerId::random();
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
let mut peers = PeersManager::default();
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
peers.on_incoming_session_established(peer, socket_addr);
// peer should have been added and num_inbound should have been increased
// Attempt to establish an incoming session, expecting `num_pending_in` to increase by 1
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
// Establish a session with the peer, expecting the peer to be added and the `num_inbound`
// to increase by 1
peers.on_incoming_session_established(peer, socket_addr);
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
// Attempt to establish another incoming session, expecting the `num_pending_in` to increase
// by 1
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
assert_eq!(peers.connection_info.num_pending_in, 1);
// Simulate a rejection due to an already established connection, expecting the
// `num_pending_in` to decrease by 1. The peer should remain connected and the `num_inbound`
// should not be changed.
peers.on_already_connected(Direction::Incoming);
// peer should not be connected and num_inbound should not have been increased
let p = peers.peers.get_mut(&peer).expect("peer not found");
assert_eq!(p.addr, socket_addr);
assert_eq!(peers.connection_info.num_pending_in, 0);
assert_eq!(peers.connection_info.num_inbound, 1);
}
@ -2458,7 +2486,7 @@ mod tests {
peer_manager.fill_outbound_slots();
// all dialed connections should be in 'PendingOut' state
let dials = peer_manager.connection_info.num_pendingout;
let dials = peer_manager.connection_info.num_pending_out;
assert_eq!(dials, peer_manager.connection_info.max_concurrent_outbound_dials);
let num_pendingout_states = peer_manager
@ -2483,6 +2511,6 @@ mod tests {
}
// no more pending outbound connections
assert_eq!(peer_manager.connection_info.num_pendingout, 0);
assert_eq!(peer_manager.connection_info.num_pending_out, 0);
}
}