add metrics counter for finished spawned tasks (#4481)

This commit is contained in:
Alessandro Mazza
2023-09-06 14:38:15 +02:00
committed by GitHub
parent 82f0fe1a5e
commit 6299c26b56
2 changed files with 37 additions and 6 deletions

View File

@ -22,6 +22,7 @@ use futures_util::{
future::{select, BoxFuture},
pin_mut, Future, FutureExt, TryFutureExt,
};
use metrics::IncCounterOnDrop;
use std::{
any::Any,
fmt::{Display, Formatter},
@ -271,9 +272,16 @@ impl TaskExecutor {
{
let on_shutdown = self.on_shutdown.clone();
let task = async move {
pin_mut!(fut);
let _ = select(on_shutdown, fut).await;
// Clone only the specific counter that we need.
let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone();
// Wrap the original future to increment the finished tasks counter upon completion
let task = {
async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics);
pin_mut!(fut);
let _ = select(on_shutdown, fut).await;
}
}
.in_current_span();
@ -341,7 +349,11 @@ impl TaskExecutor {
})
.in_current_span();
// Clone only the specific counter that we need.
let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone();
let task = async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics);
pin_mut!(task);
let _ = select(on_shutdown, task).await;
};
@ -403,7 +415,7 @@ impl TaskExecutor {
impl TaskSpawner for TaskExecutor {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.metrics.inc_regular_task();
self.metrics.inc_regular_tasks();
self.spawn(fut)
}

View File

@ -7,9 +7,12 @@ use reth_metrics::{metrics::Counter, Metrics};
pub struct TaskExecutorMetrics {
/// Number of spawned critical tasks
pub(crate) critical_tasks: Counter,
/// Number of finished spawned critical tasks
pub(crate) finished_critical_tasks: Counter,
/// Number of spawned regular tasks
pub(crate) regular_tasks: Counter,
/// Number of finished spawned regular tasks
pub(crate) finished_regular_tasks: Counter,
}
impl TaskExecutorMetrics {
@ -17,7 +20,23 @@ impl TaskExecutorMetrics {
self.critical_tasks.increment(1);
}
pub(crate) fn inc_regular_task(&self) {
pub(crate) fn inc_regular_tasks(&self) {
self.regular_tasks.increment(1);
}
}
/// Helper type for increasing counters even if a task fails.
pub struct IncCounterOnDrop(Counter);
impl IncCounterOnDrop {
/// Create a new `IncCounterOnDrop`.
pub fn new(counter: Counter) -> Self {
IncCounterOnDrop(counter)
}
}
impl Drop for IncCounterOnDrop {
fn drop(&mut self) {
self.0.increment(1);
}
}