From d4aabe47517d4264cd36987a988467a2bfe83969 Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 9 Nov 2022 19:06:27 -0500 Subject: [PATCH] fix(eth-wire): fix usage of sink API in P2PStream (#184) * fix(eth-wire): fix usage of sink API in P2PStream * add error checks for sink buffer inserts --- crates/net/eth-wire/src/error.rs | 2 + crates/net/eth-wire/src/p2pstream.rs | 76 +++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/crates/net/eth-wire/src/error.rs b/crates/net/eth-wire/src/error.rs index ae26b0ecf..dd7df28a3 100644 --- a/crates/net/eth-wire/src/error.rs +++ b/crates/net/eth-wire/src/error.rs @@ -67,6 +67,8 @@ pub enum P2PStreamError { MismatchedProtocolVersion { expected: u8, got: u8 }, #[error("started ping task before the handshake completed")] PingBeforeHandshake, + #[error("too many messages buffered before sending")] + SendBufferFull, // TODO: remove / reconsider #[error("disconnected")] Disconnected, diff --git a/crates/net/eth-wire/src/p2pstream.rs b/crates/net/eth-wire/src/p2pstream.rs index 27b148722..dc71dd699 100644 --- a/crates/net/eth-wire/src/p2pstream.rs +++ b/crates/net/eth-wire/src/p2pstream.rs @@ -5,12 +5,12 @@ use crate::{ pinger::{Pinger, PingerEvent}, }; use bytes::{Buf, Bytes, BytesMut}; -use futures::{ready, FutureExt, Sink, SinkExt, StreamExt}; +use futures::{Sink, SinkExt, StreamExt}; use pin_project::pin_project; use reth_primitives::H512 as PeerId; use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable}; use std::{ - collections::{BTreeSet, HashMap}, + collections::{BTreeSet, HashMap, VecDeque}, fmt::Display, io, pin::Pin, @@ -46,6 +46,10 @@ const PING_INTERVAL: Duration = Duration::from_secs(60); /// [`P2PMessage::Disconnect`] message. const GRACE_PERIOD: Duration = Duration::from_secs(2); +/// [`MAX_P2P_CAPACITY`] is the maximum number of messages that can be buffered to be sent in the +/// `p2p` stream. +const MAX_P2P_CAPACITY: usize = 64; + /// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the /// `Hello` handshake is completed. #[pin_project] @@ -149,6 +153,9 @@ pub struct P2PStream { /// The supported capability for this stream. shared_capability: SharedCapability, + + /// Outgoing messages buffered for sending to the underlying stream. + outgoing_messages: VecDeque, } impl P2PStream { @@ -162,6 +169,7 @@ impl P2PStream { decoder: snap::raw::Decoder::new(), pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT), shared_capability: capability, + outgoing_messages: VecDeque::new(), } } } @@ -177,6 +185,24 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); + // try to send any buffered outgoing messages + while let Some(message) = this.outgoing_messages.pop_front() { + // let pinned_inner = Pin::new(&mut this.inner); + match Pin::new(&mut this.inner).poll_ready(cx) { + Poll::Ready(Ok(())) => { + if let Err(e) = Pin::new(&mut this.inner).start_send(message) { + return Poll::Ready(Some(Err(P2PStreamError::Io(e)))) + } + } + Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(P2PStreamError::Io(e)))), + Poll::Pending => { + // we need to buffer the message and try again later + this.outgoing_messages.push_front(message); + break + } + } + } + // poll the pinger to determine if we should send a ping match this.pinger.poll_ping(cx) { Poll::Pending => {} @@ -185,19 +211,36 @@ where let mut ping_bytes = BytesMut::new(); P2PMessage::Ping.encode(&mut ping_bytes); - // TODO: fix use of Sink API - let send_res = Pin::new(&mut this.inner).send(ping_bytes.into()).poll_unpin(cx)?; - ready!(send_res) + if Pin::new(&mut this.inner).poll_ready(cx).is_ready() { + // send the ping message + Pin::new(&mut this.inner).start_send(ping_bytes.into())? + } else { + // check if the buffer is full + if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { + return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull))) + } + + // if the sink is not ready, buffer the message + this.outgoing_messages.push_back(ping_bytes.into()); + } } _ => { // encode the disconnect message let mut disconnect_bytes = BytesMut::new(); P2PMessage::Disconnect(DisconnectReason::PingTimeout).encode(&mut disconnect_bytes); - // TODO: fix use of Sink API - let send_res = - Pin::new(&mut this.inner).send(disconnect_bytes.into()).poll_unpin(cx)?; - ready!(send_res); + if Pin::new(&mut this.inner).poll_ready(cx).is_ready() { + // send the disconnect message + Pin::new(&mut this.inner).start_send(disconnect_bytes.into())? + } else { + // check if the buffer is full + if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { + return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull))) + } + + // if the sink is not ready, buffer the message + this.outgoing_messages.push_back(disconnect_bytes.into()); + } // since the ping stream has timed out, let's send a None return Poll::Ready(None) @@ -220,9 +263,18 @@ where let mut pong_bytes = BytesMut::new(); P2PMessage::Pong.encode(&mut pong_bytes); - // TODO: fix use of Sink API - let send_res = Pin::new(&mut this.inner).send(pong_bytes.into()).poll_unpin(cx)?; - ready!(send_res) + if Pin::new(&mut this.inner).poll_ready(cx).is_ready() { + // send the pong message + Pin::new(&mut this.inner).start_send(pong_bytes.into())? + } else { + // check if the buffer is full + if this.outgoing_messages.len() >= MAX_P2P_CAPACITY { + return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull))) + } + + // if the sink is not ready, buffer the message + this.outgoing_messages.push_back(pong_bytes.into()); + } // continue to the next message if there is one } else if id == P2PMessageID::Disconnect as u8 {