feat: finish concurrent body downloader (#220)

* refactor: remove timeout config from downloader

The timeout should be controlled by the client
implementation.

* feat: downloader request retries

* test: add concurrent body downloader tests

* chore: fmt

Co-authored-by: Georgios Konstantopoulos <me@gakonst.com>
Bjerg committed on 2022-11-22 21:06:49 +01:00 (committed by GitHub)
parent 89ffaf541b
commit a523cb7024
5 changed files with 266 additions and 83 deletions
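
For context, a minimal sketch of how the reworked downloader is used after this change. The crate and import paths are assumptions based on the diff below, and `client` stands in for any `BodiesClient` implementation; this is not code from the commit itself.

use std::sync::Arc;

use futures_util::TryStreamExt;
use reth_bodies_downloaders::concurrent::ConcurrentDownloader; // crate path assumed
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::bodies::{
    client::BodiesClient, downloader::BodyDownloader, error::DownloadError,
};
use reth_primitives::{BlockNumber, H256};

async fn download_bodies<C: BodiesClient>(
    client: Arc<C>,
    headers: &[(BlockNumber, H256)],
) -> Result<Vec<(BlockNumber, H256, BlockBody)>, DownloadError> {
    // Timeouts are now the client's responsibility; the downloader only
    // configures concurrency and the retry budget.
    let downloader = ConcurrentDownloader::new(client)
        .with_batch_size(100) // number of bodies fetched concurrently
        .with_retries(3); // retries per request before the error bubbles up
    downloader.bodies_stream(headers).try_collect().await
}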

Cargo.lock (generated)

@@ -180,6 +180,18 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backon"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cd1a59bc091e593ee9ed62df4e4a07115e00a0e0a52fd7e0e04540773939b80"
dependencies = [
"futures",
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "bare-metal"
version = "0.2.5"
@@ -3057,13 +3069,14 @@ name = "reth-bodies-downloaders"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"backon",
"futures-util",
"once_cell",
"rand 0.8.5",
"reth-eth-wire",
"reth-interfaces",
"reth-primitives",
"serial_test",
"tokio",
]


@@ -2,7 +2,7 @@ use super::client::BodiesClient;
use crate::p2p::bodies::error::DownloadError;
use reth_eth_wire::BlockBody;
use reth_primitives::{BlockNumber, H256};
use std::{pin::Pin, time::Duration};
use std::pin::Pin;
use tokio_stream::Stream;
/// A downloader capable of fetching block bodies from header hashes.
@@ -13,9 +13,6 @@ pub trait BodyDownloader: Sync + Send {
/// The [BodiesClient] used to fetch the block bodies
type Client: BodiesClient;
/// The request timeout duration
fn timeout(&self) -> Duration;
/// The block bodies client
fn client(&self) -> &Self::Client;
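
As the commit message notes, request timeouts are now owned by the client implementation rather than the downloader. A sketch of what that could look like, assuming a hypothetical wrapper around an existing client (this type is not part of the commit, and the exact `BodiesClient` bounds may differ):

use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::bodies::{client::BodiesClient, error::BodiesClientError};
use reth_primitives::H256;
use std::time::Duration;

/// Hypothetical wrapper that enforces a per-request timeout around an inner client.
#[derive(Debug)]
struct TimeoutClient<C> {
    inner: C,
    timeout: Duration,
}

#[async_trait]
impl<C: BodiesClient> BodiesClient for TimeoutClient<C> {
    async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
        // Map an elapsed timer onto the client-level timeout error, which the
        // downloader translates into DownloadError::Timeout.
        match tokio::time::timeout(self.timeout, self.inner.get_block_body(hash)).await {
            Ok(result) => result,
            Err(_elapsed) => Err(BodiesClientError::Timeout { header_hash: hash }),
        }
    }
}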


@@ -12,10 +12,13 @@ futures-util = "0.3.25"
reth-interfaces = { path = "../../interfaces" }
reth-primitives = { path = "../../primitives" }
reth-eth-wire = { path= "../eth-wire" }
backon = "0.2.0"
[dev-dependencies]
assert_matches = "1.5.0"
once_cell = "1.15.0"
rand = "0.8.5"
reth-interfaces = { path = "../../interfaces", features = ["test-utils"] }
tokio = { version = "1.21.2", features = ["full"] }
serial_test = "0.9.0"
async-trait = "0.1.58"
futures-util = "0.3.25"
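
The new backon dependency provides the retry combinator used in bodies_stream in the downloader diff below. A minimal standalone sketch of the pattern, using only the calls that appear in this diff; the fetch function and the error check are placeholders, not code from the commit:

use backon::{ExponentialBackoff, Retryable};

// Placeholder fallible operation standing in for a body fetch.
async fn fetch() -> Result<u64, std::io::Error> {
    Ok(42)
}

async fn fetch_with_retries() -> Result<u64, std::io::Error> {
    // Retry up to three times with exponential backoff, but only when the
    // error looks transient (here: a timeout).
    (|| fetch())
        .retry(ExponentialBackoff::default().with_max_times(3))
        .when(|err| err.kind() == std::io::ErrorKind::TimedOut)
        .await
}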


@@ -1,11 +1,13 @@
use futures_util::{stream, StreamExt, TryFutureExt};
use backon::{ExponentialBackoff, Retryable};
use futures_util::{stream, StreamExt};
use reth_eth_wire::BlockBody;
use reth_interfaces::p2p::bodies::{
client::BodiesClient,
downloader::{BodiesStream, BodyDownloader},
error::{BodiesClientError, DownloadError},
};
use reth_primitives::{BlockNumber, H256};
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
/// Downloads bodies in batches.
///
@@ -14,22 +16,15 @@ use std::{sync::Arc, time::Duration};
pub struct ConcurrentDownloader<C> {
/// The bodies client
client: Arc<C>,
/// The number of retries for each request.
retries: usize,
/// The batch size per one request
pub batch_size: usize,
/// A single request timeout
pub request_timeout: Duration,
/// The number of retries for downloading
pub request_retries: usize,
batch_size: usize,
}
impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
type Client = C;
/// The request timeout duration
fn timeout(&self) -> Duration {
self.request_timeout
}
/// The block bodies client
fn client(&self) -> &Self::Client {
&self.client
@@ -41,92 +36,271 @@ impl<C: BodiesClient> BodyDownloader for ConcurrentDownloader<C> {
<I as IntoIterator>::IntoIter: Send + 'b,
'b: 'a,
{
// TODO: Retry
Box::pin(
stream::iter(headers.into_iter().map(|(block_number, header_hash)| {
{
self.client
.get_block_body(*header_hash)
.map_ok(move |body| (*block_number, *header_hash, body))
.map_err(|err| match err {
BodiesClientError::Timeout { header_hash } => {
DownloadError::Timeout { header_hash }
}
err => DownloadError::Client { source: err },
})
}
(|| self.fetch_body(*block_number, *header_hash))
.retry(ExponentialBackoff::default().with_max_times(self.retries))
.when(|err| err.is_retryable())
}))
.buffered(self.batch_size),
)
}
}
/// A [ConcurrentDownloader] builder.
#[derive(Debug)]
pub struct ConcurrentDownloaderBuilder {
/// The batch size per one request
batch_size: usize,
/// A single request timeout
request_timeout: Duration,
/// The number of retries for downloading
request_retries: usize,
}
impl Default for ConcurrentDownloaderBuilder {
fn default() -> Self {
Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 }
impl<C: BodiesClient> ConcurrentDownloader<C> {
/// Create a new concurrent downloader instance.
pub fn new(client: Arc<C>) -> Self {
Self { client, retries: 3, batch_size: 100 }
}
}
impl ConcurrentDownloaderBuilder {
/// Set the request batch size
pub fn batch_size(mut self, size: usize) -> Self {
self.batch_size = size;
/// Set the number of blocks to fetch at the same time.
///
/// Defaults to 100.
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}
/// Set the request timeout
pub fn timeout(mut self, timeout: Duration) -> Self {
self.request_timeout = timeout;
/// Set the number of times to retry body fetch requests.
///
/// Defaults to 3.
pub fn with_retries(mut self, retries: usize) -> Self {
self.retries = retries;
self
}
/// Set the number of retries per request
pub fn retries(mut self, retries: usize) -> Self {
self.request_retries = retries;
self
}
/// Build [ConcurrentDownloader] with the provided client
pub fn build<C: BodiesClient>(self, client: Arc<C>) -> ConcurrentDownloader<C> {
ConcurrentDownloader {
client,
batch_size: self.batch_size,
request_timeout: self.request_timeout,
request_retries: self.request_retries,
/// Fetch a single block body.
async fn fetch_body(
&self,
block_number: BlockNumber,
header_hash: H256,
) -> Result<(BlockNumber, H256, BlockBody), DownloadError> {
match self.client.get_block_body(header_hash).await {
Ok(body) => Ok((block_number, header_hash, body)),
Err(err) => Err(match err {
BodiesClientError::Timeout { header_hash } => {
DownloadError::Timeout { header_hash }
}
err => DownloadError::Client { source: err },
}),
}
}
}
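// Aside (not part of this diff): the retry predicate DownloadError::is_retryable
// used in bodies_stream above is defined in reth-interfaces. A plausible shape,
// consistent with the tests below (timeouts are retried, other client errors are
// not), would be:
impl DownloadError {
    /// Whether the error is transient and the request may be retried.
    pub fn is_retryable(&self) -> bool {
        matches!(self, DownloadError::Timeout { .. })
    }
}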
// TODO: Cleanup
#[cfg(test)]
mod tests {
#[tokio::test]
#[ignore]
async fn emits_bodies_in_order() {}
use super::*;
use crate::concurrent::{
tests::test_utils::{generate_bodies, TestClient},
ConcurrentDownloader,
};
use assert_matches::assert_matches;
use futures_util::stream::{StreamExt, TryStreamExt};
use reth_interfaces::p2p::{
bodies::{downloader::BodyDownloader, error::BodiesClientError},
error::RequestError,
};
use reth_primitives::H256;
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
// Check that the blocks are emitted in order of block number, not in order of
// first-downloaded
#[tokio::test]
#[ignore]
async fn header_iter_failure() {}
async fn emits_bodies_in_order() {
// Generate some random blocks
let (hashes, mut bodies) = generate_bodies(0..20);
#[tokio::test]
#[ignore]
async fn client_failure() {}
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|hash: H256| {
let mut bodies = bodies.clone();
async move {
// Simulate that the request for this (random) block takes 0-100ms
tokio::time::sleep(Duration::from_millis(hash.to_low_u64_be() % 100)).await;
#[tokio::test]
#[ignore]
async fn retries_requests() {}
Ok(bodies
.remove(&hash)
.expect("Downloader asked for a block it should not ask for"))
}
})));
assert_matches!(
downloader
.bodies_stream(hashes.iter())
.try_collect::<Vec<(BlockNumber, H256, BlockBody)>>()
.await,
Ok(responses) => {
assert_eq!(
responses,
hashes
.into_iter()
.map(|(num, hash)| {
(num, hash, bodies.remove(&hash).unwrap())
})
.collect::<Vec<(BlockNumber, H256, BlockBody)>>()
);
}
);
}
/// Checks that non-retryable errors bubble up
#[tokio::test]
#[ignore]
async fn timeout() {}
async fn client_failure() {
let downloader = ConcurrentDownloader::new(Arc::new(TestClient::new(|_: H256| async {
Err(BodiesClientError::Internal(RequestError::ChannelClosed))
})));
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(DownloadError::Client {
source: BodiesClientError::Internal(RequestError::ChannelClosed)
}))
);
}
/// Checks that the body request is retried on timeouts
#[tokio::test]
async fn retries_timeouts() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
let retries_left = retries_left.clone();
async move {
if retries_left.load(Ordering::SeqCst) > 0 {
retries_left.fetch_sub(1, Ordering::SeqCst);
Err(BodiesClientError::Timeout { header_hash })
} else {
Ok(BlockBody { transactions: vec![], ommers: vec![] })
}
}
})));
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Ok(body)) => {
assert_eq!(body.0, 0);
assert_eq!(body.1, H256::zero());
assert_eq!(body.2, BlockBody {
transactions: vec![],
ommers: vec![]
})
}
);
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 0);
}
/// Checks that the timeout error bubbles up if we've retried too many times
#[tokio::test]
async fn too_many_retries() {
let retries_left = Arc::new(AtomicUsize::new(3));
let downloader =
ConcurrentDownloader::new(Arc::new(TestClient::new(|header_hash: H256| {
let retries_left = retries_left.clone();
async move {
if retries_left.load(Ordering::SeqCst) > 0 {
retries_left.fetch_sub(1, Ordering::SeqCst);
Err(BodiesClientError::Timeout { header_hash })
} else {
Ok(BlockBody { transactions: vec![], ommers: vec![] })
}
}
})))
.with_retries(0);
assert_matches!(
downloader.bodies_stream(&[(0, H256::zero())]).next().await,
Some(Err(DownloadError::Timeout { header_hash })) => {
assert_eq!(header_hash, H256::zero())
}
);
assert_eq!(Arc::try_unwrap(retries_left).unwrap().into_inner(), 2);
}
mod test_utils {
use async_trait::async_trait;
use reth_eth_wire::BlockBody;
use reth_interfaces::{
p2p::bodies::{client::BodiesClient, error::BodiesClientError},
test_utils::generators::random_block_range,
};
use reth_primitives::{BlockNumber, H256};
use std::{
collections::HashMap,
fmt::{Debug, Formatter},
future::Future,
sync::Arc,
};
use tokio::sync::Mutex;
/// Generate a set of bodies and their corresponding block hashes
pub(crate) fn generate_bodies(
rng: std::ops::Range<u64>,
) -> (Vec<(BlockNumber, H256)>, HashMap<H256, BlockBody>) {
let blocks = random_block_range(rng, H256::zero());
let hashes: Vec<(BlockNumber, H256)> =
blocks.iter().map(|block| (block.number, block.hash())).collect();
let bodies: HashMap<H256, BlockBody> = blocks
.into_iter()
.map(|block| {
(
block.hash(),
BlockBody {
transactions: block.body,
ommers: block
.ommers
.into_iter()
.map(|header| header.unseal())
.collect(),
},
)
})
.collect();
(hashes, bodies)
}
/// A [BodiesClient] for testing.
pub(crate) struct TestClient<F, Fut>(pub(crate) Arc<Mutex<F>>)
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send;
impl<F, Fut> Debug for TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestClient").finish()
}
}
impl<F, Fut> TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
pub(crate) fn new(f: F) -> Self {
Self(Arc::new(Mutex::new(f)))
}
}
#[async_trait]
impl<F, Fut> BodiesClient for TestClient<F, Fut>
where
F: FnMut(H256) -> Fut + Send + Sync,
Fut: Future<Output = Result<BlockBody, BodiesClientError>> + Send,
{
async fn get_block_body(&self, hash: H256) -> Result<BlockBody, BodiesClientError> {
let f = &mut *self.0.lock().await;
(f)(hash).await
}
}
}
}


@@ -475,7 +475,7 @@ mod tests {
test_utils::{generators::random_block_range, TestConsensus},
};
use reth_primitives::{BlockLocked, BlockNumber, Header, SealedHeader, H256};
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc};
/// The block hash of the genesis block.
pub(crate) const GENESIS_HASH: H256 = H256::zero();
@@ -674,10 +674,6 @@ mod tests {
impl BodyDownloader for TestBodyDownloader {
type Client = NoopClient;
fn timeout(&self) -> Duration {
unreachable!()
}
fn client(&self) -> &Self::Client {
unreachable!()
}