From 5057e8ec0a8e8f69c69eb3fa68a44efcdb7b72ed Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 13 Dec 2022 18:14:45 +0200 Subject: [PATCH] feat(p2p): refactor downloaders and add peer id to the result (#410) * feat(p2p): refactor downloaders and add peer id to the result * rm unused import * fix tests * clean up deps * Update crates/interfaces/src/p2p/error.rs Co-authored-by: Matthias Seitz * add split fn Co-authored-by: Matthias Seitz --- Cargo.lock | 91 ++++--------------- Cargo.toml | 3 +- crates/interfaces/src/p2p/bodies/client.rs | 4 +- crates/interfaces/src/p2p/downloader.rs | 24 +++++ crates/interfaces/src/p2p/error.rs | 4 + crates/interfaces/src/p2p/headers/client.rs | 4 +- .../interfaces/src/p2p/headers/downloader.rs | 50 ++-------- crates/interfaces/src/p2p/mod.rs | 6 +- crates/interfaces/src/p2p/traits.rs | 21 ----- crates/interfaces/src/test_utils/bodies.rs | 6 +- crates/interfaces/src/test_utils/headers.rs | 63 +++---------- .../Cargo.toml | 19 ++-- .../src/bodies}/concurrent.rs | 33 ++++--- crates/net/downloaders/src/bodies/mod.rs | 2 + .../src => downloaders/src/headers}/linear.rs | 63 ++++++------- crates/net/downloaders/src/headers/mod.rs | 2 + .../src/lib.rs | 9 +- .../src/test_utils.rs | 6 +- crates/net/headers-downloaders/Cargo.toml | 26 ------ crates/net/headers-downloaders/src/lib.rs | 11 --- crates/net/network/src/fetch/client.rs | 10 +- crates/net/network/src/fetch/mod.rs | 19 ++-- crates/net/network/tests/it/requests.rs | 4 +- crates/primitives/src/lib.rs | 8 +- crates/primitives/src/peer.rs | 39 ++++++++ crates/stages/Cargo.toml | 7 +- crates/stages/src/stages/bodies.rs | 4 +- crates/stages/src/stages/headers.rs | 2 +- 28 files changed, 220 insertions(+), 320 deletions(-) create mode 100644 crates/interfaces/src/p2p/downloader.rs delete mode 100644 crates/interfaces/src/p2p/traits.rs rename crates/net/{bodies-downloaders => downloaders}/Cargo.toml (76%) rename crates/net/{bodies-downloaders/src => downloaders/src/bodies}/concurrent.rs (87%) create mode 100644 crates/net/downloaders/src/bodies/mod.rs rename crates/net/{headers-downloaders/src => downloaders/src/headers}/linear.rs (92%) create mode 100644 crates/net/downloaders/src/headers/mod.rs rename crates/net/{bodies-downloaders => downloaders}/src/lib.rs (53%) rename crates/net/{bodies-downloaders => downloaders}/src/test_utils.rs (87%) delete mode 100644 crates/net/headers-downloaders/Cargo.toml delete mode 100644 crates/net/headers-downloaders/src/lib.rs create mode 100644 crates/primitives/src/peer.rs diff --git a/Cargo.lock b/Cargo.lock index b7e9ec877..d049b76be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -869,19 +869,6 @@ dependencies = [ "syn", ] -[[package]] -name = "dashmap" -version = "5.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" -dependencies = [ - "cfg-if", - "hashbrown 0.12.3", - "lock_api", - "once_cell", - "parking_lot_core 0.9.4", -] - [[package]] name = "data-encoding" version = "2.3.2" @@ -3204,22 +3191,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "reth-bodies-downloaders" -version = "0.1.0" -dependencies = [ - "assert_matches", - "async-trait", - "backon", - "futures-util", - "once_cell", - "rand 0.8.5", - "reth-eth-wire", - "reth-interfaces", - "reth-primitives", - "tokio", -] - [[package]] name = "reth-codecs" version = "0.1.0" @@ -3304,6 +3275,23 @@ dependencies = [ "url", ] +[[package]] +name = "reth-downloaders" +version = "0.1.0" +dependencies = [ + "assert_matches", + "async-trait", + "backon", + "futures", + "futures-util", + "once_cell", + "reth-eth-wire", + "reth-interfaces", + "reth-primitives", + "reth-rpc-types", + "tokio", +] + [[package]] name = "reth-ecies" version = "0.1.0" @@ -3383,22 +3371,6 @@ dependencies = [ "triehash", ] -[[package]] -name = "reth-headers-downloaders" -version = "0.1.0" -dependencies = [ - "assert_matches", - "async-trait", - "futures", - "once_cell", - "rand 0.8.5", - "reth-interfaces", - "reth-primitives", - "reth-rpc-types", - "serial_test", - "tokio", -] - [[package]] name = "reth-interfaces" version = "0.1.0" @@ -3645,11 +3617,10 @@ dependencies = [ "metrics", "rand 0.8.5", "rayon", - "reth-bodies-downloaders", "reth-db", + "reth-downloaders", "reth-eth-wire", "reth-executor", - "reth-headers-downloaders", "reth-interfaces", "reth-primitives", "reth-provider", @@ -4099,32 +4070,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serial_test" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153" -dependencies = [ - "dashmap", - "futures", - "lazy_static", - "log", - "parking_lot 0.12.1", - "serial_test_derive", -] - -[[package]] -name = "serial_test_derive" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5" -dependencies = [ - "proc-macro-error", - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "sha-1" version = "0.9.8" diff --git a/Cargo.toml b/Cargo.toml index 986b5fe4a..89b2fc34a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,8 +15,7 @@ members = [ "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", - "crates/net/headers-downloaders", - "crates/net/bodies-downloaders", + "crates/net/downloaders", "crates/primitives", "crates/stages", "crates/storage/codecs", diff --git a/crates/interfaces/src/p2p/bodies/client.rs b/crates/interfaces/src/p2p/bodies/client.rs index c8953ba2c..1a5915257 100644 --- a/crates/interfaces/src/p2p/bodies/client.rs +++ b/crates/interfaces/src/p2p/bodies/client.rs @@ -1,4 +1,4 @@ -use crate::p2p::error::RequestResult; +use crate::p2p::error::PeerRequestResult; use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; @@ -9,5 +9,5 @@ use std::fmt::Debug; #[auto_impl::auto_impl(&, Arc, Box)] pub trait BodiesClient: Send + Sync + Debug { /// Fetches the block body for the requested block. - async fn get_block_body(&self, hash: Vec) -> RequestResult>; + async fn get_block_body(&self, hash: Vec) -> PeerRequestResult>; } diff --git a/crates/interfaces/src/p2p/downloader.rs b/crates/interfaces/src/p2p/downloader.rs new file mode 100644 index 000000000..c0ac54662 --- /dev/null +++ b/crates/interfaces/src/p2p/downloader.rs @@ -0,0 +1,24 @@ +use super::headers::error::DownloadError; +use crate::consensus::Consensus; +use futures::Stream; +use std::pin::Pin; + +/// A stream for downloading response. +pub type DownloadStream = Pin> + Send>>; + +/// The generic trait for requesting and verifying data +/// over p2p network client +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait Downloader: Send + Sync { + /// The client used to fetch necessary data + type Client; + + /// The Consensus used to verify data validity when downloading + type Consensus: Consensus; + + /// The headers client + fn client(&self) -> &Self::Client; + + /// The consensus engine + fn consensus(&self) -> &Self::Consensus; +} diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 2496f19f1..7eda1d532 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -1,8 +1,12 @@ +use reth_primitives::WithPeerId; use tokio::sync::{mpsc, oneshot}; /// Result alias for result of a request. pub type RequestResult = Result; +/// Result with [PeerId] +pub type PeerRequestResult = RequestResult>; + /// Error variants that can happen when sending requests to a session. #[derive(Debug, thiserror::Error, Clone)] #[allow(missing_docs)] diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs index 9558713b2..a5f525044 100644 --- a/crates/interfaces/src/p2p/headers/client.rs +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -1,4 +1,4 @@ -use crate::p2p::error::RequestResult; +use crate::p2p::error::PeerRequestResult; use async_trait::async_trait; pub use reth_eth_wire::BlockHeaders; use reth_primitives::{BlockHashOrNumber, HeadersDirection, H256, U256}; @@ -27,5 +27,5 @@ pub trait HeadersClient: Send + Sync + Debug { /// Sends the header request to the p2p network and returns the header response received from a /// peer. - async fn get_headers(&self, request: HeadersRequest) -> RequestResult; + async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult; } diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs index 442e66c3a..c9805007e 100644 --- a/crates/interfaces/src/p2p/headers/downloader.rs +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -1,58 +1,28 @@ -use super::client::HeadersClient; use crate::{ consensus::Consensus, - p2p::{headers::error::DownloadError, traits::BatchDownload}, + p2p::{ + downloader::{DownloadStream, Downloader}, + headers::error::DownloadError, + }, }; -use futures::Stream; use reth_primitives::SealedHeader; use reth_rpc_types::engine::ForkchoiceState; -use std::pin::Pin; - -/// A Future for downloading a batch of headers. -pub type HeaderBatchDownload<'a> = Pin< - Box< - dyn BatchDownload< - Ok = SealedHeader, - Error = DownloadError, - Output = Result, DownloadError>, - > + Send - + 'a, - >, ->; - -/// A stream for downloading headers. -pub type HeaderDownloadStream = - Pin> + Send>>; /// A downloader capable of fetching block headers. /// /// A downloader represents a distinct strategy for submitting requests to download block headers, /// while a [HeadersClient] represents a client capable of fulfilling these requests. #[auto_impl::auto_impl(&, Arc, Box)] -pub trait HeaderDownloader: Sync + Send + Unpin { - /// The Consensus used to verify block validity when - /// downloading - type Consensus: Consensus; - - /// The Client used to download the headers - type Client: HeadersClient; - - /// The consensus engine - fn consensus(&self) -> &Self::Consensus; - - /// The headers client - fn client(&self) -> &Self::Client; - - /// Download the headers - fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_>; - +pub trait HeaderDownloader: Downloader { /// Stream the headers - fn stream(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderDownloadStream; + fn stream( + &self, + head: SealedHeader, + forkchoice: ForkchoiceState, + ) -> DownloadStream; /// Validate whether the header is valid in relation to it's parent - /// - /// Returns Ok(false) if the fn validate(&self, header: &SealedHeader, parent: &SealedHeader) -> Result<(), DownloadError> { validate_header_download(self.consensus(), header, parent)?; Ok(()) diff --git a/crates/interfaces/src/p2p/mod.rs b/crates/interfaces/src/p2p/mod.rs index 5676c9975..3293e7c0b 100644 --- a/crates/interfaces/src/p2p/mod.rs +++ b/crates/interfaces/src/p2p/mod.rs @@ -1,3 +1,6 @@ +/// The generic downloader trait for implementing data downloaders over P2P. +pub mod downloader; + /// Traits for implementing P2P block body clients. pub mod bodies; @@ -12,6 +15,3 @@ pub mod headers; /// Error types broadly used by p2p interfaces for any operation which may produce an error when /// interacting with the network implementation pub mod error; - -/// Commonly used traits when implementing clients. -pub mod traits; diff --git a/crates/interfaces/src/p2p/traits.rs b/crates/interfaces/src/p2p/traits.rs deleted file mode 100644 index ae66cf2cb..000000000 --- a/crates/interfaces/src/p2p/traits.rs +++ /dev/null @@ -1,21 +0,0 @@ -use futures::Stream; -use std::future::Future; - -/// Abstraction for downloading several items at once. -/// -/// A [`BatchDownload`] is a [`Future`] that represents a collection of download futures and -/// resolves once all of them finished. -/// -/// This is similar to the [`futures::future::join_all`] function, but it's open to implementers how -/// this Future behaves exactly. -/// -/// It is expected that the underlying futures return a [`Result`]. -pub trait BatchDownload: Future, Self::Error>> { - /// The `Ok` variant of the futures output in this batch. - type Ok; - /// The `Err` variant of the futures output in this batch. - type Error; - - /// Consumes the batch future and returns a [`Stream`] that yields results as they become ready. - fn into_stream_unordered(self) -> Box>>; -} diff --git a/crates/interfaces/src/test_utils/bodies.rs b/crates/interfaces/src/test_utils/bodies.rs index 6c03942c0..c3ff8d4cd 100644 --- a/crates/interfaces/src/test_utils/bodies.rs +++ b/crates/interfaces/src/test_utils/bodies.rs @@ -1,4 +1,4 @@ -use crate::p2p::{bodies::client::BodiesClient, error::RequestResult}; +use crate::p2p::{bodies::client::BodiesClient, error::PeerRequestResult}; use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_primitives::H256; @@ -19,9 +19,9 @@ impl Debug for TestBodiesClient { #[async_trait] impl BodiesClient for TestBodiesClient where - F: Fn(Vec) -> RequestResult> + Send + Sync, + F: Fn(Vec) -> PeerRequestResult> + Send + Sync, { - async fn get_block_body(&self, hashes: Vec) -> RequestResult> { + async fn get_block_body(&self, hashes: Vec) -> PeerRequestResult> { (self.responder)(hashes) } } diff --git a/crates/interfaces/src/test_utils/headers.rs b/crates/interfaces/src/test_utils/headers.rs index 6857a87e4..796c44c5b 100644 --- a/crates/interfaces/src/test_utils/headers.rs +++ b/crates/interfaces/src/test_utils/headers.rs @@ -2,19 +2,19 @@ use crate::{ consensus::{self, Consensus}, p2p::{ - error::{RequestError, RequestResult}, + downloader::{DownloadStream, Downloader}, + error::{PeerRequestResult, RequestError}, headers::{ client::{HeadersClient, HeadersRequest}, - downloader::{HeaderBatchDownload, HeaderDownloadStream, HeaderDownloader}, + downloader::HeaderDownloader, error::DownloadError, }, - traits::BatchDownload, }, }; use futures::{Future, FutureExt, Stream}; use reth_eth_wire::BlockHeaders; use reth_primitives::{ - BlockLocked, BlockNumber, Header, HeadersDirection, SealedHeader, H256, U256, + BlockLocked, BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256, U256, }; use reth_rpc_types::engine::ForkchoiceState; use std::{ @@ -53,8 +53,7 @@ impl TestHeaderDownloader { } } -#[async_trait::async_trait] -impl HeaderDownloader for TestHeaderDownloader { +impl Downloader for TestHeaderDownloader { type Consensus = TestConsensus; type Client = TestHeadersClient; @@ -65,21 +64,20 @@ impl HeaderDownloader for TestHeaderDownloader { fn client(&self) -> &Self::Client { &self.client } +} - fn download( +#[async_trait::async_trait] +impl HeaderDownloader for TestHeaderDownloader { + fn stream( &self, _head: SealedHeader, _forkchoice: ForkchoiceState, - ) -> HeaderBatchDownload<'_> { - Box::pin(self.create_download()) - } - - fn stream(&self, _head: SealedHeader, _forkchoice: ForkchoiceState) -> HeaderDownloadStream { + ) -> DownloadStream { Box::pin(self.create_download()) } } -type TestHeadersFut = Pin> + Send>>; +type TestHeadersFut = Pin> + Send>>; struct TestDownload { client: Arc, @@ -105,30 +103,6 @@ impl TestDownload { } } -impl Future for TestDownload { - type Output = Result, DownloadError>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let empty = SealedHeader::default(); - if let Err(error) = self.consensus.validate_header(&empty, &empty) { - return Poll::Ready(Err(DownloadError::HeaderValidation { hash: empty.hash(), error })) - } - - match ready!(self.get_or_init_fut().poll_unpin(cx)) { - Ok(resp) => { - // Skip head and seal headers - let mut headers = resp.0.into_iter().skip(1).map(|h| h.seal()).collect::>(); - headers.sort_unstable_by_key(|h| h.number); - Poll::Ready(Ok(headers.into_iter().rev().collect())) - } - Err(err) => Poll::Ready(Err(match err { - RequestError::Timeout => DownloadError::Timeout, - _ => DownloadError::RequestError(err), - })), - } - } -} - impl Stream for TestDownload { type Item = Result; @@ -155,7 +129,7 @@ impl Stream for TestDownload { Ok(resp) => { // Skip head and seal headers let mut headers = - resp.0.into_iter().skip(1).map(|h| h.seal()).collect::>(); + resp.1 .0.into_iter().skip(1).map(|h| h.seal()).collect::>(); headers.sort_unstable_by_key(|h| h.number); headers.into_iter().for_each(|h| this.buffer.push(h)); this.done = true; @@ -173,15 +147,6 @@ impl Stream for TestDownload { } } -impl BatchDownload for TestDownload { - type Ok = SealedHeader; - type Error = DownloadError; - - fn into_stream_unordered(self) -> Box>> { - Box::new(self) - } -} - /// A test client for fetching headers #[derive(Debug, Default)] pub struct TestHeadersClient { @@ -207,7 +172,7 @@ impl TestHeadersClient { impl HeadersClient for TestHeadersClient { fn update_status(&self, _height: u64, _hash: H256, _td: U256) {} - async fn get_headers(&self, request: HeadersRequest) -> RequestResult { + async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult { if let Some(err) = &mut *self.error.lock().await { return Err(err.clone()) } @@ -215,7 +180,7 @@ impl HeadersClient for TestHeadersClient { let mut lock = self.responses.lock().await; let len = lock.len().min(request.limit as usize); let resp = lock.drain(..len).collect(); - return Ok(BlockHeaders(resp)) + return Ok((PeerId::default(), BlockHeaders(resp)).into()) } } diff --git a/crates/net/bodies-downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml similarity index 76% rename from crates/net/bodies-downloaders/Cargo.toml rename to crates/net/downloaders/Cargo.toml index 905b656c5..14b96f58d 100644 --- a/crates/net/bodies-downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -1,24 +1,29 @@ [package] -name = "reth-bodies-downloaders" +name = "reth-downloaders" version = "0.1.0" edition = "2021" license = "MIT OR Apache-2.0" repository = "https://github.com/paradigmxyz/reth" readme = "README.md" -description = "Implementations of various block body downloaders" +description = "Implementations of various block downloaders" [dependencies] -futures-util = "0.3.25" +# reth reth-interfaces = { path = "../../interfaces" } reth-primitives = { path = "../../primitives" } +reth-rpc-types = { path = "../rpc-types" } reth-eth-wire = { path= "../eth-wire" } + +# async +async-trait = "0.1.58" +futures = "0.3" +futures-util = "0.3.25" + +# misc backon = "0.2.0" [dev-dependencies] +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } assert_matches = "1.5.0" once_cell = "1.15.0" -rand = "0.8.5" -reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } tokio = { version = "1.21.2", features = ["full"] } -async-trait = "0.1.58" -futures-util = "0.3.25" \ No newline at end of file diff --git a/crates/net/bodies-downloaders/src/concurrent.rs b/crates/net/downloaders/src/bodies/concurrent.rs similarity index 87% rename from crates/net/bodies-downloaders/src/concurrent.rs rename to crates/net/downloaders/src/bodies/concurrent.rs index d1c257f90..217c20254 100644 --- a/crates/net/bodies-downloaders/src/concurrent.rs +++ b/crates/net/downloaders/src/bodies/concurrent.rs @@ -77,8 +77,8 @@ impl ConcurrentDownloader { block_number: BlockNumber, header_hash: H256, ) -> RequestResult<(BlockNumber, H256, BlockBody)> { - let mut body = self.client.get_block_body(vec![header_hash]).await?; - Ok((block_number, header_hash, body.remove(0))) + let mut response = self.client.get_block_body(vec![header_hash]).await?; + Ok((block_number, header_hash, response.1.remove(0))) // TODO: } } @@ -86,14 +86,11 @@ impl ConcurrentDownloader { #[cfg(test)] mod tests { use super::*; - use crate::{ - concurrent::ConcurrentDownloader, - test_utils::{generate_bodies, TestClient}, - }; + use crate::test_utils::{generate_bodies, TestClient}; use assert_matches::assert_matches; use futures_util::stream::{StreamExt, TryStreamExt}; use reth_interfaces::p2p::{bodies::downloader::BodyDownloader, error::RequestError}; - use reth_primitives::H256; + use reth_primitives::{PeerId, H256}; use std::{ sync::{ atomic::{AtomicUsize, Ordering}, @@ -115,9 +112,13 @@ mod tests { // Simulate that the request for this (random) block takes 0-100ms tokio::time::sleep(Duration::from_millis(hash[0].to_low_u64_be() % 100)).await; - Ok(vec![bodies - .remove(&hash[0]) - .expect("Downloader asked for a block it should not ask for")]) + Ok(( + PeerId::default(), + vec![bodies + .remove(&hash[0]) + .expect("Downloader asked for a block it should not ask for")], + ) + .into()) } }))); @@ -167,7 +168,11 @@ mod tests { retries_left.fetch_sub(1, Ordering::SeqCst); Err(RequestError::Timeout) } else { - Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) + Ok(( + PeerId::default(), + vec![BlockBody { transactions: vec![], ommers: vec![] }], + ) + .into()) } } }))); @@ -199,7 +204,11 @@ mod tests { retries_left.fetch_sub(1, Ordering::SeqCst); Err(RequestError::Timeout) } else { - Ok(vec![BlockBody { transactions: vec![], ommers: vec![] }]) + Ok(( + PeerId::default(), + vec![BlockBody { transactions: vec![], ommers: vec![] }], + ) + .into()) } } }))) diff --git a/crates/net/downloaders/src/bodies/mod.rs b/crates/net/downloaders/src/bodies/mod.rs new file mode 100644 index 000000000..51e973e0c --- /dev/null +++ b/crates/net/downloaders/src/bodies/mod.rs @@ -0,0 +1,2 @@ +/// A naive concurrent downloader. +pub mod concurrent; diff --git a/crates/net/headers-downloaders/src/linear.rs b/crates/net/downloaders/src/headers/linear.rs similarity index 92% rename from crates/net/headers-downloaders/src/linear.rs rename to crates/net/downloaders/src/headers/linear.rs index d423a2cb2..ed6ab225d 100644 --- a/crates/net/headers-downloaders/src/linear.rs +++ b/crates/net/downloaders/src/headers/linear.rs @@ -2,16 +2,13 @@ use futures::{stream::Stream, FutureExt}; use reth_interfaces::{ consensus::Consensus, p2p::{ - error::{RequestError, RequestResult}, + downloader::{DownloadStream, Downloader}, + error::{PeerRequestResult, RequestError}, headers::{ client::{BlockHeaders, HeadersClient, HeadersRequest}, - downloader::{ - validate_header_download, HeaderBatchDownload, HeaderDownloadStream, - HeaderDownloader, - }, + downloader::{validate_header_download, HeaderDownloader}, error::DownloadError, }, - traits::BatchDownload, }, }; use reth_primitives::{HeadersDirection, SealedHeader, H256}; @@ -39,10 +36,10 @@ pub struct LinearDownloader { pub request_retries: usize, } -impl HeaderDownloader for LinearDownloader +impl Downloader for LinearDownloader where - C: Consensus + 'static, - H: HeadersClient + 'static, + C: Consensus, + H: HeadersClient, { type Consensus = C; type Client = H; @@ -54,12 +51,18 @@ where fn client(&self) -> &Self::Client { self.client.borrow() } +} - fn download(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderBatchDownload<'_> { - Box::pin(self.new_download(head, forkchoice)) - } - - fn stream(&self, head: SealedHeader, forkchoice: ForkchoiceState) -> HeaderDownloadStream { +impl HeaderDownloader for LinearDownloader +where + C: Consensus + 'static, + H: HeadersClient + 'static, +{ + fn stream( + &self, + head: SealedHeader, + forkchoice: ForkchoiceState, + ) -> DownloadStream { Box::pin(self.new_download(head, forkchoice)) } } @@ -95,7 +98,7 @@ impl LinearDownloader { } } -type HeadersFut = Pin> + Send>>; +type HeadersFut = Pin> + Send>>; /// A retryable future that returns a list of [`BlockHeaders`] on success. struct HeadersRequestFuture { @@ -119,7 +122,7 @@ impl HeadersRequestFuture { } impl Future for HeadersRequestFuture { - type Output = RequestResult; + type Output = PeerRequestResult; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.get_mut().fut.poll_unpin(cx) @@ -222,11 +225,11 @@ where fn process_header_response( &mut self, - response: Result, + response: PeerRequestResult, ) -> Result<(), DownloadError> { match response { Ok(res) => { - let mut headers = res.0; + let mut headers = res.1 .0; headers.sort_unstable_by_key(|h| h.number); if headers.is_empty() { @@ -353,19 +356,6 @@ where } } -impl BatchDownload for HeadersDownload -where - C: Consensus + 'static, - H: HeadersClient + 'static, -{ - type Ok = SealedHeader; - type Error = DownloadError; - - fn into_stream_unordered(self) -> Box>> { - Box::new(self) - } -} - /// The builder for [LinearDownloader] with /// some default settings #[derive(Debug)] @@ -438,12 +428,15 @@ mod tests { } #[tokio::test] - async fn download_empty() { + async fn stream_empty() { let client = Arc::new(TestHeadersClient::default()); let downloader = LinearDownloadBuilder::default().build(CONSENSUS.clone(), Arc::clone(&client)); - let result = downloader.download(SealedHeader::default(), ForkchoiceState::default()).await; + let result = downloader + .stream(SealedHeader::default(), ForkchoiceState::default()) + .try_collect::>() + .await; assert!(result.is_err()); } @@ -470,7 +463,7 @@ mod tests { let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() }; - let result = downloader.download(p0, fork).await; + let result = downloader.stream(p0, fork).try_collect::>().await; let headers = result.unwrap(); assert!(headers.is_empty()); } @@ -498,7 +491,7 @@ mod tests { let fork = ForkchoiceState { head_block_hash: p0.hash_slow(), ..Default::default() }; - let result = downloader.download(p3, fork).await; + let result = downloader.stream(p3, fork).try_collect::>().await; let headers = result.unwrap(); assert_eq!(headers.len(), 3); assert_eq!(headers[0], p0); diff --git a/crates/net/downloaders/src/headers/mod.rs b/crates/net/downloaders/src/headers/mod.rs new file mode 100644 index 000000000..b6a1624d8 --- /dev/null +++ b/crates/net/downloaders/src/headers/mod.rs @@ -0,0 +1,2 @@ +/// A Linear downloader implementation. +pub mod linear; diff --git a/crates/net/bodies-downloaders/src/lib.rs b/crates/net/downloaders/src/lib.rs similarity index 53% rename from crates/net/bodies-downloaders/src/lib.rs rename to crates/net/downloaders/src/lib.rs index 92767f046..fb69b0a8e 100644 --- a/crates/net/bodies-downloaders/src/lib.rs +++ b/crates/net/downloaders/src/lib.rs @@ -5,10 +5,13 @@ attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] -//! Implements body downloader algorithms. +//! Implements the downloader algorithms. -/// A naive concurrent downloader. -pub mod concurrent; +/// The collection of algorithms for downloading block bodies. +pub mod bodies; + +/// The collection of alhgorithms for downloading block headers. +pub mod headers; #[cfg(test)] mod test_utils; diff --git a/crates/net/bodies-downloaders/src/test_utils.rs b/crates/net/downloaders/src/test_utils.rs similarity index 87% rename from crates/net/bodies-downloaders/src/test_utils.rs rename to crates/net/downloaders/src/test_utils.rs index 6a36b4c20..4bc383b9b 100644 --- a/crates/net/bodies-downloaders/src/test_utils.rs +++ b/crates/net/downloaders/src/test_utils.rs @@ -3,7 +3,7 @@ use async_trait::async_trait; use reth_eth_wire::BlockBody; use reth_interfaces::{ - p2p::{bodies::client::BodiesClient, error::RequestResult}, + p2p::{bodies::client::BodiesClient, error::PeerRequestResult}, test_utils::generators::random_block_range, }; use reth_primitives::{BlockNumber, H256}; @@ -58,9 +58,9 @@ impl TestClient { impl BodiesClient for TestClient where F: FnMut(Vec) -> Fut + Send + Sync, - Fut: Future>> + Send, + Fut: Future>> + Send, { - async fn get_block_body(&self, hash: Vec) -> RequestResult> { + async fn get_block_body(&self, hash: Vec) -> PeerRequestResult> { let f = &mut *self.0.lock().await; (f)(hash).await } diff --git a/crates/net/headers-downloaders/Cargo.toml b/crates/net/headers-downloaders/Cargo.toml deleted file mode 100644 index cebc4167a..000000000 --- a/crates/net/headers-downloaders/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "reth-headers-downloaders" -version = "0.1.0" -edition = "2021" -license = "MIT OR Apache-2.0" -repository = "https://github.com/paradigmxyz/reth" -readme = "README.md" -description = "Implementations of various header downloader" - -[dependencies] -# reth -reth-interfaces = { path = "../../interfaces" } -reth-primitives = { path = "../../primitives" } -reth-rpc-types = { path = "../rpc-types" } - -# async -async-trait = "0.1.58" -futures = "0.3" - -[dev-dependencies] -assert_matches = "1.5.0" -once_cell = "1.15.0" -rand = "0.8.5" -reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } -tokio = { version = "1.21.2", features = ["full"] } -serial_test = "0.9.0" diff --git a/crates/net/headers-downloaders/src/lib.rs b/crates/net/headers-downloaders/src/lib.rs deleted file mode 100644 index 5e97cff8e..000000000 --- a/crates/net/headers-downloaders/src/lib.rs +++ /dev/null @@ -1,11 +0,0 @@ -#![warn(missing_docs, unreachable_pub)] -#![deny(unused_must_use, rust_2018_idioms)] -#![doc(test( - no_crate_inject, - attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) -))] - -//! Implements Header Downloader algorithms - -/// A Linear downloader implementation. -pub mod linear; diff --git a/crates/net/network/src/fetch/client.rs b/crates/net/network/src/fetch/client.rs index 8f38fb622..341de0ec6 100644 --- a/crates/net/network/src/fetch/client.rs +++ b/crates/net/network/src/fetch/client.rs @@ -4,10 +4,10 @@ use crate::fetch::{DownloadRequest, StatusUpdate}; use reth_eth_wire::{BlockBody, BlockHeaders}; use reth_interfaces::p2p::{ bodies::client::BodiesClient, - error::RequestResult, + error::PeerRequestResult, headers::client::{HeadersClient, HeadersRequest}, }; -use reth_primitives::{H256, U256}; +use reth_primitives::{WithPeerId, H256, U256}; use tokio::sync::{mpsc::UnboundedSender, oneshot}; /// Front-end API for fetching data from the network. @@ -26,16 +26,16 @@ impl HeadersClient for FetchClient { } /// Sends a `GetBlockHeaders` request to an available peer. - async fn get_headers(&self, request: HeadersRequest) -> RequestResult { + async fn get_headers(&self, request: HeadersRequest) -> PeerRequestResult { let (response, rx) = oneshot::channel(); self.request_tx.send(DownloadRequest::GetBlockHeaders { request, response })?; - rx.await?.map(BlockHeaders::from) + rx.await?.map(WithPeerId::transform) } } #[async_trait::async_trait] impl BodiesClient for FetchClient { - async fn get_block_body(&self, request: Vec) -> RequestResult> { + async fn get_block_body(&self, request: Vec) -> PeerRequestResult> { let (response, rx) = oneshot::channel(); self.request_tx.send(DownloadRequest::GetBlockBodies { request, response })?; rx.await? diff --git a/crates/net/network/src/fetch/mod.rs b/crates/net/network/src/fetch/mod.rs index 0f6de5763..2a92f23f9 100644 --- a/crates/net/network/src/fetch/mod.rs +++ b/crates/net/network/src/fetch/mod.rs @@ -4,7 +4,7 @@ use crate::message::BlockRequest; use futures::StreamExt; use reth_eth_wire::{BlockBody, GetBlockBodies, GetBlockHeaders}; use reth_interfaces::p2p::{ - error::{RequestError, RequestResult}, + error::{PeerRequestResult, RequestError, RequestResult}, headers::client::HeadersRequest, }; use reth_primitives::{Header, PeerId, H256, U256}; @@ -25,9 +25,11 @@ pub use client::FetchClient; /// peers and sends the response once ready. pub struct StateFetcher { /// Currently active [`GetBlockHeaders`] requests - inflight_headers_requests: HashMap>>>, + inflight_headers_requests: + HashMap>>>, /// Currently active [`GetBlockBodies`] requests - inflight_bodies_requests: HashMap, RequestResult>>>, + inflight_bodies_requests: + HashMap, PeerRequestResult>>>, /// The list of available peers for requests. peers: HashMap, /// Requests queued for processing @@ -187,7 +189,7 @@ impl StateFetcher { ) -> Option { let is_error = res.is_err(); if let Some(resp) = self.inflight_headers_requests.remove(&peer_id) { - let _ = resp.response.send(res); + let _ = resp.response.send(res.map(|h| (peer_id, h).into())); } if is_error { @@ -215,7 +217,7 @@ impl StateFetcher { res: RequestResult>, ) -> Option { if let Some(resp) = self.inflight_bodies_requests.remove(&peer_id) { - let _ = resp.response.send(res); + let _ = resp.response.send(res.map(|b| (peer_id, b).into())); } if let Some(peer) = self.peers.get_mut(&peer_id) { if peer.state.on_request_finished() { @@ -325,10 +327,13 @@ pub(crate) enum DownloadRequest { /// Download the requested headers and send response through channel GetBlockHeaders { request: HeadersRequest, - response: oneshot::Sender>>, + response: oneshot::Sender>>, }, /// Download the requested headers and send response through channel - GetBlockBodies { request: Vec, response: oneshot::Sender>> }, + GetBlockBodies { + request: Vec, + response: oneshot::Sender>>, + }, } // === impl DownloadRequest === diff --git a/crates/net/network/tests/it/requests.rs b/crates/net/network/tests/it/requests.rs index fc701cf05..400ee48d8 100644 --- a/crates/net/network/tests/it/requests.rs +++ b/crates/net/network/tests/it/requests.rs @@ -67,7 +67,7 @@ async fn test_get_body() { let res = fetch0.get_block_body(vec![block_hash]).await; assert!(res.is_ok()); - let blocks = res.unwrap(); + let blocks = res.unwrap().1; assert_eq!(blocks.len(), 1); let expected = BlockBody { transactions: block.body, ommers: block.ommers }; assert_eq!(blocks[0], expected); @@ -114,7 +114,7 @@ async fn test_get_header() { let res = fetch0.get_headers(req).await; assert!(res.is_ok()); - let headers = res.unwrap().0; + let headers = res.unwrap().1 .0; assert_eq!(headers.len(), 1); assert_eq!(headers[0], header); } diff --git a/crates/primitives/src/lib.rs b/crates/primitives/src/lib.rs index a199e471e..4c750ab35 100644 --- a/crates/primitives/src/lib.rs +++ b/crates/primitives/src/lib.rs @@ -22,6 +22,7 @@ mod hex_bytes; mod integer_list; mod jsonu256; mod log; +mod peer; mod receipt; mod storage; mod transaction; @@ -41,6 +42,7 @@ pub use hex_bytes::Bytes; pub use integer_list::IntegerList; pub use jsonu256::JsonU256; pub use log::Log; +pub use peer::{PeerId, WithPeerId}; pub use receipt::Receipt; pub use storage::StorageEntry; pub use transaction::{ @@ -69,12 +71,6 @@ pub type StorageKey = H256; /// An account storage value. pub type StorageValue = U256; -// TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to -// prevent misuse -/// This represents an uncompressed secp256k1 public key. -/// This encodes the concatenation of the x and y components of the affine point in bytes. -pub type PeerId = H512; - pub use ethers_core::{ types as rpc, types::{BigEndianHash, H128, H160, H256, H512, H64, U128, U256, U64}, diff --git a/crates/primitives/src/peer.rs b/crates/primitives/src/peer.rs new file mode 100644 index 000000000..f66944f72 --- /dev/null +++ b/crates/primitives/src/peer.rs @@ -0,0 +1,39 @@ +use ethers_core::types::H512; + +// TODO: should we use `PublicKey` for this? Even when dealing with public keys we should try to +// prevent misuse +/// This represents an uncompressed secp256k1 public key. +/// This encodes the concatenation of the x and y components of the affine point in bytes. +pub type PeerId = H512; + +/// Generic wrapper with peer id +#[derive(Debug)] +pub struct WithPeerId(PeerId, pub T); + +impl From<(H512, T)> for WithPeerId { + fn from(value: (H512, T)) -> Self { + Self(value.0, value.1) + } +} + +impl WithPeerId { + /// Get the peer id + pub fn peer_id(&self) -> PeerId { + self.0 + } + + /// Get the underlying data + pub fn data(&self) -> &T { + &self.1 + } + + /// Transform the data + pub fn transform>(self) -> WithPeerId { + WithPeerId(self.0, self.1.into()) + } + + /// Split the wrapper into [PeerId] and data tuple + pub fn split(self) -> (PeerId, T) { + (self.0, self.1) + } +} diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 3882051f6..9481adb88 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -33,11 +33,8 @@ rayon = "1.6.0" # reth reth-db = { path = "../storage/db", features = ["test-utils", "mdbx"] } reth-interfaces = { path = "../interfaces", features = ["test-utils"] } -reth-bodies-downloaders = { path = "../net/bodies-downloaders" } - -# TODO(onbjerg): We only need this for [BlockBody] -reth-eth-wire = { path = "../net/eth-wire" } -reth-headers-downloaders = { path = "../net/headers-downloaders" } +reth-downloaders = { path = "../net/downloaders" } +reth-eth-wire = { path = "../net/eth-wire" } # TODO(onbjerg): We only need this for [BlockBody] tokio = { version = "*", features = ["rt", "sync", "macros"] } tokio-stream = "0.1.10" tempfile = "3.3.0" diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index 941a15603..f36efb7ee 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -490,7 +490,7 @@ mod tests { client::BodiesClient, downloader::{BodiesStream, BodyDownloader}, }, - error::RequestResult, + error::{PeerRequestResult, RequestResult}, }, test_utils::{ generators::{random_block_range, random_signed_tx}, @@ -716,7 +716,7 @@ mod tests { #[async_trait::async_trait] impl BodiesClient for NoopClient { - async fn get_block_body(&self, _: Vec) -> RequestResult> { + async fn get_block_body(&self, _: Vec) -> PeerRequestResult> { panic!("Noop client should not be called") } } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 9b5b0532d..d23ba9b54 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -349,7 +349,7 @@ mod tests { ExecInput, ExecOutput, UnwindInput, }; use reth_db::{models::blocks::BlockNumHash, tables, transaction::DbTx}; - use reth_headers_downloaders::linear::{LinearDownloadBuilder, LinearDownloader}; + use reth_downloaders::headers::linear::{LinearDownloadBuilder, LinearDownloader}; use reth_interfaces::{ p2p::headers::downloader::HeaderDownloader, test_utils::{