Task Executor Metrics (#2580)

Co-authored-by: Aditya Pandey <aditya.p@Aditya-P.local>
This commit is contained in:
Aditya Pandey
2023-05-05 23:18:45 +05:30
committed by GitHub
parent e6107a1bce
commit ded0896df5
4 changed files with 40 additions and 1 deletions

2
Cargo.lock generated
View File

@ -5455,6 +5455,8 @@ version = "0.1.0"
dependencies = [ dependencies = [
"dyn-clone", "dyn-clone",
"futures-util", "futures-util",
"metrics",
"reth-metrics-derive",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",

View File

@ -19,5 +19,9 @@ tracing = { version = "0.1", default-features = false }
thiserror = "1.0" thiserror = "1.0"
dyn-clone = "1.0" dyn-clone = "1.0"
## rpc/metrics
metrics = "0.20.1"
reth-metrics-derive = { path = "../metrics/metrics-derive" }
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }

View File

@ -7,7 +7,10 @@
//! reth task management //! reth task management
use crate::shutdown::{signal, Shutdown, Signal}; use crate::{
metrics::TaskExecutorMetrics,
shutdown::{signal, Shutdown, Signal},
};
use dyn_clone::DynClone; use dyn_clone::DynClone;
use futures_util::{ use futures_util::{
future::{select, BoxFuture}, future::{select, BoxFuture},
@ -25,6 +28,7 @@ use tokio::{
use tracing::error; use tracing::error;
use tracing_futures::Instrument; use tracing_futures::Instrument;
pub mod metrics;
pub mod shutdown; pub mod shutdown;
/// A type that can spawn tasks. /// A type that can spawn tasks.
@ -160,6 +164,7 @@ impl TaskManager {
handle: self.handle.clone(), handle: self.handle.clone(),
on_shutdown: self.on_shutdown.clone(), on_shutdown: self.on_shutdown.clone(),
panicked_tasks_tx: self.panicked_tasks_tx.clone(), panicked_tasks_tx: self.panicked_tasks_tx.clone(),
metrics: Default::default(),
} }
} }
} }
@ -192,6 +197,8 @@ pub struct TaskExecutor {
on_shutdown: Shutdown, on_shutdown: Shutdown,
/// Sender half for sending panic signals to this type /// Sender half for sending panic signals to this type
panicked_tasks_tx: UnboundedSender<&'static str>, panicked_tasks_tx: UnboundedSender<&'static str>,
// Task Executor Metrics
metrics: TaskExecutorMetrics,
} }
// === impl TaskExecutor === // === impl TaskExecutor ===
@ -358,10 +365,12 @@ impl TaskExecutor {
impl TaskSpawner for TaskExecutor { impl TaskSpawner for TaskExecutor {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.metrics.inc_regular_task();
self.spawn(fut) self.spawn(fut)
} }
fn spawn_critical(&self, name: &'static str, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { fn spawn_critical(&self, name: &'static str, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.metrics.inc_critical_tasks();
TaskExecutor::spawn_critical(self, name, fut) TaskExecutor::spawn_critical(self, name, fut)
} }

View File

@ -0,0 +1,24 @@
//! Task Executor Metrics
use metrics::Counter;
use reth_metrics_derive::Metrics;
/// Task Executor Metrics
#[derive(Metrics, Clone)]
#[metrics(scope = "executor.spawn")]
pub struct TaskExecutorMetrics {
/// Number of spawned critical tasks
pub(crate) critical_tasks: Counter,
/// Number of spawned regular tasks
pub(crate) regular_tasks: Counter,
}
impl TaskExecutorMetrics {
pub(crate) fn inc_critical_tasks(&self) {
self.critical_tasks.increment(1);
}
pub(crate) fn inc_regular_task(&self) {
self.regular_tasks.increment(1);
}
}