feat: concurrent headers downloader (#891)

Matthias Seitz
2023-01-20 12:06:06 +01:00
committed by GitHub
parent eb11da8adf
commit b4080a7de5
11 changed files with 1346 additions and 705 deletions

Cargo.lock (generated)

@@ -4146,6 +4146,7 @@ dependencies = [
"reth-interfaces",
"reth-primitives",
"reth-rpc-types",
"reth-tracing",
"tokio",
"tracing",
]


@@ -150,13 +150,19 @@ impl Command {
.with_channel(sender)
.push(HeaderStage {
downloader: headers::linear::LinearDownloadBuilder::default()
.batch_size(config.stages.headers.downloader_batch_size)
.retries(config.stages.headers.downloader_retries)
.build(consensus.clone(), fetch_client.clone()),
.request_limit(config.stages.headers.downloader_batch_size)
.stream_batch_size(config.stages.headers.commit_threshold as usize)
// NOTE: the head and target will be set from inside the stage before the
// downloader is called
.build(
consensus.clone(),
fetch_client.clone(),
Default::default(),
Default::default(),
),
consensus: consensus.clone(),
client: fetch_client.clone(),
network_handle: network.clone(),
commit_threshold: config.stages.headers.commit_threshold,
metrics: HeaderMetrics::default(),
})
.push(TotalDifficultyStage {


@@ -1,10 +1,9 @@
use super::error::DownloadResult;
use crate::consensus::Consensus;
use futures::Stream;
use reth_primitives::PeerId;
use std::{fmt::Debug, pin::Pin};
use super::error::DownloadResult;
/// A stream for downloading responses.
pub type DownloadStream<'a, T> = Pin<Box<dyn Stream<Item = DownloadResult<T>> + Send + 'a>>;
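For illustration, any pinned, boxed stream of `DownloadResult` items satisfies this alias; a minimal hedged sketch (the `from_items` helper is hypothetical, not part of this diff):

```rust
use futures::stream;

// Hypothetical helper: back a DownloadStream with an in-memory Vec of
// results. `stream::iter` turns the Vec into a Stream, and Box::pin
// produces the pinned, boxed trait object the alias expects.
fn from_items<'a, T: Send + 'a>(items: Vec<DownloadResult<T>>) -> DownloadStream<'a, T> {
    Box::pin(stream::iter(items))
}
```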
@@ -20,7 +19,6 @@ pub trait DownloadClient: Send + Sync + Debug {
/// The generic trait for requesting and verifying data
/// over a p2p network client
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Send + Sync {
/// The client used to fetch necessary data
type Client: DownloadClient;


@@ -104,6 +104,22 @@ pub enum DownloadError {
/// The hash of the expected tip
expected: H256,
},
/// Received a response to a request with unexpected start block
#[error("Headers response starts at unexpected block: {received:?}. Expected {expected:?}.")]
HeadersResponseStartBlockMismatch {
/// The block number at which the received response started
received: u64,
/// The expected start block number
expected: u64,
},
/// Received fewer headers than expected.
#[error("Received fewer headers than expected: {received:?}. Expected {expected:?}.")]
HeadersResponseTooShort {
/// How many headers we received.
received: u64,
/// How many headers we expected.
expected: u64,
},
/// Error while executing the request.
#[error(transparent)]
RequestError(#[from] RequestError),
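The two new variants cover basic response-shape checks. A hedged sketch of where they might be raised (the `validate_response` helper and its parameters are hypothetical, not part of this diff):

```rust
use reth_primitives::Header;

// Hypothetical helper: check a headers response against the request that
// produced it, surfacing the two new error variants.
fn validate_response(
    headers: &[Header],
    requested_start: u64,
    requested_len: u64,
) -> Result<(), DownloadError> {
    // Too few headers for the requested range.
    if (headers.len() as u64) < requested_len {
        return Err(DownloadError::HeadersResponseTooShort {
            received: headers.len() as u64,
            expected: requested_len,
        })
    }
    // Response must begin at the requested start block.
    if let Some(first) = headers.first() {
        if first.number != requested_start {
            return Err(DownloadError::HeadersResponseStartBlockMismatch {
                received: first.number,
                expected: requested_start,
            })
        }
    }
    Ok(())
}
```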


@@ -1,21 +1,36 @@
use crate::{
consensus::Consensus,
p2p::{
downloader::{DownloadStream, Downloader},
downloader::Downloader,
error::{DownloadError, DownloadResult},
},
};
use futures::Stream;
use reth_primitives::{SealedHeader, H256};
/// A downloader capable of fetching block headers.
/// A downloader capable of fetching and yielding block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block headers,
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeaderDownloader: Downloader {
/// Stream the headers
fn stream(&self, head: SealedHeader, tip: H256) -> DownloadStream<'_, SealedHeader>;
///
/// A [HeaderDownloader] is a [Stream] that returns batches of headers.
pub trait HeaderDownloader: Downloader + Stream<Item = Vec<SealedHeader>> + Unpin {
/// Updates the gap to sync, which ranges from the local head to the sync target
///
/// See also [HeaderDownloader::update_sync_target] and [HeaderDownloader::update_local_head]
fn update_sync_gap(&mut self, head: SealedHeader, target: SyncTarget) {
self.update_local_head(head);
self.update_sync_target(target);
}
/// Updates the local head, i.e. the latest block in the database
fn update_local_head(&mut self, head: SealedHeader);
/// Updates the target we want to sync to
fn update_sync_target(&mut self, target: SyncTarget);
/// Sets the number of headers the Stream should yield per batch.
fn set_batch_size(&mut self, limit: usize);
/// Validate whether the header is valid in relation to its parent
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> DownloadResult<()> {
@@ -24,6 +39,40 @@ pub trait HeaderDownloader: Downloader {
}
}
/// Specifies the target to sync for [HeaderDownloader::update_sync_target]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum SyncTarget {
/// This represents a range of missing headers in the form of `(head,..`
///
/// Sync _inclusively_ to the given block hash.
///
/// This target specifies the upper end of the sync gap `(head...tip]`
Tip(H256),
/// This represents a gap of missing headers bounded by the given header `h` in the form of
/// `(head,..h),h+1,h+2...`
///
/// Sync _exclusively_ to the given header's parent, which is: `(head..h-1]`
///
/// The benefit of this variant is that it already provides the block number of the highest
/// missing block.
Gap(SealedHeader),
}
// === impl SyncTarget ===
impl SyncTarget {
/// Returns the tip to sync to _inclusively_
///
/// This returns the hash if the target is [SyncTarget::Tip] or the `parent_hash` of the given
/// header in [SyncTarget::Gap]
pub fn tip(&self) -> H256 {
match self {
SyncTarget::Tip(tip) => *tip,
SyncTarget::Gap(gap) => gap.parent_hash,
}
}
}
/// Validate whether the header is valid in relation to its parent
///
/// Returns Ok(false) if the
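Taken together, the reworked trait means a consumer first points the downloader at the sync gap and then polls it like any other stream. A minimal caller sketch, assuming the trait as declared above (`StreamExt` is from the `futures` crate; `local_head` and `tip_hash` stand in for values the caller already tracks):

```rust
use futures::StreamExt;
use reth_primitives::{SealedHeader, H256};

/// Hedged sketch: drive any HeaderDownloader across a known gap.
async fn sync_headers<D: HeaderDownloader>(
    mut downloader: D,
    local_head: SealedHeader,
    tip_hash: H256,
) {
    // Point the downloader at the gap `(local_head, tip_hash]`.
    downloader.update_sync_gap(local_head, SyncTarget::Tip(tip_hash));

    // Batches arrive in falling block-number order, starting at the tip.
    while let Some(batch) = downloader.next().await {
        for header in batch {
            // persist or further validate `header` here
            let _ = header;
        }
    }
}
```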


@@ -2,21 +2,22 @@
use crate::{
consensus::{self, Consensus},
p2p::{
downloader::{DownloadClient, DownloadStream, Downloader},
downloader::{DownloadClient, Downloader},
error::{DownloadError, DownloadResult, PeerRequestResult, RequestError},
headers::{
client::{HeadersClient, HeadersRequest, StatusUpdater},
downloader::HeaderDownloader,
downloader::{HeaderDownloader, SyncTarget},
},
},
};
use futures::{Future, FutureExt, Stream};
use futures::{Future, FutureExt, Stream, StreamExt};
use reth_eth_wire::BlockHeaders;
use reth_primitives::{
BlockNumber, Header, HeadersDirection, PeerId, SealedBlock, SealedHeader, H256,
BlockHash, BlockNumber, Header, HeadersDirection, PeerId, SealedBlock, SealedHeader, H256,
};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
fmt,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -32,12 +33,20 @@ pub struct TestHeaderDownloader {
client: Arc<TestHeadersClient>,
consensus: Arc<TestConsensus>,
limit: u64,
download: Option<TestDownload>,
queued_headers: Vec<SealedHeader>,
batch_size: usize,
}
impl TestHeaderDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(client: Arc<TestHeadersClient>, consensus: Arc<TestConsensus>, limit: u64) -> Self {
Self { client, consensus, limit }
pub fn new(
client: Arc<TestHeadersClient>,
consensus: Arc<TestConsensus>,
limit: u64,
batch_size: usize,
) -> Self {
Self { client, consensus, limit, download: None, batch_size, queued_headers: Vec::new() }
}
fn create_download(&self) -> TestDownload {
@@ -53,22 +62,46 @@ impl TestHeaderDownloader {
}
impl Downloader for TestHeaderDownloader {
type Consensus = TestConsensus;
type Client = TestHeadersClient;
fn consensus(&self) -> &Self::Consensus {
&self.consensus
}
type Consensus = TestConsensus;
fn client(&self) -> &Self::Client {
&self.client
}
fn consensus(&self) -> &Self::Consensus {
&self.consensus
}
}
#[async_trait::async_trait]
impl HeaderDownloader for TestHeaderDownloader {
fn stream(&self, _head: SealedHeader, _tip: H256) -> DownloadStream<'_, SealedHeader> {
Box::pin(self.create_download())
fn update_local_head(&mut self, _head: SealedHeader) {}
fn update_sync_target(&mut self, _target: SyncTarget) {}
fn set_batch_size(&mut self, limit: usize) {
self.batch_size = limit;
}
}
impl Stream for TestHeaderDownloader {
type Item = Vec<SealedHeader>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
if this.queued_headers.len() == this.batch_size {
return Poll::Ready(Some(std::mem::take(&mut this.queued_headers)))
}
if this.download.is_none() {
this.download.insert(this.create_download());
}
match ready!(this.download.as_mut().unwrap().poll_next_unpin(cx)) {
None => return Poll::Ready(Some(std::mem::take(&mut this.queued_headers))),
Some(header) => this.queued_headers.push(header.unwrap()),
}
}
}
}
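The `poll_next` above illustrates the batching pattern this commit relies on: buffer single items from an inner stream and flush a `Vec` once the batch size is reached. A reduced, hedged sketch of the same pattern over a generic inner stream (assumes `futures` 0.3 and the stable `std::task::ready!` macro):

```rust
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use futures::{Stream, StreamExt};

/// Buffers items from an inner stream and yields them in `Vec`s of
/// `batch_size` (the final batch may be shorter).
struct Batched<S: Stream + Unpin> {
    inner: S,
    buf: Vec<S::Item>,
    batch_size: usize,
}

impl<S: Stream + Unpin> Stream for Batched<S> {
    type Item = Vec<S::Item>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Flush a full batch.
            if self.buf.len() == self.batch_size {
                return Poll::Ready(Some(std::mem::take(&mut self.buf)))
            }
            match ready!(self.inner.poll_next_unpin(cx)) {
                Some(item) => self.buf.push(item),
                // Inner stream exhausted: flush the remainder, then end.
                None if !self.buf.is_empty() => {
                    return Poll::Ready(Some(std::mem::take(&mut self.buf)))
                }
                None => return Poll::Ready(None),
            }
        }
    }
}
```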
@@ -83,6 +116,9 @@ struct TestDownload {
done: bool,
}
/// SAFETY: All the mutations are performed through an exclusive reference on `poll`
unsafe impl Sync for TestDownload {}
impl TestDownload {
fn get_or_init_fut(&mut self) -> &mut TestHeadersFut {
if self.fut.is_none() {
@@ -98,6 +134,18 @@ impl TestDownload {
}
}
impl fmt::Debug for TestDownload {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TestDownload")
.field("client", &self.client)
.field("consensus", &self.consensus)
.field("limit", &self.limit)
.field("buffer", &self.buffer)
.field("done", &self.done)
.finish_non_exhaustive()
}
}
impl Stream for TestDownload {
type Item = DownloadResult<SealedHeader>;


@@ -1,3 +1,5 @@
#![allow(unused)]
mod bodies;
mod headers;


@@ -25,6 +25,7 @@ tracing = "0.1.37"
[dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
reth-tracing = { path = "../../tracing" }
assert_matches = "1.5.0"
once_cell = "1.17.0"
tokio = { version = "1.21.2", features = ["full"] }

File diff suppressed because it is too large
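The rewritten linear downloader makes up most of this commit but its diff is suppressed above. Its headline idea is keeping several header-range requests in flight at once while consuming responses in request order, so validation can still proceed contiguously from the tip. A hedged concept sketch of that scheduling core, not the suppressed implementation (`FuturesOrdered` is from the `futures` crate; the generic `request` closure stands in for a real `HeadersClient` call):

```rust
use futures::{stream::FuturesOrdered, StreamExt};

/// Hedged concept sketch: issue up to `concurrency` range requests at once
/// and consume responses in the order the requests were queued.
async fn fetch_ranges<F, Fut, T>(
    starts: Vec<u64>,
    request_limit: u64,
    concurrency: usize,
    mut request: F,
) -> Vec<T>
where
    F: FnMut(u64, u64) -> Fut,
    Fut: std::future::Future<Output = T>,
{
    let mut starts = starts.into_iter();
    let mut in_flight = FuturesOrdered::new();
    let mut out = Vec::new();

    // Prime the queue with the first `concurrency` requests.
    for start in starts.by_ref().take(concurrency) {
        in_flight.push_back(request(start, request_limit));
    }
    // FuturesOrdered yields in request order even if later requests finish
    // first, so each response can be appended behind the previous one.
    while let Some(response) = in_flight.next().await {
        out.push(response);
        if let Some(start) = starts.next() {
            in_flight.push_back(request(start, request_limit));
        }
    }
    out
}
```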


@@ -71,6 +71,19 @@ pub enum BlockHashOrNumber {
Number(u64),
}
// === impl BlockHashOrNumber ===
impl BlockHashOrNumber {
/// Returns the block number if it is a [`BlockHashOrNumber::Number`].
#[inline]
pub fn as_number(self) -> Option<u64> {
match self {
BlockHashOrNumber::Hash(_) => None,
BlockHashOrNumber::Number(num) => Some(num),
}
}
}
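A hedged usage sketch for the new accessor (values are illustrative):

```rust
use reth_primitives::{BlockHashOrNumber, H256};

// The numeric variant yields its number; the hash variant yields None.
assert_eq!(BlockHashOrNumber::Number(7).as_number(), Some(7));
assert_eq!(BlockHashOrNumber::Hash(H256::zero()).as_number(), None);
```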
impl From<H256> for BlockHashOrNumber {
fn from(value: H256) -> Self {
BlockHashOrNumber::Hash(value)


@@ -2,7 +2,7 @@ use crate::{
db::Transaction, metrics::HeaderMetrics, DatabaseIntegrityError, ExecInput, ExecOutput, Stage,
StageError, StageId, UnwindInput, UnwindOutput,
};
use futures_util::{StreamExt, TryStreamExt};
use futures_util::StreamExt;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
@@ -12,15 +12,12 @@ use reth_db::{
};
use reth_interfaces::{
consensus::{Consensus, ForkchoiceState},
p2p::{
error::DownloadError,
headers::{
client::{HeadersClient, StatusUpdater},
downloader::{ensure_parent, HeaderDownloader},
},
p2p::headers::{
client::{HeadersClient, StatusUpdater},
downloader::{HeaderDownloader, SyncTarget},
},
};
use reth_primitives::{BlockNumber, Header, SealedHeader, H256, U256};
use reth_primitives::{BlockNumber, Header, SealedHeader, U256};
use std::{fmt::Debug, sync::Arc};
use tracing::*;
@@ -49,113 +46,18 @@ pub struct HeaderStage<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: S
pub client: Arc<H>,
/// Network handle for updating status
pub network_handle: S,
/// The number of block headers to commit at once
pub commit_threshold: u64,
/// Header metrics
pub metrics: HeaderMetrics,
}
#[async_trait::async_trait]
impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater> Stage<DB>
for HeaderStage<D, C, H, S>
{
/// Return the id of the stage
fn id(&self) -> StageId {
HEADERS
}
// === impl HeaderStage ===
/// Download the headers in reverse order
/// starting from the tip
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let current_progress = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(tx, current_progress).await?;
// Lookup the head and tip of the sync range
let (head, tip) = self.get_head_and_tip(tx, current_progress).await?;
// Nothing to sync
if head.hash() == tip {
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { stage_progress: current_progress, done: true })
}
debug!(target: "sync::stages::headers", ?tip, head = ?head.hash(), "Commencing sync");
// The downloader returns the headers in descending order starting from the tip
// down to the local head (latest block in db)
let downloaded_headers: Result<Vec<SealedHeader>, DownloadError> = self
.downloader
.stream(head.clone(), tip)
.take(self.commit_threshold as usize) // Only stream [self.commit_threshold] headers
.try_collect()
.await;
match downloaded_headers {
Ok(res) => {
info!(target: "sync::stages::headers", len = res.len(), "Received headers");
self.metrics.headers_counter.increment(res.len() as u64);
// Perform basic response validation
self.validate_header_response(&res)?;
// Write the headers to db
self.write_headers::<DB>(tx, res).await?.unwrap_or_default();
if self.is_stage_done(tx, current_progress).await? {
let stage_progress = current_progress.max(
tx.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput { stage_progress, done: true })
} else {
Ok(ExecOutput { stage_progress: current_progress, done: false })
}
}
Err(e) => {
self.metrics.update_headers_error_metrics(&e);
match e {
DownloadError::Timeout => {
warn!(target: "sync::stages::headers", "No response for header request");
return Err(StageError::Recoverable(DownloadError::Timeout.into()))
}
DownloadError::HeaderValidation { hash, error } => {
error!(target: "sync::stages::headers", ?error, ?hash, "Validation error");
return Err(StageError::Validation { block: current_progress, error })
}
error => {
error!(target: "sync::stages::headers", ?error, "Unexpected error");
return Err(StageError::Recoverable(error.into()))
}
}
}
}
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// TODO: handle bad block
info!(target: "sync::stages::headers", to_block = input.unwind_to, "Unwinding");
tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
input.unwind_to + 1,
)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num_hash::<tables::Headers>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
HeaderStage<D, C, H, S>
impl<D, C, H, S> HeaderStage<D, C, H, S>
where
D: HeaderDownloader,
C: Consensus,
H: HeadersClient,
S: StatusUpdater,
{
async fn update_head<DB: Database>(
&self,
@@ -171,7 +73,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
Ok(())
}
async fn is_stage_done<DB: Database>(
fn is_stage_done<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
stage_progress: u64,
@@ -185,11 +87,13 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
}
/// Get the head and tip of the range we need to sync
async fn get_head_and_tip<DB: Database>(
///
/// See also [SyncTarget]
async fn get_sync_gap<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
stage_progress: u64,
) -> Result<(SealedHeader, H256), StageError> {
) -> Result<SyncGap, StageError> {
// Create a cursor over canonical header hashes
let mut cursor = tx.cursor_read::<tables::CanonicalHeaders>()?;
let mut header_cursor = tx.cursor_read::<tables::Headers>()?;
@@ -203,7 +107,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
let (_, head) = header_cursor
.seek_exact((head_num, head_hash).into())?
.ok_or(DatabaseIntegrityError::Header { number: head_num, hash: head_hash })?;
let head = SealedHeader::new(head, head_hash);
let local_head = SealedHeader::new(head, head_hash);
// Look up the next header
let next_header = cursor
@@ -220,15 +124,16 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
// If the next element found in the cursor is not the "expected" next block per our current
// progress, then there is a gap in the database and we should start downloading in
// reverse from there. Else, it should use whatever the forkchoice state reports.
let tip = match next_header {
Some(header) if stage_progress + 1 != header.number => header.parent_hash,
None => self.next_fork_choice_state().await.head_block_hash,
let target = match next_header {
Some(header) if stage_progress + 1 != header.number => SyncTarget::Gap(header.seal()),
None => SyncTarget::Tip(self.next_fork_choice_state().await.head_block_hash),
_ => return Err(StageError::StageProgress(stage_progress)),
};
Ok((head, tip))
Ok(SyncGap { local_head, target })
}
/// Awaits the next [ForkchoiceState] message from [Consensus] with a non-zero block hash
async fn next_fork_choice_state(&self) -> ForkchoiceState {
let mut state_rcv = self.consensus.fork_choice_state();
loop {
@@ -240,25 +145,16 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
}
}
/// Perform basic header response validation
fn validate_header_response(&self, headers: &[SealedHeader]) -> Result<(), StageError> {
let mut headers_iter = headers.iter().peekable();
while let Some(header) = headers_iter.next() {
if let Some(parent) = headers_iter.peek() {
ensure_parent(header, parent)
.map_err(|err| StageError::Download(err.to_string()))?;
}
}
Ok(())
}
/// Write downloaded headers to the database
async fn write_headers<DB: Database>(
/// Write downloaded headers to the given transaction
///
/// Note: this writes the headers with rising block numbers.
fn write_headers<DB: Database>(
&self,
tx: &Transaction<'_, DB>,
headers: Vec<SealedHeader>,
) -> Result<Option<BlockNumber>, StageError> {
trace!(target: "sync::stages::headers", len = headers.len(), "writing headers");
let mut cursor_header = tx.cursor_write::<tables::Headers>()?;
let mut cursor_canonical = tx.cursor_write::<tables::CanonicalHeaders>()?;
@@ -284,6 +180,108 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient, S: StatusUpdater>
}
}
#[async_trait::async_trait]
impl<DB, D, C, H, S> Stage<DB> for HeaderStage<D, C, H, S>
where
DB: Database,
D: HeaderDownloader,
C: Consensus,
H: HeadersClient,
S: StatusUpdater,
{
/// Return the id of the stage
fn id(&self) -> StageId {
HEADERS
}
/// Download the headers in reverse order (falling block numbers)
/// starting from the tip of the chain
async fn execute(
&mut self,
tx: &mut Transaction<'_, DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let current_progress = input.stage_progress.unwrap_or_default();
self.update_head::<DB>(tx, current_progress).await?;
// Lookup the head and tip of the sync range
let gap = self.get_sync_gap(tx, current_progress).await?;
let tip = gap.target.tip();
// Nothing to sync
if gap.is_closed() {
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Target block already reached");
return Ok(ExecOutput { stage_progress: current_progress, done: true })
}
debug!(target: "sync::stages::headers", ?tip, head = ?gap.local_head.hash(), "Commencing sync");
// let the downloader know what to sync
self.downloader.update_sync_gap(gap.local_head, gap.target);
// The downloader returns the headers in descending order starting from the tip
// down to the local head (latest block in db)
let downloaded_headers = match self.downloader.next().await {
Some(downloaded_headers) => downloaded_headers,
None => {
info!(target: "sync::stages::headers", stage_progress = current_progress, target = ?tip, "Download stream exhausted");
return Ok(ExecOutput { stage_progress: current_progress, done: true })
}
};
info!(target: "sync::stages::headers", len = downloaded_headers.len(), "Received headers");
self.metrics.headers_counter.increment(downloaded_headers.len() as u64);
// Write the headers to db
self.write_headers::<DB>(tx, downloaded_headers)?.unwrap_or_default();
if self.is_stage_done(tx, current_progress)? {
let stage_progress = current_progress.max(
tx.cursor_read::<tables::CanonicalHeaders>()?
.last()?
.map(|(num, _)| num)
.unwrap_or_default(),
);
Ok(ExecOutput { stage_progress, done: true })
} else {
Ok(ExecOutput { stage_progress: current_progress, done: false })
}
}
/// Unwind the stage.
async fn unwind(
&mut self,
tx: &mut Transaction<'_, DB>,
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
// TODO: handle bad block
info!(target: "sync::stages::headers", to_block = input.unwind_to, "Unwinding");
tx.unwind_table_by_walker::<tables::CanonicalHeaders, tables::HeaderNumbers>(
input.unwind_to + 1,
)?;
tx.unwind_table_by_num::<tables::CanonicalHeaders>(input.unwind_to)?;
tx.unwind_table_by_num_hash::<tables::Headers>(input.unwind_to)?;
Ok(UnwindOutput { stage_progress: input.unwind_to })
}
}
/// Represents a gap to sync: from `local_head` to `target`
#[derive(Debug)]
struct SyncGap {
local_head: SealedHeader,
target: SyncTarget,
}
// === impl SyncGap ===
impl SyncGap {
/// Returns `true` if the gap from the head to the target was closed
#[inline]
fn is_closed(&self) -> bool {
self.local_head.hash() == self.target.tip()
}
}
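To make the `is_closed` semantics concrete, a hedged sketch using the `random_header` test helper this module's tests already import:

```rust
use reth_interfaces::test_utils::generators::random_header;
use reth_primitives::H256;

// Closed gap: the target tip is the local head itself.
let head = random_header(10, None);
let closed = SyncGap { local_head: head.clone(), target: SyncTarget::Tip(head.hash()) };
assert!(closed.is_closed());

// Open gap: some other tip hash still needs to be reached.
let open = SyncGap { local_head: head, target: SyncTarget::Tip(H256::random()) };
assert!(!open.is_closed());
```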
#[cfg(test)]
mod tests {
use super::*;
@@ -292,182 +290,10 @@ mod tests {
PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::{p2p::error::RequestError, test_utils::generators::random_header};
use reth_interfaces::test_utils::generators::random_header;
use reth_primitives::H256;
use test_runner::HeadersTestRunner;
stage_test_suite!(HeadersTestRunner, headers);
/// Check that the execution errors on empty database or
/// prev progress missing from the database.
#[tokio::test]
// Validate that the execution does not fail on timeout
async fn execute_timeout() {
let (previous_stage, stage_progress) = (500, 100);
let mut runner = HeadersTestRunner::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
runner.seed_execution(input).expect("failed to seed execution");
runner.client.set_error(RequestError::Timeout).await;
let rx = runner.execute(input);
runner.consensus.update_tip(H256::from_low_u64_be(1));
let result = rx.await.unwrap();
// TODO: Downcast the internal error and actually check it
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Check that validation error is propagated during the execution.
#[tokio::test]
async fn execute_validation_error() {
let mut runner = HeadersTestRunner::default();
runner.consensus.set_fail_validation(true);
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.after_execution(headers).await.expect("failed to run after execution hook");
let result = rx.await.unwrap();
assert_matches!(result, Err(StageError::Validation { .. }));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Check that unexpected download errors are caught
#[tokio::test]
async fn execute_download_error() {
let mut runner = HeadersTestRunner::default();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.set_error(RequestError::BadResponse).await;
// Update tip
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
// These errors are not fatal but hand back control to the pipeline
let result = rx.await.unwrap();
assert_matches!(result, Err(StageError::Recoverable(_)));
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Execute the stage with linear downloader
#[tokio::test]
async fn execute_with_linear_downloader() {
let mut runner = HeadersTestRunner::with_linear_downloader();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Test the head and tip range lookup
#[tokio::test]
async fn head_and_tip_lookup() {
let runner = HeadersTestRunner::default();
let tx = runner.tx().inner();
let stage = runner.stage();
let consensus_tip = H256::random();
stage.consensus.update_tip(consensus_tip);
// Genesis
let stage_progress = 0;
let head = random_header(0, None);
let gap_fill = random_header(1, Some(head.hash()));
let gap_tip = random_header(2, Some(gap_fill.hash()));
// Empty database
assert_matches!(
stage.get_head_and_tip(&tx, stage_progress).await,
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number }))
if number == stage_progress
);
// Checkpoint and no gap
tx.put::<tables::CanonicalHeaders>(head.number, head.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&tx, stage_progress).await,
Ok((h, t)) if h == head && t == consensus_tip
);
// Checkpoint and gap
tx.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&tx, stage_progress).await,
Ok((h, t)) if h == head && t == gap_tip.parent_hash
);
// Checkpoint and gap closed
tx.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_head_and_tip(&tx, stage_progress).await,
Err(StageError::StageProgress(progress)) if progress == stage_progress
);
}
/// Execute the stage in two steps
#[tokio::test]
async fn execute_from_previous_progress() {
let mut runner = HeadersTestRunner::with_linear_downloader();
let (stage_progress, previous_stage) = (600, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: false, stage_progress: progress }) if progress == stage_progress);
let rx = runner.execute(input);
runner.client.clear().await;
runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await;
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
mod test_runner {
use crate::{
metrics::HeaderMetrics,
@@ -497,7 +323,7 @@ mod tests {
pub(crate) struct HeadersTestRunner<D: HeaderDownloader> {
pub(crate) consensus: Arc<TestConsensus>,
pub(crate) client: Arc<TestHeadersClient>,
downloader: Arc<D>,
downloader_factory: Box<dyn Fn() -> D + Send + Sync + 'static>,
network_handle: TestStatusUpdater,
tx: TestTransaction,
}
@@ -509,7 +335,9 @@
Self {
client: client.clone(),
consensus: consensus.clone(),
downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)),
downloader_factory: Box::new(move || {
TestHeaderDownloader::new(client.clone(), consensus.clone(), 1000, 1000)
}),
network_handle: TestStatusUpdater::default(),
tx: TestTransaction::default(),
}
@@ -517,7 +345,7 @@ }
}
impl<D: HeaderDownloader + 'static> StageTestRunner for HeadersTestRunner<D> {
type S = HeaderStage<Arc<D>, TestConsensus, TestHeadersClient, TestStatusUpdater>;
type S = HeaderStage<D, TestConsensus, TestHeadersClient, TestStatusUpdater>;
fn tx(&self) -> &TestTransaction {
&self.tx
@@ -527,9 +355,8 @@
HeaderStage {
consensus: self.consensus.clone(),
client: self.client.clone(),
downloader: self.downloader.clone(),
downloader: (*self.downloader_factory)(),
network_handle: self.network_handle.clone(),
commit_threshold: 500,
metrics: HeaderMetrics::default(),
}
}
@@ -615,17 +442,20 @@ }
}
impl HeadersTestRunner<LinearDownloader<TestConsensus, TestHeadersClient>> {
#[allow(unused)]
pub(crate) fn with_linear_downloader() -> Self {
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());
let downloader = Arc::new(
LinearDownloadBuilder::default().build(consensus.clone(), client.clone()),
);
Self {
client,
consensus,
downloader,
client: client.clone(),
consensus: consensus.clone(),
downloader_factory: Box::new(move || {
LinearDownloadBuilder::default().stream_batch_size(500).build(
consensus.clone(),
client.clone(),
Default::default(),
Default::default(),
)
}),
network_handle: TestStatusUpdater::default(),
tx: TestTransaction::default(),
}
@@ -645,4 +475,116 @@ }
}
}
}
stage_test_suite!(HeadersTestRunner, headers);
/// Execute the stage with linear downloader
#[tokio::test]
async fn execute_with_linear_downloader() {
let mut runner = HeadersTestRunner::with_linear_downloader();
let (stage_progress, previous_stage) = (1000, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
/// Test the head and tip range lookup
#[tokio::test]
async fn head_and_tip_lookup() {
let runner = HeadersTestRunner::default();
let tx = runner.tx().inner();
let stage = runner.stage();
let consensus_tip = H256::random();
stage.consensus.update_tip(consensus_tip);
// Genesis
let stage_progress = 0;
let head = random_header(0, None);
let gap_fill = random_header(1, Some(head.hash()));
let gap_tip = random_header(2, Some(gap_fill.hash()));
// Empty database
assert_matches!(
stage.get_sync_gap(&tx, stage_progress).await,
Err(StageError::DatabaseIntegrity(DatabaseIntegrityError::CanonicalHeader { number }))
if number == stage_progress
);
// Checkpoint and no gap
tx.put::<tables::CanonicalHeaders>(head.number, head.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(head.num_hash().into(), head.clone().unseal())
.expect("failed to write header");
let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
assert_eq!(gap.local_head, head);
assert_eq!(gap.target.tip(), consensus_tip);
// Checkpoint and gap
tx.put::<tables::CanonicalHeaders>(gap_tip.number, gap_tip.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_tip.num_hash().into(), gap_tip.clone().unseal())
.expect("failed to write header");
let gap = stage.get_sync_gap(&tx, stage_progress).await.unwrap();
assert_eq!(gap.local_head, head);
assert_eq!(gap.target.tip(), gap_tip.parent_hash);
// Checkpoint and gap closed
tx.put::<tables::CanonicalHeaders>(gap_fill.number, gap_fill.hash())
.expect("failed to write canonical");
tx.put::<tables::Headers>(gap_fill.num_hash().into(), gap_fill.clone().unseal())
.expect("failed to write header");
assert_matches!(
stage.get_sync_gap(&tx, stage_progress).await,
Err(StageError::StageProgress(progress)) if progress == stage_progress
);
}
/// Execute the stage in two steps
#[tokio::test]
async fn execute_from_previous_progress() {
let mut runner = HeadersTestRunner::with_linear_downloader();
// pick range that's larger than the configured headers batch size
let (stage_progress, previous_stage) = (600, 1200);
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: false, stage_progress: progress }) if progress == stage_progress);
runner.client.clear().await;
runner.client.extend(headers.iter().take(101).map(|h| h.clone().unseal()).rev()).await;
let rx = runner.execute(input);
let result = rx.await.unwrap();
assert_matches!(result, Ok(ExecOutput { done: true, stage_progress }) if stage_progress == tip.number);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
}