fix(download): header download stream (#438)

* fix(downloader): rewrite header stream

* rm unused import

* add comment

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
This commit is contained in:
Roman Krasiuk
2022-12-14 16:38:10 +02:00
committed by GitHub
parent 35a41a29f9
commit 4430089b71
3 changed files with 94 additions and 93 deletions

View File

@ -3,7 +3,7 @@ use reth_interfaces::{
consensus::Consensus,
p2p::{
downloader::{DownloadStream, Downloader},
error::{PeerRequestResult, RequestError},
error::PeerRequestResult,
headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderDownloader},
@ -19,7 +19,7 @@ use std::{
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
task::{Context, Poll},
time::Duration,
};
@ -93,7 +93,7 @@ impl<C: Consensus, H: HeadersClient> LinearDownloader<C, H> {
request_retries: self.request_retries,
batch_size: self.batch_size,
client: Arc::clone(&self.client),
done: false,
encountered_error: false,
}
}
}
@ -149,9 +149,8 @@ pub struct HeadersDownload<C, H> {
batch_size: u64,
/// The number of retries for downloading
request_retries: usize,
/// The flag indicating whether the downloader has finished
/// or the retries have been exhausted
done: bool,
/// Flag whether the stream encountered an error
encountered_error: bool,
}
impl<C, H> HeadersDownload<C, H>
@ -164,6 +163,16 @@ where
self.buffered.back()
}
/// Check if any more requests should be dispatched
fn has_reached_head(&self) -> bool {
self.earliest_header().map(|h| h.hash() == self.head.hash()).unwrap_or_default()
}
/// Check if the stream has terminated
fn has_terminated(&self) -> bool {
self.has_reached_head() || self.encountered_error
}
/// Returns the start hash for a new request.
fn request_start(&self) -> H256 {
self.earliest_header().map_or(self.forkchoice.head_block_hash, |h| h.parent_hash)
@ -178,26 +187,35 @@ where
}
}
/// Pop header from the buffer
fn pop_header_from_buffer(&mut self) -> Option<SealedHeader> {
if self.buffered.len() > 1 {
self.buffered.pop_front()
} else {
None
}
}
/// Insert the header into buffer
fn push_header_into_buffer(&mut self, header: SealedHeader) {
self.buffered.push_back(header);
}
/// Get a current future or instantiate a new one
fn get_or_init_fut(&mut self) -> Option<HeadersRequestFuture> {
fn get_or_init_fut(&mut self) -> HeadersRequestFuture {
match self.request.take() {
None if !self.done => {
None => {
// queue in the first request
let client = Arc::clone(&self.client);
let req = self.headers_request();
Some(HeadersRequestFuture {
HeadersRequestFuture {
request: req.clone(),
fut: Box::pin(async move { client.get_headers(req).await }),
retries: 0,
max_retries: self.request_retries,
})
}
}
fut => fut,
Some(fut) => fut,
}
}
@ -241,7 +259,7 @@ where
headers.sort_unstable_by_key(|h| h.number);
if headers.is_empty() {
return Err(RequestError::EmptyHeaders.into())
return Err(DownloadError::EmptyResponse)
}
// Iterate headers in reverse
@ -250,7 +268,7 @@ where
if self.head.hash() == parent.hash() {
// We've reached the target, stop buffering headers
self.done = true;
self.push_header_into_buffer(parent);
break
}
@ -261,11 +279,10 @@ where
} else if parent.hash() != self.forkchoice.head_block_hash {
// The buffer is empty and the first header does not match the
// tip, requeue the future
return Err(RequestError::MismatchedParent(
parent.hash(),
self.forkchoice.head_block_hash,
)
.into())
return Err(DownloadError::InvalidTip {
received: parent.hash(),
expected: self.forkchoice.head_block_hash,
})
}
// Record new parent
@ -278,42 +295,6 @@ where
}
}
impl<C, H> Future for HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Output = Result<Vec<SealedHeader>, DownloadError>;
/// Linear header download implemented as a [Future]. The downloader
/// aggregates all of the header responses in a local buffer until the
/// previous head is reached.
///
/// Upon encountering an error, the downloader will try to resend the request.
/// Returns the error if all of the request retries have been exhausted.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
'outer: loop {
// Safe to unwrap, because the future is `done`
// only upon returning the result
let mut fut = this.get_or_init_fut().expect("fut exists; qed");
let response = ready!(fut.poll_unpin(cx));
if let Err(err) = this.process_header_response(response) {
if this.try_fuse_request_fut(&mut fut).is_err() {
this.done = true;
return Poll::Ready(Err(err))
}
this.request = Some(fut);
continue 'outer
}
if this.done {
return Poll::Ready(Ok(std::mem::take(&mut this.buffered).into()))
}
}
}
}
impl<C, H> Stream for HeadersDownload<C, H>
where
C: Consensus + 'static,
@ -329,48 +310,61 @@ where
/// more headers available in the buffer.
///
/// Upon encountering an error, the downloader will attempt to retry the failed request.
/// If the number of retries is exhausted, the downloader will stream an error, set the `done`
/// flag to true and clear the buffered headers, thus resulting in stream termination.
/// If the number of retries is exhausted, the downloader will stream an error,
/// clear the buffered headers, thus resulting in stream termination.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if let Some(mut fut) = this.get_or_init_fut() {
if let Poll::Ready(result) = fut.poll_unpin(cx) {
if let Err(err) = this.process_header_response(result) {
if this.try_fuse_request_fut(&mut fut).is_err() {
tracing::trace!(
target: "downloaders::headers",
"ran out of retries terminating stream"
);
// We exhausted all of the retries. Stream must terminate
this.done = true;
this.buffered.clear();
return Poll::Ready(Some(Err(err)))
}
this.request = Some(fut);
}
} else {
this.request = Some(fut);
loop {
// Drain any buffered element except the head
if let Some(header) = this.pop_header_from_buffer() {
return Poll::Ready(Some(Ok(header)))
}
}
if !this.done && this.buffered.len() > 1 {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
}
} else if this.done {
if let Some(header) = this.buffered.pop_front() {
// Stream buffered header
return Poll::Ready(Some(Ok(header)))
} else {
// Polling finished, we've reached the target
// We've reached the head or encountered an error, terminate the stream
if this.has_terminated() {
return Poll::Ready(None)
}
}
cx.waker().wake_by_ref();
Poll::Pending
// Initialize the future
let mut fut = this.get_or_init_fut();
match fut.poll_unpin(cx) {
Poll::Ready(result) => {
// Process the response, buffering the headers
// in case of successful validation
match this.process_header_response(result) {
Ok(_) => {
// The buffer has been updated, check if we need
// to dispatch another request
if !this.has_reached_head() {
// Create a new request
this.request = Some(this.get_or_init_fut());
}
}
Err(err) => {
// Response is invalid, attempt to retry
if this.try_fuse_request_fut(&mut fut).is_err() {
tracing::trace!(
target: "downloaders::headers",
"ran out of retries terminating stream"
);
// We exhausted all of the retries. Stream must terminate
this.buffered.clear();
this.encountered_error = true;
return Poll::Ready(Some(Err(err)))
}
// Reset the future
this.request = Some(fut);
}
};
}
Poll::Pending => {
// Set the future back to state
this.request = Some(fut);
return Poll::Pending
}
}
}
}
}