fix(eth-wire): fix usage of sink API in P2PStream (#184)

* fix(eth-wire): fix usage of sink API in P2PStream

* add error checks for sink buffer inserts
This commit is contained in:
Dan Cline
2022-11-09 19:06:27 -05:00
committed by GitHub
parent 9e35d58b05
commit d4aabe4751
2 changed files with 66 additions and 12 deletions

View File

@ -67,6 +67,8 @@ pub enum P2PStreamError {
MismatchedProtocolVersion { expected: u8, got: u8 }, MismatchedProtocolVersion { expected: u8, got: u8 },
#[error("started ping task before the handshake completed")] #[error("started ping task before the handshake completed")]
PingBeforeHandshake, PingBeforeHandshake,
#[error("too many messages buffered before sending")]
SendBufferFull,
// TODO: remove / reconsider // TODO: remove / reconsider
#[error("disconnected")] #[error("disconnected")]
Disconnected, Disconnected,

View File

@ -5,12 +5,12 @@ use crate::{
pinger::{Pinger, PingerEvent}, pinger::{Pinger, PingerEvent},
}; };
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::{ready, FutureExt, Sink, SinkExt, StreamExt}; use futures::{Sink, SinkExt, StreamExt};
use pin_project::pin_project; use pin_project::pin_project;
use reth_primitives::H512 as PeerId; use reth_primitives::H512 as PeerId;
use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable}; use reth_rlp::{Decodable, DecodeError, Encodable, RlpDecodable, RlpEncodable};
use std::{ use std::{
collections::{BTreeSet, HashMap}, collections::{BTreeSet, HashMap, VecDeque},
fmt::Display, fmt::Display,
io, io,
pin::Pin, pin::Pin,
@ -46,6 +46,10 @@ const PING_INTERVAL: Duration = Duration::from_secs(60);
/// [`P2PMessage::Disconnect`] message. /// [`P2PMessage::Disconnect`] message.
const GRACE_PERIOD: Duration = Duration::from_secs(2); const GRACE_PERIOD: Duration = Duration::from_secs(2);
/// [`MAX_P2P_CAPACITY`] is the maximum number of messages that can be buffered to be sent in the
/// `p2p` stream.
const MAX_P2P_CAPACITY: usize = 64;
/// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the /// An un-authenticated [`P2PStream`]. This is consumed and returns a [`P2PStream`] after the
/// `Hello` handshake is completed. /// `Hello` handshake is completed.
#[pin_project] #[pin_project]
@ -149,6 +153,9 @@ pub struct P2PStream<S> {
/// The supported capability for this stream. /// The supported capability for this stream.
shared_capability: SharedCapability, shared_capability: SharedCapability,
/// Outgoing messages buffered for sending to the underlying stream.
outgoing_messages: VecDeque<Bytes>,
} }
impl<S> P2PStream<S> { impl<S> P2PStream<S> {
@ -162,6 +169,7 @@ impl<S> P2PStream<S> {
decoder: snap::raw::Decoder::new(), decoder: snap::raw::Decoder::new(),
pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT), pinger: Pinger::new(PING_INTERVAL, PING_TIMEOUT),
shared_capability: capability, shared_capability: capability,
outgoing_messages: VecDeque::new(),
} }
} }
} }
@ -177,6 +185,24 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project(); let mut this = self.project();
// try to send any buffered outgoing messages
while let Some(message) = this.outgoing_messages.pop_front() {
// let pinned_inner = Pin::new(&mut this.inner);
match Pin::new(&mut this.inner).poll_ready(cx) {
Poll::Ready(Ok(())) => {
if let Err(e) = Pin::new(&mut this.inner).start_send(message) {
return Poll::Ready(Some(Err(P2PStreamError::Io(e))))
}
}
Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(P2PStreamError::Io(e)))),
Poll::Pending => {
// we need to buffer the message and try again later
this.outgoing_messages.push_front(message);
break
}
}
}
// poll the pinger to determine if we should send a ping // poll the pinger to determine if we should send a ping
match this.pinger.poll_ping(cx) { match this.pinger.poll_ping(cx) {
Poll::Pending => {} Poll::Pending => {}
@ -185,19 +211,36 @@ where
let mut ping_bytes = BytesMut::new(); let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes); P2PMessage::Ping.encode(&mut ping_bytes);
// TODO: fix use of Sink API if Pin::new(&mut this.inner).poll_ready(cx).is_ready() {
let send_res = Pin::new(&mut this.inner).send(ping_bytes.into()).poll_unpin(cx)?; // send the ping message
ready!(send_res) Pin::new(&mut this.inner).start_send(ping_bytes.into())?
} else {
// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}
// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(ping_bytes.into());
}
} }
_ => { _ => {
// encode the disconnect message // encode the disconnect message
let mut disconnect_bytes = BytesMut::new(); let mut disconnect_bytes = BytesMut::new();
P2PMessage::Disconnect(DisconnectReason::PingTimeout).encode(&mut disconnect_bytes); P2PMessage::Disconnect(DisconnectReason::PingTimeout).encode(&mut disconnect_bytes);
// TODO: fix use of Sink API if Pin::new(&mut this.inner).poll_ready(cx).is_ready() {
let send_res = // send the disconnect message
Pin::new(&mut this.inner).send(disconnect_bytes.into()).poll_unpin(cx)?; Pin::new(&mut this.inner).start_send(disconnect_bytes.into())?
ready!(send_res); } else {
// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}
// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(disconnect_bytes.into());
}
// since the ping stream has timed out, let's send a None // since the ping stream has timed out, let's send a None
return Poll::Ready(None) return Poll::Ready(None)
@ -220,9 +263,18 @@ where
let mut pong_bytes = BytesMut::new(); let mut pong_bytes = BytesMut::new();
P2PMessage::Pong.encode(&mut pong_bytes); P2PMessage::Pong.encode(&mut pong_bytes);
// TODO: fix use of Sink API if Pin::new(&mut this.inner).poll_ready(cx).is_ready() {
let send_res = Pin::new(&mut this.inner).send(pong_bytes.into()).poll_unpin(cx)?; // send the pong message
ready!(send_res) Pin::new(&mut this.inner).start_send(pong_bytes.into())?
} else {
// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}
// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(pong_bytes.into());
}
// continue to the next message if there is one // continue to the next message if there is one
} else if id == P2PMessageID::Disconnect as u8 { } else if id == P2PMessageID::Disconnect as u8 {