fix(sync): headers stage progress (#467)

* fix(sync): headers stage progress

* add tests to head/tip and split suite

* fix local tip and address comments

* rename error
Roman Krasiuk
2022-12-15 20:41:32 +02:00
committed by GitHub
parent 789dc3bc87
commit 2b0f5316f9
8 changed files with 187 additions and 98 deletions

View File

@ -6,8 +6,7 @@ use crate::{
},
};
use reth_primitives::SealedHeader;
use reth_rpc_types::engine::ForkchoiceState;
use reth_primitives::{SealedHeader, H256};
/// A downloader capable of fetching block headers.
///
@ -17,11 +16,7 @@ use reth_rpc_types::engine::ForkchoiceState;
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeaderDownloader: Downloader {
/// Stream the headers
fn stream(
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<'_, SealedHeader>;
fn stream(&self, head: SealedHeader, tip: H256) -> DownloadStream<'_, SealedHeader>;
/// Validate whether the header is valid in relation to its parent
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {

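For reference, a minimal sketch of a call site after this change. Only the `stream(head, tip)` signature comes from the diff; the helper function is hypothetical, and the `DownloadError` return type is inferred from the stream's use of `DownloadResult` items and `try_collect` elsewhere in this commit:

use futures::TryStreamExt;

// Hypothetical helper, not part of this change: drive a HeaderDownloader with the
// new signature, collecting headers from `tip` down toward the local `head`.
async fn collect_headers<D: HeaderDownloader>(
    downloader: &D,
    head: SealedHeader,
    tip: H256,
) -> Result<Vec<SealedHeader>, DownloadError> {
    downloader.stream(head, tip).try_collect::<Vec<_>>().await
}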
View File

@ -67,11 +67,7 @@ impl Downloader for TestHeaderDownloader {
#[async_trait::async_trait]
impl HeaderDownloader for TestHeaderDownloader {
fn stream(
&self,
_head: SealedHeader,
_forkchoice: ForkchoiceState,
) -> DownloadStream<'_, SealedHeader> {
fn stream(&self, _head: SealedHeader, _tip: H256) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.create_download())
}
}

View File

@ -11,7 +11,6 @@ use reth_interfaces::{
},
};
use reth_primitives::{HeadersDirection, SealedHeader, H256};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
borrow::Borrow,
collections::VecDeque,
@ -57,12 +56,8 @@ where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
fn stream(
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.new_download(head, forkchoice))
fn stream(&self, head: SealedHeader, tip: H256) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.new_download(head, tip))
}
}
@ -78,14 +73,10 @@ impl<C: Consensus, H: HeadersClient> Clone for LinearDownloader<C, H> {
}
impl<C: Consensus, H: HeadersClient> LinearDownloader<C, H> {
fn new_download(
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> HeadersDownload<C, H> {
fn new_download(&self, head: SealedHeader, tip: H256) -> HeadersDownload<C, H> {
HeadersDownload {
head,
forkchoice,
tip,
buffered: VecDeque::default(),
request: Default::default(),
consensus: Arc::clone(&self.consensus),
@ -132,7 +123,8 @@ impl Future for HeadersRequestFuture {
pub struct HeadersDownload<C, H> {
/// The local head of the chain.
head: SealedHeader,
forkchoice: ForkchoiceState,
/// Tip to start syncing from
tip: H256,
/// Buffered results
buffered: VecDeque<SealedHeader>,
/// Contains the request that's currently in progress.
@ -174,7 +166,7 @@ where
/// Returns the start hash for a new request.
fn request_start(&self) -> H256 {
self.earliest_header().map_or(self.forkchoice.head_block_hash, |h| h.parent_hash)
self.earliest_header().map_or(self.tip, |h| h.parent_hash)
}
/// Get the headers request to dispatch
@ -279,12 +271,12 @@ where
// Proceed to insert. If there is a validation error re-queue
// the future.
self.validate(header, &parent)?;
} else if parent.hash() != self.forkchoice.head_block_hash {
} else if parent.hash() != self.tip {
// The buffer is empty and the first header does not match the
// tip, requeue the future
return Err(DownloadError::InvalidTip {
received: parent.hash(),
expected: self.forkchoice.head_block_hash,
expected: self.tip,
})
}
@ -460,7 +452,7 @@ mod tests {
LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client));
let result = downloader
.stream(SealedHeader::default(), ForkchoiceState::default())
.stream(SealedHeader::default(), H256::default())
.try_collect::<Vec<_>>()
.await;
assert!(result.is_err());
@ -487,9 +479,7 @@ mod tests {
])
.await;
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.stream(p0, fork).try_collect::<Vec<_>>().await;
let result = downloader.stream(p0.clone(), p0.hash_slow()).try_collect::<Vec<_>>().await;
let headers = result.unwrap();
assert!(headers.is_empty());
}
@ -515,9 +505,7 @@ mod tests {
])
.await;
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.stream(p3, fork).try_collect::<Vec<_>>().await;
let result = downloader.stream(p3, p0.hash_slow()).try_collect::<Vec<_>>().await;
let headers = result.unwrap();
assert_eq!(headers.len(), 3);
assert_eq!(headers[0], p0);
@ -532,7 +520,7 @@ mod tests {
LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client));
let result = downloader
.stream(SealedHeader::default(), ForkchoiceState::default())
.stream(SealedHeader::default(), H256::default())
.try_collect::<Vec<_>>()
.await;
assert!(result.is_err());
@ -559,9 +547,7 @@ mod tests {
])
.await;
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.stream(p3, fork).try_collect::<Vec<_>>().await;
let result = downloader.stream(p3, p0.hash_slow()).try_collect::<Vec<_>>().await;
let headers = result.unwrap();
assert_eq!(headers.len(), 3);
assert_eq!(headers[0], p0);

View File

@ -36,6 +36,9 @@ pub enum StageError {
/// rely on external downloaders
#[error("Invalid download response: {0}")]
Download(String),
/// Invalid checkpoint passed to the stage
#[error("Invalid stage progress: {0}")]
StageProgress(u64),
/// The stage encountered a recoverable error.
///
/// These types of errors are caught by the [Pipeline] and trigger a restart of the stage.
@ -53,7 +56,10 @@ impl StageError {
pub fn is_fatal(&self) -> bool {
matches!(
self,
StageError::Database(_) | StageError::DatabaseIntegrity(_) | StageError::Fatal(_)
StageError::Database(_) |
StageError::DatabaseIntegrity(_) |
StageError::StageProgress(_) |
StageError::Fatal(_)
)
}
}
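Since the new variant is marked fatal, the pipeline will not retry the stage when it surfaces. A hedged sketch of how calling code might distinguish it; the handler function is illustrative only, not part of this change:

// Illustrative only: react to the new variant alongside the existing fatal check.
fn report_stage_error(err: &StageError) {
    match err {
        // Raised when the checkpoint handed to a stage does not line up with the
        // data already in the database; `is_fatal` now returns true for it.
        StageError::StageProgress(progress) => {
            eprintln!("invalid stage progress checkpoint: {progress}")
        }
        other if other.is_fatal() => eprintln!("fatal stage error: {other}"),
        other => eprintln!("recoverable stage error: {other}, restarting stage"),
    }
}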

View File

@ -247,7 +247,7 @@ impl<D: BodyDownloader, C: Consensus> BodyStage<D, C> {
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
@ -258,7 +258,7 @@ mod tests {
use std::collections::HashMap;
use test_utils::*;
stage_test_suite!(BodyTestRunner);
stage_test_suite_ext!(BodyTestRunner);
/// Checks that the stage downloads at most `batch_size` blocks.
#[tokio::test]

View File

@ -20,7 +20,7 @@ use reth_interfaces::{
},
},
};
use reth_primitives::{BlockNumber, SealedHeader, H256, U256};
use reth_primitives::{BlockNumber, Header, SealedHeader, H256, U256};
use std::{fmt::Debug, sync::Arc};
use tracing::*;
@ -74,26 +74,17 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
let stage_progress = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(db, stage_progress).await?;
// Lookup the last stored header
let last_hash = db.get_block_hash(stage_progress)?;
let last_header =
db.get::<tables::Headers>((stage_progress, last_hash).into())?.ok_or({
DatabaseIntegrityError::Header { number: stage_progress, hash: last_hash }
})?;
let head = SealedHeader::new(last_header, last_hash);
let forkchoice = self.next_fork_choice_state(&head.hash()).await;
if let Some(number) = db.get::<tables::HeaderNumbers>(forkchoice.head_block_hash)? {
if number < head.number {
// Nothing to do here
warn!("Consensus reported old head {}", forkchoice.head_block_hash);
return Ok(ExecOutput { stage_progress, done: true, reached_tip: true })
}
}
// Lookup the head and tip of the sync range
let (head, tip) = self.get_head_and_tip(db, stage_progress).await?;
trace!(
target = "sync::stages::headers",
"Syncing from tip {:?} to head {:?}",
tip,
head.hash()
);
let mut current_progress = stage_progress;
let mut stream =
self.downloader.stream(head.clone(), forkchoice.clone()).chunks(self.commit_threshold);
let mut stream = self.downloader.stream(head.clone(), tip).chunks(self.commit_threshold);
// The stage relies on the downloader to return the headers
// in descending order starting from the tip down to
@ -130,7 +121,14 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: Statu
// Write total difficulty values after all headers have been inserted
self.write_td::<DB>(db, &head)?;
Ok(ExecOutput { stage_progress: current_progress, reached_tip: true, done: true })
let stage_progress = current_progress.max(
db.cursor::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput { stage_progress, reached_tip: true, done: true })
}
/// Unwind the stage.
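The returned checkpoint now takes the maximum of the progress reached in this run and the last canonical block already stored. A hedged numeric illustration of the intent; all values are made up:

// Illustrative only. Suppose the checkpoint was 1_000 and a gap in the database
// was detected, so this run downloaded headers in reverse until it reconnected
// with canonical blocks that already extend to 1_200. The checkpoint should then
// jump to 1_200 rather than stop at the last header written by this run.
let current_progress: u64 = 1_000;
let last_canonical_block: u64 = 1_200;
assert_eq!(current_progress.max(last_canonical_block), 1_200);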
@ -167,6 +165,50 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
Ok(())
}
/// Get the head and tip of the range we need to sync
async fn get_head_and_tip<DB: Database>(
&self,
db: &Transaction<'_, DB>,
stage_progress: u64,
) -> Result<(SealedHeader, H256), StageError> {
// Create a cursor over canonical header hashes
let mut cursor = db.cursor::<tables::CanonicalHeaders>()?;
let mut header_cursor = db.cursor::<tables::Headers>()?;
// Get head hash and reposition the cursor
let (head_num, head_hash) = cursor
.seek_exact(stage_progress)?
.ok_or(DatabaseIntegrityError::CanonicalHash { number: stage_progress })?;
// Construct head
let (_, head) = header_cursor
.seek_exact((head_num, head_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: head_num, hash: head_hash })?;
let head = SealedHeader::new(head, head_hash);
// Look up the next header
let next_header = cursor
.next()?
.map(|(next_num, next_hash)| -> Result<Header, StageError> {
let (_, next) = header_cursor
.seek_exact((next_num, next_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: next_num, hash: next_hash })?;
Ok(next)
})
.transpose()?;
// Decide the tip or error out on invalid input.
// If the next element found in the cursor is not the "expected" next block per our current
// progress, then there is a gap in the database and we should start downloading in
// reverse from there. Else, it should use whatever the forkchoice state reports.
let tip = match next_header {
Some(header) if stage_progress + 1 != header.number => header.parent_hash,
None => self.next_fork_choice_state(&head.hash()).await.head_block_hash,
_ => return Err(StageError::StageProgress(stage_progress)),
};
Ok((head, tip))
}
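To make the three branches above concrete, a self-contained sketch of the same decision on plain values; the function name and simplified inputs are illustrative only, not part of the diff:

// Illustrative only: mirrors the tip decision in `get_head_and_tip`.
// `next_canonical` is the first canonical entry after the checkpoint, if any,
// paired with that header's parent hash.
fn decide_tip(
    stage_progress: u64,
    next_canonical: Option<(u64, H256)>,
    forkchoice_tip: H256,
) -> Result<H256, StageError> {
    match next_canonical {
        // A gap: the stored chain jumps past `stage_progress + 1`, so sync in
        // reverse starting just below the next stored header.
        Some((number, parent_hash)) if stage_progress + 1 != number => Ok(parent_hash),
        // Nothing stored past the checkpoint: use the consensus-reported tip.
        None => Ok(forkchoice_tip),
        // The immediate successor is already stored: the checkpoint is inconsistent.
        _ => Err(StageError::StageProgress(stage_progress)),
    }
}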
async fn next_fork_choice_state(&self, head: &H256) -> ForkchoiceState {
let mut state_rcv = self.consensus.fork_choice_state();
loop {
@ -255,10 +297,11 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite, ExecuteStageTestRunner, UnwindStageTestRunner, PREV_STAGE_ID,
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, UnwindStageTestRunner,
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::p2p::error::RequestError;
use reth_interfaces::{p2p::error::RequestError, test_utils::generators::random_header};
use test_runner::HeadersTestRunner;
stage_test_suite!(HeadersTestRunner);
@ -353,6 +396,60 @@ mod tests {
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Test the head and tip range lookup
#[tokio::test]
async fn head_and_tip_lookup() {
let runner = HeadersTestRunner::default();
let db = runner.db().inner();
let stage = runner.stage();
let consensus_tip = H256::random();
stage.consensus.update_tip(consensus_tip);
// Genesis
let stage_progress = 0;
let head = random_header(0, None);
let gap_fill = random_header(1, Some(head.hash()));
let gap_tip = random_header(2, Some(gap_fill.hash()));
// Empty database
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHash { number }))
if number == stage_progress
);
// Checkpoint and no gap
db.put::<tables::CanonicalHeaders>(head.number, head.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
Ok((h, t)) if h == head && t == consensus_tip
);
// Checkpoint and gap
db.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
Ok((h, t)) if h == head && t == gap_tip.parent_hash
);
// Checkpoint and gap closed
db.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
.expect("falied to write canonical");
db.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&db, stage_progress).await,
Err(StageError::StageProgress(progress)) if progress == stage_progress
);
}
mod test_runner {
use crate::{
stages::headers::HeaderStage,

View File

@ -131,11 +131,11 @@ mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestTransaction, UnwindStageTestRunner, PREV_STAGE_ID,
};
stage_test_suite!(SendersTestRunner);
stage_test_suite_ext!(SendersTestRunner);
#[tokio::test]
async fn execute_intermediate_commit() {

View File

@ -20,37 +20,6 @@ macro_rules! stage_test_suite {
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
/// Check that the execution is short-circuited if the target was already reached.
#[tokio::test]
async fn execute_already_reached_target() {
let stage_progress = 1000;
// Set up the runner
let mut runner = $runner::default();
let input = crate::stage::ExecInput {
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, stage_progress)),
stage_progress: Some(stage_progress),
};
let seed = runner.seed_execution(input).expect("failed to seed");
// Run stage execution
let rx = runner.execute(input);
// Run `after_execution` hook
runner.after_execution(seed).await.expect("failed to run after execution hook");
// Assert the successful result
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == stage_progress
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
// Run the complete stage execution flow.
#[tokio::test]
async fn execute() {
@ -142,4 +111,44 @@ macro_rules! stage_test_suite {
};
}
// `execute_already_reached_target` is not suitable for the headers stage and is thus
// included in the test suite extension
macro_rules! stage_test_suite_ext {
($runner:ident) => {
crate::test_utils::stage_test_suite!($runner);
/// Check that the execution is short-circuited if the target was already reached.
#[tokio::test]
async fn execute_already_reached_target() {
let stage_progress = 1000;
// Set up the runner
let mut runner = $runner::default();
let input = crate::stage::ExecInput {
previous_stage: Some((crate::test_utils::PREV_STAGE_ID, stage_progress)),
stage_progress: Some(stage_progress),
};
let seed = runner.seed_execution(input).expect("failed to seed");
// Run stage execution
let rx = runner.execute(input);
// Run `after_execution` hook
runner.after_execution(seed).await.expect("failed to run after execution hook");
// Assert the successful result
let result = rx.await.unwrap();
assert_matches::assert_matches!(
result,
Ok(ExecOutput { done, reached_tip, stage_progress })
if done && reached_tip && stage_progress == stage_progress
);
// Validate the stage execution
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}
};
}
pub(crate) use stage_test_suite;
pub(crate) use stage_test_suite_ext;