fix: shrink queued bodies and headers when drained or split (#3191)

Author: Dan Cline
Date: 2023-06-15 18:35:21 -04:00
Committed by: GitHub
Parent: 5375a2dd6a
Commit: 5e0508b973

2 changed files with 82 additions and 4 deletions
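The pattern is the same in both touched files (the bodies downloader and the reverse headers downloader): `clear()` empties a `Vec` or `BinaryHeap` but keeps its backing allocation, so a queue that balloons during a burst holds its peak memory forever. A minimal standalone sketch of that std behavior, with arbitrary sizes and element types rather than the downloader's real ones:

```rust
fn main() {
    // `clear()` drops the elements but keeps the backing allocation, so a
    // queue that once held a large burst stays at its high-water mark.
    let mut queued: Vec<u64> = (0..1_000_000).collect();
    queued.clear();
    assert_eq!(queued.len(), 0);
    assert!(queued.capacity() >= 1_000_000);

    // Assigning a fresh `Vec` drops the old buffer and releases the memory.
    queued = Vec::new();
    assert_eq!(queued.capacity(), 0);
}
```

Replacing the buffers with `Vec::new()` / `BinaryHeap::new()` on `clear`, as this commit does, makes a reset also release memory.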

Changes to the bodies downloader:

@@ -200,8 +200,8 @@ where
         self.download_range = RangeInclusive::new(1, 0);
         self.latest_queued_block_number.take();
         self.in_progress_queue.clear();
-        self.queued_bodies.clear();
-        self.buffered_responses.clear();
+        self.queued_bodies = Vec::new();
+        self.buffered_responses = BinaryHeap::new();
         self.num_buffered_blocks = 0;

         // reset metrics
@@ -270,6 +270,7 @@ where
     fn try_split_next_batch(&mut self) -> Option<Vec<BlockResponse>> {
         if self.queued_bodies.len() >= self.stream_batch_size {
             let next_batch = self.queued_bodies.drain(..self.stream_batch_size).collect::<Vec<_>>();
+            self.queued_bodies.shrink_to_fit();
             self.metrics.total_flushed.increment(next_batch.len() as u64);
             self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
             return Some(next_batch)
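The new `shrink_to_fit` call matters because `Vec::drain` removes elements without returning any capacity to the allocator. A small sketch of the effect, using arbitrary numbers in place of real block responses:

```rust
fn main() {
    let stream_batch_size = 100;
    let mut queued_bodies: Vec<u32> = (0..10_000).collect();

    // Drain the front of the queue into the next batch. The drained elements
    // are gone, but `queued_bodies` still owns its full allocation.
    let next_batch: Vec<u32> = queued_bodies.drain(..stream_batch_size).collect();
    assert_eq!(next_batch.len(), stream_batch_size);
    assert!(queued_bodies.capacity() >= 10_000);

    // Shrinking hands the unused tail of the allocation back to the allocator.
    // (Exact equality holds with the default global allocator; the docs only
    // promise capacity "as close as possible" to the length.)
    queued_bodies.shrink_to_fit();
    assert_eq!(queued_bodies.capacity(), queued_bodies.len());
}
```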
@@ -410,6 +411,9 @@ where
                 this.queue_bodies(buf_response);
             }

+            // shrink the buffer so that it doesn't grow indefinitely
+            this.buffered_responses.shrink_to_fit();
+
             if !new_request_submitted {
                 break
             }
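`buffered_responses` is a `BinaryHeap`, which is backed by a `Vec` and likewise parks its allocation at the high-water mark once responses are popped off. A sketch of why the heap gets the same treatment, again with placeholder contents rather than real responses:

```rust
use std::collections::BinaryHeap;

fn main() {
    // Fill the heap, then consume most of it, as the download loop does when
    // it turns buffered responses into queued bodies.
    let mut buffered: BinaryHeap<u64> = (0..10_000).collect();
    while buffered.len() > 10 {
        buffered.pop();
    }
    // Popping never returns memory to the allocator.
    assert!(buffered.capacity() >= 10_000);

    // An explicit shrink after each round of processing keeps the heap from
    // holding its peak allocation forever.
    buffered.shrink_to_fit();
    assert!(buffered.capacity() < 10_000);
}
```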
@@ -422,6 +426,7 @@ where
             }
             let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
             let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
+            this.queued_bodies.shrink_to_fit();
             this.metrics.total_flushed.increment(next_batch.len() as u64);
             this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
             return Poll::Ready(Some(Ok(next_batch)))

Changes to the reverse headers downloader:

@@ -600,8 +600,8 @@ where
     /// Clears all requests/responses.
     fn clear(&mut self) {
         self.lowest_validated_header.take();
-        self.queued_validated_headers.clear();
-        self.buffered_responses.clear();
+        self.queued_validated_headers = Vec::new();
+        self.buffered_responses = BinaryHeap::new();
         self.in_progress_queue.clear();

         self.metrics.in_flight_requests.set(0.);
@@ -613,6 +613,22 @@ where
         let batch_size = self.stream_batch_size.min(self.queued_validated_headers.len());
         let mut rem = self.queued_validated_headers.split_off(batch_size);
         std::mem::swap(&mut rem, &mut self.queued_validated_headers);
+        // If the downloader's consumer does not flush headers at the same rate that the
+        // downloader queues them, then the `queued_validated_headers` buffer can grow unbounded.
+        //
+        // The semantics of `split_off` state that the capacity of the original buffer is
+        // unchanged, so `queued_validated_headers` will then have only `batch_size` elements and
+        // its original capacity. Because `rem` is initially populated with elements `[batch_size,
+        // len)` of `queued_validated_headers`, it will have a capacity of at least `len -
+        // batch_size`, and the total memory allocated by the two buffers will be around double
+        // the original size of `queued_validated_headers`.
+        //
+        // These are then `mem::swap`ped, leaving `rem` with a large capacity but a small length.
+        //
+        // To prevent these allocations from leaking to the consumer, we shrink the capacity of
+        // the new buffer. The total memory allocated should then be not much more than the
+        // original size of `queued_validated_headers`.
+        rem.shrink_to_fit();
         rem
     }
 }
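The capacity hand-off described in the comment above is easy to verify in isolation. A standalone sketch of the `split_off` / `mem::swap` / `shrink_to_fit` sequence, with arbitrary sizes standing in for queued headers:

```rust
fn main() {
    let batch_size = 100;
    let mut queued: Vec<u64> = (0..10_000).collect();

    // `split_off` keeps `[0, batch_size)` in `queued` with its original
    // capacity and moves `[batch_size, len)` into a new allocation.
    let mut rem = queued.split_off(batch_size);

    // After the swap, `queued` holds the remainder and `rem` holds the small
    // batch while still owning the original, oversized buffer.
    std::mem::swap(&mut rem, &mut queued);
    assert_eq!(rem.len(), batch_size);
    assert!(rem.capacity() >= 10_000);

    // Shrink before returning the batch so the big allocation does not leak
    // out to the consumer.
    rem.shrink_to_fit();
    assert_eq!(rem.capacity(), rem.len());
}
```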
@@ -776,6 +792,9 @@ where
                 }
             }

+            // shrink the buffer after handling sync target outcomes
+            this.buffered_responses.shrink_to_fit();
+
             // This loop will submit new requests and poll them; if a new batch is ready, it is returned.
             // The actual work is done by the receiver of the request channel; this means polling the
             // request future is just reading from a `oneshot::Receiver`. Hence, this loop tries to keep
@@ -805,6 +824,9 @@ where
                 };
             }

+            // shrink the buffer after handling headers outcomes
+            this.buffered_responses.shrink_to_fit();
+
             // marks the loop's exit condition: exit if no requests submitted
             let mut progress = false;
@@ -1426,11 +1448,62 @@ mod tests {
         let headers = downloader.next().await.unwrap();
         assert_eq!(headers, Ok(vec![p0]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
         let headers = downloader.next().await.unwrap();
         assert_eq!(headers, Ok(vec![p1]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
         let headers = downloader.next().await.unwrap();
         assert_eq!(headers, Ok(vec![p2]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
         assert!(downloader.next().await.is_none());
     }
+
+    #[tokio::test]
+    async fn download_one_by_one_larger_request_limit() {
+        reth_tracing::init_test_tracing();
+        let p3 = SealedHeader::default();
+        let p2 = child_header(&p3);
+        let p1 = child_header(&p2);
+        let p0 = child_header(&p1);
+
+        let client = Arc::new(TestHeadersClient::default());
+        let mut downloader = ReverseHeadersDownloaderBuilder::default()
+            .stream_batch_size(1)
+            .request_limit(3)
+            .build(Arc::clone(&client), Arc::new(TestConsensus::default()));
+        downloader.update_local_head(p3.clone());
+        downloader.update_sync_target(SyncTarget::Tip(p0.hash()));
+
+        client
+            .extend(vec![
+                p0.as_ref().clone(),
+                p1.as_ref().clone(),
+                p2.as_ref().clone(),
+                p3.as_ref().clone(),
+            ])
+            .await;
+
+        let headers = downloader.next().await.unwrap();
+        assert_eq!(headers, Ok(vec![p0]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
+        let headers = downloader.next().await.unwrap();
+        assert_eq!(headers, Ok(vec![p1]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
+        let headers = downloader.next().await.unwrap();
+        assert_eq!(headers, Ok(vec![p2]));
+        let headers = headers.unwrap();
+        assert_eq!(headers.capacity(), headers.len());
+        assert!(downloader.next().await.is_none());
+    }
 }