chore(download): bodies downloader size limit (#3508)

This commit is contained in:
Roman Krasiuk
2023-07-01 08:39:52 +03:00
committed by GitHub
parent 7cb4a71cf6
commit 56674ade06
3 changed files with 35 additions and 31 deletions

View File

@ -176,8 +176,8 @@ impl Command {
downloader: BodiesDownloaderBuilder::default()
.with_stream_batch_size(batch_size as usize)
.with_request_limit(config.stages.bodies.downloader_request_limit)
.with_max_buffered_blocks(
config.stages.bodies.downloader_max_buffered_blocks,
.with_max_buffered_blocks_size_bytes(
config.stages.bodies.downloader_max_buffered_blocks_size_bytes,
)
.with_concurrent_requests_range(
config.stages.bodies.downloader_min_concurrent_requests..=

View File

@ -151,8 +151,8 @@ pub struct BodiesConfig {
/// Maximum amount of received bodies to buffer internally.
/// The response contains multiple bodies.
///
/// Default: ~43_000 or 4GB with block size of 100kb
pub downloader_max_buffered_blocks: usize,
/// Default: 4GB
pub downloader_max_buffered_blocks_size_bytes: usize,
/// The minimum number of requests to send concurrently.
///
/// Default: 5
@ -169,8 +169,7 @@ impl Default for BodiesConfig {
Self {
downloader_request_limit: 200,
downloader_stream_batch_size: 10_000,
// With high block sizes at around 100kb this will be ~4GB of buffered blocks: ~43k
downloader_max_buffered_blocks: 4 * 1024 * 1024 * 1024 / 100_000,
downloader_max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB
downloader_min_concurrent_requests: 5,
downloader_max_concurrent_requests: 100,
}
@ -182,7 +181,7 @@ impl From<BodiesConfig> for BodiesDownloaderBuilder {
BodiesDownloaderBuilder::default()
.with_stream_batch_size(config.downloader_stream_batch_size)
.with_request_limit(config.downloader_request_limit)
.with_max_buffered_blocks(config.downloader_max_buffered_blocks)
.with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes)
.with_concurrent_requests_range(
config.downloader_min_concurrent_requests..=
config.downloader_max_concurrent_requests,

View File

@ -48,10 +48,10 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
stream_batch_size: usize,
/// The allowed range for number of concurrent requests.
concurrent_requests_range: RangeInclusive<usize>,
/// Maximum amount of received blocks to buffer internally.
max_buffered_blocks: usize,
/// Current number of buffered blocks
num_buffered_blocks: usize,
/// Maximum number of bytes of received blocks to buffer internally.
max_buffered_blocks_size_bytes: usize,
/// Current estimated size of buffered blocks in bytes.
buffered_blocks_size_bytes: usize,
/// The range of block numbers for body download.
download_range: RangeInclusive<BlockNumber>,
/// The latest block number returned.
@ -172,9 +172,9 @@ where
max_requests.min(*self.concurrent_requests_range.end())
}
/// Returns true if the number of buffered blocks is lower than the configured maximum
/// Returns true if the size of buffered blocks is lower than the configured maximum
fn has_buffer_capacity(&self) -> bool {
self.num_buffered_blocks < self.max_buffered_blocks
self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes
}
// Check if the stream is terminated
@ -202,7 +202,7 @@ where
self.in_progress_queue.clear();
self.queued_bodies = Vec::new();
self.buffered_responses = BinaryHeap::new();
self.num_buffered_blocks = 0;
self.buffered_blocks_size_bytes = 0;
// reset metrics
self.metrics.in_flight_requests.set(0.);
@ -223,21 +223,24 @@ where
fn pop_buffered_response(&mut self) -> Option<OrderedBodiesResponse> {
let resp = self.buffered_responses.pop()?;
self.metrics.buffered_responses.decrement(1.);
self.num_buffered_blocks -= resp.len();
self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64);
self.metrics.buffered_blocks_size_bytes.decrement(resp.size() as f64);
self.buffered_blocks_size_bytes -= resp.size();
self.metrics.buffered_blocks.decrement(resp.len() as f64);
self.metrics.buffered_blocks_size_bytes.set(resp.size() as f64);
Some(resp)
}
/// Adds a new response to the internal buffer
fn buffer_bodies_response(&mut self, response: Vec<BlockResponse>) {
self.num_buffered_blocks += response.len();
self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64);
let size = response.iter().map(|b| b.size()).sum::<usize>();
let response = OrderedBodiesResponse { resp: response, size };
let response_len = response.len();
self.buffered_blocks_size_bytes += size;
self.buffered_responses.push(response);
self.metrics.buffered_blocks.increment(response_len as f64);
self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64);
self.metrics.buffered_responses.set(self.buffered_responses.len() as f64);
self.metrics.buffered_blocks_size_bytes.increment(size as f64);
}
/// Returns a response if it's first block number matches the next expected.
@ -502,8 +505,8 @@ pub struct BodiesDownloaderBuilder {
pub request_limit: u64,
/// The maximum number of block bodies returned at once from the stream
pub stream_batch_size: usize,
/// Maximum amount of received bodies to buffer internally.
pub max_buffered_blocks: usize,
/// Maximum number of bytes of received bodies to buffer internally.
pub max_buffered_blocks_size_bytes: usize,
/// The maximum number of requests to send concurrently.
pub concurrent_requests_range: RangeInclusive<usize>,
}
@ -513,8 +516,7 @@ impl Default for BodiesDownloaderBuilder {
Self {
request_limit: 200,
stream_batch_size: 10_000,
// With high block sizes at around 100kb this will be ~4GB of buffered blocks: ~43k
max_buffered_blocks: 4 * 1024 * 1024 * 1024 / 100_000,
max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB
concurrent_requests_range: 5..=100,
}
}
@ -533,7 +535,7 @@ impl BodiesDownloaderBuilder {
self
}
/// Set on the downloader.
/// Set concurrent requests range on the downloader.
pub fn with_concurrent_requests_range(
mut self,
concurrent_requests_range: RangeInclusive<usize>,
@ -542,9 +544,12 @@ impl BodiesDownloaderBuilder {
self
}
/// Set on the downloader.
pub fn with_max_buffered_blocks(mut self, max_buffered_responses: usize) -> Self {
self.max_buffered_blocks = max_buffered_responses;
/// Set max buffered block bytes on the downloader.
pub fn with_max_buffered_blocks_size_bytes(
mut self,
max_buffered_blocks_size_bytes: usize,
) -> Self {
self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes;
self
}
@ -563,7 +568,7 @@ impl BodiesDownloaderBuilder {
request_limit,
stream_batch_size,
concurrent_requests_range,
max_buffered_blocks: max_buffered_responses,
max_buffered_blocks_size_bytes,
} = self;
let metrics = BodyDownloaderMetrics::default();
let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
@ -573,7 +578,7 @@ impl BodiesDownloaderBuilder {
db,
request_limit,
stream_batch_size,
max_buffered_blocks: max_buffered_responses,
max_buffered_blocks_size_bytes,
concurrent_requests_range,
in_progress_queue,
metrics,
@ -581,7 +586,7 @@ impl BodiesDownloaderBuilder {
latest_queued_block_number: None,
buffered_responses: Default::default(),
queued_bodies: Default::default(),
num_buffered_blocks: 0,
buffered_blocks_size_bytes: 0,
}
}
}