Fix body stage insufficient backpressure (#7350)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Author: jn
Date: 2024-03-29 11:16:32 -07:00
Committed by: GitHub
Parent: be16072728
Commit: b1026e0e23
3 changed files with 38 additions and 16 deletions
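
In brief: the bodies downloader previously gated new requests only on response-buffer capacity and the concurrent-request limit, so `queued_bodies` could keep growing while one slow request held up batch flushing. This commit adds a third gate that pauses submission once four full stream batches are queued. Below is a minimal, self-contained sketch of the new rule — stand-in types, with the buffered-responses capacity check elided; only the field names mirror the diff:

```rust
/// Stripped-down stand-in for the downloader state this commit touches;
/// field names mirror the diff below, everything else is simplified.
struct DownloaderState {
    /// Bodies that finished downloading but have not been yielded downstream yet.
    queued_bodies: Vec<u64>,
    /// Size of each batch the stream yields.
    stream_batch_size: usize,
    /// Number of requests currently in flight.
    in_progress: usize,
    /// Upper bound on concurrent requests.
    concurrent_request_limit: usize,
}

impl DownloaderState {
    /// The new backpressure rule: stop submitting once four full batches are
    /// queued, so one slow response can no longer let memory grow without bound.
    fn can_submit_new_request(&self) -> bool {
        self.queued_bodies.len() < 4 * self.stream_batch_size &&
            self.in_progress < self.concurrent_request_limit
    }
}

fn main() {
    let state = DownloaderState {
        queued_bodies: vec![0; 4_000],
        stream_batch_size: 1_000,
        in_progress: 10,
        concurrent_request_limit: 100,
    };
    // 4_000 queued >= 4 * 1_000, so submission pauses until a batch drains.
    assert!(!state.can_submit_new_request());
}
```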


@@ -97,7 +97,7 @@ where
         max_non_empty: u64,
     ) -> DownloadResult<Option<Vec<SealedHeader>>> {
         if range.is_empty() || max_non_empty == 0 {
-            return Ok(None)
+            return Ok(None);
         }
         // Collect headers while
@@ -146,7 +146,7 @@ where
         // if we're only connected to a few peers, we keep it low
         if num_peers < *self.concurrent_requests_range.start() {
-            return max_requests
+            return max_requests;
         }
         max_requests.min(*self.concurrent_requests_range.end())
@@ -240,7 +240,7 @@ where
                     .skip_while(|b| b.block_number() < expected)
                     .take_while(|b| self.download_range.contains(&b.block_number()))
                     .collect()
-            })
+            });
         }
         // Drop buffered response since we passed that range
@@ -259,10 +259,23 @@ where
             self.queued_bodies.shrink_to_fit();
             self.metrics.total_flushed.increment(next_batch.len() as u64);
             self.metrics.queued_blocks.set(self.queued_bodies.len() as f64);
-            return Some(next_batch)
+            return Some(next_batch);
         }
         None
     }

+    /// Checks if a new request can be submitted; implements back pressure to prevent
+    /// overwhelming the system and causing memory overload.
+    ///
+    /// Returns `true` if a new request can be submitted.
+    fn can_submit_new_request(&self) -> bool {
+        // Requests are issued in order but are not necessarily finished in order, so the
+        // queued bodies can grow large if a certain request is slow; limit follow-up
+        // requests while the queued bodies are too large.
+        self.queued_bodies.len() < 4 * self.stream_batch_size &&
+            self.has_buffer_capacity() &&
+            self.in_progress_queue.len() < self.concurrent_request_limit()
+    }
 }
impl<B, Provider> BodiesDownloader<B, Provider>
@@ -343,13 +356,13 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.get_mut();
         if this.is_terminated() {
-            return Poll::Ready(None)
+            return Poll::Ready(None);
         }
         // Submit new requests and poll any in progress
         loop {
             // Yield next batch if ready
             if let Some(next_batch) = this.try_split_next_batch() {
-                return Poll::Ready(Some(Ok(next_batch)))
+                return Poll::Ready(Some(Ok(next_batch)));
             }
             // Poll requests
@@ -362,7 +375,7 @@ where
                     Err(error) => {
                         tracing::debug!(target: "downloaders::bodies", %error, "Request failed");
                         this.clear();
-                        return Poll::Ready(Some(Err(error)))
+                        return Poll::Ready(Some(Err(error)));
                     }
                 };
             }
@@ -370,10 +383,7 @@ where
             // Loop exit condition
             let mut new_request_submitted = false;
             // Submit new requests
-            let concurrent_requests_limit = this.concurrent_request_limit();
-            'inner: while this.in_progress_queue.len() < concurrent_requests_limit &&
-                this.has_buffer_capacity()
-            {
+            'inner: while this.can_submit_new_request() {
                 match this.next_headers_request() {
                     Ok(Some(request)) => {
                         this.metrics.in_flight_requests.increment(1.);
@@ -388,7 +398,7 @@ where
                     Err(error) => {
                         tracing::error!(target: "downloaders::bodies", %error, "Failed to download from next request");
                         this.clear();
-                        return Poll::Ready(Some(Err(error)))
+                        return Poll::Ready(Some(Err(error)));
                     }
                 };
             }
@@ -401,21 +411,21 @@ where
             this.buffered_responses.shrink_to_fit();
             if !new_request_submitted {
-                break
+                break;
             }
         }
         // All requests are handled, stream is finished
         if this.in_progress_queue.is_empty() {
             if this.queued_bodies.is_empty() {
-                return Poll::Ready(None)
+                return Poll::Ready(None);
             }
             let batch_size = this.stream_batch_size.min(this.queued_bodies.len());
             let next_batch = this.queued_bodies.drain(..batch_size).collect::<Vec<_>>();
             this.queued_bodies.shrink_to_fit();
             this.metrics.total_flushed.increment(next_batch.len() as u64);
             this.metrics.queued_blocks.set(this.queued_bodies.len() as f64);
-            return Poll::Ready(Some(Ok(next_batch)))
+            return Poll::Ready(Some(Ok(next_batch)));
         }
         Poll::Pending
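
The poll loop above yields `queued_bodies` in `stream_batch_size` chunks while `can_submit_new_request` gates refills, so the queue settles at a plateau instead of growing. A toy, single-threaded simulation of that steady state (hypothetical numbers; requests complete instantly, and the consumer drains one batch per tick):

```rust
fn main() {
    let stream_batch_size = 1_000_usize;
    let cap = 4 * stream_batch_size;
    let mut queued_bodies = 0_usize;

    for tick in 0..5 {
        // Submit (and instantly complete) requests while under the cap.
        // Without the cap, this inner loop would grow the queue forever.
        while queued_bodies < cap {
            queued_bodies += 100; // each response contributes ~100 bodies
        }
        println!("tick {tick}: queued = {queued_bodies} (cap = {cap})");
        // The downstream consumer drains one full batch per tick.
        queued_bodies -= stream_batch_size.min(queued_bodies);
    }
}
```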


@@ -69,12 +69,14 @@
 rand.workspace = true
 paste.workspace = true

 # Stage benchmarks
-pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }
 criterion = { workspace = true, features = ["async_futures"] }

 # io
 serde_json.workspace = true

+[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
+pprof = { workspace = true, features = ["flamegraph", "frame-pointer", "criterion"] }
+
 [features]
 test-utils = ["reth-interfaces/test-utils", "reth-db/test-utils", "reth-provider/test-utils"]
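
This Cargo.toml change moves `pprof` into a target-specific dev-dependencies table, so the crate is only resolved and built on non-Windows targets (pprof's signal-based sampling profiler does not support Windows). A gated dependency must be paired with equally gated code; a minimal sketch of that pairing, assuming the dev-dependency above is in place (`pprof::ProfilerGuard::new` takes the sampling frequency in Hz):

```rust
// Any mention of the crate must carry the same predicate as the dependency,
// otherwise Windows builds fail to resolve `pprof`.
#[cfg(not(target_os = "windows"))]
use pprof::ProfilerGuard;

fn main() {
    #[cfg(not(target_os = "windows"))]
    {
        // Start a 100 Hz sampling profiler for the duration of this scope.
        let _guard = ProfilerGuard::new(100).expect("failed to start profiler");
    }
    #[cfg(target_os = "windows")]
    println!("sampling profiler unavailable on this target");
}
```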


@@ -3,6 +3,7 @@
 use criterion::{
     async_executor::FuturesExecutor, criterion_group, criterion_main, measurement::WallTime,
     BenchmarkGroup, Criterion,
 };
+#[cfg(not(target_os = "windows"))]
 use pprof::criterion::{Output, PProfProfiler};
 use reth_config::config::EtlConfig;
 use reth_db::{test_utils::TempDatabase, DatabaseEnv};
@@ -18,11 +19,20 @@ use std::{ops::RangeInclusive, sync::Arc};
 mod setup;
 use setup::StageRange;

+#[cfg(not(target_os = "windows"))]
 criterion_group! {
     name = benches;
     config = Criterion::default().with_profiler(PProfProfiler::new(1000, Output::Flamegraph(None)));
     targets = transaction_lookup, account_hashing, senders, merkle
 }
+#[cfg(target_os = "windows")]
+criterion_group! {
+    name = benches;
+    config = Criterion::default();
+    targets = transaction_lookup, account_hashing, senders, merkle
+}
 criterion_main!(benches);

 const DEFAULT_NUM_BLOCKS: u64 = 10_000;
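
Both `criterion_group!` invocations above bind the same `benches` name, and the opposite `cfg` predicates guarantee exactly one survives compilation, so `criterion_main!(benches)` resolves on every platform; only the non-Windows build attaches the pprof flamegraph profiler. The same pick-one-definition pattern in miniature (illustrative names, not from the diff):

```rust
// Exactly one `config_label` survives cfg evaluation, so callers stay
// platform-agnostic; this mirrors the duplicated criterion_group! above.
#[cfg(not(target_os = "windows"))]
fn config_label() -> &'static str {
    "Criterion::default() + pprof flamegraph profiler"
}

#[cfg(target_os = "windows")]
fn config_label() -> &'static str {
    "Criterion::default() (pprof unavailable on Windows)"
}

fn main() {
    println!("bench config: {}", config_label());
}
```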