Revert "cli: integrate TaskExecutor" (#1329)

Georgios Konstantopoulos
2023-02-13 19:28:19 -08:00
committed by GitHub
parent 49292091dd
commit 48d121dc54
12 changed files with 35 additions and 148 deletions

View File

@@ -3,7 +3,6 @@
use crate::dirs::{KnownPeersPath, PlatformPath};
use clap::Args;
use reth_primitives::NodeRecord;
use std::path::PathBuf;
/// Parameters for configuring the network more granularly via CLI
#[derive(Debug, Args)]
@@ -38,15 +37,3 @@ pub struct NetworkArgs {
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
pub no_persist_peers: bool,
}
// === impl NetworkArgs ===
impl NetworkArgs {
/// Returns the path to the persistent peers file, or `None` if `no_persist_peers` is set
pub fn persistent_peers_file(&self) -> Option<PathBuf> {
if self.no_persist_peers {
return None
}
Some(self.peers_file.clone().into())
}
}
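For readers following the revert: the helper removed above is the usual "flag suppresses a default path" pattern. A minimal standalone sketch of that shape (the `PeerOpts` struct and default path are hypothetical, not reth's actual types; assumes clap with the derive feature):

```rust
use std::path::PathBuf;

use clap::Parser;

/// Hypothetical stand-in for reth's `NetworkArgs` (illustration only).
#[derive(Debug, Parser)]
struct PeerOpts {
    /// Path used to persist known peers between runs.
    #[arg(long, default_value = "known-peers.json")]
    peers_file: PathBuf,

    /// Do not persist peers to disk.
    #[arg(long, conflicts_with = "peers_file")]
    no_persist_peers: bool,
}

impl PeerOpts {
    /// Returns the peers file path, or `None` when persistence is disabled.
    fn persistent_peers_file(&self) -> Option<PathBuf> {
        if self.no_persist_peers {
            return None;
        }
        Some(self.peers_file.clone())
    }
}

fn main() {
    let opts = PeerOpts::parse();
    println!("peers file: {:?}", opts.persistent_peers_file());
}
```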

View File

@@ -170,9 +170,3 @@ impl<D> AsRef<Path> for PlatformPath<D> {
self.0.as_path()
}
}
impl<D> From<PlatformPath<D>> for PathBuf {
fn from(value: PlatformPath<D>) -> Self {
value.0
}
}
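The impl removed here is a plain newtype-to-inner conversion; without it, call sites can no longer write `.into()` to get a `PathBuf`. A self-contained sketch of the same shape, with a hypothetical `TypedPath` wrapper standing in for `PlatformPath<D>`:

```rust
use std::marker::PhantomData;
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for a typed path newtype like `PlatformPath<D>` (illustration only).
struct TypedPath<D>(PathBuf, PhantomData<D>);

impl<D> AsRef<Path> for TypedPath<D> {
    fn as_ref(&self) -> &Path {
        self.0.as_path()
    }
}

/// The conversion being removed: it lets callers turn the wrapper into a plain `PathBuf`
/// with `.into()` instead of reaching into the tuple field.
impl<D> From<TypedPath<D>> for PathBuf {
    fn from(value: TypedPath<D>) -> Self {
        value.0
    }
}

/// Marker type for one path kind (illustration only).
struct KnownPeers;

fn main() {
    let typed = TypedPath::<KnownPeers>(PathBuf::from("known-peers.json"), PhantomData);
    let plain: PathBuf = typed.into();
    println!("{}", plain.display());
}
```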

View File

@@ -10,7 +10,7 @@ use crate::{
use clap::{crate_version, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use futures::{stream::select as stream_select, Stream, StreamExt};
use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{
@@ -23,12 +23,10 @@ use reth_interfaces::{
sync::SyncStateUpdater,
};
use reth_net_nat::NatResolver;
use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
};
use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_provider::ShareableDatabase;
use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig};
use reth_staged_sync::{
utils::{
@@ -42,9 +40,8 @@ use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tracing::{debug, info, trace, warn};
use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration};
use tracing::{debug, info, warn};
/// Start the node
#[derive(Debug, Parser)]
@@ -109,7 +106,7 @@ pub struct Command {
impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!());
// Raise the fd limit of the process.
@@ -134,8 +131,9 @@ impl Command {
self.init_trusted_nodes(&mut config);
info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
let network = self.start_network(netconf, &ctx.task_executor, ()).await?;
let netconf = self.load_network_config(&config, &db);
let network = netconf.start_network().await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
// TODO: Use the resolved secret to spawn the Engine API server
@@ -158,17 +156,17 @@
.build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone())
.await?;
ctx.task_executor.spawn(handle_events(events));
tokio::spawn(handle_events(events));
// Run pipeline
let (rx, tx) = tokio::sync::oneshot::channel();
info!(target: "reth::cli", "Starting sync pipeline");
ctx.task_executor.spawn_critical("pipeline task", async move {
let res = pipeline.run(db.clone()).await;
let _ = rx.send(res);
});
pipeline.run(db.clone()).await?;
tx.await??;
// TODO: this is where we'd handle graceful shutdown by listening to ctrl-c
if !self.network.no_persist_peers {
dump_peers(self.network.peers_file.as_ref(), network).await?;
}
info!(target: "reth::cli", "Finishing up");
Ok(())
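The pre-revert flow above hands the pipeline future to the task executor and receives its result over a oneshot channel, while the reverted flow simply awaits it inline. A minimal sketch of both shapes using plain tokio (the `run_pipeline` function is a hypothetical stand-in, not reth's pipeline API; assumes tokio with the full feature set):

```rust
use tokio::sync::oneshot;

/// Hypothetical stand-in for the pipeline future (illustration only).
async fn run_pipeline() -> Result<(), String> {
    // ... sync stages would run here ...
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Pre-revert shape: spawn the work elsewhere, receive its result over a oneshot channel.
    let (tx, rx) = oneshot::channel();
    tokio::spawn(async move {
        let res = run_pipeline().await;
        // The receiver may already be gone; ignore the send error in this sketch.
        let _ = tx.send(res);
    });
    rx.await??;

    // Post-revert shape: just await the future inline on the current task.
    run_pipeline().await?;
    Ok(())
}
```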
@@ -248,49 +246,19 @@ impl Command {
Ok(consensus)
}
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
/// to that network.
async fn start_network<C>(
&self,
config: NetworkConfig<C>,
task_executor: &TaskExecutor,
// TODO: integrate pool
_pool: (),
) -> Result<NetworkHandle, NetworkError>
where
C: BlockProvider + HeaderProvider + 'static,
{
let client = config.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(config).await?.request_handler(client).split_with_handle();
let known_peers_file = self.network.persistent_peers_file();
task_executor.spawn_critical_with_signal("p2p network task", |shutdown| async move {
run_network_until_shutdown(shutdown, network, known_peers_file).await
});
task_executor.spawn_critical("p2p eth request handler", async move { eth.await });
// TODO spawn pool
Ok(handle)
}
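The removed `start_network` relies on `spawn_critical_with_signal`, which drives a long-running future until the executor's shutdown signal fires and then runs cleanup. A rough sketch of that shape with plain tokio primitives (the `run_service` future is hypothetical; this is not the reth_tasks API):

```rust
use tokio::sync::oneshot;

/// Hypothetical long-running service future (stands in for the network manager).
async fn run_service() {
    loop {
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
    }
}

/// Drives the service until a shutdown signal arrives, then runs cleanup,
/// mirroring the shape of the removed `run_network_until_shutdown` helper.
async fn run_until_shutdown(shutdown: oneshot::Receiver<()>) {
    tokio::select! {
        _ = run_service() => {}
        _ = shutdown => {}
    }
    // Cleanup (e.g. persisting peers) would happen here.
    println!("service shut down, running cleanup");
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    let handle = tokio::spawn(run_until_shutdown(shutdown_rx));

    // Later, e.g. on ctrl-c, fire the shutdown signal and wait for cleanup to finish.
    let _ = shutdown_tx.send(());
    let _ = handle.await;
}
```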
fn load_network_config(
&self,
config: &Config,
db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
db: &Arc<Env<WriteMap>>,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let peers_file = self.network.persistent_peers_file();
let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file);
config.network_config(
db,
db.clone(),
self.chain.clone(),
self.network.disable_discovery,
self.network.bootnodes.clone(),
self.nat,
peers_file,
Some(executor),
peers_file.map(|f| f.as_ref().to_path_buf()),
)
}
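The reverted `load_network_config` replaces the helper call with an inline `bool::then_some`. For reference, a tiny standalone sketch of that idiom with made-up values (the diff's version additionally goes through `AsRef<Path>` because the wrapper type is not a `PathBuf`):

```rust
use std::path::{Path, PathBuf};

fn main() {
    let no_persist_peers = false;
    let peers_file = PathBuf::from("known-peers.json");

    // `bool::then_some` turns the flag into an Option: Some(&path) only when persistence is enabled.
    let peers_file_ref: Option<&PathBuf> = (!no_persist_peers).then_some(&peers_file);

    // Turn the borrow into an owned PathBuf only when it is present.
    let owned: Option<PathBuf> = peers_file_ref.map(|p| p.to_path_buf());
    assert_eq!(owned.as_deref(), Some(Path::new("known-peers.json")));
}
```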
@@ -342,36 +310,13 @@ impl Command {
}
}
/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is
/// received. If configured, this writes known peers to `persistent_peers_file` afterwards.
async fn run_network_until_shutdown<C>(
shutdown: reth_tasks::shutdown::Shutdown,
network: NetworkManager<C>,
persistent_peers_file: Option<PathBuf>,
) where
C: BlockProvider + HeaderProvider + 'static,
{
pin_mut!(network, shutdown);
/// Dumps peers to `file_path` for persistence.
async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> {
info!(target : "net::peers", file = %file_path.display(), "Saving current peers");
let known_peers = network.peers_handle().all_peers().await;
tokio::select! {
_ = &mut network => {},
_ = shutdown => {},
}
if let Some(file_path) = persistent_peers_file {
let known_peers = network.all_peers().collect::<Vec<_>>();
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
trace!(target : "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
match std::fs::write(&file_path, known_peers) {
Ok(_) => {
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
}
Err(err) => {
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
}
}
}
}
tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?;
Ok(())
}
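The new `dump_peers` serializes whatever the peers handle returns and writes it with `tokio::fs::write`; `serde_json::Error` converts into `io::Error`, so a single `?` covers both steps. A standalone sketch with a hypothetical `PeerRecord` type (assumes serde with derive, serde_json, and tokio as dependencies; reth's actual peer type differs):

```rust
use std::io;
use std::path::Path;

use serde::Serialize;

/// Hypothetical serializable peer record (illustration only).
#[derive(Serialize)]
struct PeerRecord {
    id: String,
    addr: String,
}

/// Writes peers as pretty-printed JSON, mirroring the shape of the new `dump_peers`.
async fn dump_peers_to(file_path: &Path, peers: &[PeerRecord]) -> Result<(), io::Error> {
    let json = serde_json::to_string_pretty(peers)?;
    tokio::fs::write(file_path, json).await
}

#[tokio::main]
async fn main() -> Result<(), io::Error> {
    let peers = vec![PeerRecord { id: "0xabc".into(), addr: "127.0.0.1:30303".into() }];
    dump_peers_to(Path::new("known-peers.json"), &peers).await
}
```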
/// The current high-level state of the node.

View File

@@ -105,7 +105,6 @@ impl Command {
None,
self.nat,
None,
None,
)
.start_network()
.await?;

View File

@@ -24,15 +24,12 @@ impl CliRunner {
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
E: Send + Sync + From<std::io::Error> + 'static,
{
let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finishes or ctrl-c is fired
let task_manager = tokio_runtime.block_on(run_to_completion_or_panic(
task_manager,
run_until_ctrl_c(command(context)),
))?;
tokio_runtime.block_on(run_until_ctrl_c(command(context)))?;
// after the command has finished or exit signal was received we drop the task manager which
// fires the shutdown signal to all tasks spawned via the task executor
drop(task_manager);
@@ -88,25 +85,7 @@ pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the given future to completion or until a critical task panicked
async fn run_to_completion_or_panic<F, E>(mut tasks: TaskManager, fut: F) -> Result<TaskManager, E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
pin_mut!(fut);
tokio::select! {
err = &mut tasks => {
return Err(err.into())
},
res = fut => res?,
}
}
Ok(tasks)
}
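The removed `run_to_completion_or_panic` races the task-manager future against the command future with `select!`, so a panicked critical task aborts the run early. A generic sketch of that shape without reth_tasks (the `monitor` future and `MonitorError` type are stand-ins, not reth's `TaskManager`/`PanickedTaskError`; assumes the futures and tokio crates):

```rust
use std::future::Future;

use futures::pin_mut;

/// Stand-in for the error a failed critical task would report (illustration only).
#[derive(Debug)]
struct MonitorError;

/// Runs `fut` to completion, or bails out early if `monitor` resolves first,
/// mirroring the shape of the removed `run_to_completion_or_panic`.
async fn run_or_bail<M, F, E>(monitor: M, fut: F) -> Result<(), E>
where
    M: Future<Output = MonitorError>,
    F: Future<Output = Result<(), E>>,
    E: From<MonitorError>,
{
    pin_mut!(monitor, fut);
    tokio::select! {
        err = &mut monitor => Err(err.into()),
        res = &mut fut => res,
    }
}

#[tokio::main]
async fn main() {
    // A monitor that never fires and a command that finishes immediately.
    let monitor = std::future::pending::<MonitorError>();
    let command = async { Ok::<(), MonitorError>(()) };
    run_or_bail(monitor, command).await.expect("command should finish before the monitor");
}
```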
/// Runs the future to completion or until a `ctrl-c` is received.
/// Runs the future to completion or until a `ctrl-c` was received.
async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,

View File

@@ -145,7 +145,6 @@ impl Command {
None,
self.nat,
None,
None,
)
.start_network()
.await?;