From 59971030789fbe012cf410b7cc37651fce16d77b Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Mon, 13 Feb 2023 23:24:18 +0100 Subject: [PATCH] cli: integrate TaskExecutor (#1314) --- Cargo.lock | 1 + bin/reth/src/args/network_args.rs | 13 +++ bin/reth/src/dirs.rs | 6 ++ bin/reth/src/node/mod.rs | 107 ++++++++++++++++++------ bin/reth/src/p2p/mod.rs | 1 + bin/reth/src/runner.rs | 27 +++++- bin/reth/src/stage/mod.rs | 1 + crates/net/network/src/config.rs | 6 +- crates/net/network/src/manager.rs | 7 +- crates/net/network/src/peers/manager.rs | 9 +- crates/staged-sync/Cargo.toml | 1 + crates/staged-sync/src/config.rs | 4 + 12 files changed, 148 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 44ee12ae3..f2600736b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4715,6 +4715,7 @@ dependencies = [ "reth-provider", "reth-staged-sync", "reth-stages", + "reth-tasks", "reth-tracing", "secp256k1 0.24.3", "serde", diff --git a/bin/reth/src/args/network_args.rs b/bin/reth/src/args/network_args.rs index 793ac9906..aed1adb3b 100644 --- a/bin/reth/src/args/network_args.rs +++ b/bin/reth/src/args/network_args.rs @@ -3,6 +3,7 @@ use crate::dirs::{KnownPeersPath, PlatformPath}; use clap::Args; use reth_primitives::NodeRecord; +use std::path::PathBuf; /// Parameters for configuring the network more granularity via CLI #[derive(Debug, Args)] @@ -37,3 +38,15 @@ pub struct NetworkArgs { #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] pub no_persist_peers: bool, } + +// === impl NetworkArgs === + +impl NetworkArgs { + /// If `no_persist_peers` is true then this returns the path to the persistent peers file + pub fn persistent_peers_file(&self) -> Option { + if self.no_persist_peers { + return None + } + Some(self.peers_file.clone().into()) + } +} diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index c01ca034f..b1a8aa575 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -170,3 +170,9 @@ impl AsRef for PlatformPath { self.0.as_path() } } + +impl From> for PathBuf { + fn from(value: PlatformPath) -> Self { + value.0 + } +} diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 271acb865..5e02cdb4d 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -10,7 +10,7 @@ use crate::{ use clap::{crate_version, Parser}; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{stream::select as stream_select, Stream, StreamExt}; +use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; use reth_consensus::beacon::BeaconConsensus; use reth_db::mdbx::{Env, WriteMap}; use reth_downloaders::{ @@ -23,10 +23,12 @@ use reth_interfaces::{ sync::SyncStateUpdater, }; use reth_net_nat::NatResolver; -use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle}; +use reth_network::{ + error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, +}; use reth_network_api::NetworkInfo; use reth_primitives::{BlockNumber, ChainSpec, H256}; -use reth_provider::ShareableDatabase; +use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig}; use reth_staged_sync::{ utils::{ @@ -40,8 +42,9 @@ use reth_stages::{ prelude::*, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, }; -use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration}; -use tracing::{debug, info, warn}; +use reth_tasks::TaskExecutor; +use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use tracing::{debug, info, trace, warn}; /// Start the node #[derive(Debug, Parser)] @@ -106,7 +109,7 @@ pub struct Command { impl Command { /// Execute `node` command // TODO: RPC - pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { + pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", crate_version!()); // Raise the fd limit of the process. @@ -131,9 +134,8 @@ impl Command { self.init_trusted_nodes(&mut config); info!(target: "reth::cli", "Connecting to P2P network"); - let netconf = self.load_network_config(&config, &db); - let network = netconf.start_network().await?; - + let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone()); + let network = self.start_network(netconf, &ctx.task_executor, ()).await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); // TODO: Use the resolved secret to spawn the Engine API server @@ -156,17 +158,17 @@ impl Command { .build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone()) .await?; - tokio::spawn(handle_events(events)); + ctx.task_executor.spawn(handle_events(events)); // Run pipeline + let (rx, tx) = tokio::sync::oneshot::channel(); info!(target: "reth::cli", "Starting sync pipeline"); - pipeline.run(db.clone()).await?; + ctx.task_executor.spawn_critical("pipeline task", async move { + let res = pipeline.run(db.clone()).await; + let _ = rx.send(res); + }); - // TODO: this is where we'd handle graceful shutdown by listening to ctrl-c - - if !self.network.no_persist_peers { - dump_peers(self.network.peers_file.as_ref(), network).await?; - } + tx.await??; info!(target: "reth::cli", "Finishing up"); Ok(()) @@ -246,19 +248,49 @@ impl Command { Ok(consensus) } + /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected + /// to that network. + async fn start_network( + &self, + config: NetworkConfig, + task_executor: &TaskExecutor, + // TODO: integrate pool + _pool: (), + ) -> Result + where + C: BlockProvider + HeaderProvider + 'static, + { + let client = config.client.clone(); + let (handle, network, _txpool, eth) = + NetworkManager::builder(config).await?.request_handler(client).split_with_handle(); + + let known_peers_file = self.network.persistent_peers_file(); + task_executor.spawn_critical_with_signal("p2p network task", |shutdown| async move { + run_network_until_shutdown(shutdown, network, known_peers_file).await + }); + + task_executor.spawn_critical("p2p eth request handler", async move { eth.await }); + + // TODO spawn pool + + Ok(handle) + } + fn load_network_config( &self, config: &Config, - db: &Arc>, + db: Arc>, + executor: TaskExecutor, ) -> NetworkConfig>>> { - let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file); + let peers_file = self.network.persistent_peers_file(); config.network_config( - db.clone(), + db, self.chain.clone(), self.network.disable_discovery, self.network.bootnodes.clone(), self.nat, - peers_file.map(|f| f.as_ref().to_path_buf()), + peers_file, + Some(executor), ) } @@ -310,13 +342,36 @@ impl Command { } } -/// Dumps peers to `file_path` for persistence. -async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> { - info!(target : "net::peers", file = %file_path.display(), "Saving current peers"); - let known_peers = network.peers_handle().all_peers().await; +/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is +/// received. If configured, this writes known peers to `persistent_peers_file` afterwards. +async fn run_network_until_shutdown( + shutdown: reth_tasks::shutdown::Shutdown, + network: NetworkManager, + persistent_peers_file: Option, +) where + C: BlockProvider + HeaderProvider + 'static, +{ + pin_mut!(network, shutdown); - tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?; - Ok(()) + tokio::select! { + _ = &mut network => {}, + _ = shutdown => {}, + } + + if let Some(file_path) = persistent_peers_file { + let known_peers = network.all_peers().collect::>(); + if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { + trace!(target : "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); + match std::fs::write(&file_path, known_peers) { + Ok(_) => { + info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); + } + Err(err) => { + warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); + } + } + } + } } /// The current high-level state of the node. diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index 5aec2f9a8..8c34b54eb 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -105,6 +105,7 @@ impl Command { None, self.nat, None, + None, ) .start_network() .await?; diff --git a/bin/reth/src/runner.rs b/bin/reth/src/runner.rs index 8e6fe7ae1..9f6212663 100644 --- a/bin/reth/src/runner.rs +++ b/bin/reth/src/runner.rs @@ -24,12 +24,15 @@ impl CliRunner { ) -> Result<(), E> where F: Future>, - E: Send + Sync + From + 'static, + E: Send + Sync + From + From + 'static, { let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?; // Executes the command until it finished or ctrl-c was fired - tokio_runtime.block_on(run_until_ctrl_c(command(context)))?; + let task_manager = tokio_runtime.block_on(run_to_completion_or_panic( + task_manager, + run_until_ctrl_c(command(context)), + ))?; // after the command has finished or exit signal was received we drop the task manager which // fires the shutdown signal to all tasks spawned via the task executor drop(task_manager); @@ -85,7 +88,25 @@ pub fn tokio_runtime() -> Result { tokio::runtime::Builder::new_multi_thread().enable_all().build() } -/// Runs the future to completion or until a `ctrl-c` was received. +/// Runs the given future to completion or until a critical task panicked +async fn run_to_completion_or_panic(mut tasks: TaskManager, fut: F) -> Result +where + F: Future>, + E: Send + Sync + From + 'static, +{ + { + pin_mut!(fut); + tokio::select! { + err = &mut tasks => { + return Err(err.into()) + }, + res = fut => res?, + } + } + Ok(tasks) +} + +/// Runs the future to completion or until a `ctrl-c` is received. async fn run_until_ctrl_c(fut: F) -> Result<(), E> where F: Future>, diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index eb2c56730..bd18069fd 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -145,6 +145,7 @@ impl Command { None, self.nat, None, + None, ) .start_network() .await?; diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index 73a1fd999..d1989d03f 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -209,8 +209,10 @@ impl NetworkConfigBuilder { } /// Sets the executor to use for spawning tasks. - pub fn executor(mut self, executor: TaskExecutor) -> Self { - self.executor = Some(executor); + /// + /// If `None`, then [tokio::spawn] is used for spawning tasks. + pub fn executor(mut self, executor: Option) -> Self { + self.executor = executor; self } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index b2a2cd1e2..46af9c3b6 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -40,7 +40,7 @@ use reth_eth_wire::{ }; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind}; -use reth_primitives::{PeerId, H256}; +use reth_primitives::{NodeRecord, PeerId, H256}; use reth_provider::BlockProvider; use std::{ net::SocketAddr, @@ -295,6 +295,11 @@ where self.handle.peer_id() } + /// Returns an iterator over all peers in the peer set. + pub fn all_peers(&self) -> impl Iterator + '_ { + self.swarm.state().peers().iter_peers() + } + /// Returns a new [`PeersHandle`] that can be cloned and shared. /// /// The [`PeersHandle`] can be used to interact with the network's peer set. diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 7754c9f46..9cc8da481 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -163,6 +163,11 @@ impl PeersManager { PeersHandle { manager_tx: self.manager_tx.clone() } } + /// Returns an iterator over all peers + pub(crate) fn iter_peers(&self) -> impl Iterator + '_ { + self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id)) + } + /// Invoked when a new _incoming_ tcp connection is accepted. /// /// returns an error if the inbound ip address is on the ban list or @@ -638,9 +643,7 @@ impl PeersManager { let _ = tx.send(self.peers.get(&peer).cloned()); } PeerCommand::GetPeers(tx) => { - let _ = tx.send( - self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(), - ); + let _ = tx.send(self.iter_peers().collect()); } } } diff --git a/crates/staged-sync/Cargo.toml b/crates/staged-sync/Cargo.toml index 143fe9e74..b11ce31f7 100644 --- a/crates/staged-sync/Cargo.toml +++ b/crates/staged-sync/Cargo.toml @@ -24,6 +24,7 @@ reth-primitives = { path = "../../crates/primitives" } reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] } reth-net-nat = { path = "../../crates/net/nat" } reth-interfaces = { path = "../interfaces", optional = true } +reth-tasks= { path = "../../crates/tasks" } # io serde = "1.0" diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index 297f8fc3d..5928cae39 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -13,6 +13,7 @@ use reth_network::{ }; use reth_primitives::{ChainSpec, NodeRecord}; use reth_provider::ShareableDatabase; +use reth_tasks::TaskExecutor; use serde::{Deserialize, Serialize}; /// Configuration for the reth node. @@ -28,6 +29,7 @@ pub struct Config { impl Config { /// Initializes network config from read data + #[allow(clippy::too_many_arguments)] pub fn network_config( &self, db: DB, @@ -36,6 +38,7 @@ impl Config { bootnodes: Option>, nat_resolution_method: reth_net_nat::NatResolver, peers_file: Option, + executor: Option, ) -> NetworkConfig> { let peer_config = self .peers @@ -49,6 +52,7 @@ impl Config { .peer_config(peer_config) .discovery(discv4) .chain_spec(chain_spec) + .executor(executor) .set_discovery(disable_discovery) .build(Arc::new(ShareableDatabase::new(db))) }