From 56674ade06fb81247b4ef51fcdae875280550207 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Sat, 1 Jul 2023 08:39:52 +0300 Subject: [PATCH] chore(download): bodies downloader size limit (#3508) --- bin/reth/src/stage/run.rs | 4 +- crates/config/src/config.rs | 9 ++-- crates/net/downloaders/src/bodies/bodies.rs | 53 +++++++++++---------- 3 files changed, 35 insertions(+), 31 deletions(-) diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 95760bd72..aea66ec0d 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -176,8 +176,8 @@ impl Command { downloader: BodiesDownloaderBuilder::default() .with_stream_batch_size(batch_size as usize) .with_request_limit(config.stages.bodies.downloader_request_limit) - .with_max_buffered_blocks( - config.stages.bodies.downloader_max_buffered_blocks, + .with_max_buffered_blocks_size_bytes( + config.stages.bodies.downloader_max_buffered_blocks_size_bytes, ) .with_concurrent_requests_range( config.stages.bodies.downloader_min_concurrent_requests..= diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 4593dedd7..246078759 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -151,8 +151,8 @@ pub struct BodiesConfig { /// Maximum amount of received bodies to buffer internally. /// The response contains multiple bodies. /// - /// Default: ~43_000 or 4GB with block size of 100kb - pub downloader_max_buffered_blocks: usize, + /// Default: 4GB + pub downloader_max_buffered_blocks_size_bytes: usize, /// The minimum number of requests to send concurrently. /// /// Default: 5 @@ -169,8 +169,7 @@ impl Default for BodiesConfig { Self { downloader_request_limit: 200, downloader_stream_batch_size: 10_000, - // With high block sizes at around 100kb this will be ~4GB of buffered blocks: ~43k - downloader_max_buffered_blocks: 4 * 1024 * 1024 * 1024 / 100_000, + downloader_max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB downloader_min_concurrent_requests: 5, downloader_max_concurrent_requests: 100, } @@ -182,7 +181,7 @@ impl From for BodiesDownloaderBuilder { BodiesDownloaderBuilder::default() .with_stream_batch_size(config.downloader_stream_batch_size) .with_request_limit(config.downloader_request_limit) - .with_max_buffered_blocks(config.downloader_max_buffered_blocks) + .with_max_buffered_blocks_size_bytes(config.downloader_max_buffered_blocks_size_bytes) .with_concurrent_requests_range( config.downloader_min_concurrent_requests..= config.downloader_max_concurrent_requests, diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index e2a108ba9..d26523119 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -48,10 +48,10 @@ pub struct BodiesDownloader { stream_batch_size: usize, /// The allowed range for number of concurrent requests. concurrent_requests_range: RangeInclusive, - /// Maximum amount of received blocks to buffer internally. - max_buffered_blocks: usize, - /// Current number of buffered blocks - num_buffered_blocks: usize, + /// Maximum number of bytes of received blocks to buffer internally. + max_buffered_blocks_size_bytes: usize, + /// Current estimated size of buffered blocks in bytes. + buffered_blocks_size_bytes: usize, /// The range of block numbers for body download. download_range: RangeInclusive, /// The latest block number returned. @@ -172,9 +172,9 @@ where max_requests.min(*self.concurrent_requests_range.end()) } - /// Returns true if the number of buffered blocks is lower than the configured maximum + /// Returns true if the size of buffered blocks is lower than the configured maximum fn has_buffer_capacity(&self) -> bool { - self.num_buffered_blocks < self.max_buffered_blocks + self.buffered_blocks_size_bytes < self.max_buffered_blocks_size_bytes } // Check if the stream is terminated @@ -202,7 +202,7 @@ where self.in_progress_queue.clear(); self.queued_bodies = Vec::new(); self.buffered_responses = BinaryHeap::new(); - self.num_buffered_blocks = 0; + self.buffered_blocks_size_bytes = 0; // reset metrics self.metrics.in_flight_requests.set(0.); @@ -223,21 +223,24 @@ where fn pop_buffered_response(&mut self) -> Option { let resp = self.buffered_responses.pop()?; self.metrics.buffered_responses.decrement(1.); - self.num_buffered_blocks -= resp.len(); - self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64); - self.metrics.buffered_blocks_size_bytes.decrement(resp.size() as f64); + self.buffered_blocks_size_bytes -= resp.size(); + self.metrics.buffered_blocks.decrement(resp.len() as f64); + self.metrics.buffered_blocks_size_bytes.set(resp.size() as f64); Some(resp) } /// Adds a new response to the internal buffer fn buffer_bodies_response(&mut self, response: Vec) { - self.num_buffered_blocks += response.len(); - self.metrics.buffered_blocks.set(self.num_buffered_blocks as f64); let size = response.iter().map(|b| b.size()).sum::(); let response = OrderedBodiesResponse { resp: response, size }; + let response_len = response.len(); + + self.buffered_blocks_size_bytes += size; self.buffered_responses.push(response); + + self.metrics.buffered_blocks.increment(response_len as f64); + self.metrics.buffered_blocks_size_bytes.set(self.buffered_blocks_size_bytes as f64); self.metrics.buffered_responses.set(self.buffered_responses.len() as f64); - self.metrics.buffered_blocks_size_bytes.increment(size as f64); } /// Returns a response if it's first block number matches the next expected. @@ -502,8 +505,8 @@ pub struct BodiesDownloaderBuilder { pub request_limit: u64, /// The maximum number of block bodies returned at once from the stream pub stream_batch_size: usize, - /// Maximum amount of received bodies to buffer internally. - pub max_buffered_blocks: usize, + /// Maximum number of bytes of received bodies to buffer internally. + pub max_buffered_blocks_size_bytes: usize, /// The maximum number of requests to send concurrently. pub concurrent_requests_range: RangeInclusive, } @@ -513,8 +516,7 @@ impl Default for BodiesDownloaderBuilder { Self { request_limit: 200, stream_batch_size: 10_000, - // With high block sizes at around 100kb this will be ~4GB of buffered blocks: ~43k - max_buffered_blocks: 4 * 1024 * 1024 * 1024 / 100_000, + max_buffered_blocks_size_bytes: 4 * 1024 * 1024 * 1024, // ~4GB concurrent_requests_range: 5..=100, } } @@ -533,7 +535,7 @@ impl BodiesDownloaderBuilder { self } - /// Set on the downloader. + /// Set concurrent requests range on the downloader. pub fn with_concurrent_requests_range( mut self, concurrent_requests_range: RangeInclusive, @@ -542,9 +544,12 @@ impl BodiesDownloaderBuilder { self } - /// Set on the downloader. - pub fn with_max_buffered_blocks(mut self, max_buffered_responses: usize) -> Self { - self.max_buffered_blocks = max_buffered_responses; + /// Set max buffered block bytes on the downloader. + pub fn with_max_buffered_blocks_size_bytes( + mut self, + max_buffered_blocks_size_bytes: usize, + ) -> Self { + self.max_buffered_blocks_size_bytes = max_buffered_blocks_size_bytes; self } @@ -563,7 +568,7 @@ impl BodiesDownloaderBuilder { request_limit, stream_batch_size, concurrent_requests_range, - max_buffered_blocks: max_buffered_responses, + max_buffered_blocks_size_bytes, } = self; let metrics = BodyDownloaderMetrics::default(); let in_progress_queue = BodiesRequestQueue::new(metrics.clone()); @@ -573,7 +578,7 @@ impl BodiesDownloaderBuilder { db, request_limit, stream_batch_size, - max_buffered_blocks: max_buffered_responses, + max_buffered_blocks_size_bytes, concurrent_requests_range, in_progress_queue, metrics, @@ -581,7 +586,7 @@ impl BodiesDownloaderBuilder { latest_queued_block_number: None, buffered_responses: Default::default(), queued_bodies: Default::default(), - num_buffered_blocks: 0, + buffered_blocks_size_bytes: 0, } } }