diff --git a/Cargo.lock b/Cargo.lock
index a9dace3c4..17d8714fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -180,6 +180,18 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
 
+[[package]]
+name = "backon"
+version = "0.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6cd1a59bc091e593ee9ed62df4e4a07115e00a0e0a52fd7e0e04540773939b80"
+dependencies = [
+ "futures",
+ "pin-project",
+ "rand 0.8.5",
+ "tokio",
+]
+
 [[package]]
 name = "bare-metal"
 version = "0.2.5"
@@ -3057,13 +3069,14 @@ name = "reth-bodies-downloaders"
 version = "0.1.0"
 dependencies = [
  "assert_matches",
+ "async-trait",
+ "backon",
  "futures-util",
  "once_cell",
  "rand 0.8.5",
  "reth-eth-wire",
  "reth-interfaces",
  "reth-primitives",
- "serial_test",
  "tokio",
 ]
 
diff --git a/crates/interfaces/src/p2p/bodies/downloader.rs b/crates/interfaces/src/p2p/bodies/downloader.rs
index 677cbab0c..a1011eb1f 100644
--- a/crates/interfaces/src/p2p/bodies/downloader.rs
+++ b/crates/interfaces/src/p2p/bodies/downloader.rs
@@ -2,7 +2,7 @@ use super::client::BodiesClient;
 use crate::p2p::bodies::error::DownloadError;
 use reth_eth_wire::BlockBody;
 use reth_primitives::{BlockNumber, H256};
-use std::{pin::Pin, time::Duration};
+use std::pin::Pin;
 use tokio_stream::Stream;
 
 /// A downloader capable of fetching block bodies from header hashes.
@@ -13,9 +13,6 @@ pub trait BodyDownloader: Sync + Send {
     /// The [BodiesClient] used to fetch the block bodies
     type Client: BodiesClient;
 
-    /// The request timeout duration
-    fn timeout(&self) -> Duration;
-
     /// The block bodies client
     fn client(&self) -> &Self::Client;
 
diff --git a/crates/net/bodies-downloaders/Cargo.toml b/crates/net/bodies-downloaders/Cargo.toml
index c54b263f9..ca7dc74f1 100644
--- a/crates/net/bodies-downloaders/Cargo.toml
+++ b/crates/net/bodies-downloaders/Cargo.toml
@@ -12,10 +12,13 @@ futures-util = "0.3.25"
 reth-interfaces = { path = "../../interfaces" }
 reth-primitives = { path = "../../primitives" }
 reth-eth-wire = { path= "../eth-wire" }
+backon = "0.2.0"
+
 [dev-dependencies]
 assert_matches = "1.5.0"
 once_cell = "1.15.0"
 rand = "0.8.5"
 reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
 tokio = { version = "1.21.2", features = ["full"] }
-serial_test = "0.9.0"
+async-trait = "0.1.58"
+futures-util = "0.3.25"
\ No newline at end of file
diff --git a/crates/net/bodies-downloaders/src/concurrent.rs b/crates/net/bodies-downloaders/src/concurrent.rs
index 3fa66297c..1a1f686f2 100644
--- a/crates/net/bodies-downloaders/src/concurrent.rs
+++ b/crates/net/bodies-downloaders/src/concurrent.rs
@@ -1,11 +1,13 @@
-use futures_util::{stream, StreamExt, TryFutureExt};
+use backon::{ExponentialBackoff, Retryable};
+use futures_util::{stream, StreamExt};
+use reth_eth_wire::BlockBody;
 use reth_interfaces::p2p::bodies::{
     client::BodiesClient,
     downloader::{BodiesStream, BodyDownloader},
     error::{BodiesClientError, DownloadError},
 };
 use reth_primitives::{BlockNumber, H256};
-use std::{sync::Arc, time::Duration};
+use std::sync::Arc;
 
 /// Downloads bodies in batches.
 ///
@@ -14,22 +16,15 @@ use std::{sync::Arc, time::Duration};
 pub struct ConcurrentDownloader<C> {
     /// The bodies client
     client: Arc<C>,
+    /// The number of retries for each request.
+    retries: usize,
     /// The batch size per one request
-    pub batch_size: usize,
-    /// A single request timeout
-    pub request_timeout: Duration,
-    /// The number of retries for downloading
-    pub request_retries: usize,
+    batch_size: usize,
 }
 
 impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
     type Client = C;
 
-    /// The request timeout duration
-    fn timeout(&self) -> Duration {
-        self.request_timeout
-    }
-
     /// The block bodies client
     fn client(&self) -> &Self::Client {
         &self.client
@@ -41,92 +36,271 @@ impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
         <I as IntoIterator>::IntoIter: Send + 'b,
         'b: 'a,
     {
-        // TODO: Retry
         Box::pin(
             stream::iter(headers.into_iter().map(|(block_number, header_hash)| {
-                {
-                    self.client
-                        .get_block_body(*header_hash)
-                        .map_ok(move |body| (*block_number, *header_hash, body))
-                        .map_err(|err| match err {
-                            BodiesClientError::Timeout { header_hash } => {
-                                DownloadError::Timeout { header_hash }
-                            }
-                            err => DownloadError::Client { source: err },
-                        })
-                }
+                (|| self.fetch_body(*block_number, *header_hash))
+                    .retry(ExponentialBackoff::default().with_max_times(self.retries))
+                    .when(|err| err.is_retryable())
             }))
             .buffered(self.batch_size),
         )
     }
 }
 
-/// A [ConcurrentDownloader] builder.
-#[derive(Debug)]
-pub struct ConcurrentDownloaderBuilder {
-    /// The batch size per one request
-    batch_size: usize,
-    /// A single request timeout
-    request_timeout: Duration,
-    /// The number of retries for downloading
-    request_retries: usize,
-}
-
-impl Default for ConcurrentDownloaderBuilder {
-    fn default() -> Self {
-        Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 }
+impl<C: BodiesClient> ConcurrentDownloader<C> {
+    /// Create a new concurrent downloader instance.
+    pub fn new(client: Arc<C>) -> Self {
+        Self { client, retries: 3, batch_size: 100 }
     }
-}
 
-impl ConcurrentDownloaderBuilder {
-    /// Set the request batch size
-    pub fn batch_size(mut self, size: usize) -> Self {
-        self.batch_size = size;
+    /// Set the number of blocks to fetch at the same time.
+    ///
+    /// Defaults to 100.
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
         self
     }
 
-    /// Set the request timeout
-    pub fn timeout(mut self, timeout: Duration) -> Self {
-        self.request_timeout = timeout;
+    /// Set the number of times to retry body fetch requests.
+    ///
+    /// Defaults to 3.
+    pub fn with_retries(mut self, retries: usize) -> Self {
+        self.retries = retries;
         self
     }
 
-    /// Set the number of retries per request
-    pub fn retries(mut self, retries: usize) -> Self {
-        self.request_retries = retries;
-        self
-    }
-
-    /// Build [ConcurrentDownloader] with the provided client
-    pub fn build<C: BodiesClient>(self, client: Arc<C>) -> ConcurrentDownloader<C> {
-        ConcurrentDownloader {
-            client,
-            batch_size: self.batch_size,
-            request_timeout: self.request_timeout,
-            request_retries: self.request_retries,
+    /// Fetch a single block body.
+    async fn fetch_body(
+        &self,
+        block_number: BlockNumber,
+        header_hash: H256,
+    ) -> Result<(BlockNumber, H256, BlockBody), DownloadError> {
+        match self.client.get_block_body(header_hash).await {
+            Ok(body) => Ok((block_number, header_hash, body)),
+            Err(err) => Err(match err {
+                BodiesClientError::Timeout { header_hash } => {
+                    DownloadError::Timeout { header_hash }
+                }
+                err => DownloadError::Client { source: err },
+            }),
         }
     }
 }
 
+// TODO: Cleanup
 #[cfg(test)]
 mod tests {
-    #[tokio::test]
-    #[ignore]
-    async fn emits_bodies_in_order() {}
+    use super::*;
+    use crate::concurrent::{
+        tests::test_utils::{generate_bodies, TestClient},
+        ConcurrentDownloader,
+    };
+    use assert_matches::assert_matches;
+    use futures_util::stream::{StreamExt, TryStreamExt};
+    use reth_interfaces::p2p::{
+        bodies::{downloader::BodyDownloader, error::BodiesClientError},
+        error::RequestError,
+    };
+    use reth_primitives::H256;
+    use std::{
+        sync::{
+            atomic::{AtomicUsize, Ordering},
+            Arc,
+        },
+        time::Duration,
+    };
 
+    // Check that the blocks are emitted in order of block number, not in order of
+    // first-downloaded
     #[tokio::test]
-    #[ignore]
-    async fn header_iter_failure() {}
+    async fn emits_bodies_in_order() {
+        // Generate some random blocks
+        let (hashes, mut bodies) = generate_bodies(0..20);
 
-    #[tokio::test]
-    #[ignore]
-    async fn client_failure() {}
+        let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: H256| {
+            let mut bodies = bodies.clone();
+            async move {
+                // Simulate that the request for this (random) block takes 0-100ms
+                tokio::time::sleep(Duration::from_millis(hash.to_low_u64_be() % 100)).await;
 
-    #[tokio::test]
-    #[ignore]
-    async fn retries_requests() {}
+                Ok(bodies
+                    .remove(&hash)
+                    .expect("Downloader asked for a block it should not ask for"))
+            }
+        })));
 
+        assert_matches!(
+            downloader
+                .bodies_stream(hashes.iter())
+                .try_collect::<Vec<(BlockNumber, H256, BlockBody)>>()
+                .await,
+            Ok(responses) => {
+                assert_eq!(
+                    responses,
+                    hashes
+                        .into_iter()
+                        .map(|(num, hash)| {
+                            (num, hash, bodies.remove(&hash).unwrap())
+                        })
+                        .collect::<Vec<(BlockNumber, H256, BlockBody)>>()
+                );
+            }
+        );
+    }
+
+    /// Checks that non-retryable errors bubble up
     #[tokio::test]
-    #[ignore]
-    async fn timeout() {}
+    async fn client_failure() {
+        let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|_: H256| async {
+            Err(BodiesClientError::Internal(RequestError::ChannelClosed))
+        })));
+
+        assert_matches!(
+            downloader.bodies_stream(&[(0, H256::zero())]).next().await,
+            Some(Err(DownloadError::Client {
+                source: BodiesClientError::Internal(RequestError::ChannelClosed)
+            }))
+        );
+    }
+
+    /// Checks that the body request is retried on timeouts
+    #[tokio::test]
+    async fn retries_timeouts() {
+        let retries_left = Arc::new(AtomicUsize::new(3));
+        let downloader =
+            ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
+                let retries_left = retries_left.clone();
+                async move {
+                    if retries_left.load(Ordering::SeqCst) > 0 {
+                        retries_left.fetch_sub(1, Ordering::SeqCst);
+                        Err(BodiesClientError::Timeout { header_hash })
+                    } else {
+                        Ok(BlockBody { transactions: vec![], ommers: vec![] })
+                    }
+                }
+            })));
+
+        assert_matches!(
+            downloader.bodies_stream(&[(0, H256::zero())]).next().await,
+            Some(Ok(body)) => {
+                assert_eq!(body.0, 0);
+                assert_eq!(body.1, H256::zero());
+                assert_eq!(body.2, BlockBody {
+                    transactions: vec![],
+                    ommers: vec![]
+                })
+            }
+        );
+        assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0);
+    }
+
+    /// Checks that the timeout error bubbles up if we've retried too many times
+    #[tokio::test]
+    async fn too_many_retries() {
+        let retries_left = Arc::new(AtomicUsize::new(3));
+        let downloader =
+            ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
+                let retries_left = retries_left.clone();
+                async move {
+                    if retries_left.load(Ordering::SeqCst) > 0 {
+                        retries_left.fetch_sub(1, Ordering::SeqCst);
+                        Err(BodiesClientError::Timeout { header_hash })
+                    } else {
+                        Ok(BlockBody { transactions: vec![], ommers: vec![] })
+                    }
+                }
+            })))
+            .with_retries(0);
+
+        assert_matches!(
+            downloader.bodies_stream(&[(0, H256::zero())]).next().await,
+            Some(Err(DownloadError::Timeout { header_hash })) => {
+                assert_eq!(header_hash, H256::zero())
+            }
+        );
+        assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2);
+    }
+
+    mod test_utils {
+        use async_trait::async_trait;
+        use reth_eth_wire::BlockBody;
+        use reth_interfaces::{
+            p2p::bodies::{client::BodiesClient, error::BodiesClientError},
+            test_utils::generators::random_block_range,
+        };
+        use reth_primitives::{BlockNumber, H256};
+        use std::{
+            collections::HashMap,
+            fmt::{Debug, Formatter},
+            future::Future,
+            sync::Arc,
+        };
+        use tokio::sync::Mutex;
+
+        /// Generate a set of bodies and their corresponding block hashes
+        pub(crate) fn generate_bodies(
+            rng: std::ops::Range<BlockNumber>,
+        ) -> (Vec<(BlockNumber, H256)>, HashMap<H256, BlockBody>) {
+            let blocks = random_block_range(rng, H256::zero());
+
+            let hashes: Vec<(BlockNumber, H256)> =
+                blocks.iter().map(|block| (block.number, block.hash())).collect();
+            let bodies: HashMap<H256, BlockBody> = blocks
+                .into_iter()
+                .map(|block| {
+                    (
+                        block.hash(),
+                        BlockBody {
+                            transactions: block.body,
+                            ommers: block
+                                .ommers
+                                .into_iter()
+                                .map(|header| header.unseal())
+                                .collect(),
+                        },
+                    )
+                })
+                .collect();
+
+            (hashes, bodies)
+        }
+
+        /// A [BodiesClient] for testing.
+        pub(crate) struct TestClient<F, Fut>(pub(crate) Arc<Mutex<F>>)
+        where
+            F: FnMut(H256) -> Fut + Send + Sync,
+            Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send;
+
+        impl<F, Fut> Debug for TestClient<F, Fut>
+        where
+            F: FnMut(H256) -> Fut + Send + Sync,
+            Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
+        {
+            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+                f.debug_struct("TestClient").finish()
+            }
+        }
+
+        impl<F, Fut> TestClient<F, Fut>
+        where
+            F: FnMut(H256) -> Fut + Send + Sync,
+            Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
+        {
+            pub(crate) fn new(f: F) -> Self {
+                Self(Arc::new(Mutex::new(f)))
+            }
+        }
+
+        #[async_trait]
+        impl<F, Fut> BodiesClient for TestClient<F, Fut>
+        where
+            F: FnMut(H256) -> Fut + Send + Sync,
+            Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
+        {
+            async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
+                let f = &mut *self.0.lock().await;
+                (f)(hash).await
+            }
+        }
+    }
 }
diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs
index 76ecec26d..a805cc115 100644
--- a/crates/stages/src/stages/bodies.rs
+++ b/crates/stages/src/stages/bodies.rs
@@ -475,7 +475,7 @@ mod tests {
         test_utils::{generators::random_block_range, TestConsensus},
     };
     use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256};
-    use std::{collections::HashMap, sync::Arc, time::Duration};
+    use std::{collections::HashMap, sync::Arc};
 
     /// The block hash of the genesis block.
     pub(crate) const GENESIS_HASH: H256 = H256::zero();
@@ -674,10 +674,6 @@ mod tests {
     impl BodyDownloader for TestBodyDownloader {
         type Client = NoopClient;
 
-        fn timeout(&self) -> Duration {
-            unreachable!()
-        }
-
         fn client(&self) -> &Self::Client {
             unreachable!()
        }
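
Usage note (beyond the diff): with ConcurrentDownloaderBuilder removed, callers construct the downloader directly and tune it through the fluent with_* setters, while retries are handled internally via backon. The sketch below shows the intended call pattern; the function name, the 64/5 settings, and the reth_bodies_downloaders::concurrent import path are illustrative assumptions, while the ConcurrentDownloader, BodyDownloader, and BodiesClient APIs mirror what this diff introduces.

    // Illustrative only: assumes the crate exposes `concurrent::ConcurrentDownloader`.
    use futures_util::StreamExt;
    use reth_bodies_downloaders::concurrent::ConcurrentDownloader;
    use reth_interfaces::p2p::bodies::{client::BodiesClient, downloader::BodyDownloader};
    use reth_primitives::{BlockNumber, H256};
    use std::sync::Arc;

    async fn download_bodies<C: BodiesClient>(client: Arc<C>, hashes: Vec<(BlockNumber, H256)>) {
        // Fluent setters replace the removed ConcurrentDownloaderBuilder.
        let downloader = ConcurrentDownloader::new(client)
            .with_batch_size(64) // up to 64 body requests in flight at once
            .with_retries(5); // retry each request up to 5 times with exponential backoff

        // `buffered` preserves input order, so bodies are yielded sorted by block number.
        let mut stream = downloader.bodies_stream(hashes.iter());
        while let Some(result) = stream.next().await {
            match result {
                Ok((number, hash, body)) => {
                    // hand the body off to the next stage here
                    let _ = (number, hash, body);
                }
                // Retryable timeouts are handled inside the downloader; anything that
                // reaches this arm is non-retryable or has exhausted its retries.
                Err(err) => {
                    eprintln!("body download failed: {err:?}");
                    break;
                }
            }
        }
    }

Because the retry policy now lives inside bodies_stream, consumers such as the bodies stage no longer need to know about request timeouts, which is why the timeout method was dropped from the BodyDownloader trait.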