feat(cli): integrate task manager and ctrl-c handling (#1292)

This commit is contained in:
Matthias Seitz
2023-02-11 22:09:06 +01:00
committed by GitHub
parent 56f35f8b62
commit f35eba6d44
7 changed files with 127 additions and 14 deletions

1
Cargo.lock generated
View File

@ -4075,6 +4075,7 @@ dependencies = [
"reth-rpc-builder",
"reth-staged-sync",
"reth-stages",
"reth-tasks",
"reth-tracing",
"reth-transaction-pool",
"serde",

View File

@ -26,6 +26,7 @@ reth-network = {path = "../../crates/net/network", features = ["serde"] }
reth-network-api = {path = "../../crates/net/network-api" }
reth-downloaders = {path = "../../crates/net/downloaders", features = ["test-utils"] }
reth-tracing = { path = "../../crates/tracing" }
reth-tasks = { path = "../../crates/tasks" }
reth-net-nat = { path = "../../crates/net/nat" }
reth-discv4 = { path = "../../crates/net/discv4" }

View File

@ -4,7 +4,9 @@ use std::str::FromStr;
use crate::{
chain, db,
dirs::{LogsDir, PlatformPath},
node, p2p, stage, test_eth_chain, test_vectors,
node, p2p,
runner::CliRunner,
stage, test_eth_chain, test_vectors,
};
use clap::{ArgAction, Args, Parser, Subcommand};
use reth_tracing::{
@ -14,21 +16,23 @@ use reth_tracing::{
};
/// Parse CLI options, set up logging and run the chosen command.
pub async fn run() -> eyre::Result<()> {
pub fn run() -> eyre::Result<()> {
let opt = Cli::parse();
let (layer, _guard) = opt.logs.layer();
reth_tracing::init(vec![layer, reth_tracing::stdout(opt.verbosity.directive())]);
let runner = CliRunner::default();
match opt.command {
Commands::Node(command) => command.execute().await,
Commands::Init(command) => command.execute().await,
Commands::Import(command) => command.execute().await,
Commands::Db(command) => command.execute().await,
Commands::Stage(command) => command.execute().await,
Commands::P2P(command) => command.execute().await,
Commands::TestVectors(command) => command.execute().await,
Commands::TestEthChain(command) => command.execute().await,
Commands::Node(command) => runner.run_command_until_exit(|ctx| command.execute(ctx)),
Commands::Init(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Import(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Db(command) => runner.run_until_ctrl_c(command.execute()),
Commands::Stage(command) => runner.run_until_ctrl_c(command.execute()),
Commands::P2P(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestVectors(command) => runner.run_until_ctrl_c(command.execute()),
Commands::TestEthChain(command) => runner.run_until_ctrl_c(command.execute()),
}
}

View File

@ -15,6 +15,7 @@ pub mod dirs;
pub mod node;
pub mod p2p;
pub mod prometheus_exporter;
pub mod runner;
pub mod stage;
pub mod test_eth_chain;
pub mod test_vectors;

View File

@ -1,6 +1,5 @@
#[tokio::main]
async fn main() {
if let Err(err) = reth::cli::run().await {
fn main() {
if let Err(err) = reth::cli::run() {
eprintln!("Error: {err:?}");
std::process::exit(1);
}

View File

@ -5,6 +5,7 @@ use crate::{
args::{NetworkArgs, RpcServerArgs},
dirs::{ConfigPath, DbPath, PlatformPath},
prometheus_exporter,
runner::CliContext,
};
use clap::{crate_version, Parser};
use eyre::Context;
@ -105,7 +106,7 @@ pub struct Command {
impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(self) -> eyre::Result<()> {
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!());
// Raise the fd limit of the process.

106
bin/reth/src/runner.rs Normal file
View File

@ -0,0 +1,106 @@
//! Entrypoint for running commands.
use futures::pin_mut;
use reth_tasks::{TaskExecutor, TaskManager};
use std::{future::Future, time::Duration};
use tracing::trace;
/// Used to execute cli commands
#[derive(Default, Debug)]
#[non_exhaustive]
pub struct CliRunner;
// === impl CliRunner ===
impl CliRunner {
/// Executes the given _async_ command on the tokio runtime until the command future resolves or
/// until the process receives a `SIGINT` or `SIGTERM` signal.
///
/// Tasks spawned by the command via the [TaskExecutor] are shut down and an attempt is made to
/// drive their shutdown to completion after the command has finished.
pub fn run_command_until_exit<F, E>(
self,
command: impl FnOnce(CliContext) -> F,
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finished or ctrl-c was fired
tokio_runtime.block_on(run_until_ctrl_c(command(context)))?;
// after the command has finished or exit signal was received we drop the task manager which
// fires the shutdown signal to all tasks spawned via the task executor
drop(task_manager);
// give all tasks that are now being shut down some time to finish before tokio leaks them
// see [Runtime::shutdown_timeout](tokio::runtime::Runtime::shutdown_timeout)
tokio_runtime.shutdown_timeout(Duration::from_secs(30));
Ok(())
}
/// Executes a regular future until completion or until external signal received.
pub fn run_until_ctrl_c<F, E>(self, fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
{
let tokio_runtime = tokio_runtime()?;
tokio_runtime.block_on(run_until_ctrl_c(fut))?;
Ok(())
}
}
/// [CliRunner] configuration when executing commands asynchronously
struct AsyncCliRunner {
context: CliContext,
task_manager: TaskManager,
tokio_runtime: tokio::runtime::Runtime,
}
// === impl AsyncCliRunner ===
impl AsyncCliRunner {
/// Attempts to create a tokio Runtime and additional context required to execute commands
/// asynchronously.
fn new() -> Result<Self, std::io::Error> {
let tokio_runtime = tokio_runtime()?;
let task_manager = TaskManager::new(tokio_runtime.handle().clone());
let task_executor = task_manager.executor();
Ok(Self { context: CliContext { task_executor }, task_manager, tokio_runtime })
}
}
/// Additional context provided by the [CliRunner] when executing commands
pub struct CliContext {
/// Used to execute/spawn tasks
pub task_executor: TaskExecutor,
}
/// Creates a new default tokio multi-thread [Runtime](tokio::runtime::Runtime) with all features
/// enabled
pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the future to completion or until a `ctrl-c` was received.
async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + 'static,
{
let ctrl_c = tokio::signal::ctrl_c();
pin_mut!(ctrl_c, fut);
tokio::select! {
_ = ctrl_c => {
trace!(target: "reth::cli", "Received ctrl-c");
},
res = fut => res?,
}
Ok(())
}