feat: add graceful shutdown tasks (#5517)

This commit is contained in:
Matthias Seitz
2023-11-22 20:28:14 +01:00
committed by GitHub
parent de048c4561
commit de8b4526ff
3 changed files with 221 additions and 10 deletions

View File

@ -716,7 +716,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
task_executor.spawn_critical("p2p eth request handler", eth);
let known_peers_file = self.network.persistent_peers_file(default_peers_path);
task_executor.spawn_critical_with_signal("p2p network task", |shutdown| {
task_executor.spawn_critical_with_shutdown_signal("p2p network task", |shutdown| {
run_network_until_shutdown(shutdown, network, known_peers_file)
});

View File

@ -11,7 +11,7 @@
use crate::{
metrics::{IncCounterOnDrop, TaskExecutorMetrics},
shutdown::{signal, Shutdown, Signal},
shutdown::{signal, GracefulShutdown, GracefulShutdownGuard, Shutdown, Signal},
};
use dyn_clone::DynClone;
use futures_util::{
@ -22,6 +22,10 @@ use std::{
any::Any,
fmt::{Display, Formatter},
pin::Pin,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{ready, Context, Poll},
};
use tokio::{
@ -29,7 +33,7 @@ use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
use tracing::error;
use tracing::{debug, error};
use tracing_futures::Instrument;
pub mod metrics;
@ -147,10 +151,12 @@ pub struct TaskManager {
panicked_tasks_rx: UnboundedReceiver<PanickedTaskError>,
/// The [Signal] to fire when all tasks should be shutdown.
///
/// This is fired on drop.
_signal: Signal,
/// This is fired when dropped.
signal: Option<Signal>,
/// Receiver of the shutdown signal.
on_shutdown: Shutdown,
/// How many [GracefulShutdown] tasks are currently active
graceful_tasks: Arc<AtomicUsize>,
}
// === impl TaskManager ===
@ -159,8 +165,15 @@ impl TaskManager {
/// Create a new instance connected to the given handle's tokio runtime.
pub fn new(handle: Handle) -> Self {
let (panicked_tasks_tx, panicked_tasks_rx) = unbounded_channel();
let (_signal, on_shutdown) = signal();
Self { handle, panicked_tasks_tx, panicked_tasks_rx, _signal, on_shutdown }
let (signal, on_shutdown) = signal();
Self {
handle,
panicked_tasks_tx,
panicked_tasks_rx,
signal: Some(signal),
on_shutdown,
graceful_tasks: Arc::new(AtomicUsize::new(0)),
}
}
/// Returns a new [`TaskExecutor`] that can spawn new tasks onto the tokio runtime this type is
@ -171,8 +184,36 @@ impl TaskManager {
on_shutdown: self.on_shutdown.clone(),
panicked_tasks_tx: self.panicked_tasks_tx.clone(),
metrics: Default::default(),
graceful_tasks: Arc::clone(&self.graceful_tasks),
}
}
/// Fires the shutdown signal and awaits until all tasks are shutdown.
pub fn graceful_shutdown(self) {
let _ = self.do_graceful_shutdown(None);
}
/// Fires the shutdown signal and awaits until all tasks are shutdown.
///
/// Returns true if all tasks were shutdown before the timeout elapsed.
pub fn graceful_shutdown_with_timeout(self, timeout: std::time::Duration) -> bool {
self.do_graceful_shutdown(Some(timeout))
}
fn do_graceful_shutdown(self, timeout: Option<std::time::Duration>) -> bool {
drop(self.signal);
let when = timeout.map(|t| std::time::Instant::now() + t);
while self.graceful_tasks.load(Ordering::Relaxed) > 0 {
if when.map(|when| std::time::Instant::now() > when).unwrap_or(false) {
debug!("graceful shutdown timed out");
return false
}
std::hint::spin_loop();
}
debug!("gracefully shut down");
true
}
}
/// An endless future that resolves if a critical task panicked.
@ -232,6 +273,8 @@ pub struct TaskExecutor {
panicked_tasks_tx: UnboundedSender<PanickedTaskError>,
// Task Executor Metrics
metrics: TaskExecutorMetrics,
/// How many [GracefulShutdown] tasks are currently active
graceful_tasks: Arc<AtomicUsize>,
}
// === impl TaskExecutor ===
@ -382,7 +425,7 @@ impl TaskExecutor {
/// This spawns a critical task onto the runtime.
///
/// If this task panics, the [`TaskManager`] is notified.
pub fn spawn_critical_with_signal<F>(
pub fn spawn_critical_with_shutdown_signal<F>(
&self,
name: &'static str,
f: impl FnOnce(Shutdown) -> F,
@ -407,6 +450,55 @@ impl TaskExecutor {
self.handle.spawn(task)
}
/// This spawns a critical task onto the runtime.
///
/// If this task panics, the [TaskManager] is notified.
/// The [TaskManager] will wait until the given future has completed before shutting down.
///
/// # Example
///
/// ```no_run
/// # async fn t(executor: reth_tasks::TaskExecutor) {
///
/// executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move {
/// // await the shutdown signal
/// let guard = shutdown.await;
/// // do work before exiting the program
/// tokio::time::sleep(std::time::Duration::from_secs(1)).await;
/// // allow graceful shutdown
/// drop(guard);
/// });
/// # }
/// ```
pub fn spawn_critical_with_graceful_shutdown_signal<F>(
&self,
name: &'static str,
f: impl FnOnce(GracefulShutdown) -> F,
) -> JoinHandle<()>
where
F: Future<Output = ()> + Send + 'static,
{
let panicked_tasks_tx = self.panicked_tasks_tx.clone();
let on_shutdown = GracefulShutdown::new(
self.on_shutdown.clone(),
GracefulShutdownGuard::new(Arc::clone(&self.graceful_tasks)),
);
let fut = f(on_shutdown);
// wrap the task in catch unwind
let task = std::panic::AssertUnwindSafe(fut)
.catch_unwind()
.map_err(move |error| {
let task_error = PanickedTaskError::new(name, error);
error!("{task_error}");
let _ = panicked_tasks_tx.send(task_error);
})
.map(|_| ())
.in_current_span();
self.handle.spawn(task)
}
}
impl TaskSpawner for TaskExecutor {
@ -444,7 +536,7 @@ enum TaskKind {
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use std::{sync::atomic::AtomicBool, time::Duration};
#[test]
fn test_cloneable() {
@ -521,4 +613,70 @@ mod tests {
handle.block_on(shutdown);
}
#[test]
fn test_manager_graceful_shutdown() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let manager = TaskManager::new(handle.clone());
let executor = manager.executor();
let val = Arc::new(AtomicBool::new(false));
let c = val.clone();
executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move {
let _guard = shutdown.await;
tokio::time::sleep(Duration::from_millis(200)).await;
c.store(true, Ordering::Relaxed);
});
manager.graceful_shutdown();
assert!(val.load(Ordering::Relaxed));
}
#[test]
fn test_manager_graceful_shutdown_many() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let manager = TaskManager::new(handle.clone());
let executor = manager.executor();
let _e = executor.clone();
let counter = Arc::new(AtomicUsize::new(0));
let num = 10;
for _ in 0..num {
let c = counter.clone();
executor.spawn_critical_with_graceful_shutdown_signal(
"grace",
move |shutdown| async move {
let _guard = shutdown.await;
tokio::time::sleep(Duration::from_millis(200)).await;
c.fetch_add(1, Ordering::SeqCst);
},
);
}
manager.graceful_shutdown();
assert_eq!(counter.load(Ordering::Relaxed), num);
}
#[test]
fn test_manager_graceful_shutdown_timeout() {
let runtime = tokio::runtime::Runtime::new().unwrap();
let handle = runtime.handle().clone();
let manager = TaskManager::new(handle.clone());
let executor = manager.executor();
let timeout = Duration::from_millis(500);
let val = Arc::new(AtomicBool::new(false));
let val2 = val.clone();
executor.spawn_critical_with_graceful_shutdown_signal("grace", |shutdown| async move {
let _guard = shutdown.await;
tokio::time::sleep(timeout * 3).await;
val2.store(true, Ordering::Relaxed);
unreachable!("should not be reached");
});
manager.graceful_shutdown_with_timeout(timeout);
assert!(!val.load(Ordering::Relaxed));
}
}

View File

@ -7,10 +7,63 @@ use futures_util::{
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
sync::{atomic::AtomicUsize, Arc},
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
/// A Future that resolves when the shutdown event has been fired.
///
/// The [TaskManager](crate)
#[derive(Debug)]
pub struct GracefulShutdown {
shutdown: Shutdown,
guard: Option<GracefulShutdownGuard>,
}
impl GracefulShutdown {
pub(crate) fn new(shutdown: Shutdown, guard: GracefulShutdownGuard) -> Self {
Self { shutdown, guard: Some(guard) }
}
}
impl Future for GracefulShutdown {
type Output = GracefulShutdownGuard;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
ready!(self.shutdown.poll_unpin(cx));
Poll::Ready(self.get_mut().guard.take().expect("Future polled after completion"))
}
}
impl Clone for GracefulShutdown {
fn clone(&self) -> Self {
Self {
shutdown: self.shutdown.clone(),
guard: self.guard.as_ref().map(|g| GracefulShutdownGuard::new(Arc::clone(&g.0))),
}
}
}
/// A guard that fires once dropped to signal the [TaskManager](crate::TaskManager) that the
/// [GracefulShutdown] has completed.
#[derive(Debug)]
#[must_use = "if unused the task will not be gracefully shutdown"]
pub struct GracefulShutdownGuard(Arc<AtomicUsize>);
impl GracefulShutdownGuard {
pub(crate) fn new(counter: Arc<AtomicUsize>) -> Self {
counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Self(counter)
}
}
impl Drop for GracefulShutdownGuard {
fn drop(&mut self) {
self.0.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}
/// A Future that resolves when the shutdown event has been fired.
#[derive(Debug, Clone)]
pub struct Shutdown(Shared<oneshot::Receiver<()>>);