perf(net): add protocol breach request timeout (#1099)

This commit is contained in:
Matthias Seitz
2023-02-01 00:20:44 +01:00
committed by GitHub
parent c5bc272057
commit ae771d291d
6 changed files with 230 additions and 72 deletions

View File

@ -756,6 +756,12 @@ where
.apply_reputation_change(&peer_id, ReputationChangeKind::BadMessage);
this.metrics.invalid_messages_received.increment(1);
}
SwarmEvent::ProtocolBreach { peer_id } => {
this.swarm
.state_mut()
.peers_mut()
.apply_reputation_change(&peer_id, ReputationChangeKind::BadProtocol);
}
}
}

View File

@ -38,6 +38,17 @@ use tokio::{
use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, trace, warn};
// Constants for timeout updating
/// Minimum timeout value
const MINIMUM_TIMEOUT: Duration = Duration::from_secs(2);
/// Maximum timeout value (identical to the initial request timeout)
const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
/// How much the new measurements affect the current timeout.
///
/// NOTE(review): the value looks like a fraction rather than a percentage (`0.1` presumably
/// means 10 percent) -- confirm against `calculate_new_timeout`.
const SAMPLE_IMPACT: f64 = 0.1;
/// Amount of RTTs before timeout
const TIMEOUT_SCALING: u32 = 3;
/// The type that advances an established session by listening for incoming messages (from local
/// node or read from connection) and emitting events back to the
/// [`SessionManager`](super::SessionManager).
@ -73,22 +84,14 @@ pub(crate) struct ActiveSession {
/// Buffered messages that should be handled and sent to the peer.
pub(crate) queued_outgoing: VecDeque<OutgoingMessage>,
/// The maximum time we wait for a response from a peer.
pub(crate) request_timeout: Arc<AtomicU64>,
pub(crate) internal_request_timeout: Arc<AtomicU64>,
/// Interval when to check for timed out requests.
pub(crate) timeout_interval: Interval,
pub(crate) internal_request_timeout_interval: Interval,
/// If an [ActiveSession] does not receive a response at all within this duration then it is
/// considered a protocol violation and the session will initiate a drop.
pub(crate) protocol_breach_request_timeout: Duration,
}
/// Constants for timeout updating
/// Minimum timeout value
const MINIMUM_TIMEOUT: Duration = Duration::from_secs(2);
/// Maximum timeout value
const MAXIMUM_TIMEOUT: Duration = INITIAL_REQUEST_TIMEOUT;
/// How much the new measurements affect the current timeout (X percent)
const SAMPLE_IMPACT: f64 = 0.1;
/// Amount of RTTs before timeout
const TIMEOUT_SCALING: u32 = 3;
impl ActiveSession {
/// Returns `true` if the session is currently in the process of disconnecting
fn is_disconnecting(&self) -> bool {
@ -119,7 +122,7 @@ impl ActiveSession {
received: Instant::now(),
};
if self
.safe_emit_message(PeerMessage::EthRequest(PeerRequest::$req_item {
.emit_message_cloned(PeerMessage::EthRequest(PeerRequest::$req_item {
request,
response: tx,
}))
@ -136,14 +139,22 @@ impl ActiveSession {
let RequestPair { request_id, message } = $resp;
#[allow(clippy::collapsible_match)]
if let Some(req) = self.inflight_requests.remove(&request_id) {
if let PeerRequest::$item { response, .. } = req.request {
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now())
} else {
req.request.send_bad_response();
match req.request {
RequestState::Waiting(PeerRequest::$item { response, .. }) => {
let _ = response.send(Ok(message));
self.update_request_timeout(req.timestamp, Instant::now())
}
RequestState::Waiting(request) => {
request.send_bad_response();
}
RequestState::TimedOut => {
// request was already timed out internally
self.update_request_timeout(req.timestamp, Instant::now())
}
}
} else {
// TODO: this could be a late response to timed out request <https://github.com/paradigmxyz/reth/issues/1067>
// we received a response to a request we never sent
self.on_bad_message();
}
};
}
@ -209,7 +220,11 @@ impl ActiveSession {
let request_id = self.next_id();
let msg = request.create_request_message(request_id);
self.queued_outgoing.push_back(msg.into());
let req = InflightRequest { request, timestamp: Instant::now(), deadline };
let req = InflightRequest {
request: RequestState::Waiting(request),
timestamp: Instant::now(),
deadline,
};
self.inflight_requests.insert(request_id, req);
}
@ -243,7 +258,8 @@ impl ActiveSession {
/// Returns the deadline timestamp at which the request times out
fn request_deadline(&self) -> Instant {
Instant::now() + Duration::from_millis(self.request_timeout.load(Ordering::Relaxed))
Instant::now() +
Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed))
}
/// Handle a Response to the peer
@ -271,7 +287,7 @@ impl ActiveSession {
/// Send a message back to the [`SessionManager`](super::SessionManager)
/// covering both broadcasts and incoming requests
fn safe_emit_message(
fn emit_message_cloned(
&self,
message: PeerMessage,
) -> Result<(), mpsc::error::TrySendError<ActiveSessionMessage>> {
@ -291,7 +307,6 @@ impl ActiveSession {
}
/// Notify the manager that the peer sent a bad message
#[allow(unused)]
fn on_bad_message(&self) {
let _ = self
.to_session
@ -337,31 +352,53 @@ impl ActiveSession {
Poll::Ready(())
}
/// Removes all timed out requests
fn evict_timed_out_requests(&mut self, now: Instant) {
let mut timedout = Vec::new();
for (id, req) in self.inflight_requests.iter() {
if now > req.deadline {
timedout.push(*id)
/// Attempts to disconnect from the peer with the given disconnect `reason`.
///
/// If the disconnect could be started, the result of `poll_disconnect` is returned. Otherwise
/// the error is logged and passed to `close_on_error`, and `Poll::Ready(())` is returned.
fn try_disconnect(&mut self, reason: DisconnectReason, cx: &mut Context<'_>) -> Poll<()> {
    if let Err(err) = self.start_disconnect(reason) {
        error!(target: "net::session", ?err, remote_peer_id=?self.remote_peer_id, "could not send disconnect");
        self.close_on_error(err);
        return Poll::Ready(())
    }

    // the disconnect was started, continue by polling it
    self.poll_disconnect(cx)
}
/// Checks for _internally_ timed out requests.
///
/// If a request misses its deadline, then it is timed out internally.
/// If a request misses the `protocol_breach_request_timeout` then this session is considered in
/// protocol violation and will close.
///
/// Returns `true` if a peer missed the `protocol_breach_request_timeout`, in which case the
/// session should be terminated.
#[must_use]
fn check_timed_out_requests(&mut self, now: Instant) -> bool {
for (id, req) in self.inflight_requests.iter_mut() {
if req.is_timed_out(now) {
if req.is_waiting() {
warn!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
req.timeout();
} else if now - req.timestamp > self.protocol_breach_request_timeout {
return true
}
}
}
for id in timedout {
warn!(target: "net::session", ?id, remote_peer_id=?self.remote_peer_id, "timed out outgoing request");
let req = self.inflight_requests.remove(&id).expect("exists; qed");
self.update_request_timeout(req.timestamp, req.deadline);
req.request.send_err_response(RequestError::Timeout);
}
false
}
/// Updates the request timeout with a request's timestamps
fn update_request_timeout(&mut self, sent: Instant, received: Instant) {
let elapsed = received.saturating_duration_since(sent);
let current = Duration::from_millis(self.request_timeout.load(Ordering::Relaxed));
let current = Duration::from_millis(self.internal_request_timeout.load(Ordering::Relaxed));
let request_timeout = calculate_new_timeout(current, elapsed);
self.request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
self.timeout_interval = tokio::time::interval(request_timeout);
self.internal_request_timeout.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
self.internal_request_timeout_interval = tokio::time::interval(request_timeout);
}
}
@ -405,18 +442,8 @@ impl Future for ActiveSession {
info!(target: "net::session", ?reason, remote_peer_id=?this.remote_peer_id, "session received disconnect command");
let reason =
reason.unwrap_or(DisconnectReason::DisconnectRequested);
// try to disconnect
return match this.start_disconnect(reason) {
Ok(()) => {
// we're done
this.poll_disconnect(cx)
}
Err(err) => {
error!(target: "net::session", ?err, remote_peer_id=?this.remote_peer_id, "could not send disconnect");
this.close_on_error(err);
Poll::Ready(())
}
}
return this.try_disconnect(reason, cx)
}
SessionCommand::Message(msg) => {
this.on_peer_message(msg);
@ -503,9 +530,13 @@ impl Future for ActiveSession {
}
if !progress {
if this.timeout_interval.poll_tick(cx).is_ready() {
if this.internal_request_timeout_interval.poll_tick(cx).is_ready() {
// check for timed out requests
this.evict_timed_out_requests(Instant::now());
if this.check_timed_out_requests(Instant::now()) {
let _ = this.to_session.clone().try_send(
ActiveSessionMessage::ProtocolBreach { peer_id: this.remote_peer_id },
);
}
}
return Poll::Pending
@ -527,14 +558,46 @@ pub(crate) struct ReceivedRequest {
/// A request that waits for a response from the peer
pub(crate) struct InflightRequest {
/// Request sent to peer
request: PeerRequest,
/// Request we sent to peer and the internal response channel
request: RequestState,
/// Instant when the request was sent
timestamp: Instant,
/// Time limit for the response
deadline: Instant,
}
// === impl InflightRequest ===
impl InflightRequest {
    /// Returns `true` if `now` is strictly later than the request's `deadline`.
    #[inline]
    fn is_timed_out(&self, now: Instant) -> bool {
        self.deadline < now
    }

    /// Returns `true` while the request is in the [`RequestState::Waiting`] state, i.e. it has
    /// not been marked as timed out yet.
    #[inline]
    fn is_waiting(&self) -> bool {
        match self.request {
            RequestState::Waiting(_) => true,
            RequestState::TimedOut => false,
        }
    }

    /// Marks the request as timed out.
    ///
    /// The state is unconditionally set to [`RequestState::TimedOut`]. If the request was still
    /// waiting, `send_err_response` is invoked on it with [`RequestError::Timeout`].
    fn timeout(&mut self) {
        let previous = std::mem::replace(&mut self.request, RequestState::TimedOut);
        if let RequestState::Waiting(request) = previous {
            request.send_err_response(RequestError::Timeout);
        }
    }
}
/// State of the request tracked by an [`InflightRequest`].
enum RequestState {
/// Waiting for the response; holds the request that is answered once the response arrives
/// or the request times out.
Waiting(PeerRequest),
/// Request already timed out internally, the wrapped request has been taken out of the
/// state.
TimedOut,
}
/// Outgoing messages that can be sent over the wire.
pub(crate) enum OutgoingMessage {
/// A message that is owned.
@ -558,15 +621,17 @@ impl From<EthBroadcastMessage> for OutgoingMessage {
#[cfg(test)]
mod tests {
#![allow(dead_code)]
use super::*;
use crate::session::{
config::INITIAL_REQUEST_TIMEOUT, handle::PendingSessionEvent,
config::{INITIAL_REQUEST_TIMEOUT, PROTOCOL_BREACH_REQUEST_TIMEOUT},
handle::PendingSessionEvent,
start_pending_incoming_session,
};
use reth_ecies::util::pk2id;
use reth_eth_wire::{
EthVersion, HelloMessage, NewPooledTransactionHashes, ProtocolVersion, Status,
StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
EthVersion, GetBlockBodies, HelloMessage, NewPooledTransactionHashes, ProtocolVersion,
Status, StatusBuilder, UnauthedEthStream, UnauthedP2PStream,
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_primitives::{ForkFilter, Hardfork, MAINNET};
@ -685,10 +750,13 @@ mod tests {
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
timeout_interval: tokio::time::interval(INITIAL_REQUEST_TIMEOUT),
request_timeout: Arc::new(AtomicU64::new(
internal_request_timeout_interval: tokio::time::interval(
INITIAL_REQUEST_TIMEOUT,
),
internal_request_timeout: Arc::new(AtomicU64::new(
INITIAL_REQUEST_TIMEOUT.as_millis() as u64,
)),
protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
}
}
_ => {
@ -810,6 +878,46 @@ mod tests {
rx.await.unwrap();
}
/// Checks both timeout stages against a peer that never responds: the request first fails
/// with [`RequestError::Timeout`], afterwards the session emits
/// [`ActiveSessionMessage::ProtocolBreach`].
#[tokio::test(flavor = "multi_thread")]
async fn test_request_timeout() {
reth_tracing::init_test_tracing();
let mut builder = SessionBuilder::default();
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let local_addr = listener.local_addr().unwrap();
// timeouts used for this test instead of the defaults
let request_timeout = Duration::from_millis(500);
let drop_timeout = Duration::from_millis(1500);
// the client only holds on to the stream and sleeps, it never answers a request
let fut = builder.with_client_stream(local_addr, move |client_stream| async move {
let _client_stream = client_stream;
tokio::time::sleep(drop_timeout * 100).await;
});
tokio::task::spawn(fut);
let (incoming, _) = listener.accept().await.unwrap();
let mut session = builder.connect_incoming(incoming).await;
// override the session's timeouts with the values from above
session
.internal_request_timeout
.store(request_timeout.as_millis() as u64, Ordering::Relaxed);
session.protocol_breach_request_timeout = drop_timeout;
// register a request on the session, `rx` receives its outcome
let (tx, rx) = oneshot::channel();
let req = PeerRequest::GetBlockBodies { request: GetBlockBodies(vec![]), response: tx };
session.on_peer_request(req, Instant::now());
// drive the session in the background
tokio::spawn(session);
// first stage: the request is answered with a timeout error
let err = rx.await.unwrap().unwrap_err();
assert_eq!(err, RequestError::Timeout);
// wait for protocol breach error
let msg = builder.active_session_rx.next().await.unwrap();
match msg {
ActiveSessionMessage::ProtocolBreach { .. } => {}
_ => unreachable!(),
}
}
#[tokio::test(flavor = "multi_thread")]
async fn test_keep_alive() {
let mut builder = SessionBuilder::default();
@ -844,7 +952,7 @@ mod tests {
let timeout = rtt * TIMEOUT_SCALING;
// if rtt hasn't changed, timeout shouldn't change
assert!(calculate_new_timeout(timeout, rtt) == timeout);
assert_eq!(calculate_new_timeout(timeout, rtt), timeout);
// if rtt changed, the new timeout should change less than it
assert!(calculate_new_timeout(timeout, rtt / 2) < timeout);

View File

@ -5,9 +5,14 @@ use std::time::Duration;
/// Default request timeout for a single request.
///
/// This represents the time we wait for a response until we consider it timed out.
/// This represents the amount of time we wait for a response until we consider it timed out.
pub const INITIAL_REQUEST_TIMEOUT: Duration = Duration::from_secs(20);
/// Default timeout after which we'll consider the peer to be in violation of the protocol.
///
/// This is the time a peer has to respond to a request.
pub const PROTOCOL_BREACH_REQUEST_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Configuration options when creating a [SessionManager](crate::session::SessionManager).
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
@ -20,8 +25,15 @@ pub struct SessionsConfig {
///
/// By default, no limits will be enforced.
pub limits: SessionLimits,
/// The maximum time we wait for a response from a peer.
pub request_timeout: Duration,
/// The maximum initial time we wait for a response from the peer before we timeout a request
/// _internally_.
pub initial_internal_request_timeout: Duration,
/// The amount of time we continue to wait for a response from the peer, even if we timed it
/// out internally (`initial_internal_request_timeout`). Timeouts are not penalized by the
/// session directly, however if a peer fails to respond at all (within
/// `PROTOCOL_BREACH_REQUEST_TIMEOUT`) this is considered a protocol violation and results in a
/// dropped session.
pub protocol_breach_request_timeout: Duration,
}
impl Default for SessionsConfig {
@ -36,7 +48,8 @@ impl Default for SessionsConfig {
// `poll`.
session_event_buffer: 128,
limits: Default::default(),
request_timeout: INITIAL_REQUEST_TIMEOUT,
initial_internal_request_timeout: INITIAL_REQUEST_TIMEOUT,
protocol_breach_request_timeout: PROTOCOL_BREACH_REQUEST_TIMEOUT,
}
}
}

View File

@ -169,4 +169,9 @@ pub(crate) enum ActiveSessionMessage {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// Remote peer is considered in protocol violation
ProtocolBreach {
/// Identifier of the remote peer.
peer_id: PeerId,
},
}

View File

@ -55,8 +55,12 @@ pub(crate) struct SessionManager {
next_id: usize,
/// Keeps track of all sessions
counter: SessionCounter,
/// The maximum time we wait for a response from a peer.
request_timeout: Duration,
/// The maximum initial time an [ActiveSession] waits for a response from the peer before it
/// responds to an _internal_ request with a `TimeoutError`
initial_internal_request_timeout: Duration,
/// If an [ActiveSession] does not receive a response at all within this duration then it is
/// considered a protocol violation and the session will initiate a drop.
protocol_breach_request_timeout: Duration,
/// The secret key used for authenticating sessions.
secret_key: SecretKey,
/// The `Status` message to send to peers.
@ -113,7 +117,8 @@ impl SessionManager {
Self {
next_id: 0,
counter: SessionCounter::new(config.limits),
request_timeout: config.request_timeout,
initial_internal_request_timeout: config.initial_internal_request_timeout,
protocol_breach_request_timeout: config.protocol_breach_request_timeout,
secret_key,
status,
hello_message,
@ -169,7 +174,7 @@ impl SessionManager {
/// Invoked on a received status update.
///
/// If the update activated another fork, this will return a [`ForkTransition`] and updates the
/// active [`ForkId`](reth_primitives::ForkId). See also [`ForkFilter::set_head`].
/// active [`ForkId`](ForkId). See also [`ForkFilter::set_head`].
pub(crate) fn on_status_update(
&mut self,
height: u64,
@ -323,6 +328,9 @@ impl SessionManager {
ActiveSessionMessage::BadMessage { peer_id } => {
Poll::Ready(SessionEvent::BadMessage { peer_id })
}
ActiveSessionMessage::ProtocolBreach { peer_id } => {
Poll::Ready(SessionEvent::ProtocolBreach { peer_id })
}
}
}
}
@ -377,7 +385,9 @@ impl SessionManager {
let messages = PeerRequestSender::new(peer_id, to_session_tx);
let timeout = Arc::new(AtomicU64::new(self.request_timeout.as_millis() as u64));
let timeout = Arc::new(AtomicU64::new(
self.initial_internal_request_timeout.as_millis() as u64,
));
let session = ActiveSession {
next_id: 0,
@ -392,8 +402,11 @@ impl SessionManager {
conn,
queued_outgoing: Default::default(),
received_requests: Default::default(),
timeout_interval: tokio::time::interval(self.request_timeout),
request_timeout: Arc::clone(&timeout),
internal_request_timeout_interval: tokio::time::interval(
self.initial_internal_request_timeout,
),
internal_request_timeout: Arc::clone(&timeout),
protocol_breach_request_timeout: self.protocol_breach_request_timeout,
};
self.spawn(session);
@ -560,6 +573,11 @@ pub(crate) enum SessionEvent {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// Remote peer is considered in protocol violation
ProtocolBreach {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// Closed an incoming pending session during handshaking.
IncomingPendingSessionClosed {
remote_addr: SocketAddr,

View File

@ -173,6 +173,9 @@ where
Some(SwarmEvent::OutgoingConnectionError { peer_id, remote_addr, error })
}
SessionEvent::BadMessage { peer_id } => Some(SwarmEvent::BadMessage { peer_id }),
SessionEvent::ProtocolBreach { peer_id } => {
Some(SwarmEvent::ProtocolBreach { peer_id })
}
}
}
@ -331,6 +334,11 @@ pub(crate) enum SwarmEvent {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// Remote peer is considered in protocol violation
ProtocolBreach {
/// Identifier of the remote peer.
peer_id: PeerId,
},
/// The underlying tcp listener closed.
TcpListenerClosed {
/// Address of the closed listener.