refactor: headers client (#249)

* refactor: headers client

* chore: rustfmt

* chore(clippy): make clippy happy

* address comments

* use Error instead

* fix(sync): headers test client & stage tests (#255)

* headers test client & stage tests

* fix timeout test

* fix import

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
This commit is contained in:
Matthias Seitz
2022-11-25 14:00:06 +01:00
committed by GitHub
parent 2e3c220104
commit dda8df7341
15 changed files with 534 additions and 493 deletions

1
Cargo.lock generated
View File

@ -3280,6 +3280,7 @@ version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"futures",
"once_cell",
"rand 0.8.5",
"reth-interfaces",

View File

@ -354,7 +354,7 @@ mod gat_tests {
tokio::spawn(async move {
let mut container = DBContainer::new(&db).unwrap();
let mut stage = MyStage(&db);
let _ = stage.run(&mut container);
stage.run(&mut container).await;
});
}
}

View File

@ -94,7 +94,7 @@ mod tests {
tokio::spawn(async move {
let mut container = DBContainer::new(&db).unwrap();
let mut stage = MyStage(&db);
let _ = stage.run(&mut container);
stage.run(&mut container).await;
});
}
}

View File

@ -1,31 +1,8 @@
use crate::p2p::MessageStream;
use reth_primitives::{Header, H256, H512};
use crate::p2p::error::RequestResult;
use async_trait::async_trait;
use reth_primitives::BlockHashOrNumber;
use std::{collections::HashSet, fmt::Debug};
/// Each peer returns a list of headers and the request id corresponding
/// to these headers. This allows clients to make multiple requests in parallel
/// and multiplex the responses accordingly.
pub type HeadersStream = MessageStream<HeadersResponse>;
/// The item contained in each [`MessageStream`] when used to fetch [`Header`]s via
/// [`HeadersClient`].
#[derive(Clone, Debug)]
pub struct HeadersResponse {
/// The request id associated with this response.
pub id: u64,
/// The headers the peer replied with.
pub headers: Vec<Header>,
}
impl From<(u64, Vec<Header>)> for HeadersResponse {
fn from((id, headers): (u64, Vec<Header>)) -> Self {
HeadersResponse { id, headers }
}
}
pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, H256};
use std::fmt::Debug;
/// The header request struct to be sent to connected peers, which
/// will proceed to ask them to stream the requested headers to us.
@ -47,12 +24,9 @@ pub trait HeadersClient: Send + Sync + Debug {
/// Update the node's Status message.
///
/// The updated Status message will be used during any new eth/65 handshakes.
async fn update_status(&self, height: u64, hash: H256, td: H256);
fn update_status(&self, height: u64, hash: H256, td: H256);
/// Sends the header request to the p2p network.
// TODO: What does this return?
async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet<H512>;
/// Stream the header response messages
async fn stream_headers(&self) -> HeadersStream;
/// Sends the header request to the p2p network and returns the header response received from a
/// peer.
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders>;
}

View File

@ -1,18 +1,31 @@
use super::client::{HeadersClient, HeadersRequest, HeadersStream};
use crate::{consensus::Consensus, p2p::headers::error::DownloadError};
use async_trait::async_trait;
use reth_primitives::{BlockHashOrNumber, Header, SealedHeader};
use super::client::HeadersClient;
use crate::{
consensus::Consensus,
p2p::{headers::error::DownloadError, traits::BatchDownload},
};
use reth_primitives::SealedHeader;
use reth_rpc_types::engine::ForkchoiceState;
use std::time::Duration;
use tokio_stream::StreamExt;
use std::{pin::Pin, time::Duration};
/// A Future for downloading a batch of headers.
pub type HeaderBatchDownload<'a> = Pin<
Box<
dyn BatchDownload<
Ok = SealedHeader,
Error = DownloadError,
Output = Result<Vec<SealedHeader>, DownloadError>,
> + Send
+ 'a,
>,
>;
/// A downloader capable of fetching block headers.
///
/// A downloader represents a distinct strategy for submitting requests to download block headers,
/// while a [HeadersClient] represents a client capable of fulfilling these requests.
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeaderDownloader: Sync + Send {
pub trait HeaderDownloader: Sync + Send + Unpin {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
@ -30,43 +43,34 @@ pub trait HeaderDownloader: Sync + Send {
fn client(&self) -> &Self::Client;
/// Download the headers
async fn download(
&self,
head: &SealedHeader,
forkchoice: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError>;
/// Perform a header request and returns the headers.
// TODO: Isn't this effectively blocking per request per downloader?
// Might be fine, given we can spawn multiple downloaders?
// TODO: Rethink this function, I don't really like the `stream: &mut HeadersStream`
// in the signature. Why can we not call `self.client.stream_headers()`? Gives lifetime error.
async fn download_headers(
&self,
stream: &mut HeadersStream,
start: BlockHashOrNumber,
limit: u64,
) -> Result<Vec<Header>, DownloadError> {
let request_id = rand::random();
let request = HeadersRequest { start, limit, reverse: true };
let _ = self.client().send_header_request(request_id, request).await;
// Filter stream by request id and non empty headers content
let stream = stream
.filter(|resp| request_id == resp.id && !resp.headers.is_empty())
.timeout(self.timeout());
// Pop the first item.
match Box::pin(stream).try_next().await {
Ok(Some(item)) => Ok(item.headers),
_ => return Err(DownloadError::Timeout { request_id }),
}
}
fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_>;
/// Validate whether the header is valid in relation to it's parent
///
/// Returns Ok(false) if the
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
validate_header_download(self.consensus(), header, parent)?;
Ok(())
}
}
/// Validate whether the header is valid in relation to it's parent
///
/// Returns Ok(false) if the
pub fn validate_header_download<C: Consensus>(
consensus: &C,
header: &SealedHeader,
parent: &SealedHeader,
) -> Result<(), DownloadError> {
ensure_parent(header, parent)?;
consensus
.validate_header(header, parent)
.map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?;
Ok(())
}
/// Ensures that the given `parent` header is the actual parent of the `header`
pub fn ensure_parent(header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Err(DownloadError::MismatchedHeaders {
header_number: header.number.into(),
@ -75,10 +79,5 @@ pub trait HeaderDownloader: Sync + Send {
parent_hash: parent.hash(),
})
}
self.consensus()
.validate_header(header, parent)
.map_err(|error| DownloadError::HeaderValidation { hash: parent.hash(), error })?;
Ok(())
}
}

View File

@ -1,4 +1,4 @@
use crate::consensus;
use crate::{consensus, p2p::error::RequestError};
use reth_primitives::{rpc::BlockNumber, H256};
use thiserror::Error;
@ -15,11 +15,8 @@ pub enum DownloadError {
error: consensus::Error,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
#[error("Timed out while getting headers for request.")]
Timeout,
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
@ -33,6 +30,9 @@ pub enum DownloadError {
/// The parent hash being evaluated
parent_hash: H256,
},
/// Error while executing the request.
#[error(transparent)]
RequestError(#[from] RequestError),
}
impl DownloadError {

View File

@ -13,8 +13,5 @@ pub mod headers;
/// interacting with the network implementation
pub mod error;
use futures::Stream;
use std::pin::Pin;
/// The stream of responses from the connected peers, generic over the response type.
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
/// Commonly used traits when implementing clients.
pub mod traits;

View File

@ -0,0 +1,21 @@
use futures::Stream;
use std::future::Future;
/// Abstraction for downloading several items at once.
///
/// A [`BatchDownload`] is a [`Future`] that represents a collection of download futures and
/// resolves once all of them finished.
///
/// This is similar to the [`futures::future::join_all`] function, but it's open to implementers how
/// this Future behaves exactly.
///
/// It is expected that the underlying futures return a [`Result`].
pub trait BatchDownload: Future<Output = Result<Vec<Self::Ok>, Self::Error>> {
/// The `Ok` variant of the futures output in this batch.
type Ok;
/// The `Err` variant of the futures output in this batch.
type Error;
/// Consumes the batch future and returns a [`Stream`] that yields results as they become ready.
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>>;
}

View File

@ -1,36 +1,43 @@
//! Testing support for headers related interfaces.
use crate::{
consensus::{self, Consensus},
p2p::headers::{
client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream},
downloader::HeaderDownloader,
p2p::{
error::{RequestError, RequestResult},
headers::{
client::{HeadersClient, HeadersRequest},
downloader::{HeaderBatchDownload, HeaderDownloader},
error::DownloadError,
},
traits::BatchDownload,
},
};
use reth_primitives::{BlockLocked, Header, SealedHeader, H256, H512};
use futures::{Future, FutureExt, Stream};
use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockLocked, Header, SealedHeader, H256};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
collections::HashSet,
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
task::{ready, Context, Poll},
time::Duration,
};
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
use tokio::sync::{watch, Mutex};
/// A test downloader which just returns the values that have been pushed to it.
#[derive(Debug)]
pub struct TestHeaderDownloader {
client: Arc<TestHeadersClient>,
consensus: Arc<TestConsensus>,
limit: u64,
}
impl TestHeaderDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(client: Arc<TestHeadersClient>, consensus: Arc<TestConsensus>) -> Self {
Self { client, consensus }
pub fn new(client: Arc<TestHeadersClient>, consensus: Arc<TestConsensus>, limit: u64) -> Self {
Self { client, consensus, limit }
}
}
@ -51,99 +58,104 @@ impl HeaderDownloader for TestHeaderDownloader {
&self.client
}
async fn download(
fn download(
&self,
_: &SealedHeader,
_: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError> {
// call consensus stub first. fails if the flag is set
_head: SealedHeader,
_forkchoice: ForkchoiceState,
) -> HeaderBatchDownload<'_> {
Box::pin(TestDownload {
client: Arc::clone(&self.client),
consensus: Arc::clone(&self.consensus),
limit: self.limit,
})
}
}
struct TestDownload {
client: Arc<TestHeadersClient>,
consensus: Arc<TestConsensus>,
limit: u64,
}
impl Future for TestDownload {
type Output = Result<Vec<SealedHeader>, DownloadError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let empty = SealedHeader::default();
self.consensus
.validate_header(&empty, &empty)
.map_err(|error| DownloadError::HeaderValidation { hash: empty.hash(), error })?;
if let Err(error) = self.consensus.validate_header(&empty, &empty) {
return Poll::Ready(Err(DownloadError::HeaderValidation { hash: empty.hash(), error }))
}
let stream = self.client.stream_headers().await;
let stream = stream.timeout(Duration::from_secs(1));
match Box::pin(stream).try_next().await {
Ok(Some(res)) => {
let mut headers = res.headers.iter().map(|h| h.clone().seal()).collect::<Vec<_>>();
if !headers.is_empty() {
let request = HeadersRequest {
limit: self.limit,
reverse: true,
start: reth_primitives::BlockHashOrNumber::Number(0), // ignored
};
match ready!(self.client.get_headers(request).poll_unpin(cx)) {
Ok(resp) => {
let mut headers = resp.0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>();
headers.sort_unstable_by_key(|h| h.number);
headers.remove(0); // remove head from response
headers.reverse();
Poll::Ready(Ok(headers))
}
Ok(headers)
Err(err) => Poll::Ready(Err(match err {
RequestError::Timeout => DownloadError::Timeout,
_ => DownloadError::RequestError(err),
})),
}
_ => Err(DownloadError::Timeout { request_id: 0 }),
}
}
impl Stream for TestDownload {
type Item = Result<SealedHeader, DownloadError>;
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}
impl BatchDownload for TestDownload {
type Ok = SealedHeader;
type Error = DownloadError;
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>> {
Box::new(self)
}
}
/// A test client for fetching headers
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TestHeadersClient {
req_tx: mpsc::Sender<(u64, HeadersRequest)>,
req_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(u64, HeadersRequest)>>>,
res_tx: broadcast::Sender<HeadersResponse>,
res_rx: broadcast::Receiver<HeadersResponse>,
}
impl Default for TestHeadersClient {
/// Construct a new test header downloader.
fn default() -> Self {
let (req_tx, req_rx) = mpsc::channel(1);
let (res_tx, res_rx) = broadcast::channel(1);
Self { req_tx, req_rx: Arc::new(tokio::sync::Mutex::new(req_rx)), res_tx, res_rx }
}
responses: Arc<Mutex<Vec<Header>>>,
error: Arc<Mutex<Option<RequestError>>>,
}
impl TestHeadersClient {
/// Helper for interacting with the environment on each request, allowing the client
/// to also reply to messages.
pub async fn on_header_request<T, F>(&self, mut count: usize, mut f: F) -> Vec<T>
where
F: FnMut(u64, HeadersRequest) -> T,
{
let mut rx = self.req_rx.lock().await;
let mut results = vec![];
while let Some((id, req)) = rx.recv().await {
results.push(f(id, req));
count -= 1;
if count == 0 {
break
}
}
results
/// Adds headers to the set.
pub async fn extend(&self, headers: impl IntoIterator<Item = Header>) {
let mut lock = self.responses.lock().await;
lock.extend(headers);
}
/// Helper for pushing responses to the client
pub fn send_header_response(&self, id: u64, headers: Vec<Header>) {
self.res_tx.send((id, headers).into()).expect("failed to send header response");
}
/// Helper for pushing responses to the client
pub async fn send_header_response_delayed(&self, id: u64, headers: Vec<Header>, secs: u64) {
tokio::time::sleep(Duration::from_secs(secs)).await;
self.send_header_response(id, headers);
/// Set repsonse error
pub async fn set_error(&self, err: RequestError) {
let mut lock = self.error.lock().await;
lock.replace(err);
}
}
#[async_trait::async_trait]
impl HeadersClient for TestHeadersClient {
// noop
async fn update_status(&self, _height: u64, _hash: H256, _td: H256) {}
fn update_status(&self, _height: u64, _hash: H256, _td: H256) {}
async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet<H512> {
self.req_tx.send((id, request)).await.expect("failed to send request");
HashSet::default()
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> {
if let Some(err) = &mut *self.error.lock().await {
return Err(err.clone())
}
async fn stream_headers(&self) -> HeadersStream {
if !self.res_rx.is_empty() {
println!("WARNING: broadcast receiver already contains messages.")
}
Box::pin(BroadcastStream::new(self.res_rx.resubscribe()).filter_map(|e| e.ok()))
let mut lock = self.responses.lock().await;
let len = lock.len().min(request.limit as usize);
let resp = lock.drain(..len).collect();
return Ok(BlockHeaders(resp))
}
}

View File

@ -8,11 +8,15 @@ readme = "README.md"
description = "Implementations of various header downloader"
[dependencies]
async-trait = "0.1.58"
# reth
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
# async
async-trait = "0.1.58"
futures = "0.3"
[dev-dependencies]
assert_matches = "1.5.0"
once_cell = "1.15.0"

View File

@ -1,16 +1,26 @@
use std::{borrow::Borrow, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::{stream::Stream, FutureExt};
use reth_interfaces::{
consensus::Consensus,
p2p::headers::{
client::{HeadersClient, HeadersStream},
downloader::HeaderDownloader,
p2p::{
error::{RequestError, RequestResult},
headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest},
downloader::{validate_header_download, HeaderBatchDownload, HeaderDownloader},
error::DownloadError,
},
traits::BatchDownload,
},
};
use reth_primitives::SealedHeader;
use reth_primitives::{SealedHeader, H256};
use reth_rpc_types::engine::ForkchoiceState;
use std::{
borrow::Borrow,
future::Future,
pin::Pin,
sync::Arc,
task::{ready, Context, Poll},
time::Duration,
};
/// Download headers in batches
#[derive(Debug)]
@ -27,11 +37,19 @@ pub struct LinearDownloader<C, H> {
pub request_retries: usize,
}
#[async_trait]
impl<C: Consensus, H: HeadersClient> HeaderDownloader for LinearDownloader<C, H> {
impl<C, H> HeaderDownloader for LinearDownloader<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Consensus = C;
type Client = H;
/// The request timeout
fn timeout(&self) -> Duration {
self.request_timeout
}
fn consensus(&self) -> &Self::Consensus {
self.consensus.borrow()
}
@ -40,105 +58,234 @@ impl<C: Consensus, H: HeadersClient> HeaderDownloader for LinearDownloader<C, H>
self.client.borrow()
}
/// The request timeout
fn timeout(&self) -> Duration {
self.request_timeout
fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_> {
Box::pin(HeadersDownload {
head,
forkchoice,
buffered: vec![],
request: Default::default(),
consensus: Arc::clone(&self.consensus),
request_retries: self.request_retries,
batch_size: self.batch_size,
client: Arc::clone(&self.client),
})
}
}
/// Download headers in batches with retries.
/// Returns the header collection in sorted descending
/// order from chain tip to local head
async fn download(
&self,
head: &SealedHeader,
forkchoice: &ForkchoiceState,
) -> Result<Vec<SealedHeader>, DownloadError> {
let mut stream = self.client().stream_headers().await;
let mut retries = self.request_retries;
// Header order will be preserved during inserts
let mut out = vec![];
loop {
let result = self.download_batch(&mut stream, forkchoice, head, out.last()).await;
match result {
Ok(result) => match result {
LinearDownloadResult::Batch(mut headers) => {
out.append(&mut headers);
}
LinearDownloadResult::Finished(mut headers) => {
out.append(&mut headers);
return Ok(out)
}
LinearDownloadResult::Ignore => (),
},
Err(e) if e.is_retryable() && retries > 1 => {
retries -= 1;
}
Err(e) => return Err(e),
}
impl<C: Consensus, H: HeadersClient> Clone for LinearDownloader<C, H> {
fn clone(&self) -> Self {
Self {
consensus: Arc::clone(&self.consensus),
client: Arc::clone(&self.client),
batch_size: self.batch_size,
request_timeout: self.request_timeout,
request_retries: self.request_retries,
}
}
}
/// The intermediate download result
#[derive(Debug)]
pub enum LinearDownloadResult {
/// Downloaded last batch up to tip
Finished(Vec<SealedHeader>),
/// Downloaded batch
Batch(Vec<SealedHeader>),
/// Ignore this batch
Ignore,
type HeadersFut = Pin<Box<dyn Future<Output = RequestResult<BlockHeaders>> + Send>>;
/// A retryable future that returns a list of [`BlockHeaders`] on success.
struct HeadersRequestFuture {
request: HeadersRequest,
fut: HeadersFut,
retries: usize,
max_retries: usize,
}
impl<C: Consensus, H: HeadersClient> LinearDownloader<C, H> {
async fn download_batch(
&self,
stream: &mut HeadersStream,
forkchoice: &ForkchoiceState,
head: &SealedHeader,
earliest: Option<&SealedHeader>,
) -> Result<LinearDownloadResult, DownloadError> {
// Request headers starting from tip or earliest cached
let start = earliest.map_or(forkchoice.head_block_hash, |h| h.parent_hash);
let mut headers = self.download_headers(stream, start.into(), self.batch_size).await?;
impl HeadersRequestFuture {
/// Returns true if the request can be retried.
fn is_retryable(&self) -> bool {
self.retries < self.max_retries
}
/// Increments the retry counter and returns whether the request can still be retried.
fn inc_err(&mut self) -> bool {
self.retries += 1;
self.is_retryable()
}
}
impl Future for HeadersRequestFuture {
type Output = RequestResult<BlockHeaders>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().fut.poll_unpin(cx)
}
}
/// An in progress headers download.
pub struct HeadersDownload<C, H> {
/// The local head of the chain.
head: SealedHeader,
forkchoice: ForkchoiceState,
/// Buffered results
buffered: Vec<SealedHeader>,
/// Contains the request that's currently in progress.
///
/// TODO(mattsse): this could be converted into a `FuturesOrdered` where batching is done via
/// `skip` so we don't actually need to know the start hash
request: Option<HeadersRequestFuture>,
/// Downloader used to issue new requests.
consensus: Arc<C>,
/// Downloader used to issue new requests.
client: Arc<H>,
/// The number of headers to request in one call
batch_size: u64,
/// The number of retries for downloading
request_retries: usize,
}
impl<C, H> HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
/// Returns the start hash for a new request.
fn request_start(&self) -> H256 {
self.buffered.last().map_or(self.forkchoice.head_block_hash, |h| h.parent_hash)
}
fn headers_request(&self) -> HeadersRequest {
HeadersRequest { start: self.request_start().into(), limit: self.batch_size, reverse: true }
}
/// Tries to fuse the future with a new request
///
/// Returns an `Err` if the request exhausted all retries
fn try_fuse_request_fut(&self, fut: &mut HeadersRequestFuture) -> Result<(), ()> {
if !fut.inc_err() {
return Err(())
}
let req = self.headers_request();
fut.request = req.clone();
let client = Arc::clone(&self.client);
fut.fut = Box::pin(async move { client.get_headers(req).await });
Ok(())
}
/// Validate whether the header is valid in relation to it's parent
///
/// Returns Ok(false) if the
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
validate_header_download(&self.consensus, header, parent)?;
Ok(())
}
}
impl<C, H> Future for HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Output = Result<Vec<SealedHeader>, DownloadError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
'outer: loop {
let mut fut = match this.request.take() {
Some(fut) => fut,
None => {
// queue in the first request
let client = Arc::clone(&this.client);
let req = this.headers_request();
HeadersRequestFuture {
request: req.clone(),
fut: Box::pin(async move { client.get_headers(req).await }),
retries: 0,
max_retries: this.request_retries,
}
}
};
match ready!(fut.poll_unpin(cx)) {
Ok(resp) => {
let mut headers = resp.0;
headers.sort_unstable_by_key(|h| h.number);
let mut out = Vec::with_capacity(headers.len());
if headers.is_empty() {
if this.try_fuse_request_fut(&mut fut).is_err() {
return Poll::Ready(Err(RequestError::BadResponse.into()))
} else {
this.request = Some(fut);
continue
}
}
// Iterate headers in reverse
for parent in headers.into_iter().rev() {
let parent = parent.seal();
if head.hash() == parent.hash() {
if this.head.hash() == parent.hash() {
// We've reached the target
return Ok(LinearDownloadResult::Finished(out))
let headers =
std::mem::take(&mut this.buffered).into_iter().rev().collect();
return Poll::Ready(Ok(headers))
}
match out.last().or(earliest) {
Some(header) => {
match self.validate(header, &parent) {
// ignore mismatched headers
Err(DownloadError::MismatchedHeaders { .. }) => {
return Ok(LinearDownloadResult::Ignore)
if let Some(header) = this.buffered.last() {
match this.validate(header, &parent) {
Ok(_) => {
// record new parent
this.buffered.push(parent);
}
// propagate any other error if any
Err(e) => return Err(e),
// proceed to insert if validation is successful
_ => (),
};
Err(err) => {
if this.try_fuse_request_fut(&mut fut).is_err() {
return Poll::Ready(Err(err))
}
// The buffer is empty and the first header does not match the tip, discard
// TODO: penalize the peer?
None if parent.hash() != forkchoice.head_block_hash => {
return Ok(LinearDownloadResult::Ignore)
this.request = Some(fut);
continue 'outer
}
_ => (),
};
}
} else {
// The buffer is empty and the first header does not match the tip,
// discard
if parent.hash() != this.forkchoice.head_block_hash {
if this.try_fuse_request_fut(&mut fut).is_err() {
return Poll::Ready(Err(RequestError::BadResponse.into()))
}
this.request = Some(fut);
continue 'outer
}
this.buffered.push(parent);
}
}
}
Err(err) => {
if this.try_fuse_request_fut(&mut fut).is_err() {
return Poll::Ready(Err(DownloadError::RequestError(err)))
}
this.request = Some(fut);
}
}
}
}
}
out.push(parent);
}
impl<C, H> Stream for HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Item = Result<SealedHeader, DownloadError>;
Ok(LinearDownloadResult::Batch(out))
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
}
}
impl<C, H> BatchDownload for HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Ok = SealedHeader;
type Error = DownloadError;
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>> {
Box::new(self)
}
}
@ -199,200 +346,86 @@ impl LinearDownloadBuilder {
#[cfg(test)]
mod tests {
use super::*;
use reth_interfaces::{
p2p::headers::client::HeadersRequest,
test_utils::{
generators::{random_header, random_header_range},
TestConsensus, TestHeadersClient,
},
};
use reth_primitives::{BlockHashOrNumber, SealedHeader};
use assert_matches::assert_matches;
use once_cell::sync::Lazy;
use serial_test::serial;
use tokio::sync::oneshot::{self, error::TryRecvError};
use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient};
use reth_primitives::SealedHeader;
static CONSENSUS: Lazy<Arc<TestConsensus>> = Lazy::new(|| Arc::new(TestConsensus::default()));
static CONSENSUS_FAIL: Lazy<Arc<TestConsensus>> = Lazy::new(|| {
let consensus = TestConsensus::default();
consensus.set_fail_validation(true);
Arc::new(consensus)
});
static CLIENT: Lazy<Arc<TestHeadersClient>> =
Lazy::new(|| Arc::new(TestHeadersClient::default()));
fn child_header(parent: &SealedHeader) -> SealedHeader {
let mut child = parent.as_ref().clone();
child.number += 1;
child.parent_hash = parent.hash_slow();
let hash = child.hash_slow();
SealedHeader::new(child, hash)
}
#[tokio::test]
#[serial]
async fn download_timeout() {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
async fn download_empty() {
let client = Arc::new(TestHeadersClient::default());
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client));
let result = downloader.download(SealedHeader::default(), ForkchoiceState::default()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn download_at_fork_head() {
let client = Arc::new(TestHeadersClient::default());
let downloader = LinearDownloadBuilder::default()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
downloader.download(&SealedHeader::default(), &ForkchoiceState::default()).await;
tx.send(result).expect("failed to forward download response");
});
.batch_size(3)
.build(CONSENSUS.clone(), Arc::clone(&client));
let mut requests = vec![];
CLIENT
.on_header_request(retries, |_id, req| {
requests.push(req);
})
let p3 = SealedHeader::default();
let p2 = child_header(&p3);
let p1 = child_header(&p2);
let p0 = child_header(&p1);
client
.extend(vec![
p0.as_ref().clone(),
p1.as_ref().clone(),
p2.as_ref().clone(),
p3.as_ref().clone(),
])
.await;
assert_eq!(requests.len(), retries);
assert_matches!(rx.await, Ok(Err(DownloadError::Timeout { .. })));
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.download(p0, fork).await;
let headers = result.unwrap();
assert!(headers.is_empty());
}
#[tokio::test]
#[serial]
async fn download_timeout_on_invalid_messages() {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
async fn download_exact() {
let client = Arc::new(TestHeadersClient::default());
let downloader = LinearDownloadBuilder::default()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
downloader.download(&SealedHeader::default(), &ForkchoiceState::default()).await;
tx.send(result).expect("failed to forward download response");
});
.batch_size(3)
.build(CONSENSUS.clone(), Arc::clone(&client));
let mut num_of_reqs = 0;
let mut last_req_id: Option<u64> = None;
let p3 = SealedHeader::default();
let p2 = child_header(&p3);
let p1 = child_header(&p2);
let p0 = child_header(&p1);
CLIENT
.on_header_request(retries, |id, _req| {
num_of_reqs += 1;
last_req_id = Some(id);
CLIENT.send_header_response(id.saturating_add(id % 2), vec![]);
})
client
.extend(vec![
p0.as_ref().clone(),
p1.as_ref().clone(),
p2.as_ref().clone(),
p3.as_ref().clone(),
])
.await;
assert_eq!(num_of_reqs, retries);
assert_matches!(
rx.await,
Ok(Err(DownloadError::Timeout { request_id })) if request_id == last_req_id.unwrap()
);
}
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
#[tokio::test]
#[serial]
async fn download_propagates_consensus_validation_error() {
let tip_parent = random_header(1, None);
let tip = random_header(2, Some(tip_parent.hash()));
let tip_hash = tip.hash();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS_FAIL.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&SealedHeader::default(), &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
let requests = CLIENT.on_header_request(1, |id, req| (id, req)).await;
let request = requests.last();
assert_matches!(
request,
Some((_, HeadersRequest { start, .. }))
if matches!(start, BlockHashOrNumber::Hash(hash) if *hash == tip_hash)
);
let request = request.unwrap();
CLIENT.send_header_response(
request.0,
vec![tip_parent.clone().unseal(), tip.clone().unseal()],
);
assert_matches!(
rx.await,
Ok(Err(DownloadError::HeaderValidation { hash, .. })) if hash == tip_parent.hash()
);
}
#[tokio::test]
#[serial]
async fn download_starts_with_chain_tip() {
let head = random_header(1, None);
let tip = random_header(2, Some(head.hash()));
let tip_hash = tip.hash();
let chain_head = head.clone();
let (tx, mut rx) = oneshot::channel();
tokio::spawn(async move {
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
CLIENT
.on_header_request(1, |id, _req| {
let mut corrupted_tip = tip.clone().unseal();
corrupted_tip.nonce = rand::random();
CLIENT.send_header_response(id, vec![corrupted_tip, head.clone().unseal()])
})
.await;
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
CLIENT
.on_header_request(1, |id, _req| {
CLIENT.send_header_response(id, vec![tip.clone().unseal(), head.clone().unseal()])
})
.await;
let result = rx.await;
assert_matches!(result, Ok(Ok(ref val)) if val.len() == 1);
assert_eq!(*result.unwrap().unwrap().first().unwrap(), tip);
}
#[tokio::test]
#[serial]
async fn download_returns_headers_desc() {
let (start, end) = (100, 200);
let head = random_header(start, None);
let mut headers = random_header_range(start + 1..end, head.hash());
headers.reverse();
let tip_hash = headers.first().unwrap().hash();
let chain_head = head.clone();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
let mut idx = 0;
let chunk_size = 10;
// `usize::div_ceil` is unstable. ref: https://github.com/rust-lang/rust/issues/88581
let count = (headers.len() + chunk_size - 1) / chunk_size;
CLIENT
.on_header_request(count + 1, |id, _req| {
let mut chunk =
headers.iter().skip(chunk_size * idx).take(chunk_size).cloned().peekable();
idx += 1;
if chunk.peek().is_some() {
let headers: Vec<_> = chunk.map(|h| h.unseal()).collect();
CLIENT.send_header_response(id, headers);
} else {
CLIENT.send_header_response(id, vec![head.clone().unseal()])
}
})
.await;
let result = rx.await;
assert_matches!(result, Ok(Ok(_)));
let result = result.unwrap().unwrap();
assert_eq!(result.len(), headers.len());
assert_eq!(result, headers);
let result = downloader.download(p3, fork).await;
let headers = result.unwrap();
assert_eq!(headers.len(), 3);
assert_eq!(headers[0], p2);
assert_eq!(headers[1], p1);
assert_eq!(headers[2], p0);
}
}

View File

@ -203,7 +203,7 @@ impl Decodable for Header {
/// A [`Header`] that is sealed at a precalculated hash, use [`SealedHeader::unseal()`] if you want
/// to modify header.
#[derive(Debug, Clone, PartialEq, Eq, Default, Hash)]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SealedHeader {
/// Locked Header fields.
header: Header,
@ -211,6 +211,14 @@ pub struct SealedHeader {
hash: BlockHash,
}
impl Default for SealedHeader {
fn default() -> Self {
let header = Header::default();
let hash = header.hash_slow();
Self { header, hash }
}
}
impl Encodable for SealedHeader {
fn encode(&self, out: &mut dyn BufMut) {
self.header.encode(out);

View File

@ -73,15 +73,15 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient> Stage<DB
// The stage relies on the downloader to return the headers
// in descending order starting from the tip down to
// the local head (latest block in db)
let headers = match self.downloader.download(&head, &forkchoice).await {
let headers = match self.downloader.download(head, forkchoice).await {
Ok(res) => {
// TODO: validate the result order?
// at least check if it attaches (first == tip && last == last_hash)
res
}
Err(e) => match e {
DownloadError::Timeout { request_id } => {
warn!("no response for header request {request_id}");
DownloadError::Timeout => {
warn!("no response for header request");
return Ok(ExecOutput {
stage_progress: last_block_num,
reached_tip: false,
@ -92,10 +92,7 @@ impl<DB: Database, D: HeaderDownloader, C: Consensus, H: HeadersClient> Stage<DB
warn!("Validation error for header {hash}: {error}");
return Err(StageError::Validation { block: last_block_num, error })
}
// TODO: this error is never propagated, clean up
// DownloadError::MismatchedHeaders { .. } => {
// return Err(StageError::Validation { block: last_block_num })
// }
// TODO: handle unreachable
_ => unreachable!(),
},
};
@ -134,7 +131,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
.get::<tables::CanonicalHeaders>(height)?
.ok_or(DatabaseIntegrityError::CanonicalHeader { number: height })?;
let td: Vec<u8> = tx.get::<tables::HeaderTD>((height, hash).into())?.unwrap(); // TODO:
self.client.update_status(height, hash, H256::from_slice(&td)).await;
self.client.update_status(height, hash, H256::from_slice(&td));
Ok(())
}
@ -163,7 +160,7 @@ impl<D: HeaderDownloader, C: Consensus, H: HeadersClient> HeaderStage<D, C, H> {
let mut latest = None;
// Since the headers were returned in descending order,
// iterate them in the reverse order
for header in headers.into_iter().rev() {
for header in headers.into_iter() {
if header.number == 0 {
continue
}
@ -194,6 +191,7 @@ mod tests {
stage_test_suite, ExecuteStageTestRunner, UnwindStageTestRunner, PREV_STAGE_ID,
};
use assert_matches::assert_matches;
use reth_interfaces::p2p::error::RequestError;
use test_runner::HeadersTestRunner;
stage_test_suite!(HeadersTestRunner);
@ -203,15 +201,20 @@ mod tests {
#[tokio::test]
// Validate that the execution does not fail on timeout
async fn execute_timeout() {
let (previous_stage, stage_progress) = (500, 100);
let mut runner = HeadersTestRunner::default();
let input = ExecInput::default();
let input = ExecInput {
previous_stage: Some((PREV_STAGE_ID, previous_stage)),
stage_progress: Some(stage_progress),
};
runner.seed_execution(input).expect("failed to seed execution");
runner.client.set_error(RequestError::Timeout).await;
let rx = runner.execute(input);
runner.consensus.update_tip(H256::from_low_u64_be(1));
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: false, reached_tip: false, stage_progress: 0 })
Ok(ExecOutput { done: false, reached_tip: false, stage_progress: 100 })
);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@ -242,23 +245,17 @@ mod tests {
let headers = runner.seed_execution(input).expect("failed to seed execution");
let rx = runner.execute(input);
runner.client.extend(headers.iter().rev().map(|h| h.clone().unseal())).await;
// skip `after_execution` hook for linear downloader
let tip = headers.last().unwrap();
runner.consensus.update_tip(tip.hash());
let download_result = headers.clone();
runner
.client
.on_header_request(1, |id, _| {
let response = download_result.iter().map(|h| h.clone().unseal()).collect();
runner.client.send_header_response(id, response)
})
.await;
let result = rx.await.unwrap();
assert_matches!(
result,
Ok(ExecOutput { done: true, reached_tip: true, stage_progress }) if stage_progress == tip.number
Ok(ExecOutput { done: true, reached_tip: true, stage_progress })
if stage_progress == tip.number
);
assert!(runner.validate_execution(input, result.ok()).is_ok(), "validation failed");
}
@ -298,7 +295,7 @@ mod tests {
Self {
client: client.clone(),
consensus: consensus.clone(),
downloader: Arc::new(TestHeaderDownloader::new(client, consensus)),
downloader: Arc::new(TestHeaderDownloader::new(client, consensus, 1000)),
db: StageTestDB::default(),
}
}
@ -341,23 +338,6 @@ mod tests {
Ok(headers)
}
async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
let tip = if !headers.is_empty() {
headers.last().unwrap().hash()
} else {
H256::from_low_u64_be(rand::random())
};
self.consensus.update_tip(tip);
self.client
.send_header_response_delayed(
0,
headers.into_iter().map(|h| h.unseal()).collect(),
1,
)
.await;
Ok(())
}
/// Validate stored headers
fn validate_execution(
&self,
@ -405,6 +385,17 @@ mod tests {
};
Ok(())
}
async fn after_execution(&self, headers: Self::Seed) -> Result<(), TestRunnerError> {
self.client.extend(headers.iter().map(|h| h.clone().unseal())).await;
let tip = if !headers.is_empty() {
headers.last().unwrap().hash()
} else {
H256::from_low_u64_be(rand::random())
};
self.consensus.update_tip(tip);
Ok(())
}
}
impl<D: HeaderDownloader + 'static> UnwindStageTestRunner for HeadersTestRunner<D> {
@ -414,6 +405,7 @@ mod tests {
}
impl HeadersTestRunner<LinearDownloader<TestConsensus, TestHeadersClient>> {
#[allow(unused)]
pub(crate) fn with_linear_downloader() -> Self {
let client = Arc::new(TestHeadersClient::default());
let consensus = Arc::new(TestConsensus::default());

View File

@ -218,9 +218,9 @@ mod tests {
let start_hash = tx.get::<tables::CanonicalHeaders>(start_block)?.unwrap();
let mut body_cursor = tx.cursor::<tables::BlockBodies>()?;
let mut body_walker = body_cursor.walk((start_block, start_hash).into())?;
let body_walker = body_cursor.walk((start_block, start_hash).into())?;
while let Some(entry) = body_walker.next() {
for entry in body_walker {
let (_, body) = entry?;
for tx_id in body.base_tx_id..body.base_tx_id + body.tx_amount {
let transaction = tx

View File

@ -170,9 +170,9 @@ mod tests {
let mut tx_count_walker = tx_count_cursor.walk((start, start_hash).into())?;
let mut count = tx_count_walker.next().unwrap()?.1;
let mut last_num = start;
while let Some(entry) = tx_count_walker.next() {
for entry in tx_count_walker {
let (key, db_count) = entry?;
count += tx.get::<tables::BlockBodies>(key)?.unwrap().tx_amount as u64;
count += tx.get::<tables::BlockBodies>(key)?.unwrap().tx_amount;
assert_eq!(db_count, count);
last_num = key.number();
}