perf: add eth response backpressure (#13971)

Co-authored-by: Dan Cline <6798349+Rjected@users.noreply.github.com>
This commit is contained in:
Matthias Seitz
2025-01-24 22:20:22 +01:00
committed by GitHub
parent 049543655a
commit a087731199
3 changed files with 77 additions and 2 deletions

View File

@ -254,6 +254,30 @@ impl<N: NetworkPrimitives> EthMessage<N> {
Self::Receipts(_) => EthMessageID::Receipts,
}
}
/// Returns true if the message variant is a request.
pub const fn is_request(&self) -> bool {
matches!(
self,
Self::GetBlockBodies(_) |
Self::GetBlockHeaders(_) |
Self::GetReceipts(_) |
Self::GetPooledTransactions(_) |
Self::GetNodeData(_)
)
}
/// Returns true if the message variant is a response to a request.
pub const fn is_response(&self) -> bool {
matches!(
self,
Self::PooledTransactions(_) |
Self::Receipts(_) |
Self::BlockHeaders(_) |
Self::BlockBodies(_) |
Self::NodeData(_)
)
}
}
impl<N: NetworkPrimitives> Encodable for EthMessage<N> {

View File

@ -44,7 +44,7 @@ const MAX_HEADERS_SERVE: usize = 1024;
/// `SOFT_RESPONSE_LIMIT`.
const MAX_BODIES_SERVE: usize = 1024;
/// Maximum size of replies to data retrievals.
/// Maximum size of replies to data retrievals: 2MB
const SOFT_RESPONSE_LIMIT: usize = 2 * 1024 * 1024;
/// Manages eth related requests on top of the p2p network.
@ -167,7 +167,7 @@ where
for hash in request.0 {
if let Some(block) = self.client.block_by_hash(hash).unwrap_or_default() {
let (_, body) = block.split();
let body = block.into_body();
total_bytes += body.length();
bodies.push(body);

View File

@ -54,6 +54,19 @@ const SAMPLE_IMPACT: f64 = 0.1;
/// Amount of RTTs before timeout
const TIMEOUT_SCALING: u32 = 3;
/// Restricts the number of queued outgoing messages for larger responses:
/// - Block Bodies
/// - Receipts
/// - Headers
/// - `PooledTransactions`
///
/// With proper softlimits in place (2MB) this targets 10MB (4+1 * 2MB) of outgoing response data.
///
/// This parameter serves as backpressure for reading additional requests from the remote.
/// Once we've queued up more responses than this, the session should priorotize message flushing
/// before reading any more messages from the remote peer, throttling the peer.
const MAX_QUEUED_OUTGOING_RESPONSES: usize = 4;
/// The type that advances an established session by listening for incoming messages (from local
/// node or read from connection) and emitting events back to the
/// [`SessionManager`](super::SessionManager).
@ -122,6 +135,11 @@ impl<N: NetworkPrimitives> ActiveSession<N> {
self.queued_outgoing.shrink_to_fit();
}
/// Returns how many responses we've currently queued up.
fn queued_response_count(&self) -> usize {
self.queued_outgoing.messages.iter().filter(|m| m.is_response()).count()
}
/// Handle a message read from the connection.
///
/// Returns an error if the message is considered to be in violation of the protocol.
@ -596,6 +614,29 @@ impl<N: NetworkPrimitives> Future for ActiveSession<N> {
};
}
// check whether we should throttle incoming messages
if this.received_requests_from_remote.len() > MAX_QUEUED_OUTGOING_RESPONSES {
// we're currently waiting for the responses to the peer's requests which aren't
// queued as outgoing yet
//
// Note: we don't need to register the waker here because we polled the requests
// above
break 'receive
}
// we also need to check if we have multiple responses queued up
if this.queued_outgoing.messages.len() > MAX_QUEUED_OUTGOING_RESPONSES &&
this.queued_response_count() > MAX_QUEUED_OUTGOING_RESPONSES
{
// if we've queued up more responses than allowed, we don't poll for new
// messages and break the receive loop early
//
// Note: we don't need to register the waker here because we still have
// queued messages and the sink impl registered the waker because we've
// already advanced it to `Pending` earlier
break 'receive
}
match this.conn.poll_next_unpin(cx) {
Poll::Pending => break,
Poll::Ready(None) => {
@ -740,6 +781,16 @@ pub(crate) enum OutgoingMessage<N: NetworkPrimitives> {
Raw(RawCapabilityMessage),
}
impl<N: NetworkPrimitives> OutgoingMessage<N> {
/// Returns true if this is a response.
const fn is_response(&self) -> bool {
match self {
Self::Eth(msg) => msg.is_response(),
_ => false,
}
}
}
impl<N: NetworkPrimitives> From<EthMessage<N>> for OutgoingMessage<N> {
fn from(value: EthMessage<N>) -> Self {
Self::Eth(value)