cli: integrate TaskExecutor (#1314)

Matthias Seitz
2023-02-13 23:24:18 +01:00
committed by GitHub
parent a1f7f54ab5
commit 5997103078
12 changed files with 148 additions and 35 deletions
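In short: CLI commands now receive a TaskExecutor through CliContext and use it instead of bare tokio::spawn, so the node can track critical tasks and shut them down gracefully. A minimal sketch of the pattern, using the reth_tasks calls that appear in this diff (the constructor signatures are assumptions inferred from usage, not verified against the crate):

use reth_tasks::TaskManager;

#[tokio::main]
async fn main() {
    // The manager owns the shutdown signal; the executor is a cloneable spawn handle.
    let manager = TaskManager::new(tokio::runtime::Handle::current());
    let executor = manager.executor();

    // Ordinary task, replacing a bare `tokio::spawn`.
    executor.spawn(async { /* handle events */ });

    // Critical task: a panic here surfaces as a `PanickedTaskError` when the
    // `TaskManager` future is polled (see `run_to_completion_or_panic` below).
    executor.spawn_critical("pipeline task", async { /* run the pipeline */ });

    // Critical task handed a `Shutdown` future so it can exit gracefully.
    executor.spawn_critical_with_signal("p2p network task", |shutdown| async move {
        shutdown.await; /* flush state, e.g. persist known peers */
    });

    // Dropping the manager fires the shutdown signal to every spawned task.
    drop(manager);
}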

Cargo.lock generated
View File

@ -4715,6 +4715,7 @@ dependencies = [
"reth-provider",
"reth-staged-sync",
"reth-stages",
"reth-tasks",
"reth-tracing",
"secp256k1 0.24.3",
"serde",

View File

@ -3,6 +3,7 @@
use crate::dirs::{KnownPeersPath, PlatformPath};
use clap::Args;
use reth_primitives::NodeRecord;
use std::path::PathBuf;
/// Parameters for configuring the network with more granularity via the CLI
#[derive(Debug, Args)]
@ -37,3 +38,15 @@ pub struct NetworkArgs {
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
pub no_persist_peers: bool,
}
// === impl NetworkArgs ===
impl NetworkArgs {
/// Returns the path to the persistent peers file, or `None` if `no_persist_peers` is set
pub fn persistent_peers_file(&self) -> Option<PathBuf> {
if self.no_persist_peers {
return None
}
Some(self.peers_file.clone().into())
}
}
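For illustration, a hypothetical parent command that flattens these args, mirroring how the node command embeds them (Cmd and the binary name are invented for the example; this assumes peers_file has a default value, as the KnownPeersPath type suggests):

use clap::Parser;

#[derive(Debug, Parser)]
struct Cmd {
    #[command(flatten)]
    network: NetworkArgs,
}

fn main() {
    let cmd = Cmd::parse_from(["demo", "--no-persist-peers"]);
    // Persistence disabled: no peers file path is produced.
    assert!(cmd.network.persistent_peers_file().is_none());
}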

View File

@ -170,3 +170,9 @@ impl<D> AsRef<Path> for PlatformPath<D> {
self.0.as_path()
}
}
impl<D> From<PlatformPath<D>> for PathBuf {
fn from(value: PlatformPath<D>) -> Self {
value.0
}
}
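As used by persistent_peers_file above, this impl lets a platform-specific path convert in one step (platform_path stands in for any PlatformPath<D> value):

let path: PathBuf = platform_path.into();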

View File

@ -10,7 +10,7 @@ use crate::{
use clap::{crate_version, Parser};
use eyre::Context;
use fdlimit::raise_fd_limit;
use futures::{stream::select as stream_select, Stream, StreamExt};
use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{
@ -23,10 +23,12 @@ use reth_interfaces::{
sync::SyncStateUpdater,
};
use reth_net_nat::NatResolver;
use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle};
use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_provider::ShareableDatabase;
use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig};
use reth_staged_sync::{
utils::{
@ -40,8 +42,9 @@ use reth_stages::{
prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
};
use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration};
use tracing::{debug, info, warn};
use reth_tasks::TaskExecutor;
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tracing::{debug, info, trace, warn};
/// Start the node
#[derive(Debug, Parser)]
@ -106,7 +109,7 @@ pub struct Command {
impl Command {
/// Execute `node` command
// TODO: RPC
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> {
pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!());
// Raise the fd limit of the process.
@ -131,9 +134,8 @@ impl Command {
self.init_trusted_nodes(&mut config);
info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, &db);
let network = netconf.start_network().await?;
let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
let network = self.start_network(netconf, &ctx.task_executor, ()).await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
// TODO: Use the resolved secret to spawn the Engine API server
@ -156,17 +158,17 @@ impl Command {
.build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone())
.await?;
tokio::spawn(handle_events(events));
ctx.task_executor.spawn(handle_events(events));
// Run pipeline
let (tx, rx) = tokio::sync::oneshot::channel();
info!(target: "reth::cli", "Starting sync pipeline");
pipeline.run(db.clone()).await?;
ctx.task_executor.spawn_critical("pipeline task", async move {
let res = pipeline.run(db.clone()).await;
let _ = tx.send(res);
});
// TODO: this is where we'd handle graceful shutdown by listening to ctrl-c
if !self.network.no_persist_peers {
dump_peers(self.network.peers_file.as_ref(), network).await?;
}
rx.await??;
info!(target: "reth::cli", "Finishing up");
Ok(())
@ -246,19 +248,49 @@ impl Command {
Ok(consensus)
}
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
/// to that network.
async fn start_network<C>(
&self,
config: NetworkConfig<C>,
task_executor: &TaskExecutor,
// TODO: integrate pool
_pool: (),
) -> Result<NetworkHandle, NetworkError>
where
C: BlockProvider + HeaderProvider + 'static,
{
let client = config.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(config).await?.request_handler(client).split_with_handle();
let known_peers_file = self.network.persistent_peers_file();
task_executor.spawn_critical_with_signal("p2p network task", |shutdown| async move {
run_network_until_shutdown(shutdown, network, known_peers_file).await
});
task_executor.spawn_critical("p2p eth request handler", async move { eth.await });
// TODO spawn pool
Ok(handle)
}
fn load_network_config(
&self,
config: &Config,
db: &Arc<Env<WriteMap>>,
db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file);
let peers_file = self.network.persistent_peers_file();
config.network_config(
db.clone(),
db,
self.chain.clone(),
self.network.disable_discovery,
self.network.bootnodes.clone(),
self.nat,
peers_file.map(|f| f.as_ref().to_path_buf()),
peers_file,
Some(executor),
)
}
@ -310,13 +342,36 @@ impl Command {
}
}
/// Dumps peers to `file_path` for persistence.
async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> {
info!(target : "net::peers", file = %file_path.display(), "Saving current peers");
let known_peers = network.peers_handle().all_peers().await;
/// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is
/// received. If configured, this writes known peers to `persistent_peers_file` afterwards.
async fn run_network_until_shutdown<C>(
shutdown: reth_tasks::shutdown::Shutdown,
network: NetworkManager<C>,
persistent_peers_file: Option<PathBuf>,
) where
C: BlockProvider + HeaderProvider + 'static,
{
pin_mut!(network, shutdown);
tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?;
Ok(())
tokio::select! {
_ = &mut network => {},
_ = shutdown => {},
}
if let Some(file_path) = persistent_peers_file {
let known_peers = network.all_peers().collect::<Vec<_>>();
trace!(target: "reth::cli", peers_file = ?file_path, num_peers = %known_peers.len(), "Saving current peers");
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
match std::fs::write(&file_path, known_peers) {
Ok(_) => {
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
}
Err(err) => {
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
}
}
}
}
}
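The shape of this function is a reusable graceful-shutdown pattern: pin both futures, race them with select!, then run cleanup while the task is still in scope. A self-contained sketch using a oneshot receiver as a stand-in for reth_tasks::shutdown::Shutdown:

use futures::pin_mut;
use std::future::Future;
use tokio::sync::oneshot;

async fn run_until_shutdown(task: impl Future<Output = ()>, shutdown: oneshot::Receiver<()>) {
    pin_mut!(task, shutdown);
    tokio::select! {
        _ = &mut task => {}, // the task finished on its own
        _ = shutdown => {},  // shutdown fired first
    }
    // cleanup runs in both cases, e.g. persisting peers to disk
}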
/// The current high-level state of the node.

View File

@ -105,6 +105,7 @@ impl Command {
None,
self.nat,
None,
None,
)
.start_network()
.await?;

View File

@ -24,12 +24,15 @@ impl CliRunner {
) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static,
E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{
let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finishes or ctrl-c is received
tokio_runtime.block_on(run_until_ctrl_c(command(context)))?;
let task_manager = tokio_runtime.block_on(run_to_completion_or_panic(
task_manager,
run_until_ctrl_c(command(context)),
))?;
// After the command has finished or an exit signal was received, we drop the task manager,
// which fires the shutdown signal to all tasks spawned via the task executor
drop(task_manager);
@ -85,7 +88,25 @@ pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build()
}
/// Runs the future to completion or until a `ctrl-c` was received.
/// Runs the given future to completion or until a critical task panics
async fn run_to_completion_or_panic<F, E>(mut tasks: TaskManager, fut: F) -> Result<TaskManager, E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
pin_mut!(fut);
tokio::select! {
err = &mut tasks => {
return Err(err.into())
},
res = fut => res?,
}
}
Ok(tasks)
}
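The practical effect: a panic in any critical task now fails the whole CLI run instead of dying silently on a detached tokio task. A hypothetical demonstration ("doomed task" is invented for the example):

// Somewhere in a command body:
ctx.task_executor.spawn_critical("doomed task", async {
    panic!("boom"); // run_to_completion_or_panic returns Err(PanickedTaskError.into())
});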
/// Runs the future to completion or until a `ctrl-c` is received.
async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
where
F: Future<Output = Result<(), E>>,

View File

@ -145,6 +145,7 @@ impl Command {
None,
self.nat,
None,
None,
)
.start_network()
.await?;

View File

@ -209,8 +209,10 @@ impl NetworkConfigBuilder {
}
/// Sets the executor to use for spawning tasks.
pub fn executor(mut self, executor: TaskExecutor) -> Self {
self.executor = Some(executor);
///
/// If `None`, then [tokio::spawn] is used for spawning tasks.
pub fn executor(mut self, executor: Option<TaskExecutor>) -> Self {
self.executor = executor;
self
}
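Both call shapes are now valid; a sketch with the builder's construction elided:

let builder = builder.executor(Some(task_executor.clone())); // spawn via the TaskExecutor
let builder = builder.executor(None);                        // fall back to tokio::spawn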

View File

@ -40,7 +40,7 @@ use reth_eth_wire::{
};
use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind};
use reth_primitives::{PeerId, H256};
use reth_primitives::{NodeRecord, PeerId, H256};
use reth_provider::BlockProvider;
use std::{
net::SocketAddr,
@ -295,6 +295,11 @@ where
self.handle.peer_id()
}
/// Returns an iterator over all peers in the peer set.
pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.swarm.state().peers().iter_peers()
}
/// Returns a new [`PeersHandle`] that can be cloned and shared.
///
/// The [`PeersHandle`] can be used to interact with the network's peer set.

View File

@ -163,6 +163,11 @@ impl PeersManager {
PeersHandle { manager_tx: self.manager_tx.clone() }
}
/// Returns an iterator over all peers.
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id))
}
/// Invoked when a new _incoming_ tcp connection is accepted.
///
/// returns an error if the inbound ip address is on the ban list or
@ -638,9 +643,7 @@ impl PeersManager {
let _ = tx.send(self.peers.get(&peer).cloned());
}
PeerCommand::GetPeers(tx) => {
let _ = tx.send(
self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(),
);
let _ = tx.send(self.iter_peers().collect());
}
}
}

View File

@ -24,6 +24,7 @@ reth-primitives = { path = "../../crates/primitives" }
reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
reth-net-nat = { path = "../../crates/net/nat" }
reth-interfaces = { path = "../interfaces", optional = true }
reth-tasks = { path = "../../crates/tasks" }
# io
serde = "1.0"

View File

@ -13,6 +13,7 @@ use reth_network::{
};
use reth_primitives::{ChainSpec, NodeRecord};
use reth_provider::ShareableDatabase;
use reth_tasks::TaskExecutor;
use serde::{Deserialize, Serialize};
/// Configuration for the reth node.
@ -28,6 +29,7 @@ pub struct Config {
impl Config {
/// Initializes network config from read data
#[allow(clippy::too_many_arguments)]
pub fn network_config<DB: Database>(
&self,
db: DB,
@ -36,6 +38,7 @@ impl Config {
bootnodes: Option<Vec<NodeRecord>>,
nat_resolution_method: reth_net_nat::NatResolver,
peers_file: Option<PathBuf>,
executor: Option<TaskExecutor>,
) -> NetworkConfig<ShareableDatabase<DB>> {
let peer_config = self
.peers
@ -49,6 +52,7 @@ impl Config {
.peer_config(peer_config)
.discovery(discv4)
.chain_spec(chain_spec)
.executor(executor)
.set_discovery(disable_discovery)
.build(Arc::new(ShareableDatabase::new(db)))
}
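Putting the new parameter together at the call site, as the node command now does (a sketch; the argument values are abbreviated stand-ins):

let net_config = config.network_config(
    db,                    // DB: Database
    chain_spec,            // chain spec for the network
    disable_discovery,     // bool
    bootnodes,             // Option<Vec<NodeRecord>>
    nat_resolution_method, // NatResolver
    peers_file,            // Option<PathBuf>
    Some(executor),        // Option<TaskExecutor>; None falls back to tokio::spawn
);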