From 2f6183614fc2f0280072e9ca51302f9901372673 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Tue, 18 Apr 2023 01:06:49 +0200 Subject: [PATCH] chore: run commands as blocking task (#2286) --- bin/reth/src/chain/init.rs | 4 ++-- bin/reth/src/cli.rs | 19 +++++++++++-------- bin/reth/src/db/mod.rs | 5 ++--- bin/reth/src/drop_stage.rs | 2 +- bin/reth/src/dump_stage/mod.rs | 27 +++++++++++++-------------- bin/reth/src/runner.rs | 17 +++++++++++++++++ bin/reth/src/stage/mod.rs | 2 +- 7 files changed, 47 insertions(+), 29 deletions(-) diff --git a/bin/reth/src/chain/init.rs b/bin/reth/src/chain/init.rs index cefd173f6..ab349344b 100644 --- a/bin/reth/src/chain/init.rs +++ b/bin/reth/src/chain/init.rs @@ -41,7 +41,7 @@ pub struct InitCommand { impl InitCommand { /// Execute the `init` command - pub async fn execute(&self) -> eyre::Result<()> { + pub async fn execute(self) -> eyre::Result<()> { info!(target: "reth::cli", "reth init starting"); // add network name to db directory @@ -52,7 +52,7 @@ impl InitCommand { info!(target: "reth::cli", "Database opened"); info!(target: "reth::cli", "Writing genesis block"); - let hash = init_genesis(db, self.chain.clone())?; + let hash = init_genesis(db, self.chain)?; info!(target: "reth::cli", hash = ?hash, "Genesis block written"); Ok(()) diff --git a/bin/reth/src/cli.rs b/bin/reth/src/cli.rs index 20fb9c4e3..6ba888ae3 100644 --- a/bin/reth/src/cli.rs +++ b/bin/reth/src/cli.rs @@ -1,6 +1,4 @@ //! CLI definition and entrypoint to executable -use std::str::FromStr; - use crate::{ chain, config, db, dirs::{LogsDir, PlatformPath}, @@ -14,6 +12,7 @@ use reth_tracing::{ tracing_subscriber::{filter::Directive, registry::LookupSpan}, BoxedLayer, FileWorkerGuard, }; +use std::str::FromStr; /// Parse CLI options, set up logging and run the chosen command. pub fn run() -> eyre::Result<()> { @@ -29,12 +28,16 @@ pub fn run() -> eyre::Result<()> { match opt.command { Commands::Node(command) => runner.run_command_until_exit(|ctx| command.execute(ctx)), - Commands::Init(command) => runner.run_until_ctrl_c(command.execute()), - Commands::Import(command) => runner.run_until_ctrl_c(command.execute()), - Commands::Db(command) => runner.run_until_ctrl_c(command.execute()), - Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()), - Commands::DumpStage(command) => runner.run_until_ctrl_c(command.execute()), - Commands::DropStage(command) => runner.run_until_ctrl_c(command.execute()), + Commands::Init(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::Import(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::Db(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::Stage(command) => runner.run_blocking_until_ctrl_c(command.execute()), + Commands::DumpStage(command) => { + // TODO: This should be run_blocking_until_ctrl_c as well, but fails to compile due to + // weird compiler GAT issues. + runner.run_until_ctrl_c(command.execute()) + } + Commands::DropStage(command) => runner.run_blocking_until_ctrl_c(command.execute()), Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()), Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()), diff --git a/bin/reth/src/db/mod.rs b/bin/reth/src/db/mod.rs index 9eafc92e6..3a2dfa41d 100644 --- a/bin/reth/src/db/mod.rs +++ b/bin/reth/src/db/mod.rs @@ -1,6 +1,4 @@ //! Database debugging tool -use std::sync::Arc; - use crate::{ dirs::{DbPath, MaybePlatformPath}, utils::DbTool, @@ -12,6 +10,7 @@ use human_bytes::human_bytes; use reth_db::{database::Database, tables}; use reth_primitives::ChainSpec; use reth_staged_sync::utils::chainspec::genesis_value_parser; +use std::sync::Arc; use tracing::error; /// DB List TUI @@ -85,7 +84,7 @@ pub struct ListArgs { impl Command { /// Execute `db` command - pub async fn execute(&self) -> eyre::Result<()> { + pub async fn execute(self) -> eyre::Result<()> { // add network name to db directory let db_path = self.db.unwrap_or_chain_default(self.chain.chain); diff --git a/bin/reth/src/drop_stage.rs b/bin/reth/src/drop_stage.rs index 410b83649..8849bd0f5 100644 --- a/bin/reth/src/drop_stage.rs +++ b/bin/reth/src/drop_stage.rs @@ -54,7 +54,7 @@ pub struct Command { impl Command { /// Execute `db` command - pub async fn execute(&self) -> eyre::Result<()> { + pub async fn execute(self) -> eyre::Result<()> { // add network name to db directory let db_path = self.db.unwrap_or_chain_default(self.chain.chain); diff --git a/bin/reth/src/dump_stage/mod.rs b/bin/reth/src/dump_stage/mod.rs index cbe32b5a8..54403f7bd 100644 --- a/bin/reth/src/dump_stage/mod.rs +++ b/bin/reth/src/dump_stage/mod.rs @@ -1,7 +1,18 @@ //! Database debugging tool -mod hashing_storage; +use crate::{ + dirs::{DbPath, MaybePlatformPath, PlatformPath}, + utils::DbTool, +}; +use clap::Parser; +use reth_db::{ + cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, +}; +use reth_primitives::ChainSpec; +use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db}; use std::sync::Arc; +use tracing::info; +mod hashing_storage; use hashing_storage::dump_hashing_storage_stage; mod hashing_account; @@ -12,18 +23,6 @@ use execution::dump_execution_stage; mod merkle; use merkle::dump_merkle_stage; -use reth_primitives::ChainSpec; - -use crate::{ - dirs::{DbPath, MaybePlatformPath, PlatformPath}, - utils::DbTool, -}; -use clap::Parser; -use reth_db::{ - cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, -}; -use reth_staged_sync::utils::{chainspec::genesis_value_parser, init::init_db}; -use tracing::info; /// `reth dump-stage` command #[derive(Debug, Parser)] @@ -98,7 +97,7 @@ pub struct StageCommand { impl Command { /// Execute `dump-stage` command - pub async fn execute(&self) -> eyre::Result<()> { + pub async fn execute(self) -> eyre::Result<()> { // add network name to db directory let db_path = self.db.unwrap_or_chain_default(self.chain.chain); diff --git a/bin/reth/src/runner.rs b/bin/reth/src/runner.rs index 065e840d1..e378f3bb0 100644 --- a/bin/reth/src/runner.rs +++ b/bin/reth/src/runner.rs @@ -60,6 +60,23 @@ impl CliRunner { tokio_runtime.block_on(run_until_ctrl_c(fut))?; Ok(()) } + + /// Executes a regular future as a spawned blocking task until completion or until external + /// signal received. + /// + /// See [Runtime::spawn_blocking](tokio::runtime::Runtime::spawn_blocking) . + pub fn run_blocking_until_ctrl_c(self, fut: F) -> Result<(), E> + where + F: Future> + Send + 'static, + E: Send + Sync + From + 'static, + { + let tokio_runtime = tokio_runtime()?; + let handle = tokio_runtime.handle().clone(); + let fut = tokio_runtime.handle().spawn_blocking(move || handle.block_on(fut)); + tokio_runtime + .block_on(run_until_ctrl_c(async move { fut.await.expect("Failed to join task") }))?; + Ok(()) + } } /// [CliRunner] configuration when executing commands asynchronously diff --git a/bin/reth/src/stage/mod.rs b/bin/reth/src/stage/mod.rs index 84ead7d29..589a5533b 100644 --- a/bin/reth/src/stage/mod.rs +++ b/bin/reth/src/stage/mod.rs @@ -94,7 +94,7 @@ pub struct Command { impl Command { /// Execute `stage` command - pub async fn execute(&self) -> eyre::Result<()> { + pub async fn execute(self) -> eyre::Result<()> { // Raise the fd limit of the process. // Does not do anything on windows. fdlimit::raise_fd_limit();