perf(net): add manual yield point (#1155)

This commit is contained in:
Matthias Seitz
2023-02-04 21:56:25 +01:00
committed by GitHub
parent ac164a605a
commit 75692bc5f3

View File

@ -578,201 +578,235 @@ where
};
}
// advance the swarm
while let Poll::Ready(Some(event)) = this.swarm.poll_next_unpin(cx) {
// handle event
match event {
SwarmEvent::ValidMessage { peer_id, message } => {
this.on_peer_message(peer_id, message)
}
SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
this.on_invalid_message(peer_id, capabilities, message);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(target : "net", ?remote_addr, "TCP listener closed.");
}
SwarmEvent::TcpListenerError(err) => {
trace!(target : "net", ?err, "TCP connection error.");
}
SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
trace!(target : "net", ?session_id, ?remote_addr, "Incoming connection");
this.metrics.total_incoming_connections.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target : "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
this.metrics.total_outgoing_connections.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::SessionEstablished {
peer_id,
remote_addr,
capabilities,
messages,
status,
direction,
} => {
let total_active = this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
this.metrics.connected_peers.set(total_active as f64);
info!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
"Session established"
);
// This loop drives the entire state of network and does a lot of work.
// Under heavy load (many messages/events), data may arrive faster than it can be processed
// (incoming messages/requests -> events), and it is possible that more data has already
// arrived by the time an internal event is processed. Which could turn this loop into a
// busy loop. Without yielding back to the executor, it can starve other tasks waiting on
// that executor to execute them, or drive underlying resources To prevent this, we
// preemptively return control when the `budget` is exhausted. The value itself is
// chosen somewhat arbitrarily, it is high enough so the swarm can make meaningful progress
// but low enough that this loop does not starve other tasks for too long.
// If the budget is exhausted we manually yield back control to the (coop) scheduler. This
// manual yield point should prevent situations where polling appears to be frozen. See also <https://tokio.rs/blog/2020-04-preemption>
// And tokio's docs on cooperative scheduling <https://docs.rs/tokio/latest/tokio/task/#cooperative-scheduling>
let mut budget = 1024;
if direction.is_incoming() {
this.swarm
.state_mut()
.peers_mut()
.on_active_inbound_session(peer_id, remote_addr);
loop {
// advance the swarm
match this.swarm.poll_next_unpin(cx) {
Poll::Pending | Poll::Ready(None) => break,
Poll::Ready(Some(event)) => {
// handle event
match event {
SwarmEvent::ValidMessage { peer_id, message } => {
this.on_peer_message(peer_id, message)
}
SwarmEvent::InvalidCapabilityMessage { peer_id, capabilities, message } => {
this.on_invalid_message(peer_id, capabilities, message);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::TcpListenerClosed { remote_addr } => {
trace!(target : "net", ?remote_addr, "TCP listener closed.");
}
SwarmEvent::TcpListenerError(err) => {
trace!(target : "net", ?err, "TCP connection error.");
}
SwarmEvent::IncomingTcpConnection { remote_addr, session_id } => {
trace!(target : "net", ?session_id, ?remote_addr, "Incoming connection");
this.metrics.total_incoming_connections.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingTcpConnection { remote_addr, peer_id } => {
trace!(target : "net", ?remote_addr, ?peer_id, "Starting outbound connection.");
this.metrics.total_outgoing_connections.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::SessionEstablished {
peer_id,
remote_addr,
capabilities,
messages,
status,
direction,
} => {
let total_active =
this.num_active_peers.fetch_add(1, Ordering::Relaxed) + 1;
this.metrics.connected_peers.set(total_active as f64);
info!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
"Session established"
);
if direction.is_incoming() {
this.swarm
.state_mut()
.peers_mut()
.on_active_inbound_session(peer_id, remote_addr);
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
status,
messages,
});
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
this.event_listeners.send(NetworkEvent::PeerAdded(peer_id));
this.metrics.tracked_peers.increment(1f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id));
this.metrics.tracked_peers.decrement(1f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active =
this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
this.metrics.connected_peers.set(total_active as f64);
trace!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
?error,
"Session disconnected"
);
let mut reason = None;
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
reason = err.as_disconnected();
} else {
// Gracefully disconnected
this.swarm
.state_mut()
.peers_mut()
.on_active_session_gracefully_closed(peer_id);
}
this.metrics.closed_sessions.increment(1);
// This can either be an incoming or outgoing connection which was
// closed. So we update both metrics
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
this.event_listeners
.send(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
warn!(
target : "net",
?remote_addr,
?error,
"Incoming pending session failed"
);
if let Some(ref err) = error {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
this.metrics.closed_sessions.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed {
remote_addr,
peer_id,
error,
} => {
trace!(
target : "net",
?remote_addr,
?peer_id,
?error,
"Outgoing pending session failed"
);
if let Some(ref err) = error {
this.swarm.state_mut().peers_mut().on_pending_session_dropped(
&remote_addr,
&peer_id,
err,
);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
}
this.metrics.closed_sessions.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
target : "net",
?remote_addr,
?peer_id,
?error,
"Outgoing connection error"
);
this.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
&remote_addr,
&peer_id,
&error,
);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::BadMessage { peer_id } => {
this.swarm.state_mut().peers_mut().apply_reputation_change(
&peer_id,
ReputationChangeKind::BadMessage,
);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::ProtocolBreach { peer_id } => {
this.swarm.state_mut().peers_mut().apply_reputation_change(
&peer_id,
ReputationChangeKind::BadProtocol,
);
}
}
this.event_listeners.send(NetworkEvent::SessionEstablished {
peer_id,
capabilities,
status,
messages,
});
}
SwarmEvent::PeerAdded(peer_id) => {
trace!(target: "net", ?peer_id, "Peer added");
this.event_listeners.send(NetworkEvent::PeerAdded(peer_id));
this.metrics.tracked_peers.increment(1f64);
}
SwarmEvent::PeerRemoved(peer_id) => {
trace!(target: "net", ?peer_id, "Peer dropped");
this.event_listeners.send(NetworkEvent::PeerRemoved(peer_id));
this.metrics.tracked_peers.decrement(1f64);
}
SwarmEvent::SessionClosed { peer_id, remote_addr, error } => {
let total_active = this.num_active_peers.fetch_sub(1, Ordering::Relaxed) - 1;
this.metrics.connected_peers.set(total_active as f64);
trace!(
target : "net",
?remote_addr,
?peer_id,
?total_active,
?error,
"Session disconnected"
);
}
let mut reason = None;
if let Some(ref err) = error {
// If the connection was closed due to an error, we report the peer
this.swarm.state_mut().peers_mut().on_active_session_dropped(
&remote_addr,
&peer_id,
err,
);
reason = err.as_disconnected();
} else {
// Gracefully disconnected
this.swarm
.state_mut()
.peers_mut()
.on_active_session_gracefully_closed(peer_id);
}
this.metrics.closed_sessions.increment(1);
// This can either be an incoming or outgoing connection which was closed.
// So we update both metrics
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
this.event_listeners.send(NetworkEvent::SessionClosed { peer_id, reason });
}
SwarmEvent::IncomingPendingSessionClosed { remote_addr, error } => {
warn!(
target : "net",
?remote_addr,
?error,
"Incoming pending session failed"
);
if let Some(ref err) = error {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_dropped(remote_addr, err);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_incoming_pending_session_gracefully_closed();
}
this.metrics.closed_sessions.increment(1);
this.metrics
.incoming_connections
.set(this.swarm.state().peers().num_inbound_connections() as f64);
}
SwarmEvent::OutgoingPendingSessionClosed { remote_addr, peer_id, error } => {
trace!(
target : "net",
?remote_addr,
?peer_id,
?error,
"Outgoing pending session failed"
);
if let Some(ref err) = error {
this.swarm.state_mut().peers_mut().on_pending_session_dropped(
&remote_addr,
&peer_id,
err,
);
this.metrics.pending_session_failures.increment(1);
} else {
this.swarm
.state_mut()
.peers_mut()
.on_pending_session_gracefully_closed(&peer_id);
}
this.metrics.closed_sessions.increment(1);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::OutgoingConnectionError { remote_addr, peer_id, error } => {
trace!(
target : "net",
?remote_addr,
?peer_id,
?error,
"Outgoing connection error"
);
this.swarm.state_mut().peers_mut().on_outgoing_connection_failure(
&remote_addr,
&peer_id,
&error,
);
this.metrics
.outgoing_connections
.set(this.swarm.state().peers().num_outbound_connections() as f64);
}
SwarmEvent::BadMessage { peer_id } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::ProtocolBreach { peer_id } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
}
// ensure we still have enough budget for another iteration
budget -= 1;
if budget == 0 {
// make sure we're woken up again
cx.waker().wake_by_ref();
break
}
}