fix(download): body download range (#1065)

This commit is contained in:
Roman Krasiuk
2023-01-27 16:55:25 +02:00
committed by GitHub
parent 4a5a1dbea8
commit 8cfe24081e
2 changed files with 31 additions and 31 deletions

View File

@ -85,10 +85,6 @@ where
};
let limit = self.download_range.end.saturating_sub(start_at).min(self.request_limit);
if limit == 0 {
return Ok(None)
}
self.query_headers(start_at..self.download_range.end, limit)
}
@ -107,7 +103,7 @@ where
range: Range<BlockNumber>,
max_non_empty: u64,
) -> DownloadResult<Option<Vec<SealedHeader>>> {
if range.start >= self.download_range.end {
if range.is_empty() || max_non_empty == 0 {
return Ok(None)
}
@ -128,15 +124,15 @@ where
// 1. Current block number is in range
// 2. The number of non empty headers is less than maximum
// 3. The total number of headers is less than the stream batch size
while current_block_num < range.end &&
while range.contains(&current_block_num) &&
non_empty_headers < max_non_empty &&
headers.len() < self.stream_batch_size
{
// Find the block hash
// Find the block hash.
let (number, hash) = canonical_cursor
.seek_exact(current_block_num)?
.ok_or(DownloadError::MissingHeader { block_number: current_block_num })?;
// Find the block number
// Find the block header.
let (_, header) = header_cursor
.seek_exact((number, hash).into())?
.ok_or(DownloadError::MissingHeader { block_number: number })?;
@ -281,7 +277,8 @@ where
(Some(first), Some(last)) => first.block_number()..last.block_number() + 1,
_ => Range::default(),
};
let mut request_range = Range::default();
let mut requests = Vec::<RangeInclusive<BlockNumber>>::default();
for num in range.start..=latest_queued {
// Check if block has been downloaded or is currently in progress
if queued_bodies_range.contains(&num) ||
@ -291,29 +288,31 @@ where
continue
}
if range.is_empty() {
request_range.start = num;
} else if request_range.end + 1 == num {
request_range.end = num;
} else {
let headers = self
.query_headers(
request_range.start..request_range.end + 1, // exclusive
request_range.clone().count() as u64,
)?
.ok_or(DownloadError::MissingHeader {
block_number: request_range.start,
})?;
match requests.last().map(|range| *range.end()) {
// Extend the last range if contiguous
Some(range_end) if range_end + 1 == num => {
let range = requests.pop().unwrap();
requests.push(*range.start()..=num);
}
// Push the new request range
Some(_) | None => requests.push(num..=num),
};
}
// Dispatch contiguous request.
self.in_progress_queue.push_new_request(
Arc::clone(&self.client),
Arc::clone(&self.consensus),
headers,
);
// Clear the current request range
request_range = Range::default();
}
for range in requests {
let headers = self
.query_headers(
*range.start()..*range.end() + 1, //
range.clone().count() as u64,
)?
.ok_or(DownloadError::MissingHeader { block_number: *range.start() })?;
// Dispatch contiguous request.
self.in_progress_queue.push_new_request(
Arc::clone(&self.client),
Arc::clone(&self.consensus),
headers,
);
}
}
}

View File

@ -57,6 +57,7 @@ impl StageError {
matches!(
self,
StageError::Database(_) |
StageError::Download(_) |
StageError::DatabaseIntegrity(_) |
StageError::StageProgress(_) |
StageError::Fatal(_)