From 48d121dc54675ab83c36a22e52de8f2863ea0f29 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 13 Feb 2023 19:28:19 -0800 Subject: [PATCH] Revert "cli: integrate TaskExecutor" (#1329) --- Cargo.lock | 1 - bin/reth/src/args/network_args.rs | 13 --- bin/reth/src/dirs.rs | 6 -- bin/reth/src/node/mod.rs | 107 ++++++------------------ bin/reth/src/p2p/mod.rs | 1 - bin/reth/src/runner.rs | 27 +----- bin/reth/src/stage/mod.rs | 1 - crates/net/network/src/config.rs | 6 +- crates/net/network/src/manager.rs | 7 +- crates/net/network/src/peers/manager.rs | 9 +- crates/staged-sync/Cargo.toml | 1 - crates/staged-sync/src/config.rs | 4 - 12 files changed, 35 insertions(+), 148 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f2600736b..44ee12ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4715,7 +4715,6 @@ dependencies = [ "reth-provider", "reth-staged-sync", "reth-stages", - "reth-tasks", "reth-tracing", "secp256k1 0.24.3", "serde", diff --git a/bin/reth/src/args/network_args.rs b/bin/reth/src/args/network_args.rs index aed1adb3b..793ac9906 100644 --- a/bin/reth/src/args/network_args.rs +++ b/bin/reth/src/args/network_args.rs @@ -3,7 +3,6 @@ use crate::dirs::{KnownPeersPath, PlatformPath}; use clap::Args; use reth_primitives::NodeRecord; -use std::path::PathBuf; /// Parameters for configuring the network more granularity via CLI #[derive(Debug, Args)] @@ -38,15 +37,3 @@ pub struct NetworkArgs { #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] pub no_persist_peers: bool, } - -// === impl NetworkArgs === - -impl NetworkArgs { - /// If `no_persist_peers` is true then this returns the path to the persistent peers file - pub fn persistent_peers_file(&self) -> Option { - if self.no_persist_peers { - return None - } - Some(self.peers_file.clone().into()) - } -} diff --git a/bin/reth/src/dirs.rs b/bin/reth/src/dirs.rs index b1a8aa575..c01ca034f 100644 --- a/bin/reth/src/dirs.rs +++ b/bin/reth/src/dirs.rs @@ -170,9 +170,3 @@ impl AsRef for PlatformPath { self.0.as_path() } } - -impl From> for PathBuf { - fn from(value: PlatformPath) -> Self { - value.0 - } -} diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index 5e02cdb4d..271acb865 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -10,7 +10,7 @@ use crate::{ use clap::{crate_version, Parser}; use eyre::Context; use fdlimit::raise_fd_limit; -use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt}; +use futures::{stream::select as stream_select, Stream, StreamExt}; use reth_consensus::beacon::BeaconConsensus; use reth_db::mdbx::{Env, WriteMap}; use reth_downloaders::{ @@ -23,12 +23,10 @@ use reth_interfaces::{ sync::SyncStateUpdater, }; use reth_net_nat::NatResolver; -use reth_network::{ - error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager, -}; +use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle}; use reth_network_api::NetworkInfo; use reth_primitives::{BlockNumber, ChainSpec, H256}; -use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase}; +use reth_provider::ShareableDatabase; use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig}; use reth_staged_sync::{ utils::{ @@ -42,9 +40,8 @@ use reth_stages::{ prelude::*, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, }; -use reth_tasks::TaskExecutor; -use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; -use tracing::{debug, info, trace, warn}; +use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration}; +use tracing::{debug, info, warn}; /// Start the node #[derive(Debug, Parser)] @@ -109,7 +106,7 @@ pub struct Command { impl Command { /// Execute `node` command // TODO: RPC - pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> { + pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { info!(target: "reth::cli", "reth {} starting", crate_version!()); // Raise the fd limit of the process. @@ -134,8 +131,9 @@ impl Command { self.init_trusted_nodes(&mut config); info!(target: "reth::cli", "Connecting to P2P network"); - let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone()); - let network = self.start_network(netconf, &ctx.task_executor, ()).await?; + let netconf = self.load_network_config(&config, &db); + let network = netconf.start_network().await?; + info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); // TODO: Use the resolved secret to spawn the Engine API server @@ -158,17 +156,17 @@ impl Command { .build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone()) .await?; - ctx.task_executor.spawn(handle_events(events)); + tokio::spawn(handle_events(events)); // Run pipeline - let (rx, tx) = tokio::sync::oneshot::channel(); info!(target: "reth::cli", "Starting sync pipeline"); - ctx.task_executor.spawn_critical("pipeline task", async move { - let res = pipeline.run(db.clone()).await; - let _ = rx.send(res); - }); + pipeline.run(db.clone()).await?; - tx.await??; + // TODO: this is where we'd handle graceful shutdown by listening to ctrl-c + + if !self.network.no_persist_peers { + dump_peers(self.network.peers_file.as_ref(), network).await?; + } info!(target: "reth::cli", "Finishing up"); Ok(()) @@ -248,49 +246,19 @@ impl Command { Ok(consensus) } - /// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected - /// to that network. - async fn start_network( - &self, - config: NetworkConfig, - task_executor: &TaskExecutor, - // TODO: integrate pool - _pool: (), - ) -> Result - where - C: BlockProvider + HeaderProvider + 'static, - { - let client = config.client.clone(); - let (handle, network, _txpool, eth) = - NetworkManager::builder(config).await?.request_handler(client).split_with_handle(); - - let known_peers_file = self.network.persistent_peers_file(); - task_executor.spawn_critical_with_signal("p2p network task", |shutdown| async move { - run_network_until_shutdown(shutdown, network, known_peers_file).await - }); - - task_executor.spawn_critical("p2p eth request handler", async move { eth.await }); - - // TODO spawn pool - - Ok(handle) - } - fn load_network_config( &self, config: &Config, - db: Arc>, - executor: TaskExecutor, + db: &Arc>, ) -> NetworkConfig>>> { - let peers_file = self.network.persistent_peers_file(); + let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file); config.network_config( - db, + db.clone(), self.chain.clone(), self.network.disable_discovery, self.network.bootnodes.clone(), self.nat, - peers_file, - Some(executor), + peers_file.map(|f| f.as_ref().to_path_buf()), ) } @@ -342,36 +310,13 @@ impl Command { } } -/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is -/// received. If configured, this writes known peers to `persistent_peers_file` afterwards. -async fn run_network_until_shutdown( - shutdown: reth_tasks::shutdown::Shutdown, - network: NetworkManager, - persistent_peers_file: Option, -) where - C: BlockProvider + HeaderProvider + 'static, -{ - pin_mut!(network, shutdown); +/// Dumps peers to `file_path` for persistence. +async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> { + info!(target : "net::peers", file = %file_path.display(), "Saving current peers"); + let known_peers = network.peers_handle().all_peers().await; - tokio::select! { - _ = &mut network => {}, - _ = shutdown => {}, - } - - if let Some(file_path) = persistent_peers_file { - let known_peers = network.all_peers().collect::>(); - if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) { - trace!(target : "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers"); - match std::fs::write(&file_path, known_peers) { - Ok(_) => { - info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file"); - } - Err(err) => { - warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file"); - } - } - } - } + tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?; + Ok(()) } /// The current high-level state of the node. diff --git a/bin/reth/src/p2p/mod.rs b/bin/reth/src/p2p/mod.rs index 8c34b54eb..5aec2f9a8 100644 --- a/bin/reth/src/p2p/mod.rs +++ b/bin/reth/src/p2p/mod.rs @@ -105,7 +105,6 @@ impl Command { None, self.nat, None, - None, ) .start_network() .await?; diff --git a/bin/reth/src/runner.rs b/bin/reth/src/runner.rs index 9f6212663..8e6fe7ae1 100644 --- a/bin/reth/src/runner.rs +++ b/bin/reth/src/runner.rs @@ -24,15 +24,12 @@ impl CliRunner { ) -> Result<(), E> where F: Future>, - E: Send + Sync + From + From + 'static, + E: Send + Sync + From + 'static, { let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?; // Executes the command until it finished or ctrl-c was fired - let task_manager = tokio_runtime.block_on(run_to_completion_or_panic( - task_manager, - run_until_ctrl_c(command(context)), - ))?; + tokio_runtime.block_on(run_until_ctrl_c(command(context)))?; // after the command has finished or exit signal was received we drop the task manager which // fires the shutdown signal to all tasks spawned via the task executor drop(task_manager); @@ -88,25 +85,7 @@ pub fn tokio_runtime() -> Result { tokio::runtime::Builder::new_multi_thread().enable_all().build() } -/// Runs the given future to completion or until a critical task panicked -async fn run_to_completion_or_panic(mut tasks: TaskManager, fut: F) -> Result -where - F: Future>, - E: Send + Sync + From + 'static, -{ - { - pin_mut!(fut); - tokio::select! { - err = &mut tasks => { - return Err(err.into()) - }, - res = fut => res?, - } - } - Ok(tasks) -} - -/// Runs the future to completion or until a `ctrl-c` is received. +/// Runs the future to completion or until a `ctrl-c` was received. async fn run_until_ctrl_c(fut: F) -> Result<(), E> where F: Future>, diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index bd18069fd..eb2c56730 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -145,7 +145,6 @@ impl Command { None, self.nat, None, - None, ) .start_network() .await?; diff --git a/crates/net/network/src/config.rs b/crates/net/network/src/config.rs index d1989d03f..73a1fd999 100644 --- a/crates/net/network/src/config.rs +++ b/crates/net/network/src/config.rs @@ -209,10 +209,8 @@ impl NetworkConfigBuilder { } /// Sets the executor to use for spawning tasks. - /// - /// If `None`, then [tokio::spawn] is used for spawning tasks. - pub fn executor(mut self, executor: Option) -> Self { - self.executor = executor; + pub fn executor(mut self, executor: TaskExecutor) -> Self { + self.executor = Some(executor); self } diff --git a/crates/net/network/src/manager.rs b/crates/net/network/src/manager.rs index 46af9c3b6..b2a2cd1e2 100644 --- a/crates/net/network/src/manager.rs +++ b/crates/net/network/src/manager.rs @@ -40,7 +40,7 @@ use reth_eth_wire::{ }; use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind}; -use reth_primitives::{NodeRecord, PeerId, H256}; +use reth_primitives::{PeerId, H256}; use reth_provider::BlockProvider; use std::{ net::SocketAddr, @@ -295,11 +295,6 @@ where self.handle.peer_id() } - /// Returns an iterator over all peers in the peer set. - pub fn all_peers(&self) -> impl Iterator + '_ { - self.swarm.state().peers().iter_peers() - } - /// Returns a new [`PeersHandle`] that can be cloned and shared. /// /// The [`PeersHandle`] can be used to interact with the network's peer set. diff --git a/crates/net/network/src/peers/manager.rs b/crates/net/network/src/peers/manager.rs index 9cc8da481..7754c9f46 100644 --- a/crates/net/network/src/peers/manager.rs +++ b/crates/net/network/src/peers/manager.rs @@ -163,11 +163,6 @@ impl PeersManager { PeersHandle { manager_tx: self.manager_tx.clone() } } - /// Returns an iterator over all peers - pub(crate) fn iter_peers(&self) -> impl Iterator + '_ { - self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id)) - } - /// Invoked when a new _incoming_ tcp connection is accepted. /// /// returns an error if the inbound ip address is on the ban list or @@ -643,7 +638,9 @@ impl PeersManager { let _ = tx.send(self.peers.get(&peer).cloned()); } PeerCommand::GetPeers(tx) => { - let _ = tx.send(self.iter_peers().collect()); + let _ = tx.send( + self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(), + ); } } } diff --git a/crates/staged-sync/Cargo.toml b/crates/staged-sync/Cargo.toml index b11ce31f7..143fe9e74 100644 --- a/crates/staged-sync/Cargo.toml +++ b/crates/staged-sync/Cargo.toml @@ -24,7 +24,6 @@ reth-primitives = { path = "../../crates/primitives" } reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] } reth-net-nat = { path = "../../crates/net/nat" } reth-interfaces = { path = "../interfaces", optional = true } -reth-tasks= { path = "../../crates/tasks" } # io serde = "1.0" diff --git a/crates/staged-sync/src/config.rs b/crates/staged-sync/src/config.rs index 5928cae39..297f8fc3d 100644 --- a/crates/staged-sync/src/config.rs +++ b/crates/staged-sync/src/config.rs @@ -13,7 +13,6 @@ use reth_network::{ }; use reth_primitives::{ChainSpec, NodeRecord}; use reth_provider::ShareableDatabase; -use reth_tasks::TaskExecutor; use serde::{Deserialize, Serialize}; /// Configuration for the reth node. @@ -29,7 +28,6 @@ pub struct Config { impl Config { /// Initializes network config from read data - #[allow(clippy::too_many_arguments)] pub fn network_config( &self, db: DB, @@ -38,7 +36,6 @@ impl Config { bootnodes: Option>, nat_resolution_method: reth_net_nat::NatResolver, peers_file: Option, - executor: Option, ) -> NetworkConfig> { let peer_config = self .peers @@ -52,7 +49,6 @@ impl Config { .peer_config(peer_config) .discovery(discv4) .chain_spec(chain_spec) - .executor(executor) .set_discovery(disable_discovery) .build(Arc::new(ShareableDatabase::new(db))) }