perf: reduce p2p message capacity (#3371)

This commit is contained in:
Matthias Seitz
2023-06-25 15:11:24 +02:00
committed by GitHub
parent 4c84d22e36
commit b787d09993

View File

@ -56,7 +56,11 @@ const GRACE_PERIOD: Duration = Duration::from_secs(2);
/// [`MAX_P2P_CAPACITY`] is the maximum number of messages that can be buffered to be sent in the
/// `p2p` stream.
const MAX_P2P_CAPACITY: usize = 64;
///
/// Note: this default is rather low because it is expected that the [P2PStream] wraps an
/// [ECIESStream](reth_ecies::stream::ECIESStream) which internally already buffers a few MB of
/// encoded data.
const MAX_P2P_CAPACITY: usize = 2;
/// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the
/// `Hello` handshake is completed.
@ -212,6 +216,10 @@ pub struct P2PStream<S> {
/// Outgoing messages buffered for sending to the underlying stream.
outgoing_messages: VecDeque<Bytes>,
/// Maximum number of messages that we can buffer here before the [Sink] impl returns
/// [Poll::Pending].
outgoing_message_buffer_capacity: usize,
/// Whether this stream is currently in the process of disconnecting by sending a disconnect
/// message.
disconnecting: bool,
@ -229,10 +237,20 @@ impl<S> P2PStream<S> {
pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT),
shared_capability: capability,
outgoing_messages: VecDeque::new(),
outgoing_message_buffer_capacity: MAX_P2P_CAPACITY,
disconnecting: false,
}
}
/// Sets a custom outgoing message buffer capacity.
///
/// # Panics
///
/// If the provided capacity is `0`.
pub fn set_outgoing_message_buffer_capacity(&mut self, capacity: usize) {
self.outgoing_message_buffer_capacity = capacity;
}
/// Returns the shared capability for this stream.
pub fn shared_capability(&self) -> &SharedCapability {
&self.shared_capability
@ -243,6 +261,11 @@ impl<S> P2PStream<S> {
self.disconnecting
}
/// Returns `true` if the stream has outgoing capacity.
fn has_outgoing_capacity(&self) -> bool {
self.outgoing_messages.len() < self.outgoing_message_buffer_capacity
}
/// Queues in a _snappy_ encoded [`P2PMessage::Pong`] message.
fn send_pong(&mut self) {
let pong = P2PMessage::Pong;
@ -366,10 +389,6 @@ where
let id = *bytes.first().ok_or(P2PStreamError::EmptyProtocolMessage)?;
match id {
_ if id == P2PMessageID::Ping as u8 => {
if this.outgoing_messages.len() > MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}
tracing::trace!("Received Ping, Sending Pong");
this.send_pong();
}
@ -467,7 +486,7 @@ where
}
}
if self.outgoing_messages.len() < MAX_P2P_CAPACITY {
if self.has_outgoing_capacity() {
// still has capacity
Poll::Ready(Ok(()))
} else {
@ -476,13 +495,13 @@ where
}
fn start_send(self: Pin<&mut Self>, item: Bytes) -> Result<(), Self::Error> {
let this = self.project();
// ensure we have free capacity
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
if !self.has_outgoing_capacity() {
return Err(P2PStreamError::SendBufferFull)
}
let this = self.project();
let mut compressed = BytesMut::zeroed(1 + snap::raw::max_compress_len(item.len() - 1));
let compressed_size =
this.encoder.compress(&item[1..], &mut compressed[1..]).map_err(|err| {