feat(net): integrate TaskExecutor (#369)

This commit is contained in:
Matthias Seitz
2022-12-09 18:47:18 +01:00
committed by GitHub
parent b0149f0b9f
commit 4318a54f59
6 changed files with 34 additions and 10 deletions

1
Cargo.lock generated
View File

@ -3337,6 +3337,7 @@ dependencies = [
"reth-primitives", "reth-primitives",
"reth-rlp", "reth-rlp",
"reth-rlp-derive", "reth-rlp-derive",
"reth-tasks",
"reth-tracing", "reth-tracing",
"reth-transaction-pool", "reth-transaction-pool",
"secp256k1", "secp256k1",

View File

@ -18,6 +18,7 @@ reth-eth-wire = { path = "../eth-wire" }
reth-ecies = { path = "../ecies" } reth-ecies = { path = "../ecies" }
reth-rlp = { path = "../../common/rlp" } reth-rlp = { path = "../../common/rlp" }
reth-rlp-derive = { path = "../../common/rlp-derive" } reth-rlp-derive = { path = "../../common/rlp-derive" }
reth-tasks = { path = "../../tasks" }
reth-transaction-pool = { path = "../../transaction-pool" } reth-transaction-pool = { path = "../../transaction-pool" }
# async/futures # async/futures

View File

@ -5,6 +5,7 @@ use crate::{
}; };
use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NodeRecord, DEFAULT_DISCOVERY_PORT}; use reth_discv4::{Discv4Config, Discv4ConfigBuilder, NodeRecord, DEFAULT_DISCOVERY_PORT};
use reth_primitives::{Chain, ForkId, H256}; use reth_primitives::{Chain, ForkId, H256};
use reth_tasks::TaskExecutor;
use secp256k1::SecretKey; use secp256k1::SecretKey;
use std::{ use std::{
net::{Ipv4Addr, SocketAddr, SocketAddrV4}, net::{Ipv4Addr, SocketAddr, SocketAddrV4},
@ -40,6 +41,8 @@ pub struct NetworkConfig<C> {
pub block_import: Box<dyn BlockImport>, pub block_import: Box<dyn BlockImport>,
/// The default mode of the network. /// The default mode of the network.
pub network_mode: NetworkMode, pub network_mode: NetworkMode,
/// The executor to use for spawning tasks.
pub executor: Option<TaskExecutor>,
} }
// === impl NetworkConfig === // === impl NetworkConfig ===
@ -98,6 +101,8 @@ pub struct NetworkConfigBuilder<C> {
block_import: Box<dyn BlockImport>, block_import: Box<dyn BlockImport>,
/// The default mode of the network. /// The default mode of the network.
network_mode: NetworkMode, network_mode: NetworkMode,
/// The executor to use for spawning tasks.
executor: Option<TaskExecutor>,
} }
// === impl NetworkConfigBuilder === // === impl NetworkConfigBuilder ===
@ -119,9 +124,16 @@ impl<C> NetworkConfigBuilder<C> {
genesis_hash: Default::default(), genesis_hash: Default::default(),
block_import: Box::<ProofOfStakeBlockImport>::default(), block_import: Box::<ProofOfStakeBlockImport>::default(),
network_mode: Default::default(), network_mode: Default::default(),
executor: None,
} }
} }
/// Sets the executor to use for spawning tasks.
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
self
}
/// Sets a custom config for how sessions are handled. /// Sets a custom config for how sessions are handled.
pub fn sessions_config(mut self, config: SessionsConfig) -> Self { pub fn sessions_config(mut self, config: SessionsConfig) -> Self {
self.sessions_config = Some(config); self.sessions_config = Some(config);
@ -180,6 +192,7 @@ impl<C> NetworkConfigBuilder<C> {
genesis_hash, genesis_hash,
block_import, block_import,
network_mode, network_mode,
executor,
} = self; } = self;
NetworkConfig { NetworkConfig {
client, client,
@ -199,6 +212,7 @@ impl<C> NetworkConfigBuilder<C> {
genesis_hash, genesis_hash,
block_import, block_import,
network_mode, network_mode,
executor,
} }
} }
} }

View File

@ -122,6 +122,7 @@ where
block_import, block_import,
network_mode, network_mode,
boot_nodes, boot_nodes,
executor,
.. ..
} = config; } = config;
@ -137,7 +138,7 @@ where
// need to retrieve the addr here since provided port could be `0` // need to retrieve the addr here since provided port could be `0`
let local_peer_id = discovery.local_id(); let local_peer_id = discovery.local_id();
let sessions = SessionManager::new(secret_key, sessions_config); let sessions = SessionManager::new(secret_key, sessions_config, executor);
let state = NetworkState::new(client, discovery, peers_manger, genesis_hash); let state = NetworkState::new(client, discovery, peers_manger, genesis_hash);
let swarm = Swarm::new(incoming, sessions, state); let swarm = Swarm::new(incoming, sessions, state);

View File

@ -33,7 +33,6 @@ use std::{
use tokio::{ use tokio::{
net::TcpStream, net::TcpStream,
sync::{mpsc, oneshot}, sync::{mpsc, oneshot},
task::JoinSet,
}; };
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tracing::{instrument, trace, warn}; use tracing::{instrument, trace, warn};
@ -44,6 +43,7 @@ mod handle;
use crate::session::config::SessionCounter; use crate::session::config::SessionCounter;
pub use config::SessionsConfig; pub use config::SessionsConfig;
use reth_ecies::util::pk2id; use reth_ecies::util::pk2id;
use reth_tasks::TaskExecutor;
/// Internal identifier for active sessions. /// Internal identifier for active sessions.
#[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)] #[derive(Debug, Clone, Copy, PartialOrd, PartialEq, Eq, Hash)]
@ -68,10 +68,8 @@ pub(crate) struct SessionManager {
fork_filter: ForkFilter, fork_filter: ForkFilter,
/// Size of the command buffer per session. /// Size of the command buffer per session.
session_command_buffer: usize, session_command_buffer: usize,
/// All spawned session tasks. /// The executor for spawned tasks.
/// executor: Option<TaskExecutor>,
/// Note: If dropped, the session tasks are aborted.
spawned_tasks: JoinSet<()>,
/// All pending session that are currently handshaking, exchanging `Hello`s. /// All pending session that are currently handshaking, exchanging `Hello`s.
/// ///
/// Events produced during the authentication phase are reported to this manager. Once the /// Events produced during the authentication phase are reported to this manager. Once the
@ -99,7 +97,11 @@ pub(crate) struct SessionManager {
impl SessionManager { impl SessionManager {
/// Creates a new empty [`SessionManager`]. /// Creates a new empty [`SessionManager`].
pub(crate) fn new(secret_key: SecretKey, config: SessionsConfig) -> Self { pub(crate) fn new(
secret_key: SecretKey,
config: SessionsConfig,
executor: Option<TaskExecutor>,
) -> Self {
let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer); let (pending_sessions_tx, pending_sessions_rx) = mpsc::channel(config.session_event_buffer);
let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer); let (active_session_tx, active_session_rx) = mpsc::channel(config.session_event_buffer);
@ -121,7 +123,7 @@ impl SessionManager {
hello, hello,
fork_filter, fork_filter,
session_command_buffer: config.session_command_buffer, session_command_buffer: config.session_command_buffer,
spawned_tasks: Default::default(), executor,
pending_sessions: Default::default(), pending_sessions: Default::default(),
active_sessions: Default::default(), active_sessions: Default::default(),
pending_sessions_tx, pending_sessions_tx,
@ -139,11 +141,15 @@ impl SessionManager {
} }
/// Spawns the given future onto a new task that is tracked in the `spawned_tasks` [`JoinSet`]. /// Spawns the given future onto a new task that is tracked in the `spawned_tasks` [`JoinSet`].
fn spawn<F>(&mut self, f: F) fn spawn<F>(&self, f: F)
where where
F: Future<Output = ()> + Send + 'static, F: Future<Output = ()> + Send + 'static,
{ {
self.spawned_tasks.spawn(async move { f.await }); if let Some(ref executor) = self.executor {
executor.spawn(async move { f.await })
} else {
tokio::task::spawn(async move { f.await });
}
} }
/// Invoked on a received status update /// Invoked on a received status update

View File

@ -73,6 +73,7 @@ impl Stream for TaskManager {
} }
/// A type that can spawn new tokio tasks /// A type that can spawn new tokio tasks
#[derive(Debug, Clone)]
pub struct TaskExecutor { pub struct TaskExecutor {
/// Handle to the tokio runtime this task manager is associated with. /// Handle to the tokio runtime this task manager is associated with.
/// ///