diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 63259d2b4..6b39373ce 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -237,7 +237,7 @@ impl Command { ) -> reth_downloaders::headers::task::TaskDownloader { let headers_conf = &config.stages.headers; headers::task::TaskDownloader::spawn( - headers::linear::LinearDownloadBuilder::default() + headers::reverse_headers::ReverseHeadersDownloaderBuilder::default() .request_limit(headers_conf.downloader_batch_size) .stream_batch_size(headers_conf.commit_threshold as usize) .build(consensus.clone(), fetch_client.clone()), @@ -253,7 +253,7 @@ impl Command { ) -> reth_downloaders::bodies::task::TaskDownloader { let bodies_conf = &config.stages.bodies; bodies::task::TaskDownloader::spawn( - bodies::concurrent::ConcurrentDownloaderBuilder::default() + bodies::bodies::BodiesDownloaderBuilder::default() .with_stream_batch_size(bodies_conf.downloader_stream_batch_size) .with_request_limit(bodies_conf.downloader_request_limit) .with_max_buffered_responses(bodies_conf.downloader_max_buffered_responses) diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index 5b50aadbe..001e66ca6 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -8,7 +8,7 @@ use crate::{ NetworkOpts, }; use reth_consensus::beacon::BeaconConsensus; -use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder; +use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_net_nat::NatResolver; use reth_primitives::ChainSpec; @@ -150,7 +150,7 @@ impl Command { let fetch_client = Arc::new(network.fetch_client().await?); let mut stage = BodyStage { - downloader: ConcurrentDownloaderBuilder::default() + downloader: BodiesDownloaderBuilder::default() .with_stream_batch_size(num_blocks as usize) .with_request_limit(config.stages.bodies.downloader_request_limit) .with_max_buffered_responses( diff --git a/crates/net/downloaders/src/bodies/concurrent.rs b/crates/net/downloaders/src/bodies/bodies.rs similarity index 96% rename from crates/net/downloaders/src/bodies/concurrent.rs rename to crates/net/downloaders/src/bodies/bodies.rs index 1e9b3278f..a5f93263f 100644 --- a/crates/net/downloaders/src/bodies/concurrent.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -43,7 +43,7 @@ pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies"; /// All blocks in a batch are fetched at the same time. #[must_use = "Stream does nothing unless polled"] #[derive(Debug)] -pub struct ConcurrentDownloader { +pub struct BodiesDownloader { /// The bodies client client: Arc, /// The consensus client @@ -73,7 +73,7 @@ pub struct ConcurrentDownloader { metrics: DownloaderMetrics, } -impl ConcurrentDownloader +impl BodiesDownloader where B: BodiesClient + 'static, DB: Database, @@ -241,7 +241,7 @@ where } } -impl BodyDownloader for ConcurrentDownloader +impl BodyDownloader for BodiesDownloader where B: BodiesClient + 'static, DB: Database, @@ -331,7 +331,7 @@ where } } -impl Stream for ConcurrentDownloader +impl Stream for BodiesDownloader where B: BodiesClient + 'static, DB: Database, @@ -452,8 +452,8 @@ impl Ord for OrderedBodiesResponse { } } -/// Builder for [ConcurrentDownloader]. -pub struct ConcurrentDownloaderBuilder { +/// Builder for [BodiesDownloader]. +pub struct BodiesDownloaderBuilder { /// The batch size of non-empty blocks per one request request_limit: u64, /// The maximum number of block bodies returned at once from the stream @@ -464,7 +464,7 @@ pub struct ConcurrentDownloaderBuilder { concurrent_requests_range: RangeInclusive, } -impl Default for ConcurrentDownloaderBuilder { +impl Default for BodiesDownloaderBuilder { fn default() -> Self { Self { request_limit: 200, @@ -475,7 +475,7 @@ impl Default for ConcurrentDownloaderBuilder { } } -impl ConcurrentDownloaderBuilder { +impl BodiesDownloaderBuilder { /// Set request batch size on the downloader. pub fn with_request_limit(mut self, request_limit: u64) -> Self { self.request_limit = request_limit; @@ -509,7 +509,7 @@ impl ConcurrentDownloaderBuilder { client: Arc, consensus: Arc, db: Arc, - ) -> ConcurrentDownloader + ) -> BodiesDownloader where B: BodiesClient + 'static, DB: Database, @@ -522,7 +522,7 @@ impl ConcurrentDownloaderBuilder { } = self; let metrics = DownloaderMetrics::new(BODIES_DOWNLOADER_SCOPE); let in_progress_queue = BodiesRequestQueue::new(metrics.clone()); - ConcurrentDownloader { + BodiesDownloader { client, consensus, db, @@ -566,7 +566,7 @@ mod tests { let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let mut downloader = ConcurrentDownloaderBuilder::default().build( + let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), db, @@ -595,7 +595,7 @@ mod tests { let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let mut downloader = ConcurrentDownloaderBuilder::default() + let mut downloader = BodiesDownloaderBuilder::default() .with_stream_batch_size(stream_batch_size) .with_request_limit(request_limit) .build(client.clone(), Arc::new(TestConsensus::default()), db); @@ -631,9 +631,11 @@ mod tests { insert_headers(&db, &headers); let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); - let mut downloader = ConcurrentDownloaderBuilder::default() - .with_stream_batch_size(100) - .build(client.clone(), Arc::new(TestConsensus::default()), db); + let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build( + client.clone(), + Arc::new(TestConsensus::default()), + db, + ); // Set and download the first range downloader.set_download_range(0..100).expect("failed to set download range"); diff --git a/crates/net/downloaders/src/bodies/mod.rs b/crates/net/downloaders/src/bodies/mod.rs index a816f2a4a..b6ce04829 100644 --- a/crates/net/downloaders/src/bodies/mod.rs +++ b/crates/net/downloaders/src/bodies/mod.rs @@ -1,5 +1,6 @@ /// A naive concurrent downloader. -pub mod concurrent; +#[allow(clippy::module_inception)] +pub mod bodies; /// TODO: pub mod task; diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index f337d2dfe..59415fd3b 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -44,13 +44,13 @@ impl TaskDownloader { /// /// ``` /// use std::sync::Arc; - /// use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder; + /// use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; /// use reth_downloaders::bodies::task::TaskDownloader; /// use reth_interfaces::consensus::Consensus; /// use reth_interfaces::p2p::bodies::client::BodiesClient; /// use reth_db::database::Database; /// fn t(client: Arc, consensus:Arc, db: Arc) { - /// let downloader = ConcurrentDownloaderBuilder::default().build( + /// let downloader = BodiesDownloaderBuilder::default().build( /// client, /// consensus, /// db @@ -133,7 +133,7 @@ mod tests { use super::*; use crate::{ bodies::{ - concurrent::ConcurrentDownloaderBuilder, + bodies::BodiesDownloaderBuilder, test_utils::{insert_headers, zip_blocks}, }, test_utils::{generate_bodies, TestBodiesClient}, @@ -155,7 +155,7 @@ mod tests { let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let downloader = ConcurrentDownloaderBuilder::default().build( + let downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), db, @@ -184,7 +184,7 @@ mod tests { let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), ); - let downloader = ConcurrentDownloaderBuilder::default().build( + let downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), db, diff --git a/crates/net/downloaders/src/headers/mod.rs b/crates/net/downloaders/src/headers/mod.rs index 4e3a8bc57..d5ee82fb4 100644 --- a/crates/net/downloaders/src/headers/mod.rs +++ b/crates/net/downloaders/src/headers/mod.rs @@ -1,5 +1,5 @@ /// A Linear downloader implementation. -pub mod linear; +pub mod reverse_headers; /// A downloader implementation that spawns a downloader to a task pub mod task; diff --git a/crates/net/downloaders/src/headers/linear.rs b/crates/net/downloaders/src/headers/reverse_headers.rs similarity index 96% rename from crates/net/downloaders/src/headers/linear.rs rename to crates/net/downloaders/src/headers/reverse_headers.rs index 60205ed6c..415209437 100644 --- a/crates/net/downloaders/src/headers/linear.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -44,7 +44,7 @@ pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers"; /// the batches of headers that this downloader yields will start at the chain tip and move towards /// the local head: falling block numbers. #[must_use = "Stream does nothing unless polled"] -pub struct LinearDownloader { +pub struct ReverseHeadersDownloader { /// Consensus client used to validate headers consensus: Arc, /// Client used to download headers. @@ -86,15 +86,15 @@ pub struct LinearDownloader { metrics: DownloaderMetrics, } -// === impl LinearDownloader === +// === impl ReverseHeadersDownloader === -impl LinearDownloader +impl ReverseHeadersDownloader where H: HeadersClient + 'static, { - /// Convenience method to create a [LinearDownloadBuilder] without importing it - pub fn builder() -> LinearDownloadBuilder { - LinearDownloadBuilder::default() + /// Convenience method to create a [ReverseHeadersDownloaderBuilder] without importing it + pub fn builder() -> ReverseHeadersDownloaderBuilder { + ReverseHeadersDownloaderBuilder::default() } /// Returns the block number the local node is at. @@ -501,7 +501,7 @@ where } } -impl HeaderDownloader for LinearDownloader +impl HeaderDownloader for ReverseHeadersDownloader where H: HeadersClient + 'static, { @@ -577,7 +577,7 @@ where } } -impl Stream for LinearDownloader +impl Stream for ReverseHeadersDownloader where H: HeadersClient + 'static, { @@ -793,10 +793,10 @@ impl SyncTargetBlock { } } -/// The builder for [LinearDownloader] with +/// The builder for [ReverseHeadersDownloader] with /// some default settings #[derive(Debug)] -pub struct LinearDownloadBuilder { +pub struct ReverseHeadersDownloaderBuilder { /// The batch size per one request request_limit: u64, /// Batch size for headers @@ -809,7 +809,7 @@ pub struct LinearDownloadBuilder { max_buffered_responses: usize, } -impl Default for LinearDownloadBuilder { +impl Default for ReverseHeadersDownloaderBuilder { fn default() -> Self { Self { request_limit: 1_000, @@ -821,7 +821,7 @@ impl Default for LinearDownloadBuilder { } } -impl LinearDownloadBuilder { +impl ReverseHeadersDownloaderBuilder { /// Set the request batch size. /// /// This determines the `limit` for a [GetHeaders](reth_eth_wire::GetBlockHeaders) requests, the @@ -833,8 +833,9 @@ impl LinearDownloadBuilder { /// Set the stream batch size /// - /// This determines the number of headers the [LinearDownloader] will yield on `Stream::next`. - /// This will be the amount of headers the headers stage will commit at a time. + /// This determines the number of headers the [ReverseHeadersDownloader] will yield on + /// `Stream::next`. This will be the amount of headers the headers stage will commit at a + /// time. pub fn stream_batch_size(mut self, size: usize) -> Self { self.stream_batch_size = size; self @@ -842,8 +843,8 @@ impl LinearDownloadBuilder { /// Set the min amount of concurrent requests. /// - /// If there's capacity the [LinearDownloader] will keep at least this many requests active at a - /// time. + /// If there's capacity the [ReverseHeadersDownloader] will keep at least this many requests + /// active at a time. pub fn min_concurrent_requests(mut self, min_concurrent_requests: usize) -> Self { self.min_concurrent_requests = min_concurrent_requests; self @@ -861,16 +862,20 @@ impl LinearDownloadBuilder { /// /// This essentially determines how much memory the downloader can use for buffering responses /// that arrive out of order. The total number of buffered headers is `request_limit * - /// max_buffered_responses`. If the [LinearDownloader]'s buffered responses exceeds this + /// max_buffered_responses`. If the [ReverseHeadersDownloader]'s buffered responses exceeds this /// threshold it waits until there's capacity again before sending new requests. pub fn max_buffered_responses(mut self, max_buffered_responses: usize) -> Self { self.max_buffered_responses = max_buffered_responses; self } - /// Build [LinearDownloader] with provided consensus + /// Build [ReverseHeadersDownloader] with provided consensus /// and header client implementations - pub fn build(self, consensus: Arc, client: Arc) -> LinearDownloader + pub fn build( + self, + consensus: Arc, + client: Arc, + ) -> ReverseHeadersDownloader where H: HeadersClient + 'static, { @@ -881,7 +886,7 @@ impl LinearDownloadBuilder { max_concurrent_requests, max_buffered_responses, } = self; - LinearDownloader { + ReverseHeadersDownloader { consensus, client, local_head: None, @@ -940,7 +945,7 @@ mod tests { let genesis = SealedHeader::default(); - let mut downloader = LinearDownloadBuilder::default() + let mut downloader = ReverseHeadersDownloaderBuilder::default() .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); downloader.update_local_head(genesis); downloader.update_sync_target(SyncTarget::Tip(H256::random())); @@ -968,7 +973,7 @@ mod tests { let header = SealedHeader::default(); - let mut downloader = LinearDownloadBuilder::default() + let mut downloader = ReverseHeadersDownloaderBuilder::default() .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); downloader.update_local_head(header.clone()); downloader.update_sync_target(SyncTarget::Tip(H256::random())); @@ -1008,7 +1013,7 @@ mod tests { let batch_size = 99; let start = 1000; - let mut downloader = LinearDownloadBuilder::default() + let mut downloader = ReverseHeadersDownloaderBuilder::default() .request_limit(batch_size) .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); downloader.update_local_head(genesis); @@ -1057,7 +1062,7 @@ mod tests { let p1 = child_header(&p2); let p0 = child_header(&p1); - let mut downloader = LinearDownloadBuilder::default() + let mut downloader = ReverseHeadersDownloaderBuilder::default() .stream_batch_size(3) .request_limit(3) .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); @@ -1089,7 +1094,7 @@ mod tests { let p0 = child_header(&p1); let client = Arc::new(TestHeadersClient::default()); - let mut downloader = LinearDownloadBuilder::default() + let mut downloader = ReverseHeadersDownloaderBuilder::default() .stream_batch_size(1) .request_limit(1) .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); diff --git a/crates/net/downloaders/src/headers/task.rs b/crates/net/downloaders/src/headers/task.rs index 1d9fd0f05..320621022 100644 --- a/crates/net/downloaders/src/headers/task.rs +++ b/crates/net/downloaders/src/headers/task.rs @@ -40,12 +40,12 @@ impl TaskDownloader { /// /// ``` /// # use std::sync::Arc; - /// # use reth_downloaders::headers::linear::LinearDownloader; + /// # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloader; /// # use reth_downloaders::headers::task::TaskDownloader; /// # use reth_interfaces::consensus::Consensus; /// # use reth_interfaces::p2p::headers::client::HeadersClient; /// # fn t(consensus:Arc, client: Arc) { - /// let downloader = LinearDownloader::::builder().build( + /// let downloader = ReverseHeadersDownloader::::builder().build( /// consensus, /// client, /// ); @@ -153,7 +153,9 @@ enum DownloaderUpdates { #[cfg(test)] mod tests { use super::*; - use crate::headers::{linear::LinearDownloadBuilder, test_utils::child_header}; + use crate::headers::{ + reverse_headers::ReverseHeadersDownloaderBuilder, test_utils::child_header, + }; use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient}; use std::sync::Arc; @@ -167,7 +169,7 @@ mod tests { let p0 = child_header(&p1); let client = Arc::new(TestHeadersClient::default()); - let downloader = LinearDownloadBuilder::default() + let downloader = ReverseHeadersDownloaderBuilder::default() .stream_batch_size(1) .request_limit(1) .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); diff --git a/crates/net/downloaders/src/lib.rs b/crates/net/downloaders/src/lib.rs index 0ffa2d3cd..d909cf4ef 100644 --- a/crates/net/downloaders/src/lib.rs +++ b/crates/net/downloaders/src/lib.rs @@ -4,6 +4,7 @@ no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] +#![allow(clippy::result_large_err)] //! Implements the downloader algorithms. diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 7d841ce84..cf0af6e7d 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -4,6 +4,7 @@ no_crate_inject, attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) ))] +#![allow(clippy::result_large_err)] //! Staged syncing primitives for reth. //! //! This crate contains the syncing primitives [`Pipeline`] and [`Stage`], as well as all stages @@ -21,8 +22,8 @@ //! # use std::sync::Arc; //! # use reth_db::mdbx::test_utils::create_test_rw_db; //! # use reth_db::mdbx::{Env, WriteMap}; -//! # use reth_downloaders::bodies::concurrent::ConcurrentDownloaderBuilder; -//! # use reth_downloaders::headers::linear::LinearDownloadBuilder; +//! # use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; +//! # use reth_downloaders::headers::reverse_headers::ReverseHeadersDownloaderBuilder; //! # use reth_interfaces::consensus::Consensus; //! # use reth_interfaces::sync::NoopSyncStateUpdate; //! # use reth_interfaces::test_utils::{TestBodiesClient, TestConsensus, TestHeadersClient}; @@ -30,11 +31,11 @@ //! # use reth_stages::Pipeline; //! # use reth_stages::sets::DefaultStages; //! # let consensus: Arc = Arc::new(TestConsensus::default()); -//! # let headers_downloader = LinearDownloadBuilder::default().build( +//! # let headers_downloader = ReverseHeadersDownloaderBuilder::default().build( //! # consensus.clone(), //! # Arc::new(TestHeadersClient::default()) //! # ); -//! # let bodies_downloader = ConcurrentDownloaderBuilder::default().build( +//! # let bodies_downloader = BodiesDownloaderBuilder::default().build( //! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::zero(), vec![]).into()) }), //! # consensus.clone(), //! # create_test_rw_db() diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index cc9e2006c..6b6cb983d 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -283,7 +283,9 @@ mod tests { tables, transaction::{DbTx, DbTxMut}, }; - use reth_downloaders::headers::linear::{LinearDownloadBuilder, LinearDownloader}; + use reth_downloaders::headers::reverse_headers::{ + ReverseHeadersDownloader, ReverseHeadersDownloaderBuilder, + }; use reth_interfaces::{ consensus::{Consensus, ForkchoiceState}, p2p::headers::downloader::HeaderDownloader, @@ -416,7 +418,7 @@ mod tests { } } - impl HeadersTestRunner> { + impl HeadersTestRunner> { pub(crate) fn with_linear_downloader() -> Self { let client = Arc::new(TestHeadersClient::default()); let consensus = Arc::new(TestConsensus::default()); @@ -424,7 +426,7 @@ mod tests { client: client.clone(), consensus: consensus.clone(), downloader_factory: Box::new(move || { - LinearDownloadBuilder::default() + ReverseHeadersDownloaderBuilder::default() .stream_batch_size(500) .build(consensus.clone(), client.clone()) }), diff --git a/docs/repo/crates/network.md b/docs/repo/crates/network.md index aa37c9a4a..52ab5441f 100644 --- a/docs/repo/crates/network.md +++ b/docs/repo/crates/network.md @@ -40,7 +40,7 @@ let network = start_network(network_config(db.clone(), chain_id, genesis_hash)). let fetch_client = Arc::new(network.fetch_client().await?); let mut pipeline = reth_stages::Pipeline::new() .push(HeaderStage { - downloader: headers::linear::LinearDownloadBuilder::default() + downloader: headers::reverse_headers::ReverseHeadersDownloaderBuilder::default() .batch_size(config.stages.headers.downloader_batch_size) .retries(config.stages.headers.downloader_retries) .build(consensus.clone(), fetch_client.clone()), @@ -52,7 +52,7 @@ let mut pipeline = reth_stages::Pipeline::new() }) .push(BodyStage { downloader: Arc::new( - bodies::concurrent::ConcurrentDownloader::new( + bodies::bodies::BodiesDownloader::new( fetch_client.clone(), consensus.clone(), ) @@ -345,11 +345,11 @@ impl BodiesClient for FetchClient { This functionality is used in the `HeaderStage` and `BodyStage`, respectively. -In the pipeline used by the main Reth binary, the `HeaderStage` uses a `LinearDownloader` to stream headers from the network: +In the pipeline used by the main Reth binary, the `HeaderStage` uses a `ReverseHeadersDownloader` to stream headers from the network: [File: crates/net/downloaders/src/headers/linear.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/downloaders/src/headers/linear.rs) ```rust,ignore -pub struct LinearDownloader { +pub struct ReverseHeadersDownloader { /// The consensus client consensus: Arc, /// The headers client @@ -361,7 +361,7 @@ pub struct LinearDownloader { } ``` -A `FetchClient` is passed in to the `client` field, and the `get_headers` method it implements gets used when polling the stream created by the `LinearDownloader` in the `execute` method of the `HeaderStage`. +A `FetchClient` is passed in to the `client` field, and the `get_headers` method it implements gets used when polling the stream created by the `ReverseHeadersDownloader` in the `execute` method of the `HeaderStage`. [File: crates/net/downloaders/src/headers/linear.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/downloaders/src/headers/linear.rs) ```rust,ignore @@ -387,11 +387,11 @@ fn get_or_init_fut(&mut self) -> HeadersRequestFuture { } ``` -In the `BodyStage` configured by the main binary, a `ConcurrentDownloader` is used: +In the `BodyStage` configured by the main binary, a `BodiesDownloader` is used: [File: crates/net/downloaders/src/bodies/concurrent.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/downloaders/src/bodies/concurrent.rs) ```rust,ignore -pub struct ConcurrentDownloader { +pub struct BodiesDownloader { /// The bodies client client: Arc, /// The consensus client @@ -405,7 +405,7 @@ pub struct ConcurrentDownloader { } ``` -Here, similarly, a `FetchClient` is passed in to the `client` field, and the `get_block_bodies` method it implements is used when constructing the stream created by the `ConcurrentDownloader` in the `execute` method of the `BodyStage`. +Here, similarly, a `FetchClient` is passed in to the `client` field, and the `get_block_bodies` method it implements is used when constructing the stream created by the `BodiesDownloader` in the `execute` method of the `BodyStage`. [File: crates/net/downloaders/src/bodies/concurrent.rs](https://github.com/paradigmxyz/reth/blob/main/crates/net/downloaders/src/bodies/concurrent.rs) ```rust,ignore @@ -469,7 +469,7 @@ pub struct EthRequestHandler { } ``` -The `client` field here is a client that's used to fetch data from the database, not to be confused with the `client` field on a downloader like the `LinearDownloader` discussed above, which is a `FetchClient`. +The `client` field here is a client that's used to fetch data from the database, not to be confused with the `client` field on a downloader like the `ReverseHeadersDownloader` discussed above, which is a `FetchClient`. ### Input Streams to the ETH Requests Task