feat(p2p): refactor downloaders and add peer id to the result (#410)

* feat(p2p): refactor downloaders and add peer id to the result

* rm unused import

* fix tests

* clean up deps

* Update crates/interfaces/src/p2p/error.rs

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>

* add split fn

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
This commit is contained in:
Roman Krasiuk
2022-12-13 18:14:45 +02:00
committed by GitHub
parent 2b1bb05ca3
commit 5057e8ec0a
28 changed files with 220 additions and 320 deletions

91
Cargo.lock generated
View File

@ -869,19 +869,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "dashmap"
version = "5.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
dependencies = [
"cfg-if",
"hashbrown 0.12.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.4",
]
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.3.2" version = "2.3.2"
@ -3204,22 +3191,6 @@ dependencies = [
"walkdir", "walkdir",
] ]
[[package]]
name = "reth-bodies-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"backon",
"futures-util",
"once_cell",
"rand 0.8.5",
"reth-eth-wire",
"reth-interfaces",
"reth-primitives",
"tokio",
]
[[package]] [[package]]
name = "reth-codecs" name = "reth-codecs"
version = "0.1.0" version = "0.1.0"
@ -3304,6 +3275,23 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "reth-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"backon",
"futures",
"futures-util",
"once_cell",
"reth-eth-wire",
"reth-interfaces",
"reth-primitives",
"reth-rpc-types",
"tokio",
]
[[package]] [[package]]
name = "reth-ecies" name = "reth-ecies"
version = "0.1.0" version = "0.1.0"
@ -3383,22 +3371,6 @@ dependencies = [
"triehash", "triehash",
] ]
[[package]]
name = "reth-headers-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"futures",
"once_cell",
"rand 0.8.5",
"reth-interfaces",
"reth-primitives",
"reth-rpc-types",
"serial_test",
"tokio",
]
[[package]] [[package]]
name = "reth-interfaces" name = "reth-interfaces"
version = "0.1.0" version = "0.1.0"
@ -3645,11 +3617,10 @@ dependencies = [
"metrics", "metrics",
"rand 0.8.5", "rand 0.8.5",
"rayon", "rayon",
"reth-bodies-downloaders",
"reth-db", "reth-db",
"reth-downloaders",
"reth-eth-wire", "reth-eth-wire",
"reth-executor", "reth-executor",
"reth-headers-downloaders",
"reth-interfaces", "reth-interfaces",
"reth-primitives", "reth-primitives",
"reth-provider", "reth-provider",
@ -4099,32 +4070,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serial_test"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153"
dependencies = [
"dashmap",
"futures",
"lazy_static",
"log",
"parking_lot 0.12.1",
"serial_test_derive",
]
[[package]]
name = "serial_test_derive"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5"
dependencies = [
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "sha-1" name = "sha-1"
version = "0.9.8" version = "0.9.8"

View File

@ -15,8 +15,7 @@ members = [
"crates/net/rpc", "crates/net/rpc",
"crates/net/rpc-api", "crates/net/rpc-api",
"crates/net/rpc-types", "crates/net/rpc-types",
"crates/net/headers-downloaders", "crates/net/downloaders",
"crates/net/bodies-downloaders",
"crates/primitives", "crates/primitives",
"crates/stages", "crates/stages",
"crates/storage/codecs", "crates/storage/codecs",

View File

@ -1,4 +1,4 @@
use crate::p2p::error::RequestResult; use crate::p2p::error::PeerRequestResult;
use async_trait::async_trait; use async_trait::async_trait;
use reth_eth_wire::BlockBody; use reth_eth_wire::BlockBody;
use reth_primitives::H256; use reth_primitives::H256;
@ -9,5 +9,5 @@ use std::fmt::Debug;
#[auto_impl::auto_impl(&, Arc, Box)] #[auto_impl::auto_impl(&, Arc, Box)]
pub trait BodiesClient: Send + Sync + Debug { pub trait BodiesClient: Send + Sync + Debug {
/// Fetches the block body for the requested block. /// Fetches the block body for the requested block.
async fn get_block_body(&self, hash: Vec<H256>) -> RequestResult<Vec<BlockBody>>; async fn get_block_body(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>>;
} }

View File

@ -0,0 +1,24 @@
use super::headers::error::DownloadError;
use crate::consensus::Consensus;
use futures::Stream;
use std::pin::Pin;
/// A stream for downloading response.
pub type DownloadStream<T> = Pin<Box<dyn Stream<Item = Result<T, DownloadError>> + Send>>;
/// The generic trait for requesting and verifying data
/// over p2p network client
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait Downloader: Send + Sync {
/// The client used to fetch necessary data
type Client;
/// The Consensus used to verify data validity when downloading
type Consensus: Consensus;
/// The headers client
fn client(&self) -> &Self::Client;
/// The consensus engine
fn consensus(&self) -> &Self::Consensus;
}

View File

@ -1,8 +1,12 @@
use reth_primitives::WithPeerId;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
/// Result alias for result of a request. /// Result alias for result of a request.
pub type RequestResult<T> = Result<T, RequestError>; pub type RequestResult<T> = Result<T, RequestError>;
/// Result with [PeerId]
pub type PeerRequestResult<T> = RequestResult<WithPeerId<T>>;
/// Error variants that can happen when sending requests to a session. /// Error variants that can happen when sending requests to a session.
#[derive(Debug, thiserror::Error, Clone)] #[derive(Debug, thiserror::Error, Clone)]
#[allow(missing_docs)] #[allow(missing_docs)]

View File

@ -1,4 +1,4 @@
use crate::p2p::error::RequestResult; use crate::p2p::error::PeerRequestResult;
use async_trait::async_trait; use async_trait::async_trait;
pub use reth_eth_wire::BlockHeaders; pub use reth_eth_wire::BlockHeaders;
use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256}; use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256};
@ -27,5 +27,5 @@ pub trait HeadersClient: Send + Sync + Debug {
/// Sends the header request to the p2p network and returns the header response received from a /// Sends the header request to the p2p network and returns the header response received from a
/// peer. /// peer.
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders>; async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders>;
} }

View File

@ -1,58 +1,28 @@
use super::client::HeadersClient;
use crate::{ use crate::{
consensus::Consensus, consensus::Consensus,
p2p::{headers::error::DownloadError, traits::BatchDownload}, p2p::{
downloader::{DownloadStream, Downloader},
headers::error::DownloadError,
},
}; };
use futures::Stream;
use reth_primitives::SealedHeader; use reth_primitives::SealedHeader;
use reth_rpc_types::engine::ForkchoiceState; use reth_rpc_types::engine::ForkchoiceState;
use std::pin::Pin;
/// A Future for downloading a batch of headers.
pub type HeaderBatchDownload<'a> = Pin<
Box<
dyn BatchDownload<
Ok = SealedHeader,
Error = DownloadError,
Output = Result<Vec<SealedHeader>, DownloadError>,
> + Send
+ 'a,
>,
>;
/// A stream for downloading headers.
pub type HeaderDownloadStream =
Pin<Box<dyn Stream<Item = Result<SealedHeader, DownloadError>> + Send>>;
/// A downloader capable of fetching block headers. /// A downloader capable of fetching block headers.
/// ///
/// A downloader represents a distinct strategy for submitting requests to download block headers, /// A downloader represents a distinct strategy for submitting requests to download block headers,
/// while a [HeadersClient] represents a client capable of fulfilling these requests. /// while a [HeadersClient] represents a client capable of fulfilling these requests.
#[auto_impl::auto_impl(&, Arc, Box)] #[auto_impl::auto_impl(&, Arc, Box)]
pub trait HeaderDownloader: Sync + Send + Unpin { pub trait HeaderDownloader: Downloader {
/// The Consensus used to verify block validity when
/// downloading
type Consensus: Consensus;
/// The Client used to download the headers
type Client: HeadersClient;
/// The consensus engine
fn consensus(&self) -> &Self::Consensus;
/// The headers client
fn client(&self) -> &Self::Client;
/// Download the headers
fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_>;
/// Stream the headers /// Stream the headers
fn stream(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderDownloadStream; fn stream(
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<SealedHeader>;
/// Validate whether the header is valid in relation to it's parent /// Validate whether the header is valid in relation to it's parent
///
/// Returns Ok(false) if the
fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> {
validate_header_download(self.consensus(), header, parent)?; validate_header_download(self.consensus(), header, parent)?;
Ok(()) Ok(())

View File

@ -1,3 +1,6 @@
/// The generic downloader trait for implementing data downloaders over P2P.
pub mod downloader;
/// Traits for implementing P2P block body clients. /// Traits for implementing P2P block body clients.
pub mod bodies; pub mod bodies;
@ -12,6 +15,3 @@ pub mod headers;
/// Error types broadly used by p2p interfaces for any operation which may produce an error when /// Error types broadly used by p2p interfaces for any operation which may produce an error when
/// interacting with the network implementation /// interacting with the network implementation
pub mod error; pub mod error;
/// Commonly used traits when implementing clients.
pub mod traits;

View File

@ -1,21 +0,0 @@
use futures::Stream;
use std::future::Future;
/// Abstraction for downloading several items at once.
///
/// A [`BatchDownload`] is a [`Future`] that represents a collection of download futures and
/// resolves once all of them finished.
///
/// This is similar to the [`futures::future::join_all`] function, but it's open to implementers how
/// this Future behaves exactly.
///
/// It is expected that the underlying futures return a [`Result`].
pub trait BatchDownload: Future<Output = Result<Vec<Self::Ok>, Self::Error>> {
/// The `Ok` variant of the futures output in this batch.
type Ok;
/// The `Err` variant of the futures output in this batch.
type Error;
/// Consumes the batch future and returns a [`Stream`] that yields results as they become ready.
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>>;
}

View File

@ -1,4 +1,4 @@
use crate::p2p::{bodies::client::BodiesClient, error::RequestResult}; use crate::p2p::{bodies::client::BodiesClient, error::PeerRequestResult};
use async_trait::async_trait; use async_trait::async_trait;
use reth_eth_wire::BlockBody; use reth_eth_wire::BlockBody;
use reth_primitives::H256; use reth_primitives::H256;
@ -19,9 +19,9 @@ impl<F> Debug for TestBodiesClient<F> {
#[async_trait] #[async_trait]
impl<F> BodiesClient for TestBodiesClient<F> impl<F> BodiesClient for TestBodiesClient<F>
where where
F: Fn(Vec<H256>) -> RequestResult<Vec<BlockBody>> + Send + Sync, F: Fn(Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> + Send + Sync,
{ {
async fn get_block_body(&self, hashes: Vec<H256>) -> RequestResult<Vec<BlockBody>> { async fn get_block_body(&self, hashes: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
(self.responder)(hashes) (self.responder)(hashes)
} }
} }

View File

@ -2,19 +2,19 @@
use crate::{ use crate::{
consensus::{self, Consensus}, consensus::{self, Consensus},
p2p::{ p2p::{
error::{RequestError, RequestResult}, downloader::{DownloadStream, Downloader},
error::{PeerRequestResult, RequestError},
headers::{ headers::{
client::{HeadersClient, HeadersRequest}, client::{HeadersClient, HeadersRequest},
downloader::{HeaderBatchDownload, HeaderDownloadStream, HeaderDownloader}, downloader::HeaderDownloader,
error::DownloadError, error::DownloadError,
}, },
traits::BatchDownload,
}, },
}; };
use futures::{Future, FutureExt, Stream}; use futures::{Future, FutureExt, Stream};
use reth_eth_wire::BlockHeaders; use reth_eth_wire::BlockHeaders;
use reth_primitives::{ use reth_primitives::{
BlockLocked, BlockNumber, Header, HeadersDirection, SealedHeader, H256, U256, BlockLocked, BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256, U256,
}; };
use reth_rpc_types::engine::ForkchoiceState; use reth_rpc_types::engine::ForkchoiceState;
use std::{ use std::{
@ -53,8 +53,7 @@ impl TestHeaderDownloader {
} }
} }
#[async_trait::async_trait] impl Downloader for TestHeaderDownloader {
impl HeaderDownloader for TestHeaderDownloader {
type Consensus = TestConsensus; type Consensus = TestConsensus;
type Client = TestHeadersClient; type Client = TestHeadersClient;
@ -65,21 +64,20 @@ impl HeaderDownloader for TestHeaderDownloader {
fn client(&self) -> &Self::Client { fn client(&self) -> &Self::Client {
&self.client &self.client
} }
}
fn download( #[async_trait::async_trait]
impl HeaderDownloader for TestHeaderDownloader {
fn stream(
&self, &self,
_head: SealedHeader, _head: SealedHeader,
_forkchoice: ForkchoiceState, _forkchoice: ForkchoiceState,
) -> HeaderBatchDownload<'_> { ) -> DownloadStream<SealedHeader> {
Box::pin(self.create_download())
}
fn stream(&self, _head: SealedHeader, _forkchoice: ForkchoiceState) -> HeaderDownloadStream {
Box::pin(self.create_download()) Box::pin(self.create_download())
} }
} }
type TestHeadersFut = Pin<Box<dyn Future<Output = RequestResult<BlockHeaders>> + Send>>; type TestHeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send>>;
struct TestDownload { struct TestDownload {
client: Arc<TestHeadersClient>, client: Arc<TestHeadersClient>,
@ -105,30 +103,6 @@ impl TestDownload {
} }
} }
impl Future for TestDownload {
type Output = Result<Vec<SealedHeader>, DownloadError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let empty = SealedHeader::default();
if let Err(error) = self.consensus.validate_header(&empty, &empty) {
return Poll::Ready(Err(DownloadError::HeaderValidation { hash: empty.hash(), error }))
}
match ready!(self.get_or_init_fut().poll_unpin(cx)) {
Ok(resp) => {
// Skip head and seal headers
let mut headers = resp.0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>();
headers.sort_unstable_by_key(|h| h.number);
Poll::Ready(Ok(headers.into_iter().rev().collect()))
}
Err(err) => Poll::Ready(Err(match err {
RequestError::Timeout => DownloadError::Timeout,
_ => DownloadError::RequestError(err),
})),
}
}
}
impl Stream for TestDownload { impl Stream for TestDownload {
type Item = Result<SealedHeader, DownloadError>; type Item = Result<SealedHeader, DownloadError>;
@ -155,7 +129,7 @@ impl Stream for TestDownload {
Ok(resp) => { Ok(resp) => {
// Skip head and seal headers // Skip head and seal headers
let mut headers = let mut headers =
resp.0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>(); resp.1 .0.into_iter().skip(1).map(|h| h.seal()).collect::<Vec<_>>();
headers.sort_unstable_by_key(|h| h.number); headers.sort_unstable_by_key(|h| h.number);
headers.into_iter().for_each(|h| this.buffer.push(h)); headers.into_iter().for_each(|h| this.buffer.push(h));
this.done = true; this.done = true;
@ -173,15 +147,6 @@ impl Stream for TestDownload {
} }
} }
impl BatchDownload for TestDownload {
type Ok = SealedHeader;
type Error = DownloadError;
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>> {
Box::new(self)
}
}
/// A test client for fetching headers /// A test client for fetching headers
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct TestHeadersClient { pub struct TestHeadersClient {
@ -207,7 +172,7 @@ impl TestHeadersClient {
impl HeadersClient for TestHeadersClient { impl HeadersClient for TestHeadersClient {
fn update_status(&self, _height: u64, _hash: H256, _td: U256) {} fn update_status(&self, _height: u64, _hash: H256, _td: U256) {}
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> { async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders> {
if let Some(err) = &mut *self.error.lock().await { if let Some(err) = &mut *self.error.lock().await {
return Err(err.clone()) return Err(err.clone())
} }
@ -215,7 +180,7 @@ impl HeadersClient for TestHeadersClient {
let mut lock = self.responses.lock().await; let mut lock = self.responses.lock().await;
let len = lock.len().min(request.limit as usize); let len = lock.len().min(request.limit as usize);
let resp = lock.drain(..len).collect(); let resp = lock.drain(..len).collect();
return Ok(BlockHeaders(resp)) return Ok((PeerId::default(), BlockHeaders(resp)).into())
} }
} }

View File

@ -1,24 +1,29 @@
[package] [package]
name = "reth-bodies-downloaders" name = "reth-downloaders"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
license = "MIT OR Apache-2.0" license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth" repository = "https://github.com/paradigmxyz/reth"
readme = "README.md" readme = "README.md"
description = "Implementations of various block body downloaders" description = "Implementations of various block downloaders"
[dependencies] [dependencies]
futures-util = "0.3.25" # reth
reth-interfaces = { path = "../../interfaces" } reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" } reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
reth-eth-wire = { path= "../eth-wire" } reth-eth-wire = { path= "../eth-wire" }
# async
async-trait = "0.1.58"
futures = "0.3"
futures-util = "0.3.25"
# misc
backon = "0.2.0" backon = "0.2.0"
[dev-dependencies] [dev-dependencies]
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
assert_matches = "1.5.0" assert_matches = "1.5.0"
once_cell = "1.15.0" once_cell = "1.15.0"
rand = "0.8.5"
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] } tokio = { version = "1.21.2", features = ["full"] }
async-trait = "0.1.58"
futures-util = "0.3.25"

View File

@ -77,8 +77,8 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
block_number: BlockNumber, block_number: BlockNumber,
header_hash: H256, header_hash: H256,
) -> RequestResult<(BlockNumber, H256, BlockBody)> { ) -> RequestResult<(BlockNumber, H256, BlockBody)> {
let mut body = self.client.get_block_body(vec![header_hash]).await?; let mut response = self.client.get_block_body(vec![header_hash]).await?;
Ok((block_number, header_hash, body.remove(0))) Ok((block_number, header_hash, response.1.remove(0))) // TODO:
} }
} }
@ -86,14 +86,11 @@ impl<C: BodiesClient> ConcurrentDownloader<C> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::{ use crate::test_utils::{generate_bodies, TestClient};
concurrent::ConcurrentDownloader,
test_utils::{generate_bodies, TestClient},
};
use assert_matches::assert_matches; use assert_matches::assert_matches;
use futures_util::stream::{StreamExt, TryStreamExt}; use futures_util::stream::{StreamExt, TryStreamExt};
use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError}; use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError};
use reth_primitives::H256; use reth_primitives::{PeerId, H256};
use std::{ use std::{
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
@ -115,9 +112,13 @@ mod tests {
// Simulate that the request for this (random) block takes 0-100ms // Simulate that the request for this (random) block takes 0-100ms
tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await; tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await;
Ok(vec![bodies Ok((
PeerId::default(),
vec![bodies
.remove(&hash[0]) .remove(&hash[0])
.expect("Downloader asked for a block it should not ask for")]) .expect("Downloader asked for a block it should not ask for")],
)
.into())
} }
}))); })));
@ -167,7 +168,11 @@ mod tests {
retries_left.fetch_sub(1, Ordering::SeqCst); retries_left.fetch_sub(1, Ordering::SeqCst);
Err(RequestError::Timeout) Err(RequestError::Timeout)
} else { } else {
Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) Ok((
PeerId::default(),
vec![BlockBody { transactions: vec![], ommers: vec![] }],
)
.into())
} }
} }
}))); })));
@ -199,7 +204,11 @@ mod tests {
retries_left.fetch_sub(1, Ordering::SeqCst); retries_left.fetch_sub(1, Ordering::SeqCst);
Err(RequestError::Timeout) Err(RequestError::Timeout)
} else { } else {
Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) Ok((
PeerId::default(),
vec![BlockBody { transactions: vec![], ommers: vec![] }],
)
.into())
} }
} }
}))) })))

View File

@ -0,0 +1,2 @@
/// A naive concurrent downloader.
pub mod concurrent;

View File

@ -2,16 +2,13 @@ use futures::{stream::Stream, FutureExt};
use reth_interfaces::{ use reth_interfaces::{
consensus::Consensus, consensus::Consensus,
p2p::{ p2p::{
error::{RequestError, RequestResult}, downloader::{DownloadStream, Downloader},
error::{PeerRequestResult, RequestError},
headers::{ headers::{
client::{BlockHeaders, HeadersClient, HeadersRequest}, client::{BlockHeaders, HeadersClient, HeadersRequest},
downloader::{ downloader::{validate_header_download, HeaderDownloader},
validate_header_download, HeaderBatchDownload, HeaderDownloadStream,
HeaderDownloader,
},
error::DownloadError, error::DownloadError,
}, },
traits::BatchDownload,
}, },
}; };
use reth_primitives::{HeadersDirection, SealedHeader, H256}; use reth_primitives::{HeadersDirection, SealedHeader, H256};
@ -39,10 +36,10 @@ pub struct LinearDownloader<C, H> {
pub request_retries: usize, pub request_retries: usize,
} }
impl<C, H> HeaderDownloader for LinearDownloader<C, H> impl<C, H> Downloader for LinearDownloader<C, H>
where where
C: Consensus + 'static, C: Consensus,
H: HeadersClient + 'static, H: HeadersClient,
{ {
type Consensus = C; type Consensus = C;
type Client = H; type Client = H;
@ -54,12 +51,18 @@ where
fn client(&self) -> &Self::Client { fn client(&self) -> &Self::Client {
self.client.borrow() self.client.borrow()
} }
fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_> {
Box::pin(self.new_download(head, forkchoice))
} }
fn stream(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderDownloadStream { impl<C, H> HeaderDownloader for LinearDownloader<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
fn stream(
&self,
head: SealedHeader,
forkchoice: ForkchoiceState,
) -> DownloadStream<SealedHeader> {
Box::pin(self.new_download(head, forkchoice)) Box::pin(self.new_download(head, forkchoice))
} }
} }
@ -95,7 +98,7 @@ impl<C: Consensus, H: HeadersClient> LinearDownloader<C, H> {
} }
} }
type HeadersFut = Pin<Box<dyn Future<Output = RequestResult<BlockHeaders>> + Send>>; type HeadersFut = Pin<Box<dyn Future<Output = PeerRequestResult<BlockHeaders>> + Send>>;
/// A retryable future that returns a list of [`BlockHeaders`] on success. /// A retryable future that returns a list of [`BlockHeaders`] on success.
struct HeadersRequestFuture { struct HeadersRequestFuture {
@ -119,7 +122,7 @@ impl HeadersRequestFuture {
} }
impl Future for HeadersRequestFuture { impl Future for HeadersRequestFuture {
type Output = RequestResult<BlockHeaders>; type Output = PeerRequestResult<BlockHeaders>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.get_mut().fut.poll_unpin(cx) self.get_mut().fut.poll_unpin(cx)
@ -222,11 +225,11 @@ where
fn process_header_response( fn process_header_response(
&mut self, &mut self,
response: Result<BlockHeaders, RequestError>, response: PeerRequestResult<BlockHeaders>,
) -> Result<(), DownloadError> { ) -> Result<(), DownloadError> {
match response { match response {
Ok(res) => { Ok(res) => {
let mut headers = res.0; let mut headers = res.1 .0;
headers.sort_unstable_by_key(|h| h.number); headers.sort_unstable_by_key(|h| h.number);
if headers.is_empty() { if headers.is_empty() {
@ -353,19 +356,6 @@ where
} }
} }
impl<C, H> BatchDownload for HeadersDownload<C, H>
where
C: Consensus + 'static,
H: HeadersClient + 'static,
{
type Ok = SealedHeader;
type Error = DownloadError;
fn into_stream_unordered(self) -> Box<dyn Stream<Item = Result<Self::Ok, Self::Error>>> {
Box::new(self)
}
}
/// The builder for [LinearDownloader] with /// The builder for [LinearDownloader] with
/// some default settings /// some default settings
#[derive(Debug)] #[derive(Debug)]
@ -438,12 +428,15 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn download_empty() { async fn stream_empty() {
let client = Arc::new(TestHeadersClient::default()); let client = Arc::new(TestHeadersClient::default());
let downloader = let downloader =
LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client)); LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client));
let result = downloader.download(SealedHeader::default(), ForkchoiceState::default()).await; let result = downloader
.stream(SealedHeader::default(), ForkchoiceState::default())
.try_collect::<Vec<_>>()
.await;
assert!(result.is_err()); assert!(result.is_err());
} }
@ -470,7 +463,7 @@ mod tests {
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() }; let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.download(p0, fork).await; let result = downloader.stream(p0, fork).try_collect::<Vec<_>>().await;
let headers = result.unwrap(); let headers = result.unwrap();
assert!(headers.is_empty()); assert!(headers.is_empty());
} }
@ -498,7 +491,7 @@ mod tests {
let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() }; let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() };
let result = downloader.download(p3, fork).await; let result = downloader.stream(p3, fork).try_collect::<Vec<_>>().await;
let headers = result.unwrap(); let headers = result.unwrap();
assert_eq!(headers.len(), 3); assert_eq!(headers.len(), 3);
assert_eq!(headers[0], p0); assert_eq!(headers[0], p0);

View File

@ -0,0 +1,2 @@
/// A Linear downloader implementation.
pub mod linear;

View File

@ -5,10 +5,13 @@
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))] ))]
//! Implements body downloader algorithms. //! Implements the downloader algorithms.
/// A naive concurrent downloader. /// The collection of algorithms for downloading block bodies.
pub mod concurrent; pub mod bodies;
/// The collection of alhgorithms for downloading block headers.
pub mod headers;
#[cfg(test)] #[cfg(test)]
mod test_utils; mod test_utils;

View File

@ -3,7 +3,7 @@
use async_trait::async_trait; use async_trait::async_trait;
use reth_eth_wire::BlockBody; use reth_eth_wire::BlockBody;
use reth_interfaces::{ use reth_interfaces::{
p2p::{bodies::client::BodiesClient, error::RequestResult}, p2p::{bodies::client::BodiesClient, error::PeerRequestResult},
test_utils::generators::random_block_range, test_utils::generators::random_block_range,
}; };
use reth_primitives::{BlockNumber, H256}; use reth_primitives::{BlockNumber, H256};
@ -58,9 +58,9 @@ impl<F> TestClient<F> {
impl<F, Fut> BodiesClient for TestClient<F> impl<F, Fut> BodiesClient for TestClient<F>
where where
F: FnMut(Vec<H256>) -> Fut + Send + Sync, F: FnMut(Vec<H256>) -> Fut + Send + Sync,
Fut: Future<Output = RequestResult<Vec<BlockBody>>> + Send, Fut: Future<Output = PeerRequestResult<Vec<BlockBody>>> + Send,
{ {
async fn get_block_body(&self, hash: Vec<H256>) -> RequestResult<Vec<BlockBody>> { async fn get_block_body(&self, hash: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
let f = &mut *self.0.lock().await; let f = &mut *self.0.lock().await;
(f)(hash).await (f)(hash).await
} }

View File

@ -1,26 +0,0 @@
[package]
name = "reth-headers-downloaders"
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/paradigmxyz/reth"
readme = "README.md"
description = "Implementations of various header downloader"
[dependencies]
# reth
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-rpc-types = { path = "../rpc-types" }
# async
async-trait = "0.1.58"
futures = "0.3"
[dev-dependencies]
assert_matches = "1.5.0"
once_cell = "1.15.0"
rand = "0.8.5"
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
serial_test = "0.9.0"

View File

@ -1,11 +0,0 @@
#![warn(missing_docs, unreachable_pub)]
#![deny(unused_must_use, rust_2018_idioms)]
#![doc(test(
no_crate_inject,
attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables))
))]
//! Implements Header Downloader algorithms
/// A Linear downloader implementation.
pub mod linear;

View File

@ -4,10 +4,10 @@ use crate::fetch::{DownloadRequest, StatusUpdate};
use reth_eth_wire::{BlockBody, BlockHeaders}; use reth_eth_wire::{BlockBody, BlockHeaders};
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
bodies::client::BodiesClient, bodies::client::BodiesClient,
error::RequestResult, error::PeerRequestResult,
headers::client::{HeadersClient, HeadersRequest}, headers::client::{HeadersClient, HeadersRequest},
}; };
use reth_primitives::{H256, U256}; use reth_primitives::{WithPeerId, H256, U256};
use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tokio::sync::{mpsc::UnboundedSender, oneshot};
/// Front-end API for fetching data from the network. /// Front-end API for fetching data from the network.
@ -26,16 +26,16 @@ impl HeadersClient for FetchClient {
} }
/// Sends a `GetBlockHeaders` request to an available peer. /// Sends a `GetBlockHeaders` request to an available peer.
async fn get_headers(&self, request: HeadersRequest) -> RequestResult<BlockHeaders> { async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult<BlockHeaders> {
let (response, rx) = oneshot::channel(); let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?; self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?;
rx.await?.map(BlockHeaders::from) rx.await?.map(WithPeerId::transform)
} }
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl BodiesClient for FetchClient { impl BodiesClient for FetchClient {
async fn get_block_body(&self, request: Vec<H256>) -> RequestResult<Vec<BlockBody>> { async fn get_block_body(&self, request: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
let (response, rx) = oneshot::channel(); let (response, rx) = oneshot::channel();
self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?; self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?;
rx.await? rx.await?

View File

@ -4,7 +4,7 @@ use crate::message::BlockRequest;
use futures::StreamExt; use futures::StreamExt;
use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders};
use reth_interfaces::p2p::{ use reth_interfaces::p2p::{
error::{RequestError, RequestResult}, error::{PeerRequestResult, RequestError, RequestResult},
headers::client::HeadersRequest, headers::client::HeadersRequest,
}; };
use reth_primitives::{Header, PeerId, H256, U256}; use reth_primitives::{Header, PeerId, H256, U256};
@ -25,9 +25,11 @@ pub use client::FetchClient;
/// peers and sends the response once ready. /// peers and sends the response once ready.
pub struct StateFetcher { pub struct StateFetcher {
/// Currently active [`GetBlockHeaders`] requests /// Currently active [`GetBlockHeaders`] requests
inflight_headers_requests: HashMap<PeerId, Request<HeadersRequest, RequestResult<Vec<Header>>>>, inflight_headers_requests:
HashMap<PeerId, Request<HeadersRequest, PeerRequestResult<Vec<Header>>>>,
/// Currently active [`GetBlockBodies`] requests /// Currently active [`GetBlockBodies`] requests
inflight_bodies_requests: HashMap<PeerId, Request<Vec<H256>, RequestResult<Vec<BlockBody>>>>, inflight_bodies_requests:
HashMap<PeerId, Request<Vec<H256>, PeerRequestResult<Vec<BlockBody>>>>,
/// The list of available peers for requests. /// The list of available peers for requests.
peers: HashMap<PeerId, Peer>, peers: HashMap<PeerId, Peer>,
/// Requests queued for processing /// Requests queued for processing
@ -187,7 +189,7 @@ impl StateFetcher {
) -> Option<BlockResponseOutcome> { ) -> Option<BlockResponseOutcome> {
let is_error = res.is_err(); let is_error = res.is_err();
if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) { if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) {
let _ = resp.response.send(res); let _ = resp.response.send(res.map(|h| (peer_id, h).into()));
} }
if is_error { if is_error {
@ -215,7 +217,7 @@ impl StateFetcher {
res: RequestResult<Vec<BlockBody>>, res: RequestResult<Vec<BlockBody>>,
) -> Option<BlockResponseOutcome> { ) -> Option<BlockResponseOutcome> {
if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) { if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) {
let _ = resp.response.send(res); let _ = resp.response.send(res.map(|b| (peer_id, b).into()));
} }
if let Some(peer) = self.peers.get_mut(&peer_id) { if let Some(peer) = self.peers.get_mut(&peer_id) {
if peer.state.on_request_finished() { if peer.state.on_request_finished() {
@ -325,10 +327,13 @@ pub(crate) enum DownloadRequest {
/// Download the requested headers and send response through channel /// Download the requested headers and send response through channel
GetBlockHeaders { GetBlockHeaders {
request: HeadersRequest, request: HeadersRequest,
response: oneshot::Sender<RequestResult<Vec<Header>>>, response: oneshot::Sender<PeerRequestResult<Vec<Header>>>,
}, },
/// Download the requested headers and send response through channel /// Download the requested headers and send response through channel
GetBlockBodies { request: Vec<H256>, response: oneshot::Sender<RequestResult<Vec<BlockBody>>> }, GetBlockBodies {
request: Vec<H256>,
response: oneshot::Sender<PeerRequestResult<Vec<BlockBody>>>,
},
} }
// === impl DownloadRequest === // === impl DownloadRequest ===

View File

@ -67,7 +67,7 @@ async fn test_get_body() {
let res = fetch0.get_block_body(vec![block_hash]).await; let res = fetch0.get_block_body(vec![block_hash]).await;
assert!(res.is_ok()); assert!(res.is_ok());
let blocks = res.unwrap(); let blocks = res.unwrap().1;
assert_eq!(blocks.len(), 1); assert_eq!(blocks.len(), 1);
let expected = BlockBody { transactions: block.body, ommers: block.ommers }; let expected = BlockBody { transactions: block.body, ommers: block.ommers };
assert_eq!(blocks[0], expected); assert_eq!(blocks[0], expected);
@ -114,7 +114,7 @@ async fn test_get_header() {
let res = fetch0.get_headers(req).await; let res = fetch0.get_headers(req).await;
assert!(res.is_ok()); assert!(res.is_ok());
let headers = res.unwrap().0; let headers = res.unwrap().1 .0;
assert_eq!(headers.len(), 1); assert_eq!(headers.len(), 1);
assert_eq!(headers[0], header); assert_eq!(headers[0], header);
} }

View File

@ -22,6 +22,7 @@ mod hex_bytes;
mod integer_list; mod integer_list;
mod jsonu256; mod jsonu256;
mod log; mod log;
mod peer;
mod receipt; mod receipt;
mod storage; mod storage;
mod transaction; mod transaction;
@ -41,6 +42,7 @@ pub use hex_bytes::Bytes;
pub use integer_list::IntegerList; pub use integer_list::IntegerList;
pub use jsonu256::JsonU256; pub use jsonu256::JsonU256;
pub use log::Log; pub use log::Log;
pub use peer::{PeerId, WithPeerId};
pub use receipt::Receipt; pub use receipt::Receipt;
pub use storage::StorageEntry; pub use storage::StorageEntry;
pub use transaction::{ pub use transaction::{
@ -69,12 +71,6 @@ pub type StorageKey = H256;
/// An account storage value. /// An account storage value.
pub type StorageValue = U256; pub type StorageValue = U256;
// TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to
// prevent misuse
/// This represents an uncompressed secp256k1 public key.
/// This encodes the concatenation of the x and y components of the affine point in bytes.
pub type PeerId = H512;
pub use ethers_core::{ pub use ethers_core::{
types as rpc, types as rpc,
types::{BigEndianHash, H128, H160, H256, H512, H64, U128, U256, U64}, types::{BigEndianHash, H128, H160, H256, H512, H64, U128, U256, U64},

View File

@ -0,0 +1,39 @@
use ethers_core::types::H512;
// TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to
// prevent misuse
/// This represents an uncompressed secp256k1 public key.
/// This encodes the concatenation of the x and y components of the affine point in bytes.
pub type PeerId = H512;
/// Generic wrapper with peer id
#[derive(Debug)]
pub struct WithPeerId<T>(PeerId, pub T);
impl<T> From<(H512, T)> for WithPeerId<T> {
fn from(value: (H512, T)) -> Self {
Self(value.0, value.1)
}
}
impl<T> WithPeerId<T> {
/// Get the peer id
pub fn peer_id(&self) -> PeerId {
self.0
}
/// Get the underlying data
pub fn data(&self) -> &T {
&self.1
}
/// Transform the data
pub fn transform<F: From<T>>(self) -> WithPeerId<F> {
WithPeerId(self.0, self.1.into())
}
/// Split the wrapper into [PeerId] and data tuple
pub fn split(self) -> (PeerId, T) {
(self.0, self.1)
}
}

View File

@ -33,11 +33,8 @@ rayon = "1.6.0"
# reth # reth
reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] }
reth-interfaces = { path = "../interfaces", features = ["test-utils"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] }
reth-bodies-downloaders = { path = "../net/bodies-downloaders" } reth-downloaders = { path = "../net/downloaders" }
reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody]
# TODO(onbjerg): We only need this for [BlockBody]
reth-eth-wire = { path = "../net/eth-wire" }
reth-headers-downloaders = { path = "../net/headers-downloaders" }
tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio = { version = "*", features = ["rt", "sync", "macros"] }
tokio-stream = "0.1.10" tokio-stream = "0.1.10"
tempfile = "3.3.0" tempfile = "3.3.0"

View File

@ -490,7 +490,7 @@ mod tests {
client::BodiesClient, client::BodiesClient,
downloader::{BodiesStream, BodyDownloader}, downloader::{BodiesStream, BodyDownloader},
}, },
error::RequestResult, error::{PeerRequestResult, RequestResult},
}, },
test_utils::{ test_utils::{
generators::{random_block_range, random_signed_tx}, generators::{random_block_range, random_signed_tx},
@ -716,7 +716,7 @@ mod tests {
#[async_trait::async_trait] #[async_trait::async_trait]
impl BodiesClient for NoopClient { impl BodiesClient for NoopClient {
async fn get_block_body(&self, _: Vec<H256>) -> RequestResult<Vec<BlockBody>> { async fn get_block_body(&self, _: Vec<H256>) -> PeerRequestResult<Vec<BlockBody>> {
panic!("Noop client should not be called") panic!("Noop client should not be called")
} }
} }

View File

@ -349,7 +349,7 @@ mod tests {
ExecInput, ExecOutput, UnwindInput, ExecInput, ExecOutput, UnwindInput,
}; };
use reth_db::{models::blocks::BlockNumHash, tables, transaction::DbTx}; use reth_db::{models::blocks::BlockNumHash, tables, transaction::DbTx};
use reth_headers_downloaders::linear::{LinearDownloadBuilder, LinearDownloader}; use reth_downloaders::headers::linear::{LinearDownloadBuilder, LinearDownloader};
use reth_interfaces::{ use reth_interfaces::{
p2p::headers::downloader::HeaderDownloader, p2p::headers::downloader::HeaderDownloader,
test_utils::{ test_utils::{