From 67593ca74924aceff946b5eccfccdf2edbda8ec9 Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Thu, 15 Jun 2023 20:36:51 +0200 Subject: [PATCH] feat(metrics): split `DownloaderMetrics` into body and headers downloaders (#3171) --- crates/net/downloaders/src/bodies/bodies.rs | 6 +- crates/net/downloaders/src/bodies/queue.rs | 6 +- crates/net/downloaders/src/bodies/request.rs | 12 +-- .../src/headers/reverse_headers.rs | 9 +- crates/net/downloaders/src/metrics.rs | 88 +++++++++++++++---- 5 files changed, 85 insertions(+), 36 deletions(-) diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 808ad130c..91d0fb8e7 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -1,5 +1,5 @@ use super::queue::BodiesRequestQueue; -use crate::{bodies::task::TaskDownloader, metrics::DownloaderMetrics}; +use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics}; use futures::Stream; use futures_util::StreamExt; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; @@ -63,7 +63,7 @@ pub struct BodiesDownloader { /// Queued body responses that can be returned for insertion into the database. queued_bodies: Vec, /// The bodies downloader metrics. - metrics: DownloaderMetrics, + metrics: BodyDownloaderMetrics, } impl BodiesDownloader @@ -559,7 +559,7 @@ impl BodiesDownloaderBuilder { concurrent_requests_range, max_buffered_blocks: max_buffered_responses, } = self; - let metrics = DownloaderMetrics::new(BODIES_DOWNLOADER_SCOPE); + let metrics = BodyDownloaderMetrics::default(); let in_progress_queue = BodiesRequestQueue::new(metrics.clone()); BodiesDownloader { client: Arc::new(client), diff --git a/crates/net/downloaders/src/bodies/queue.rs b/crates/net/downloaders/src/bodies/queue.rs index 2483eb25f..0fc9635df 100644 --- a/crates/net/downloaders/src/bodies/queue.rs +++ b/crates/net/downloaders/src/bodies/queue.rs @@ -1,5 +1,5 @@ use super::request::BodiesRequestFuture; -use crate::metrics::DownloaderMetrics; +use crate::metrics::BodyDownloaderMetrics; use futures::{stream::FuturesUnordered, Stream}; use futures_util::StreamExt; use reth_interfaces::{ @@ -23,7 +23,7 @@ pub(crate) struct BodiesRequestQueue { /// Inner body request queue. inner: FuturesUnordered>, /// The downloader metrics. - metrics: DownloaderMetrics, + metrics: BodyDownloaderMetrics, /// Last requested block number. pub(crate) last_requested_block_number: Option, } @@ -33,7 +33,7 @@ where B: BodiesClient + 'static, { /// Create new instance of request queue. - pub(crate) fn new(metrics: DownloaderMetrics) -> Self { + pub(crate) fn new(metrics: BodyDownloaderMetrics) -> Self { Self { metrics, inner: Default::default(), last_requested_block_number: None } } diff --git a/crates/net/downloaders/src/bodies/request.rs b/crates/net/downloaders/src/bodies/request.rs index e4f883bbc..e73836a43 100644 --- a/crates/net/downloaders/src/bodies/request.rs +++ b/crates/net/downloaders/src/bodies/request.rs @@ -1,4 +1,4 @@ -use crate::metrics::DownloaderMetrics; +use crate::metrics::BodyDownloaderMetrics; use futures::{Future, FutureExt}; use reth_interfaces::{ consensus::{Consensus as ConsensusTrait, Consensus}, @@ -38,7 +38,7 @@ use std::{ pub(crate) struct BodiesRequestFuture { client: Arc, consensus: Arc, - metrics: DownloaderMetrics, + metrics: BodyDownloaderMetrics, // Headers to download. The collection is shrunk as responses are buffered. pending_headers: VecDeque, /// Internal buffer for all blocks @@ -56,7 +56,7 @@ where pub(crate) fn new( client: Arc, consensus: Arc, - metrics: DownloaderMetrics, + metrics: BodyDownloaderMetrics, ) -> Self { Self { client, @@ -235,7 +235,7 @@ mod tests { use super::*; use crate::{ bodies::test_utils::zip_blocks, - test_utils::{generate_bodies, TestBodiesClient, TEST_SCOPE}, + test_utils::{generate_bodies, TestBodiesClient}, }; use reth_interfaces::{ p2p::bodies::response::BlockResponse, @@ -253,7 +253,7 @@ mod tests { let fut = BodiesRequestFuture::new( client.clone(), Arc::new(TestConsensus::default()), - DownloaderMetrics::new(TEST_SCOPE), + BodyDownloaderMetrics::default(), ) .with_headers(headers.clone()); @@ -277,7 +277,7 @@ mod tests { let fut = BodiesRequestFuture::new( client.clone(), Arc::new(TestConsensus::default()), - DownloaderMetrics::new(TEST_SCOPE), + BodyDownloaderMetrics::default(), ) .with_headers(headers.clone()); diff --git a/crates/net/downloaders/src/headers/reverse_headers.rs b/crates/net/downloaders/src/headers/reverse_headers.rs index 6ed78ccc6..9a68c9771 100644 --- a/crates/net/downloaders/src/headers/reverse_headers.rs +++ b/crates/net/downloaders/src/headers/reverse_headers.rs @@ -1,7 +1,7 @@ //! A headers downloader that can handle multiple requests concurrently. use super::task::TaskDownloader; -use crate::metrics::DownloaderMetrics; +use crate::metrics::HeaderDownloaderMetrics; use futures::{stream::Stream, FutureExt}; use futures_util::{stream::FuturesUnordered, StreamExt}; use rayon::prelude::*; @@ -37,9 +37,6 @@ use tracing::{error, trace}; /// downloader is yielding a next batch of headers that is being committed to the database. const REQUESTS_PER_PEER_MULTIPLIER: usize = 5; -/// The scope for headers downloader metrics. -pub const HEADERS_DOWNLOADER_SCOPE: &str = "downloaders.headers"; - /// Wrapper for internal downloader errors. #[allow(clippy::large_enum_variant)] #[derive(Error, Debug)] @@ -100,7 +97,7 @@ pub struct ReverseHeadersDownloader { /// Note: headers are sorted from high to low queued_validated_headers: Vec, /// Header downloader metrics. - metrics: DownloaderMetrics, + metrics: HeaderDownloaderMetrics, } // === impl ReverseHeadersDownloader === @@ -1163,7 +1160,7 @@ impl ReverseHeadersDownloaderBuilder { in_progress_queue: Default::default(), buffered_responses: Default::default(), queued_validated_headers: Default::default(), - metrics: DownloaderMetrics::new(HEADERS_DOWNLOADER_SCOPE), + metrics: Default::default(), } } } diff --git a/crates/net/downloaders/src/metrics.rs b/crates/net/downloaders/src/metrics.rs index 59476ca0c..38fc642eb 100644 --- a/crates/net/downloaders/src/metrics.rs +++ b/crates/net/downloaders/src/metrics.rs @@ -4,22 +4,21 @@ use reth_metrics::{ Metrics, }; -/// Common downloader metrics. +/// Common body downloader metrics. /// -/// These metrics will be dynamically initialized with the provided scope -/// by corresponding downloaders. +/// These metrics will be initialized with the `downloaders.bodies` scope. /// ``` -/// use reth_downloaders::metrics::DownloaderMetrics; +/// use reth_downloaders::metrics::BodyDownloaderMetrics; /// use reth_interfaces::p2p::error::DownloadError; /// /// // Initialize metrics. -/// let metrics = DownloaderMetrics::new("downloaders.headers"); -/// // Increment `downloaders.headers.timeout_errors` counter by 1. +/// let metrics = BodyDownloaderMetrics::default(); +/// // Increment `downloaders.bodies.timeout_errors` counter by 1. /// metrics.increment_errors(&DownloadError::Timeout); /// ``` #[derive(Clone, Metrics)] -#[metrics(dynamic = true)] -pub struct DownloaderMetrics { +#[metrics(scope = "downloaders.bodies")] +pub struct BodyDownloaderMetrics { /// The number of items that were successfully sent to the poller (stage) pub total_flushed: Counter, /// Number of items that were successfully downloaded @@ -39,13 +38,10 @@ pub struct DownloaderMetrics { /// The number blocks that are contiguous and are queued for insertion into the db. pub queued_blocks: Gauge, /// The number of out-of-order requests sent by the downloader. - /// The consumer of the download stream is able to re-request data (headers or bodies) in case + /// The consumer of the download stream is able to re-request data (bodies) in case /// it encountered a recoverable error (e.g. during insertion). - /// Out-of-order request happen when: - /// - the headers downloader `SyncTarget::Tip` hash is different from the previous sync - /// target hash. - /// - the new download range start for bodies donwloader is less than the last block number - /// returned from the stream. + /// Out-of-order request happen when the new download range start for bodies downloader + /// is less than the last block number returned from the stream. pub out_of_order_requests: Counter, /// Number of timeout errors while requesting items pub timeout_errors: Counter, @@ -55,14 +51,70 @@ pub struct DownloaderMetrics { pub unexpected_errors: Counter, } -impl DownloaderMetrics { +impl BodyDownloaderMetrics { /// Increment errors counter. pub fn increment_errors(&self, error: &DownloadError) { match error { DownloadError::Timeout => self.timeout_errors.increment(1), - DownloadError::HeaderValidation { .. } | DownloadError::BodyValidation { .. } => { - self.validation_errors.increment(1) - } + DownloadError::BodyValidation { .. } => self.validation_errors.increment(1), + _error => self.unexpected_errors.increment(1), + } + } +} + +/// Common header downloader metrics. +/// +/// These metrics will be initialized with the `downloaders.headers` scope. +/// ``` +/// use reth_downloaders::metrics::HeaderDownloaderMetrics; +/// use reth_interfaces::p2p::error::DownloadError; +/// +/// // Initialize metrics. +/// let metrics = HeaderDownloaderMetrics::default(); +/// // Increment `downloaders.headers.timeout_errors` counter by 1. +/// metrics.increment_errors(&DownloadError::Timeout); +/// ``` +#[derive(Clone, Metrics)] +#[metrics(scope = "downloaders.headers")] +pub struct HeaderDownloaderMetrics { + /// The number of items that were successfully sent to the poller (stage) + pub total_flushed: Counter, + /// Number of items that were successfully downloaded + pub total_downloaded: Counter, + /// The number of requests (can contain more than 1 item) currently in-flight. + pub in_flight_requests: Gauge, + /// The number of responses (can contain more than 1 item) in the internal buffer of the + /// downloader. + pub buffered_responses: Gauge, + /// The number of blocks the internal buffer of the + /// downloader. + /// These are bodies that have been received, but not cannot be committed yet because they're + /// not contiguous + pub buffered_blocks: Gauge, + /// Total amount of memory used by the buffered blocks in bytes + pub buffered_blocks_size_bytes: Gauge, + /// The number blocks that are contiguous and are queued for insertion into the db. + pub queued_blocks: Gauge, + /// The number of out-of-order requests sent by the downloader. + /// The consumer of the download stream is able to re-request data (headers) in case + /// it encountered a recoverable error (e.g. during insertion). + /// Out-of-order request happen when the headers downloader `SyncTarget::Tip` + /// hash is different from the previous sync target hash. + pub out_of_order_requests: Counter, + /// Number of timeout errors while requesting items + pub timeout_errors: Counter, + /// Number of validation errors while requesting items + pub validation_errors: Counter, + /// Number of unexpected errors while requesting items + pub unexpected_errors: Counter, +} + +impl HeaderDownloaderMetrics { + /// Increment errors counter. + pub fn increment_errors(&self, error: &DownloadError) { + match error { + DownloadError::Timeout => self.timeout_errors.increment(1), + DownloadError::HeaderValidation { .. } => self.validation_errors.increment(1), _error => self.unexpected_errors.increment(1), } }