diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 1d27e1477..d4c42b76c 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -22,6 +22,7 @@ use futures_util::{ future::{select, BoxFuture}, pin_mut, Future, FutureExt, TryFutureExt, }; +use metrics::IncCounterOnDrop; use std::{ any::Any, fmt::{Display, Formatter}, @@ -271,9 +272,16 @@ impl TaskExecutor { { let on_shutdown = self.on_shutdown.clone(); - let task = async move { - pin_mut!(fut); - let _ = select(on_shutdown, fut).await; + // Clone only the specific counter that we need. + let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone(); + // Wrap the original future to increment the finished tasks counter upon completion + let task = { + async move { + // Create an instance of IncCounterOnDrop with the counter to increment + let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics); + pin_mut!(fut); + let _ = select(on_shutdown, fut).await; + } } .in_current_span(); @@ -341,7 +349,11 @@ impl TaskExecutor { }) .in_current_span(); + // Clone only the specific counter that we need. + let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone(); let task = async move { + // Create an instance of IncCounterOnDrop with the counter to increment + let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics); pin_mut!(task); let _ = select(on_shutdown, task).await; }; @@ -403,7 +415,7 @@ impl TaskExecutor { impl TaskSpawner for TaskExecutor { fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { - self.metrics.inc_regular_task(); + self.metrics.inc_regular_tasks(); self.spawn(fut) } diff --git a/crates/tasks/src/metrics.rs b/crates/tasks/src/metrics.rs index 5fa6c252f..08a557d8f 100644 --- a/crates/tasks/src/metrics.rs +++ b/crates/tasks/src/metrics.rs @@ -7,9 +7,12 @@ use reth_metrics::{metrics::Counter, Metrics}; pub struct TaskExecutorMetrics { /// Number of spawned critical tasks pub(crate) critical_tasks: Counter, - + /// Number of finished spawned critical tasks + pub(crate) finished_critical_tasks: Counter, /// Number of spawned regular tasks pub(crate) regular_tasks: Counter, + /// Number of finished spawned regular tasks + pub(crate) finished_regular_tasks: Counter, } impl TaskExecutorMetrics { @@ -17,7 +20,23 @@ impl TaskExecutorMetrics { self.critical_tasks.increment(1); } - pub(crate) fn inc_regular_task(&self) { + pub(crate) fn inc_regular_tasks(&self) { self.regular_tasks.increment(1); } } + +/// Helper type for increasing counters even if a task fails. +pub struct IncCounterOnDrop(Counter); + +impl IncCounterOnDrop { + /// Create a new `IncCounterOnDrop`. + pub fn new(counter: Counter) -> Self { + IncCounterOnDrop(counter) + } +} + +impl Drop for IncCounterOnDrop { + fn drop(&mut self) { + self.0.increment(1); + } +}