From 4b90d424014ba7e81ef47fd68972e10d49544e37 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 13 Feb 2023 23:26:25 +0100 Subject: [PATCH] feat: add TaskSpawner trait (#1318) --- crates/tasks/Cargo.toml | 2 +- crates/tasks/src/lib.rs | 38 +++++++++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 03beb9f03..08164bb58 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -15,4 +15,4 @@ futures-util = "0.3" thiserror = "1.0" [dev-dependencies] -tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } \ No newline at end of file +tokio = { version = "1", features = ["sync", "rt", "rt-multi-thread", "time", "macros"] } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 0f80b027a..73b2ef8eb 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -8,7 +8,10 @@ //! reth task management use crate::shutdown::{signal, Shutdown, Signal}; -use futures_util::{future::select, pin_mut, Future, FutureExt}; +use futures_util::{ + future::{select, BoxFuture}, + pin_mut, Future, FutureExt, +}; use std::{ pin::Pin, task::{ready, Context, Poll}, @@ -23,6 +26,33 @@ use tracing_futures::Instrument; pub mod shutdown; +/// A type that can spawn tasks. +/// +/// The main purpose of this type is to abstract over [TaskExecutor] so it's more convenient to +/// provide default impls for testing. +pub trait TaskSpawner: Send + Sync { + /// Spawns the task onto the runtime. + /// See also [`Handle::spawn`]. + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>; +} + +impl TaskSpawner for Box { + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + (**self).spawn(fut) + } +} + +/// An [TaskSpawner] that uses [tokio::task::spawn] to execute tasks +#[derive(Debug, Clone, Default)] +#[non_exhaustive] +pub struct TokioTaskExecutor; + +impl TaskSpawner for TokioTaskExecutor { + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + tokio::task::spawn(fut) + } +} + /// Many reth components require to spawn tasks for long-running jobs. For example `discovery` /// spawns tasks to handle egress and ingress of udp traffic or `network` that spawns session tasks /// that handle the traffic to and from a peer. @@ -264,6 +294,12 @@ impl TaskExecutor { } } +impl TaskSpawner for TaskExecutor { + fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> { + self.spawn(fut) + } +} + /// Determines how a task is spawned enum TaskKind { /// Spawn the task to the default executor [Handle::spawn]