Send pings in P2PStream.poll_ready (#931)

This commit is contained in:
Ikechukwu Ahiara Marvellous
2023-01-19 12:47:23 +01:00
committed by GitHub
parent 78ffd0a7c0
commit 54e9b12e65

View File

@ -279,31 +279,6 @@ where
return Poll::Ready(None)
}
// poll the pinger to determine if we should send a ping
match this.pinger.poll_ping(cx) {
Poll::Pending => {}
Poll::Ready(Ok(PingerEvent::Ping)) => {
// encode the ping message
let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes);
// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Some(Err(P2PStreamError::SendBufferFull)))
}
// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(ping_bytes.into());
}
_ => {
// encode the disconnect message
this.start_disconnect(DisconnectReason::PingTimeout)?;
// End the stream after ping related error
return Poll::Ready(None)
}
}
// we should loop here to ensure we don't return Poll::Pending if we have a message to
// return behind any pings we need to respond to
while let Poll::Ready(res) = this.inner.poll_next_unpin(cx) {
@ -416,6 +391,31 @@ where
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let mut this = self.as_mut();
// poll the pinger to determine if we should send a ping
match this.pinger.poll_ping(cx) {
Poll::Pending => {}
Poll::Ready(Ok(PingerEvent::Ping)) => {
// encode the ping message
let mut ping_bytes = BytesMut::new();
P2PMessage::Ping.encode(&mut ping_bytes);
// check if the buffer is full
if this.outgoing_messages.len() >= MAX_P2P_CAPACITY {
return Poll::Ready(Err(P2PStreamError::SendBufferFull))
}
// if the sink is not ready, buffer the message
this.outgoing_messages.push_back(ping_bytes.into());
}
_ => {
// encode the disconnect message
this.start_disconnect(DisconnectReason::PingTimeout)?;
// End the stream after ping related error
return Poll::Ready(Ok(()))
}
}
match this.inner.poll_ready_unpin(cx) {
Poll::Pending => {}
Poll::Ready(Err(err)) => return Poll::Ready(Err(P2PStreamError::Io(err))),