headers(part2) - feat: add Downloader trait and test utils (#118)

* feat(interfaces): implement header client traits

* feat: add downloader trait implementer

* feat: use explicit error type instead of ok(false)

* feat: add constructor to HeaderLocked

* test: scaffold mock consensus, downloader and headersclient helpers

* test: implement test consensus

* test: implement test headers client

* refactor: cleanup download headers

* chore: fix lint

* s/test_utils/test_helpers

* headers(part 3) feat: implement Linear downloader (#119)

* feat: add headers downloaders crate

* feat: more scaffolding

* interfaces: generalize retryable errors

* feat: implement linear downloader

* fix linear downloader tests & add builder

* extend & reverse

* feat: linear downloader generics behind arc and reversed return order (#120)

* put client & consensus behind arc and return headers in rev

* cleanup

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>

* extract test_utils

* cargo fmt

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Georgios Konstantopoulos authored 2022-10-24 04:56:43 -07:00, committed by GitHub
parent a4e505132c
commit 15bd88e637
15 changed files with 917 additions and 5 deletions
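For orientation, here is a minimal sketch of how the pieces introduced in this change are meant to fit together: a Consensus implementation and a HeadersClient (both behind Arc, per #120) are wired into the LinearDownloadBuilder, and download returns the headers between the local head and the forkchoice tip in descending order. The helper name sync_headers and the builder values are illustrative only; they mirror the defaults in the diff below rather than any prescribed configuration.

use std::{sync::Arc, time::Duration};

use reth_headers_downloaders::linear::LinearDownloadBuilder;
use reth_interfaces::{
    consensus::Consensus,
    p2p::headers::{
        client::HeadersClient,
        downloader::{DownloadError, Downloader},
    },
};
use reth_primitives::HeaderLocked;
use reth_rpc_types::engine::ForkchoiceState;

/// Illustrative helper: wire a consensus engine and a headers client into the
/// linear downloader and fetch headers from the forkchoice tip down to `head`.
async fn sync_headers<C: Consensus, H: HeadersClient>(
    consensus: Arc<C>,
    client: Arc<H>,
    head: &HeaderLocked,
    forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError> {
    let downloader = LinearDownloadBuilder::new()
        .batch_size(100)
        .timeout(Duration::from_millis(100))
        .retries(5)
        .build(consensus, client);
    // Headers come back tip-first (descending block numbers).
    downloader.download(head, forkchoice).await
}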

Cargo.lock (generated)

@@ -70,6 +70,12 @@ version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"

[[package]]
name = "assert_matches"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"

[[package]]
name = "async-lock"
version = "2.5.0"
@@ -730,6 +736,19 @@ dependencies = [
 "syn",
]

[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
 "cfg-if",
 "hashbrown",
 "lock_api",
 "once_cell",
 "parking_lot_core",
]

[[package]]
name = "der"
version = "0.6.0"
@@ -2499,6 +2518,21 @@ dependencies = [
 "thiserror",
]

[[package]]
name = "reth-headers-downloaders"
version = "0.1.0"
dependencies = [
 "assert_matches",
 "async-trait",
 "once_cell",
 "rand",
 "reth-interfaces",
 "reth-primitives",
 "reth-rpc-types",
 "serial_test",
 "tokio",
]

[[package]]
name = "reth-interfaces"
version = "0.1.0"
@@ -2507,15 +2541,18 @@ dependencies = [
 "auto_impl",
 "bytes",
 "eyre",
 "futures",
 "heapless",
 "parity-scale-codec",
 "postcard",
 "rand",
 "reth-primitives",
 "reth-rpc-types",
 "serde",
 "test-fuzz",
 "thiserror",
 "tokio",
 "tokio-stream",
]

[[package]]
@@ -3052,6 +3089,32 @@ dependencies = [
 "serde",
]

[[package]]
name = "serial_test"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153"
dependencies = [
 "dashmap",
 "futures",
 "lazy_static",
 "log",
 "parking_lot",
 "serial_test_derive",
]

[[package]]
name = "serial_test_derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5"
dependencies = [
 "proc-macro-error",
 "proc-macro2",
 "quote",
 "syn",
]

[[package]]
name = "sha-1"
version = "0.9.8"
@@ -3455,6 +3518,7 @@ dependencies = [
 "futures-core",
 "pin-project-lite",
 "tokio",
 "tokio-util",
]

[[package]]


@@ -13,6 +13,7 @@ members = [
    "crates/net/rpc",
    "crates/net/rpc-api",
    "crates/net/rpc-types",
    "crates/net/headers-downloaders",
    "crates/primitives",
    "crates/stages",
    "crates/transaction-pool",


@@ -21,10 +21,15 @@ serde = { version = "1.0.*", default-features = false }
postcard = { version = "1.0.2", features = ["alloc"] }
heapless = "0.7.16"
parity-scale-codec = { version = "3.2.1", features = ["bytes"] }
futures = "0.3.25"
tokio-stream = "0.1.11"
rand = "0.8.5"

[dev-dependencies]
test-fuzz = "3.0.4"
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = { version = "0.1.11", features = ["sync"] }

[features]
bench = []
test-utils = ["tokio-stream/sync"]


@@ -7,14 +7,13 @@ use tokio::sync::watch::Receiver;
/// Consensus is a protocol that chooses canonical chain.
/// We are checking validity of block header here.
#[async_trait]
-pub trait Consensus {
+#[auto_impl::auto_impl(&, Arc)]
+pub trait Consensus: Send + Sync {
    /// Get a receiver for the fork choice state
    fn fork_choice_state(&self) -> Receiver<ForkchoiceState>;

    /// Validate if header is correct and follows consensus specification
-    fn validate_header(&self, _header: &Header) -> Result<(), Error> {
-        Ok(())
-    }
+    fn validate_header(&self, header: &Header, parent: &Header) -> Result<(), Error>;
}

/// Consensus errors (TODO)
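The validate_header change above replaces the default Ok(()) body with an explicit, parent-aware signature. A hypothetical implementation might look like the sketch below; ParentCheckConsensus and its single check are illustrative only, and Error::ConsensusError is the variant used by the test helpers later in this diff.

use async_trait::async_trait;
use reth_interfaces::consensus::{Consensus, Error};
use reth_primitives::Header;
use reth_rpc_types::engine::ForkchoiceState;
use tokio::sync::watch;

/// Illustrative consensus implementation for the new two-argument signature.
struct ParentCheckConsensus {
    state: watch::Receiver<ForkchoiceState>,
}

#[async_trait]
impl Consensus for ParentCheckConsensus {
    fn fork_choice_state(&self) -> watch::Receiver<ForkchoiceState> {
        self.state.clone()
    }

    fn validate_header(&self, header: &Header, parent: &Header) -> Result<(), Error> {
        // A real implementation would check timestamps, gas limits, difficulty, etc.
        // Here only the block number linkage is verified.
        if header.number != parent.number + 1 {
            return Err(Error::ConsensusError)
        }
        Ok(())
    }
}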


@@ -18,7 +18,14 @@ pub mod db;
/// Traits that provide chain access.
pub mod provider;

/// P2P traits.
pub mod p2p;

/// Possible errors when interacting with the chain.
mod error;
pub use error::{Error, Result};

#[cfg(any(test, feature = "test-utils"))]
/// Common test helpers for mocking out Consensus, Downloaders and Header Clients.
pub mod test_utils;


@@ -0,0 +1,57 @@
use crate::p2p::MessageStream;
use reth_primitives::{rpc::BlockId, Header, H256, H512};
use async_trait::async_trait;
use std::{collections::HashSet, fmt::Debug};
/// Each peer returns a list of headers and the request id corresponding
/// to these headers. This allows clients to make multiple requests in parallel
/// and multiplex the responses accordingly.
pub type HeadersStream = MessageStream<HeadersResponse>;
/// The item contained in each [`MessageStream`] when used to fetch [`Header`]s via
/// [`HeadersClient`].
#[derive(Clone, Debug)]
pub struct HeadersResponse {
/// The request id associated with this response.
pub id: u64,
/// The headers the peer replied with.
pub headers: Vec<Header>,
}
impl From<(u64, Vec<Header>)> for HeadersResponse {
fn from((id, headers): (u64, Vec<Header>)) -> Self {
HeadersResponse { id, headers }
}
}
/// The header request struct to be sent to connected peers, which
/// will proceed to ask them to stream the requested headers to us.
#[derive(Clone, Debug)]
pub struct HeadersRequest {
/// The starting block
pub start: BlockId,
/// The response max size
pub limit: u64,
/// Flag indicating whether the blocks should
/// arrive in reverse
pub reverse: bool,
}
/// The block headers downloader client
#[async_trait]
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeadersClient: Send + Sync + Debug {
/// Update the node's Status message.
///
/// The updated Status message will be used during any new eth/65 handshakes.
async fn update_status(&self, height: u64, hash: H256, td: H256);
/// Sends the header request to the p2p network.
// TODO: What does this return?
async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet<H512>;
/// Stream the header response messages
async fn stream_headers(&self) -> HeadersStream;
}
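To illustrate the request/response pairing above: requests carry an id, responses from peers are multiplexed onto one HeadersStream, and callers filter by that id. The sketch below is a simplified version of what Downloader::download_headers (next file) does, without the timeout; fetch_once, the limit of 64, and the use of rand for ids are assumptions for illustration only.

use reth_interfaces::p2p::headers::client::{HeadersClient, HeadersRequest};
use reth_primitives::{rpc::BlockId, Header};
use tokio_stream::StreamExt;

/// Illustrative single round trip against any `HeadersClient` implementation.
async fn fetch_once<C: HeadersClient>(client: &C, start: BlockId) -> Vec<Header> {
    // Subscribe first so the response cannot be missed, then send the request.
    let mut stream = client.stream_headers().await;
    let id = rand::random();
    let _peers = client
        .send_header_request(id, HeadersRequest { start, limit: 64, reverse: true })
        .await;
    // Responses are multiplexed; keep only the one matching our request id.
    while let Some(res) = stream.next().await {
        if res.id == id && !res.headers.is_empty() {
            return res.headers
        }
    }
    Vec::new()
}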


@@ -0,0 +1,131 @@
use super::client::{HeadersClient, HeadersRequest, HeadersStream};
use crate::consensus::Consensus;
use async_trait::async_trait;
use reth_primitives::{
rpc::{BlockId, BlockNumber},
Header, HeaderLocked, H256,
};
use reth_rpc_types::engine::ForkchoiceState;
use std::{fmt::Debug, time::Duration};
use thiserror::Error;
use tokio_stream::StreamExt;
/// The downloader error type
#[derive(Error, Debug, Clone)]
pub enum DownloadError {
/// Header validation failed
#[error("Failed to validate header {hash}. Details: {details}.")]
HeaderValidation {
/// Hash of header failing validation
hash: H256,
/// The details of validation failure
details: String,
},
/// No headers response received
#[error("Failed to get headers for request {request_id}.")]
NoHeaderResponse {
/// The last request ID
request_id: u64,
},
/// Timed out while waiting for request id response.
#[error("Timed out while getting headers for request {request_id}.")]
Timeout {
/// The request id that timed out
request_id: u64,
},
/// Error when checking that the current [`Header`] has the parent's hash as the parent_hash
/// field, and that they have sequential block numbers.
#[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")]
MismatchedHeaders {
/// The header number being evaluated
header_number: BlockNumber,
/// The header hash being evaluated
header_hash: H256,
/// The parent number being evaluated
parent_number: BlockNumber,
/// The parent hash being evaluated
parent_hash: H256,
},
}
impl DownloadError {
/// Returns `true` if the error is retryable (the peer responded with no headers
/// or the request timed out), and `false` if it is fatal.
pub fn is_retryable(&self) -> bool {
matches!(self, DownloadError::NoHeaderResponse { .. } | DownloadError::Timeout { .. })
}
}
/// The header downloading strategy
#[async_trait]
pub trait Downloader: Sync + Send {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
/// The Client used to download the headers
type Client: HeadersClient;
/// The request timeout duration
fn timeout(&self) -> Duration;
/// The consensus engine
fn consensus(&self) -> &Self::Consensus;
/// The headers client
fn client(&self) -> &Self::Client;
/// Download the headers
async fn download(
&self,
head: &HeaderLocked,
forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError>;
/// Performs a header request and returns the headers.
// TODO: Isn't this effectively blocking per request per downloader?
// Might be fine, given we can spawn multiple downloaders?
// TODO: Rethink this function, I don't really like the `stream: &mut HeadersStream`
// in the signature. Why can we not call `self.client.stream_headers()`? Gives lifetime error.
async fn download_headers(
&self,
stream: &mut HeadersStream,
start: BlockId,
limit: u64,
) -> Result<Vec<Header>, DownloadError> {
let request_id = rand::random();
let request = HeadersRequest { start, limit, reverse: true };
let _ = self.client().send_header_request(request_id, request).await;
// Filter stream by request id and non empty headers content
let stream = stream
.filter(|resp| request_id == resp.id && !resp.headers.is_empty())
.timeout(self.timeout());
// Pop the first item.
match Box::pin(stream).try_next().await {
Ok(Some(item)) => Ok(item.headers),
_ => return Err(DownloadError::NoHeaderResponse { request_id }),
}
}
/// Validate whether the header is valid in relation to its parent
///
/// Returns an error if the headers are not sequential or if consensus validation fails
fn validate(&self, header: &HeaderLocked, parent: &HeaderLocked) -> Result<(), DownloadError> {
if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) {
return Err(DownloadError::MismatchedHeaders {
header_number: header.number.into(),
parent_number: parent.number.into(),
header_hash: header.hash(),
parent_hash: parent.hash(),
})
}
self.consensus().validate_header(header, parent).map_err(|e| {
DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() }
})?;
Ok(())
}
}
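is_retryable separates transient peer failures (no response, timeout) from fatal validation errors. The LinearDownloader later in this diff already retries internally, but an outer caller such as a sync stage could apply the same distinction; the sketch below is illustrative, with an arbitrary retry budget, and is not part of the diff.

use reth_interfaces::p2p::headers::downloader::{DownloadError, Downloader};
use reth_primitives::HeaderLocked;
use reth_rpc_types::engine::ForkchoiceState;

/// Illustrative outer retry loop driven by `DownloadError::is_retryable`.
async fn download_with_retries<D: Downloader>(
    downloader: &D,
    head: &HeaderLocked,
    forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError> {
    let mut attempts = 3;
    loop {
        match downloader.download(head, forkchoice).await {
            Ok(headers) => return Ok(headers),
            // Transient: no response or timeout, try again.
            Err(err) if err.is_retryable() && attempts > 1 => attempts -= 1,
            // Fatal: validation or mismatch errors propagate immediately.
            Err(err) => return Err(err),
        }
    }
}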


@@ -0,0 +1,11 @@
/// Trait definition for [`HeadersClient`]
///
/// [`HeadersClient`]: client::HeadersClient
pub mod client;
/// A downloader that receives and verifies block headers, generic
/// over the [`Consensus`] and [`HeadersClient`] being used.
///
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: client::HeadersClient
pub mod downloader;


@@ -0,0 +1,13 @@
/// Traits for implementing P2P Header Clients. Also includes implementations
/// of a Linear and a Parallel downloader generic over the [`Consensus`] and
/// [`HeadersClient`].
///
/// [`Consensus`]: crate::consensus::Consensus
/// [`HeadersClient`]: crate::p2p::headers::HeadersClient
pub mod headers;
use futures::Stream;
use std::pin::Pin;
/// The stream of responses from the connected peers, generic over the response type.
pub type MessageStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
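Any Send stream can be boxed into a MessageStream; for instance, a networking task could forward responses over a channel and hand the receiving half to a consumer as sketched below. into_message_stream is not part of the diff and the use of tokio-stream's ReceiverStream wrapper is an assumption for illustration.

use reth_interfaces::p2p::MessageStream;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Illustrative: box the receiving half of an mpsc channel into a `MessageStream`.
fn into_message_stream<T: Send + 'static>(rx: mpsc::Receiver<T>) -> MessageStream<T> {
    Box::pin(ReceiverStream::new(rx))
}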


@@ -0,0 +1,165 @@
use crate::{
consensus::{self, Consensus},
p2p::headers::{
client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream},
downloader::{DownloadError, Downloader},
},
};
use std::{collections::HashSet, sync::Arc, time::Duration};
use reth_primitives::{Header, HeaderLocked, H256, H512};
use reth_rpc_types::engine::ForkchoiceState;
use tokio::sync::{broadcast, mpsc, watch};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
#[derive(Debug)]
/// A test downloader which just returns the values that have been pushed to it.
pub struct TestDownloader {
result: Result<Vec<HeaderLocked>, DownloadError>,
}
impl TestDownloader {
/// Instantiates the downloader with the mock responses
pub fn new(result: Result<Vec<HeaderLocked>, DownloadError>) -> Self {
Self { result }
}
}
#[async_trait::async_trait]
impl Downloader for TestDownloader {
type Consensus = TestConsensus;
type Client = TestHeadersClient;
fn timeout(&self) -> Duration {
Duration::from_millis(1000)
}
fn consensus(&self) -> &Self::Consensus {
unimplemented!()
}
fn client(&self) -> &Self::Client {
unimplemented!()
}
async fn download(
&self,
_: &HeaderLocked,
_: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError> {
self.result.clone()
}
}
#[derive(Debug)]
/// A test client for fetching headers
pub struct TestHeadersClient {
req_tx: mpsc::Sender<(u64, HeadersRequest)>,
req_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(u64, HeadersRequest)>>>,
res_tx: broadcast::Sender<HeadersResponse>,
res_rx: broadcast::Receiver<HeadersResponse>,
}
impl Default for TestHeadersClient {
/// Construct a new test headers client.
fn default() -> Self {
let (req_tx, req_rx) = mpsc::channel(1);
let (res_tx, res_rx) = broadcast::channel(1);
Self { req_tx, req_rx: Arc::new(tokio::sync::Mutex::new(req_rx)), res_tx, res_rx }
}
}
impl TestHeadersClient {
/// Helper for interacting with the environment on each request, allowing the client
/// to also reply to messages.
pub async fn on_header_request<T, F>(&self, mut count: usize, mut f: F) -> Vec<T>
where
F: FnMut(u64, HeadersRequest) -> T,
{
let mut rx = self.req_rx.lock().await;
let mut results = vec![];
while let Some((id, req)) = rx.recv().await {
results.push(f(id, req));
count -= 1;
if count == 0 {
break
}
}
results
}
/// Helper for pushing responses to the client
pub fn send_header_response(&self, id: u64, headers: Vec<Header>) {
self.res_tx.send((id, headers).into()).expect("failed to send header response");
}
}
#[async_trait::async_trait]
impl HeadersClient for TestHeadersClient {
// noop
async fn update_status(&self, _height: u64, _hash: H256, _td: H256) {}
async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet<H512> {
self.req_tx.send((id, request)).await.expect("failed to send request");
HashSet::default()
}
async fn stream_headers(&self) -> HeadersStream {
Box::pin(BroadcastStream::new(self.res_rx.resubscribe()).filter_map(|e| e.ok()))
}
}
/// Consensus client impl for testing
#[derive(Debug)]
pub struct TestConsensus {
/// Watcher over the forkchoice state
channel: (watch::Sender<ForkchoiceState>, watch::Receiver<ForkchoiceState>),
/// Flag whether the header validation should purposefully fail
fail_validation: bool,
}
impl Default for TestConsensus {
fn default() -> Self {
Self {
channel: watch::channel(ForkchoiceState {
head_block_hash: H256::zero(),
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
}),
fail_validation: false,
}
}
}
impl TestConsensus {
/// Update the forkchoice state
pub fn update_tip(&mut self, tip: H256) {
let state = ForkchoiceState {
head_block_hash: tip,
finalized_block_hash: H256::zero(),
safe_block_hash: H256::zero(),
};
self.channel.0.send(state).expect("updating forkchoice state failed");
}
/// Update the validation flag
pub fn set_fail_validation(&mut self, val: bool) {
self.fail_validation = val;
}
}
#[async_trait::async_trait]
impl Consensus for TestConsensus {
fn fork_choice_state(&self) -> watch::Receiver<ForkchoiceState> {
self.channel.1.clone()
}
fn validate_header(&self, _header: &Header, _parent: &Header) -> Result<(), consensus::Error> {
if self.fail_validation {
Err(consensus::Error::ConsensusError)
} else {
Ok(())
}
}
}
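As a usage sketch of these helpers (assuming the test-utils feature added to the interfaces crate above is enabled and tokio is available for the test macro): TestDownloader simply echoes the result it was constructed with, so components that consume a Downloader can be tested without a network or a real consensus engine. The test name is illustrative.

use reth_interfaces::{p2p::headers::downloader::Downloader, test_utils::TestDownloader};
use reth_primitives::HeaderLocked;
use reth_rpc_types::engine::ForkchoiceState;

#[tokio::test]
async fn downloader_returns_canned_result() {
    // The mock ignores its inputs and returns the preconfigured result.
    let downloader = TestDownloader::new(Ok(vec![]));
    let result =
        downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await;
    assert!(matches!(result, Ok(headers) if headers.is_empty()));
}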


@@ -0,0 +1,22 @@
[package]
name = "reth-headers-downloaders"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/foundry-rs/reth"
readme = "README.md"
description = "Implementations of various header downloader"
[dependencies]
async-trait = "0.1.58"
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
[dev-dependencies]
assert_matches = "1.5.0"
once_cell = "1.15.0"
rand = "0.8.5"
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
serial_test = "0.9.0"


@@ -0,0 +1,11 @@
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Implements Header Downloader algorithms
/// A Linear downloader implementation.
pub mod linear;


@@ -0,0 +1,419 @@
use std::{borrow::Borrow, sync::Arc, time::Duration};
use async_trait::async_trait;
use reth_interfaces::{
consensus::Consensus,
p2p::headers::{
client::{HeadersClient, HeadersStream},
downloader::{DownloadError, Downloader},
},
};
use reth_primitives::{rpc::BlockId, HeaderLocked};
use reth_rpc_types::engine::ForkchoiceState;
/// Download headers in batches
#[derive(Debug)]
pub struct LinearDownloader<C, H> {
/// The consensus client
consensus: Arc<C>,
/// The headers client
client: Arc<H>,
/// The batch size per request
pub batch_size: u64,
/// A single request timeout
pub request_timeout: Duration,
/// The number of retries for downloading
pub request_retries: usize,
}
#[async_trait]
impl<C: Consensus, H: HeadersClient> Downloader for LinearDownloader<C, H> {
type Consensus = C;
type Client = H;
fn consensus(&self) -> &Self::Consensus {
self.consensus.borrow()
}
fn client(&self) -> &Self::Client {
self.client.borrow()
}
/// The request timeout
fn timeout(&self) -> Duration {
self.request_timeout
}
/// Download headers in batches with retries.
/// Returns the header collection in sorted descending
/// order from chain tip to local head
async fn download(
&self,
head: &HeaderLocked,
forkchoice: &ForkchoiceState,
) -> Result<Vec<HeaderLocked>, DownloadError> {
let mut stream = self.client().stream_headers().await;
let mut retries = self.request_retries;
// Header order will be preserved during inserts
let mut out = vec![];
loop {
let result = self.download_batch(&mut stream, forkchoice, head, out.last()).await;
match result {
Ok(result) => match result {
LinearDownloadResult::Batch(mut headers) => {
out.append(&mut headers);
}
LinearDownloadResult::Finished(mut headers) => {
out.append(&mut headers);
return Ok(out)
}
LinearDownloadResult::Ignore => (),
},
Err(e) if e.is_retryable() && retries > 1 => {
retries -= 1;
}
Err(e) => return Err(e),
}
}
}
}
/// The intermediate download result
#[derive(Debug)]
pub enum LinearDownloadResult {
/// Downloaded last batch up to tip
Finished(Vec<HeaderLocked>),
/// Downloaded batch
Batch(Vec<HeaderLocked>),
/// Ignore this batch
Ignore,
}
impl<C: Consensus, H: HeadersClient> LinearDownloader<C, H> {
async fn download_batch(
&self,
stream: &mut HeadersStream,
forkchoice: &ForkchoiceState,
head: &HeaderLocked,
earliest: Option<&HeaderLocked>,
) -> Result<LinearDownloadResult, DownloadError> {
// Request headers starting from tip or earliest cached
let start = earliest.map_or(forkchoice.head_block_hash, |h| h.parent_hash);
let mut headers =
self.download_headers(stream, BlockId::Hash(start), self.batch_size).await?;
headers.sort_unstable_by_key(|h| h.number);
let mut out = Vec::with_capacity(headers.len());
// Iterate headers in reverse
for parent in headers.into_iter().rev() {
let parent = parent.lock();
if head.hash() == parent.hash() {
// We've reached the target
return Ok(LinearDownloadResult::Finished(out))
}
match out.last().or(earliest) {
Some(header) => {
match self.validate(header, &parent) {
// ignore mismatched headers
Err(DownloadError::MismatchedHeaders { .. }) => {
return Ok(LinearDownloadResult::Ignore)
}
// propagate any other error if any
Err(e) => return Err(e),
// proceed to insert if validation is successful
_ => (),
};
}
// The buffer is empty and the first header does not match the tip, discard
// TODO: penalize the peer?
None if parent.hash() != forkchoice.head_block_hash => {
return Ok(LinearDownloadResult::Ignore)
}
_ => (),
};
out.push(parent);
}
Ok(LinearDownloadResult::Batch(out))
}
}
/// The builder for [LinearDownloader] with
/// some default settings
#[derive(Debug)]
pub struct LinearDownloadBuilder {
/// The batch size per request
batch_size: u64,
/// A single request timeout
request_timeout: Duration,
/// The number of retries for downloading
request_retries: usize,
}
impl Default for LinearDownloadBuilder {
fn default() -> Self {
Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 }
}
}
impl LinearDownloadBuilder {
/// Initialize a new builder
pub fn new() -> Self {
Self::default()
}
/// Set the request batch size
pub fn batch_size(mut self, size: u64) -> Self {
self.batch_size = size;
self
}
/// Set the request timeout
pub fn timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
self
}
/// Set the number of retries per request
pub fn retries(mut self, retries: usize) -> Self {
self.request_retries = retries;
self
}
/// Build [LinearDownloader] with provided consensus
/// and header client implementations
pub fn build<C: Consensus, H: HeadersClient>(
self,
consensus: Arc<C>,
client: Arc<H>,
) -> LinearDownloader<C, H> {
LinearDownloader {
consensus,
client,
batch_size: self.batch_size,
request_timeout: self.request_timeout,
request_retries: self.request_retries,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use reth_interfaces::{
p2p::headers::client::HeadersRequest,
test_utils::{TestConsensus, TestHeadersClient},
};
use reth_primitives::{rpc::BlockId, HeaderLocked, H256};
use assert_matches::assert_matches;
use once_cell::sync::Lazy;
use serial_test::serial;
use tokio::sync::oneshot::{self, error::TryRecvError};
static CONSENSUS: Lazy<Arc<TestConsensus>> = Lazy::new(|| Arc::new(TestConsensus::default()));
static CONSENSUS_FAIL: Lazy<Arc<TestConsensus>> = Lazy::new(|| {
let mut consensus = TestConsensus::default();
consensus.set_fail_validation(true);
Arc::new(consensus)
});
static CLIENT: Lazy<Arc<TestHeadersClient>> =
Lazy::new(|| Arc::new(TestHeadersClient::default()));
#[tokio::test]
#[serial]
async fn download_timeout() {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await;
tx.send(result).expect("failed to forward download response");
});
let mut requests = vec![];
CLIENT
.on_header_request(retries, |_id, req| {
requests.push(req);
})
.await;
assert_eq!(requests.len(), retries);
assert_matches!(rx.await, Ok(Err(DownloadError::NoHeaderResponse { .. })));
}
#[tokio::test]
#[serial]
async fn download_timeout_on_invalid_messages() {
let retries = 5;
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new()
.retries(retries)
.build(CONSENSUS.clone(), CLIENT.clone());
let result =
downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await;
tx.send(result).expect("failed to forward download response");
});
let mut num_of_reqs = 0;
let mut last_req_id: Option<u64> = None;
CLIENT
.on_header_request(retries, |id, _req| {
num_of_reqs += 1;
last_req_id = Some(id);
CLIENT.send_header_response(id.saturating_add(id % 2), vec![]);
})
.await;
assert_eq!(num_of_reqs, retries);
assert_matches!(
rx.await,
Ok(Err(DownloadError::NoHeaderResponse { request_id })) if request_id == last_req_id.unwrap()
);
}
#[tokio::test]
#[serial]
async fn download_propagates_consensus_validation_error() {
let tip_parent = gen_random_header(1, None);
let tip = gen_random_header(2, Some(tip_parent.hash()));
let tip_hash = tip.hash();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader =
LinearDownloadBuilder::new().build(CONSENSUS_FAIL.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&HeaderLocked::default(), &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
let requests = CLIENT.on_header_request(1, |id, req| (id, req)).await;
let request = requests.last();
assert_matches!(
request,
Some((_, HeadersRequest { start, .. }))
if matches!(start, BlockId::Hash(hash) if *hash == tip_hash)
);
let request = request.unwrap();
CLIENT.send_header_response(
request.0,
vec![tip_parent.clone().unlock(), tip.clone().unlock()],
);
assert_matches!(
rx.await,
Ok(Err(DownloadError::HeaderValidation { hash, .. })) if hash == tip_parent.hash()
);
}
#[tokio::test]
#[serial]
async fn download_starts_with_chain_tip() {
let head = gen_random_header(1, None);
let tip = gen_random_header(2, Some(head.hash()));
let tip_hash = tip.hash();
let chain_head = head.clone();
let (tx, mut rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
CLIENT
.on_header_request(1, |id, _req| {
let mut corrupted_tip = tip.clone().unlock();
corrupted_tip.nonce = rand::random();
CLIENT.send_header_response(id, vec![corrupted_tip, head.clone().unlock()])
})
.await;
assert_matches!(rx.try_recv(), Err(TryRecvError::Empty));
CLIENT
.on_header_request(1, |id, _req| {
CLIENT.send_header_response(id, vec![tip.clone().unlock(), head.clone().unlock()])
})
.await;
let result = rx.await;
assert_matches!(result, Ok(Ok(ref val)) if val.len() == 1);
assert_eq!(*result.unwrap().unwrap().first().unwrap(), tip);
}
#[tokio::test]
#[serial]
async fn download_returns_headers_desc() {
let (start, end) = (100, 200);
let head = gen_random_header(start, None);
let mut headers = gen_block_range(start + 1..end, head.hash());
headers.reverse();
let tip_hash = headers.first().unwrap().hash();
let chain_head = head.clone();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone());
let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() };
let result = downloader.download(&chain_head, &forkchoice).await;
tx.send(result).expect("failed to forward download response");
});
let mut idx = 0;
let chunk_size = 10;
// `usize::div_ceil` is unstable. ref: https://github.com/rust-lang/rust/issues/88581
let count = (headers.len() + chunk_size - 1) / chunk_size;
CLIENT
.on_header_request(count + 1, |id, _req| {
let mut chunk =
headers.iter().skip(chunk_size * idx).take(chunk_size).cloned().peekable();
idx += 1;
if chunk.peek().is_some() {
let headers: Vec<_> = chunk.map(|h| h.unlock()).collect();
CLIENT.send_header_response(id, headers);
} else {
CLIENT.send_header_response(id, vec![head.clone().unlock()])
}
})
.await;
let result = rx.await;
assert_matches!(result, Ok(Ok(_)));
let result = result.unwrap().unwrap();
assert_eq!(result.len(), headers.len());
assert_eq!(result, headers);
}
pub(crate) fn gen_block_range(rng: std::ops::Range<u64>, head: H256) -> Vec<HeaderLocked> {
let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize);
for idx in rng {
headers.push(gen_random_header(
idx,
Some(headers.last().map(|h: &HeaderLocked| h.hash()).unwrap_or(head)),
));
}
headers
}
pub(crate) fn gen_random_header(number: u64, parent: Option<H256>) -> HeaderLocked {
let header = reth_primitives::Header {
number,
nonce: rand::random(),
parent_hash: parent.unwrap_or_default(),
..Default::default()
};
header.lock()
}
}
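Because download returns headers tip-first (the reversed return order from #120), a consumer that needs them oldest-first can simply reverse the vector. The helper below is illustrative only; the debug_asserts restate the parent-hash and block-number linkage that validate already enforced during the download.

use reth_primitives::HeaderLocked;

/// Illustrative: turn the tip-first output of `download` into ascending order.
fn into_ascending(mut headers: Vec<HeaderLocked>) -> Vec<HeaderLocked> {
    headers.reverse();
    for pair in headers.windows(2) {
        debug_assert_eq!(pair[1].parent_hash, pair[0].hash());
        debug_assert_eq!(pair[1].number, pair[0].number + 1);
    }
    headers
}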


@@ -25,7 +25,7 @@ pub struct ExecutionPayload {
}

/// This structure encapsulates the fork choice state
-#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ForkchoiceState {
    pub head_block_hash: H256,

@@ -198,6 +198,13 @@ impl Deref for HeaderLocked {
}

impl HeaderLocked {
    /// Construct a new locked header.
    /// Applicable when the hash is known from
    /// the database, provided it's not corrupted.
    pub fn new(header: Header, hash: H256) -> Self {
        Self { header, hash }
    }

    /// Extract raw header that can be modified.
    pub fn unlock(self) -> Header {
        self.header
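A small sketch of the two ways to obtain a HeaderLocked after this change: lock() (used throughout the tests in this diff, and assumed here to compute the hash from the header) seals a freshly built header, while the new new(header, hash) constructor reuses a hash that is already known and trusted, e.g. one read back from the database.

use reth_primitives::{Header, HeaderLocked};

fn main() {
    // Seal a freshly constructed header; `lock()` derives the hash.
    let header = Header { number: 1, ..Default::default() };
    let sealed: HeaderLocked = header.clone().lock();

    // Reuse a known, trusted hash (e.g. from the database) without re-hashing.
    let from_db = HeaderLocked::new(header, sealed.hash());
    assert_eq!(from_db.hash(), sealed.hash());
}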