feat(net): add session limits (#253)

Matthias Seitz
2022-11-25 18:56:49 +01:00
committed by GitHub
parent 6e7928ab84
commit 37809ce774
6 changed files with 242 additions and 79 deletions

View File

@ -117,6 +117,12 @@ impl<C> NetworkConfigBuilder<C> {
}
}
/// Sets a custom config for how sessions are handled.
pub fn sessions_config(mut self, config: SessionsConfig) -> Self {
self.sessions_config = Some(config);
self
}
/// Sets the genesis hash for the network.
pub fn genesis_hash(mut self, genesis_hash: H256) -> Self {
self.genesis_hash = genesis_hash;

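The `sessions_config` hook above is how callers opt into the new limits. A minimal usage sketch, assuming a pre-existing `NetworkConfigBuilder` value named `builder`; only `SessionsConfig`, `SessionLimits`, and `sessions_config` are taken from this commit:

// Sketch only: `builder` is an assumed NetworkConfigBuilder obtained elsewhere.
let sessions = SessionsConfig {
    limits: SessionLimits::default().with_max_pending_inbound(20),
    ..SessionsConfig::default()
};
let builder = builder.sessions_config(sessions);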
View File

@ -0,0 +1,182 @@
//! Configuration types for [`SessionManager`]
use crate::session::{Direction, ExceedsSessionLimit};
/// Configuration options when creating a [`SessionManager`].
pub struct SessionsConfig {
/// Size of the session command buffer (per session task).
pub session_command_buffer: usize,
/// Size of the session event channel buffer.
pub session_event_buffer: usize,
/// Limits to enforce.
///
/// By default, no limits will be enforced.
pub limits: SessionLimits,
}
impl Default for SessionsConfig {
fn default() -> Self {
SessionsConfig {
// This should provide enough slots for handling commands sent to the session task,
// since the manager is the sender.
session_command_buffer: 10,
// This should be greater since the manager is the receiver. The total size will be
// `buffer + num sessions`. Each session can therefore fit at least 1 message in the
// channel. The buffer size is additional capacity. The channel is always drained on
// `poll`.
session_event_buffer: 64,
limits: Default::default(),
}
}
}
impl SessionsConfig {
/// Sets the buffer size for the bounded communication channel between the manager and its
/// sessions for events emitted by the sessions.
///
/// It is expected that the background session tasks will stall if they outpace the manager. The
/// buffer size provides backpressure on the network I/O.
pub fn with_session_event_buffer(mut self, n: usize) -> Self {
self.session_event_buffer = n;
self
}
}
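To make the sizing note above concrete, a small sketch that widens only the event buffer; the assertions simply restate the defaults defined in this file:

// Effective event-channel capacity is roughly `session_event_buffer + number of sessions`,
// so a node expecting many peers may want a larger buffer.
let config = SessionsConfig::default().with_session_event_buffer(256);
assert_eq!(config.session_event_buffer, 256);
assert_eq!(config.session_command_buffer, 10);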
/// Limits for sessions.
///
/// By default, no session limits will be enforced.
#[derive(Debug, Clone, Default)]
pub struct SessionLimits {
max_pending_inbound: Option<u32>,
max_pending_outbound: Option<u32>,
max_established_inbound: Option<u32>,
max_established_outbound: Option<u32>,
}
impl SessionLimits {
/// Sets the maximum number of pending inbound sessions.
pub fn with_max_pending_inbound(mut self, limit: u32) -> Self {
self.max_pending_inbound = Some(limit);
self
}
/// Sets the maximum number of pending outbound sessions.
pub fn with_max_pending_outbound(mut self, limit: u32) -> Self {
self.max_pending_outbound = Some(limit);
self
}
/// Sets the maximum number of active inbound sessions.
pub fn with_max_established_inbound(mut self, limit: u32) -> Self {
self.max_established_inbound = Some(limit);
self
}
/// Sets the maximum number of active outbound sessions.
pub fn with_max_established_outbound(mut self, limit: u32) -> Self {
self.max_established_outbound = Some(limit);
self
}
}
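The setters compose, and any limit left unset stays `None`, meaning unlimited. A short sketch that caps only the inbound side:

// Outbound limits are never set here, so outbound sessions remain unlimited.
let limits = SessionLimits::default()
    .with_max_pending_inbound(25)
    .with_max_established_inbound(50);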
/// Keeps track of all sessions.
#[derive(Debug, Clone)]
pub struct SessionCounter {
/// Limits to enforce.
limits: SessionLimits,
/// Number of pending incoming sessions.
pending_inbound: u32,
/// Number of pending outgoing sessions.
pending_outbound: u32,
/// Number of active inbound sessions.
active_inbound: u32,
/// Number of active outbound sessions.
active_outbound: u32,
}
// === impl SessionCounter ===
impl SessionCounter {
pub(crate) fn new(limits: SessionLimits) -> Self {
Self {
limits,
pending_inbound: 0,
pending_outbound: 0,
active_inbound: 0,
active_outbound: 0,
}
}
pub(crate) fn inc_pending_inbound(&mut self) {
self.pending_inbound += 1;
}
pub(crate) fn inc_pending_outbound(&mut self) {
self.pending_outbound += 1;
}
pub(crate) fn dec_pending(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.pending_outbound -= 1;
}
Direction::Incoming => {
self.pending_inbound -= 1;
}
}
}
pub(crate) fn inc_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound += 1;
}
Direction::Incoming => {
self.active_inbound += 1;
}
}
}
pub(crate) fn dec_active(&mut self, direction: &Direction) {
match direction {
Direction::Outgoing(_) => {
self.active_outbound -= 1;
}
Direction::Incoming => {
self.active_inbound -= 1;
}
}
}
pub(crate) fn ensure_pending_outbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_outbound, self.limits.max_pending_outbound)
}
pub(crate) fn ensure_pending_inbound(&self) -> Result<(), ExceedsSessionLimit> {
Self::ensure(self.pending_inbound, self.limits.max_pending_inbound)
}
fn ensure(current: u32, limit: Option<u32>) -> Result<(), ExceedsSessionLimit> {
if let Some(limit) = limit {
if current >= limit {
return Err(ExceedsSessionLimit(limit))
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_limits() {
let mut limits = SessionCounter::new(SessionLimits::default().with_max_pending_inbound(2));
assert!(limits.ensure_pending_outbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_ok());
limits.inc_pending_inbound();
assert!(limits.ensure_pending_inbound().is_err());
}
}
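Building on the test above, a sketch of the full pending-to-active lifecycle as the manager drives it (check the limit, count the pending session, move it to active once authentication succeeds); it could sit in the same test module and uses only items defined in this file plus the imported `Direction`:

#[test]
fn test_pending_to_active_lifecycle() {
    let mut counter =
        SessionCounter::new(SessionLimits::default().with_max_pending_inbound(1));
    // incoming connection: check the limit first, then count the pending session
    assert!(counter.ensure_pending_inbound().is_ok());
    counter.inc_pending_inbound();
    // a second incoming connection would be rejected while the first is still pending
    assert!(counter.ensure_pending_inbound().is_err());
    // authentication finished: the session moves from pending to active
    counter.dec_pending(&Direction::Incoming);
    counter.inc_active(&Direction::Incoming);
    // the pending slot is free again
    assert!(counter.ensure_pending_inbound().is_ok());
    // session closed
    counter.dec_active(&Direction::Incoming);
}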

View File

@ -24,6 +24,8 @@ use tokio::{
pub(crate) struct PendingSessionHandle {
/// Can be used to tell the session to disconnect the connection/abort the handshake process.
pub(crate) disconnect_tx: oneshot::Sender<()>,
/// The direction of the session
pub(crate) direction: Direction,
}
/// An established session with a remote peer.
@ -32,6 +34,8 @@ pub(crate) struct PendingSessionHandle {
/// be performed: chain synchronization, block propagation and transaction exchange.
#[derive(Debug)]
pub(crate) struct ActiveSessionHandle {
/// The direction of the session
pub(crate) direction: Direction,
/// The assigned id for this session
pub(crate) session_id: SessionId,
/// The identifier of the remote peer

View File

@ -39,7 +39,10 @@ use tokio_stream::wrappers::ReceiverStream;
use tracing::{instrument, trace, warn};
mod active;
mod config;
mod handle;
use crate::session::config::SessionCounter;
pub use config::SessionsConfig;
/// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@ -50,6 +53,8 @@ pub struct SessionId(usize);
pub(crate) struct SessionManager {
/// Tracks the identifier for the next session.
next_id: usize,
/// Keeps track of all sessions
counter: SessionCounter,
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The node id of this node
@ -108,6 +113,7 @@ impl SessionManager {
Self {
next_id: 0,
counter: SessionCounter::new(config.limits),
secret_key,
peer_id,
status,
@ -155,7 +161,8 @@ impl SessionManager {
stream: TcpStream,
remote_addr: SocketAddr,
) -> Result<SessionId, ExceedsSessionLimit> {
// TODO(mattsse): enforce limits
self.counter.ensure_pending_inbound()?;
let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone();
@ -171,8 +178,9 @@ impl SessionManager {
self.fork_filter.clone(),
));
let handle = PendingSessionHandle { disconnect_tx };
let handle = PendingSessionHandle { disconnect_tx, direction: Direction::Incoming };
self.pending_sessions.insert(session_id, handle);
self.counter.inc_pending_inbound();
Ok(session_id)
}
@ -193,8 +201,10 @@ impl SessionManager {
self.fork_filter.clone(),
));
let handle = PendingSessionHandle { disconnect_tx };
let handle =
PendingSessionHandle { disconnect_tx, direction: Direction::Outgoing(remote_peer_id) };
self.pending_sessions.insert(session_id, handle);
self.counter.inc_pending_outbound();
}
/// Initiates a shutdown of the channel.
@ -214,6 +224,20 @@ impl SessionManager {
}
}
/// Removes the [`PendingSessionHandle`] if it exists.
fn remove_pending_session(&mut self, id: &SessionId) -> Option<PendingSessionHandle> {
let session = self.pending_sessions.remove(id)?;
self.counter.dec_pending(&session.direction);
Some(session)
}
/// Removes the [`ActiveSessionHandle`] if it exists.
fn remove_active_session(&mut self, id: &PeerId) -> Option<ActiveSessionHandle> {
let session = self.active_sessions.remove(id)?;
self.counter.dec_active(&session.direction);
Some(session)
}
/// This polls all the session handles and returns [`SessionEvent`].
///
/// Active sessions are prioritized.
@ -232,7 +256,7 @@ impl SessionManager {
?peer_id,
"gracefully disconnected active session."
);
let _ = self.active_sessions.remove(&peer_id);
self.remove_active_session(&peer_id);
Poll::Ready(SessionEvent::Disconnected { peer_id, remote_addr })
}
ActiveSessionMessage::ClosedOnConnectionError {
@ -241,7 +265,7 @@ impl SessionManager {
error,
} => {
trace!(target : "net::session", ?peer_id, ?error,"closed session.");
let _ = self.active_sessions.remove(&peer_id);
self.remove_active_session(&peer_id);
Poll::Ready(SessionEvent::SessionClosedOnConnectionError {
remote_addr,
peer_id,
@ -287,7 +311,7 @@ impl SessionManager {
direction,
} => {
// move from pending to established.
let _ = self.pending_sessions.remove(&session_id);
self.remove_pending_session(&session_id);
let (commands_to_session, commands_rx) =
mpsc::channel(self.session_command_buffer);
@ -314,6 +338,7 @@ impl SessionManager {
self.spawn(session);
let handle = ActiveSessionHandle {
direction,
session_id,
remote_id: peer_id,
established: Instant::now(),
@ -322,6 +347,7 @@ impl SessionManager {
};
self.active_sessions.insert(peer_id, handle);
self.counter.inc_active(&direction);
return Poll::Ready(SessionEvent::SessionEstablished {
peer_id,
@ -339,7 +365,7 @@ impl SessionManager {
?remote_addr,
"disconnected pending session"
);
let _ = self.pending_sessions.remove(&session_id);
self.remove_pending_session(&session_id);
return match direction {
Direction::Incoming => {
Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
@ -370,7 +396,7 @@ impl SessionManager {
?peer_id,
"connection refused"
);
let _ = self.pending_sessions.remove(&session_id);
self.remove_pending_session(&session_id);
return Poll::Ready(SessionEvent::OutgoingPendingSessionClosed {
remote_addr,
peer_id,
@ -383,7 +409,7 @@ impl SessionManager {
error,
direction,
} => {
let _ = self.pending_sessions.remove(&session_id);
self.remove_pending_session(&session_id);
warn!(
target : "net::session",
?error,
@ -391,7 +417,7 @@ impl SessionManager {
?remote_addr,
"ecies auth failed"
);
let _ = self.pending_sessions.remove(&session_id);
self.remove_pending_session(&session_id);
return match direction {
Direction::Incoming => {
Poll::Ready(SessionEvent::IncomingPendingSessionClosed {
@ -415,41 +441,6 @@ impl SessionManager {
}
}
/// Configuration options when creating a [`SessionsManager`].
pub struct SessionsConfig {
/// Size of the session command buffer (per session task).
pub session_command_buffer: usize,
/// Size of the session event channel buffer.
pub session_event_buffer: usize,
}
impl Default for SessionsConfig {
fn default() -> Self {
SessionsConfig {
// This should be sufficient to slots for handling commands sent to the session task,
// since the manager is the sender.
session_command_buffer: 10,
// This should be greater since the manager is the receiver. The total size will be
// `buffer + num sessions`. Each session can therefor fit at least 1 message in the
// channel. The buffer size is additional capacity. The channel is always drained on
// `poll`.
session_event_buffer: 64,
}
}
}
impl SessionsConfig {
/// Sets the buffer size for the bounded communication channel between the manager and its
/// sessions for events emitted by the sessions.
///
/// It is expected, that the background session task will stall if they outpace the manager. The
/// buffer size provides backpressure on the network I/O.
pub fn with_session_event_buffer(mut self, n: usize) -> Self {
self.session_event_buffer = n;
self
}
}
/// Events produced by the [`SessionManager`]
pub(crate) enum SessionEvent {
/// A new session was successfully authenticated.
@ -509,7 +500,7 @@ pub(crate) enum SessionEvent {
/// accepted.
#[derive(Debug, Clone, thiserror::Error)]
#[error("Session limit reached {0}")]
pub struct ExceedsSessionLimit(usize);
pub struct ExceedsSessionLimit(pub(crate) u32);
/// Starts the authentication process for a connection initiated by a remote peer.
///
@ -598,6 +589,7 @@ impl Direction {
}
}
/// Authenticates a session
async fn authenticate(
disconnect_rx: oneshot::Receiver<()>,
events: mpsc::Sender<PendingSessionEvent>,

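Since `on_incoming` now returns `Result<SessionId, ExceedsSessionLimit>`, the caller has to decide what to do when the pending-inbound limit is hit. A hedged sketch of such a call site; the accept loop and the `sessions`, `stream`, and `remote_addr` bindings are assumptions, the rest comes from this diff:

// Assumed context: `sessions` is the SessionManager, and `stream`/`remote_addr`
// come from an accept loop outside this commit.
match sessions.on_incoming(stream, remote_addr) {
    Ok(session_id) => {
        trace!(target: "net::session", ?session_id, "started pending inbound session");
    }
    Err(ExceedsSessionLimit(limit)) => {
        // the counter was never incremented, so dropping the connection is enough
        trace!(target: "net::session", limit, "rejecting inbound connection, session limit reached");
    }
}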
View File

@ -100,8 +100,7 @@ where
capabilities: Arc<Capabilities>,
status: Status,
request_tx: PeerRequestSender,
) -> Result<(), AddSessionError> {
// TODO add capacity check
) {
debug_assert!(!self.connected_peers.contains_key(&peer), "Already connected; not possible");
// find the corresponding block number
@ -119,8 +118,6 @@ where
blocks: LruCache::new(NonZeroUsize::new(PEER_BLOCK_CACHE_LIMIT).unwrap()),
},
);
Ok(())
}
/// Event hook for a disconnected session for the peer.
@ -424,12 +421,3 @@ pub(crate) enum StateAction {
reason: Option<DisconnectReason>,
},
}
#[derive(Debug, thiserror::Error)]
pub enum AddSessionError {
#[error("No capacity for new sessions")]
AtCapacity {
/// The peer of the session
peer: PeerId,
},
}

View File

@ -3,13 +3,12 @@ use crate::{
listener::{ConnectionListener, ListenerEvent},
message::{PeerMessage, PeerRequestSender},
session::{Direction, SessionEvent, SessionId, SessionManager},
state::{AddSessionError, NetworkState, StateAction},
state::{NetworkState, StateAction},
};
use futures::Stream;
use reth_eth_wire::{
capability::{Capabilities, CapabilityMessage},
error::EthStreamError,
DisconnectReason,
};
use reth_interfaces::provider::BlockProvider;
use reth_primitives::PeerId;
@ -88,29 +87,21 @@ where
status,
messages,
direction,
} => match self.state.on_session_activated(
peer_id,
capabilities.clone(),
status,
messages.clone(),
) {
Ok(_) => Some(SwarmEvent::SessionEstablished {
} => {
self.state.on_session_activated(
peer_id,
capabilities.clone(),
status,
messages.clone(),
);
Some(SwarmEvent::SessionEstablished {
peer_id,
remote_addr,
capabilities,
messages,
direction,
}),
Err(err) => {
match err {
AddSessionError::AtCapacity { peer } => {
self.sessions.disconnect(peer, Some(DisconnectReason::TooManyPeers));
}
};
self.state.peers_mut().on_disconnected(&peer_id);
None
}
},
})
}
SessionEvent::ValidMessage { peer_id, message } => {
Some(SwarmEvent::ValidMessage { peer_id, message })
}