feat: add TaskSpawner trait (#1318)

This commit is contained in:
Matthias Seitz
2023-02-13 23:26:25 +01:00
committed by GitHub
parent 2c0557d991
commit 4b90d42401
2 changed files with 38 additions and 2 deletions

View File

@ -15,4 +15,4 @@ futures-util = "0.3"
thiserror = "1.0" thiserror = "1.0"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] }

View File

@ -8,7 +8,10 @@
//! reth task management //! reth task management
use crate::shutdown::{signal, Shutdown, Signal}; use crate::shutdown::{signal, Shutdown, Signal};
use futures_util::{future::select, pin_mut, Future, FutureExt}; use futures_util::{
future::{select, BoxFuture},
pin_mut, Future, FutureExt,
};
use std::{ use std::{
pin::Pin, pin::Pin,
task::{ready, Context, Poll}, task::{ready, Context, Poll},
@ -23,6 +26,33 @@ use tracing_futures::Instrument;
pub mod shutdown; pub mod shutdown;
/// A type that can spawn tasks.
///
/// The main purpose of this type is to abstract over [TaskExecutor] so it's more convenient to
/// provide default impls for testing.
pub trait TaskSpawner: Send + Sync {
/// Spawns the task onto the runtime.
/// See also [`Handle::spawn`].
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;
}
impl TaskSpawner for Box<dyn TaskSpawner> {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
(**self).spawn(fut)
}
}
/// An [TaskSpawner] that uses [tokio::task::spawn] to execute tasks
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct TokioTaskExecutor;
impl TaskSpawner for TokioTaskExecutor {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
tokio::task::spawn(fut)
}
}
/// Many reth components require to spawn tasks for long-running jobs. For example `discovery` /// Many reth components require to spawn tasks for long-running jobs. For example `discovery`
/// spawns tasks to handle egress and ingress of udp traffic or `network` that spawns session tasks /// spawns tasks to handle egress and ingress of udp traffic or `network` that spawns session tasks
/// that handle the traffic to and from a peer. /// that handle the traffic to and from a peer.
@ -264,6 +294,12 @@ impl TaskExecutor {
} }
} }
impl TaskSpawner for TaskExecutor {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.spawn(fut)
}
}
/// Determines how a task is spawned /// Determines how a task is spawned
enum TaskKind { enum TaskKind {
/// Spawn the task to the default executor [Handle::spawn] /// Spawn the task to the default executor [Handle::spawn]