diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index db7e325c1..fcdd34c4a 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -322,7 +322,7 @@ impl Command { self.network .network_config(config, self.chain.clone()) - .executor(Some(executor)) + .with_task_executor(Box::new(executor)) .set_head(head) .build(Arc::new(ShareableDatabase::new(db))) } diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index ccd465a8e..7142d41f2 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -10,7 +10,7 @@ use crate::{ use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT}; use reth_primitives::{ChainSpec, ForkFilter, Head, NodeRecord, PeerId, MAINNET}; use reth_provider::{BlockProvider, HeaderProvider}; -use reth_tasks::TaskExecutor; +use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use secp256k1::{SecretKey, SECP256K1}; use std::{ collections::HashSet, @@ -68,7 +68,7 @@ pub struct NetworkConfig { /// The default mode of the network. pub network_mode: NetworkMode, /// The executor to use for spawning tasks. - pub executor: Option, + pub executor: Box, /// The `Status` message to send to peers at the beginning. pub status: Status, /// Sets the hello message for the p2p handshake in RLPx @@ -145,7 +145,7 @@ pub struct NetworkConfigBuilder { network_mode: NetworkMode, /// The executor to use for spawning tasks. #[serde(skip)] - executor: Option, + executor: Option>, /// Sets the hello message for the p2p handshake in RLPx hello_message: Option, /// Head used to start set for the fork filter and status. @@ -221,8 +221,8 @@ impl NetworkConfigBuilder { /// Sets the executor to use for spawning tasks. /// /// If `None`, then [tokio::spawn] is used for spawning tasks. - pub fn executor(mut self, executor: Option) -> Self { - self.executor = executor; + pub fn with_task_executor(mut self, executor: Box) -> Self { + self.executor = Some(executor); self } @@ -357,7 +357,7 @@ impl NetworkConfigBuilder { chain_spec, block_import: Box::::default(), network_mode, - executor, + executor: executor.unwrap_or_else(|| Box::::default()), status, hello_message, fork_filter, diff --git a/crates/net/network/src/session/mod.rs b/crates/net/network/src/session/mod.rs index a4c7d33a2..2819da8eb 100644 --- a/crates/net/network/src/session/mod.rs +++ b/crates/net/network/src/session/mod.rs @@ -25,7 +25,7 @@ use reth_net_common::{ stream::HasRemoteAddr, }; use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head, PeerId}; -use reth_tasks::TaskExecutor; +use reth_tasks::TaskSpawner; use secp256k1::SecretKey; use std::{ collections::HashMap, @@ -77,7 +77,7 @@ pub(crate) struct SessionManager { /// Size of the command buffer per session. session_command_buffer: usize, /// The executor for spawned tasks. - executor: Option, + executor: Box, /// All pending session that are currently handshaking, exchanging `Hello`s. /// /// Events produced during the authentication phase are reported to this manager. Once the @@ -110,7 +110,7 @@ impl SessionManager { pub(crate) fn new( secret_key: SecretKey, config: SessionsConfig, - executor: Option, + executor: Box, status: Status, hello_message: HelloMessage, fork_filter: ForkFilter, @@ -169,11 +169,7 @@ impl SessionManager { where F: Future + Send + 'static, { - if let Some(ref executor) = self.executor { - executor.spawn(async move { f.await }); - } else { - tokio::task::spawn(async move { f.await }); - } + self.executor.spawn(async move { f.await }.boxed()); } /// Invoked on a received status update. @@ -213,7 +209,7 @@ impl SessionManager { let hello_message = self.hello_message.clone(); let status = self.status; let fork_filter = self.fork_filter.clone(); - self.spawn(Box::pin(async move { + self.spawn(async move { start_pending_incoming_session( disconnect_rx, session_id, @@ -226,7 +222,7 @@ impl SessionManager { fork_filter, ) .await - })); + }); let handle = PendingSessionHandle { disconnect_tx: Some(disconnect_tx), @@ -247,7 +243,7 @@ impl SessionManager { let fork_filter = self.fork_filter.clone(); let status = self.status; let band_with_meter = self.bandwidth_meter.clone(); - self.spawn(Box::pin(async move { + self.spawn(async move { start_pending_outbound_session( disconnect_rx, pending_events, @@ -261,7 +257,7 @@ impl SessionManager { band_with_meter, ) .await - })); + }); let handle = PendingSessionHandle { disconnect_tx: Some(disconnect_tx), diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index a2163f70b..a25ca9b03 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -66,7 +66,7 @@ pub mod shutdown; /// ``` /// /// The [TaskSpawner] trait is [DynClone] so `Box` are also `Clone`. -pub trait TaskSpawner: Send + Sync + DynClone { +pub trait TaskSpawner: Send + Sync + std::fmt::Debug + DynClone { /// Spawns the task onto the runtime. /// See also [`Handle::spawn`]. fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;