diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 9b9fcb2b6..a1451bb5b 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -735,4 +735,57 @@ mod tests { Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().skip(100), &mut bodies)) ); } + + // Check that the downloader continues after the size limit is reached. + #[tokio::test] + async fn can_download_after_exceeding_limit() { + // Generate some random blocks + let db = create_test_rw_db(); + let (headers, mut bodies) = generate_bodies(0..=199); + + insert_headers(db.db(), &headers); + + let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); + // Set the max buffered block size to 1 byte, to make sure that every response exceeds the + // limit + let mut downloader = BodiesDownloaderBuilder::default() + .with_stream_batch_size(10) + .with_request_limit(1) + .with_max_buffered_blocks_size_bytes(1) + .build(client.clone(), Arc::new(TestConsensus::default()), db); + + // Set and download the entire range + downloader.set_download_range(0..=199).expect("failed to set download range"); + let mut header = 0; + while let Some(Ok(resp)) = downloader.next().await { + assert_eq!(resp, zip_blocks(headers.iter().skip(header).take(resp.len()), &mut bodies)); + header += resp.len(); + } + } + + // Check that the downloader can tolerate a few completely empty responses + #[tokio::test] + async fn can_tolerate_empty_responses() { + // Generate some random blocks + let db = create_test_rw_db(); + let (headers, mut bodies) = generate_bodies(0..=99); + + insert_headers(db.db(), &headers); + + // respond with empty bodies for every other request. + let client = Arc::new( + TestBodiesClient::default().with_bodies(bodies.clone()).with_empty_responses(2), + ); + let mut downloader = BodiesDownloaderBuilder::default() + .with_request_limit(3) + .with_stream_batch_size(100) + .build(client.clone(), Arc::new(TestConsensus::default()), db); + + // Download the requested range + downloader.set_download_range(0..=99).expect("failed to set download range"); + assert_matches!( + downloader.next().await, + Some(Ok(res)) => assert_eq!(res, zip_blocks(headers.iter().take(100), &mut bodies)) + ); + } } diff --git a/crates/net/downloaders/src/test_utils/bodies_client.rs b/crates/net/downloaders/src/test_utils/bodies_client.rs index e2968fc8d..ac791742d 100644 --- a/crates/net/downloaders/src/test_utils/bodies_client.rs +++ b/crates/net/downloaders/src/test_utils/bodies_client.rs @@ -22,6 +22,7 @@ pub struct TestBodiesClient { should_delay: bool, max_batch_size: Option, times_requested: AtomicU64, + empty_response_mod: Option, } impl TestBodiesClient { @@ -35,6 +36,13 @@ impl TestBodiesClient { self } + /// Instructs the client to respond with empty responses some portion of the time. Every + /// `empty_mod` responses, the client will respond with an empty response. + pub(crate) fn with_empty_responses(mut self, empty_mod: u64) -> Self { + self.empty_response_mod = Some(empty_mod); + self + } + pub(crate) fn with_max_batch_size(mut self, max_batch_size: usize) -> Self { self.max_batch_size = Some(max_batch_size); self @@ -43,6 +51,18 @@ impl TestBodiesClient { pub(crate) fn times_requested(&self) -> u64 { self.times_requested.load(Ordering::Relaxed) } + + /// Returns whether or not the client should respond with an empty response. + /// + /// This will only return true if `empty_response_mod` is `Some`, and `times_requested % + /// empty_response_mod == 0`. + pub(crate) fn should_respond_empty(&self) -> bool { + if let Some(empty_response_mod) = self.empty_response_mod { + self.times_requested.load(Ordering::Relaxed) % empty_response_mod == 0 + } else { + false + } + } } impl DownloadClient for TestBodiesClient { @@ -68,8 +88,13 @@ impl BodiesClient for TestBodiesClient { let max_batch_size = self.max_batch_size; self.times_requested.fetch_add(1, Ordering::Relaxed); + let should_respond_empty = self.should_respond_empty(); Box::pin(async move { + if should_respond_empty { + return Ok((PeerId::default(), vec![]).into()); + } + if should_delay { tokio::time::sleep(Duration::from_millis((hashes[0][0] % 100) as u64)).await; }