feat(metrics): split DownloaderMetrics into body and headers downloaders (#3171)

This commit is contained in:
Thomas Coratger
2023-06-15 20:36:51 +02:00
committed by GitHub
parent 0b876de27e
commit 67593ca749
5 changed files with 85 additions and 36 deletions

View File

@ -1,5 +1,5 @@
use super::queue::BodiesRequestQueue;
use crate::{bodies::task::TaskDownloader, metrics::DownloaderMetrics};
use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics};
use futures::Stream;
use futures_util::StreamExt;
use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx};
@ -63,7 +63,7 @@ pub struct BodiesDownloader<B: BodiesClient, DB> {
/// Queued body responses that can be returned for insertion into the database.
queued_bodies: Vec<BlockResponse>,
/// The bodies downloader metrics.
metrics: DownloaderMetrics,
metrics: BodyDownloaderMetrics,
}
impl<B, DB> BodiesDownloader<B, DB>
@ -559,7 +559,7 @@ impl BodiesDownloaderBuilder {
concurrent_requests_range,
max_buffered_blocks: max_buffered_responses,
} = self;
let metrics = DownloaderMetrics::new(BODIES_DOWNLOADER_SCOPE);
let metrics = BodyDownloaderMetrics::default();
let in_progress_queue = BodiesRequestQueue::new(metrics.clone());
BodiesDownloader {
client: Arc::new(client),

View File

@ -1,5 +1,5 @@
use super::request::BodiesRequestFuture;
use crate::metrics::DownloaderMetrics;
use crate::metrics::BodyDownloaderMetrics;
use futures::{stream::FuturesUnordered, Stream};
use futures_util::StreamExt;
use reth_interfaces::{
@ -23,7 +23,7 @@ pub(crate) struct BodiesRequestQueue<B: BodiesClient> {
/// Inner body request queue.
inner: FuturesUnordered<BodiesRequestFuture<B>>,
/// The downloader metrics.
metrics: DownloaderMetrics,
metrics: BodyDownloaderMetrics,
/// Last requested block number.
pub(crate) last_requested_block_number: Option<BlockNumber>,
}
@ -33,7 +33,7 @@ where
B: BodiesClient + 'static,
{
/// Create new instance of request queue.
pub(crate) fn new(metrics: DownloaderMetrics) -> Self {
pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self {
Self { metrics, inner: Default::default(), last_requested_block_number: None }
}

View File

@ -1,4 +1,4 @@
use crate::metrics::DownloaderMetrics;
use crate::metrics::BodyDownloaderMetrics;
use futures::{Future, FutureExt};
use reth_interfaces::{
consensus::{Consensus as ConsensusTrait, Consensus},
@ -38,7 +38,7 @@ use std::{
pub(crate) struct BodiesRequestFuture<B: BodiesClient> {
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: DownloaderMetrics,
metrics: BodyDownloaderMetrics,
// Headers to download. The collection is shrunk as responses are buffered.
pending_headers: VecDeque<SealedHeader>,
/// Internal buffer for all blocks
@ -56,7 +56,7 @@ where
pub(crate) fn new(
client: Arc<B>,
consensus: Arc<dyn Consensus>,
metrics: DownloaderMetrics,
metrics: BodyDownloaderMetrics,
) -> Self {
Self {
client,
@ -235,7 +235,7 @@ mod tests {
use super::*;
use crate::{
bodies::test_utils::zip_blocks,
test_utils::{generate_bodies, TestBodiesClient, TEST_SCOPE},
test_utils::{generate_bodies, TestBodiesClient},
};
use reth_interfaces::{
p2p::bodies::response::BlockResponse,
@ -253,7 +253,7 @@ mod tests {
let fut = BodiesRequestFuture::new(
client.clone(),
Arc::new(TestConsensus::default()),
DownloaderMetrics::new(TEST_SCOPE),
BodyDownloaderMetrics::default(),
)
.with_headers(headers.clone());
@ -277,7 +277,7 @@ mod tests {
let fut = BodiesRequestFuture::new(
client.clone(),
Arc::new(TestConsensus::default()),
DownloaderMetrics::new(TEST_SCOPE),
BodyDownloaderMetrics::default(),
)
.with_headers(headers.clone());

View File

@ -1,7 +1,7 @@
//! A headers downloader that can handle multiple requests concurrently.
use super::task::TaskDownloader;
use crate::metrics::DownloaderMetrics;
use crate::metrics::HeaderDownloaderMetrics;
use futures::{stream::Stream, FutureExt};
use futures_util::{stream::FuturesUnordered, StreamExt};
use rayon::prelude::*;
@ -37,9 +37,6 @@ use tracing::{error, trace};
/// downloader is yielding a next batch of headers that is being committed to the database.
const REQUESTS_PER_PEER_MULTIPLIER: usize = 5;
/// The scope for headers downloader metrics.
pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers";
/// Wrapper for internal downloader errors.
#[allow(clippy::large_enum_variant)]
#[derive(Error, Debug)]
@ -100,7 +97,7 @@ pub struct ReverseHeadersDownloader<H: HeadersClient> {
/// Note: headers are sorted from high to low
queued_validated_headers: Vec<SealedHeader>,
/// Header downloader metrics.
metrics: DownloaderMetrics,
metrics: HeaderDownloaderMetrics,
}
// === impl ReverseHeadersDownloader ===
@ -1163,7 +1160,7 @@ impl ReverseHeadersDownloaderBuilder {
in_progress_queue: Default::default(),
buffered_responses: Default::default(),
queued_validated_headers: Default::default(),
metrics: DownloaderMetrics::new(HEADERS_DOWNLOADER_SCOPE),
metrics: Default::default(),
}
}
}

View File

@ -4,22 +4,21 @@ use reth_metrics::{
Metrics,
};
/// Common downloader metrics.
/// Common body downloader metrics.
///
/// These metrics will be dynamically initialized with the provided scope
/// by corresponding downloaders.
/// These metrics will be initialized with the `downloaders.bodies` scope.
/// ```
/// use reth_downloaders::metrics::DownloaderMetrics;
/// use reth_downloaders::metrics::BodyDownloaderMetrics;
/// use reth_interfaces::p2p::error::DownloadError;
///
/// // Initialize metrics.
/// let metrics = DownloaderMetrics::new("downloaders.headers");
/// // Increment `downloaders.headers.timeout_errors` counter by 1.
/// let metrics = BodyDownloaderMetrics::default();
/// // Increment `downloaders.bodies.timeout_errors` counter by 1.
/// metrics.increment_errors(&DownloadError::Timeout);
/// ```
#[derive(Clone, Metrics)]
#[metrics(dynamic = true)]
pub struct DownloaderMetrics {
#[metrics(scope = "downloaders.bodies")]
pub struct BodyDownloaderMetrics {
/// The number of items that were successfully sent to the poller (stage)
pub total_flushed: Counter,
/// Number of items that were successfully downloaded
@ -39,13 +38,10 @@ pub struct DownloaderMetrics {
/// The number blocks that are contiguous and are queued for insertion into the db.
pub queued_blocks: Gauge,
/// The number of out-of-order requests sent by the downloader.
/// The consumer of the download stream is able to re-request data (headers or bodies) in case
/// The consumer of the download stream is able to re-request data (bodies) in case
/// it encountered a recoverable error (e.g. during insertion).
/// Out-of-order request happen when:
/// - the headers downloader `SyncTarget::Tip` hash is different from the previous sync
/// target hash.
/// - the new download range start for bodies donwloader is less than the last block number
/// returned from the stream.
/// Out-of-order request happen when the new download range start for bodies downloader
/// is less than the last block number returned from the stream.
pub out_of_order_requests: Counter,
/// Number of timeout errors while requesting items
pub timeout_errors: Counter,
@ -55,14 +51,70 @@ pub struct DownloaderMetrics {
pub unexpected_errors: Counter,
}
impl DownloaderMetrics {
impl BodyDownloaderMetrics {
/// Increment errors counter.
pub fn increment_errors(&self, error: &DownloadError) {
match error {
DownloadError::Timeout => self.timeout_errors.increment(1),
DownloadError::HeaderValidation { .. } | DownloadError::BodyValidation { .. } => {
self.validation_errors.increment(1)
}
DownloadError::BodyValidation { .. } => self.validation_errors.increment(1),
_error => self.unexpected_errors.increment(1),
}
}
}
/// Common header downloader metrics.
///
/// These metrics will be initialized with the `downloaders.headers` scope.
/// ```
/// use reth_downloaders::metrics::HeaderDownloaderMetrics;
/// use reth_interfaces::p2p::error::DownloadError;
///
/// // Initialize metrics.
/// let metrics = HeaderDownloaderMetrics::default();
/// // Increment `downloaders.headers.timeout_errors` counter by 1.
/// metrics.increment_errors(&DownloadError::Timeout);
/// ```
#[derive(Clone, Metrics)]
#[metrics(scope = "downloaders.headers")]
pub struct HeaderDownloaderMetrics {
/// The number of items that were successfully sent to the poller (stage)
pub total_flushed: Counter,
/// Number of items that were successfully downloaded
pub total_downloaded: Counter,
/// The number of requests (can contain more than 1 item) currently in-flight.
pub in_flight_requests: Gauge,
/// The number of responses (can contain more than 1 item) in the internal buffer of the
/// downloader.
pub buffered_responses: Gauge,
/// The number of blocks the internal buffer of the
/// downloader.
/// These are bodies that have been received, but not cannot be committed yet because they're
/// not contiguous
pub buffered_blocks: Gauge,
/// Total amount of memory used by the buffered blocks in bytes
pub buffered_blocks_size_bytes: Gauge,
/// The number blocks that are contiguous and are queued for insertion into the db.
pub queued_blocks: Gauge,
/// The number of out-of-order requests sent by the downloader.
/// The consumer of the download stream is able to re-request data (headers) in case
/// it encountered a recoverable error (e.g. during insertion).
/// Out-of-order request happen when the headers downloader `SyncTarget::Tip`
/// hash is different from the previous sync target hash.
pub out_of_order_requests: Counter,
/// Number of timeout errors while requesting items
pub timeout_errors: Counter,
/// Number of validation errors while requesting items
pub validation_errors: Counter,
/// Number of unexpected errors while requesting items
pub unexpected_errors: Counter,
}
impl HeaderDownloaderMetrics {
/// Increment errors counter.
pub fn increment_errors(&self, error: &DownloadError) {
match error {
DownloadError::Timeout => self.timeout_errors.increment(1),
DownloadError::HeaderValidation { .. } => self.validation_errors.increment(1),
_error => self.unexpected_errors.increment(1),
}
}