fix(download): header downloader initial state (#1064)

This commit is contained in:
Roman Krasiuk
2023-01-27 15:02:44 +02:00
committed by GitHub
parent 71dc531e68
commit 4a5a1dbea8
4 changed files with 133 additions and 91 deletions

View File

@ -161,14 +161,7 @@ impl Command {
headers::linear::LinearDownloadBuilder::default() headers::linear::LinearDownloadBuilder::default()
.request_limit(config.stages.headers.downloader_batch_size) .request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize) .stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the .build(consensus.clone(), fetch_client.clone()),
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
); );
// Spawn bodies downloader // Spawn bodies downloader

View File

@ -14,7 +14,7 @@ use reth_interfaces::{
priority::Priority, priority::Priority,
}, },
}; };
use reth_primitives::{Header, HeadersDirection, PeerId, SealedHeader, H256}; use reth_primitives::{BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256};
use std::{ use std::{
cmp::{Ordering, Reverse}, cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap}, collections::{binary_heap::PeekMut, BinaryHeap},
@ -50,9 +50,9 @@ pub struct LinearDownloader<H> {
/// Client used to download headers. /// Client used to download headers.
client: Arc<H>, client: Arc<H>,
/// The local head of the chain. /// The local head of the chain.
local_head: SealedHeader, local_head: Option<SealedHeader>,
/// Block we want to close the gap to. /// Block we want to close the gap to.
sync_target: SyncTargetBlock, sync_target: Option<SyncTargetBlock>,
/// The block number to use for requests. /// The block number to use for requests.
next_request_block_number: u64, next_request_block_number: u64,
/// Keeps track of the block we need to validate next. /// Keeps track of the block we need to validate next.
@ -99,8 +99,28 @@ where
/// Returns the block number the local node is at. /// Returns the block number the local node is at.
#[inline] #[inline]
fn local_block_number(&self) -> u64 { fn local_block_number(&self) -> Option<BlockNumber> {
self.local_head.number self.local_head.as_ref().map(|h| h.number)
}
/// Returns the existing local head block number
///
/// # Panics
///
/// If the local head has not been set.
#[inline]
fn existing_local_block_number(&self) -> BlockNumber {
self.local_head.as_ref().expect("is initialized").number
}
/// Returns the existing sync target hash.
///
/// # Panics
///
/// If the sync target has never been set.
#[inline]
fn existing_sync_target_hash(&self) -> H256 {
self.sync_target.as_ref().expect("is initialized").hash
} }
/// Max requests to handle at the same time /// Max requests to handle at the same time
@ -130,15 +150,19 @@ where
/// ///
/// Returns `None` if no more requests are required. /// Returns `None` if no more requests are required.
fn next_request(&mut self) -> Option<HeadersRequest> { fn next_request(&mut self) -> Option<HeadersRequest> {
let local_head = self.local_block_number(); if let Some(local_head) = self.local_block_number() {
if self.next_request_block_number > local_head { if self.next_request_block_number > local_head {
let request = let request = calc_next_request(
calc_next_request(local_head, self.next_request_block_number, self.request_limit); local_head,
// need to shift the tracked request block number based on the number of requested self.next_request_block_number,
// headers so follow-up requests will use that as start. self.request_limit,
self.next_request_block_number -= request.limit; );
// need to shift the tracked request block number based on the number of requested
// headers so follow-up requests will use that as start.
self.next_request_block_number -= request.limit;
return Some(request) return Some(request)
}
} }
None None
@ -171,6 +195,7 @@ where
headers: Vec<Header>, headers: Vec<Header>,
peer_id: PeerId, peer_id: PeerId,
) -> Result<(), HeadersResponseError> { ) -> Result<(), HeadersResponseError> {
let sync_target_hash = self.existing_sync_target_hash();
let mut validated = Vec::with_capacity(headers.len()); let mut validated = Vec::with_capacity(headers.len());
for parent in headers { for parent in headers {
@ -184,13 +209,13 @@ where
trace!(target: "downloaders::headers", ?error ,"Failed to validate header"); trace!(target: "downloaders::headers", ?error ,"Failed to validate header");
return Err(HeadersResponseError { request, peer_id: Some(peer_id), error }) return Err(HeadersResponseError { request, peer_id: Some(peer_id), error })
} }
} else if parent.hash() != self.sync_target.hash { } else if parent.hash() != sync_target_hash {
return Err(HeadersResponseError { return Err(HeadersResponseError {
request, request,
peer_id: Some(peer_id), peer_id: Some(peer_id),
error: DownloadError::InvalidTip { error: DownloadError::InvalidTip {
received: parent.hash(), received: parent.hash(),
expected: self.sync_target.hash, expected: sync_target_hash,
}, },
}) })
} }
@ -218,7 +243,9 @@ where
/// are _not_ higher than the new `target_block_number`. /// are _not_ higher than the new `target_block_number`.
fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) { fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) {
// Update the trackers // Update the trackers
if let Some(old_target) = self.sync_target.number.replace(target_block_number) { if let Some(old_target) =
self.sync_target.as_mut().and_then(|t| t.number.replace(target_block_number))
{
if target_block_number > old_target { if target_block_number > old_target {
// the new target is higher than the old target we need to update the // the new target is higher than the old target we need to update the
// request tracker and reset everything // request tracker and reset everything
@ -248,6 +275,7 @@ where
&mut self, &mut self,
response: HeadersRequestOutcome, response: HeadersRequestOutcome,
) -> Result<(), HeadersResponseError> { ) -> Result<(), HeadersResponseError> {
let sync_target_hash = self.existing_sync_target_hash();
let HeadersRequestOutcome { request, outcome } = response; let HeadersRequestOutcome { request, outcome } = response;
match outcome { match outcome {
Ok(res) => { Ok(res) => {
@ -270,18 +298,18 @@ where
let target = headers.remove(0).seal(); let target = headers.remove(0).seal();
if target.hash() != self.sync_target.hash { if target.hash() != sync_target_hash {
return Err(HeadersResponseError { return Err(HeadersResponseError {
request, request,
peer_id: Some(peer_id), peer_id: Some(peer_id),
error: DownloadError::InvalidTip { error: DownloadError::InvalidTip {
received: target.hash(), received: target.hash(),
expected: self.sync_target.hash, expected: sync_target_hash,
}, },
}) })
} }
trace!(target: "downloaders::headers", head=%self.local_head.number, hash=?target.hash(), number=%target.number, "Received sync target"); trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number, "Received sync target");
// This is the next block we need to start issuing requests from // This is the next block we need to start issuing requests from
let parent_block_number = target.number.saturating_sub(1); let parent_block_number = target.number.saturating_sub(1);
@ -361,7 +389,7 @@ where
self.try_validate_buffered() self.try_validate_buffered()
.map(Err::<(), HeadersResponseError>) .map(Err::<(), HeadersResponseError>)
.transpose()?; .transpose()?;
} else if highest.number > self.local_head.number { } else if highest.number > self.existing_local_block_number() {
// can't validate yet // can't validate yet
self.buffered_responses.push(OrderedHeadersResponse { self.buffered_responses.push(OrderedHeadersResponse {
headers, headers,
@ -430,12 +458,8 @@ where
} }
/// Returns the request for the `sync_target` header. /// Returns the request for the `sync_target` header.
fn get_sync_target_request(&self) -> HeadersRequest { fn get_sync_target_request(&self, start: H256) -> HeadersRequest {
HeadersRequest { HeadersRequest { start: start.into(), limit: 1, direction: HeadersDirection::Falling }
start: self.sync_target.hash.into(),
limit: 1,
direction: HeadersDirection::Falling,
}
} }
/// Starts a request future /// Starts a request future
@ -478,26 +502,28 @@ where
H: HeadersClient + 'static, H: HeadersClient + 'static,
{ {
fn update_local_head(&mut self, head: SealedHeader) { fn update_local_head(&mut self, head: SealedHeader) {
self.local_head = head;
// ensure we're only yielding headers that are in range and follow the current local head. // ensure we're only yielding headers that are in range and follow the current local head.
while self while self
.queued_validated_headers .queued_validated_headers
.last() .last()
.map(|last| last.number <= self.local_head.number) .map(|last| last.number <= head.number)
.unwrap_or_default() .unwrap_or_default()
{ {
// headers are sorted high to low // headers are sorted high to low
self.queued_validated_headers.pop(); self.queued_validated_headers.pop();
} }
// update the local head
self.local_head = Some(head);
} }
/// If the given target is different from the current target, we need to update the sync target /// If the given target is different from the current target, we need to update the sync target
fn update_sync_target(&mut self, target: SyncTarget) { fn update_sync_target(&mut self, target: SyncTarget) {
let current_tip = self.sync_target.as_ref().map(|t| t.hash);
match target { match target {
SyncTarget::Tip(tip) => { SyncTarget::Tip(tip) => {
if tip != self.sync_target.hash { if Some(tip) != current_tip {
trace!(target: "downloaders::headers", current=?self.sync_target.hash, new=?tip, "Update sync target"); trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target");
self.sync_target.hash = tip; let new_sync_target = SyncTargetBlock::from_hash(tip);
// if the new sync target is the next queued request we don't need to re-start // if the new sync target is the next queued request we don't need to re-start
// the target update // the target update
@ -507,27 +533,35 @@ where
.filter(|h| h.hash() == tip) .filter(|h| h.hash() == tip)
.map(|h| h.number) .map(|h| h.number)
{ {
self.sync_target.number = Some(target_number); self.sync_target = Some(new_sync_target.with_number(target_number));
return return
} }
trace!(target: "downloaders::headers", new=?target, "Request new sync target"); trace!(target: "downloaders::headers", new=?target, "Request new sync target");
self.sync_target = Some(new_sync_target);
self.sync_target_request = self.sync_target_request =
Some(self.request_fut(self.get_sync_target_request(), Priority::High)); Some(self.request_fut(self.get_sync_target_request(tip), Priority::High));
} }
} }
SyncTarget::Gap(existing) => { SyncTarget::Gap(existing) => {
let target = existing.parent_hash; let target = existing.parent_hash;
if target != self.sync_target.hash { if Some(target) != current_tip {
// there could be a sync target request in progress // there could be a sync target request in progress
self.sync_target_request.take(); self.sync_target_request.take();
// If the target has changed, update the request pointers based on the new // If the target has changed, update the request pointers based on the new
// targeted block number // targeted block number
let parent_block_number = existing.number.saturating_sub(1); let parent_block_number = existing.number.saturating_sub(1);
trace!(target: "downloaders::headers", current=?self.sync_target.hash, new=?target, %parent_block_number, "Updated sync target"); trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target");
self.sync_target.hash = target; // Update the sync target hash
self.sync_target = match self.sync_target.take() {
Some(mut sync_target) => {
sync_target.hash = target;
Some(sync_target)
}
None => Some(SyncTargetBlock::from_hash(target)),
};
self.on_block_number_update(parent_block_number, parent_block_number); self.on_block_number_update(parent_block_number, parent_block_number);
} }
} }
@ -548,6 +582,18 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut(); let this = self.get_mut();
// The downloader boundaries (local head and sync target) have to be set in order
// to start downloading data.
if this.local_head.is_none() || this.sync_target.is_none() {
tracing::trace!(
target: "downloaders::headers",
head=?this.local_block_number(),
sync_target=?this.sync_target,
"The downloader sync boundaries have not been set"
);
return Poll::Pending
}
// If we have a new tip request we need to complete that first before we send batched // If we have a new tip request we need to complete that first before we send batched
// requests // requests
while let Some(mut req) = this.sync_target_request.take() { while let Some(mut req) = this.sync_target_request.take() {
@ -735,6 +781,19 @@ struct SyncTargetBlock {
number: Option<u64>, number: Option<u64>,
} }
impl SyncTargetBlock {
/// Create new instance from hash.
fn from_hash(hash: H256) -> Self {
Self { hash, number: None }
}
/// Set a number on the instance.
fn with_number(mut self, number: u64) -> Self {
self.number = Some(number);
self
}
}
/// The builder for [LinearDownloader] with /// The builder for [LinearDownloader] with
/// some default settings /// some default settings
#[derive(Debug)] #[derive(Debug)]
@ -812,13 +871,7 @@ impl LinearDownloadBuilder {
/// Build [LinearDownloader] with provided consensus /// Build [LinearDownloader] with provided consensus
/// and header client implementations /// and header client implementations
pub fn build<H>( pub fn build<H>(self, consensus: Arc<dyn Consensus>, client: Arc<H>) -> LinearDownloader<H>
self,
consensus: Arc<dyn Consensus>,
client: Arc<H>,
local_head: SealedHeader,
sync_target_block_hash: H256,
) -> LinearDownloader<H>
where where
H: HeadersClient + 'static, H: HeadersClient + 'static,
{ {
@ -829,11 +882,11 @@ impl LinearDownloadBuilder {
max_concurrent_requests, max_concurrent_requests,
max_buffered_responses, max_buffered_responses,
} = self; } = self;
let mut downloader = LinearDownloader { LinearDownloader {
consensus, consensus,
client, client,
local_head, local_head: None,
sync_target: SyncTargetBlock { hash: sync_target_block_hash, number: None }, sync_target: None,
// Note: we set these to `0` first, they'll be updated once the sync target response is // Note: we set these to `0` first, they'll be updated once the sync target response is
// handled and only used afterwards // handled and only used afterwards
next_request_block_number: 0, next_request_block_number: 0,
@ -849,12 +902,7 @@ impl LinearDownloadBuilder {
buffered_responses: Default::default(), buffered_responses: Default::default(),
queued_validated_headers: Default::default(), queued_validated_headers: Default::default(),
metrics: DownloaderMetrics::new(HEADERS_DOWNLOADER_SCOPE), metrics: DownloaderMetrics::new(HEADERS_DOWNLOADER_SCOPE),
}; }
downloader.sync_target_request =
Some(downloader.request_fut(downloader.get_sync_target_request(), Priority::High));
downloader
} }
} }
@ -882,6 +930,7 @@ mod tests {
use super::*; use super::*;
use crate::headers::test_utils::child_header; use crate::headers::test_utils::child_header;
use assert_matches::assert_matches;
use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient}; use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient};
use reth_primitives::SealedHeader; use reth_primitives::SealedHeader;
@ -892,12 +941,11 @@ mod tests {
let genesis = SealedHeader::default(); let genesis = SealedHeader::default();
let mut downloader = LinearDownloadBuilder::default().build( let mut downloader = LinearDownloadBuilder::default()
Arc::new(TestConsensus::default()), .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Arc::clone(&client),
genesis, downloader.update_local_head(genesis);
H256::random(), downloader.update_sync_target(SyncTarget::Tip(H256::random()));
);
downloader.sync_target_request.take(); downloader.sync_target_request.take();
@ -909,7 +957,10 @@ mod tests {
let target = SyncTarget::Gap(SealedHeader::new(Default::default(), H256::random())); let target = SyncTarget::Gap(SealedHeader::new(Default::default(), H256::random()));
downloader.update_sync_target(target); downloader.update_sync_target(target);
assert!(downloader.sync_target_request.is_none()); assert!(downloader.sync_target_request.is_none());
assert!(downloader.sync_target.number.is_some()); assert_matches!(
downloader.sync_target,
Some(target) => target.number.is_some()
);
} }
/// Tests that request calc works /// Tests that request calc works
@ -919,12 +970,10 @@ mod tests {
let header = SealedHeader::default(); let header = SealedHeader::default();
let mut downloader = LinearDownloadBuilder::default().build( let mut downloader = LinearDownloadBuilder::default()
Arc::new(TestConsensus::default()), .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
Arc::clone(&client), downloader.update_local_head(header.clone());
header.clone(), downloader.update_sync_target(SyncTarget::Tip(H256::random()));
H256::random(),
);
downloader.queued_validated_headers.push(header.clone()); downloader.queued_validated_headers.push(header.clone());
let mut next = header.as_ref().clone(); let mut next = header.as_ref().clone();
@ -961,12 +1010,11 @@ mod tests {
let batch_size = 99; let batch_size = 99;
let start = 1000; let start = 1000;
let mut downloader = LinearDownloadBuilder::default().request_limit(batch_size).build( let mut downloader = LinearDownloadBuilder::default()
Arc::new(TestConsensus::default()), .request_limit(batch_size)
Arc::clone(&client), .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
genesis, downloader.update_local_head(genesis);
H256::random(), downloader.update_sync_target(SyncTarget::Tip(H256::random()));
);
downloader.next_request_block_number = start; downloader.next_request_block_number = start;
let mut total = 0; let mut total = 0;
@ -975,7 +1023,7 @@ mod tests {
total += req.limit; total += req.limit;
} }
assert_eq!(total, start); assert_eq!(total, start);
assert_eq!(downloader.next_request_block_number, downloader.local_block_number()); assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number());
} }
#[test] #[test]
@ -1013,7 +1061,9 @@ mod tests {
let mut downloader = LinearDownloadBuilder::default() let mut downloader = LinearDownloadBuilder::default()
.stream_batch_size(3) .stream_batch_size(3)
.request_limit(3) .request_limit(3)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(p3.clone());
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
client client
.extend(vec![ .extend(vec![
@ -1043,7 +1093,9 @@ mod tests {
let mut downloader = LinearDownloadBuilder::default() let mut downloader = LinearDownloadBuilder::default()
.stream_batch_size(1) .stream_batch_size(1)
.request_limit(1) .request_limit(1)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
downloader.update_local_head(p3.clone());
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
client client
.extend(vec![ .extend(vec![

View File

@ -48,8 +48,6 @@ impl TaskDownloader {
/// let downloader = LinearDownloader::<H>::builder().build( /// let downloader = LinearDownloader::<H>::builder().build(
/// consensus, /// consensus,
/// client, /// client,
/// Default::default(),
/// Default::default(),
/// ); /// );
/// let downloader = TaskDownloader::spawn(downloader); /// let downloader = TaskDownloader::spawn(downloader);
/// # } /// # }
@ -172,9 +170,11 @@ mod tests {
let downloader = LinearDownloadBuilder::default() let downloader = LinearDownloadBuilder::default()
.stream_batch_size(1) .stream_batch_size(1)
.request_limit(1) .request_limit(1)
.build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); .build(Arc::new(TestConsensus::default()), Arc::clone(&client));
let mut downloader = TaskDownloader::spawn(downloader); let mut downloader = TaskDownloader::spawn(downloader);
downloader.update_local_head(p3.clone());
downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
client client
.extend(vec![ .extend(vec![

View File

@ -443,12 +443,9 @@ mod tests {
client: client.clone(), client: client.clone(),
consensus: consensus.clone(), consensus: consensus.clone(),
downloader_factory: Box::new(move || { downloader_factory: Box::new(move || {
LinearDownloadBuilder::default().stream_batch_size(500).build( LinearDownloadBuilder::default()
consensus.clone(), .stream_batch_size(500)
client.clone(), .build(consensus.clone(), client.clone())
Default::default(),
Default::default(),
)
}), }),
network_handle: TestStatusUpdater::default(), network_handle: TestStatusUpdater::default(),
tx: TestTransaction::default(), tx: TestTransaction::default(),