fix(download): stream loop (#433)

This commit is contained in:
Roman Krasiuk
2022-12-14 12:39:02 +02:00
committed by GitHub
parent ad191efe88
commit ac5efc0749

View File

@ -322,37 +322,39 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
'outer: loop {
if let Some(mut fut) = this.get_or_init_fut() {
if let Poll::Ready(result) = fut.poll_unpin(cx) {
if let Err(err) = this.process_header_response(result) {
if this.try_fuse_request_fut(&mut fut).is_err() {
// We exhausted all of the retries. Stream must terminate
this.done = true;
this.buffered.clear();
return Poll::Ready(Some(Err(err)))
}
this.request = Some(fut);
continue 'outer
if let Some(mut fut) = this.get_or_init_fut() {
if let Poll::Ready(result) = fut.poll_unpin(cx) {
if let Err(err) = this.process_header_response(result) {
if this.try_fuse_request_fut(&mut fut).is_err() {
// We exhausted all of the retries. Stream must terminate
this.done = true;
this.buffered.clear();
return Poll::Ready(Some(Err(err)))
}
this.request = Some(fut);
}
}
if !this.done && this.buffered.len() > 1 {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
}
} else if this.done {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
} else {
// Polling finished, we've reached the target
return Poll::Ready(None)
}
} else {
this.request = Some(fut);
}
}
if !this.done && this.buffered.len() > 1 {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
}
} else if this.done {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
} else {
// Polling finished, we've reached the target
return Poll::Ready(None)
}
}
cx.waker().wake_by_ref();
Poll::Pending
}
}