revert: "Revert "cli: integrate TaskExecutor" (#1329)" (#1336)

This commit is contained in:
Matthias Seitz
2023-02-14 15:50:19 +01:00
committed by GitHub
parent 8a0156fe4e
commit cd1afccf52
13 changed files with 186 additions and 58 deletions

1
Cargo.lock generated
View File

@ -4715,6 +4715,7 @@ dependencies = [
"reth-provider", "reth-provider",
"reth-staged-sync", "reth-staged-sync",
"reth-stages", "reth-stages",
"reth-tasks",
"reth-tracing", "reth-tracing",
"secp256k1 0.24.3", "secp256k1 0.24.3",
"serde", "serde",

View File

@ -3,6 +3,7 @@
use crate::dirs::{KnownPeersPath, PlatformPath}; use crate::dirs::{KnownPeersPath, PlatformPath};
use clap::Args; use clap::Args;
use reth_primitives::NodeRecord; use reth_primitives::NodeRecord;
use std::path::PathBuf;
/// Parameters for configuring the network more granularity via CLI /// Parameters for configuring the network more granularity via CLI
#[derive(Debug, Args)] #[derive(Debug, Args)]
@ -37,3 +38,15 @@ pub struct NetworkArgs {
#[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")] #[arg(long, verbatim_doc_comment, conflicts_with = "peers_file")]
pub no_persist_peers: bool, pub no_persist_peers: bool,
} }
// === impl NetworkArgs ===
impl NetworkArgs {
/// If `no_persist_peers` is true then this returns the path to the persistent peers file
pub fn persistent_peers_file(&self) -> Option<PathBuf> {
if self.no_persist_peers {
return None
}
Some(self.peers_file.clone().into())
}
}

View File

@ -170,3 +170,9 @@ impl<D> AsRef<Path> for PlatformPath<D> {
self.0.as_path() self.0.as_path()
} }
} }
impl<D> From<PlatformPath<D>> for PathBuf {
fn from(value: PlatformPath<D>) -> Self {
value.0
}
}

View File

@ -10,7 +10,7 @@ use crate::{
use clap::{crate_version, Parser}; use clap::{crate_version, Parser};
use eyre::Context; use eyre::Context;
use fdlimit::raise_fd_limit; use fdlimit::raise_fd_limit;
use futures::{stream::select as stream_select, Stream, StreamExt}; use futures::{pin_mut, stream::select as stream_select, Stream, StreamExt};
use reth_consensus::beacon::BeaconConsensus; use reth_consensus::beacon::BeaconConsensus;
use reth_db::mdbx::{Env, WriteMap}; use reth_db::mdbx::{Env, WriteMap};
use reth_downloaders::{ use reth_downloaders::{
@ -23,10 +23,12 @@ use reth_interfaces::{
sync::SyncStateUpdater, sync::SyncStateUpdater,
}; };
use reth_net_nat::NatResolver; use reth_net_nat::NatResolver;
use reth_network::{NetworkConfig, NetworkEvent, NetworkHandle}; use reth_network::{
error::NetworkError, NetworkConfig, NetworkEvent, NetworkHandle, NetworkManager,
};
use reth_network_api::NetworkInfo; use reth_network_api::NetworkInfo;
use reth_primitives::{BlockNumber, ChainSpec, H256}; use reth_primitives::{BlockNumber, ChainSpec, H256};
use reth_provider::ShareableDatabase; use reth_provider::{BlockProvider, HeaderProvider, ShareableDatabase};
use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig}; use reth_rpc_builder::{RethRpcModule, RpcServerConfig, TransportRpcModuleConfig};
use reth_staged_sync::{ use reth_staged_sync::{
utils::{ utils::{
@ -40,8 +42,9 @@ use reth_stages::{
prelude::*, prelude::*,
stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage}, stages::{ExecutionStage, SenderRecoveryStage, TotalDifficultyStage},
}; };
use std::{io, net::SocketAddr, path::Path, sync::Arc, time::Duration}; use reth_tasks::TaskExecutor;
use tracing::{debug, info, warn}; use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use tracing::{debug, info, trace, warn};
/// Start the node /// Start the node
#[derive(Debug, Parser)] #[derive(Debug, Parser)]
@ -106,7 +109,7 @@ pub struct Command {
impl Command { impl Command {
/// Execute `node` command /// Execute `node` command
// TODO: RPC // TODO: RPC
pub async fn execute(self, _ctx: CliContext) -> eyre::Result<()> { pub async fn execute(self, ctx: CliContext) -> eyre::Result<()> {
info!(target: "reth::cli", "reth {} starting", crate_version!()); info!(target: "reth::cli", "reth {} starting", crate_version!());
// Raise the fd limit of the process. // Raise the fd limit of the process.
@ -132,9 +135,8 @@ impl Command {
self.init_trusted_nodes(&mut config); self.init_trusted_nodes(&mut config);
info!(target: "reth::cli", "Connecting to P2P network"); info!(target: "reth::cli", "Connecting to P2P network");
let netconf = self.load_network_config(&config, &db); let netconf = self.load_network_config(&config, Arc::clone(&db), ctx.task_executor.clone());
let network = netconf.start_network().await?; let network = self.start_network(netconf, &ctx.task_executor, ()).await?;
info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network");
// TODO: Use the resolved secret to spawn the Engine API server // TODO: Use the resolved secret to spawn the Engine API server
@ -157,17 +159,17 @@ impl Command {
.build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone()) .build_networked_pipeline(&mut config, network.clone(), &consensus, db.clone())
.await?; .await?;
tokio::spawn(handle_events(events)); ctx.task_executor.spawn(handle_events(events));
// Run pipeline // Run pipeline
let (rx, tx) = tokio::sync::oneshot::channel();
info!(target: "reth::cli", "Starting sync pipeline"); info!(target: "reth::cli", "Starting sync pipeline");
pipeline.run(db.clone()).await?; ctx.task_executor.spawn_critical("pipeline task", async move {
let res = pipeline.run(db.clone()).await;
let _ = rx.send(res);
});
// TODO: this is where we'd handle graceful shutdown by listening to ctrl-c tx.await??;
if !self.network.no_persist_peers {
dump_peers(self.network.peers_file.as_ref(), network).await?;
}
info!(target: "reth::cli", "Finishing up"); info!(target: "reth::cli", "Finishing up");
Ok(()) Ok(())
@ -247,19 +249,49 @@ impl Command {
Ok(consensus) Ok(consensus)
} }
/// Spawns the configured network and associated tasks and returns the [NetworkHandle] connected
/// to that network.
async fn start_network<C>(
&self,
config: NetworkConfig<C>,
task_executor: &TaskExecutor,
// TODO: integrate pool
_pool: (),
) -> Result<NetworkHandle, NetworkError>
where
C: BlockProvider + HeaderProvider + 'static,
{
let client = config.client.clone();
let (handle, network, _txpool, eth) =
NetworkManager::builder(config).await?.request_handler(client).split_with_handle();
let known_peers_file = self.network.persistent_peers_file();
task_executor.spawn_critical_with_signal("p2p network task", |shutdown| async move {
run_network_until_shutdown(shutdown, network, known_peers_file).await
});
task_executor.spawn_critical("p2p eth request handler", async move { eth.await });
// TODO spawn pool
Ok(handle)
}
fn load_network_config( fn load_network_config(
&self, &self,
config: &Config, config: &Config,
db: &Arc<Env<WriteMap>>, db: Arc<Env<WriteMap>>,
executor: TaskExecutor,
) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> { ) -> NetworkConfig<ShareableDatabase<Arc<Env<WriteMap>>>> {
let peers_file = (!self.network.no_persist_peers).then_some(&self.network.peers_file); let peers_file = self.network.persistent_peers_file();
config.network_config( config.network_config(
db.clone(), db,
self.chain.clone(), self.chain.clone(),
self.network.disable_discovery, self.network.disable_discovery,
self.network.bootnodes.clone(), self.network.bootnodes.clone(),
self.nat, self.nat,
peers_file.map(|f| f.as_ref().to_path_buf()), peers_file,
Some(executor),
) )
} }
@ -311,13 +343,36 @@ impl Command {
} }
} }
/// Dumps peers to `file_path` for persistence. /// Drives the [NetworkManager] future until a [Shutdown](reth_tasks::shutdown::Shutdown) signal is
async fn dump_peers(file_path: &Path, network: NetworkHandle) -> Result<(), io::Error> { /// received. If configured, this writes known peers to `persistent_peers_file` afterwards.
info!(target : "net::peers", file = %file_path.display(), "Saving current peers"); async fn run_network_until_shutdown<C>(
let known_peers = network.peers_handle().all_peers().await; shutdown: reth_tasks::shutdown::Shutdown,
network: NetworkManager<C>,
persistent_peers_file: Option<PathBuf>,
) where
C: BlockProvider + HeaderProvider + 'static,
{
pin_mut!(network, shutdown);
tokio::fs::write(file_path, serde_json::to_string_pretty(&known_peers)?).await?; tokio::select! {
Ok(()) _ = &mut network => {},
_ = shutdown => {},
}
if let Some(file_path) = persistent_peers_file {
let known_peers = network.all_peers().collect::<Vec<_>>();
if let Ok(known_peers) = serde_json::to_string_pretty(&known_peers) {
trace!(target : "reth::cli", peers_file =?file_path, num_peers=%known_peers.len(), "Saving current peers");
match std::fs::write(&file_path, known_peers) {
Ok(_) => {
info!(target: "reth::cli", peers_file=?file_path, "Wrote network peers to file");
}
Err(err) => {
warn!(target: "reth::cli", ?err, peers_file=?file_path, "Failed to write network peers to file");
}
}
}
}
} }
/// The current high-level state of the node. /// The current high-level state of the node.

View File

@ -105,6 +105,7 @@ impl Command {
None, None,
self.nat, self.nat,
None, None,
None,
) )
.start_network() .start_network()
.await?; .await?;

View File

@ -24,12 +24,15 @@ impl CliRunner {
) -> Result<(), E> ) -> Result<(), E>
where where
F: Future<Output = Result<(), E>>, F: Future<Output = Result<(), E>>,
E: Send + Sync + From<std::io::Error> + 'static, E: Send + Sync + From<std::io::Error> + From<reth_tasks::PanickedTaskError> + 'static,
{ {
let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?; let AsyncCliRunner { context, task_manager, tokio_runtime } = AsyncCliRunner::new()?;
// Executes the command until it finished or ctrl-c was fired // Executes the command until it finished or ctrl-c was fired
tokio_runtime.block_on(run_until_ctrl_c(command(context)))?; let task_manager = tokio_runtime.block_on(run_to_completion_or_panic(
task_manager,
run_until_ctrl_c(command(context)),
))?;
// after the command has finished or exit signal was received we drop the task manager which // after the command has finished or exit signal was received we drop the task manager which
// fires the shutdown signal to all tasks spawned via the task executor // fires the shutdown signal to all tasks spawned via the task executor
drop(task_manager); drop(task_manager);
@ -85,7 +88,25 @@ pub fn tokio_runtime() -> Result<tokio::runtime::Runtime, std::io::Error> {
tokio::runtime::Builder::new_multi_thread().enable_all().build() tokio::runtime::Builder::new_multi_thread().enable_all().build()
} }
/// Runs the future to completion or until a `ctrl-c` was received. /// Runs the given future to completion or until a critical task panicked
async fn run_to_completion_or_panic<F, E>(mut tasks: TaskManager, fut: F) -> Result<TaskManager, E>
where
F: Future<Output = Result<(), E>>,
E: Send + Sync + From<reth_tasks::PanickedTaskError> + 'static,
{
{
pin_mut!(fut);
tokio::select! {
err = &mut tasks => {
return Err(err.into())
},
res = fut => res?,
}
}
Ok(tasks)
}
/// Runs the future to completion or until a `ctrl-c` is received.
async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E> async fn run_until_ctrl_c<F, E>(fut: F) -> Result<(), E>
where where
F: Future<Output = Result<(), E>>, F: Future<Output = Result<(), E>>,

View File

@ -145,6 +145,7 @@ impl Command {
None, None,
self.nat, self.nat,
None, None,
None,
) )
.start_network() .start_network()
.await?; .await?;

View File

@ -209,8 +209,10 @@ impl NetworkConfigBuilder {
} }
/// Sets the executor to use for spawning tasks. /// Sets the executor to use for spawning tasks.
pub fn executor(mut self, executor: TaskExecutor) -> Self { ///
self.executor = Some(executor); /// If `None`, then [tokio::spawn] is used for spawning tasks.
pub fn executor(mut self, executor: Option<TaskExecutor>) -> Self {
self.executor = executor;
self self
} }

View File

@ -40,7 +40,7 @@ use reth_eth_wire::{
}; };
use reth_net_common::bandwidth_meter::BandwidthMeter; use reth_net_common::bandwidth_meter::BandwidthMeter;
use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind}; use reth_network_api::{EthProtocolInfo, NetworkStatus, ReputationChangeKind};
use reth_primitives::{PeerId, H256}; use reth_primitives::{NodeRecord, PeerId, H256};
use reth_provider::BlockProvider; use reth_provider::BlockProvider;
use std::{ use std::{
net::SocketAddr, net::SocketAddr,
@ -295,6 +295,11 @@ where
self.handle.peer_id() self.handle.peer_id()
} }
/// Returns an iterator over all peers in the peer set.
pub fn all_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.swarm.state().peers().iter_peers()
}
/// Returns a new [`PeersHandle`] that can be cloned and shared. /// Returns a new [`PeersHandle`] that can be cloned and shared.
/// ///
/// The [`PeersHandle`] can be used to interact with the network's peer set. /// The [`PeersHandle`] can be used to interact with the network's peer set.

View File

@ -163,6 +163,11 @@ impl PeersManager {
PeersHandle { manager_tx: self.manager_tx.clone() } PeersHandle { manager_tx: self.manager_tx.clone() }
} }
/// Returns an iterator over all peers
pub(crate) fn iter_peers(&self) -> impl Iterator<Item = NodeRecord> + '_ {
self.peers.iter().map(|(peer_id, v)| NodeRecord::new(v.addr, *peer_id))
}
/// Invoked when a new _incoming_ tcp connection is accepted. /// Invoked when a new _incoming_ tcp connection is accepted.
/// ///
/// returns an error if the inbound ip address is on the ban list or /// returns an error if the inbound ip address is on the ban list or
@ -638,9 +643,7 @@ impl PeersManager {
let _ = tx.send(self.peers.get(&peer).cloned()); let _ = tx.send(self.peers.get(&peer).cloned());
} }
PeerCommand::GetPeers(tx) => { PeerCommand::GetPeers(tx) => {
let _ = tx.send( let _ = tx.send(self.iter_peers().collect());
self.peers.iter().map(|(k, v)| NodeRecord::new(v.addr, *k)).collect(),
);
} }
} }
} }

View File

@ -209,17 +209,24 @@ impl SessionManager {
let (disconnect_tx, disconnect_rx) = oneshot::channel(); let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone(); let pending_events = self.pending_sessions_tx.clone();
let metered_stream = MeteredStream::new_with_meter(stream, self.bandwidth_meter.clone()); let metered_stream = MeteredStream::new_with_meter(stream, self.bandwidth_meter.clone());
self.spawn(start_pending_incoming_session( let secret_key = self.secret_key;
disconnect_rx, let hello_message = self.hello_message.clone();
session_id, let status = self.status;
metered_stream, let fork_filter = self.fork_filter.clone();
pending_events, self.spawn(Box::pin(async move {
remote_addr, start_pending_incoming_session(
self.secret_key, disconnect_rx,
self.hello_message.clone(), session_id,
self.status, metered_stream,
self.fork_filter.clone(), pending_events,
)); remote_addr,
secret_key,
hello_message,
status,
fork_filter,
)
.await
}));
let handle = PendingSessionHandle { let handle = PendingSessionHandle {
disconnect_tx: Some(disconnect_tx), disconnect_tx: Some(disconnect_tx),
@ -235,18 +242,26 @@ impl SessionManager {
let session_id = self.next_id(); let session_id = self.next_id();
let (disconnect_tx, disconnect_rx) = oneshot::channel(); let (disconnect_tx, disconnect_rx) = oneshot::channel();
let pending_events = self.pending_sessions_tx.clone(); let pending_events = self.pending_sessions_tx.clone();
self.spawn(start_pending_outbound_session( let secret_key = self.secret_key;
disconnect_rx, let hello_message = self.hello_message.clone();
pending_events, let fork_filter = self.fork_filter.clone();
session_id, let status = self.status;
remote_addr, let band_with_meter = self.bandwidth_meter.clone();
remote_peer_id, self.spawn(Box::pin(async move {
self.secret_key, start_pending_outbound_session(
self.hello_message.clone(), disconnect_rx,
self.status, pending_events,
self.fork_filter.clone(), session_id,
self.bandwidth_meter.clone(), remote_addr,
)); remote_peer_id,
secret_key,
hello_message,
status,
fork_filter,
band_with_meter,
)
.await
}));
let handle = PendingSessionHandle { let handle = PendingSessionHandle {
disconnect_tx: Some(disconnect_tx), disconnect_tx: Some(disconnect_tx),

View File

@ -24,6 +24,7 @@ reth-primitives = { path = "../../crates/primitives" }
reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] } reth-provider = { path = "../../crates/storage/provider", features = ["test-utils"] }
reth-net-nat = { path = "../../crates/net/nat" } reth-net-nat = { path = "../../crates/net/nat" }
reth-interfaces = { path = "../interfaces", optional = true } reth-interfaces = { path = "../interfaces", optional = true }
reth-tasks= { path = "../../crates/tasks" }
# io # io
serde = "1.0" serde = "1.0"

View File

@ -13,6 +13,7 @@ use reth_network::{
}; };
use reth_primitives::{ChainSpec, NodeRecord}; use reth_primitives::{ChainSpec, NodeRecord};
use reth_provider::ShareableDatabase; use reth_provider::ShareableDatabase;
use reth_tasks::TaskExecutor;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
/// Configuration for the reth node. /// Configuration for the reth node.
@ -28,6 +29,7 @@ pub struct Config {
impl Config { impl Config {
/// Initializes network config from read data /// Initializes network config from read data
#[allow(clippy::too_many_arguments)]
pub fn network_config<DB: Database>( pub fn network_config<DB: Database>(
&self, &self,
db: DB, db: DB,
@ -36,6 +38,7 @@ impl Config {
bootnodes: Option<Vec<NodeRecord>>, bootnodes: Option<Vec<NodeRecord>>,
nat_resolution_method: reth_net_nat::NatResolver, nat_resolution_method: reth_net_nat::NatResolver,
peers_file: Option<PathBuf>, peers_file: Option<PathBuf>,
executor: Option<TaskExecutor>,
) -> NetworkConfig<ShareableDatabase<DB>> { ) -> NetworkConfig<ShareableDatabase<DB>> {
let peer_config = self let peer_config = self
.peers .peers
@ -49,6 +52,7 @@ impl Config {
.peer_config(peer_config) .peer_config(peer_config)
.discovery(discv4) .discovery(discv4)
.chain_spec(chain_spec) .chain_spec(chain_spec)
.executor(executor)
.set_discovery(disable_discovery) .set_discovery(disable_discovery)
.build(Arc::new(ShareableDatabase::new(db))) .build(Arc::new(ShareableDatabase::new(db)))
} }