chore: integrate Boxed TaskSpawner (#1356)

This commit is contained in:
Matthias Seitz
2023-02-14 21:44:22 +01:00
committed by GitHub
parent 87ba4840e9
commit f995e66c96
4 changed files with 16 additions and 20 deletions

View File

@ -322,7 +322,7 @@ impl Command {
self.network
.network_config(config, self.chain.clone())
.executor(Some(executor))
.with_task_executor(Box::new(executor))
.set_head(head)
.build(Arc::new(ShareableDatabase::new(db)))
}

View File

@ -10,7 +10,7 @@ use crate::{
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, DEFAULT_DISCOVERY_PORT};
use reth_primitives::{ChainSpec, ForkFilter, Head, NodeRecord, PeerId, MAINNET};
use reth_provider::{BlockProvider, HeaderProvider};
use reth_tasks::TaskExecutor;
use reth_tasks::{TaskSpawner, TokioTaskExecutor};
use secp256k1::{SecretKey, SECP256K1};
use std::{
collections::HashSet,
@ -68,7 +68,7 @@ pub struct NetworkConfig<C> {
/// The default mode of the network.
pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
pub executor: Option<TaskExecutor>,
pub executor: Box<dyn TaskSpawner>,
/// The `Status` message to send to peers at the beginning.
pub status: Status,
/// Sets the hello message for the p2p handshake in RLPx
@ -145,7 +145,7 @@ pub struct NetworkConfigBuilder {
network_mode: NetworkMode,
/// The executor to use for spawning tasks.
#[serde(skip)]
executor: Option<TaskExecutor>,
executor: Option<Box<dyn TaskSpawner>>,
/// Sets the hello message for the p2p handshake in RLPx
hello_message: Option<HelloMessage>,
/// Head used to start set for the fork filter and status.
@ -221,8 +221,8 @@ impl NetworkConfigBuilder {
/// Sets the executor to use for spawning tasks.
///
/// If `None`, then [tokio::spawn] is used for spawning tasks.
pub fn executor(mut self, executor: Option<TaskExecutor>) -> Self {
self.executor = executor;
pub fn with_task_executor(mut self, executor: Box<dyn TaskSpawner>) -> Self {
self.executor = Some(executor);
self
}
@ -357,7 +357,7 @@ impl NetworkConfigBuilder {
chain_spec,
block_import: Box::<ProofOfStakeBlockImport>::default(),
network_mode,
executor,
executor: executor.unwrap_or_else(|| Box::<TokioTaskExecutor>::default()),
status,
hello_message,
fork_filter,

View File

@ -25,7 +25,7 @@ use reth_net_common::{
stream::HasRemoteAddr,
};
use reth_primitives::{ForkFilter, ForkId, ForkTransition, Head, PeerId};
use reth_tasks::TaskExecutor;
use reth_tasks::TaskSpawner;
use secp256k1::SecretKey;
use std::{
collections::HashMap,
@ -77,7 +77,7 @@ pub(crate) struct SessionManager {
/// Size of the command buffer per session.
session_command_buffer: usize,
/// The executor for spawned tasks.
executor: Option<TaskExecutor>,
executor: Box<dyn TaskSpawner>,
/// All pending session that are currently handshaking, exchanging `Hello`s.
///
/// Events produced during the authentication phase are reported to this manager. Once the
@ -110,7 +110,7 @@ impl SessionManager {
pub(crate) fn new(
secret_key: SecretKey,
config: SessionsConfig,
executor: Option<TaskExecutor>,
executor: Box<dyn TaskSpawner>,
status: Status,
hello_message: HelloMessage,
fork_filter: ForkFilter,
@ -169,11 +169,7 @@ impl SessionManager {
where
F: Future<Output = ()> + Send + 'static,
{
if let Some(ref executor) = self.executor {
executor.spawn(async move { f.await });
} else {
tokio::task::spawn(async move { f.await });
}
self.executor.spawn(async move { f.await }.boxed());
}
/// Invoked on a received status update.
@ -213,7 +209,7 @@ impl SessionManager {
let hello_message = self.hello_message.clone();
let status = self.status;
let fork_filter = self.fork_filter.clone();
self.spawn(Box::pin(async move {
self.spawn(async move {
start_pending_incoming_session(
disconnect_rx,
session_id,
@ -226,7 +222,7 @@ impl SessionManager {
fork_filter,
)
.await
}));
});
let handle = PendingSessionHandle {
disconnect_tx: Some(disconnect_tx),
@ -247,7 +243,7 @@ impl SessionManager {
let fork_filter = self.fork_filter.clone();
let status = self.status;
let band_with_meter = self.bandwidth_meter.clone();
self.spawn(Box::pin(async move {
self.spawn(async move {
start_pending_outbound_session(
disconnect_rx,
pending_events,
@ -261,7 +257,7 @@ impl SessionManager {
band_with_meter,
)
.await
}));
});
let handle = PendingSessionHandle {
disconnect_tx: Some(disconnect_tx),

View File

@ -66,7 +66,7 @@ pub mod shutdown;
/// ```
///
/// The [TaskSpawner] trait is [DynClone] so `Box<dyn TaskSpawner>` are also `Clone`.
pub trait TaskSpawner: Send + Sync + DynClone {
pub trait TaskSpawner: Send + Sync + std::fmt::Debug + DynClone {
/// Spawns the task onto the runtime.
/// See also [`Handle::spawn`].
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()>;